Source code for sttp.publisher

# ******************************************************************************************************
#  publisher.py - Gbtc
#
#  Copyright © 2026, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  01/06/2026 - Generated by porting C++ PublisherInstance
#       Ported from cppapi/src/lib/transport/PublisherInstance.{h,cpp}
#
# ******************************************************************************************************

# Ported from cppapi/src/lib/transport/PublisherInstance.cpp : class PublisherInstance
# Differences: Simplified Python wrapper; otherwise parity maintained.

from .data.dataset import DataSet
from .metadata.signaltype import SignalType
from .metadata.signalreference import SignalKind, SignalReference
from .transport.constants import ServerResponse, ServerCommand 
from .transport.datapublisher import DataPublisher
from .transport.measurement import Measurement
from .transport.subscriberconnection import SubscriberConnection
from .metadata.record.measurement import MeasurementRecord
from typing import List, Callable
from threading import Lock
from datetime import datetime, timezone
from uuid import UUID
from xml.etree import ElementTree as ET
import re
import uuid
import os
import sys

[docs] class Publisher: """ Represents an STTP data publisher. Notes ----- The `Publisher` class exists as a simplified implementation wrapping the `DataPublisher` class found in the `transport` namespace. This class maintains an internal instance of the `DataPublisher` class and is intended to simplify common uses of STTP data publication, similar to how `Subscriber` wraps `DataSubscriber`. """ def __init__(self): """ Creates a new `Publisher`. """ # DataPublisher reference self._datapublisher = DataPublisher() # Callback references self._statusmessage_logger: Callable[[str], None] | None = self.default_statusmessage_logger self._errormessage_logger: Callable[[str], None] | None = self.default_errormessage_logger self._clientconnected_receiver: Callable[[SubscriberConnection], None] | None = self.default_clientconnected_receiver self._clientdisconnected_receiver: Callable[[SubscriberConnection], None] | None = self.default_clientdisconnected_receiver self._usercommand_receiver: Callable[[SubscriberConnection, ServerCommand, bytes], None] | None = None # Lock used to synchronize console writes self._consolelock = Lock() # Wire up internal callbacks self._datapublisher.statusmessage_callback = self._handle_status_message self._datapublisher.errormessage_callback = self._handle_error_message self._datapublisher.clientconnected_callback = self._handle_client_connected self._datapublisher.clientdisconnected_callback = self._handle_client_disconnected self._datapublisher.usercommand_callback = self._handle_user_command # Define metadata self._metadata_path : str | None = None
[docs] def dispose(self): """ Cleanly shuts down a `Publisher` that is no longer being used, e.g., during a normal application exit. """ if self._datapublisher is not None: self._datapublisher.dispose()
@property def started(self) -> bool: """ Gets flag that determines if `Publisher` is currently started and listening for connections. """ return self._datapublisher.is_started @property def statusmessage_logger(self) -> Callable[[str], None] | None: """ Gets or sets a function to handle status messages. Signature: `def statusmessage_logger(message: str)` """ return self._statusmessage_logger @statusmessage_logger.setter def statusmessage_logger(self, value: Callable[[str], None] | None): self._statusmessage_logger = value @property def errormessage_logger(self) -> Callable[[str], None] | None: """ Gets or sets a function to handle error messages. Signature: `def errormessage_logger(message: str)` """ return self._errormessage_logger @errormessage_logger.setter def errormessage_logger(self, value: Callable[[str], None] | None): self._errormessage_logger = value @property def clientconnected_receiver(self) -> Callable[[SubscriberConnection], None] | None: """ Gets or sets a function to handle client connected events. Signature: `def clientconnected_receiver(connection: SubscriberConnection)` """ return self._clientconnected_receiver @clientconnected_receiver.setter def clientconnected_receiver(self, value: Callable[[SubscriberConnection], None] | None): self._clientconnected_receiver = value @property def clientdisconnected_receiver(self) -> Callable[[SubscriberConnection], None] | None: """ Gets or sets a function to handle client disconnected events. Signature: `def clientdisconnected_receiver(connection: SubscriberConnection)` """ return self._clientdisconnected_receiver @clientdisconnected_receiver.setter def clientdisconnected_receiver(self, value: Callable[[SubscriberConnection], None] | None): self._clientdisconnected_receiver = value @property def usercommand_receiver(self) -> Callable[[SubscriberConnection, ServerCommand, bytes], None] | None: """ Gets or sets a function to handle user-defined commands from clients. Signature: `def usercommand_receiver(connection: SubscriberConnection, command: ServerCommand, data: bytes)` """ return self._usercommand_receiver @usercommand_receiver.setter def usercommand_receiver(self, value: Callable[[SubscriberConnection, ServerCommand, bytes], None] | None): self._usercommand_receiver = value @property def metadata_path(self) -> str | None: """ Gets or sets the file path to for persisted metadata used by the publisher. Setting this property will cause the publisher to verify that the file exists, and if not, attempt to create it from the included `MetadataTemplate.xml` file. """ return self._metadata_path @metadata_path.setter def metadata_path(self, value: str): self._metadata_path = value # Verify metadata file exists, creating from template if not found if not os.path.isfile(self._metadata_path): template_path = os.path.join(os.path.dirname(__file__), "MetadataTemplate.xml") if os.path.isfile(template_path): with open(template_path, "r") as template_file: template_content = template_file.read() with open(self._metadata_path, "w") as metadata_file: metadata_file.write(template_content) else: raise FileNotFoundError(f"Cannot create metadata '{self._metadata_path}': template not found at '{template_path}'") @property def subscriber_connections(self) -> List[SubscriberConnection]: """ Gets a list of currently connected clients. Returns ------- List[SubscriberConnection] List of currently connected clients """ with self._datapublisher._subscriber_connections_lock: connections = list(self._datapublisher._subscriber_connections) return connections
[docs] def start(self, port: int, ipv6: bool = False): """ Starts the publisher listening on the specified port. Parameters ---------- port : int TCP port number to listen on ipv6 : bool, optional Set to True to use IPv6, False for IPv4 (default) """ self._datapublisher.start(port, ipv6)
[docs] def stop(self): """ Stops the publisher and disconnects all clients. """ self._datapublisher.stop()
[docs] def define_metadata(self, metadata: DataSet): """ Defines the metadata for the publisher from a DataSet. Parameters ---------- metadata : DataSet DataSet containing metadata tables (MeasurementDetail, DeviceDetail, etc.) """ self._datapublisher.define_metadata(metadata)
[docs] def load_metadata(self) -> Exception | None: """ Loads metadata from the file specified by the 'metadata_path' property. Returns ------- Exception | None Returns an exception if loading metadata fails, otherwise None on success. """ self._validate_metadata_path() assert self.metadata_path is not None # Load metadata from XML file and define it for the publisher dataset, err = DataSet.from_xml(open(self.metadata_path).read()) if err is None: self.define_metadata(dataset) return None else: return ValueError(f"Failed to load metadata from '{self._metadata_path}': {err}")
[docs] def filter_metadata(self, filter_expression: str) -> List[MeasurementRecord]: """ Filters metadata using a filter expression against the 'MeasurementDetail' table and returns matching measurement metadata. Parameters ---------- filter_expression : str Filter expression to apply to metadata (e.g., "SignalAcronym <> 'STAT'") Returns ------- List[MeasurementRecord] List of MeasurementRecord objects matching the filter """ return self._datapublisher.filter_metadata(filter_expression)
[docs] def define_output_source(self, name: str, acronym: str | None = None) -> str: """ Defines an output source in the publisher metadata, i.e., a virtual device in the configuration used to group output measurements. This source device will be persisted to the XML metadata file defined at 'metadata_path' for future runs. Parameters ---------- name : str Name of the output source Returns ------- str Device `Acronym` of the output source """ self._validate_metadata_path() assert self.metadata_path is not None # Generate valid acronym, use name if acronym not provided if acronym is None: acronym = self.get_valid_acronym(name) else: acronym = self.get_valid_acronym(acronym) # Parse existing metadata tree = ET.parse(self.metadata_path) root = tree.getroot() # Check if device with this acronym already exists for device in root.findall('.//DeviceDetail'): existing_acronym = device.find('Acronym') if existing_acronym is not None and existing_acronym.text == acronym: # Record already exists, return acronym return acronym # Create new DeviceDetail element device_detail = ET.Element('DeviceDetail') # Add child elements ET.SubElement(device_detail, 'UniqueID').text = str(uuid.uuid4()) ET.SubElement(device_detail, 'IsConcentrator').text = '0' ET.SubElement(device_detail, 'Acronym').text = acronym ET.SubElement(device_detail, 'Name').text = name ET.SubElement(device_detail, 'AccessID').text = '0' ET.SubElement(device_detail, 'ParentAcronym') ET.SubElement(device_detail, 'ProtocolName').text = 'STTP' ET.SubElement(device_detail, 'FramesPerSecond').text = '1' ET.SubElement(device_detail, 'CompanyAcronym').text = 'GPA' ET.SubElement(device_detail, 'VendorAcronym') ET.SubElement(device_detail, 'VendorDeviceName') ET.SubElement(device_detail, 'Longitude').text = '0.0' ET.SubElement(device_detail, 'Latitude').text = '0.0' ET.SubElement(device_detail, 'InterconnectionName') ET.SubElement(device_detail, 'ContactList') ET.SubElement(device_detail, 'Enabled').text = '1' # Format timestamp with timezone offset now = datetime.now(timezone.utc) # Get local offset local_offset = datetime.now().astimezone().strftime('%z') formatted_offset = f"{local_offset[:3]}:{local_offset[3:]}" timestamp = now.strftime(f'%Y-%m-%dT%H:%M:%S.%f')[:-3] + formatted_offset ET.SubElement(device_detail, 'UpdatedOn').text = timestamp # Insert the new DeviceDetail after schema but before SchemaVersion/MeasurementDetail # Order should be: DeviceDetail, xs:schema, SchemaVersion, MeasurementDetail insert_index = None schema_found = False for i, child in enumerate(root): # Track if we've seen the schema if child.tag.endswith('schema'): schema_found = True # If we've seen the schema, insert after it but before other elements elif schema_found and child.tag in ('SchemaVersion', 'MeasurementDetail', 'PhasorDetail'): insert_index = i break # If there's already a DeviceDetail after the schema, insert after the last one elif schema_found and child.tag == 'DeviceDetail': insert_index = i + 1 # If no schema found or no insertion point determined, insert at beginning if insert_index is None: insert_index = 0 root.insert(insert_index, device_detail) # Indent the entire tree to ensure proper formatting ET.indent(tree, space=" ") # Save the updated XML tree.write(self.metadata_path, encoding='utf-8', xml_declaration=True) return acronym
[docs] def define_output_measurement(self, deviceacronym: str, pointtag: str, description: str, signaltype: str | SignalType = SignalType.CALC, sourceindex: int | None = None) -> UUID: """ Defines an output measurement in the publisher metadata. This measurement will be persisted to the XML metadata file defined at 'metadata_path' for future runs. Parameters ---------- deviceacronym : str Device `Acronym` of the output measurement pointtag : str Name of the output measurement description : str Description of the output measurement signaltype : str | SignalType, optional Signal type of the output measurement (default is `CALC`) sourceindex : int | None, optional If this measurement is part of a collection, value represents the source index of the measurement in the collection (default is None) Returns ------- UUID Signal ID of the output measurement """ self._validate_metadata_path() assert self.metadata_path is not None # Ensure valid device acronym deviceacronym = self.get_valid_acronym(deviceacronym) # Ensure valid point tag pointtag = self.get_valid_acronym(pointtag) # Parse existing metadata tree = ET.parse(self.metadata_path) root = tree.getroot() # Check if measurement with this point tag already exists for measurement in root.findall('.//MeasurementDetail'): existing_point_tag = measurement.find('PointTag') if existing_point_tag is not None and existing_point_tag.text == pointtag: # Record already exists, return signal ID signal_id_elem = measurement.find('SignalID') if signal_id_elem is not None and signal_id_elem.text: return UUID(signal_id_elem.text) # Find the next available index for this device max_index = 0 for measurement in root.findall('.//MeasurementDetail'): device_elem = measurement.find('DeviceAcronym') id_elem = measurement.find('ID') if device_elem is not None and device_elem.text == deviceacronym: if id_elem is not None and id_elem.text: # Parse the index from ID field (format: DEVICE:INDEX) parts = id_elem.text.split(':') if len(parts) == 2: try: index = int(parts[1]) max_index = max(max_index, index) except ValueError: pass # Increment to get next index next_index = max_index + 1 # Generate new signal ID signal_id = uuid.uuid4() # Create new MeasurementDetail element measurement_detail = ET.Element('MeasurementDetail') # Attempt to parse a common signal type, defaulting to provided string if not recognized parsed_signaltype = signaltype if isinstance(signaltype, SignalType) else SignalType.parse(signaltype) signaltype_str = parsed_signaltype.acronym if parsed_signaltype != SignalType.UNKN else self.get_valid_acronym(str(signaltype)) signalkind = parsed_signaltype.signalkind # Attempt to create standard signal reference if signal type is recognized if parsed_signaltype != SignalType.UNKN: signalreference = SignalReference.tostring(signaltype_str, signalkind, sourceindex) else: signalreference = f"{deviceacronym}-{signaltype_str}" # Add child elements ET.SubElement(measurement_detail, 'DeviceAcronym').text = deviceacronym ET.SubElement(measurement_detail, 'ID').text = f"{deviceacronym}:{next_index}" ET.SubElement(measurement_detail, 'SignalID').text = str(signal_id) ET.SubElement(measurement_detail, 'PointTag').text = pointtag ET.SubElement(measurement_detail, 'SignalReference').text = signalreference ET.SubElement(measurement_detail, 'SignalAcronym').text = signaltype_str if sourceindex is not None and (signalkind == SignalKind.ANGLE or signalkind == SignalKind.MAGNITUDE): ET.SubElement(measurement_detail, 'PhasorSourceIndex').text = str(sourceindex) ET.SubElement(measurement_detail, 'Description').text = description ET.SubElement(measurement_detail, 'Internal').text = 'true' ET.SubElement(measurement_detail, 'Enabled').text = 'true' # Format timestamp with timezone offset now = datetime.now(timezone.utc) # Get local offset local_offset = datetime.now().astimezone().strftime('%z') formatted_offset = f"{local_offset[:3]}:{local_offset[3:]}" timestamp = now.strftime(f'%Y-%m-%dT%H:%M:%S.%f')[:-3] + formatted_offset ET.SubElement(measurement_detail, 'UpdatedOn').text = timestamp # Insert after last MeasurementDetail; else before first PhasorDetail; # else before SchemaVersion; else after last DeviceDetail; else at end insert_index = len(root) last_measurement = None first_phasor = None schema_version = None last_device = None for i, child in enumerate(root): if child.tag == 'DeviceDetail': last_device = i elif child.tag == 'MeasurementDetail': last_measurement = i elif child.tag == 'PhasorDetail' and first_phasor is None: first_phasor = i elif child.tag == 'SchemaVersion' and schema_version is None: schema_version = i if last_measurement is not None: insert_index = last_measurement + 1 elif first_phasor is not None: insert_index = first_phasor elif schema_version is not None: insert_index = schema_version elif last_device is not None: insert_index = last_device + 1 root.insert(insert_index, measurement_detail) # Indent the entire tree to ensure proper formatting ET.indent(tree, space=" ") # Save the updated XML tree.write(self.metadata_path, encoding='utf-8', xml_declaration=True) return signal_id
[docs] def define_output_phasor(self, deviceacronym: str, label: str, phasortype: str, phase: str, sourceindex: int, destination_phasor_id: int = 0) -> int: """ Defines an output phasor in the publisher metadata. This phasor will be persisted to the XML metadata file defined at 'metadata_path' for future runs. Parameters ---------- deviceacronym : str Device `Acronym` of the output phasor label : str Label of the output phasor phasortype : str Type of the phasor, e.g., `V` for voltage or `I` for current phase : str Phase of the phasor, e.g., `+` for positive sequence, `A` for phase A sourceindex : int Source index of the phasor in the device destination_phasor_id : int, optional Destination phasor ID for cross-referencing (default is `0`) Returns ------- int Phasor `ID` of the output phasor """ self._validate_metadata_path() assert self.metadata_path is not None # Ensure valid device acronym deviceacronym = self.get_valid_acronym(deviceacronym) # Parse existing metadata tree = ET.parse(self.metadata_path) root = tree.getroot() # Check if phasor with this device acronym and source index already exists for phasor in root.findall('.//PhasorDetail'): existing_device = phasor.find('DeviceAcronym') existing_sourceindex = phasor.find('SourceIndex') if (existing_device is not None and existing_device.text == deviceacronym and existing_sourceindex is not None and existing_sourceindex.text == str(sourceindex)): # Record already exists, return phasor ID id_elem = phasor.find('ID') if id_elem is not None and id_elem.text: return int(id_elem.text) # Find the next available phasor ID across all devices max_id = 0 for phasor in root.findall('.//PhasorDetail'): id_elem = phasor.find('ID') if id_elem is not None and id_elem.text: try: phasor_id = int(id_elem.text) max_id = max(max_id, phasor_id) except ValueError: pass # Increment to get next ID next_id = max_id + 1 # Create new PhasorDetail element phasor_detail = ET.Element('PhasorDetail') # Add child elements ET.SubElement(phasor_detail, 'ID').text = str(next_id) ET.SubElement(phasor_detail, 'DeviceAcronym').text = deviceacronym ET.SubElement(phasor_detail, 'Label').text = label ET.SubElement(phasor_detail, 'Type').text = phasortype ET.SubElement(phasor_detail, 'Phase').text = phase ET.SubElement(phasor_detail, 'DestinationPhasorID').text = str(destination_phasor_id) ET.SubElement(phasor_detail, 'SourceIndex').text = str(sourceindex) # Format timestamp with timezone offset now = datetime.now(timezone.utc) # Get local offset local_offset = datetime.now().astimezone().strftime('%z') formatted_offset = f"{local_offset[:3]}:{local_offset[3:]}" timestamp = now.strftime(f'%Y-%m-%dT%H:%M:%S.%f')[:-3] + formatted_offset ET.SubElement(phasor_detail, 'UpdatedOn').text = timestamp # Insert after last PhasorDetail; else before SchemaVersion; # else after last MeasurementDetail; else at end insert_index = len(root) last_phasor = None schema_version = None last_measurement = None for i, child in enumerate(root): if child.tag == 'MeasurementDetail': last_measurement = i elif child.tag == 'PhasorDetail': last_phasor = i elif child.tag == 'SchemaVersion' and schema_version is None: schema_version = i if last_phasor is not None: insert_index = last_phasor + 1 elif schema_version is not None: insert_index = schema_version elif last_measurement is not None: insert_index = last_measurement + 1 root.insert(insert_index, phasor_detail) # Indent the entire tree to ensure proper formatting ET.indent(tree, space=" ") # Save the updated XML tree.write(self.metadata_path, encoding='utf-8', xml_declaration=True) return next_id
[docs] def get_signal_id(self, point_tag: str) -> UUID | None: """ Gets the signal ID for the specified point tag as defined in the metadata file. Parameters ---------- point_tag : str Name of the point tag Returns ------- UUID | None Signal ID if found, otherwise None """ self._validate_metadata_path() assert self.metadata_path is not None # Parse existing metadata tree = ET.parse(self.metadata_path) root = tree.getroot() # Search for measurement with matching point tag for measurement in root.findall('.//MeasurementDetail'): existing_point_tag = measurement.find('PointTag') if existing_point_tag is not None and existing_point_tag.text == point_tag: # Found matching point tag, return signal ID signal_id_elem = measurement.find('SignalID') if signal_id_elem is not None and signal_id_elem.text: return UUID(signal_id_elem.text) # Point tag not found return None
[docs] def get_valid_acronym(self, acronym: str) -> str: """ Gets a valid acronym by converting to uppercase, removing spaces, and stripping invalid characters. """ # Generate valid acronym from source_name acronym = acronym.upper().replace(" ", "") # Remove any characters not matching the allowed pattern acronym = re.sub(r'[^A-Z0-9\-!_\.@#\$]', '', acronym) return acronym
def _validate_metadata_path(self): if self.metadata_path is None: raise ValueError("Metadata path is not set.") elif not os.path.isfile(self.metadata_path): raise FileNotFoundError(f"Metadata file not found at '{self.metadata_path}'. Set the 'metadata_path' property to a valid file path.")
[docs] def publish_measurements(self, measurements: List[Measurement]): """ Publishes a list of measurements to subscribed clients. Parameters ---------- measurements : List[Measurement] List of measurements to publish """ self._datapublisher.publish_measurements(measurements)
[docs] def send_userresponse(self, connection: SubscriberConnection, responsecode: ServerResponse, commandcode: ServerCommand, data: bytes | bytearray | None = None): """ Sends a user response to a specific connected client. Parameters ---------- connection : SubscriberConnection Client connection to send the response to responsecode : ServerResponse Response to send commandcode : ServerCommand Command associated with the response data : bytes | bytearray | None Additional data to send with the response """ connection.send_response(responsecode, commandcode, data)
[docs] def broadcast_userresponse(self, responsecode: ServerResponse, commandcode: ServerCommand, data: bytes | bytearray | None = None): """ Broadcasts a user response to all connected clients. Parameters ---------- responsecode : ServerResponse Response to broadcast commandcode : ServerCommand Command associated with the response data : bytes | bytearray | None Additional data to send with the response """ with self._datapublisher._subscriber_connections_lock: for connection in self._datapublisher._subscriber_connections: self.send_userresponse(connection, responsecode, commandcode, data)
# Configuration properties (delegate to DataPublisher) @property def maximum_allowed_connections(self) -> int: """ Gets or sets the maximum number of allowed client connections. Set to -1 for unlimited connections. """ return self._datapublisher.maximum_allowed_connections @maximum_allowed_connections.setter def maximum_allowed_connections(self, value: int): self._datapublisher.maximum_allowed_connections = value @property def is_metadata_refresh_allowed(self) -> bool: """ Gets or sets flag that determines if metadata refresh is allowed by clients. """ return self._datapublisher.is_metadata_refresh_allowed @is_metadata_refresh_allowed.setter def is_metadata_refresh_allowed(self, value: bool): self._datapublisher.is_metadata_refresh_allowed = value @property def is_nan_value_filter_allowed(self) -> bool: """ Gets or sets flag that determines if NaN value filtering is allowed. """ return self._datapublisher.is_nan_value_filter_allowed @is_nan_value_filter_allowed.setter def is_nan_value_filter_allowed(self, value: bool): self._datapublisher.is_nan_value_filter_allowed = value @property def is_nan_value_filter_forced(self) -> bool: """ Gets or sets flag that determines if NaN value filtering is forced. """ return self._datapublisher.is_nan_value_filter_forced @is_nan_value_filter_forced.setter def is_nan_value_filter_forced(self, value: bool): self._datapublisher.is_nan_value_filter_forced = value # Internal callback handlers def _handle_status_message(self, message: str): """Internal handler for status messages.""" if self._statusmessage_logger: self._statusmessage_logger(message) def _handle_error_message(self, message: str): """Internal handler for error messages.""" if self._errormessage_logger: self._errormessage_logger(message) def _handle_client_connected(self, connection: SubscriberConnection): """Internal handler for client connected events.""" if self._clientconnected_receiver: self._clientconnected_receiver(connection) def _handle_client_disconnected(self, connection: SubscriberConnection): """Internal handler for client disconnected events.""" if self._clientdisconnected_receiver: self._clientdisconnected_receiver(connection) def _handle_user_command(self, connection: SubscriberConnection, commandcode: ServerCommand, data: bytes): """Internal handler for user-defined commands.""" if self._usercommand_receiver: self._usercommand_receiver(connection, commandcode, data) # Default callback implementations
[docs] def default_statusmessage_logger(self, message: str): """Default status message logger that writes to stdout.""" with self._consolelock: print(message, file=sys.stdout) sys.stdout.flush()
[docs] def default_errormessage_logger(self, message: str): """Default error message logger that writes to stderr.""" with self._consolelock: print(f"ERROR: {message}", file=sys.stderr) sys.stderr.flush()
[docs] def default_clientconnected_receiver(self, connection: SubscriberConnection): """Default client connected handler.""" with self._consolelock: print(f"Client connected: {connection.connection_id}", file=sys.stdout) sys.stdout.flush()
[docs] def default_clientdisconnected_receiver(self, connection: SubscriberConnection): """Default client disconnected handler.""" with self._consolelock: print(f"Client disconnected: {connection.connection_id}", file=sys.stdout) sys.stdout.flush()