Source code for sttp.transport.subscriberconnector

# ******************************************************************************************************
#  subscriberconnector.py - Gbtc
#
#  Copyright © 2022, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  08/17/2022 - J. Ritchie Carroll
#       Generated original version of source code.
#
# ******************************************************************************************************

from __future__ import annotations
from gsf import Empty
from .constants import ConnectStatus
from typing import List, Callable, Optional, TYPE_CHECKING
from threading import Lock, Thread, Event
from concurrent.futures import ThreadPoolExecutor
from sys import stderr
import math
import numpy as np

if TYPE_CHECKING:
    from datasubscriber import DataSubscriber


[docs] class SubscriberConnector: """ Represents a connector that will establish or automatically reestablish a connection from a `DataSubscriber` to a `DataPublisher`. """ DEFAULT_ERRORMESSAGE_CALLBACK: Callable[[str], None] = lambda msg: print(msg, file=stderr) DEFAULT_RECONNECT_CALLBACK: Callable[[DataSubscriber], None] = lambda _: ... DEFAULT_HOSTNAME = Empty.STRING DEFAULT_PORT = np.uint16(0) DEFAULT_MAXRETRIES = np.int32(-1) DEFAULT_RETRYINTERVAL = 1.0 DEFAULT_MAXRETRYINTERVAL = 30.0 DEFAULT_AUTORECONNECT = True def __init__(self, errormessage_callback: Callable[[str], None] = ..., reconnect_callback: Callable[[DataSubscriber], None] = ..., hostname: str = ..., port: np.uint16 = ..., maxretries: np.int32 = ..., retryinterval: float = ..., maxretryinterval: float = ..., autoreconnect: bool = ... ): self.errormessage_callback = SubscriberConnector.DEFAULT_ERRORMESSAGE_CALLBACK if errormessage_callback is ... else errormessage_callback """ Called when an error message should be logged. """ self.reconnect_callback = SubscriberConnector.DEFAULT_RECONNECT_CALLBACK if reconnect_callback is ... else reconnect_callback """ Called when `SubscriberConnector` attempts to reconnect. """ self.hostname = SubscriberConnector.DEFAULT_HOSTNAME if hostname is ... else hostname """ `DataPublisher` DNS name or IP. """ self.port = SubscriberConnector.DEFAULT_PORT if port is ... else port """ TCP/IP listening port of the `DataPublisher`. """ self.maxretries = SubscriberConnector.DEFAULT_MAXRETRIES if maxretries is ... else maxretries """ MaxRetries defines the maximum number of times to retry a connection. Set value to -1 to retry infinitely. """ self.retryinterval = SubscriberConnector.DEFAULT_RETRYINTERVAL if retryinterval is ... else retryinterval """ defines the base retry interval, in milliseconds. Retries will exponentially back-off starting from this interval. """ self.maxretryinterval = SubscriberConnector.DEFAULT_MAXRETRYINTERVAL if maxretryinterval is ... else maxretryinterval """ Defines the maximum retry interval, in milliseconds. """ self.autoreconnect = SubscriberConnector.DEFAULT_AUTORECONNECT if autoreconnect is ... else autoreconnect """ Defines flag that determines if connections should be automatically reattempted. """ self._connectattempt = np.int32(0) self._connectionrefused = False self._cancel = False self._reconnectthread: Optional[Thread] = None self._reconnectthread_mutex = Lock() self._waittimer = Event() self._waittimer_mutex = Lock() self._threadpool = ThreadPoolExecutor(thread_name_prefix="SC-PoolThread")
[docs] def dispose(self): """ Cleanly shuts down a `SubscriberConnector` that is no longer being used, e.g., during a normal application exit. """ self.cancel() self._threadpool.shutdown(wait=False)
def _autoreconnect(self, ds: DataSubscriber): if self._cancel or ds.disposing: return # Make sure to wait on any running reconnect to complete... self._reconnectthread_mutex.acquire() reconnectthread = self._reconnectthread self._reconnectthread_mutex.release() if reconnectthread is not None and reconnectthread.is_alive(): reconnectthread.join() reconnectthread = Thread(target=lambda: self._run_reconnectthread(ds), name="ReconnectThread") self._reconnectthread_mutex.acquire() self._reconnectthread = reconnectthread self._reconnectthread_mutex.release() reconnectthread.start() def _run_reconnectthread(self, ds: DataSubscriber): # Reset connection attempt counter if last attempt was not refused if not self._connectionrefused: self.reset_connection() if self.maxretries != -1 and self._connectattempt >= self.maxretries: self._dispatch_errormessage("Maximum connection retries attempted. Auto-reconnect canceled.") return self._wait_for_retry() if self._cancel or ds.disposing: return if self._connect(ds, True) == ConnectStatus.CANCELED: return # Notify the user that reconnect attempt was completed. if not self._cancel and self.reconnect_callback is not None: self.reconnect_callback(ds) def _wait_for_retry(self): # Apply exponential back-off algorithm for retry attempt delays exponent = 12 if self._connectattempt > 13 else int(self._connectattempt - 1) retryinterval = 0.0 if self._connectattempt > 0: retryinterval = self.retryinterval * math.pow(2, exponent) retryinterval = min(retryinterval, self.maxretryinterval) # Notify the user that we are attempting to reconnect. message: List[str] = ["Connection"] if self._connectattempt > 0: message.append(f" attempt {self._connectattempt + 1:,}") message.append(f" to \"{self.hostname}:{self.port}\" was terminated. ") if retryinterval > 0.0: message.append(f"Attempting to reconnect in {retryinterval:.2f} seconds...") else: message.append("Attempting to reconnect...") self._dispatch_errormessage("".join(message)) waittimer = Event() self._waittimer_mutex.acquire() self._waittimer = waittimer self._waittimer_mutex.release() waittimer.wait(retryinterval)
[docs] def connect(self, ds: DataSubscriber) -> ConnectStatus: """ Initiates a connection sequence for a `DataSubscriber` """ return ConnectStatus.CANCELED if self._cancel else self._connect(ds, False)
def _connect(self, ds: DataSubscriber, autoreconnecting: bool) -> ConnectStatus: if self.autoreconnect: ds.autoreconnect_callback = lambda: self._autoreconnect(ds) self._cancel = False while not ds.disposing: if self.maxretries != -1 and self._connectattempt >= self.maxretries: self._dispatch_errormessage("Maximum connection retries attempted. Auto-reconnect canceled.") break self._connectattempt += 1 if ds.disposing: return ConnectStatus.CANCELED if ds._connect(self.hostname, self.port, autoreconnecting) is None: break if not ds.disposing and self.retryinterval > 0: autoreconnecting = True self._wait_for_retry() if self._cancel: return ConnectStatus.CANCELED if ds.disposing: return ConnectStatus.CANCELED return ConnectStatus.SUCCESS if ds.connected else ConnectStatus.FAILED
[docs] def cancel(self): """ Stops all current and future connection sequences. """ if self._cancel: return self._cancel = True self._waittimer_mutex.acquire() waittimer = self._waittimer self._waittimer_mutex.release() if waittimer is not None: waittimer.set() self._reconnectthread_mutex.acquire() reconnectthread = self._reconnectthread self._reconnectthread_mutex.release() if reconnectthread is not None and reconnectthread.is_alive(): reconnectthread.join()
[docs] def reset_connection(self): """ Resets `SubscriberConnector` for a new connection. """ self._connectattempt = np.int32(0) self._cancel = False
def _dispatch_errormessage(self, message: str): if self.errormessage_callback is not None: self._threadpool.submit(self.errormessage_callback, message)