Source code for sttp.transport.routingtables

# ******************************************************************************************************
#  routingtables.py - Gbtc
#
#  Copyright © 2026, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  01/05/2026 - Generated by porting C++ RoutingTables
#       Ported from cppapi/src/lib/transport/RoutingTables.{h,cpp}
#
# ******************************************************************************************************

# Ported from cppapi/src/lib/transport/RoutingTables.cpp : class RoutingTables
# Differences: Python uses threading and dict; otherwise parity maintained.

from typing import Dict, Set, List, TYPE_CHECKING
from uuid import UUID
from threading import Thread, RLock
from queue import Queue

if TYPE_CHECKING:
    from .subscriberconnection import SubscriberConnection
    from .measurement import Measurement


[docs] class RoutingTables: """ Manages routing of measurements to subscribed connections. """ def __init__(self): """ Creates a new routing tables instance. """ # Routing table: SignalID -> Set of SubscriberConnections self._active_routes: Dict[UUID, Set['SubscriberConnection']] = {} self._active_routes_lock = RLock() # Operation queue for thread-safe route updates self._operation_queue: Queue = Queue() self._enabled = True # Background thread for processing routing table operations self._routing_thread = Thread(target=self._process_routing_operations, daemon=True) self._routing_thread.start() def _process_routing_operations(self): """ Background thread that processes routing table update operations. """ while self._enabled: try: operation = self._operation_queue.get(timeout=1.0) if operation is not None: operation() except: # Queue get timeout or shutdown pass
[docs] def update_routes(self, connection: 'SubscriberConnection', signal_ids: Set[UUID]): """ Updates the routing table to route specified signal IDs to the given connection. """ def _update(): with self._active_routes_lock: # Clone current routes (shallow copy of dict, new sets for each signal) new_routes = {signal_id: set(destinations) for signal_id, destinations in self._active_routes.items()} # Remove connection from any routes not in the new signal set for signal_id, destinations in list(new_routes.items()): if signal_id not in signal_ids: destinations.discard(connection) if not destinations: del new_routes[signal_id] # Add connection to desired signal routes for signal_id in signal_ids: if signal_id not in new_routes: new_routes[signal_id] = set() new_routes[signal_id].add(connection) # Atomically update active routes self._active_routes = new_routes # Execute immediately instead of queuing - critical for subscription flow _update()
[docs] def remove_routes(self, connection: 'SubscriberConnection'): """ Removes all routes for the specified connection. """ def _remove(): with self._active_routes_lock: # Clone current routes (shallow copy of dict, new sets for each signal) new_routes = {signal_id: set(destinations) for signal_id, destinations in self._active_routes.items()} # Remove connection from all routes for signal_id in list(new_routes.keys()): new_routes[signal_id].discard(connection) if not new_routes[signal_id]: del new_routes[signal_id] # Atomically update active routes self._active_routes = new_routes self._operation_queue.put(_remove)
[docs] def publish_measurements(self, measurements: List['Measurement']): """ Routes measurements to appropriate subscriber connections. """ # Group measurements by destination connection routed_measurements: Dict['SubscriberConnection', List['Measurement']] = {} with self._active_routes_lock: for measurement in measurements: destinations = self._active_routes.get(measurement.signalid) if destinations: for connection in destinations: if connection is None: continue if connection not in routed_measurements: routed_measurements[connection] = [] routed_measurements[connection].append(measurement) # Publish to each destination for connection, connection_measurements in routed_measurements.items(): if connection.is_subscribed and not connection.is_temporal_subscription: connection.publish_measurements(connection_measurements)
[docs] def clear(self): """ Clears all routing tables. """ with self._active_routes_lock: self._active_routes.clear() # Clear operation queue while not self._operation_queue.empty(): try: self._operation_queue.get_nowait() except: break
[docs] def dispose(self): """ Shuts down the routing tables. """ self._enabled = False self._operation_queue.put(None) # Signal thread to exit if self._routing_thread.is_alive(): self._routing_thread.join(timeout=2.0) self.clear()