Source code for sttp.metadata.cache

# ******************************************************************************************************
#  metadata/cache.py - Gbtc
#
#  Copyright © 2021, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  02/07/2021 - J. Ritchie Carroll
#       Generated original version of source code.
#
# ******************************************************************************************************

from gsf import Empty
from ..data.dataset import DataSet
from ..data.datarow import DataRow
from ..data.datatype import default_datatype
from .record.measurement import MeasurementRecord, SignalType
from .record.device import DeviceRecord
from .record.phasor import PhasorRecord
from typing import List, Dict, Tuple, Optional
from uuid import UUID, uuid1
import numpy as np


[docs] class MetadataCache: """ Represents a collection of parsed STTP metadata records. """ def __init__(self, dataset: DataSet = ...): self.signalid_measurement_map: Dict[UUID, MeasurementRecord] = {} """ Defines map of unique measurement signal IDs to measurement records. Measurement signal IDs (a UUID) are typically unique across disparate systems. """ self.id_measurement_map: Dict[np.uint64, MeasurementRecord] = {} """ Defines map of measurement key IDs to measurement records. Measurement key IDs are typically unique for a given publisher. """ self.pointtag_measurement_map: Dict[str, MeasurementRecord] = {} """ Defines map of measurement point tags to measurement records. Measurement point tags are typically unique for a given publisher. """ self.signalref_measurement_map: Dict[str, MeasurementRecord] = {} """ Defines map of measurement signal references to measurement records. Measurement signal references are typically unique for a given publisher. """ self.deviceacronym_device_map: Dict[str, DeviceRecord] = {} """ Defines map of device acronym to device records. Device acronyms are typically unique for a given publisher. """ self.deviceid_device_map: Dict[UUID, DeviceRecord] = {} """ Defines map of unique device IDs to device records. Device IDs (a UUID) are typically unique across disparate systems. """ self.measurement_records: List[MeasurementRecord] = [] """ Defines list of measurement records in the cache. """ self.device_records: List[DeviceRecord] = [] """ Defines list of device records in the cache. """ self.phasorRecords: List[PhasorRecord] = [] """ Defines list of phasor records in the cache. """ if dataset is ...: return self._extract_measurements(dataset) self._extract_devices(dataset) self._extract_phasors(dataset) # Extract measurement records from MeasurementDetail table rows def _extract_measurements(self, dataset: DataSet): measurement_records: List[MeasurementRecord] = [] for measurement in dataset["MeasurementDetail"]: get_rowvalue = lambda columnname, default = None: self._get_rowvalue(measurement, columnname, default) (source, id) = self._parse_measurementkey(get_rowvalue("ID", Empty.STRING)) measurement_records.append(MeasurementRecord( # `signalid`: Extract signal ID, the unique measurement guid get_rowvalue("SignalID", uuid1()), # 'adder': Extract the measurement adder get_rowvalue("Adder", np.float64(0.0)), # 'multiplier': Extract the measurement multiplier get_rowvalue("Multiplier", np.float64(1.0)), # `id`: STTP numeric point ID of measurement (from measurement key) id, # `source`: Source instance name of measurement (from measurement key) source, # `signaltypename`: Extract the measurement signal type name get_rowvalue("SignalAcronym", "UNKN"), # `signalreference`: Extract the measurement signal reference get_rowvalue("SignalReference"), # `pointtag`: Extract the measurement point tag get_rowvalue("PointTag"), # `deviceacronym`: Extract the measurement's parent device acronym get_rowvalue("DeviceAcronym"), # `description`: Extract the measurement description name get_rowvalue("Description"), # `updatedon`: Extract the last update time for measurement metadata get_rowvalue("UpdatedOn") )) for measurement in measurement_records: self.id_measurement_map[measurement.id] = measurement for measurement in measurement_records: self.signalid_measurement_map[measurement.signalid] = measurement for measurement in measurement_records: self.pointtag_measurement_map[measurement.pointtag] = measurement for measurement in measurement_records: self.signalref_measurement_map[measurement.signalreference] = measurement self.measurement_records = measurement_records # Extract device records from DeviceDetail table rows def _extract_devices(self, dataset: DataSet): device_records: List[DeviceRecord] = [] default_nodeid = uuid1() for device in dataset["DeviceDetail"]: get_rowvalue = lambda columnname, default = None: self._get_rowvalue(device, columnname, default) device_records.append(DeviceRecord( # `nodeid`: Extract node ID guid for the device get_rowvalue("NodeID", default_nodeid), # `deviceid`: Extract device ID, the unique device guid get_rowvalue("UniqueID", uuid1()), # `acronym`: Alpha-numeric identifier of the device get_rowvalue("Acronym"), # `name`: Free form name for the device get_rowvalue("Name"), # `accessid`: Access ID for the device get_rowvalue("AccessID"), # `parentacronym`: Alpha-numeric parent identifier of the device get_rowvalue("ParentAcronym"), # `protocolname`: Protocol name of the device get_rowvalue("ProtocolName"), # `framespersecond`: Data rate for the device get_rowvalue("FramesPerSecond", DeviceRecord.DEFAULT_FRAMESPERSECOND), # `companyacronym`: Company acronym of the device get_rowvalue("CompanyAcronym"), # `vendoracronym`: Vendor acronym of the device get_rowvalue("VendorAcronym"), # `vendordevicename`: Vendor device name of the device get_rowvalue("VendorDeviceName"), # `longitude`: Longitude of the device get_rowvalue("Longitude"), # `latitude`: Latitude of the device get_rowvalue("Latitude"), # `updatedon`: Extract the last update time for device metadata get_rowvalue("UpdatedOn") )) for device in device_records: self.deviceacronym_device_map[device.acronym] = device for device in device_records: self.deviceid_device_map[device.deviceid] = device self.device_records = device_records # Associate measurements with parent devices for measurement in self.measurement_records: device = self.find_device_acronym(measurement.deviceacronym) if device is not None: measurement.device = device device.measurements.add(measurement) # Extract phasor records from PhasorDetail table rows def _extract_phasors(self, dataset: DataSet): phasor_records: List[PhasorRecord] = [] for phasor in dataset["PhasorDetail"]: get_rowvalue = lambda columnname, default = None: self._get_rowvalue(phasor, columnname, default) phasor_records.append(PhasorRecord( # `id`: unique integer identifier for phasor get_rowvalue("ID"), # `deviceacronym`: Alpha-numeric identifier of the associated device get_rowvalue("DeviceAcronym"), # `label`: Free form label for the phasor get_rowvalue("Label"), # `type`: Phasor type for the phasor get_rowvalue("Type", PhasorRecord.DEFAULT_TYPE), # `phase`: Phasor phase for the phasor get_rowvalue("Phase", PhasorRecord.DEFAULT_PHASE), # `sourceindex`: Source index for the phasor get_rowvalue("SourceIndex"), # `basekv`: BaseKV level for the phasor get_rowvalue("BaseKV"), # `updatedon`: Extract the last update time for phasor metadata get_rowvalue("UpdatedOn") )) # Associate phasors with parent device and associated angle/magnitude measurements for phasor in phasor_records: device = self.find_device_acronym(phasor.deviceacronym) if device is not None: phasor.device = device device.phasors.add(phasor) angle = self.find_measurement_signalreference(f"{device.acronym}-PA{phasor.sourceindex}") magnitude = self.find_measurement_signalreference(f"{device.acronym}-PM{phasor.sourceindex}") if angle is not None and magnitude is not None: phasor.measurements.clear() angle.phasor = phasor phasor.measurements.append(angle) # Must be index 0 magnitude.phasor = phasor phasor.measurements.append(magnitude) # Must be index 1 self.phasorRecords = phasor_records def _get_rowvalue(self, row: DataRow, columnname: str, default: Optional[object] = None): value, err = row.value_byname(columnname) if value is None or err is not None: if default is not None: return default if (column := row.parent.column_byname(columnname)) is None: return default return default_datatype(column.datatype) return value def _parse_measurementkey(self, value: str) -> Tuple[str, np.uint64]: defaultvalue = "_", np.uint64(0) try: parts = value.split(":") return defaultvalue if len(parts) != 2 else (parts[0], np.uint64(parts[1])) except Exception: return defaultvalue
[docs] def add_measurement(self, measurement: MeasurementRecord): self.signalid_measurement_map[measurement.signalid] = measurement if measurement.id > 0: self.id_measurement_map[measurement.id] = measurement if len(measurement.pointtag) > 0: self.pointtag_measurement_map[measurement.pointtag] = measurement if len(measurement.signalreference) > 0: self.signalref_measurement_map[measurement.signalreference] = measurement self.measurement_records.append(measurement)
[docs] def find_measurement_signalid(self, signalid: UUID) -> Optional[MeasurementRecord]: return self.signalid_measurement_map.get(signalid)
[docs] def find_measurement_id(self, id: np.uint64) -> Optional[MeasurementRecord]: return self.id_measurement_map.get(id)
[docs] def find_measurement_pointtag(self, pointtag: str) -> Optional[MeasurementRecord]: return self.pointtag_measurement_map.get(pointtag)
[docs] def find_measurement_signalreference(self, signalreference: str) -> Optional[MeasurementRecord]: return self.signalref_measurement_map.get(signalreference)
[docs] def find_measurements_signaltype(self, signaltype: SignalType, instancename: Optional[str] = None) -> List[MeasurementRecord]: return self.find_measurements_signaltypename(signaltype.name, instancename)
[docs] def find_measurements_signaltypename(self, signaltypename: str, instancename: Optional[str] = None) -> List[MeasurementRecord]: signaltypename = signaltypename.upper() return [ record for record in self.measurement_records if record.signaltypename.upper() == signaltypename and (instancename is None or record.instancename == instancename) ]
[docs] def find_measurements(self, searchval: str, instancename: Optional[str] = None) -> List[MeasurementRecord]: records = set() if searchval in self.pointtag_measurement_map: record = self.pointtag_measurement_map[searchval] if instancename is None or record.instancename == instancename: records.add(record) if (record := self.signalref_measurement_map.get(searchval)) is not None: if instancename is None or record.instancename == instancename: records.add(record) for record in self.measurement_records: if (searchval in record.description or searchval in record.deviceacronym) and \ (instancename is None or record.instancename == instancename): records.add(record) return list(records)
[docs] def find_device_acronym(self, deviceacronym: str) -> Optional[DeviceRecord]: return self.deviceacronym_device_map.get(deviceacronym)
[docs] def find_device_id(self, deviceid: UUID) -> Optional[DeviceRecord]: return self.deviceid_device_map.get(deviceid)
[docs] def find_devices(self, searchval: str) -> List[DeviceRecord]: records = set() if searchval in self.deviceacronym_device_map: records.add(self.deviceacronym_device_map[searchval]) for record in self.device_records: if (searchval in record.acronym or searchval in record.name or searchval in record.parentacronym or searchval in record.companyacronym or searchval in record.vendoracronym or searchval in record.vendordevicename): records.add(record) return list(records)