# ******************************************************************************************************
# encoder.py - Gbtc
#
# Copyright © 2026, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 01/05/2026 - Generated by porting C++ TSSCEncoder
# Ported from cppapi/src/lib/transport/tssc/TSSCEncoder.{h,cpp}
#
# ******************************************************************************************************
# Ported from cppapi/src/lib/transport/tssc/TSSCEncoder.cpp : class TSSCEncoder
# Differences: Python uses bytearray for buffer; otherwise parity maintained.
from gsf import Limits
from .pointmetadata import PointMetadata, CodeWords
from typing import List, Optional
import numpy as np
import struct
INT32_MAX = np.int32(Limits.MAXINT32)
INT64_MAX = np.int64(Limits.MAXINT64)
[docs]
class Encoder:
"""
The encoder for the Time-Series Special Compression (TSSC) algorithm of STTP.
"""
# Bit masks
_BITS28 = np.uint32(0xFFFFFFF)
_BITS24 = np.uint32(0xFFFFFF)
_BITS20 = np.uint32(0xFFFFF)
_BITS16 = np.uint32(0xFFFF)
_BITS12 = np.uint32(0xFFF)
_BITS8 = np.uint32(0xFF)
_BITS4 = np.uint32(0xF)
_BITS0 = np.uint32(0x0)
def __init__(self):
"""
Creates a new TSSC encoder.
"""
self._data: Optional[bytearray] = None
self._position = np.uint32(0)
self._last_position = np.uint32(0)
self._prev_timestamp1 = np.int64(0)
self._prev_timestamp2 = np.int64(0)
self._prev_timedelta1 = INT64_MAX
self._prev_timedelta2 = INT64_MAX
self._prev_timedelta3 = INT64_MAX
self._prev_timedelta4 = INT64_MAX
self._last_point: Optional[PointMetadata] = self._new_point_metadata()
self._points: List[Optional[PointMetadata]] = []
# Bitstream state
self._bitstream_buffer_index = np.int32(-1)
self._bitstream_cache_bitcount = np.int32(0)
self._bitstream_cache = np.int32(0)
[docs]
def reset(self):
"""
Resets the TSSC Encoder to the initial state.
"""
self._data = None
self._points.clear()
self._last_point = self._new_point_metadata()
self._position = np.uint32(0)
self._last_position = np.uint32(0)
self._clear_bitstream()
self._prev_timedelta1 = INT64_MAX
self._prev_timedelta2 = INT64_MAX
self._prev_timedelta3 = INT64_MAX
self._prev_timedelta4 = INT64_MAX
self._prev_timestamp1 = np.int64(0)
self._prev_timestamp2 = np.int64(0)
[docs]
def set_buffer(self, data: bytearray, offset: np.uint32, length: np.uint32):
"""
Sets the internal buffer to write data to.
"""
self._clear_bitstream()
self._data = data
self._position = offset
self._last_position = offset + length
[docs]
def finish_block(self) -> np.uint32:
"""
Finishes the current block and returns position after the last byte written.
"""
self._bitstream_flush()
return self._position
[docs]
def try_add_measurement(self, id: np.int32, timestamp: np.int64, quality: np.uint32, value: np.float32) -> bool:
"""
Adds the supplied measurement to the stream. If the stream is full, returns False.
"""
# If there are fewer than 100 bytes available assume we cannot add any more
if self._last_position - self._position < 100:
return False
# Setup tracking for metadata associated with measurement ID
point_count = len(self._points)
point = self._points[id] if id < point_count else None
if point is None:
point = self._new_point_metadata()
if id >= point_count:
while len(self._points) <= id:
self._points.append(None)
point.prevnextpointid1 = id + np.int32(1)
self._points[id] = point
# Encode measurement ID
if self._last_point.prevnextpointid1 != id:
self._write_pointid_change(id)
# Encode measurement timestamp
if self._prev_timestamp1 != timestamp:
self._write_timestamp_change(timestamp)
# Encode measurement quality
if point.prevstateflags1 != quality:
self._write_quality_change(quality, point)
# Encode measurement value - reinterpret float as uint32 for bit manipulation
value_raw = np.uint32(struct.unpack('I', struct.pack('f', value))[0])
if point.prevvalue1 == value_raw:
self._last_point.write_code(CodeWords.VALUE1)
elif point.prevvalue2 == value_raw:
self._last_point.write_code(CodeWords.VALUE2)
point.prevvalue2 = np.uint32(point.prevvalue1)
point.prevvalue1 = value_raw
elif point.prevvalue3 == value_raw:
self._last_point.write_code(CodeWords.VALUE3)
point.prevvalue3 = np.uint32(point.prevvalue2)
point.prevvalue2 = np.uint32(point.prevvalue1)
point.prevvalue1 = value_raw
elif value_raw == 0:
self._last_point.write_code(CodeWords.VALUEZERO)
point.prevvalue3 = np.uint32(point.prevvalue2)
point.prevvalue2 = np.uint32(point.prevvalue1)
point.prevvalue1 = np.uint32(0)
else:
bits_changed = value_raw ^ np.uint32(point.prevvalue1)
if bits_changed <= self._BITS4:
self._last_point.write_code(CodeWords.VALUEXOR4)
self._write_bits(np.int32(bits_changed & 15), np.int32(4))
elif bits_changed <= self._BITS8:
self._last_point.write_code(CodeWords.VALUEXOR8)
self._data[self._position] = np.uint8(bits_changed)
self._position += 1
elif bits_changed <= self._BITS12:
self._last_point.write_code(CodeWords.VALUEXOR12)
self._write_bits(np.int32(bits_changed & 15), np.int32(4))
self._data[self._position] = np.uint8(bits_changed >> 4)
self._position += 1
elif bits_changed <= self._BITS16:
self._last_point.write_code(CodeWords.VALUEXOR16)
self._data[self._position] = np.uint8(bits_changed)
self._data[self._position + 1] = np.uint8(bits_changed >> 8)
self._position += 2
elif bits_changed <= self._BITS20:
self._last_point.write_code(CodeWords.VALUEXOR20)
self._write_bits(np.int32(bits_changed & 15), np.int32(4))
self._data[self._position] = np.uint8(bits_changed >> 4)
self._data[self._position + 1] = np.uint8(bits_changed >> 12)
self._position += 2
elif bits_changed <= self._BITS24:
self._last_point.write_code(CodeWords.VALUEXOR24)
self._data[self._position] = np.uint8(bits_changed)
self._data[self._position + 1] = np.uint8(bits_changed >> 8)
self._data[self._position + 2] = np.uint8(bits_changed >> 16)
self._position += 3
elif bits_changed <= self._BITS28:
self._last_point.write_code(CodeWords.VALUEXOR28)
self._write_bits(np.int32(bits_changed & 15), np.int32(4))
self._data[self._position] = np.uint8(bits_changed >> 4)
self._data[self._position + 1] = np.uint8(bits_changed >> 12)
self._data[self._position + 2] = np.uint8(bits_changed >> 20)
self._position += 3
else:
self._last_point.write_code(CodeWords.VALUEXOR32)
self._data[self._position] = np.uint8(bits_changed)
self._data[self._position + 1] = np.uint8(bits_changed >> 8)
self._data[self._position + 2] = np.uint8(bits_changed >> 16)
self._data[self._position + 3] = np.uint8(bits_changed >> 24)
self._position += 4
point.prevvalue3 = np.uint32(point.prevvalue2)
point.prevvalue2 = np.uint32(point.prevvalue1)
point.prevvalue1 = value_raw
self._last_point = point
return True
def _new_point_metadata(self) -> PointMetadata:
"""Creates a new point metadata instance with write bits callback."""
return PointMetadata(
writebits=lambda code, length: self._write_bits(code, length),
readbit=None,
readbits5=None
)
def _write_pointid_change(self, id: np.int32):
"""Writes point ID change using XOR encoding."""
bits_changed = np.uint32(np.int32(id) ^ np.int32(self._last_point.prevnextpointid1))
if bits_changed <= self._BITS4:
self._last_point.write_code(CodeWords.POINTIDXOR4)
self._write_bits(np.int32(bits_changed & 15), np.int32(4))
elif bits_changed <= self._BITS8:
self._last_point.write_code(CodeWords.POINTIDXOR8)
self._data[self._position] = np.uint8(bits_changed)
self._position += 1
elif bits_changed <= self._BITS12:
self._last_point.write_code(CodeWords.POINTIDXOR12)
self._write_bits(np.int32(bits_changed & 15), np.int32(4))
self._data[self._position] = np.uint8(bits_changed >> 4)
self._position += 1
elif bits_changed <= self._BITS16:
self._last_point.write_code(CodeWords.POINTIDXOR16)
self._data[self._position] = np.uint8(bits_changed)
self._data[self._position + 1] = np.uint8(bits_changed >> 8)
self._position += 2
elif bits_changed <= self._BITS20:
self._last_point.write_code(CodeWords.POINTIDXOR20)
self._write_bits(np.int32(bits_changed & 15), np.int32(4))
self._data[self._position] = np.uint8(bits_changed >> 4)
self._data[self._position + 1] = np.uint8(bits_changed >> 12)
self._position += 2
elif bits_changed <= self._BITS24:
self._last_point.write_code(CodeWords.POINTIDXOR24)
self._data[self._position] = np.uint8(bits_changed)
self._data[self._position + 1] = np.uint8(bits_changed >> 8)
self._data[self._position + 2] = np.uint8(bits_changed >> 16)
self._position += 3
else:
self._last_point.write_code(CodeWords.POINTIDXOR32)
self._data[self._position] = np.uint8(bits_changed)
self._data[self._position + 1] = np.uint8(bits_changed >> 8)
self._data[self._position + 2] = np.uint8(bits_changed >> 16)
self._data[self._position + 3] = np.uint8(bits_changed >> 24)
self._position += 4
self._last_point.prevnextpointid1 = id
def _write_timestamp_change(self, timestamp: np.int64):
"""Writes timestamp change using delta encoding."""
if self._prev_timestamp2 == timestamp:
self._last_point.write_code(CodeWords.TIMESTAMP2)
elif self._prev_timestamp1 < timestamp:
if self._prev_timestamp1 + self._prev_timedelta1 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA1FORWARD)
elif self._prev_timestamp1 + self._prev_timedelta2 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA2FORWARD)
elif self._prev_timestamp1 + self._prev_timedelta3 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA3FORWARD)
elif self._prev_timestamp1 + self._prev_timedelta4 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA4FORWARD)
else:
self._last_point.write_code(CodeWords.TIMEXOR7BIT)
self._encode7bit_uint64(np.uint64(np.uint64(timestamp) ^ np.uint64(self._prev_timestamp1)))
else:
if self._prev_timestamp1 - self._prev_timedelta1 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA1REVERSE)
elif self._prev_timestamp1 - self._prev_timedelta2 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA2REVERSE)
elif self._prev_timestamp1 - self._prev_timedelta3 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA3REVERSE)
elif self._prev_timestamp1 - self._prev_timedelta4 == timestamp:
self._last_point.write_code(CodeWords.TIMEDELTA4REVERSE)
else:
self._last_point.write_code(CodeWords.TIMEXOR7BIT)
self._encode7bit_uint64(np.uint64(np.uint64(timestamp) ^ np.uint64(self._prev_timestamp1)))
# Save the smallest delta time (cast to int64 to prevent overflow in subtraction)
min_delta = abs(np.int64(self._prev_timestamp1) - np.int64(timestamp))
if (min_delta < self._prev_timedelta4 and
min_delta != self._prev_timedelta1 and
min_delta != self._prev_timedelta2 and
min_delta != self._prev_timedelta3):
if min_delta < self._prev_timedelta1:
self._prev_timedelta4 = self._prev_timedelta3
self._prev_timedelta3 = self._prev_timedelta2
self._prev_timedelta2 = self._prev_timedelta1
self._prev_timedelta1 = min_delta
elif min_delta < self._prev_timedelta2:
self._prev_timedelta4 = self._prev_timedelta3
self._prev_timedelta3 = self._prev_timedelta2
self._prev_timedelta2 = min_delta
elif min_delta < self._prev_timedelta3:
self._prev_timedelta4 = self._prev_timedelta3
self._prev_timedelta3 = min_delta
else:
self._prev_timedelta4 = min_delta
self._prev_timestamp2 = self._prev_timestamp1
self._prev_timestamp1 = timestamp
def _write_quality_change(self, quality: np.uint32, point: PointMetadata):
"""Writes quality change using 7-bit encoding."""
if point.prevstateflags2 == quality:
self._last_point.write_code(CodeWords.STATEFLAGS2)
else:
self._last_point.write_code(CodeWords.STATEFLAGS7BIT32)
self._encode7bit_uint32(quality)
point.prevstateflags2 = point.prevstateflags1
point.prevstateflags1 = quality
def _clear_bitstream(self):
"""Clears the bitstream state."""
self._bitstream_buffer_index = np.int32(-1)
self._bitstream_cache_bitcount = np.int32(0)
self._bitstream_cache = np.int32(0)
def _write_bits(self, code: np.int32, length: np.int32):
"""Writes bits to the bitstream."""
if self._bitstream_buffer_index < 0:
self._bitstream_buffer_index = np.int32(self._position)
self._position += 1
self._bitstream_cache = (self._bitstream_cache << length) | code
self._bitstream_cache_bitcount += length
if self._bitstream_cache_bitcount > 7:
self._bitstream_end()
def _bitstream_flush(self):
"""Flushes the bitstream."""
if self._bitstream_cache_bitcount > 0:
if self._bitstream_buffer_index < 0:
self._bitstream_buffer_index = np.int32(self._position)
self._position += 1
self._last_point.write_code(CodeWords.ENDOFSTREAM)
if self._bitstream_cache_bitcount > 7:
self._bitstream_end()
if self._bitstream_cache_bitcount > 0:
# Make up 8 bits by padding
self._bitstream_cache <<= (8 - self._bitstream_cache_bitcount)
self._data[self._bitstream_buffer_index] = np.uint8(self._bitstream_cache)
self._bitstream_cache = np.int32(0)
self._bitstream_buffer_index = np.int32(-1)
self._bitstream_cache_bitcount = np.int32(0)
def _bitstream_end(self):
"""Ends the bitstream by flushing full bytes."""
while self._bitstream_cache_bitcount > 7:
self._data[self._bitstream_buffer_index] = np.uint8(
self._bitstream_cache >> (self._bitstream_cache_bitcount - 8)
)
self._bitstream_cache_bitcount -= 8
if self._bitstream_cache_bitcount > 0:
self._bitstream_buffer_index = np.int32(self._position)
self._position += 1
else:
self._bitstream_buffer_index = np.int32(-1)
def _encode7bit_uint32(self, value: np.uint32):
"""Encodes a uint32 using 7-bit variable length encoding."""
if value < 128:
self._data[self._position] = np.uint8(value)
self._position += 1
return
self._data[self._position] = np.uint8(value | 128)
if value < 16384:
self._data[self._position + 1] = np.uint8(value >> 7)
self._position += 2
return
self._data[self._position + 1] = np.uint8((value >> 7) | 128)
if value < 2097152:
self._data[self._position + 2] = np.uint8(value >> 14)
self._position += 3
return
self._data[self._position + 2] = np.uint8((value >> 14) | 128)
if value < 268435456:
self._data[self._position + 3] = np.uint8(value >> 21)
self._position += 4
return
self._data[self._position + 3] = np.uint8((value >> 21) | 128)
self._data[self._position + 4] = np.uint8(value >> 28)
self._position += 5
def _encode7bit_uint64(self, value: np.uint64):
"""Encodes a uint64 using 7-bit variable length encoding."""
if value < 128:
self._data[self._position] = np.uint8(value)
self._position += 1
return
self._data[self._position] = np.uint8(value | 128)
if value < 16384:
self._data[self._position + 1] = np.uint8(value >> 7)
self._position += 2
return
self._data[self._position + 1] = np.uint8((value >> 7) | 128)
if value < 2097152:
self._data[self._position + 2] = np.uint8(value >> 14)
self._position += 3
return
self._data[self._position + 2] = np.uint8((value >> 14) | 128)
if value < 268435456:
self._data[self._position + 3] = np.uint8(value >> 21)
self._position += 4
return
self._data[self._position + 3] = np.uint8((value >> 21) | 128)
if value < 34359738368:
self._data[self._position + 4] = np.uint8(value >> 28)
self._position += 5
return
self._data[self._position + 4] = np.uint8((value >> 28) | 128)
if value < 4398046511104:
self._data[self._position + 5] = np.uint8(value >> 35)
self._position += 6
return
self._data[self._position + 5] = np.uint8((value >> 35) | 128)
if value < 562949953421312:
self._data[self._position + 6] = np.uint8(value >> 42)
self._position += 7
return
self._data[self._position + 6] = np.uint8((value >> 42) | 128)
if value < 72057594037927936:
self._data[self._position + 7] = np.uint8(value >> 49)
self._position += 8
return
self._data[self._position + 7] = np.uint8((value >> 49) | 128)
self._data[self._position + 8] = np.uint8(value >> 56)
self._position += 9