Source code for sttp.reader
# ******************************************************************************************************
# reader.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 08/24/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
# ******************************************************************************************************
from __future__ import annotations
from .transport.measurement import Measurement
from typing import List, Optional, Tuple, TYPE_CHECKING
from queue import Full, Queue
import contextlib
if TYPE_CHECKING:
from subscriber import Subscriber
[docs]
class MeasurementReader:
"""
Defines an STTP measurement reader.
"""
def __init__(self, subscriber: Subscriber):
"""
Creates a new `MeasurementReader`.
"""
self._queue = Queue(1)
self._subscriber = subscriber
self._subscriber.set_newmeasurements_receiver(self._read_measurements)
self._disposed = False
[docs]
def dispose(self):
"""
Cleanly shuts down a `MeasurmentReader` that is no longer being used.
This method will release any waiting threads.
"""
self._disposed = True
with contextlib.suppress(Full):
self._queue.put_nowait(Measurement())
self._task_done()
def _task_done(self):
with contextlib.suppress(ValueError):
self._queue.task_done()
def _read_measurements(self, measurements: List[Measurement]):
for measurement in measurements:
if self._disposed:
break
self._queue.put(measurement)
self._queue.join()
[docs]
def next_measurement(self) -> Tuple[Optional[Measurement], bool]:
"""
Blocks current thread until a new measurement arrived.
"""
current = self._queue.get()
self._task_done()
return (None, False) if self._disposed else (current, True)