Source code for sttp.publisher

# ******************************************************************************************************
#  publisher.py - Gbtc
#
#  Copyright © 2026, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  01/06/2026 - Generated by porting C++ PublisherInstance
#       Ported from cppapi/src/lib/transport/PublisherInstance.{h,cpp}
#
# ******************************************************************************************************

# Ported from cppapi/src/lib/transport/PublisherInstance.cpp : class PublisherInstance
# Differences: Simplified Python wrapper; otherwise parity maintained.

from .data.dataset import DataSet
from .transport.datapublisher import DataPublisher
from .transport.measurement import Measurement
from .transport.subscriberconnection import SubscriberConnection
from typing import List, Optional, Callable
from threading import Lock
import sys


class Publisher:
    """
    Represents an STTP data publisher.

    Notes
    -----
    The `Publisher` class exists as a simplified implementation wrapping the `DataPublisher`
    class found in the `transport` namespace. This class maintains an internal instance of the
    `DataPublisher` class and is intended to simplify common uses of STTP data publication,
    similar to how `Subscriber` wraps `DataSubscriber`.

    Status messages, error messages and client connect / disconnect events raised by the wrapped
    `DataPublisher` are forwarded to user-assignable callbacks. Each callback defaults to a
    console-writing implementation and can be set to `None` to ignore that notification.
    """

    def __init__(self) -> None:
        """
        Creates a new `Publisher`.
        """

        # DataPublisher reference
        self._datapublisher = DataPublisher()

        # Lock used to synchronize console writes
        self._consolelock = Lock()

        # Callback references, each initialized to its console-writing default
        self._statusmessage_logger: Optional[Callable[[str], None]] = self.default_statusmessage_logger
        self._errormessage_logger: Optional[Callable[[str], None]] = self.default_errormessage_logger
        self._clientconnected_receiver: Optional[Callable[[SubscriberConnection], None]] = self.default_clientconnected_receiver
        self._clientdisconnected_receiver: Optional[Callable[[SubscriberConnection], None]] = self.default_clientdisconnected_receiver

        # Wire up internal callbacks; the internal handlers look up the user callback on every
        # event, so callbacks reassigned later through the properties take effect immediately
        self._datapublisher.statusmessage_callback = self._handle_status_message
        self._datapublisher.errormessage_callback = self._handle_error_message
        self._datapublisher.clientconnected_callback = self._handle_client_connected
        self._datapublisher.clientdisconnected_callback = self._handle_client_disconnected

    def dispose(self) -> None:
        """
        Cleanly shuts down a `Publisher` that is no longer being used, e.g., during a normal
        application exit.
        """

        if self._datapublisher is not None:
            self._datapublisher.dispose()

    @property
    def started(self) -> bool:
        """
        Gets flag that determines if `Publisher` is currently started and listening for connections.
        """

        return self._datapublisher.is_started

    @property
    def statusmessage_logger(self) -> Optional[Callable[[str], None]]:
        """
        Gets or sets a function to handle status messages.

        Signature: `def statusmessage_logger(message: str)`
        """

        return self._statusmessage_logger

    @statusmessage_logger.setter
    def statusmessage_logger(self, value: Optional[Callable[[str], None]]):
        self._statusmessage_logger = value

    @property
    def errormessage_logger(self) -> Optional[Callable[[str], None]]:
        """
        Gets or sets a function to handle error messages.

        Signature: `def errormessage_logger(message: str)`
        """

        return self._errormessage_logger

    @errormessage_logger.setter
    def errormessage_logger(self, value: Optional[Callable[[str], None]]):
        self._errormessage_logger = value

    @property
    def clientconnected_receiver(self) -> Optional[Callable[[SubscriberConnection], None]]:
        """
        Gets or sets a function to handle client connected events.

        Signature: `def clientconnected_receiver(connection: SubscriberConnection)`
        """

        return self._clientconnected_receiver

    @clientconnected_receiver.setter
    def clientconnected_receiver(self, value: Optional[Callable[[SubscriberConnection], None]]):
        self._clientconnected_receiver = value

    @property
    def clientdisconnected_receiver(self) -> Optional[Callable[[SubscriberConnection], None]]:
        """
        Gets or sets a function to handle client disconnected events.

        Signature: `def clientdisconnected_receiver(connection: SubscriberConnection)`
        """

        return self._clientdisconnected_receiver

    @clientdisconnected_receiver.setter
    def clientdisconnected_receiver(self, value: Optional[Callable[[SubscriberConnection], None]]):
        self._clientdisconnected_receiver = value

    def start(self, port: int, ipv6: bool = False) -> None:
        """
        Starts the publisher listening on the specified port.

        Parameters
        ----------
        port : int
            TCP port number to listen on
        ipv6 : bool, optional
            Set to True to use IPv6, False for IPv4 (default)
        """

        self._datapublisher.start(port, ipv6)

    def stop(self) -> None:
        """
        Stops the publisher and disconnects all clients.
        """

        self._datapublisher.stop()

    def define_metadata(self, metadata: DataSet) -> None:
        """
        Defines the metadata for the publisher from a DataSet.

        Parameters
        ----------
        metadata : DataSet
            DataSet containing metadata tables (MeasurementDetail, DeviceDetail, etc.)
        """

        self._datapublisher.define_metadata(metadata)

    def filter_metadata(self, filter_expression: str) -> List:
        """
        Filters metadata using a filter expression and returns matching measurement metadata.

        Parameters
        ----------
        filter_expression : str
            Filter expression to apply to metadata (e.g., "SignalAcronym <> 'STAT'")

        Returns
        -------
        List
            List of MeasurementMetadata objects matching the filter
        """

        return self._datapublisher.filter_metadata(filter_expression)

    def publish_measurements(self, measurements: List[Measurement]) -> None:
        """
        Publishes a list of measurements to subscribed clients.

        Parameters
        ----------
        measurements : List[Measurement]
            List of measurements to publish
        """

        self._datapublisher.publish_measurements(measurements)

    # Configuration properties (delegate to DataPublisher)

    @property
    def maximum_allowed_connections(self) -> int:
        """
        Gets or sets the maximum number of allowed client connections.
        Set to -1 for unlimited connections.
        """

        return self._datapublisher.maximum_allowed_connections

    @maximum_allowed_connections.setter
    def maximum_allowed_connections(self, value: int):
        self._datapublisher.maximum_allowed_connections = value

    @property
    def is_metadata_refresh_allowed(self) -> bool:
        """
        Gets or sets flag that determines if metadata refresh is allowed by clients.
        """

        return self._datapublisher.is_metadata_refresh_allowed

    @is_metadata_refresh_allowed.setter
    def is_metadata_refresh_allowed(self, value: bool):
        self._datapublisher.is_metadata_refresh_allowed = value

    @property
    def is_nan_value_filter_allowed(self) -> bool:
        """
        Gets or sets flag that determines if NaN value filtering is allowed.
        """

        return self._datapublisher.is_nan_value_filter_allowed

    @is_nan_value_filter_allowed.setter
    def is_nan_value_filter_allowed(self, value: bool):
        self._datapublisher.is_nan_value_filter_allowed = value

    @property
    def is_nan_value_filter_forced(self) -> bool:
        """
        Gets or sets flag that determines if NaN value filtering is forced.
        """

        return self._datapublisher.is_nan_value_filter_forced

    @is_nan_value_filter_forced.setter
    def is_nan_value_filter_forced(self, value: bool):
        self._datapublisher.is_nan_value_filter_forced = value

    # Internal callback handlers
    #
    # Each handler reads its callback attribute exactly once into a local before testing and
    # calling it. The public setters allow a callback to be replaced (including with `None`) at
    # any time; reading the attribute twice could test one value and then call another.

    def _handle_status_message(self, message: str) -> None:
        """Internal handler for status messages; forwards to `statusmessage_logger` when assigned."""

        logger = self._statusmessage_logger

        if logger:
            logger(message)

    def _handle_error_message(self, message: str) -> None:
        """Internal handler for error messages; forwards to `errormessage_logger` when assigned."""

        logger = self._errormessage_logger

        if logger:
            logger(message)

    def _handle_client_connected(self, connection: SubscriberConnection) -> None:
        """Internal handler for client connected events; forwards to `clientconnected_receiver` when assigned."""

        receiver = self._clientconnected_receiver

        if receiver:
            receiver(connection)

    def _handle_client_disconnected(self, connection: SubscriberConnection) -> None:
        """Internal handler for client disconnected events; forwards to `clientdisconnected_receiver` when assigned."""

        receiver = self._clientdisconnected_receiver

        if receiver:
            receiver(connection)

    # Default callback implementations

    def _write_console(self, text: str, use_stderr: bool = False) -> None:
        """
        Writes a line of text to a standard stream and flushes it while holding the console lock.

        Parameters
        ----------
        text : str
            Text to write, a newline is appended by `print`
        use_stderr : bool, optional
            Set to True to write to `sys.stderr`, False for `sys.stdout` (default)
        """

        with self._consolelock:
            # Resolve the stream at call time so a later reassignment of `sys.stdout` or
            # `sys.stderr` is honored
            stream = sys.stderr if use_stderr else sys.stdout

            # Standard streams can be `None` when no console is attached; skip the write in
            # that case instead of raising from inside a publisher callback
            if stream is None:
                return

            print(text, file=stream, flush=True)

    def default_statusmessage_logger(self, message: str) -> None:
        """Default status message logger that writes to stdout."""

        self._write_console(message)

    def default_errormessage_logger(self, message: str) -> None:
        """Default error message logger that writes to stderr."""

        self._write_console(f"ERROR: {message}", use_stderr=True)

    def default_clientconnected_receiver(self, connection: SubscriberConnection) -> None:
        """Default client connected handler that writes the connection ID to stdout."""

        self._write_console(f"Client connected: {connection.connection_id}")

    def default_clientdisconnected_receiver(self, connection: SubscriberConnection) -> None:
        """Default client disconnected handler that writes the connection ID to stdout."""

        self._write_console(f"Client disconnected: {connection.connection_id}")