# ******************************************************************************************************
# routingtables.py - Gbtc
#
# Copyright © 2026, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 01/05/2026 - Generated by porting C++ RoutingTables
# Ported from cppapi/src/lib/transport/RoutingTables.{h,cpp}
#
# ******************************************************************************************************
# Ported from cppapi/src/lib/transport/RoutingTables.cpp : class RoutingTables
# Differences: Python uses threading and dict; otherwise parity maintained.
import logging
from queue import Empty, Queue
from threading import Thread, RLock
from typing import Dict, Set, List, TYPE_CHECKING
from uuid import UUID

if TYPE_CHECKING:
    from .subscriberconnection import SubscriberConnection
    from .measurement import Measurement
class RoutingTables:
    """
    Manages routing of measurements to subscribed connections.

    The routing table maps each signal ID to the set of subscriber connections
    that receive measurements for that signal. Route changes build a new table
    and swap it in while holding ``_active_routes_lock``; measurement routing
    takes the same lock while it looks up destinations.
    """

    def __init__(self):
        """
        Creates a new routing tables instance and starts its background
        operation thread.
        """
        # Routing table: SignalID -> Set of SubscriberConnections
        self._active_routes: Dict[UUID, Set['SubscriberConnection']] = {}
        self._active_routes_lock = RLock()

        # Queue of deferred callables executed by the background thread;
        # ``None`` is the wake-up sentinel posted by ``dispose``
        self._operation_queue: Queue = Queue()
        self._enabled = True

        # Background thread for processing routing table operations
        self._routing_thread = Thread(target=self._process_routing_operations, daemon=True)
        self._routing_thread.start()

    def _process_routing_operations(self):
        """
        Background thread loop that executes queued routing table operations
        until the instance is disabled.
        """
        while self._enabled:
            try:
                operation = self._operation_queue.get(timeout=1.0)
            except Empty:
                # Timed out waiting; loop around to re-check the enabled flag
                continue

            if operation is None:
                # Wake-up sentinel: nothing to run, just re-check the flag
                continue

            try:
                operation()
            except Exception:
                # A failing operation is reported but must not terminate the
                # routing thread
                logging.getLogger(__name__).exception("Routing table operation failed")

    def update_routes(self, connection: 'SubscriberConnection', signal_ids: Set[UUID]):
        """
        Updates the routing table to route specified signal IDs to the given connection.

        After the call the connection is a destination for exactly the signals
        in `signal_ids`; it is removed from every other route, and routes left
        without destinations are dropped. The change is applied before this
        method returns.

        Parameters:
            connection: Subscriber connection whose routes are being replaced.
            signal_ids: Signal IDs the connection should receive. Any iterable
                is accepted; ``None`` is treated as an empty set.
        """
        # Snapshot the caller's collection so membership tests are O(1) and
        # later mutation by the caller cannot affect the table being built
        desired = set(signal_ids or ())

        with self._active_routes_lock:
            new_routes: Dict[UUID, Set['SubscriberConnection']] = {}

            # Copy existing routes, detaching the connection from signals it
            # no longer wants and dropping routes that become empty
            for signal_id, destinations in self._active_routes.items():
                remaining = set(destinations)

                if signal_id not in desired:
                    remaining.discard(connection)

                if remaining:
                    new_routes[signal_id] = remaining

            # Add connection to desired signal routes
            for signal_id in desired:
                new_routes.setdefault(signal_id, set()).add(connection)

            # Swap in the rebuilt table
            self._active_routes = new_routes

    def remove_routes(self, connection: 'SubscriberConnection'):
        """
        Removes all routes for the specified connection.

        The removal is applied before this method returns. It is not deferred
        to the background thread because `update_routes` runs immediately: a
        deferred removal could execute after a later `update_routes` call for
        the same connection and erase the routes that call had just added.

        Parameters:
            connection: Subscriber connection to detach from every route.
        """
        # Routing the connection to no signals removes it from every route
        self.update_routes(connection, set())

    def publish_measurements(self, measurements: List['Measurement']):
        """
        Routes measurements to appropriate subscriber connections.

        Measurements are grouped per destination connection using each
        measurement's ``signalid``; each connection that reports
        ``is_subscribed`` and is not a temporal subscription then receives its
        group through a single ``publish_measurements`` call.

        Parameters:
            measurements: Measurements to route. Measurements whose signal ID
                has no route are ignored.
        """
        if not measurements:
            return

        # Group measurements by destination connection
        routed_measurements: Dict['SubscriberConnection', List['Measurement']] = {}

        with self._active_routes_lock:
            for measurement in measurements:
                destinations = self._active_routes.get(measurement.signalid)

                if not destinations:
                    continue

                for connection in destinations:
                    if connection is None:
                        continue

                    routed_measurements.setdefault(connection, []).append(measurement)

        # Publish to each destination after releasing the lock
        for connection, connection_measurements in routed_measurements.items():
            if connection.is_subscribed and not connection.is_temporal_subscription:
                connection.publish_measurements(connection_measurements)

    def clear(self):
        """
        Clears all routing tables and discards any pending queued operations.
        """
        with self._active_routes_lock:
            # Swap in an empty table, consistent with how routes are updated
            self._active_routes = {}

        # Drain the operation queue; Empty marks the end, so no separate
        # emptiness pre-check is needed
        while True:
            try:
                self._operation_queue.get_nowait()
            except Empty:
                break

    def dispose(self):
        """
        Shuts down the routing tables.

        Disables the background thread, waits up to two seconds for it to
        exit, then clears the routing table and the operation queue.
        """
        self._enabled = False
        self._operation_queue.put(None)  # Wake the thread so it sees the flag

        if self._routing_thread.is_alive():
            self._routing_thread.join(timeout=2.0)

        self.clear()