Source code for sttp.transport.tssc.encoder

# ******************************************************************************************************
#  encoder.py - Gbtc
#
#  Copyright © 2026, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  01/05/2026 - Generated by porting C++ TSSCEncoder
#       Ported from cppapi/src/lib/transport/tssc/TSSCEncoder.{h,cpp}
#
# ******************************************************************************************************

# Ported from cppapi/src/lib/transport/tssc/TSSCEncoder.cpp : class TSSCEncoder
# Differences: Python uses bytearray for buffer; otherwise parity maintained.

from gsf import Limits
from .pointmetadata import PointMetadata, CodeWords
from typing import List, Optional
import numpy as np
import struct

INT32_MAX = np.int32(Limits.MAXINT32)
INT64_MAX = np.int64(Limits.MAXINT64)


class Encoder:
    """
    The encoder for the Time-Series Special Compression (TSSC) algorithm of STTP.

    Typical use: call `set_buffer` to attach an output buffer, call
    `try_add_measurement` until it returns ``False``, then call `finish_block`
    to flush pending bits and obtain the end position of the encoded block.

    Internally all positions, timestamps and the bit cache are kept as plain
    Python integers. Fixed-width behavior (byte truncation, 32/64-bit wrapping)
    is applied with explicit masks instead of relying on NumPy scalar overflow.
    """

    # Bit masks
    _BITS28 = np.uint32(0xFFFFFFF)
    _BITS24 = np.uint32(0xFFFFFF)
    _BITS20 = np.uint32(0xFFFFF)
    _BITS16 = np.uint32(0xFFFF)
    _BITS12 = np.uint32(0xFFF)
    _BITS8 = np.uint32(0xFF)
    _BITS4 = np.uint32(0xF)
    _BITS0 = np.uint32(0x0)

    # Fixed-width helpers used for explicit two's-complement arithmetic
    _UINT32_MASK = (1 << 32) - 1
    _UINT64_MASK = (1 << 64) - 1
    _INT64_SIGN = 1 << 63

    def __init__(self):
        """
        Creates a new TSSC encoder.
        """
        self._data: Optional[bytearray] = None
        self._position = 0
        self._last_position = 0

        self._prev_timestamp1 = 0
        self._prev_timestamp2 = 0

        # Unused delta slots hold the largest int64 so any real delta sorts before them
        self._prev_timedelta1 = int(INT64_MAX)
        self._prev_timedelta2 = int(INT64_MAX)
        self._prev_timedelta3 = int(INT64_MAX)
        self._prev_timedelta4 = int(INT64_MAX)

        self._last_point: Optional[PointMetadata] = self._new_point_metadata()
        self._points: List[Optional[PointMetadata]] = []

        # Bitstream state
        self._bitstream_buffer_index = -1
        self._bitstream_cache_bitcount = 0
        self._bitstream_cache = 0

    def reset(self):
        """
        Resets the TSSC Encoder to the initial state.
        """
        self._data = None
        self._points.clear()
        self._last_point = self._new_point_metadata()
        self._position = 0
        self._last_position = 0
        self._clear_bitstream()

        self._prev_timedelta1 = int(INT64_MAX)
        self._prev_timedelta2 = int(INT64_MAX)
        self._prev_timedelta3 = int(INT64_MAX)
        self._prev_timedelta4 = int(INT64_MAX)
        self._prev_timestamp1 = 0
        self._prev_timestamp2 = 0

    def set_buffer(self, data: bytearray, offset: np.uint32, length: np.uint32):
        """
        Sets the internal buffer to write data to.

        Writing starts at `offset`; `offset + length` marks the end of the
        usable region. Any pending bitstream state is discarded.
        """
        self._clear_bitstream()
        self._data = data
        self._position = int(offset)
        self._last_position = int(offset) + int(length)

    def finish_block(self) -> np.uint32:
        """
        Finishes the current block and returns position after the last byte written.
        """
        self._bitstream_flush()
        return np.uint32(self._position)

    def try_add_measurement(self, id: np.int32, timestamp: np.int64, quality: np.uint32, value: np.float32) -> bool:
        """
        Adds the supplied measurement to the stream. If the stream is full, returns False.

        Raises `ValueError` when `id` is negative, since a negative ID would
        otherwise index the per-point metadata list from its end.
        """
        # If there are fewer than 100 bytes available assume we cannot add any more
        if self._last_position - self._position < 100:
            return False

        point_index = int(id)
        if point_index < 0:
            raise ValueError(f"measurement ID must be non-negative, got {point_index}")

        # Setup tracking for metadata associated with measurement ID
        points = self._points
        point = points[point_index] if point_index < len(points) else None

        if point is None:
            point = self._new_point_metadata()
            shortfall = point_index + 1 - len(points)
            if shortfall > 0:
                points.extend([None] * shortfall)
            point.prevnextpointid1 = id + np.int32(1)
            points[point_index] = point

        # Code words for this measurement are emitted through the previously
        # encoded point; it is only replaced at the end of this method.
        last_point = self._last_point

        # Encode measurement ID
        if last_point.prevnextpointid1 != id:
            self._write_pointid_change(id)

        # Encode measurement timestamp
        timestamp = self._wrap_int64(int(timestamp))
        if self._prev_timestamp1 != timestamp:
            self._write_timestamp_change(timestamp)

        # Encode measurement quality
        if point.prevstateflags1 != quality:
            self._write_quality_change(quality, point)

        # Encode measurement value - reinterpret float as uint32 for bit manipulation
        value_raw = np.uint32(struct.unpack('I', struct.pack('f', value))[0])

        if point.prevvalue1 == value_raw:
            last_point.write_code(CodeWords.VALUE1)
        elif point.prevvalue2 == value_raw:
            last_point.write_code(CodeWords.VALUE2)
            point.prevvalue2 = np.uint32(point.prevvalue1)
            point.prevvalue1 = value_raw
        elif point.prevvalue3 == value_raw:
            last_point.write_code(CodeWords.VALUE3)
            point.prevvalue3 = np.uint32(point.prevvalue2)
            point.prevvalue2 = np.uint32(point.prevvalue1)
            point.prevvalue1 = value_raw
        elif value_raw == 0:
            last_point.write_code(CodeWords.VALUEZERO)
            point.prevvalue3 = np.uint32(point.prevvalue2)
            point.prevvalue2 = np.uint32(point.prevvalue1)
            point.prevvalue1 = np.uint32(0)
        else:
            bits_changed = (int(value_raw) ^ int(point.prevvalue1)) & self._UINT32_MASK

            # Widths that are not a whole number of bytes send their low nibble
            # through the bitstream and the remaining bits as whole bytes.
            if bits_changed <= self._BITS4:
                last_point.write_code(CodeWords.VALUEXOR4)
                self._write_bits(bits_changed & 15, 4)
            elif bits_changed <= self._BITS8:
                last_point.write_code(CodeWords.VALUEXOR8)
                self._write_bytes(bits_changed, 1)
            elif bits_changed <= self._BITS12:
                last_point.write_code(CodeWords.VALUEXOR12)
                self._write_bits(bits_changed & 15, 4)
                self._write_bytes(bits_changed >> 4, 1)
            elif bits_changed <= self._BITS16:
                last_point.write_code(CodeWords.VALUEXOR16)
                self._write_bytes(bits_changed, 2)
            elif bits_changed <= self._BITS20:
                last_point.write_code(CodeWords.VALUEXOR20)
                self._write_bits(bits_changed & 15, 4)
                self._write_bytes(bits_changed >> 4, 2)
            elif bits_changed <= self._BITS24:
                last_point.write_code(CodeWords.VALUEXOR24)
                self._write_bytes(bits_changed, 3)
            elif bits_changed <= self._BITS28:
                last_point.write_code(CodeWords.VALUEXOR28)
                self._write_bits(bits_changed & 15, 4)
                self._write_bytes(bits_changed >> 4, 3)
            else:
                last_point.write_code(CodeWords.VALUEXOR32)
                self._write_bytes(bits_changed, 4)

            point.prevvalue3 = np.uint32(point.prevvalue2)
            point.prevvalue2 = np.uint32(point.prevvalue1)
            point.prevvalue1 = value_raw

        self._last_point = point
        return True

    def _new_point_metadata(self) -> "PointMetadata":
        """Creates a new point metadata instance with write bits callback."""
        return PointMetadata(
            writebits=self._write_bits,
            readbit=None,
            readbits5=None
        )

    @staticmethod
    def _wrap_int64(value: int) -> int:
        """Reduces a Python integer to the signed 64-bit two's-complement range."""
        sign = Encoder._INT64_SIGN
        return ((value + sign) & Encoder._UINT64_MASK) - sign

    def _write_bytes(self, value: int, count: int):
        """Writes the low `count` bytes of `value` at the current position, least-significant byte first."""
        data = self._data
        position = self._position

        for _ in range(count):
            data[position] = value & 0xFF
            value >>= 8
            position += 1

        # Position is committed only after every byte was stored successfully
        self._position = position

    def _write_pointid_change(self, id: np.int32):
        """Writes point ID change using XOR encoding."""
        last_point = self._last_point
        bits_changed = (int(id) ^ int(last_point.prevnextpointid1)) & self._UINT32_MASK

        # Note: unlike value encoding there is no 28-bit code word for point IDs
        if bits_changed <= self._BITS4:
            last_point.write_code(CodeWords.POINTIDXOR4)
            self._write_bits(bits_changed & 15, 4)
        elif bits_changed <= self._BITS8:
            last_point.write_code(CodeWords.POINTIDXOR8)
            self._write_bytes(bits_changed, 1)
        elif bits_changed <= self._BITS12:
            last_point.write_code(CodeWords.POINTIDXOR12)
            self._write_bits(bits_changed & 15, 4)
            self._write_bytes(bits_changed >> 4, 1)
        elif bits_changed <= self._BITS16:
            last_point.write_code(CodeWords.POINTIDXOR16)
            self._write_bytes(bits_changed, 2)
        elif bits_changed <= self._BITS20:
            last_point.write_code(CodeWords.POINTIDXOR20)
            self._write_bits(bits_changed & 15, 4)
            self._write_bytes(bits_changed >> 4, 2)
        elif bits_changed <= self._BITS24:
            last_point.write_code(CodeWords.POINTIDXOR24)
            self._write_bytes(bits_changed, 3)
        else:
            last_point.write_code(CodeWords.POINTIDXOR32)
            self._write_bytes(bits_changed, 4)

        last_point.prevnextpointid1 = id

    def _write_timestamp_change(self, timestamp: np.int64):
        """
        Writes timestamp change using delta encoding.

        Emits a code word when the timestamp equals the second most recent
        timestamp or the most recent timestamp plus/minus one of the four
        tracked deltas; otherwise emits the XOR against the most recent
        timestamp as a 7-bit encoded value. Arithmetic wraps as signed 64-bit.
        """
        wrap = self._wrap_int64
        timestamp = wrap(int(timestamp))
        previous = self._prev_timestamp1
        last_point = self._last_point

        if self._prev_timestamp2 == timestamp:
            last_point.write_code(CodeWords.TIMESTAMP2)
        else:
            deltas = (
                self._prev_timedelta1,
                self._prev_timedelta2,
                self._prev_timedelta3,
                self._prev_timedelta4,
            )

            if previous < timestamp:
                direction = 1
                codes = (
                    CodeWords.TIMEDELTA1FORWARD,
                    CodeWords.TIMEDELTA2FORWARD,
                    CodeWords.TIMEDELTA3FORWARD,
                    CodeWords.TIMEDELTA4FORWARD,
                )
            else:
                direction = -1
                codes = (
                    CodeWords.TIMEDELTA1REVERSE,
                    CodeWords.TIMEDELTA2REVERSE,
                    CodeWords.TIMEDELTA3REVERSE,
                    CodeWords.TIMEDELTA4REVERSE,
                )

            for delta, code in zip(deltas, codes):
                # Explicit wrap: slots still holding INT64_MAX would overflow int64 here
                if wrap(previous + direction * delta) == timestamp:
                    last_point.write_code(code)
                    break
            else:
                last_point.write_code(CodeWords.TIMEXOR7BIT)
                self._encode7bit_uint64((timestamp ^ previous) & self._UINT64_MASK)

        # Save the smallest delta time (wrapped to int64 like the subtraction it replaces)
        min_delta = wrap(abs(wrap(previous - timestamp)))

        if (min_delta < self._prev_timedelta4
                and min_delta != self._prev_timedelta1
                and min_delta != self._prev_timedelta2
                and min_delta != self._prev_timedelta3):
            if min_delta < self._prev_timedelta1:
                self._prev_timedelta4 = self._prev_timedelta3
                self._prev_timedelta3 = self._prev_timedelta2
                self._prev_timedelta2 = self._prev_timedelta1
                self._prev_timedelta1 = min_delta
            elif min_delta < self._prev_timedelta2:
                self._prev_timedelta4 = self._prev_timedelta3
                self._prev_timedelta3 = self._prev_timedelta2
                self._prev_timedelta2 = min_delta
            elif min_delta < self._prev_timedelta3:
                self._prev_timedelta4 = self._prev_timedelta3
                self._prev_timedelta3 = min_delta
            else:
                self._prev_timedelta4 = min_delta

        self._prev_timestamp2 = previous
        self._prev_timestamp1 = timestamp

    def _write_quality_change(self, quality: np.uint32, point: "PointMetadata"):
        """Writes quality change using 7-bit encoding."""
        if point.prevstateflags2 == quality:
            self._last_point.write_code(CodeWords.STATEFLAGS2)
        else:
            self._last_point.write_code(CodeWords.STATEFLAGS7BIT32)
            self._encode7bit_uint32(quality)

        point.prevstateflags2 = point.prevstateflags1
        point.prevstateflags1 = quality

    def _clear_bitstream(self):
        """Clears the bitstream state."""
        self._bitstream_buffer_index = -1
        self._bitstream_cache_bitcount = 0
        self._bitstream_cache = 0

    def _write_bits(self, code: np.int32, length: np.int32):
        """
        Writes bits to the bitstream.

        Appends the low `length` bits given in `code` to the bit cache. A byte
        is reserved in the buffer at the current position the first time bits
        are pending, and complete bytes are stored as soon as they are available.
        """
        if self._bitstream_buffer_index < 0:
            self._bitstream_buffer_index = self._position
            self._position += 1

        bit_count = int(length)
        self._bitstream_cache = (self._bitstream_cache << bit_count) | int(code)
        self._bitstream_cache_bitcount += bit_count

        if self._bitstream_cache_bitcount > 7:
            self._bitstream_end()

    def _bitstream_flush(self):
        """Flushes the bitstream."""
        if self._bitstream_cache_bitcount > 0:
            if self._bitstream_buffer_index < 0:
                self._bitstream_buffer_index = self._position
                self._position += 1

            self._last_point.write_code(CodeWords.ENDOFSTREAM)

            if self._bitstream_cache_bitcount > 7:
                self._bitstream_end()

            if self._bitstream_cache_bitcount > 0:
                # Make up 8 bits by padding
                padded = self._bitstream_cache << (8 - self._bitstream_cache_bitcount)
                self._data[self._bitstream_buffer_index] = padded & 0xFF
                self._bitstream_cache = 0
                self._bitstream_buffer_index = -1
                self._bitstream_cache_bitcount = 0

    def _bitstream_end(self):
        """Ends the bitstream by flushing full bytes."""
        while self._bitstream_cache_bitcount > 7:
            pending = self._bitstream_cache_bitcount - 8
            self._data[self._bitstream_buffer_index] = (self._bitstream_cache >> pending) & 0xFF
            self._bitstream_cache_bitcount = pending

            if pending > 0:
                # Bits remain: reserve the next byte for them right away
                self._bitstream_buffer_index = self._position
                self._position += 1
            else:
                self._bitstream_buffer_index = -1

        # Drop bits that were already stored so the cache cannot grow without bound
        self._bitstream_cache &= (1 << self._bitstream_cache_bitcount) - 1

    def _encode7bit(self, value: int, max_bytes: int):
        """
        Writes `value` as 7-bit groups, low group first, using at most `max_bytes` bytes.

        Every byte except the last has its high bit set. When `max_bytes` is
        reached the final byte stores the remaining low 8 bits unflagged.
        """
        data = self._data
        position = self._position

        for _ in range(max_bytes - 1):
            if value < 128:
                break
            data[position] = (value & 0x7F) | 0x80
            value >>= 7
            position += 1

        data[position] = value & 0xFF
        self._position = position + 1

    def _encode7bit_uint32(self, value: np.uint32):
        """Encodes a uint32 using 7-bit variable length encoding."""
        self._encode7bit(int(value) & self._UINT32_MASK, 5)

    def _encode7bit_uint64(self, value: np.uint64):
        """Encodes a uint64 using 7-bit variable length encoding."""
        self._encode7bit(int(value) & self._UINT64_MASK, 9)