#******************************************************************************************************
# compact_measurement.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 08/!4/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
#******************************************************************************************************
from enum import IntFlag
from gsf import Limits
from gsf.endianorder import BigEndian
from ..ticks import Ticks
from .measurement import Measurement
from .constants import StateFlags
from .signalindexcache import SignalIndexCache
from typing import List, Tuple, Optional
from uuid import UUID
import numpy as np
[docs]
class CompactStateFlags(IntFlag):
"""
Enumeration constants represent each flag in the 8-bit compact measurement state flags.
"""
DATARANGE = 0x01
DATAQUALITY = 0x02
TIMEQUALITY = 0x04
SYSTEMISSUE = 0x08
CALCULATEDVALUE = 0x10
DISCARDEDVALUE = 0x20
BASETIMEOFFSET = 0x40
TIMEINDEX = 0x80
DATARANGEMASK: StateFlags = 0x000000FC
DATAQUALITYMASK: StateFlags = 0x0000EF03
TIMEQUALITYMASK: StateFlags = 0x00BF0000
SYSTEMISSUEMASK: StateFlags = 0xE0000000
CALCULATEDVALUEMASK: StateFlags = 0x00001000
DISCARDEDVALUEMASK: StateFlags = 0x00400000
FIXEDLENGTH: np.uint32 = 9
def _map_to_fullflags(compactflags: CompactStateFlags) -> StateFlags:
fullflags: StateFlags = StateFlags.NORMAL
if (compactflags & CompactStateFlags.DATARANGE) > 0:
fullflags |= DATARANGEMASK
if (compactflags & CompactStateFlags.DATAQUALITY) > 0:
fullflags |= DATAQUALITYMASK
if (compactflags & CompactStateFlags.TIMEQUALITY) > 0:
fullflags |= TIMEQUALITYMASK
if (compactflags & CompactStateFlags.SYSTEMISSUE) > 0:
fullflags |= SYSTEMISSUEMASK
if (compactflags & CompactStateFlags.CALCULATEDVALUE) > 0:
fullflags |= CALCULATEDVALUEMASK
if (compactflags & CompactStateFlags.DISCARDEDVALUE) > 0:
fullflags |= DISCARDEDVALUEMASK
return fullflags
def _map_to_compactflags(fullflags: StateFlags) -> CompactStateFlags:
compactflags: CompactStateFlags = 0
if (fullflags & DATARANGEMASK) > 0:
compactflags |= CompactStateFlags.DATARANGE
if (fullflags & DATAQUALITYMASK) > 0:
compactflags |= CompactStateFlags.DATAQUALITY
if (fullflags & TIMEQUALITYMASK) > 0:
compactflags |= CompactStateFlags.TIMEQUALITY
if (fullflags & SYSTEMISSUEMASK) > 0:
compactflags |= CompactStateFlags.SYSTEMISSUE
if (fullflags & CALCULATEDVALUEMASK) > 0:
compactflags |= CompactStateFlags.CALCULATEDVALUE
if (fullflags & DISCARDEDVALUEMASK) > 0:
compactflags |= CompactStateFlags.DISCARDEDVALUE
return compactflags
[docs]
class CompactMeasurement(Measurement):
"""
Represents a measured value, in simple compact format, for transmission or reception in STTP.
"""
def __init__(self,
signalindexcache: SignalIndexCache,
includetime: bool,
usemillisecondresolution: bool,
basetimeoffsets: List[np.int64],
signalid: UUID = ...,
value: np.float64 = ...,
timestamp: np.uint64 = ...,
flags: StateFlags = ...
):
super().__init__(signalid, value, timestamp, flags)
self._signalindexcache = signalindexcache
self._includetime = includetime
self._usemillisecondresolution = usemillisecondresolution
self._basetimeoffsets = basetimeoffsets
self._timeindex = 0
self._usingbasetimeoffset = False
[docs]
def get_binarylength(self) -> np.uint32: # sourcery skip: assign-if-exp
"""
Gets the binary byte length of a `CompactMeasurement`
"""
length = FIXEDLENGTH
if not self._includetime:
return length
basetimeoffset = self._basetimeoffsets[self._timeindex]
if basetimeoffset > 0:
# See if timestamp will fit within space allowed for active base offset. We cache result so that post call
# to GetBinaryLength, result will speed other subsequent parsing operations by not having to reevaluate.
difference = self.timestampvalue - basetimeoffset
if difference > 0:
if self._usemillisecondresolution:
self._usingbasetimeoffset = np.int64(
difference / Ticks.PERMILLISECOND) < Limits.MAXUINT16
else:
self._usingbasetimeoffset = difference < Limits.MAXUINT32
else:
self._usingbasetimeoffset = False
if self._usingbasetimeoffset:
if self._usemillisecondresolution:
length += 2 # Use two bytes for millisecond resolution timestamp with valid offset
else:
length += 4 # Use four bytes for tick resolution timestamp with valid offset
else:
length += 8 # Use eight bytes for full fidelity time
else:
# Use eight bytes for full fidelity time
length += 8
return length
[docs]
def get_timestamp_c2(self) -> np.uint16:
"""
Gets offset compressed millisecond-resolution 2-byte timestamp.
"""
return np.uint16((self.timestampvalue - self._basetimeoffsets[self._timeindex]) / Ticks.PERMILLISECOND)
[docs]
def get_timestamp_c4(self) -> np.uint32:
"""
Gets offset compressed tick-resolution 4-byte timestamp.
"""
return np.uint32(self.timestampvalue - self._basetimeoffsets[self._timeindex])
[docs]
def get_compact_stateflags(self) -> np.byte:
"""
Gets byte level compact state flags with encoded time index and base time offset bits.
"""
# Encode compact state flags
flags: CompactStateFlags = _map_to_compactflags(self.flags)
if self._timeindex != 0:
flags |= CompactStateFlags.TIMEINDEX
if self._usingbasetimeoffset:
flags |= CompactStateFlags.BASETIMEOFFSET
return np.byte(flags)
[docs]
def set_compact_stateflags(self, value: np.byte):
"""
Sets byte level compact state flags with encoded time index and base time offset bits.
"""
# Decode compact state flags
flags = CompactStateFlags(value)
self.flags = _map_to_fullflags(flags)
self._timeindex = 1 if flags & CompactStateFlags.TIMEINDEX > 0 else 0
self._usingbasetimeoffset = (
flags & CompactStateFlags.BASETIMEOFFSET) > 0
@property
def runtimeid(self) -> np.int32:
"""
Gets the 4-byte run-time signal index for this measurement.
"""
return self._signalindexcache.signalindex(self.signalid)
@runtimeid.setter
def runtimeid(self, value: np.int32):
"""
Sets the 4-byte run-time signal index for this measurement.
Notes
-----
This assigns `CompactMeasurement` signal ID from the specified signal index
based on lookup in the active `SignalIndexCache`.
"""
self.signalid = self._signalindexcache.signalid(value)
[docs]
def decode(self, buffer: bytes) -> Tuple[int, Optional[Exception]]:
"""
Parses a `CompactMeasurement` from the specified byte buffer.
"""
if len(buffer) < FIXEDLENGTH:
return 0, ValueError("not enough buffer available to deserialize compact measurement")
# Basic Compact Measurement Format:
# Field: Bytes:
# -------- -------
# Flags 1
# ID 4
# Value 4
# [Time] 0/2/4/8
# Decode state flags
self.set_compact_stateflags(buffer[0])
index = 1
# Decode runtime ID
self.runtimeid = np.int32(BigEndian.to_uint32(buffer[index:]))
index += 4
# Decode value
self.value = np.float64(BigEndian.to_float32(buffer[index:]))
index += 4
if not self._includetime:
return index, None
if self._usingbasetimeoffset:
basetimeoffset = np.uint64(self._basetimeoffsets[self._timeindex])
if self._usemillisecondresolution:
# Decode 2-byte millisecond offset timestamp
if basetimeoffset > 0:
self.timestamp = basetimeoffset + np.uint64(BigEndian.to_uint16(buffer[index:])) * Ticks.PERMILLISECOND
index += 2
else:
# Decode 4-byte tick offset timestamp
if basetimeoffset > 0:
self.timestamp = basetimeoffset + np.uint64(BigEndian.to_uint32(buffer[index:]))
index += 4
else:
# Decode 8-byte full fidelity timestamp
# Note that only a full fidelity timestamp can carry leap second flags
self.timestamp = BigEndian.to_uint64(buffer[index:])
index += 8
return index, None