# ******************************************************************************************************
# dataset.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 08/25/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
# ******************************************************************************************************
from __future__ import annotations
from gsf import Convert, Empty
from .datatable import DataTable
from .datatype import DataType, parse_xsddatatype
from typing import Dict, Iterator, List, Tuple, Union, Optional
from decimal import Decimal
from datetime import datetime
from uuid import UUID
from io import BytesIO, StringIO
from xml.etree import ElementTree
from xml.etree.ElementTree import Element
import numpy as np
XMLSCHEMA_NAMESPACE = "http://www.w3.org/2001/XMLSchema"
"""
Defines schema namespace for the W3C XML Schema Definition Language (XSD) used by STTP metadata tables.
"""
EXT_XMLSCHEMADATA_NAMESPACE = "urn:schemas-microsoft-com:xml-msdata"
"""
Defines extended types for XSD elements, e.g., Guid and expression data types.
"""
class DataSet:
"""
Represents an in-memory cache of records that is structured similarly to information
defined in a database. The data set object consists of a collection of data table objects.
See https://sttp.github.io/documentation/data-sets/ for more information.
Note that this implementation uses a case-insensitive map for `DataTable` name lookups.
Internally, case-insensitive lookups are accomplished using `str.upper()`.
"""
DEFAULT_NAME = "DataSet"
def __init__(self,
name: str = ...
):
"""
Creates a new `DataSet`.
"""
self._tables: Dict[str, DataTable] = {}
self.name = DataSet.DEFAULT_NAME if name is ... else name
"""
Defines the name of the `DataSet`.
"""
# Case-insensitive get table by name; None returned when value does not exist
def __getitem__(self, key: str) -> DataTable:
return self.table(key)
def __delitem__(self, key: str):
del self._tables[key]
def __len__(self) -> int:
return len(self._tables)
# Case-insensitive table name search
def __contains__(self, item: str) -> bool:
return self[item] is not None
def __iter__(self) -> Iterator[DataTable]:
return iter(self._tables.values())
def clear_tables(self):
"""
Clears the internal table collection.
Any existing tables will be deleted.
"""
self._tables = {}
def add_table(self, table: DataTable):
"""
Adds the specified table to the `DataSet`.
"""
self._tables[table.name.upper()] = table
def table(self, tablename: str) -> Optional[DataTable]:
"""
Gets the `DataTable` for the specified table name if it exists;
otherwise, None is returned. Lookup is case-insensitive.
"""
return self._tables.get(tablename.upper())
def tablenames(self) -> List[str]:
"""
Gets the table names defined in the `DataSet`.
"""
return [table.name for table in self._tables.values()]
def tables(self) -> List[DataTable]:
"""
Gets the `DataTable` instances defined in the `DataSet`.
"""
return list(self._tables.values())
def create_table(self, name: str) -> DataTable:
"""
Creates a new `DataTable` associated with the `DataSet`.
Use `add_table` to add the new table to the `DataSet`.
"""
return DataTable(self, name)
@property
def tablecount(self) -> int:
"""
Gets the total number of tables defined in the `DataSet`.
"""
return len(self._tables)
def remove_table(self, tablename: str) -> bool:
"""
        Removes the specified table name from the `DataSet`. Returns
        True if the table was removed; otherwise, False if it did not exist.
Lookup is case-insensitive.
"""
        return self._tables.pop(tablename.upper(), None) is not None
def __repr__(self):
image: List[str] = [f"{self.name} ["]
        for i, table in enumerate(self._tables.values()):
if i > 0:
image.append(", ")
image.append(table.name)
image.append("]")
return "".join(image)
@staticmethod
def from_xml(buffer: Union[str, bytes]) -> Tuple[DataSet, Optional[Exception]]:
"""
Creates a new `DataSet` as read from the XML in the specified buffer.
"""
dataset = DataSet()
err = dataset.parse_xml(buffer)
return dataset, err
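    # Example (sketch): building a DataSet from serialized metadata already held
    # in memory -- xml_bytes is assumed to contain the XML document as bytes or str:
    #
    #   dataset, err = DataSet.from_xml(xml_bytes)
    #   if err is not None:
    #       raise err
    #   for table in dataset:
    #       print(table.name)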
def parse_xml(self, buffer: Union[str, bytes]) -> Optional[Exception]:
"""
Loads the `DataSet` from the XML in the specified buffer.
"""
err: Optional[Exception] = None
try:
doc = ElementTree.fromstring(buffer)
except Exception as ex:
err = ex
if err is not None:
return err
bufferio = StringIO(buffer) if isinstance(buffer, str) else BytesIO(buffer)
namespaces: Dict[str, str] = dict(
[node for _, node in ElementTree.iterparse(bufferio, events=["start-ns"])])
if namespaces.get(Empty.STRING) is not None:
del namespaces[Empty.STRING]
return self.parse_xmldoc(doc, namespaces)
def parse_xmldoc(self, root: Element, namespaces: Dict[str, str]) -> Optional[Exception]:
"""
Loads the `DataSet` from an existing root XML document element.
"""
def get_schemaprefix(target_namespace: str):
prefix = ""
for key in namespaces:
if namespaces[key] == target_namespace:
prefix = key
break
if len(prefix) > 0:
prefix += ":"
return prefix
xs = get_schemaprefix(XMLSCHEMA_NAMESPACE)
# Find schema node
schema = root.find(f"{xs}schema", namespaces)
if schema is None:
return RuntimeError("failed to parse DataSet XML: Cannot find schema node")
if (id := schema.attrib.get("id")) is None or id != root.tag:
return RuntimeError(f"failed to parse DataSet XML: Cannot find schema node matching \"{root.tag}\"")
# Populate DataSet schema
self._load_schema(schema, namespaces, xs)
# Populate DataSet records
self._load_records(root)
return None
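    # _load_schema below walks an XSD document of the following general shape
    # (illustrative fragment; "Device", "Acronym" and "UniqueID" are hypothetical
    # names, and the msdata:DataType value shown is one conventional extended type):
    #
    #   <xs:schema id="DataSet" xmlns:xs="..." xmlns:msdata="...">
    #     <xs:element name="DataSet">
    #       <xs:complexType>
    #         <xs:choice minOccurs="0" maxOccurs="unbounded">
    #           <xs:element name="Device">                      <!-- one table -->
    #             <xs:complexType>
    #               <xs:sequence>                               <!-- its columns -->
    #                 <xs:element name="Acronym" type="xs:string" minOccurs="0"/>
    #                 <xs:element name="UniqueID" msdata:DataType="System.Guid"
    #                             type="xs:string" minOccurs="0"/>
    #               </xs:sequence>
    #             </xs:complexType>
    #           </xs:element>
    #         </xs:choice>
    #       </xs:complexType>
    #     </xs:element>
    #   </xs:schema>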
def _load_schema(self, schema: Element, namespaces: Dict[str, str], xs: str):
EXT_PREFIX = f"{{{EXT_XMLSCHEMADATA_NAMESPACE}}}"
# Find choice elements representing schema table definitions
tablenodes = schema.findall(f"{xs}element/{xs}complexType/{xs}choice/{xs}element", namespaces)
for tablenode in tablenodes:
if (tablename := tablenode.attrib.get("name")) is None:
continue
datatable = self.create_table(tablename)
# Find sequence elements representing schema table field definitions
fieldnodes = tablenode.findall(f"{xs}complexType/{xs}sequence/{xs}element", namespaces)
for fieldnode in fieldnodes:
if (fieldname := fieldnode.attrib.get("name")) is None:
continue
if (typename := fieldnode.attrib.get("type")) is None:
continue
if typename.startswith(xs):
typename = typename[len(xs):]
# Check for extended data type (allows XSD Guid field definitions)
extdatatype = fieldnode.attrib.get(f"{EXT_PREFIX}DataType")
datatype, found = parse_xsddatatype(typename, extdatatype)
# Columns with unsupported XSD data types are skipped
if not found:
continue
# Check for computed expression
expression = fieldnode.attrib.get(f"{EXT_PREFIX}Expression")
datacolumn = datatable.create_column(fieldname, datatype, expression)
datatable.add_column(datacolumn)
self.add_table(datatable)
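    # _load_records below consumes the record elements that follow the schema node:
    # each child of the root named after a table is one row, and each of its child
    # elements is a field value (illustrative fragment matching the sketch above):
    #
    #   <DataSet>
    #     <xs:schema id="DataSet"> ... </xs:schema>
    #     <Device>
    #       <Acronym>EXAMPLE</Acronym>
    #       <UniqueID>00000000-0000-0000-0000-000000000000</UniqueID>
    #     </Device>
    #   </DataSet>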
def _load_records(self, root: Element): # sourcery skip: low-code-quality
# Each root node child that matches a table name represents a record
for record in root:
table = self.table(record.tag)
if table is None:
continue
datarow = table.create_row()
# Each child node of a record represents a field value
for field in record:
column = table.column_byname(field.tag)
if column is None:
continue
index = column.index
datatype = column.datatype
value = field.text
if datatype == DataType.STRING:
datarow[index] = Empty.STRING if value is None else value
elif datatype == DataType.GUID:
datarow[index] = Empty.GUID if value is None else UUID(value)
elif datatype == DataType.DATETIME:
datarow[index] = Empty.DATETIME if value is None else Convert.from_str(value, datetime)
                elif datatype == DataType.BOOLEAN:
                    # XSD boolean lexical forms are "true"/"false" and "1"/"0"
                    datarow[index] = False if value is None else value.strip().lower() in ("true", "1")
elif datatype == DataType.SINGLE:
datarow[index] = Empty.SINGLE if value is None else Convert.from_str(value, np.float32)
elif datatype == DataType.DOUBLE:
datarow[index] = Empty.DOUBLE if value is None else Convert.from_str(value, np.float64)
elif datatype == DataType.DECIMAL:
datarow[index] = Empty.DECIMAL if value is None else Decimal(value)
elif datatype == DataType.INT8:
datarow[index] = Empty.INT8 if value is None else Convert.from_str(value, np.int8)
elif datatype == DataType.INT16:
datarow[index] = Empty.INT16 if value is None else Convert.from_str(value, np.int16)
elif datatype == DataType.INT32:
datarow[index] = Empty.INT32 if value is None else Convert.from_str(value, np.int32)
elif datatype == DataType.INT64:
datarow[index] = Empty.INT64 if value is None else Convert.from_str(value, np.int64)
elif datatype == DataType.UINT8:
datarow[index] = Empty.UINT8 if value is None else Convert.from_str(value, np.uint8)
elif datatype == DataType.UINT16:
datarow[index] = Empty.UINT16 if value is None else Convert.from_str(value, np.uint16)
elif datatype == DataType.UINT32:
datarow[index] = Empty.UINT32 if value is None else Convert.from_str(value, np.uint32)
elif datatype == DataType.UINT64:
datarow[index] = Empty.UINT64 if value is None else Convert.from_str(value, np.uint64)
else:
datarow[index] = None
table.add_row(datarow)