# Source code for sttp.transport.subscriberconnection

# ******************************************************************************************************
#  subscriberconnection.py - Gbtc
#
#  Copyright © 2026, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  01/06/2026 - Generated by porting C++ SubscriberConnection
#       Ported from cppapi/src/lib/transport/SubscriberConnection.{h,cpp}
#
# ******************************************************************************************************

# Ported from cppapi/src/lib/transport/SubscriberConnection.cpp : class SubscriberConnection
# Differences: Python uses socket and threading; otherwise parity maintained.

from gsf import Empty, Limits
from gsf.endianorder import BigEndian
from ..data.dataset import DataSet
from ..data.filterexpressionparser import FilterExpressionParser
from .measurement import Measurement
from .compactmeasurement import CompactMeasurement
from .signalindexcache import SignalIndexCache
from .constants import OperationalModes, OperationalEncoding, ServerCommand, ServerResponse
from .constants import DataPacketFlags, Defaults
from .tssc.encoder import Encoder as TSSCEncoder
from ..ticks import Ticks
from typing import List, Optional, Set, TYPE_CHECKING, Tuple
from uuid import UUID, uuid4
from threading import Thread, Lock, Timer
from time import time
import os
import socket
import gzip
import numpy as np

if TYPE_CHECKING:
    from .datapublisher import DataPublisher

# Largest command payload, in bytes, accepted from a subscriber; a larger announced
# payload length is reported as an error and ends the command-channel read loop.
MAXPACKET_SIZE = 32768
# Number of bytes read from the socket as the big-endian payload length prefix.
PAYLOADHEADER_SIZE = 4
# Response header size in bytes: response code, command code and a 4-byte payload length.
# NOTE(review): not referenced by the code visible in this file.
RESPONSEHEADER_SIZE = 6
# Size, in bytes, of the working buffer handed to the TSSC encoder.
TSSC_BUFFER_SIZE = 32768
# Initial values for a connection's lag time, lead time and publish interval.
# NOTE(review): presumably expressed in seconds -- confirm against the C++ source.
DEFAULT_LAGTIME = 10.0
DEFAULT_LEADTIME = 5.0
DEFAULT_PUBLISHINTERVAL = 1.0


class SubscriberConnection:
    """
    Represents a connection to a data subscriber.

    Each instance owns the client socket handed over by the parent publisher. A background
    thread reads length-prefixed commands from that socket, a timer periodically sends a
    no-data response to the subscriber, and `publish_measurements` pushes measurements to
    the subscriber in either compact or TSSC-compressed form.
    """

    # Seconds between periodic no-data responses sent to the subscriber
    _PING_INTERVAL = 5.0

    # Largest slice requested from the socket in a single `recv` call
    _RECEIVE_CHUNK_SIZE = 8192

    # Debug dump location for metadata XML; only written when its directory already exists
    _METADATA_DEBUG_PATH = "C:\\temp\\publisher_metadata.xml"

    def __init__(self, parent: 'DataPublisher', client_socket: socket.socket, address: Tuple[str, int]):
        """
        Creates a new subscriber connection.

        Parameters:
            parent: publisher that owns this connection; receives status and error
                messages as well as the termination notification.
            client_socket: connected socket used as the command channel.
            address: remote `(host, port)` pair used to build the connection identifier.
        """

        self._parent = parent
        self._socket = client_socket
        self._address = address

        # Connection identification
        self._subscriber_id = uuid4()
        self._instance_id = uuid4()
        self._connection_id = f"{address[0]}:{address[1]}"
        self._hostname = address[0]
        self._ipaddress = address[0]

        # Protocol state
        self._version = np.byte(0)
        self._operational_modes = np.uint32(OperationalModes.NOFLAGS)
        self._encoding = OperationalEncoding.UTF8

        # Reassigned by `_handle_define_operational_modes` for protocol versions above 1;
        # initialized here so the attribute always exists
        self._current_cache_index = np.uint8(0)

        # Subscription state
        self._validated = False
        self._connection_accepted = False
        self._subscribed = False
        self._subscription_info = ""
        self._stopped = True

        # Temporal subscription parameters
        self._start_time_constraint = np.int64(Limits.MAXINT64)
        self._stop_time_constraint = np.int64(Limits.MAXINT64)
        self._processing_interval = np.int32(-1)
        self._temporal_subscription_canceled = False

        # Compression and encoding options
        self._using_payload_compression = False
        self._include_time = True
        self._use_localclock_as_realtime = False
        self._enable_time_reasonability_check = False
        self._lag_time = DEFAULT_LAGTIME
        self._lead_time = DEFAULT_LEADTIME
        self._publish_interval = DEFAULT_PUBLISHINTERVAL
        self._use_millisecond_resolution = False
        self._track_latest_measurements = False
        self._is_nan_filtered = parent.is_nan_value_filter_forced if parent else False

        # Signal index cache
        self._signal_index_cache: Optional[SignalIndexCache] = None
        self._signal_index_cache_lock = Lock()

        # TSSC compression
        self._tssc_encoder = TSSCEncoder()
        self._tssc_working_buffer = bytearray(TSSC_BUFFER_SIZE)
        self._tssc_reset_requested = False
        self._tssc_sequence_number = 0  # Start at 0, decoder expects 0 for first packet

        # Base time offsets for compact encoding
        self._time_index = np.int32(0)
        self._base_time_offsets = [np.int64(0), np.int64(0)]

        # Statistics
        self._total_commandchannel_bytes_sent = np.uint64(0)
        self._total_datachannel_bytes_sent = np.uint64(0)
        self._total_measurements_sent = np.uint64(0)

        # Threading
        self._read_thread: Optional[Thread] = None
        self._ping_timer: Optional[Timer] = None
        self._write_lock = Lock()

        # Cipher keys (for encrypted data channel - simplified for initial implementation)
        self._keys = [bytearray(), bytearray()]
        self._ivs = [bytearray(), bytearray()]

    # Properties

    @property
    def connection_id(self) -> str:
        """Gets the connection identification string (IP:port)."""
        return self._connection_id

    @property
    def subscriber_id(self) -> UUID:
        """Gets the subscriber UUID."""
        return self._subscriber_id

    @subscriber_id.setter
    def subscriber_id(self, value: UUID):
        """Sets the subscriber UUID."""
        self._subscriber_id = value

    @property
    def instance_id(self) -> UUID:
        """Gets the instance UUID."""
        return self._instance_id

    @property
    def hostname(self) -> str:
        """Gets the subscriber hostname."""
        return self._hostname

    @property
    def ipaddress(self) -> str:
        """Gets the subscriber IP address."""
        return self._ipaddress

    @property
    def is_validated(self) -> bool:
        """Gets flag indicating if subscriber has been validated."""
        return self._validated

    @property
    def is_connected(self) -> bool:
        """Gets flag indicating if subscriber is connected."""
        return self._connection_accepted

    @property
    def is_subscribed(self) -> bool:
        """Gets flag indicating if subscriber is subscribed."""
        return self._subscribed

    @is_subscribed.setter
    def is_subscribed(self, value: bool):
        """Sets subscription state."""
        self._subscribed = value

    @property
    def is_temporal_subscription(self) -> bool:
        """Gets flag indicating if this is a temporal subscription."""
        return self._start_time_constraint < Limits.MAXINT64

    @property
    def version(self) -> np.byte:
        """Gets the subscriber protocol version."""
        return self._version

    @property
    def operational_modes(self) -> np.uint32:
        """Gets the operational modes."""
        return self._operational_modes

    @property
    def encoding(self) -> np.uint32:
        """Gets the string encoding."""
        return self._encoding

    @property
    def using_payload_compression(self) -> bool:
        """Gets flag indicating if payload compression (TSSC) is enabled."""
        return self._using_payload_compression

    @property
    def signal_index_cache(self) -> "Optional[SignalIndexCache]":
        """Gets the signal index cache (read under the cache lock)."""
        with self._signal_index_cache_lock:
            return self._signal_index_cache

    # Connection management

    def start(self):
        """
        Starts the subscriber connection.

        Configures the socket, starts the ping timer and launches the command-channel read
        thread. Any failure is reported to the parent and the connection is stopped.
        """

        # Mark the connection as live before touching the socket: `stop` is a no-op while
        # `_stopped` is set, so a failure below would otherwise skip the cleanup entirely.
        self._connection_accepted = True
        self._stopped = False

        try:
            # Set socket options
            self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            self._socket.settimeout(30.0)

            self._start_ping_timer()

            self._read_thread = Thread(target=self._read_command_channel, daemon=True)
            self._read_thread.start()
        except Exception as ex:
            self._parent._dispatch_error_message(f"Failed to start subscriber connection: {ex}")
            self.stop()

    def stop(self):
        """
        Stops the subscriber connection.

        Cancels the ping timer, shuts down and closes the socket and notifies the parent.
        Calling this on an already stopped connection does nothing.
        """

        if self._stopped:
            return

        self._stopped = True
        self._connection_accepted = False
        self._subscribed = False

        # Detach the timer reference before cancelling it
        timer = self._ping_timer
        self._ping_timer = None

        if timer:
            timer.cancel()

        # Closing is best-effort: the peer may already have dropped the connection
        try:
            self._socket.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass

        try:
            self._socket.close()
        except OSError:
            pass

        if self._parent:
            self._parent._connection_terminated(self)

    def _start_ping_timer(self):
        """Starts the periodic ping timer; each ping re-arms the timer for the next one."""

        self._ping_timer = Timer(self._PING_INTERVAL, self._ping)
        self._ping_timer.daemon = True
        self._ping_timer.start()

    def _ping(self):
        """Sends a no-data response to the subscriber and schedules the next ping."""

        if self._stopped:
            return

        try:
            self.send_response(ServerResponse.NODATA, ServerCommand.SUBSCRIBE)
        except Exception:
            # Ping is best-effort; `send_response` reports its own failures
            pass

        # The connection may have been stopped while the ping was being sent
        if not self._stopped:
            self._start_ping_timer()

    # Command channel I/O

    def _receive_exact(self, length: int) -> Optional[bytearray]:
        """
        Reads exactly `length` bytes from the socket.

        Socket timeouts are treated as idle periods and the read is retried, so a partially
        received frame is never discarded.

        Returns:
            The bytes read, or `None` when the peer closed the connection or this
            connection was stopped before `length` bytes arrived.
        """

        received = bytearray()

        while len(received) < length:
            if self._stopped:
                return None

            try:
                chunk = self._socket.recv(min(length - len(received), self._RECEIVE_CHUNK_SIZE))
            except socket.timeout:
                # `socket.timeout` is an alias of `TimeoutError` on Python 3.10+ and a
                # separate `OSError` subclass before that, so it is correct on both
                continue

            if not chunk:
                return None

            received.extend(chunk)

        return received

    def _read_command_channel(self):
        """
        Background thread that reads commands from the subscriber.

        Each command arrives as a 4-byte big-endian payload length followed by the payload.
        The loop ends when the connection is stopped, the peer disconnects, an oversized
        packet is announced or an error occurs; the connection is always stopped on exit.
        """

        try:
            while not self._stopped:
                header = self._receive_exact(PAYLOADHEADER_SIZE)

                if header is None:
                    break

                payload_length = BigEndian.to_uint32(header)

                if payload_length == 0:
                    continue

                if payload_length > MAXPACKET_SIZE:
                    self._parent._dispatch_error_message(
                        f"Client {self._connection_id} sent packet with invalid size: {payload_length}"
                    )
                    break

                payload = self._receive_exact(int(payload_length))

                if payload is None:
                    if self._stopped:
                        break

                    raise ConnectionError("Connection closed while reading payload")

                self._parse_command(payload)
        except Exception as ex:
            # Errors raised because `stop` closed the socket underneath the read are expected
            if not self._stopped:
                import traceback
                traceback.print_exc()

                self._parent._dispatch_error_message(
                    f"Error reading from {self._connection_id}: {ex}"
                )
        finally:
            self.stop()

    def _parse_command(self, buffer: bytearray):
        """
        Parses and handles a command from the subscriber.

        The first byte of `buffer` is the command code, the remainder is the command data.
        Handler exceptions are reported to the parent and never propagate.
        """

        if len(buffer) < 1:
            return

        command = buffer[0]
        data = buffer[1:] if len(buffer) > 1 else bytearray()

        try:
            if command == ServerCommand.SUBSCRIBE:
                self._handle_subscribe(data)
            elif command == ServerCommand.UNSUBSCRIBE:
                self._handle_unsubscribe()
            elif command == ServerCommand.METADATAREFRESH:
                self._handle_metadata_refresh(data)
            elif command == ServerCommand.ROTATECIPHERKEYS:
                self._handle_rotate_cipher_keys()
            elif command == ServerCommand.UPDATEPROCESSINGINTERVAL:
                self._handle_update_processing_interval(data)
            elif command == ServerCommand.DEFINEOPERATIONALMODES:
                self._handle_define_operational_modes(data)
            elif command == ServerCommand.CONFIRMUPDATESIGNALINDEXCACHE:
                self._handle_confirm_signal_index_cache(data)
            elif command == ServerCommand.CONFIRMNOTIFICATION:
                self._handle_confirm_notification(data)
            elif command == ServerCommand.CONFIRMUPDATEBASETIMES:
                self._handle_confirm_update_base_times(data)
            elif command >= ServerCommand.USERCOMMAND00 and command <= ServerCommand.USERCOMMAND15:
                self._handle_user_command(command, data)
            else:
                self._parent._dispatch_error_message(
                    f"Client {self._connection_id} sent unrecognized command: 0x{command:02X}"
                )
        except Exception as ex:
            self._parent._dispatch_error_message(
                f"Error handling command 0x{command:02X} from {self._connection_id}: {ex}"
            )

    def send_response(self, response_code: np.uint8, command_code: np.uint8, data: Optional[bytes] = None, message: Optional[str] = None) -> bool:
        """
        Sends a response to the subscriber.

        Parameters:
            response_code: response code placed in the first header byte.
            command_code: code of the command being answered, second header byte.
            data: raw payload; ignored when `message` is given.
            message: text payload, encoded with the negotiated string encoding.

        Returns:
            `True` when the packet was written to the socket; `False` when anything failed,
            in which case the error has been reported to the parent.
        """

        try:
            if message is not None:
                data = self._encode_string(message)
            elif data is None:
                data = bytearray()

            # Response header structure (6 bytes total):
            #   - byte 0: response code
            #   - byte 1: command code
            #   - bytes 2-5: payload data length (4 bytes, big-endian uint32)
            # Followed by payload data
            packet = bytearray()
            packet.append(response_code)
            packet.append(command_code)
            packet.extend(BigEndian.from_uint32(len(data)))
            packet.extend(data)

            # Send with payload header (total packet length)
            header = BigEndian.from_uint32(len(packet))

            with self._write_lock:
                self._socket.sendall(header + packet)

                # Wrap the increment: adding a plain int to np.uint64 yields float64 under
                # NumPy 1.x promotion rules
                self._total_commandchannel_bytes_sent += np.uint64(len(header) + len(packet))

            return True
        except Exception as ex:
            self._parent._dispatch_error_message(
                f"Failed to send response to {self._connection_id}: {ex}"
            )
            return False

    def send_data_packet(self, command_code: np.uint8, data: bytes) -> bool:
        """
        Sends a data packet (server command) to the subscriber.

        Parameters:
            command_code: command code placed in the first header byte.
            data: raw payload that follows the header.

        Returns:
            `True` when the packet was written to the socket; `False` when anything failed,
            in which case the error has been reported to the parent.
        """

        try:
            # Data packet structure (5 bytes header + data):
            #   - byte 0: command code
            #   - bytes 1-4: payload data length (4 bytes, big-endian uint32)
            # Followed by payload data
            packet = bytearray()
            packet.append(command_code)
            packet.extend(BigEndian.from_uint32(len(data)))
            packet.extend(data)

            # Send with payload header (total packet length)
            header = BigEndian.from_uint32(len(packet))

            with self._write_lock:
                self._socket.sendall(header + packet)
                self._total_commandchannel_bytes_sent += np.uint64(len(header) + len(packet))

            return True
        except Exception as ex:
            self._parent._dispatch_error_message(
                f"Failed to send data packet to {self._connection_id}: {ex}"
            )
            return False

    # Command handlers

    @staticmethod
    def _parse_connection_string(connection_string: str) -> dict:
        """
        Splits a `key1=value1;key2=value2;...` connection string into a dictionary.

        Keys and values are stripped of surrounding whitespace; segments without an `=`
        are ignored. NOTE(review): a `;` inside a braced value also splits the value.
        """

        params = {}

        for pair in connection_string.split(';'):
            if '=' in pair:
                key, value = pair.split('=', 1)
                params[key.strip()] = value.strip()

        return params

    def _handle_subscribe(self, data: bytearray):
        """
        Handles a subscribe request from the subscriber.

        Expected layout of `data` (per C++ HandleSubscribe):
            - 1 byte: flags
            - 4 bytes: connection string length (big-endian)
            - N bytes: connection string (contains filter expression as key=value pair)

        On success the signal index cache is built, routes are updated and a success
        response followed by the serialized cache is sent. Every failure is answered with
        a failed response and reported to the parent.
        """

        try:
            if len(data) < 1:
                self._handle_subscribe_failure("Insufficient data for subscription flags")
                return

            # The flags byte is not interpreted by this implementation; step over it
            offset = 1

            # Read connection string length (4 bytes, big-endian uint32)
            if len(data) < offset + 4:
                self._handle_subscribe_failure("Insufficient data for connection string length")
                return

            connection_string_length = int(BigEndian.to_uint32(data[offset:offset + 4]))
            offset += 4

            if len(data) < offset + connection_string_length:
                self._handle_subscribe_failure("Insufficient data for connection string")
                return

            # Decode with the negotiated operational encoding, matching `_encode_string`
            connection_string_bytes = data[offset:offset + connection_string_length]
            connection_string = self._decode_string(bytes(connection_string_bytes)) if connection_string_bytes else ""

            params = self._parse_connection_string(connection_string)

            # Extract filter expression from connection string
            filter_expression = params.get('filterExpression', '')

            # Remove curly braces if present (subscriber wraps filter in braces)
            if filter_expression.startswith('{') and filter_expression.endswith('}'):
                filter_expression = filter_expression[1:-1]

            if not filter_expression:
                filter_expression = "FILTER ActiveMeasurements WHERE ID IS NOT NULL"

            self._parent._dispatch_status_message(
                f"{self._connection_id} subscribed with expression: {filter_expression}"
            )

            signal_ids = self._parse_subscription_request(filter_expression)

            if not signal_ids:
                self._handle_subscribe_failure("No measurements matched subscription filter")
                return

            # Create signal index cache
            cache = SignalIndexCache()
            cache.subscriber_id = self._subscriber_id

            for idx, signal_id in enumerate(signal_ids):
                cache.add_measurement_key(idx, signal_id, Empty.STRING, Empty.STRING)

            with self._signal_index_cache_lock:
                self._signal_index_cache = cache

            # Update routing tables
            self._parent._routing_tables.update_routes(self, signal_ids)

            # Send success response first (no data payload)
            self.send_response(ServerResponse.SUCCEEDED, ServerCommand.SUBSCRIBE)

            # Then send signal index cache as a response with UPDATESIGNALINDEXCACHE response code
            serialized_cache = self._serialize_signal_index_cache(cache)
            self.send_response(ServerResponse.UPDATESIGNALINDEXCACHE, ServerCommand.SUBSCRIBE, data=serialized_cache)

            self._subscribed = True

            self._parent._dispatch_status_message(
                f"{self._connection_id} subscribed to {len(signal_ids)} measurements"
            )
        except Exception as ex:
            import traceback
            traceback.print_exc()

            # Convert exception to string carefully to avoid pickle issues
            try:
                error_msg = str(ex)
            except Exception:
                error_msg = repr(ex)

            self._handle_subscribe_failure(f"Failed to process subscription: {error_msg}")

    def _handle_subscribe_failure(self, message: str):
        """Sends a failed subscribe response and reports `message` to the parent."""

        self.send_response(ServerResponse.FAILED, ServerCommand.SUBSCRIBE, message=message)
        self._parent._dispatch_error_message(f"{self._connection_id}: {message}")

    def _handle_unsubscribe(self):
        """Handles an unsubscribe request: drops routes and the signal index cache."""

        self._subscribed = False

        # Remove from routing tables
        self._parent._routing_tables.remove_routes(self)

        with self._signal_index_cache_lock:
            self._signal_index_cache = None

        self.send_response(ServerResponse.SUCCEEDED, ServerCommand.UNSUBSCRIBE, message="Unsubscribe successful")
        self._parent._dispatch_status_message(f"{self._connection_id} unsubscribed")

    def _write_metadata_debug_file(self, metadata_xml: str):
        """
        Writes the metadata XML to the debug dump location, best-effort.

        The dump is skipped when the target directory does not exist and write errors are
        ignored, so the debugging aid can never fail a metadata refresh.
        """

        if not os.path.isdir(os.path.dirname(self._METADATA_DEBUG_PATH)):
            return

        try:
            with open(self._METADATA_DEBUG_PATH, "w", encoding="utf-8") as f:
                f.write(metadata_xml)
        except OSError:
            pass

    def _handle_metadata_refresh(self, data: bytearray):
        """
        Handles a metadata refresh request.

        Serializes the parent's metadata to XML, gzip-compresses it when the subscriber
        requested metadata compression and sends it as a success response. A failed
        response is sent when no metadata is available or an error occurs.
        """

        try:
            metadata = self._parent.metadata

            if not metadata:
                self.send_response(ServerResponse.FAILED, ServerCommand.METADATAREFRESH, message="No metadata available")
                return

            # Serialize metadata (XML format)
            metadata_xml = metadata.to_xml()
            metadata_bytes = metadata_xml.encode('utf-8')

            # Save to file for debugging
            self._write_metadata_debug_file(metadata_xml)

            # Compress if requested
            compress_metadata = bool(int(self._operational_modes) & int(OperationalModes.COMPRESSMETADATA))

            if compress_metadata:
                uncompressed_size = len(metadata_bytes)
                metadata_bytes = gzip.compress(metadata_bytes)

                self._parent._dispatch_status_message(
                    f"Compressed metadata from {uncompressed_size} to {len(metadata_bytes)} bytes"
                )
            else:
                self._parent._dispatch_status_message(
                    f"Sent {len(metadata_bytes)} bytes of metadata to {self._connection_id}"
                )

            self.send_response(ServerResponse.SUCCEEDED, ServerCommand.METADATAREFRESH, data=metadata_bytes)
        except Exception as ex:
            import traceback
            traceback.print_exc()

            self.send_response(ServerResponse.FAILED, ServerCommand.METADATAREFRESH, message=f"Failed to send metadata: {ex}")

    def _handle_rotate_cipher_keys(self):
        """Handles cipher key rotation (simplified - not fully implemented)."""

        # For now, just acknowledge
        self.send_response(ServerResponse.SUCCEEDED, ServerCommand.ROTATECIPHERKEYS)

    def _handle_update_processing_interval(self, data: bytearray):
        """
        Handles processing interval update request.

        `data` must hold at least a 4-byte big-endian signed interval; shorter data is
        answered with a failed response.
        """

        try:
            if len(data) < 4:
                self.send_response(ServerResponse.FAILED, ServerCommand.UPDATEPROCESSINGINTERVAL, message="Invalid data length")
                return

            interval = BigEndian.to_int32(data)
            self._processing_interval = interval

            self.send_response(ServerResponse.SUCCEEDED, ServerCommand.UPDATEPROCESSINGINTERVAL, message=f"Processing interval set to {interval}ms")

            self._parent._dispatch_status_message(
                f"{self._connection_id} processing interval set to {interval}ms"
            )
        except Exception as ex:
            self.send_response(ServerResponse.FAILED, ServerCommand.UPDATEPROCESSINGINTERVAL, message=str(ex))

    def _handle_define_operational_modes(self, data: bytearray):
        """
        Handles operational modes definition.

        Reads the 4-byte big-endian modes value, validates the requested protocol version
        (1 to 3) and applies version, encoding and payload-compression settings. An
        unsupported version is answered with a failed response and the connection is
        stopped one second later.
        """

        try:
            if len(data) < 4:
                self.send_response(ServerResponse.FAILED, ServerCommand.DEFINEOPERATIONALMODES, message="Invalid data length")
                return

            modes = BigEndian.to_uint32(data)
            version = np.uint8(modes & OperationalModes.PRESTANDARDVERSIONMASK)

            # Validate version (support versions 1-3 like C++ implementation)
            if version < 1 or version > 3:
                message = f"Client connection rejected: requested protocol version {version} not supported. This STTP data publisher implementation only supports version 1 to 3 of the protocol."

                self._parent._dispatch_error_message(
                    f"{message} Operational modes may not be set correctly for client \"{self._connection_id}\" -- disconnecting client"
                )

                self._validated = False
                self.send_response(ServerResponse.FAILED, ServerCommand.DEFINEOPERATIONALMODES, message=message)

                # Schedule disconnection after a moment to allow response to be sent
                disconnect_timer = Timer(1.0, self.stop)
                disconnect_timer.daemon = True
                disconnect_timer.start()
                return

            # Set the negotiated version
            self._version = version

            # Set cache index if version > 1
            if self._version > 1:
                self._current_cache_index = np.uint8(1)

            self._operational_modes = np.uint32(modes)
            self._encoding = modes & OperationalModes.ENCODINGMASK

            # Check for TSSC compression
            self._using_payload_compression = bool(modes & OperationalModes.COMPRESSPAYLOADDATA)

            # Validate and send response
            self._validated = True

            message = f"STTP v{version} client connection accepted: requested operational modes applied."
            self.send_response(ServerResponse.SUCCEEDED, ServerCommand.DEFINEOPERATIONALMODES, message=message)

            self._parent._dispatch_status_message(
                f"{self._connection_id} STTP v{version} operational modes set: 0x{modes:08X}"
            )
        except Exception as ex:
            self.send_response(ServerResponse.FAILED, ServerCommand.DEFINEOPERATIONALMODES, message=str(ex))

    def _handle_confirm_signal_index_cache(self, data: bytearray):
        """Handles signal index cache confirmation by reporting it to the parent."""

        self._parent._dispatch_status_message(
            f"{self._connection_id} confirmed signal index cache"
        )

    def _handle_confirm_notification(self, data: bytearray):
        """Handles notification confirmation (currently a no-op)."""
        pass

    def _handle_confirm_update_base_times(self, data: bytearray):
        """Handles base time update confirmation (currently a no-op)."""
        pass

    def _handle_user_command(self, command: np.uint8, data: bytearray):
        """Forwards a user-defined command to the parent's user command callback, if set."""

        if self._parent.usercommand_callback:
            self._parent.usercommand_callback(self, command, bytes(data))

    # Measurement publication

    def publish_measurements(self, measurements: "List[Measurement]"):
        """
        Publishes measurements to the subscriber.

        Does nothing unless the subscriber is subscribed and the connection is running.
        Uses TSSC compression when payload compression was negotiated, the compact format
        otherwise. Errors are reported to the parent and never propagate.
        """

        if not self._subscribed or self._stopped:
            return

        try:
            if self._using_payload_compression:
                self._publish_tssc_measurements(measurements)
            else:
                self._publish_compact_measurements(measurements)
        except Exception as ex:
            self._parent._dispatch_error_message(
                f"Failed to publish measurements to {self._connection_id}: {ex}"
            )

    def _publish_compact_measurements(self, measurements: "List[Measurement]"):
        """
        Publishes measurements using compact format.

        Measurements whose signal ID is not in the signal index cache are skipped; nothing
        is sent when no measurement could be encoded.
        """

        if not measurements:
            return

        # One cache snapshot per packet so every index comes from the same cache
        cache = self.signal_index_cache

        if not cache:
            return

        # NOTE(review): the TSSC path writes a measurement count after the flags byte;
        # this path does not -- confirm against the subscriber's decoder.
        packet = bytearray()
        packet.append(DataPacketFlags.COMPACT)

        encoded_count = 0

        for measurement in measurements:
            index = cache.get_signal_index(measurement.signalid)

            if index < 0:
                continue

            packet.extend(CompactMeasurement.encode(
                index,
                measurement.timestamp,
                measurement.flags,
                measurement.value
            ))
            encoded_count += 1

        # Count only the measurements that were actually encoded into the packet
        if encoded_count > 0:
            self._send_data_packet(packet, encoded_count)

    def _reset_tssc_buffer(self):
        """Points the TSSC encoder at the start of the working buffer."""

        self._tssc_encoder.set_buffer(
            self._tssc_working_buffer,
            np.uint32(0),
            np.uint32(len(self._tssc_working_buffer))
        )

    def _publish_tssc_measurements(self, measurements: "List[Measurement]"):
        """
        Publishes measurements using TSSC compression.

        Measurements are added to the encoder until its buffer is full; a full buffer is
        flushed as a packet and encoding continues with a reset buffer. Measurements whose
        signal ID is not in the signal index cache are skipped.
        """

        if not measurements:
            return

        self._reset_tssc_buffer()

        cache = self.signal_index_cache
        encoder = self._tssc_encoder
        count = 0

        try:
            for measurement in (measurements if cache else ()):
                index = cache.get_signal_index(measurement.signalid)

                if index < 0:
                    continue

                if encoder.try_add_measurement(
                    np.int32(index), measurement.timestamp, measurement.flags, measurement.value
                ):
                    count += 1
                    continue

                # Buffer full, send what we have
                if count > 0:
                    self._send_tssc_packet(count)
                    count = 0

                # Reset and try again; a measurement rejected twice is dropped
                self._reset_tssc_buffer()

                if encoder.try_add_measurement(
                    np.int32(index), measurement.timestamp, measurement.flags, measurement.value
                ):
                    count += 1
        except Exception as loop_ex:
            self._parent._dispatch_error_message(
                f"Failed to encode measurements for {self._connection_id}: {loop_ex}"
            )

        # Send remaining measurements
        if count > 0:
            self._send_tssc_packet(count)

    def _send_data_packet(self, packet: bytearray, count: int):
        """
        Sends a measurement data packet to the subscriber.

        The packet goes out as a DATAPACKET response; `count` is added to the sent
        measurement total only when the send succeeded. Not to be confused with the
        public `send_data_packet`, which sends a server command.
        """

        try:
            # `send_response` reports its own failures and returns False instead of raising
            if self.send_response(ServerResponse.DATAPACKET, ServerCommand.SUBSCRIBE, data=bytes(packet)):
                self._total_measurements_sent += np.uint64(count)
        except Exception as ex:
            self._parent._dispatch_error_message(
                f"Failed to send data packet to {self._connection_id}: {ex}"
            )
            self.stop()

    def _advance_tssc_sequence_number(self):
        """Advances the TSSC sequence number as a 16-bit counter that skips zero on wrap."""

        self._tssc_sequence_number = (self._tssc_sequence_number + 1) & 0xFFFF

        # Do not increment sequence number to 0
        if self._tssc_sequence_number == 0:
            self._tssc_sequence_number = 1

    def _send_tssc_packet(self, count: int):
        """
        Sends a TSSC-compressed packet.

        Packet layout: flags byte, measurement count (int32), TSSC version byte, sequence
        number (uint16), then the compressed block. Nothing is sent for an empty block.
        """

        # Finish encoding
        length = self._tssc_encoder.finish_block()

        if length == 0:
            return

        packet = bytearray()

        # Serialize data packet flags
        packet.append(DataPacketFlags.COMPRESSED)

        # Serialize total number of measurement values to follow
        packet.extend(BigEndian.from_int32(count))

        # Add TSSC version number (85 is the recognized version)
        packet.append(85)

        # Add sequence number (uint16, not int32!)
        packet.extend(BigEndian.from_uint16(np.uint16(self._tssc_sequence_number)))
        self._advance_tssc_sequence_number()

        # Add compressed data
        packet.extend(self._tssc_working_buffer[:length])

        self._send_data_packet(packet, count)

    # Helper methods

    def _parse_subscription_request(self, filter_expression: str) -> Set[UUID]:
        """
        Parses a subscription filter expression and returns matching signal IDs.

        Matches C++ SubscriberConnection::ParseSubscriptionRequest which filters from
        m_filteringMetadata (ActiveMeasurements table). Errors are reported to the parent;
        the IDs collected up to that point (possibly none) are returned.
        """

        signal_ids = set()

        try:
            # Define an empty schema if none has been defined
            if not self._parent._filtering_metadata:
                schema_path = os.path.join(os.path.dirname(__file__), "ActiveMeasurementsSchema.xml")

                with open(schema_path, 'r') as f:
                    schema_xml = f.read()

                self._parent._filtering_metadata, err = DataSet.from_xml(schema_xml)

                if err:
                    raise RuntimeError(f"Failed to load ActiveMeasurementsSchema.xml: {err}")

            filtering_metadata = self._parent._filtering_metadata

            # Get ActiveMeasurements table from filtering metadata
            active_measurements = filtering_metadata.table("ActiveMeasurements")

            if not active_measurements:
                raise RuntimeError("ActiveMeasurements table not found in filtering metadata")

            # Matching C++: parser->SetDataSet(m_parent->m_filteringMetadata) and
            # SetPrimaryTableName("ActiveMeasurements")
            filtered_rows, err = FilterExpressionParser.select_datarows(
                filtering_metadata,
                filter_expression,
                "ActiveMeasurements"
            )

            if err is not None:
                raise RuntimeError(f"Error filtering ActiveMeasurements: {err}")

            if not filtered_rows:
                return signal_ids

            signal_id_col = active_measurements.column_byname("SignalID")

            if not signal_id_col:
                raise RuntimeError("SignalID column not found in ActiveMeasurements table")

            # Extract signal IDs from filtered rows
            for row in filtered_rows:
                signal_id = row[signal_id_col.index]

                if signal_id and isinstance(signal_id, UUID):
                    signal_ids.add(signal_id)
        except Exception as ex:
            self._parent._dispatch_error_message(
                f"Error parsing subscription request: {ex}"
            )

        return signal_ids

    def _serialize_signal_index_cache(self, cache: "SignalIndexCache") -> bytes:
        """
        Serializes a signal index cache for transmission.

        Layout: binary length (uint32, excluding itself), subscriber ID, signal count
        (int32), then per signal: runtime index (int32), signal ID, source length (uint32),
        source bytes and numeric ID (uint64). The result is gzip-compressed when signal
        index cache compression was negotiated and, for protocol versions above 1,
        prefixed with a cache index byte.
        """

        body = bytearray()

        # Write subscriber ID
        body.extend(self._subscriber_id.bytes)

        # Write reference count (number of signals)
        signal_mapping = cache.signal_id_cache
        body.extend(BigEndian.from_int32(len(signal_mapping)))

        # Source is an empty string for now - not currently tracked in cache
        source_bytes = "".encode('utf-8')

        for index, signal_id in sorted(signal_mapping.items()):
            # Write runtime signal index
            body.extend(BigEndian.from_int32(index))

            # Write signal ID
            body.extend(signal_id.bytes)

            # Write source
            body.extend(BigEndian.from_uint32(len(source_bytes)))
            body.extend(source_bytes)

            # Write ID (use signal index as ID)
            body.extend(BigEndian.from_uint64(np.uint64(index)))

        # Prefix the body with its own length (the length field itself is excluded)
        buffer = bytearray(BigEndian.from_uint32(len(body)))
        buffer.extend(body)

        if self._operational_modes & OperationalModes.COMPRESSSIGNALINDEXCACHE:
            payload = gzip.compress(bytes(buffer))
            cache_index = 0x00  # Cache index 0
        else:
            payload = bytes(buffer)
            cache_index = 0xFF  # Placeholder for cache index

        # NOTE(review): the compressed and uncompressed paths send different cache index
        # bytes (0x00 vs 0xFF) -- confirm which value the subscriber expects.
        if self._version > 1:
            return bytes([cache_index]) + payload

        return payload

    def _codec_name(self) -> str:
        """Gets the Python codec name for the operational encoding (UTF-8 by default)."""

        if self._encoding == OperationalEncoding.UTF16LE:
            return 'utf-16-le'

        if self._encoding == OperationalEncoding.UTF16BE:
            return 'utf-16-be'

        return 'utf-8'

    def _encode_string(self, value: str) -> bytearray:
        """Encodes a string based on operational encoding, prefixed with its int32 byte length."""

        encoded = value.encode(self._codec_name())

        result = bytearray()
        result.extend(BigEndian.from_int32(len(encoded)))
        result.extend(encoded)

        return result

    def _decode_string(self, data: bytes) -> str:
        """Decodes a string based on operational encoding."""

        return data.decode(self._codec_name())