# ******************************************************************************************************
# subscriberconnection.py - Gbtc
#
# Copyright © 2026, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 01/06/2026 - Generated by porting C++ SubscriberConnection
# Ported from cppapi/src/lib/transport/SubscriberConnection.{h,cpp}
#
# ******************************************************************************************************
# Ported from cppapi/src/lib/transport/SubscriberConnection.cpp : class SubscriberConnection
# Differences: Python uses socket and threading; otherwise parity maintained.
from gsf import Empty, Limits
from gsf.endianorder import BigEndian
from ..data.dataset import DataSet
from ..data.filterexpressionparser import FilterExpressionParser
from .measurement import Measurement
from .compactmeasurement import CompactMeasurement
from .signalindexcache import SignalIndexCache
from .constants import OperationalModes, OperationalEncoding, ServerCommand, ServerResponse
from .constants import DataPacketFlags, Defaults
from .tssc.encoder import Encoder as TSSCEncoder
from ..ticks import Ticks
from typing import List, Optional, Set, TYPE_CHECKING, Tuple
from uuid import UUID, uuid4
from threading import Thread, Lock, Timer
from time import time
import os
import socket
import gzip
import numpy as np
if TYPE_CHECKING:
from .datapublisher import DataPublisher
MAXPACKET_SIZE = 32768
PAYLOADHEADER_SIZE = 4
RESPONSEHEADER_SIZE = 6
TSSC_BUFFER_SIZE = 32768
DEFAULT_LAGTIME = 10.0
DEFAULT_LEADTIME = 5.0
DEFAULT_PUBLISHINTERVAL = 1.0
[docs]
class SubscriberConnection:
"""
Represents a connection to a data subscriber.
"""
def __init__(self, parent: 'DataPublisher', client_socket: socket.socket, address: Tuple[str, int]):
"""
Creates a new subscriber connection.
"""
self._parent = parent
self._socket = client_socket
self._address = address
# Connection identification
self._subscriber_id = uuid4()
self._instance_id = uuid4()
self._connection_id = f"{address[0]}:{address[1]}"
self._hostname = address[0]
self._ipaddress = address[0]
# Protocol state
self._version = np.byte(0)
self._operational_modes = np.uint32(OperationalModes.NOFLAGS)
self._encoding = OperationalEncoding.UTF8
# Subscription state
self._validated = False
self._connection_accepted = False
self._subscribed = False
self._subscription_info = ""
self._stopped = True
# Temporal subscription parameters
self._start_time_constraint = np.int64(Limits.MAXINT64)
self._stop_time_constraint = np.int64(Limits.MAXINT64)
self._processing_interval = np.int32(-1)
self._temporal_subscription_canceled = False
# Compression and encoding options
self._using_payload_compression = False
self._include_time = True
self._use_localclock_as_realtime = False
self._enable_time_reasonability_check = False
self._lag_time = DEFAULT_LAGTIME
self._lead_time = DEFAULT_LEADTIME
self._publish_interval = DEFAULT_PUBLISHINTERVAL
self._use_millisecond_resolution = False
self._track_latest_measurements = False
self._is_nan_filtered = parent.is_nan_value_filter_forced if parent else False
# Signal index cache
self._signal_index_cache: Optional[SignalIndexCache] = None
self._signal_index_cache_lock = Lock()
# TSSC compression
self._tssc_encoder = TSSCEncoder()
self._tssc_working_buffer = bytearray(TSSC_BUFFER_SIZE)
self._tssc_reset_requested = False
self._tssc_sequence_number = 0 # Start at 0, decoder expects 0 for first packet
# Base time offsets for compact encoding
self._time_index = np.int32(0)
self._base_time_offsets = [np.int64(0), np.int64(0)]
# Statistics
self._total_commandchannel_bytes_sent = np.uint64(0)
self._total_datachannel_bytes_sent = np.uint64(0)
self._total_measurements_sent = np.uint64(0)
# Threading
self._read_thread: Optional[Thread] = None
self._ping_timer: Optional[Timer] = None
self._write_lock = Lock()
# Cipher keys (for encrypted data channel - simplified for initial implementation)
self._keys = [bytearray(), bytearray()]
self._ivs = [bytearray(), bytearray()]
# Properties
@property
def connection_id(self) -> str:
"""Gets the connection identifier (IP:port)."""
return self._connection_id
@property
def subscriber_id(self) -> UUID:
"""Gets the subscriber UUID."""
return self._subscriber_id
@subscriber_id.setter
def subscriber_id(self, value: UUID):
"""Sets the subscriber UUID."""
self._subscriber_id = value
@property
def instance_id(self) -> UUID:
"""Gets the instance UUID."""
return self._instance_id
@property
def connection_id(self) -> str:
"""Gets the connection identification string."""
return self._connection_id
@property
def hostname(self) -> str:
"""Gets the subscriber hostname."""
return self._hostname
@property
def ipaddress(self) -> str:
"""Gets the subscriber IP address."""
return self._ipaddress
@property
def is_validated(self) -> bool:
"""Gets flag indicating if subscriber has been validated."""
return self._validated
@property
def is_connected(self) -> bool:
"""Gets flag indicating if subscriber is connected."""
return self._connection_accepted
@property
def is_subscribed(self) -> bool:
"""Gets flag indicating if subscriber is subscribed."""
return self._subscribed
@is_subscribed.setter
def is_subscribed(self, value: bool):
"""Sets subscription state."""
self._subscribed = value
@property
def is_temporal_subscription(self) -> bool:
"""Gets flag indicating if this is a temporal subscription."""
return self._start_time_constraint < Limits.MAXINT64
@property
def version(self) -> np.byte:
"""Gets the subscriber protocol version."""
return self._version
@property
def operational_modes(self) -> np.uint32:
"""Gets the operational modes."""
return self._operational_modes
@property
def encoding(self) -> np.uint32:
"""Gets the string encoding."""
return self._encoding
@property
def using_payload_compression(self) -> bool:
"""Gets flag indicating if payload compression (TSSC) is enabled."""
return self._using_payload_compression
@property
def signal_index_cache(self) -> Optional[SignalIndexCache]:
"""Gets the signal index cache."""
with self._signal_index_cache_lock:
return self._signal_index_cache
# Connection management
[docs]
def start(self):
"""Starts the subscriber connection."""
try:
# Set socket options
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._socket.settimeout(30.0)
self._connection_accepted = True
self._stopped = False
# Start ping timer (every 5 seconds)
self._start_ping_timer()
# Start read thread
self._read_thread = Thread(target=self._read_command_channel, daemon=True)
self._read_thread.start()
except Exception as ex:
self._parent._dispatch_error_message(f"Failed to start subscriber connection: {ex}")
self.stop()
[docs]
def stop(self):
"""Stops the subscriber connection."""
if self._stopped:
return
self._stopped = True
self._connection_accepted = False
self._subscribed = False
# Stop ping timer
if self._ping_timer:
self._ping_timer.cancel()
self._ping_timer = None
# Close socket
try:
self._socket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self._socket.close()
except:
pass
# Notify parent
if self._parent:
self._parent._connection_terminated(self)
def _start_ping_timer(self):
"""Starts the periodic ping timer."""
def _ping():
if not self._stopped:
try:
self.send_response(ServerResponse.NODATA, ServerCommand.SUBSCRIBE)
except:
pass
self._ping_timer = Timer(5.0, _ping)
self._ping_timer.daemon = True
self._ping_timer.start()
self._ping_timer = Timer(5.0, _ping)
self._ping_timer.daemon = True
self._ping_timer.start()
# Command channel I/O
def _read_command_channel(self):
"""Background thread that reads commands from the subscriber."""
try:
while not self._stopped:
try:
# Read payload header (4 bytes)
header = self._socket.recv(PAYLOADHEADER_SIZE)
if not header or len(header) < PAYLOADHEADER_SIZE:
break
# Parse payload length
payload_length = BigEndian.to_uint32(header)
if payload_length == 0:
continue
if payload_length > MAXPACKET_SIZE:
self._parent._dispatch_error_message(
f"Client {self._connection_id} sent packet with invalid size: {payload_length}"
)
break
# Read payload
payload = bytearray()
remaining = payload_length
while remaining > 0:
chunk = self._socket.recv(min(remaining, 8192))
if not chunk:
raise ConnectionError("Connection closed while reading payload")
payload.extend(chunk)
remaining -= len(chunk)
# Parse and handle command
self._parse_command(payload)
except TimeoutError:
# Socket timeout is expected when there's no data to read, just continue
continue
except Exception as ex:
import traceback
traceback.print_exc()
if not self._stopped:
self._parent._dispatch_error_message(
f"Error reading from {self._connection_id}: {ex}"
)
finally:
self.stop()
def _parse_command(self, buffer: bytearray):
"""Parses and handles a command from the subscriber."""
if len(buffer) < 1:
return
command = buffer[0]
data = buffer[1:] if len(buffer) > 1 else bytearray()
try:
if command == ServerCommand.SUBSCRIBE:
self._handle_subscribe(data)
elif command == ServerCommand.UNSUBSCRIBE:
self._handle_unsubscribe()
elif command == ServerCommand.METADATAREFRESH:
self._handle_metadata_refresh(data)
elif command == ServerCommand.ROTATECIPHERKEYS:
self._handle_rotate_cipher_keys()
elif command == ServerCommand.UPDATEPROCESSINGINTERVAL:
self._handle_update_processing_interval(data)
elif command == ServerCommand.DEFINEOPERATIONALMODES:
self._handle_define_operational_modes(data)
elif command == ServerCommand.CONFIRMUPDATESIGNALINDEXCACHE:
self._handle_confirm_signal_index_cache(data)
elif command == ServerCommand.CONFIRMNOTIFICATION:
self._handle_confirm_notification(data)
elif command == ServerCommand.CONFIRMUPDATEBASETIMES:
self._handle_confirm_update_base_times(data)
elif command >= ServerCommand.USERCOMMAND00 and command <= ServerCommand.USERCOMMAND15:
self._handle_user_command(command, data)
else:
self._parent._dispatch_error_message(
f"Client {self._connection_id} sent unrecognized command: 0x{command:02X}"
)
except Exception as ex:
self._parent._dispatch_error_message(
f"Error handling command 0x{command:02X} from {self._connection_id}: {ex}"
)
[docs]
def send_response(self, response_code: np.uint8, command_code: np.uint8,
data: Optional[bytes] = None, message: Optional[str] = None) -> bool:
"""Sends a response to the subscriber."""
try:
# Build response packet
if message is not None:
data = self._encode_string(message)
elif data is None:
data = bytearray()
# Response header structure (6 bytes total):
# - byte 0: response code
# - byte 1: command code
# - bytes 2-5: payload data length (4 bytes, big-endian uint32)
# Followed by payload data
packet = bytearray()
packet.append(response_code)
packet.append(command_code)
packet.extend(BigEndian.from_uint32(len(data)))
packet.extend(data)
# Send with payload header (total packet length)
with self._write_lock:
header = BigEndian.from_uint32(len(packet))
total_bytes = header + packet
self._socket.sendall(total_bytes)
self._total_commandchannel_bytes_sent += len(header) + len(packet)
return True
except Exception as ex:
self._parent._dispatch_error_message(
f"Failed to send response to {self._connection_id}: {ex}"
)
return False
[docs]
def send_data_packet(self, command_code: np.uint8, data: bytes) -> bool:
"""Sends a data packet (server command) to the subscriber."""
try:
# Data packet structure (5 bytes header + data):
# - byte 0: command code
# - bytes 1-4: payload data length (4 bytes, big-endian uint32)
# Followed by payload data
packet = bytearray()
packet.append(command_code)
packet.extend(BigEndian.from_uint32(len(data)))
packet.extend(data)
# Send with payload header (total packet length)
with self._write_lock:
header = BigEndian.from_uint32(len(packet))
self._socket.sendall(header + packet)
self._total_commandchannel_bytes_sent += len(header) + len(packet)
return True
except Exception as ex:
self._parent._dispatch_error_message(
f"Failed to send data packet to {self._connection_id}: {ex}"
)
return False
# Command handlers
def _handle_subscribe(self, data: bytearray):
"""Handles a subscribe request from the subscriber."""
try:
# Parse subscription info
# Format (per C++ HandleSubscribe):
# - 1 byte: flags
# - 4 bytes: connection string length (big-endian)
# - N bytes: connection string (contains filter expression as key=value pair)
offset = 0
# Read flags
if len(data) < 1:
self._handle_subscribe_failure("Insufficient data for subscription flags")
return
flags = data[offset]
offset += 1
# Read connection string length (4 bytes, big-endian uint32)
if len(data) < offset + 4:
self._handle_subscribe_failure("Insufficient data for connection string length")
return
connection_string_length = BigEndian.to_uint32(data[offset:offset+4])
offset += 4
# Read connection string
if len(data) < offset + connection_string_length:
self._handle_subscribe_failure(f"Insufficient data for connection string")
return
connection_string_bytes = data[offset:offset+connection_string_length]
connection_string = connection_string_bytes.decode('utf-8') if connection_string_bytes else ""
offset += connection_string_length
# Parse connection string for parameters
# Format: key1=value1;key2=value2;...
params = {}
if connection_string:
for pair in connection_string.split(';'):
if '=' in pair:
key, value = pair.split('=', 1)
params[key.strip()] = value.strip()
# Extract filter expression from connection string
filter_expression = params.get('filterExpression', '')
# Remove curly braces if present (subscriber wraps filter in braces)
if filter_expression.startswith('{') and filter_expression.endswith('}'):
filter_expression = filter_expression[1:-1]
if not filter_expression:
filter_expression = "FILTER ActiveMeasurements WHERE ID IS NOT NULL"
self._parent._dispatch_status_message(
f"{self._connection_id} subscribed with expression: {filter_expression}"
)
# Parse filter expression and build signal index cache
signal_ids = self._parse_subscription_request(filter_expression)
if not signal_ids:
self._handle_subscribe_failure("No measurements matched subscription filter")
return
# Create signal index cache
cache = SignalIndexCache()
cache.subscriber_id = self._subscriber_id
for idx, signal_id in enumerate(signal_ids):
cache.add_measurement_key(idx, signal_id, Empty.STRING, Empty.STRING)
# Store cache
with self._signal_index_cache_lock:
self._signal_index_cache = cache
# Update routing tables
self._parent._routing_tables.update_routes(self, signal_ids)
# Send success response first (no data payload)
self.send_response(ServerResponse.SUCCEEDED, ServerCommand.SUBSCRIBE)
# Then send signal index cache as a response with UPDATESIGNALINDEXCACHE response code
serialized_cache = self._serialize_signal_index_cache(cache)
self.send_response(ServerResponse.UPDATESIGNALINDEXCACHE, ServerCommand.SUBSCRIBE, data=serialized_cache)
self._subscribed = True
self._parent._dispatch_status_message(
f"{self._connection_id} subscribed to {len(signal_ids)} measurements"
)
except Exception as ex:
import traceback
traceback.print_exc()
# Convert exception to string carefully to avoid pickle issues
try:
error_msg = str(ex)
except Exception:
error_msg = repr(ex)
self._handle_subscribe_failure(f"Failed to process subscription: {error_msg}")
def _handle_subscribe_failure(self, message: str):
"""Handles subscription failure."""
self.send_response(ServerResponse.FAILED, ServerCommand.SUBSCRIBE, message=message)
self._parent._dispatch_error_message(f"{self._connection_id}: {message}")
def _handle_unsubscribe(self):
"""Handles an unsubscribe request."""
self._subscribed = False
# Remove from routing tables
self._parent._routing_tables.remove_routes(self)
with self._signal_index_cache_lock:
self._signal_index_cache = None
self.send_response(ServerResponse.SUCCEEDED, ServerCommand.UNSUBSCRIBE,
message="Unsubscribe successful")
self._parent._dispatch_status_message(f"{self._connection_id} unsubscribed")
def _handle_metadata_refresh(self, data: bytearray):
"""Handles a metadata refresh request."""
try:
metadata = self._parent.metadata
if not metadata:
self.send_response(ServerResponse.FAILED, ServerCommand.METADATAREFRESH,
message="No metadata available")
return
# Serialize metadata (XML format)
metadata_xml = metadata.to_xml()
metadata_bytes = metadata_xml.encode('utf-8')
# Save to file for debugging
with open("C:\\temp\\publisher_metadata.xml", "w", encoding="utf-8") as f:
f.write(metadata_xml)
# Compress if requested
compress_metadata = bool(int(self._operational_modes) & int(OperationalModes.COMPRESSMETADATA))
if compress_metadata:
uncompressed_size = len(metadata_bytes)
metadata_bytes = gzip.compress(metadata_bytes)
self._parent._dispatch_status_message(
f"Compressed metadata from {uncompressed_size} to {len(metadata_bytes)} bytes"
)
else:
self._parent._dispatch_status_message(
f"Sent {len(metadata_bytes)} bytes of metadata to {self._connection_id}"
)
self.send_response(ServerResponse.SUCCEEDED, ServerCommand.METADATAREFRESH,
data=metadata_bytes)
except Exception as ex:
import traceback
traceback.print_exc()
self.send_response(ServerResponse.FAILED, ServerCommand.METADATAREFRESH,
message=f"Failed to send metadata: {ex}")
def _handle_rotate_cipher_keys(self):
"""Handles cipher key rotation (simplified - not fully implemented)."""
# For now, just acknowledge
self.send_response(ServerResponse.SUCCEEDED, ServerCommand.ROTATECIPHERKEYS)
def _handle_update_processing_interval(self, data: bytearray):
"""Handles processing interval update request."""
try:
if len(data) >= 4:
interval = BigEndian.to_int32(data)
self._processing_interval = interval
self.send_response(ServerResponse.SUCCEEDED, ServerCommand.UPDATEPROCESSINGINTERVAL,
message=f"Processing interval set to {interval}ms")
self._parent._dispatch_status_message(
f"{self._connection_id} processing interval set to {interval}ms"
)
else:
self.send_response(ServerResponse.FAILED, ServerCommand.UPDATEPROCESSINGINTERVAL,
message="Invalid data length")
except Exception as ex:
self.send_response(ServerResponse.FAILED, ServerCommand.UPDATEPROCESSINGINTERVAL,
message=str(ex))
def _handle_define_operational_modes(self, data: bytearray):
"""Handles operational modes definition."""
try:
if len(data) < 4:
self.send_response(ServerResponse.FAILED, ServerCommand.DEFINEOPERATIONALMODES,
message="Invalid data length")
return
modes = BigEndian.to_uint32(data)
version = np.uint8(modes & OperationalModes.PRESTANDARDVERSIONMASK)
# Validate version (support versions 1-3 like C++ implementation)
if version < 1 or version > 3:
message = f"Client connection rejected: requested protocol version {version} not supported. This STTP data publisher implementation only supports version 1 to 3 of the protocol."
self._parent._dispatch_error_message(
f"{message} Operational modes may not be set correctly for client \"{self._connection_id}\" -- disconnecting client"
)
self._validated = False
self.send_response(ServerResponse.FAILED, ServerCommand.DEFINEOPERATIONALMODES,
message=message)
# Schedule disconnection after a moment to allow response to be sent
import threading
def delayed_disconnect():
import time
time.sleep(1)
self.stop()
threading.Thread(target=delayed_disconnect, daemon=True).start()
return
# Set the negotiated version
self._version = version
# Set cache index if version > 1
if self._version > 1:
self._current_cache_index = np.uint8(1)
self._operational_modes = np.uint32(modes)
self._encoding = modes & OperationalModes.ENCODINGMASK
# Check for TSSC compression
self._using_payload_compression = bool(modes & OperationalModes.COMPRESSPAYLOADDATA)
# Validate and send response
self._validated = True
message = f"STTP v{version} client connection accepted: requested operational modes applied."
self.send_response(ServerResponse.SUCCEEDED, ServerCommand.DEFINEOPERATIONALMODES,
message=message)
self._parent._dispatch_status_message(
f"{self._connection_id} STTP v{version} operational modes set: 0x{modes:08X}"
)
except Exception as ex:
self.send_response(ServerResponse.FAILED, ServerCommand.DEFINEOPERATIONALMODES,
message=str(ex))
def _handle_confirm_signal_index_cache(self, data: bytearray):
"""Handles signal index cache confirmation."""
# Client confirmed receipt of signal index cache
self._parent._dispatch_status_message(
f"{self._connection_id} confirmed signal index cache"
)
def _handle_confirm_notification(self, data: bytearray):
"""Handles notification confirmation."""
pass
def _handle_confirm_update_base_times(self, data: bytearray):
"""Handles base time update confirmation."""
pass
def _handle_user_command(self, command: np.uint8, data: bytearray):
"""Handles user-defined command."""
# Dispatch to parent for handling
if self._parent.usercommand_callback:
self._parent.usercommand_callback(self, command, bytes(data))
# Measurement publication
[docs]
def publish_measurements(self, measurements: List[Measurement]):
"""Publishes measurements to the subscriber."""
if not self._subscribed or self._stopped:
return
try:
if self._using_payload_compression:
self._publish_tssc_measurements(measurements)
else:
self._publish_compact_measurements(measurements)
except Exception as ex:
self._parent._dispatch_error_message(
f"Failed to publish measurements to {self._connection_id}: {ex}"
)
def _publish_compact_measurements(self, measurements: List[Measurement]):
"""Publishes measurements using compact format."""
if not measurements:
return
# Build compact measurement packet
packet = bytearray()
# Data packet flags
flags = DataPacketFlags.COMPACT
# if self._include_time:
# flags |= DataPacketFlags.SYNCHRONIZED
packet.append(flags)
# Encode each measurement
for measurement in measurements:
cache = self._signal_index_cache
if not cache:
continue
# Get signal index
index = cache.get_signal_index(measurement.signalid)
if index < 0:
continue
# Encode compact measurement
compact_bytes = CompactMeasurement.encode(
index, measurement.timestamp, measurement.flags, measurement.value
)
packet.extend(compact_bytes)
if len(packet) > 1:
self._send_data_packet(packet, len(measurements))
def _publish_tssc_measurements(self, measurements: List[Measurement]):
"""Publishes measurements using TSSC compression."""
if not measurements:
return
# Reset encoder buffer
self._tssc_encoder.set_buffer(self._tssc_working_buffer, np.uint32(0),
np.uint32(len(self._tssc_working_buffer)))
count = 0
matched = 0
no_cache = 0
not_found = 0
try:
for i, measurement in enumerate(measurements):
cache = self._signal_index_cache
if not cache:
no_cache += 1
continue
# Get signal index
index = cache.get_signal_index(measurement.signalid)
if index < 0:
not_found += 1
continue
matched += 1
# Try to add measurement to TSSC encoder
add_result = self._tssc_encoder.try_add_measurement(
np.int32(index),
measurement.timestamp,
measurement.flags,
measurement.value
)
if add_result:
count += 1
else:
# Buffer full, send what we have
if count > 0:
self._send_tssc_packet(count)
count = 0
# Reset and try again
self._tssc_encoder.set_buffer(self._tssc_working_buffer, np.uint32(0),
np.uint32(len(self._tssc_working_buffer)))
if self._tssc_encoder.try_add_measurement(
np.int32(index),
measurement.timestamp,
measurement.flags,
measurement.value
):
count += 1
except Exception as loop_ex:
self._parent._dispatch_error_message(
f"Failed to encode measurements for {self._connection_id}: {loop_ex}"
)
# Send remaining measurements
if count > 0:
self._send_tssc_packet(count)
def _send_data_packet(self, packet: bytearray, count: int):
"""Sends a data packet to the subscriber."""
try:
# Data packets are sent as ServerResponse.DATAPACKET responses
self.send_response(ServerResponse.DATAPACKET, ServerCommand.SUBSCRIBE, data=bytes(packet))
self._total_measurements_sent += count
except Exception as ex:
self._parent._dispatch_error_message(
f"Failed to send data packet to {self._connection_id}: {ex}"
)
self.stop()
def _send_tssc_packet(self, count: int):
"""Sends a TSSC-compressed packet."""
# Finish encoding
length = self._tssc_encoder.finish_block()
if length == 0:
return
# Build packet with TSSC header
packet = bytearray()
# Serialize data packet flags
packet.append(DataPacketFlags.COMPRESSED)
# Serialize total number of measurement values to follow
packet.extend(BigEndian.from_int32(count))
# Add TSSC version number (85 is the recognized version)
packet.append(85)
# Add sequence number (uint16, not int32!)
packet.extend(BigEndian.from_uint16(np.uint16(self._tssc_sequence_number)))
self._tssc_sequence_number += 1
# Do not increment sequence number to 0
if self._tssc_sequence_number == 0:
self._tssc_sequence_number = 1
# Add compressed data
packet.extend(self._tssc_working_buffer[:length])
self._send_data_packet(packet, count)
# Helper methods
def _parse_subscription_request(self, filter_expression: str) -> Set[UUID]:
"""
Parses a subscription filter expression and returns matching signal IDs.
Matches C++ SubscriberConnection::ParseSubscriptionRequest which filters
from m_filteringMetadata (ActiveMeasurements table).
"""
signal_ids = set()
try:
# Define an empty schema if none has been defined
if not self._parent._filtering_metadata:
# Load ActiveMeasurementsSchema
schema_path = os.path.join(os.path.dirname(__file__), "ActiveMeasurementsSchema.xml")
with open(schema_path, 'r') as f:
schema_xml = f.read()
self._parent._filtering_metadata, err = DataSet.from_xml(schema_xml)
if err:
raise RuntimeError(f"Failed to load ActiveMeasurementsSchema.xml: {err}")
# Get ActiveMeasurements table from filtering metadata
active_measurements = self._parent._filtering_metadata.table("ActiveMeasurements")
if not active_measurements:
raise RuntimeError("ActiveMeasurements table not found in filtering metadata")
# Parse filter expression against ActiveMeasurements
# Matching C++: parser->SetDataSet(m_parent->m_filteringMetadata) and SetPrimaryTableName("ActiveMeasurements")
# Use select_datarows which takes dataset and primary table name
filtered_rows, err = FilterExpressionParser.select_datarows(
self._parent._filtering_metadata, filter_expression, "ActiveMeasurements"
)
if err is not None:
raise RuntimeError(f"Error filtering ActiveMeasurements: {err}")
if not filtered_rows:
return signal_ids
# Get SignalID column index
signal_id_col = active_measurements.column_byname("SignalID")
if not signal_id_col:
raise RuntimeError("SignalID column not found in ActiveMeasurements table")
# Extract signal IDs from filtered rows
for row in filtered_rows:
signal_id = row[signal_id_col.index]
if signal_id and isinstance(signal_id, UUID):
signal_ids.add(signal_id)
except Exception as ex:
self._parent._dispatch_error_message(
f"Error parsing subscription request: {ex}"
)
return signal_ids
def _serialize_signal_index_cache(self, cache: SignalIndexCache) -> bytes:
"""Serializes a signal index cache for transmission."""
# Build signal index cache packet
buffer = bytearray()
# Reserve space for binary length (will be filled in later)
binary_length_offset = len(buffer)
buffer.extend(BigEndian.from_uint32(0))
# Write subscriber ID
buffer.extend(self._subscriber_id.bytes)
# Write reference count (number of signals)
signal_mapping = cache.signal_id_cache
buffer.extend(BigEndian.from_int32(len(signal_mapping)))
# Write each signal with its metadata
for index, signal_id in sorted(signal_mapping.items()):
# Write runtime signal index
buffer.extend(BigEndian.from_int32(index))
# Write signal ID
buffer.extend(signal_id.bytes)
# Write source (empty string for now - not currently tracked in cache)
source = ""
source_bytes = source.encode('utf-8')
buffer.extend(BigEndian.from_uint32(len(source_bytes)))
if source_bytes:
buffer.extend(source_bytes)
# Write ID (use signal index as ID)
buffer.extend(BigEndian.from_uint64(np.uint64(index)))
# Update binary length field (total size excluding the length field itself)
binary_length = len(buffer) - 4
buffer[binary_length_offset:binary_length_offset+4] = BigEndian.from_uint32(binary_length)
# Compress if requested
compress_cache = self._operational_modes & OperationalModes.COMPRESSSIGNALINDEXCACHE
if compress_cache:
compressed = bytearray(gzip.compress(bytes(buffer)))
# For protocol version > 1, prepend cache index byte
if self._version > 1:
result = bytearray()
result.append(0x00) # Cache index 0
result.extend(compressed)
return bytes(result)
else:
return bytes(compressed)
else:
# For protocol version > 1, prepend cache index byte even for uncompressed
if self._version > 1:
result = bytearray()
result.append(0xFF) # Placeholder for cache index
result.extend(buffer)
return bytes(result)
else:
return bytes(buffer)
def _encode_string(self, value: str) -> bytearray:
"""Encodes a string based on operational encoding."""
if self._encoding == OperationalEncoding.UTF8:
encoded = value.encode('utf-8')
elif self._encoding == OperationalEncoding.UTF16LE:
encoded = value.encode('utf-16-le')
elif self._encoding == OperationalEncoding.UTF16BE:
encoded = value.encode('utf-16-be')
else:
encoded = value.encode('utf-8')
# Prepend length
result = bytearray()
result.extend(BigEndian.from_int32(len(encoded)))
result.extend(encoded)
return result
def _decode_string(self, data: bytes) -> str:
"""Decodes a string based on operational encoding."""
if self._encoding == OperationalEncoding.UTF8:
return data.decode('utf-8')
elif self._encoding == OperationalEncoding.UTF16LE:
return data.decode('utf-16-le')
elif self._encoding == OperationalEncoding.UTF16BE:
return data.decode('utf-16-be')
else:
return data.decode('utf-8')