Source code for gsf.encoding7bit

# ******************************************************************************************************
#  encoding7bit.py - Gbtc
#
#  Copyright © 2021, Grid Protection Alliance.  All Rights Reserved.
#
#  Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
#  the NOTICE file distributed with this work for additional information regarding copyright ownership.
#  The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
#  file except in compliance with the License. You may obtain a copy of the License at:
#
#      http://opensource.org/licenses/MIT
#
#  Unless agreed to in writing, the subject software distributed under the License is distributed on an
#  "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
#  License for the specific language governing permissions and limitations.
#
#  Code Modification History:
#  ----------------------------------------------------------------------------------------------------
#  02/01/2021 - J. Ritchie Carroll
#       Generated original version of source code.
#
# ******************************************************************************************************

from . import ByteSize
from typing import Callable
import numpy as np


class Encoding7Bit:
    """
    Defines 7-bit encoding/decoding functions.

    Each encoded byte carries seven payload bits in its low bits; the high bit
    of every byte except the last possible one is a continuation flag. The last
    possible byte of each width (2nd, 5th or 9th) carries a full eight bits.
    """

    @staticmethod
    def _write_unsigned(stream_writer: Callable[[np.uint8], int], value: int, max_bytes: int) -> int:
        """
        Writes an unsigned integer using 7-bit encoding, emitting at most `max_bytes` bytes.

        Parameters
        ----------
        stream_writer: function to write a byte value to a stream
        value: unsigned value to write
        max_bytes: maximum number of bytes the target width may occupy

        Returns
        -------
        Number of bytes handed to `stream_writer`.
        """

        remaining = int(value)

        for count in range(1, max_bytes):
            if remaining < 0x80:
                # Final byte: high bit clear marks the end of the sequence
                stream_writer(np.uint8(remaining))
                return count

            # Low seven bits plus continuation flag
            stream_writer(np.uint8((remaining & 0x7F) | 0x80))
            remaining >>= 7

        # Last possible byte has no continuation flag, so all eight bits are
        # payload; anything above that is dropped
        stream_writer(np.uint8(remaining & 0xFF))
        return max_bytes

    @staticmethod
    def _read_unsigned(stream_reader: Callable[[], np.uint8], max_bytes: int, dtype: type):
        """
        Reads an unsigned integer using 7-bit encoding, consuming at most `max_bytes` bytes.

        Parameters
        ----------
        stream_reader: function to read a byte value from a stream
        max_bytes: maximum number of bytes the target width may occupy
        dtype: numpy unsigned integer type used for the returned value

        Returns
        -------
        Decoded value as an instance of `dtype`.
        """

        value = int(stream_reader())

        if value < 0x80:
            return dtype(value)

        flags = 0  # accumulated continuation-flag bits to strip from the result
        last = max_bytes - 1

        for index in range(1, max_bytes):
            shift = 7 * index

            # The continuation flag of the previous byte overlaps bit zero of
            # this one; XOR-ing here and again with `flags` cancels it out
            value ^= int(stream_reader()) << shift
            flags |= 1 << shift

            if index < last and value < (1 << (shift + 7)):
                return dtype(value ^ flags)

        # Keep only the bits that fit the target width
        return dtype((value ^ flags) & int(np.iinfo(dtype).max))

    @staticmethod
    def write_int16(stream_writer: Callable[[np.uint8], int], value: np.int16) -> int:
        """
        Writes 16-bit signed integer value using 7-bit encoding to the provided stream writer.

        Parameters
        ----------
        stream_writer: function to write a byte value to a stream
        value: 16-bit value to write

        Returns
        -------
        Number of bytes written, one or two.

        Notes
        -----
        The value is reinterpreted as unsigned and passed to `write_uint16`.
        NOTE(review): negative values reinterpret to unsigned values above
        0x7FFF, which the two-byte form cannot carry - confirm intended range
        with callers.
        """

        return Encoding7Bit.write_uint16(stream_writer, np.frombuffer(int(value).to_bytes(ByteSize.INT16, "little", signed=True), np.uint16)[0])

    @staticmethod
    def write_uint16(stream_writer: Callable[[np.uint8], int], value: np.uint16) -> int:
        """
        Writes 16-bit unsigned integer value using 7-bit encoding to the provided stream writer.

        Parameters
        ----------
        stream_writer: function to write a byte value to a stream
        value: 16-bit value to write

        Returns
        -------
        Number of bytes written, one or two.

        Notes
        -----
        At most two bytes are written (seven plus eight payload bits), so only
        values up to 0x7FFF round-trip through `read_uint16`.
        """

        return Encoding7Bit._write_unsigned(stream_writer, value, 2)

    @staticmethod
    def write_int32(stream_writer: Callable[[np.uint8], int], value: np.int32) -> int:
        """
        Writes 32-bit signed integer value using 7-bit encoding to the provided stream writer.

        Parameters
        ----------
        stream_writer: function to write a byte value to a stream
        value: 32-bit value to write

        Returns
        -------
        Number of bytes written, one to five.
        """

        return Encoding7Bit.write_uint32(stream_writer, np.frombuffer(int(value).to_bytes(ByteSize.INT32, "little", signed=True), np.uint32)[0])

    @staticmethod
    def write_uint32(stream_writer: Callable[[np.uint8], int], value: np.uint32) -> int:
        """
        Writes 32-bit unsigned integer value using 7-bit encoding to the provided stream writer.

        Parameters
        ----------
        stream_writer: function to write a byte value to a stream
        value: 32-bit value to write

        Returns
        -------
        Number of bytes written, one to five.
        """

        return Encoding7Bit._write_unsigned(stream_writer, value, 5)

    @staticmethod
    def write_int64(stream_writer: Callable[[np.uint8], int], value: np.int64) -> int:
        """
        Writes 64-bit signed integer value using 7-bit encoding to the provided stream writer.

        Parameters
        ----------
        stream_writer: function to write a byte value to a stream
        value: 64-bit value to write

        Returns
        -------
        Number of bytes written, one to nine.
        """

        return Encoding7Bit.write_uint64(stream_writer, np.frombuffer(int(value).to_bytes(ByteSize.INT64, "little", signed=True), np.uint64)[0])

    @staticmethod
    def write_uint64(stream_writer: Callable[[np.uint8], int], value: np.uint64) -> int:
        """
        Writes 64-bit unsigned integer value using 7-bit encoding to the provided stream writer.

        Parameters
        ----------
        stream_writer: function to write a byte value to a stream
        value: 64-bit value to write

        Returns
        -------
        Number of bytes written, one to nine.
        """

        return Encoding7Bit._write_unsigned(stream_writer, value, 9)

    @staticmethod
    def read_int16(stream_reader: Callable[[], np.uint8]) -> np.int16:
        """
        Reads 16-bit signed integer value using 7-bit encoding from the provided stream reader.

        Parameters
        ----------
        stream_reader: function to read a byte value from a stream

        Notes
        -----
        Call expects one to two bytes to be available in base stream.
        """

        return np.frombuffer(int(Encoding7Bit.read_uint16(stream_reader)).to_bytes(ByteSize.UINT16, "little"), np.int16)[0]

    @staticmethod
    def read_uint16(stream_reader: Callable[[], np.uint8]) -> np.uint16:
        """
        Reads 16-bit unsigned integer value using 7-bit encoding from the provided stream reader.

        Parameters
        ----------
        stream_reader: function to read a byte value from a stream

        Notes
        -----
        Call expects one to two bytes to be available in base stream.
        """

        return Encoding7Bit._read_unsigned(stream_reader, 2, np.uint16)

    @staticmethod
    def read_int32(stream_reader: Callable[[], np.uint8]) -> np.int32:
        """
        Reads 32-bit signed integer value using 7-bit encoding from the provided stream reader.

        Parameters
        ----------
        stream_reader: function to read a byte value from a stream

        Notes
        -----
        Call expects one to five bytes to be available in base stream.
        """

        return np.frombuffer(int(Encoding7Bit.read_uint32(stream_reader)).to_bytes(ByteSize.UINT32, "little"), np.int32)[0]

    @staticmethod
    def read_uint32(stream_reader: Callable[[], np.uint8]) -> np.uint32:
        """
        Reads 32-bit unsigned integer value using 7-bit encoding from the provided stream reader.

        Parameters
        ----------
        stream_reader: function to read a byte value from a stream

        Notes
        -----
        Call expects one to five bytes to be available in base stream.
        """

        return Encoding7Bit._read_unsigned(stream_reader, 5, np.uint32)

    @staticmethod
    def read_int64(stream_reader: Callable[[], np.uint8]) -> np.int64:
        """
        Reads 64-bit signed integer value using 7-bit encoding from the provided stream reader.

        Parameters
        ----------
        stream_reader: function to read a byte value from a stream

        Notes
        -----
        Call expects one to nine bytes to be available in base stream.
        """

        return np.frombuffer(int(Encoding7Bit.read_uint64(stream_reader)).to_bytes(ByteSize.UINT64, "little"), np.int64)[0]

    @staticmethod
    def read_uint64(stream_reader: Callable[[], np.uint8]) -> np.uint64:
        """
        Reads 64-bit unsigned integer value using 7-bit encoding from the provided stream reader.

        Parameters
        ----------
        stream_reader: function to read a byte value from a stream

        Notes
        -----
        Call expects one to nine bytes to be available in base stream.
        """

        return Encoding7Bit._read_unsigned(stream_reader, 9, np.uint64)