# ******************************************************************************************************
# subscriber.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 08/23/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
# ******************************************************************************************************
# pyright: reportAttributeAccessIssue=false
from __future__ import annotations
from gsf import Limits
from gsf.endianorder import BigEndian
from .data.dataset import DataSet
from .transport.bufferblock import BufferBlock
from .transport.constants import ConnectStatus, Defaults, ServerCommand, ServerResponse
from .transport.datasubscriber import DataSubscriber
from .transport.measurement import Measurement
from .transport.signalindexcache import SignalIndexCache
from .config import Config
from .settings import Settings
from .reader import MeasurementReader
from .metadata.cache import MetadataCache
from .metadata.record.measurement import MeasurementRecord
from typing import List, Callable
from time import time
from uuid import UUID
from threading import Lock
import sys
import numpy as np
[docs]
class Subscriber:
"""
Represents an STTP data subscriber.
Notes
-----
The `Subscriber` class exists as a simplified implementation of the `DataSubscriber`
class found in the `transport` namespace. This class maintains an internal instance
of the `DataSubscriber` class for subscription based functionality and is intended
to simplify common uses of STTP data reception.
"""
def __init__(self):
"""
Creates a new `Subscriber`.
"""
# Configuration reference
self._config = Config()
# DataSubscriber reference
self._datasubscriber = DataSubscriber()
# Callback references
self._statusmessage_logger: Callable[[str], None] | None = self.default_statusmessage_logger
self._errormessage_logger: Callable[[str], None] | None = self.default_errormessage_logger
self._metadatanotification_receiver: Callable[[DataSet], None] | None = None
self._data_starttime_receiver: Callable[[np.int64], None] | None = None
self._configurationchanged_receiver: Callable[[], None] | None = None
self._historicalreadcomplete_receiver: Callable[[], None] | None = None
self._connectionestablished_receiver: Callable[[], None] | None = self.default_connectionestablished_receiver
self._connectionterminated_receiver: Callable[[], None] | None = self.default_connectionterminated_receiver
# MeasurementReader reference
self._reader: MeasurementReader | None = None
# Lock used to synchronize console writes
self._consolelock = Lock()
[docs]
def dispose(self):
"""
Cleanly shuts down a `Subscriber` that is no longer being used, e.g.,
during a normal application exit.
"""
if self._datasubscriber is not None:
self._datasubscriber.dispose()
@property
def connected(self) -> bool:
"""
Gets flag that determines if `Subscriber` is currently connected to a data publisher.
"""
return self._datasubscriber.connected
@property
def subscribed(self) -> bool:
"""
Gets flag that determines if `Subscriber` is currently subscribed to a data stream.
"""
return self._datasubscriber.subscribed
@property
def metadatacache(self) -> MetadataCache:
"""
Gets the current metadata cache.
"""
return self._datasubscriber.metadatacache
@property
def activesignalindexcache(self) -> SignalIndexCache:
"""
Gets the active signal index cache.
"""
return self._datasubscriber.activesignalindexcache
@property
def subscriberid(self) -> UUID:
"""
Gets the subscriber ID as assigned by the data publisher upon receipt of the `SignalIndexCache`.
"""
return self._datasubscriber.subscriberid
@property
def total_commandchannel_bytesreceived(self) -> np.uint64:
"""
Gets the total number of bytes received via the command channel since last connection.
"""
return self._datasubscriber.total_commandchannel_bytesreceived
@property
def total_datachannel_bytesreceived(self) -> np.uint64:
"""
Gets the total number of bytes received via the data channel since last connection.
"""
return self._datasubscriber.total_datachannel_bytesreceived
@property
def total_measurementsreceived(self) -> np.uint64:
"""
Gets the total number of measurements received since last subscription.
"""
return self._datasubscriber.total_measurementsreceived
@property
def statusmessage_logger(self) -> Callable[[str], None] | None:
"""
Gets or sets a function to handle status messages.
Signature: `def statusmessage_logger(message: str)`
"""
return self._statusmessage_logger
@statusmessage_logger.setter
def statusmessage_logger(self, value: Callable[[str], None] | None):
"""
Defines the callback that handles informational message logging.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_statusmessage_logger` method.
"""
self._statusmessage_logger = value
@property
def errormessage_logger(self) -> Callable[[str], None] | None:
"""
Gets or sets a function to handle error messages.
Signature: `def errormessage_logger(message: str)`
"""
return self._errormessage_logger
@errormessage_logger.setter
def errormessage_logger(self, value: Callable[[str], None] | None):
"""
Defines the callback that handles error message logging.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_errormessage_logger` method.
"""
self._errormessage_logger = value
@property
def metadatanotification_receiver(self) -> Callable[[DataSet], None] | None:
"""
Gets or sets a function to handle metadata received notifications.
Signature: `def metadatanotification_receiver(metadata: DataSet)`
"""
return self._metadatanotification_receiver
@metadatanotification_receiver.setter
def metadatanotification_receiver(self, value: Callable[[DataSet], None] | None):
"""
Defines the callback that handles reception of the metadata received notification response.
Receiver parameter defines full XML response received from publisher.
Parsed metadata available via `Subscriber.metadatacache` property.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_metadatanotification_receiver` method.
"""
self._metadatanotification_receiver = value
@property
def subscriptionupdated_receiver(self) -> Callable[[SignalIndexCache], None] | None:
"""
Gets or sets a function to handle subscription updated notifications.
Signature: `def subscriptionupdated_receiver(signalindexcache: SignalIndexCache)`
"""
return self._datasubscriber.subscriptionupdated_callback
@subscriptionupdated_receiver.setter
def subscriptionupdated_receiver(self, value: Callable[[SignalIndexCache], None] | None):
"""
Defines the callback that handles notifications that a new `SignalIndexCache` has been received.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_subscriptionupdated_receiver` method.
"""
self._datasubscriber.subscriptionupdated_callback = value
@property
def data_starttime_receiver(self) -> Callable[[np.int64], None] | None:
"""
Gets or sets a function to handle data start time notifications.
Signature: `def data_starttime_receiver(starttime: np.int64)`
"""
return self._data_starttime_receiver
@data_starttime_receiver.setter
def data_starttime_receiver(self, value: Callable[[np.int64], None] | None):
"""
Defines the callback that handles notification of first received measurement.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_data_starttime_receiver` method.
"""
self._data_starttime_receiver = value
@property
def configurationchanged_receiver(self) -> Callable[[], None] | None:
"""
Gets or sets a function to handle configuration changed notifications.
Signature: `def configurationchanged_receiver()`
"""
return self._configurationchanged_receiver
@configurationchanged_receiver.setter
def configurationchanged_receiver(self, value: Callable[[], None] | None):
"""
Defines the callback that handles notifications that the data publisher configuration has changed.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_configurationchanged_receiver` method.
"""
self._configurationchanged_receiver = value
@property
def newmeasurements_receiver(self) -> Callable[[List[Measurement]], None] | None:
"""
Gets or sets a function to handle new measurements.
Signature: `def newmeasurements_receiver(measurements: List[Measurement])`
"""
return self._datasubscriber.newmeasurements_callback
@newmeasurements_receiver.setter
def newmeasurements_receiver(self, value: Callable[[List[Measurement]], None] | None):
"""
Defines the callback that handles reception of new measurements.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_newmeasurements_receiver` method.
"""
self._datasubscriber.newmeasurements_callback = value
@property
def newbufferblock_receiver(self) -> Callable[[List[BufferBlock]], None] | None:
"""
Gets or sets a function to handle new buffer blocks.
Signature: `def newbufferblock_receiver(bufferblocks: List[BufferBlock])`
"""
return self._datasubscriber.newbufferblocks_callback
@newbufferblock_receiver.setter
def newbufferblock_receiver(self, value: Callable[[List[BufferBlock]], None] | None):
"""
Defines the callback that handles reception of new buffer blocks.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_newbufferblock_receiver` method.
"""
self._datasubscriber.newbufferblocks_callback = value
@property
def notification_receiver(self) -> Callable[[str], None] | None:
"""
Gets or sets a function to handle notifications.
Signature: `def notification_receiver(notification: str)`
"""
return self._datasubscriber.notificationreceived_callback
@notification_receiver.setter
def notification_receiver(self, value: Callable[[str], None] | None):
"""
Defines the callback that handles reception of a notification.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_notification_receiver` method.
"""
self._datasubscriber.notificationreceived_callback = value
@property
def userresponse_receiver(self) -> Callable[[ServerResponse, ServerCommand, bytes], None] | None:
"""
Gets or sets a function to handle user-defined responses.
Signature: `def userresponse_receiver(responsecode: ServerResponse, commandcode: ServerCommand, data: bytes)`
"""
return self._datasubscriber.userresponsereceived_callback
@userresponse_receiver.setter
def userresponse_receiver(self, value: Callable[[ServerResponse, ServerCommand, bytes], None] | None):
"""
Defines the callback that handles reception of a user-defined response.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_userresponse_receiver` method.
"""
self._datasubscriber.userresponsereceived_callback = value
@property
def historicalreadcomplete_receiver(self) -> Callable[[], None] | None:
"""
Gets or sets a function to handle historical read complete notifications.
Signature: `def historicalreadcomplete_receiver()`
"""
return self._historicalreadcomplete_receiver
@historicalreadcomplete_receiver.setter
def historicalreadcomplete_receiver(self, value: Callable[[], None] | None):
"""
Defines the callback that handles notification that temporal processing has completed, i.e.,
the end of a historical playback data stream has been reached.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_historicalreadcomplete_receiver` method.
"""
self._historicalreadcomplete_receiver = value
@property
def connectionestablished_receiver(self) -> Callable[[], None] | None:
"""
Gets or sets a function to handle connection established notifications.
Signature: `def connectionestablished_receiver()`
"""
return self._connectionestablished_receiver
@connectionestablished_receiver.setter
def connectionestablished_receiver(self, value: Callable[[], None] | None):
"""
Defines the callback that handles notification that a connection has been established.
Default implementation simply writes connection feedback to status message handler.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_connectionestablished_receiver` method.
"""
self._connectionestablished_receiver = value
@property
def connectionterminated_receiver(self) -> Callable[[], None] | None:
"""
Gets or sets a function to handle connection terminated notifications.
Signature: `def connectionterminated_receiver()`
"""
return self._connectionterminated_receiver
@connectionterminated_receiver.setter
def connectionterminated_receiver(self, value: Callable[[], None] | None):
"""
Defines the callback that handles notification that a connection has been terminated.
Default implementation simply writes connection terminated feedback to error message handler.
Assignment will take effect immediately, even while subscription is active.
Prefer using property setter over `set_connectionterminated_receiver` method.
"""
self._connectionterminated_receiver = value
[docs]
def adjustedvalue(self, measurement: Measurement) -> np.float64:
"""
Gets the Value of a `Measurement` with any linear adjustments applied from the
measurement's Adder and Multiplier metadata, if found.
"""
return self._datasubscriber.adjustedvalue(measurement)
[docs]
def connect(self, address: str, config: Config | None = None) -> Exception | None:
"""
Starts the client-based connection cycle to an STTP publisher. Config parameter controls
connection related settings. When the config defines `AutoReconnect` as True, the connection
will automatically be retried when the connection drops. If the config parameter defines
`AutoRequestMetadata` as True, then upon successful connection, meta-data will be requested.
When the config defines both `AutoRequestMetadata` and `AutoSubscribe` as True, subscription
will occur after reception of metadata. When the config defines `AutoRequestMetadata` as
False and `AutoSubscribe` as True, subscription will occur at successful connection.
"""
if config is None:
config = Config()
try:
lastcolonindex = address.rindex(":")
except ValueError:
return ValueError("address does not contain a port number, format: hostname:port")
hostname = address[:lastcolonindex]
portname = address[lastcolonindex + 1:]
try:
port = int(portname)
except ValueError as ex:
return ValueError(f"invalid port number \"{portname}\": {ex}")
if port < 1 or port > Limits.MAXUINT16:
return ValueError(f"port number \"{portname}\" is out of range: must be 1 to {Limits.MAXUINT16}")
self._config = Config() if config is ... else config
return self._connect(hostname, np.uint16(port))
def _connect(self, hostname: str, port: np.uint16) -> Exception | None:
ds = self._datasubscriber
con = ds.connector
# Set connection properties
con.hostname = hostname
con.port = port
con.maxretries = self._config.maxretries
con.retryinterval = self._config.retryinterval
con.maxretryinterval = self._config.maxretryinterval
con.autoreconnect = self._config.autoreconnect
ds.compress_payloaddata = self._config.compress_payloaddata
ds.compress_metadata = self._config.compress_metadata
ds.compress_signalindexcache = self._config.compress_signalindexcache
ds.socket_timeout = self._config.socket_timeout
ds.version = self._config.version
# Register direct Subscriber callbacks
con.errormessage_callback = self._errormessage_logger
ds.statusmessage_callback = self._statusmessage_logger
ds.errormessage_callback = self._errormessage_logger
# Register callbacks with intermediate handlers
con.reconnect_callback = self._handle_reconnect
ds.metadatareceived_callback = self._handle_metadatareceived
ds.connectionterminated_callback = self._handle_connectionterminated
ds.data_starttime_callback = self._handle_data_starttime
ds.configurationchanged_callback = self._handle_configurationchanged
ds.processingcomplete_callback = self._handle_processingcomplete
err: Exception | None = None
# Connect and subscribe to publisher
status = con.connect(ds)
if status == ConnectStatus.SUCCESS:
if self._connectionestablished_receiver is not None:
self._connectionestablished_receiver()
# If automatically parsing metadata, request metadata upon successful connection,
# after metadata is received the SubscriberInstance will then initiate subscribe;
# otherwise, subscribe is initiated immediately (when auto subscribe requested)
if self._config.autorequestmetadata:
self.request_metadata()
elif self._config.autosubscribe:
ds.subscribe()
elif status == ConnectStatus.FAILED:
err = RuntimeError("all connection attempts failed")
elif status == ConnectStatus.CANCELED:
err = RuntimeError("connection canceled")
return err
[docs]
def disconnect(self):
"""
Disconnects from an STTP publisher.
"""
self._datasubscriber.disconnect()
[docs]
def subscribe(self, filterexpression: str, settings: Settings | None = None):
"""
Subscribe sets up a request indicating that the `Subscriber` would like to start receiving
streaming data from a data publisher. If the subscriber is already connected, the updated
filter expression and subscription settings will be requested immediately; otherwise, the
settings will be used when the connection to the data publisher is established.
The filterExpression defines the desired measurements for a subscription. Examples include:
* Directly specified signal IDs (UUID values in string format):
38A47B0-F10B-4143-9A0A-0DBC4FFEF1E8; E4BBFE6A-35BD-4E5B-92C9-11FF913E7877
* Directly specified tag names:
DOM_GPLAINS-BUS1:VH; TVA_SHELBY-BUS1:VH
* Directly specified identifiers in "measurement key" format:
PPA:15; STAT:20
* A filter expression against a selection view:
FILTER ActiveMeasurements WHERE Company='GPA' AND SignalType='FREQ'
Settings parameter controls subscription related settings.
"""
ds = self._datasubscriber
sub = ds.subscription
if settings is None:
settings = Settings()
sub.filterexpression = filterexpression
sub.throttled = settings.throttled
sub.publishinterval = settings.publishinterval
if settings.udpport > 0:
sub.udpdatachannel = True
sub.datachannel_localport = settings.udpport
sub.datachannel_interface = settings.udpinterface
else:
sub.udpdatachannel = False
sub.datachannel_localport = Defaults.DATACHANNEL_LOCALPORT
sub.datachannel_interface = Defaults.DATACHANNEL_INTERFACE
sub.includetime = settings.includetime
sub.enabletimereasonabilitycheck = settings.enabletimereasonabilitycheck
sub.lagtime = settings.lagtime
sub.leadtime = settings.leadtime
sub.uselocalclockasrealtime = settings.uselocalclockasrealtime
sub.use_millisecondresolution = settings.use_millisecondresolution
sub.request_nanvaluefilter = settings.request_nanvaluefilter
sub.starttime = settings.starttime
sub.stoptime = settings.stoptime
sub.constraintparameters = settings.constraintparameters
sub.processinginterval = settings.processinginterval
sub.extra_connectionstring_parameters = settings.extra_connectionstring_parameters
if ds.connected:
ds.subscribe()
[docs]
def unsubscribe(self):
"""
Sends a request to the data publisher indicating that the Subscriber would
like to stop receiving streaming data.
"""
self._datasubscriber.unsubscribe()
[docs]
def read_measurements(self) -> MeasurementReader:
"""
Sets up a new `MeasurementReader` to start reading measurements.
"""
if self._reader is None:
self._reader = MeasurementReader(self)
return self._reader
[docs]
def send_usercommand(self, commandcode: ServerCommand, data: bytes | bytearray | None = None):
"""
Sends a user-defined command to the data publisher.
Parameters
----------
commandcode : ServerCommand
Command to send
data : bytes | bytearray | None
Additional data to send with the command
"""
if commandcode < ServerCommand.USERCOMMAND00 or commandcode > ServerCommand.USERCOMMAND15:
raise ValueError(f"commandcode {commandcode} is not a valid user-defined command")
self._datasubscriber.send_servercommand(commandcode, data)
# Local callback handlers:
[docs]
def statusmessage(self, message: str):
"""
Executes the defined status message logger callback.
"""
if self._statusmessage_logger is not None:
self._statusmessage_logger(message)
[docs]
def errormessage(self, message: str):
"""
Executes the defined error message logger callback.
"""
if self._errormessage_logger is not None:
self._errormessage_logger(message)
# Intermediate callback handlers:
def _handle_reconnect(self, ds: DataSubscriber):
if ds.connected:
if self._connectionestablished_receiver is not None:
self._connectionestablished_receiver()
# If automatically parsing metadata, request metadata upon successful connection,
# after metadata is received the SubscriberInstance will then initiate subscribe;
# otherwise, subscribe is initiated immediately (when auto subscribe requested)
if self._config.autorequestmetadata:
self.request_metadata()
elif self._config.autosubscribe:
ds.subscribe()
else:
ds.disconnect()
self.statusmessage("Connection retry attempts exceeded.")
def _handle_metadatareceived(self, metadata: bytes):
parsestarted = time()
dataset, err = DataSet.from_xml(metadata)
if err is not None:
self.errormessage(f"Failed to parse metadata: {err}")
return
# Generate a record model focused implementation of parsed XML
# metadata with lookup maps to simplify typical metadata usages
self._datasubscriber.metadatacache = MetadataCache(dataset)
self._show_metadatasummary(dataset, parsestarted)
if self._metadatanotification_receiver is not None:
self._metadatanotification_receiver(dataset)
if self._config.autorequestmetadata and self._config.autosubscribe:
self._datasubscriber.subscribe()
def _show_metadatasummary(self, dataset: DataSet, parsestarted: float):
tabledetails = [" Discovered:\n"]
totalrows = 0
for table in dataset:
tablename = table.name
tablerows = table.rowcount
totalrows += tablerows
tabledetails.append(f" {tablerows:,} {tablename} records\n")
message = [
f"Parsed {totalrows:,} metadata records in {(time() - parsestarted):.3f} seconds\n",
"".join(tabledetails)
]
schemaversion = dataset["SchemaVersion"]
if schemaversion is not None:
message.append(f"Metadata schema version: {schemaversion.rowvalue_as_string_byname(0, 'VersionNumber')}")
else:
message.append("No SchemaVersion table found in metadata")
self.statusmessage("".join(message))
def _handle_connectionterminated(self):
# Release any blocking reader
if self._reader is not None:
self.set_newmeasurements_receiver(None)
self._reader.dispose()
self._reader = None
if self._connectionterminated_receiver is not None:
self._connectionterminated_receiver()
def _handle_data_starttime(self, starttime: np.int64):
if self._data_starttime_receiver is not None:
self._data_starttime_receiver(starttime)
def _handle_configurationchanged(self):
if self._configurationchanged_receiver is not None:
self._configurationchanged_receiver()
def _handle_processingcomplete(self, message: str):
self.statusmessage(message)
if self._historicalreadcomplete_receiver is not None:
self._historicalreadcomplete_receiver()
[docs]
def default_statusmessage_logger(self, message: str):
"""
Implements the default handler for the status message callback.
Default implementation synchronously writes output to stdio.
Using user implementation with logging is recommended.
"""
self._consolelock.acquire()
try:
print(message)
finally:
self._consolelock.release()
[docs]
def default_errormessage_logger(self, message: str):
"""
Implements the default handler for the error message callback.
Default implementation synchronously writes output to stderr.
Using user implementation with logging is recommended.
"""
self._consolelock.acquire()
try:
print(message, file=sys.stderr)
finally:
self._consolelock.release()
[docs]
def default_connectionestablished_receiver(self):
"""
Implements the default handler for the connection established callback.
Default implementation simply writes connection feedback to status message callback.
"""
con = self._datasubscriber.connector
self.statusmessage(f"Connection to {con.hostname}:{con.port} established.")
[docs]
def default_connectionterminated_receiver(self):
"""
Implements the default handler for the connection terminated callback.
Default implementation simply writes connection terminated feedback to error message callback.
"""
con = self._datasubscriber.connector
self.errormessage(f"Connection to {con.hostname}:{con.port} terminated.")
[docs]
def set_statusmessage_logger(self, callback: Callable[[str], None] | None):
"""
Defines the callback that handles informational message logging.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `statusmessage_logger` property setter for consistency with Publisher API.
"""
self._statusmessage_logger = callback
[docs]
def set_errormessage_logger(self, callback: Callable[[str], None] | None):
"""
Defines the callback that handles error message logging.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `errormessage_logger` property setter for consistency with Publisher API.
"""
self._errormessage_logger = callback
[docs]
def set_subscriptionupdated_receiver(self, callback: Callable[[SignalIndexCache], None] | None):
"""
Defines the callback that handles notifications that a new `SignalIndexCache` has been received.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `subscriptionupdated_receiver` property setter for consistency with Publisher API.
"""
self._datasubscriber.subscriptionupdated_callback = callback
[docs]
def set_data_starttime_receiver(self, callback: Callable[[np.int64], None] | None):
"""
Defines the callback that handles notification of first received measurement.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `data_starttime_receiver` property setter for consistency with Publisher API.
"""
self._data_starttime_receiver = callback
[docs]
def set_configurationchanged_receiver(self, callback: Callable[[], None] | None):
"""
Defines the callback that handles notifications that the data publisher configuration has changed.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `configurationchanged_receiver` property setter for consistency with Publisher API.
"""
self._configurationchanged_receiver = callback
[docs]
def set_newmeasurements_receiver(self, callback: Callable[[List[Measurement]], None] | None):
"""
Defines the callback that handles reception of new measurements.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `newmeasurements_receiver` property setter for consistency with Publisher API.
"""
self._datasubscriber.newmeasurements_callback = callback
[docs]
def set_newbufferblock_receiver(self, callback: Callable[[List[BufferBlock]], None] | None):
"""
Defines the callback that handles reception of new buffer blocks.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `newbufferblock_receiver` property setter for consistency with Publisher API.
"""
self._datasubscriber.newbufferblocks_callback = callback
[docs]
def set_notification_receiver(self, callback: Callable[[str], None] | None):
"""
Defines the callback that handles reception of a notification.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `notification_receiver` property setter for consistency with Publisher API.
"""
self._datasubscriber.notificationreceived_callback = callback
[docs]
def set_historicalreadcomplete_receiver(self, callback: Callable[[], None] | None):
"""
Defines the callback that handles notification that temporal processing has completed, i.e.,
the end of a historical playback data stream has been reached.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `historicalreadcomplete_receiver` property setter for consistency with Publisher API.
"""
self._historicalreadcomplete_receiver = callback
[docs]
def set_connectionestablished_receiver(self, callback: Callable[[], None] | None):
"""
Defines the callback that handles notification that a connection has been established.
Default implementation simply writes connection feedback to status message handler.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `connectionestablished_receiver` property setter for consistency with Publisher API.
"""
self._connectionestablished_receiver = callback
[docs]
def set_connectionterminated_receiver(self, callback: Callable[[], None] | None):
"""
Defines the callback that handles notification that a connection has been terminated.
Default implementation simply writes connection terminated feedback to error message handler.
Assignment will take effect immediately, even while subscription is active.
.. deprecated::
Prefer using `connectionterminated_receiver` property setter for consistency with Publisher API.
"""
self._connectionterminated_receiver = callback