Source code for sttp.transport.tssc.pointmetadata

# ******************************************************************************************************
#  pointmetadata.py - Gbtc
#
#  Copyright © 2022, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  08/30/2022 - J. Ritchie Carroll
#       Generated original version of source code.
#
# ******************************************************************************************************

from gsf import Limits
from typing import Callable, Tuple, Optional
import numpy as np


class CodeWords:
    """
    Named code word constants, numbered consecutively from 0 through 31.

    Every value is stored as a `np.byte`, and all of them fit in five bits.
    """

    # End-of-stream marker
    ENDOFSTREAM = np.byte(0)

    # POINTIDXOR* codes (1-7)
    POINTIDXOR4 = np.byte(1)
    POINTIDXOR8 = np.byte(2)
    POINTIDXOR12 = np.byte(3)
    POINTIDXOR16 = np.byte(4)
    POINTIDXOR20 = np.byte(5)
    POINTIDXOR24 = np.byte(6)
    POINTIDXOR32 = np.byte(7)

    # TIMEDELTA*FORWARD codes (8-11)
    TIMEDELTA1FORWARD = np.byte(8)
    TIMEDELTA2FORWARD = np.byte(9)
    TIMEDELTA3FORWARD = np.byte(10)
    TIMEDELTA4FORWARD = np.byte(11)

    # TIMEDELTA*REVERSE codes (12-15)
    TIMEDELTA1REVERSE = np.byte(12)
    TIMEDELTA2REVERSE = np.byte(13)
    TIMEDELTA3REVERSE = np.byte(14)
    TIMEDELTA4REVERSE = np.byte(15)

    # TIMESTAMP2 / TIMEXOR7BIT codes (16-17)
    TIMESTAMP2 = np.byte(16)
    TIMEXOR7BIT = np.byte(17)

    # STATEFLAGS* codes (18-19)
    STATEFLAGS2 = np.byte(18)
    STATEFLAGS7BIT32 = np.byte(19)

    # VALUE1..VALUE3 and VALUEZERO codes (20-23)
    VALUE1 = np.byte(20)
    VALUE2 = np.byte(21)
    VALUE3 = np.byte(22)
    VALUEZERO = np.byte(23)

    # VALUEXOR* codes (24-31)
    VALUEXOR4 = np.byte(24)
    VALUEXOR8 = np.byte(25)
    VALUEXOR12 = np.byte(26)
    VALUEXOR16 = np.byte(27)
    VALUEXOR20 = np.byte(28)
    VALUEXOR24 = np.byte(29)
    VALUEXOR28 = np.byte(30)
    VALUEXOR32 = np.byte(31)
# Pre-built NumPy scalars, created once at import time so the code below can
# reuse them instead of constructing a new scalar on every call.

# np.byte scalars
BYTE_0 = np.byte(0)
BYTE_1 = np.byte(1)
BYTE_2 = np.byte(2)

# np.int32 scalars
INT32_0 = np.int32(0)
INT32_1 = np.int32(1)
INT32_2 = np.int32(2)
INT32_3 = np.int32(3)
INT32_5 = np.int32(5)
INT32_6 = np.int32(6)
INT32_7 = np.int32(7)
INT32_8 = np.int32(8)

# np.uint32 scalar
UINT32_0 = np.uint32(0)


class PointMetadata:
    """
    Adaptive code word prefix state for one point.

    Code words are written or read through the supplied bit-level callbacks
    using one of four modes. Mode 1 uses a fixed five-bit code. Modes 2
    through 4 give the one, two or three most frequently seen codes a short
    prefix ("1", "01", "001") at the cost of a longer encoding for every
    other code. Usage statistics are collected on every call and the mode is
    re-selected periodically to minimise the total bit count.
    """

    def __init__(self,
                 writebits: Optional[Callable[[np.int32, np.int32], None]],
                 readbit: Optional[Callable[[], np.int32]],
                 readbits5: Optional[Callable[[], np.int32]]
                 ):
        """
        Creates point metadata starting in mode 4 with the `VALUE1`,
        `VALUE2` and `VALUE3` code words as the initial shortcuts.

        Parameters
        ----------
        writebits:
            Callback invoked as `writebits(code, length)`; may be `None`.
        readbit:
            Callback returning the next single bit; may be `None`.
        readbits5:
            Callback returning the next five bits; may be `None`.
        """

        # Bit stream callbacks
        self._writebits = writebits
        self._readbit = readbit
        self._readbits5 = readbits5

        # Public per-point history fields. They are only zero-initialised
        # here; this class never reads them.
        # NOTE(review): presumably maintained by the owning encoder/decoder.
        self.prevnextpointid1 = INT32_0

        self.prevstateflags1 = UINT32_0
        self.prevstateflags2 = UINT32_0

        self.prevvalue1 = UINT32_0
        self.prevvalue2 = UINT32_0
        self.prevvalue3 = UINT32_0

        # Usage count per code word (indexed by code, 0-31) since last adapt
        self._commandstats = np.zeros(32, np.byte)
        self._commands_sent_sincelastchange = 0

        # Stage 0, 1, 2 of the warm-up schedule used to decide when to adapt
        self._startupmode = 0

        # Bit codes for the 4 modes of encoding
        self._mode = 4

        # Mode 1 means no prefix
        self._mode21 = BYTE_0

        self._mode31 = BYTE_0
        self._mode301 = BYTE_0

        self._mode41 = CodeWords.VALUE1
        self._mode401 = CodeWords.VALUE2
        self._mode4001 = CodeWords.VALUE3

    def _shortcut_codes(self) -> Optional[Tuple[np.byte, ...]]:
        """
        Returns the shortcut codes of the active mode, ordered from shortest
        prefix to longest, or `None` when the mode is not one of 1 through 4.
        """

        active = self._mode

        if active == 1:
            return ()

        if active == 2:
            return (self._mode21,)

        if active == 3:
            return (self._mode31, self._mode301)

        if active == 4:
            return (self._mode41, self._mode401, self._mode4001)

        return None

    def write_code(self, code: np.int32) -> Optional[Exception]:
        """
        Writes `code` using the active mode and records it in the statistics.

        Returns `None` on success, otherwise an exception instance describing
        the failure (the error is returned, not raised).
        """

        shortcuts = self._shortcut_codes()

        if shortcuts is None:
            return RuntimeError("coding Error")

        # The n-th shortcut is emitted as the value 1 in n bits, i.e., (n - 1)
        # zero bits followed by a one bit.
        for prefixlength, shortcut in zip((INT32_1, INT32_2, INT32_3), shortcuts):
            if code == np.int32(shortcut):
                self._writebits(INT32_1, prefixlength)
                break
        else:
            # No shortcut matched: the code is written in five bits plus one
            # extra bit per shortcut of the active mode. For codes below 32
            # those extra leading bits are zero, which is what `read_code`
            # consumes before reading the five-bit code.
            fullwidth = (INT32_5, INT32_6, INT32_7, INT32_8)[len(shortcuts)]
            self._writebits(code, fullwidth)

        return self._update_codestatistics(code)

    def read_code(self) -> Tuple[np.int32, Optional[Exception]]:
        """
        Reads the next code using the active mode and records it in the
        statistics.

        Returns a `(code, error)` pair, where `error` is `None` on success.
        When the mode is unsupported, the pair is `(0, RuntimeError)`.
        """

        shortcuts = self._shortcut_codes()

        if shortcuts is None:
            return 0, RuntimeError("unsupported compression mode")

        # One prefix bit is consumed per shortcut until a set bit is found
        for shortcut in shortcuts:
            if self._readbit() == INT32_1:
                code = np.int32(shortcut)
                break
        else:
            # All prefix bits were zero (or mode 1): a plain 5-bit code follows
            code = self._readbits5()

        return code, self._update_codestatistics(code)

    def _update_codestatistics(self, code: np.int32) -> Optional[Exception]:
        """
        Counts one use of `code` and re-selects the mode when the current
        warm-up stage is due.

        Stage 0 adapts once more than 5 codes were handled, stage 1 once more
        than 20, and stage 2 every time more than 100 codes were handled
        since the previous adaptation.
        """

        self._commands_sent_sincelastchange += 1
        self._commandstats[code] += BYTE_1

        stage = self._startupmode
        handled = self._commands_sent_sincelastchange

        if stage == 0:
            due = handled > 5
        elif stage == 1:
            due = handled > 20
        elif stage == 2:
            due = handled > 100
        else:
            due = False

        if not due:
            return None

        # Stages 0 and 1 advance to the next stage; stage 2 is the last one
        if stage < 2:
            self._startupmode += 1

        return self._adapt_commands()

    def _adapt_commands(self) -> Optional[Exception]:
        """
        Picks the mode with the smallest total bit count for the codes seen
        since the last adaptation, then clears the statistics.

        Returns `None` on success or an exception instance when no mode could
        be selected.
        """

        # Three most used codes as (code, count), most frequent first. The
        # initial entries have a count of zero, so any code actually seen
        # outranks them; ties keep the code with the lower index in front.
        ranked = [(BYTE_0, 0), (BYTE_1, 0), (BYTE_2, 0)]
        total = 0

        for index, stat in enumerate(self._commandstats):
            count = int(stat)
            self._commandstats[index] = BYTE_0
            total += count

            for slot, (_, rankedcount) in enumerate(ranked):
                if count > rankedcount:
                    ranked.insert(slot, (np.byte(index), count))
                    ranked.pop()
                    break

        (code1, count1), (code2, count2), (code3, count3) = ranked

        # Total bits each mode would have spent on the observed codes
        mode1size = total * 5
        mode2size = count1 + (total - count1) * 6
        mode3size = count1 + count2 * 2 + (total - count1 - count2) * 7
        mode4size = count1 + count2 * 2 + count3 * 3 + (total - count1 - count2 - count3) * 8

        sizes = (mode1size, mode2size, mode3size, mode4size)
        minsize = min(Limits.MAXINT32, *sizes)

        # On equal sizes the lowest numbered mode wins
        selected = next((mode for mode, size in enumerate(sizes, start=1) if size == minsize), None)

        if selected is None:
            if self._writebits is None:
                return RuntimeError("subscriber coding error")

            return RuntimeError("publisher coding error")

        self._mode = selected

        if selected == 2:
            self._mode21 = code1
        elif selected == 3:
            self._mode31 = code1
            self._mode301 = code2
        elif selected == 4:
            self._mode41 = code1
            self._mode401 = code2
            self._mode4001 = code3

        self._commands_sent_sincelastchange = 0
        return None