#******************************************************************************************************
# streamencoder.py - Gbtc
#
# Copyright © 2021, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 01/31/2021 - J. Ritchie Carroll
# Generated original version of source code.
#
#******************************************************************************************************
from .encoding7bit import Encoding7Bit
from . import ByteSize
from typing import Callable, Optional
import sys
import numpy as np
[docs]
class StreamEncoder:
"""
Defines functions for encoding and decoding native types to and from a stream.
For this class, a stream is simply an abstract notion based on provided functions
that read and write byte buffers as Python `bytes` objects advancing a position
in the base stream. The read/write functions simply wrap a base object that can
handle input and output as bytes, e.g., a `bytearray` or a `socket`.
"""
# Source C# reference: GSF.IO.StreamExtensions
def __init__(self, read: Callable[[int], bytes], write: Callable[[bytes], int], default_byteorder: str = sys.byteorder):
"""
Parameters
----------
read : func(length: int) -> bytes
Read function that accepts desired number of bytes to read and returns `bytes` object of read bytes.
Actual length of returned `bytes` object may be less than desired number of bytes.
write: func(buffer: bytes) -> int
Write function that accepts a `bytes` object and returns count of bytes written. It is expected that
call to write will successfully write all bytes, i.e., returned length should match `buffer` length.
"""
self._read = read
self._write = write
self._default_byteorder = default_byteorder
self._default_is_native = self._default_byteorder == sys.byteorder
@property
def default_byteorder(self) -> str:
return self._default_byteorder
[docs]
def write(self, source_buffer: bytes, offset: int, count: int) -> int:
if self._write(source_buffer[offset:offset + count]) != count:
raise RuntimeError(f"Failed to write {count:,} bytes to stream")
return count
[docs]
def read(self, target_buffer: bytearray, offset: int, count: int) -> int:
# `count` is requested size, value is treated as max return size
buffer = self._read(count)
read_length = len(buffer)
for i in range(read_length):
target_buffer[offset + i] = buffer[i]
return read_length
[docs]
def write_byte(self, value: np.uint8) -> int:
size = ByteSize.UINT8
if self._write(int(value).to_bytes(size, self._default_byteorder)) != size:
raise RuntimeError("Failed to write 1-byte to stream")
return size
[docs]
def read_byte(self) -> np.uint8:
size = ByteSize.UINT8
# call expects one byte to be available in base stream
buffer = self._read(size)
if len(buffer) != size:
raise RuntimeError("Failed to read 1-byte from stream")
return np.uint8(buffer[0])
[docs]
def write_bool(self, value: bool) -> int:
if value:
self.write_byte(1)
else:
self.write_byte(0)
return 1
[docs]
def read_bool(self) -> bool:
# call expects one byte to be available in base stream
return self.read_byte() != 0
def _write_int(self, size: int, value: int, signed: bool, byteorder: Optional[str]) -> int:
# sourcery skip: remove-unnecessary-cast
if self._write(int(value).to_bytes(size, self._default_byteorder if byteorder is None else byteorder, signed=signed)) != size:
raise RuntimeError(f"Failed to write {size}-bytes to stream")
return size
def _read_int(self, size: int, dtype: np.dtype, byteorder: Optional[str]) -> int:
# call expects needed bytes to be available in base stream
buffer = self._read(size)
if len(buffer) != size:
raise RuntimeError(f"Failed to read {size}-bytes from stream")
if not (byteorder is None and self._default_is_native) and byteorder != sys.byteorder:
dtype = dtype.newbyteorder()
return np.frombuffer(buffer, dtype)[0]
[docs]
def write_int16(self, value: np.int16, byteorder: Optional[str] = None) -> int:
return self._write_int(ByteSize.INT16, value, True, byteorder)
[docs]
def read_int16(self, byteorder: Optional[str] = None) -> np.int16:
return self._read_int(ByteSize.INT16, np.dtype(np.int16), byteorder)
[docs]
def write_uint16(self, value: np.uint16, byteorder: Optional[str] = None) -> int:
return self._write_int(ByteSize.UINT16, value, False, byteorder)
[docs]
def read_uint16(self, byteorder: Optional[str] = None) -> np.uint16:
return self._read_int(ByteSize.UINT16, np.dtype(np.uint16), byteorder)
[docs]
def write_int32(self, value: np.int32, byteorder: Optional[str] = None) -> int:
return self._write_int(ByteSize.INT32, value, True, byteorder)
[docs]
def read_int32(self, byteorder: Optional[str] = None) -> np.int32:
return self._read_int(ByteSize.INT32, np.dtype(np.int32), byteorder)
[docs]
def write_uint32(self, value: np.uint32, byteorder: Optional[str] = None) -> int:
return self._write_int(ByteSize.UINT32, value, False, byteorder)
[docs]
def read_uint32(self, byteorder: Optional[str] = None) -> np.uint32:
return self._read_int(ByteSize.UINT32, np.dtype(np.uint32), byteorder)
[docs]
def write_int64(self, value: np.int64, byteorder: Optional[str] = None) -> int:
return self._write_int(ByteSize.INT64, value, True, byteorder)
[docs]
def read_int64(self, byteorder: Optional[str] = None) -> np.int64:
return self._read_int(ByteSize.INT64, np.dtype(np.int64), byteorder)
[docs]
def write_uint64(self, value: np.uint64, byteorder: Optional[str] = None) -> int:
return self._write_int(ByteSize.UINT64, value, False, byteorder)
[docs]
def read_uint64(self, byteorder: Optional[str] = None) -> np.uint64:
return self._read_int(ByteSize.UINT64, np.dtype(np.uint64), byteorder)
[docs]
def write7bit_uint32(self, value: np.uint32) -> int:
return Encoding7Bit.WriteUInt32(self.write_byte, value)
[docs]
def read7bit_uint32(self) -> np.uint32:
# call expects one to five bytes to be available in base stream
return Encoding7Bit.ReadUInt32(self.read_byte)
[docs]
def write7bit_uint64(self, value: np.uint64) -> int:
return Encoding7Bit.WriteUInt64(self.write_byte, value)
[docs]
def read7bit_uint64(self) -> np.uint64:
# call expects one to nine bytes to be available in base stream
return Encoding7Bit.ReadUInt64(self.read_byte)