# ******************************************************************************************************
# publisher.py - Gbtc
#
# Copyright © 2026, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 01/06/2026 - Generated by porting C++ PublisherInstance
# Ported from cppapi/src/lib/transport/PublisherInstance.{h,cpp}
#
# ******************************************************************************************************
# Ported from cppapi/src/lib/transport/PublisherInstance.cpp : class PublisherInstance
# Differences: Simplified Python wrapper; otherwise parity maintained.
from .data.dataset import DataSet
from .metadata.signaltype import SignalType
from .metadata.signalreference import SignalKind, SignalReference
from .transport.constants import ServerResponse, ServerCommand
from .transport.datapublisher import DataPublisher
from .transport.measurement import Measurement
from .transport.subscriberconnection import SubscriberConnection
from .metadata.record.measurement import MeasurementRecord
from typing import List, Callable
from threading import Lock
from datetime import datetime, timezone
from uuid import UUID
from xml.etree import ElementTree as ET
import re
import uuid
import os
import sys
[docs]
class Publisher:
"""
Represents an STTP data publisher.
Notes
-----
The `Publisher` class exists as a simplified implementation wrapping the `DataPublisher`
class found in the `transport` namespace. This class maintains an internal instance
of the `DataPublisher` class and is intended to simplify common uses of STTP data
publication, similar to how `Subscriber` wraps `DataSubscriber`.
"""
def __init__(self):
"""
Creates a new `Publisher`.
"""
# DataPublisher reference
self._datapublisher = DataPublisher()
# Callback references
self._statusmessage_logger: Callable[[str], None] | None = self.default_statusmessage_logger
self._errormessage_logger: Callable[[str], None] | None = self.default_errormessage_logger
self._clientconnected_receiver: Callable[[SubscriberConnection], None] | None = self.default_clientconnected_receiver
self._clientdisconnected_receiver: Callable[[SubscriberConnection], None] | None = self.default_clientdisconnected_receiver
self._usercommand_receiver: Callable[[SubscriberConnection, ServerCommand, bytes], None] | None = None
# Lock used to synchronize console writes
self._consolelock = Lock()
# Wire up internal callbacks
self._datapublisher.statusmessage_callback = self._handle_status_message
self._datapublisher.errormessage_callback = self._handle_error_message
self._datapublisher.clientconnected_callback = self._handle_client_connected
self._datapublisher.clientdisconnected_callback = self._handle_client_disconnected
self._datapublisher.usercommand_callback = self._handle_user_command
# Define metadata
self._metadata_path : str | None = None
[docs]
def dispose(self):
"""
Cleanly shuts down a `Publisher` that is no longer being used, e.g.,
during a normal application exit.
"""
if self._datapublisher is not None:
self._datapublisher.dispose()
@property
def started(self) -> bool:
"""
Gets flag that determines if `Publisher` is currently started and listening for connections.
"""
return self._datapublisher.is_started
@property
def statusmessage_logger(self) -> Callable[[str], None] | None:
"""
Gets or sets a function to handle status messages.
Signature: `def statusmessage_logger(message: str)`
"""
return self._statusmessage_logger
@statusmessage_logger.setter
def statusmessage_logger(self, value: Callable[[str], None] | None):
self._statusmessage_logger = value
@property
def errormessage_logger(self) -> Callable[[str], None] | None:
"""
Gets or sets a function to handle error messages.
Signature: `def errormessage_logger(message: str)`
"""
return self._errormessage_logger
@errormessage_logger.setter
def errormessage_logger(self, value: Callable[[str], None] | None):
self._errormessage_logger = value
@property
def clientconnected_receiver(self) -> Callable[[SubscriberConnection], None] | None:
"""
Gets or sets a function to handle client connected events.
Signature: `def clientconnected_receiver(connection: SubscriberConnection)`
"""
return self._clientconnected_receiver
@clientconnected_receiver.setter
def clientconnected_receiver(self, value: Callable[[SubscriberConnection], None] | None):
self._clientconnected_receiver = value
@property
def clientdisconnected_receiver(self) -> Callable[[SubscriberConnection], None] | None:
"""
Gets or sets a function to handle client disconnected events.
Signature: `def clientdisconnected_receiver(connection: SubscriberConnection)`
"""
return self._clientdisconnected_receiver
@clientdisconnected_receiver.setter
def clientdisconnected_receiver(self, value: Callable[[SubscriberConnection], None] | None):
self._clientdisconnected_receiver = value
@property
def usercommand_receiver(self) -> Callable[[SubscriberConnection, ServerCommand, bytes], None] | None:
"""
Gets or sets a function to handle user-defined commands from clients.
Signature: `def usercommand_receiver(connection: SubscriberConnection, command: ServerCommand, data: bytes)`
"""
return self._usercommand_receiver
@usercommand_receiver.setter
def usercommand_receiver(self, value: Callable[[SubscriberConnection, ServerCommand, bytes], None] | None):
self._usercommand_receiver = value
@property
def metadata_path(self) -> str | None:
"""
Gets or sets the file path to for persisted metadata used by the publisher.
Setting this property will cause the publisher to verify that the file exists, and if not,
attempt to create it from the included `MetadataTemplate.xml` file.
"""
return self._metadata_path
@metadata_path.setter
def metadata_path(self, value: str):
self._metadata_path = value
# Verify metadata file exists, creating from template if not found
if not os.path.isfile(self._metadata_path):
template_path = os.path.join(os.path.dirname(__file__), "MetadataTemplate.xml")
if os.path.isfile(template_path):
with open(template_path, "r") as template_file:
template_content = template_file.read()
with open(self._metadata_path, "w") as metadata_file:
metadata_file.write(template_content)
else:
raise FileNotFoundError(f"Cannot create metadata '{self._metadata_path}': template not found at '{template_path}'")
@property
def subscriber_connections(self) -> List[SubscriberConnection]:
"""
Gets a list of currently connected clients.
Returns
-------
List[SubscriberConnection]
List of currently connected clients
"""
with self._datapublisher._subscriber_connections_lock:
connections = list(self._datapublisher._subscriber_connections)
return connections
[docs]
def start(self, port: int, ipv6: bool = False):
"""
Starts the publisher listening on the specified port.
Parameters
----------
port : int
TCP port number to listen on
ipv6 : bool, optional
Set to True to use IPv6, False for IPv4 (default)
"""
self._datapublisher.start(port, ipv6)
[docs]
def stop(self):
"""
Stops the publisher and disconnects all clients.
"""
self._datapublisher.stop()
[docs]
def define_output_source(self, name: str, acronym: str | None = None) -> str:
"""
Defines an output source in the publisher metadata, i.e., a virtual device in the
configuration used to group output measurements. This source device will be persisted
to the XML metadata file defined at 'metadata_path' for future runs.
Parameters
----------
name : str
Name of the output source
Returns
-------
str
Device `Acronym` of the output source
"""
self._validate_metadata_path()
assert self.metadata_path is not None
# Generate valid acronym, use name if acronym not provided
if acronym is None:
acronym = self.get_valid_acronym(name)
else:
acronym = self.get_valid_acronym(acronym)
# Parse existing metadata
tree = ET.parse(self.metadata_path)
root = tree.getroot()
# Check if device with this acronym already exists
for device in root.findall('.//DeviceDetail'):
existing_acronym = device.find('Acronym')
if existing_acronym is not None and existing_acronym.text == acronym:
# Record already exists, return acronym
return acronym
# Create new DeviceDetail element
device_detail = ET.Element('DeviceDetail')
# Add child elements
ET.SubElement(device_detail, 'UniqueID').text = str(uuid.uuid4())
ET.SubElement(device_detail, 'IsConcentrator').text = '0'
ET.SubElement(device_detail, 'Acronym').text = acronym
ET.SubElement(device_detail, 'Name').text = name
ET.SubElement(device_detail, 'AccessID').text = '0'
ET.SubElement(device_detail, 'ParentAcronym')
ET.SubElement(device_detail, 'ProtocolName').text = 'STTP'
ET.SubElement(device_detail, 'FramesPerSecond').text = '1'
ET.SubElement(device_detail, 'CompanyAcronym').text = 'GPA'
ET.SubElement(device_detail, 'VendorAcronym')
ET.SubElement(device_detail, 'VendorDeviceName')
ET.SubElement(device_detail, 'Longitude').text = '0.0'
ET.SubElement(device_detail, 'Latitude').text = '0.0'
ET.SubElement(device_detail, 'InterconnectionName')
ET.SubElement(device_detail, 'ContactList')
ET.SubElement(device_detail, 'Enabled').text = '1'
# Format timestamp with timezone offset
now = datetime.now(timezone.utc)
# Get local offset
local_offset = datetime.now().astimezone().strftime('%z')
formatted_offset = f"{local_offset[:3]}:{local_offset[3:]}"
timestamp = now.strftime(f'%Y-%m-%dT%H:%M:%S.%f')[:-3] + formatted_offset
ET.SubElement(device_detail, 'UpdatedOn').text = timestamp
# Insert the new DeviceDetail after schema but before SchemaVersion/MeasurementDetail
# Order should be: DeviceDetail, xs:schema, SchemaVersion, MeasurementDetail
insert_index = None
schema_found = False
for i, child in enumerate(root):
# Track if we've seen the schema
if child.tag.endswith('schema'):
schema_found = True
# If we've seen the schema, insert after it but before other elements
elif schema_found and child.tag in ('SchemaVersion', 'MeasurementDetail', 'PhasorDetail'):
insert_index = i
break
# If there's already a DeviceDetail after the schema, insert after the last one
elif schema_found and child.tag == 'DeviceDetail':
insert_index = i + 1
# If no schema found or no insertion point determined, insert at beginning
if insert_index is None:
insert_index = 0
root.insert(insert_index, device_detail)
# Indent the entire tree to ensure proper formatting
ET.indent(tree, space=" ")
# Save the updated XML
tree.write(self.metadata_path, encoding='utf-8', xml_declaration=True)
return acronym
[docs]
def define_output_measurement(self, deviceacronym: str, pointtag: str, description: str, signaltype: str | SignalType = SignalType.CALC, sourceindex: int | None = None) -> UUID:
"""
Defines an output measurement in the publisher metadata. This measurement will
be persisted to the XML metadata file defined at 'metadata_path' for future runs.
Parameters
----------
deviceacronym : str
Device `Acronym` of the output measurement
pointtag : str
Name of the output measurement
description : str
Description of the output measurement
signaltype : str | SignalType, optional
Signal type of the output measurement (default is `CALC`)
sourceindex : int | None, optional
If this measurement is part of a collection, value represents
the source index of the measurement in the collection
(default is None)
Returns
-------
UUID
Signal ID of the output measurement
"""
self._validate_metadata_path()
assert self.metadata_path is not None
# Ensure valid device acronym
deviceacronym = self.get_valid_acronym(deviceacronym)
# Ensure valid point tag
pointtag = self.get_valid_acronym(pointtag)
# Parse existing metadata
tree = ET.parse(self.metadata_path)
root = tree.getroot()
# Check if measurement with this point tag already exists
for measurement in root.findall('.//MeasurementDetail'):
existing_point_tag = measurement.find('PointTag')
if existing_point_tag is not None and existing_point_tag.text == pointtag:
# Record already exists, return signal ID
signal_id_elem = measurement.find('SignalID')
if signal_id_elem is not None and signal_id_elem.text:
return UUID(signal_id_elem.text)
# Find the next available index for this device
max_index = 0
for measurement in root.findall('.//MeasurementDetail'):
device_elem = measurement.find('DeviceAcronym')
id_elem = measurement.find('ID')
if device_elem is not None and device_elem.text == deviceacronym:
if id_elem is not None and id_elem.text:
# Parse the index from ID field (format: DEVICE:INDEX)
parts = id_elem.text.split(':')
if len(parts) == 2:
try:
index = int(parts[1])
max_index = max(max_index, index)
except ValueError:
pass
# Increment to get next index
next_index = max_index + 1
# Generate new signal ID
signal_id = uuid.uuid4()
# Create new MeasurementDetail element
measurement_detail = ET.Element('MeasurementDetail')
# Attempt to parse a common signal type, defaulting to provided string if not recognized
parsed_signaltype = signaltype if isinstance(signaltype, SignalType) else SignalType.parse(signaltype)
signaltype_str = parsed_signaltype.acronym if parsed_signaltype != SignalType.UNKN else self.get_valid_acronym(str(signaltype))
signalkind = parsed_signaltype.signalkind
# Attempt to create standard signal reference if signal type is recognized
if parsed_signaltype != SignalType.UNKN:
signalreference = SignalReference.tostring(signaltype_str, signalkind, sourceindex)
else:
signalreference = f"{deviceacronym}-{signaltype_str}"
# Add child elements
ET.SubElement(measurement_detail, 'DeviceAcronym').text = deviceacronym
ET.SubElement(measurement_detail, 'ID').text = f"{deviceacronym}:{next_index}"
ET.SubElement(measurement_detail, 'SignalID').text = str(signal_id)
ET.SubElement(measurement_detail, 'PointTag').text = pointtag
ET.SubElement(measurement_detail, 'SignalReference').text = signalreference
ET.SubElement(measurement_detail, 'SignalAcronym').text = signaltype_str
if sourceindex is not None and (signalkind == SignalKind.ANGLE or signalkind == SignalKind.MAGNITUDE):
ET.SubElement(measurement_detail, 'PhasorSourceIndex').text = str(sourceindex)
ET.SubElement(measurement_detail, 'Description').text = description
ET.SubElement(measurement_detail, 'Internal').text = 'true'
ET.SubElement(measurement_detail, 'Enabled').text = 'true'
# Format timestamp with timezone offset
now = datetime.now(timezone.utc)
# Get local offset
local_offset = datetime.now().astimezone().strftime('%z')
formatted_offset = f"{local_offset[:3]}:{local_offset[3:]}"
timestamp = now.strftime(f'%Y-%m-%dT%H:%M:%S.%f')[:-3] + formatted_offset
ET.SubElement(measurement_detail, 'UpdatedOn').text = timestamp
# Insert after last MeasurementDetail; else before first PhasorDetail;
# else before SchemaVersion; else after last DeviceDetail; else at end
insert_index = len(root)
last_measurement = None
first_phasor = None
schema_version = None
last_device = None
for i, child in enumerate(root):
if child.tag == 'DeviceDetail':
last_device = i
elif child.tag == 'MeasurementDetail':
last_measurement = i
elif child.tag == 'PhasorDetail' and first_phasor is None:
first_phasor = i
elif child.tag == 'SchemaVersion' and schema_version is None:
schema_version = i
if last_measurement is not None:
insert_index = last_measurement + 1
elif first_phasor is not None:
insert_index = first_phasor
elif schema_version is not None:
insert_index = schema_version
elif last_device is not None:
insert_index = last_device + 1
root.insert(insert_index, measurement_detail)
# Indent the entire tree to ensure proper formatting
ET.indent(tree, space=" ")
# Save the updated XML
tree.write(self.metadata_path, encoding='utf-8', xml_declaration=True)
return signal_id
[docs]
def define_output_phasor(self, deviceacronym: str, label: str, phasortype: str, phase: str, sourceindex: int, destination_phasor_id: int = 0) -> int:
"""
Defines an output phasor in the publisher metadata. This phasor will
be persisted to the XML metadata file defined at 'metadata_path' for
future runs.
Parameters
----------
deviceacronym : str
Device `Acronym` of the output phasor
label : str
Label of the output phasor
phasortype : str
Type of the phasor, e.g., `V` for voltage or `I` for current
phase : str
Phase of the phasor, e.g., `+` for positive sequence, `A` for phase A
sourceindex : int
Source index of the phasor in the device
destination_phasor_id : int, optional
Destination phasor ID for cross-referencing (default is `0`)
Returns
-------
int
Phasor `ID` of the output phasor
"""
self._validate_metadata_path()
assert self.metadata_path is not None
# Ensure valid device acronym
deviceacronym = self.get_valid_acronym(deviceacronym)
# Parse existing metadata
tree = ET.parse(self.metadata_path)
root = tree.getroot()
# Check if phasor with this device acronym and source index already exists
for phasor in root.findall('.//PhasorDetail'):
existing_device = phasor.find('DeviceAcronym')
existing_sourceindex = phasor.find('SourceIndex')
if (existing_device is not None and existing_device.text == deviceacronym and
existing_sourceindex is not None and existing_sourceindex.text == str(sourceindex)):
# Record already exists, return phasor ID
id_elem = phasor.find('ID')
if id_elem is not None and id_elem.text:
return int(id_elem.text)
# Find the next available phasor ID across all devices
max_id = 0
for phasor in root.findall('.//PhasorDetail'):
id_elem = phasor.find('ID')
if id_elem is not None and id_elem.text:
try:
phasor_id = int(id_elem.text)
max_id = max(max_id, phasor_id)
except ValueError:
pass
# Increment to get next ID
next_id = max_id + 1
# Create new PhasorDetail element
phasor_detail = ET.Element('PhasorDetail')
# Add child elements
ET.SubElement(phasor_detail, 'ID').text = str(next_id)
ET.SubElement(phasor_detail, 'DeviceAcronym').text = deviceacronym
ET.SubElement(phasor_detail, 'Label').text = label
ET.SubElement(phasor_detail, 'Type').text = phasortype
ET.SubElement(phasor_detail, 'Phase').text = phase
ET.SubElement(phasor_detail, 'DestinationPhasorID').text = str(destination_phasor_id)
ET.SubElement(phasor_detail, 'SourceIndex').text = str(sourceindex)
# Format timestamp with timezone offset
now = datetime.now(timezone.utc)
# Get local offset
local_offset = datetime.now().astimezone().strftime('%z')
formatted_offset = f"{local_offset[:3]}:{local_offset[3:]}"
timestamp = now.strftime(f'%Y-%m-%dT%H:%M:%S.%f')[:-3] + formatted_offset
ET.SubElement(phasor_detail, 'UpdatedOn').text = timestamp
# Insert after last PhasorDetail; else before SchemaVersion;
# else after last MeasurementDetail; else at end
insert_index = len(root)
last_phasor = None
schema_version = None
last_measurement = None
for i, child in enumerate(root):
if child.tag == 'MeasurementDetail':
last_measurement = i
elif child.tag == 'PhasorDetail':
last_phasor = i
elif child.tag == 'SchemaVersion' and schema_version is None:
schema_version = i
if last_phasor is not None:
insert_index = last_phasor + 1
elif schema_version is not None:
insert_index = schema_version
elif last_measurement is not None:
insert_index = last_measurement + 1
root.insert(insert_index, phasor_detail)
# Indent the entire tree to ensure proper formatting
ET.indent(tree, space=" ")
# Save the updated XML
tree.write(self.metadata_path, encoding='utf-8', xml_declaration=True)
return next_id
[docs]
def get_signal_id(self, point_tag: str) -> UUID | None:
"""
Gets the signal ID for the specified point tag as defined in the metadata file.
Parameters
----------
point_tag : str
Name of the point tag
Returns
-------
UUID | None
Signal ID if found, otherwise None
"""
self._validate_metadata_path()
assert self.metadata_path is not None
# Parse existing metadata
tree = ET.parse(self.metadata_path)
root = tree.getroot()
# Search for measurement with matching point tag
for measurement in root.findall('.//MeasurementDetail'):
existing_point_tag = measurement.find('PointTag')
if existing_point_tag is not None and existing_point_tag.text == point_tag:
# Found matching point tag, return signal ID
signal_id_elem = measurement.find('SignalID')
if signal_id_elem is not None and signal_id_elem.text:
return UUID(signal_id_elem.text)
# Point tag not found
return None
[docs]
def get_valid_acronym(self, acronym: str) -> str:
"""
Gets a valid acronym by converting to uppercase, removing spaces, and stripping invalid characters.
"""
# Generate valid acronym from source_name
acronym = acronym.upper().replace(" ", "")
# Remove any characters not matching the allowed pattern
acronym = re.sub(r'[^A-Z0-9\-!_\.@#\$]', '', acronym)
return acronym
def _validate_metadata_path(self):
if self.metadata_path is None:
raise ValueError("Metadata path is not set.")
elif not os.path.isfile(self.metadata_path):
raise FileNotFoundError(f"Metadata file not found at '{self.metadata_path}'. Set the 'metadata_path' property to a valid file path.")
[docs]
def publish_measurements(self, measurements: List[Measurement]):
"""
Publishes a list of measurements to subscribed clients.
Parameters
----------
measurements : List[Measurement]
List of measurements to publish
"""
self._datapublisher.publish_measurements(measurements)
[docs]
def send_userresponse(self, connection: SubscriberConnection, responsecode: ServerResponse, commandcode: ServerCommand, data: bytes | bytearray | None = None):
"""
Sends a user response to a specific connected client.
Parameters
----------
connection : SubscriberConnection
Client connection to send the response to
responsecode : ServerResponse
Response to send
commandcode : ServerCommand
Command associated with the response
data : bytes | bytearray | None
Additional data to send with the response
"""
connection.send_response(responsecode, commandcode, data)
[docs]
def broadcast_userresponse(self, responsecode: ServerResponse, commandcode: ServerCommand, data: bytes | bytearray | None = None):
"""
Broadcasts a user response to all connected clients.
Parameters
----------
responsecode : ServerResponse
Response to broadcast
commandcode : ServerCommand
Command associated with the response
data : bytes | bytearray | None
Additional data to send with the response
"""
with self._datapublisher._subscriber_connections_lock:
for connection in self._datapublisher._subscriber_connections:
self.send_userresponse(connection, responsecode, commandcode, data)
# Configuration properties (delegate to DataPublisher)
@property
def maximum_allowed_connections(self) -> int:
"""
Gets or sets the maximum number of allowed client connections.
Set to -1 for unlimited connections.
"""
return self._datapublisher.maximum_allowed_connections
@maximum_allowed_connections.setter
def maximum_allowed_connections(self, value: int):
self._datapublisher.maximum_allowed_connections = value
@property
def is_metadata_refresh_allowed(self) -> bool:
"""
Gets or sets flag that determines if metadata refresh is allowed by clients.
"""
return self._datapublisher.is_metadata_refresh_allowed
@is_metadata_refresh_allowed.setter
def is_metadata_refresh_allowed(self, value: bool):
self._datapublisher.is_metadata_refresh_allowed = value
@property
def is_nan_value_filter_allowed(self) -> bool:
"""
Gets or sets flag that determines if NaN value filtering is allowed.
"""
return self._datapublisher.is_nan_value_filter_allowed
@is_nan_value_filter_allowed.setter
def is_nan_value_filter_allowed(self, value: bool):
self._datapublisher.is_nan_value_filter_allowed = value
@property
def is_nan_value_filter_forced(self) -> bool:
"""
Gets or sets flag that determines if NaN value filtering is forced.
"""
return self._datapublisher.is_nan_value_filter_forced
@is_nan_value_filter_forced.setter
def is_nan_value_filter_forced(self, value: bool):
self._datapublisher.is_nan_value_filter_forced = value
# Internal callback handlers
def _handle_status_message(self, message: str):
"""Internal handler for status messages."""
if self._statusmessage_logger:
self._statusmessage_logger(message)
def _handle_error_message(self, message: str):
"""Internal handler for error messages."""
if self._errormessage_logger:
self._errormessage_logger(message)
def _handle_client_connected(self, connection: SubscriberConnection):
"""Internal handler for client connected events."""
if self._clientconnected_receiver:
self._clientconnected_receiver(connection)
def _handle_client_disconnected(self, connection: SubscriberConnection):
"""Internal handler for client disconnected events."""
if self._clientdisconnected_receiver:
self._clientdisconnected_receiver(connection)
def _handle_user_command(self, connection: SubscriberConnection, commandcode: ServerCommand, data: bytes):
"""Internal handler for user-defined commands."""
if self._usercommand_receiver:
self._usercommand_receiver(connection, commandcode, data)
# Default callback implementations
[docs]
def default_statusmessage_logger(self, message: str):
"""Default status message logger that writes to stdout."""
with self._consolelock:
print(message, file=sys.stdout)
sys.stdout.flush()
[docs]
def default_errormessage_logger(self, message: str):
"""Default error message logger that writes to stderr."""
with self._consolelock:
print(f"ERROR: {message}", file=sys.stderr)
sys.stderr.flush()
[docs]
def default_clientconnected_receiver(self, connection: SubscriberConnection):
"""Default client connected handler."""
with self._consolelock:
print(f"Client connected: {connection.connection_id}", file=sys.stdout)
sys.stdout.flush()
[docs]
def default_clientdisconnected_receiver(self, connection: SubscriberConnection):
"""Default client disconnected handler."""
with self._consolelock:
print(f"Client disconnected: {connection.connection_id}", file=sys.stdout)
sys.stdout.flush()