Source code for sttp.transport.signalindexcache

# ******************************************************************************************************
#  signalindexcache.py - Gbtc
#
#  Copyright © 2022, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  08/15/2022 - J. Ritchie Carroll
#       Generated original version of source code.
#
# ******************************************************************************************************

from __future__ import annotations
from gsf import Empty, Limits
from gsf.endianorder import BigEndian
from .tssc.decoder import Decoder
from typing import Dict, List, Set, Tuple, Optional, TYPE_CHECKING
from uuid import UUID
import numpy as np

if TYPE_CHECKING:
    from datasubscriber import DataSubscriber


[docs] class SignalIndexCache: """ Represents a mapping of 32-bit runtime IDs to 128-bit globally unique measurement IDs. The class additionally provides reverse lookup and an extra mapping to human-readable measurement keys. """ def __init__(self): self._reference: Dict[np.int32, np.uint32] = {} self._signalidlist: List[UUID] = [] self._sourcelist: List[str] = [] self._idlist: List[np.uint64] = [] self._signalidcache: Dict[UUID, np.int32] = {} self._binarylength = np.uint32(0) self._tsscdecoder: Optional[Decoder] = None def _add_record(self, ds: DataSubscriber, signalindex: np.int32, signalid: UUID, source: str, id: np.uint64, charsizeestimate: np.uint32 = 1): index = np.uint32(len(self._signalidlist)) self._reference[signalindex] = index self._signalidlist.append(signalid) self._sourcelist.append(source) self._idlist.append(id) self._signalidcache[signalid] = signalindex # Lookup measurement metadata, registering it if not defined already metadata = ds.lookup_metadata(signalid, source, id) # Char size here helps provide a rough-estimate on binary length used to reserve # bytes for a vector, if exact size is needed call RecalculateBinaryLength first self._binarylength += np.uint32(32 + len(source) * charsizeestimate)
[docs] def contains(self, signalindex: np.int32) -> bool: """ Determines if the specified signalindex exists with the `SignalIndexCache`. """ return signalindex in self._reference
[docs] def signalid(self, signalindex: np.int32) -> UUID: """ Returns the signal ID Guid for the specified signalindex in the `SignalIndexCache`. """ if (index := self._reference.get(signalindex)) is not None: return self._signalidlist[index] return Empty.GUID
@property def signalids(self) -> Set[UUID]: """ Gets a set for all the Guid values found in the `SignalIndexCache`. """ return set(self._signalidlist)
[docs] def source(self, signalindex: np.int32) -> str: """ Returns the `Measurement` source string for the specified signalindex in the `SignalIndexCache`. """ if (index := self._reference.get(signalindex)) is not None: return self._sourcelist[index] return Empty.STRING
[docs] def id(self, signalindex: np.int32) -> np.uint64: """ Returns the `Measurement` integer ID for the specified signalindex in the `SignalIndexCache`. """ if (index := self._reference.get(signalindex)) is not None: return self._idlist[index] return np.uint64(Limits.MAXUINT64)
[docs] def record(self, signalindex: np.int32) -> Tuple[UUID, str, np.uint64, bool]: """ Record returns the key `Measurement` values, signal ID Guid, source string, and integer ID and a final boolean value representing find success for the specified signalindex in the `SignalIndexCache`. """ if (index := self._reference.get(signalindex)) is not None: return (self._signalidlist[index], self._sourcelist[index], self._idlist[index], True) return Empty.GUID, Empty.STRING, np.uint64(0), False
[docs] def signalindex(self, signalid: UUID) -> np.int32: """ Returns the signal index for the specified signal ID Guid in the `SignalIndexCache`. """ if (signalindex := self._signalidcache.get(signalid)) is not None: return signalindex return -1
@property def count(self) -> np.uint32: """ Gets the number of `Measurement` records that can be found in the `SignalIndexCache`. """ return np.uint32(len(self._signalidcache))
[docs] def decode(self, ds: DataSubscriber, buffer: bytes) -> Tuple[UUID, Optional[Exception]]: """ Parses a `SignalIndexCache` from the specified byte buffer received from a `DataPublisher`. """ length = len(buffer) if length < 4: return Empty.GUID, ValueError("not enough buffer provided to parse") # Byte size of cache binarylength = BigEndian.to_uint32(buffer) offset = 4 if length < binarylength: return Empty.GUID, ValueError("not enough buffer provided to parse") subscriberid = UUID(bytes=buffer[offset:offset + 16]) offset += 16 # Number of references referencecount = BigEndian.to_uint32(buffer[offset:]) offset += 4 for _ in range(referencecount): # Signal index signalindex = np.int32(BigEndian.to_uint32(buffer[offset:])) offset += 4 # Signal ID signalid = UUID(bytes=buffer[offset:offset + 16]) offset += 16 # Measurement key Source sourcesize = BigEndian.to_uint32(buffer[offset:]) offset += 4 source = ds.decodestr(buffer[offset: offset + sourcesize]) offset += sourcesize # Measurement key ID keyid = BigEndian.to_uint64(buffer[offset:]) offset += 8 self._add_record(ds, signalindex, signalid, source, keyid) return (subscriberid, None)