# ******************************************************************************************************
# publisher.py - Gbtc
#
# Copyright © 2026, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 01/06/2026 - Generated by porting C++ PublisherInstance
# Ported from cppapi/src/lib/transport/PublisherInstance.{h,cpp}
#
# ******************************************************************************************************
# Ported from cppapi/src/lib/transport/PublisherInstance.cpp : class PublisherInstance
# Differences: Simplified Python wrapper; otherwise parity maintained.
from .data.dataset import DataSet
from .transport.datapublisher import DataPublisher
from .transport.measurement import Measurement
from .transport.subscriberconnection import SubscriberConnection
from typing import List, Optional, Callable
from threading import Lock
import sys
class Publisher:
    """
    Represents an STTP data publisher.

    Notes
    -----
    The `Publisher` class exists as a simplified implementation wrapping the `DataPublisher`
    class found in the `transport` namespace. This class maintains an internal instance
    of the `DataPublisher` class and is intended to simplify common uses of STTP data
    publication, similar to how `Subscriber` wraps `DataSubscriber`.

    All user-facing callbacks (`statusmessage_logger`, `errormessage_logger`,
    `clientconnected_receiver`, `clientdisconnected_receiver`) may be set to `None`
    to suppress the corresponding notification.
    """

    def __init__(self):
        """
        Creates a new `Publisher`.

        The new instance owns a fresh `DataPublisher` and starts out with the
        `default_*` methods of this class installed as its callbacks.
        """

        # DataPublisher reference
        self._datapublisher = DataPublisher()

        # Callback references, initialized to the console-writing defaults
        self._statusmessage_logger: Optional[Callable[[str], None]] = self.default_statusmessage_logger
        self._errormessage_logger: Optional[Callable[[str], None]] = self.default_errormessage_logger
        self._clientconnected_receiver: Optional[Callable[[SubscriberConnection], None]] = self.default_clientconnected_receiver
        self._clientdisconnected_receiver: Optional[Callable[[SubscriberConnection], None]] = self.default_clientdisconnected_receiver

        # Lock used to synchronize console writes
        self._consolelock = Lock()

        # Wire up internal callbacks. The DataPublisher always calls the private
        # handlers below, which forward to whatever user callback is currently
        # assigned, so callbacks can be swapped at any time without re-wiring.
        self._datapublisher.statusmessage_callback = self._handle_status_message
        self._datapublisher.errormessage_callback = self._handle_error_message
        self._datapublisher.clientconnected_callback = self._handle_client_connected
        self._datapublisher.clientdisconnected_callback = self._handle_client_disconnected

    def dispose(self):
        """
        Cleanly shuts down a `Publisher` that is no longer being used, e.g.,
        during a normal application exit.
        """

        if self._datapublisher is not None:
            self._datapublisher.dispose()

    @property
    def started(self) -> bool:
        """
        Gets flag that determines if `Publisher` is currently started and listening for connections.
        """

        return self._datapublisher.is_started

    @property
    def statusmessage_logger(self) -> Optional[Callable[[str], None]]:
        """
        Gets or sets a function to handle status messages.

        Signature: `def statusmessage_logger(message: str)`
        """

        return self._statusmessage_logger

    @statusmessage_logger.setter
    def statusmessage_logger(self, value: Optional[Callable[[str], None]]):
        self._statusmessage_logger = value

    @property
    def errormessage_logger(self) -> Optional[Callable[[str], None]]:
        """
        Gets or sets a function to handle error messages.

        Signature: `def errormessage_logger(message: str)`
        """

        return self._errormessage_logger

    @errormessage_logger.setter
    def errormessage_logger(self, value: Optional[Callable[[str], None]]):
        self._errormessage_logger = value

    @property
    def clientconnected_receiver(self) -> Optional[Callable[[SubscriberConnection], None]]:
        """
        Gets or sets a function to handle client connected events.

        Signature: `def clientconnected_receiver(connection: SubscriberConnection)`
        """

        return self._clientconnected_receiver

    @clientconnected_receiver.setter
    def clientconnected_receiver(self, value: Optional[Callable[[SubscriberConnection], None]]):
        self._clientconnected_receiver = value

    @property
    def clientdisconnected_receiver(self) -> Optional[Callable[[SubscriberConnection], None]]:
        """
        Gets or sets a function to handle client disconnected events.

        Signature: `def clientdisconnected_receiver(connection: SubscriberConnection)`
        """

        return self._clientdisconnected_receiver

    @clientdisconnected_receiver.setter
    def clientdisconnected_receiver(self, value: Optional[Callable[[SubscriberConnection], None]]):
        self._clientdisconnected_receiver = value

    def start(self, port: int, ipv6: bool = False):
        """
        Starts the publisher listening on the specified port.

        Parameters
        ----------
        port : int
            TCP port number to listen on
        ipv6 : bool, optional
            Set to True to use IPv6, False for IPv4 (default)
        """

        self._datapublisher.start(port, ipv6)

    def stop(self):
        """
        Stops the publisher and disconnects all clients.
        """

        self._datapublisher.stop()

    def publish_measurements(self, measurements: List[Measurement]):
        """
        Publishes a list of measurements to subscribed clients.

        Parameters
        ----------
        measurements : List[Measurement]
            List of measurements to publish
        """

        self._datapublisher.publish_measurements(measurements)

    # Configuration properties (delegate to DataPublisher)

    @property
    def maximum_allowed_connections(self) -> int:
        """
        Gets or sets the maximum number of allowed client connections.
        Set to -1 for unlimited connections.
        """

        return self._datapublisher.maximum_allowed_connections

    @maximum_allowed_connections.setter
    def maximum_allowed_connections(self, value: int):
        self._datapublisher.maximum_allowed_connections = value

    @property
    def is_metadata_refresh_allowed(self) -> bool:
        """
        Gets or sets flag that determines if metadata refresh is allowed by clients.
        """

        return self._datapublisher.is_metadata_refresh_allowed

    @is_metadata_refresh_allowed.setter
    def is_metadata_refresh_allowed(self, value: bool):
        self._datapublisher.is_metadata_refresh_allowed = value

    @property
    def is_nan_value_filter_allowed(self) -> bool:
        """
        Gets or sets flag that determines if NaN value filtering is allowed.
        """

        return self._datapublisher.is_nan_value_filter_allowed

    @is_nan_value_filter_allowed.setter
    def is_nan_value_filter_allowed(self, value: bool):
        self._datapublisher.is_nan_value_filter_allowed = value

    @property
    def is_nan_value_filter_forced(self) -> bool:
        """
        Gets or sets flag that determines if NaN value filtering is forced.
        """

        return self._datapublisher.is_nan_value_filter_forced

    @is_nan_value_filter_forced.setter
    def is_nan_value_filter_forced(self, value: bool):
        self._datapublisher.is_nan_value_filter_forced = value

    # Internal callback handlers
    #
    # Each handler snapshots the user callback into a local before testing it, so
    # the reference that is checked is the same one that is called even if the
    # property is reassigned (e.g., to None) in between. The `is not None` test
    # means a callable object that evaluates as falsy is still invoked.

    def _handle_status_message(self, message: str):
        """Internal handler for status messages; forwards to `statusmessage_logger` when set."""

        logger = self._statusmessage_logger

        if logger is not None:
            logger(message)

    def _handle_error_message(self, message: str):
        """Internal handler for error messages; forwards to `errormessage_logger` when set."""

        logger = self._errormessage_logger

        if logger is not None:
            logger(message)

    def _handle_client_connected(self, connection: SubscriberConnection):
        """Internal handler for client connected events; forwards to `clientconnected_receiver` when set."""

        receiver = self._clientconnected_receiver

        if receiver is not None:
            receiver(connection)

    def _handle_client_disconnected(self, connection: SubscriberConnection):
        """Internal handler for client disconnected events; forwards to `clientdisconnected_receiver` when set."""

        receiver = self._clientdisconnected_receiver

        if receiver is not None:
            receiver(connection)

    # Default callback implementations
    #
    # All defaults write while holding the console lock so that lines emitted by
    # different default handlers of this instance are not interleaved.

    def default_statusmessage_logger(self, message: str):
        """Default status message logger that writes to stdout."""

        with self._consolelock:
            print(message, file=sys.stdout, flush=True)

    def default_errormessage_logger(self, message: str):
        """Default error message logger that writes to stderr."""

        with self._consolelock:
            print(f"ERROR: {message}", file=sys.stderr, flush=True)

    def default_clientconnected_receiver(self, connection: SubscriberConnection):
        """Default client connected handler; writes the connection ID to stdout."""

        with self._consolelock:
            print(f"Client connected: {connection.connection_id}", file=sys.stdout, flush=True)

    def default_clientdisconnected_receiver(self, connection: SubscriberConnection):
        """Default client disconnected handler; writes the connection ID to stdout."""

        with self._consolelock:
            print(f"Client disconnected: {connection.connection_id}", file=sys.stdout, flush=True)