# ******************************************************************************************************
# datatable.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 08/25/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
# ******************************************************************************************************
from __future__ import annotations
from gsf import Empty
from .datacolumn import DataColumn
from .datarow import DataRow
from .datatype import DataType
from typing import Callable, Dict, Iterator, List, Optional, Tuple, TYPE_CHECKING
if TYPE_CHECKING:
from .dataset import DataSet
[docs]
class DataTable:
"""
Represents a collection of `DataColumn` objects where each data column defines a name and a data
type. Data columns can also be computed where its value would be derived from other columns and
functions (https://sttp.github.io/documentation/filter-expressions/) defined in an expression.
Note that this implementation uses a case-insensitive map for `DataColumn` name lookups.
Internally, case-insensitive lookups are accomplished using `str.upper()`.
"""
def __init__(self,
parent: DataSet,
name: str
):
"""
Creates a new `DataTable`.
"""
self._parent = parent
self._name = name
self._columnindexes: Dict[str, int] = {}
self._columns: List[DataColumn] = []
self._rows: List[DataRow] = []
# Container methods for DataTable map to rows, not columns
def __getitem__(self, key: int) -> DataRow:
return self._rows[key]
def __setitem__(self, key: int, value: DataRow):
self._rows[key] = value
def __delitem__(self, key: int):
del self._rows[key]
def __len__(self) -> int:
return len(self._rows)
def __contains__(self, item: DataRow) -> bool:
return item in self._rows
def __iter__(self) -> Iterator[DataRow]:
return iter(self._rows)
@property
def parent(self) -> DataSet:
"""
Gets the parent `DataSet` of the `DataTable`.
"""
return self._parent
@property
def name(self) -> str:
"""
Gets the name of the `DataTable`.
"""
return self._name
[docs]
def clear_columns(self):
"""
Clears the internal column collections.
"""
self._columnindexes = {}
self._columns = []
[docs]
def add_column(self, column: DataColumn):
"""
Adds the specified column to the `DataTable`.
"""
column._index = len(self._columns)
self._columnindexes[column.name.upper()] = column.index
self._columns.append(column)
[docs]
def column(self, columnindex: int) -> Optional[DataColumn]:
"""
Gets the `DataColumn` at the specified column index if the index is in range;
otherwise, None is returned.
"""
if columnindex < 0 or columnindex > len(self._columns):
return None
return self._columns[columnindex]
[docs]
def column_byname(self, columnname: str) -> Optional[DataColumn]:
"""
Gets the `DataColumn` for the specified column name if the name exists;
otherwise, None is returned. Lookup is case-insensitive.
"""
if (columnindex := self._columnindexes.get(columnname.upper())) is not None:
return self.column(columnindex)
return None
[docs]
def columnindex(self, columnname: str) -> int:
"""
Gets the index for the specified column name if the name exists;
otherwise, -1 is returned. Lookup is case-insensitive.
"""
if (column := self.column_byname(columnname)) is not None:
return column.index
return -1
[docs]
def create_column(self, name: str, datatype: DataType, expression: str = Empty.STRING):
"""
Creates a new `DataColumn` associated with the `DataTable`.
Use `add_column` to add the new column to the `DataTable`.
"""
return DataColumn(self, name, datatype, expression)
[docs]
def clone_column(self, source: DataColumn) -> DataColumn:
"""
Creates a copy of the specified source `DataColumn` associated with the `DataTable`.
"""
return self.create_column(source.name, source.datatype, source.expression)
@property
def columncount(self) -> int:
"""
Gets the total number columns defined in the `DataTable`.
"""
return len(self._columns)
[docs]
def clear_rows(self):
"""
Clears the internal row collection.
"""
self._rows = []
[docs]
def add_row(self, row: DataRow):
"""
Adds the specified row to the `DataTable`.
"""
self._rows.append(row)
[docs]
def row(self, rowindex: int) -> Optional[DataRow]:
"""
Gets the `DataRow` at the specified row index if the index is in range;
otherwise, None is returned.
"""
if rowindex < 0 or rowindex > len(self._rows):
return None
return self._rows[rowindex]
[docs]
def rowswhere(self, predicate: Callable[[DataRow], bool], limit: int = -1) -> List[DataRow]:
"""
Returns the rows matching the predicate expression. Set limit parameter
to -1 for all matching rows.
"""
matchingrows = []
count = 0
for datarow in self._rows:
if datarow is None:
continue
if predicate(datarow):
matchingrows.append(datarow)
count += 1
if limit > -1 and count >= limit:
break
return matchingrows
[docs]
def create_row(self) -> DataRow:
"""
Creates a new `DataRow` associated with the `DataTable`.
Use `add_row` to add the new row to the `DataTable`.
"""
return DataRow(self)
[docs]
def clone_row(self, source: DataRow) -> DataRow:
"""
Creates a copy of the specified source `DataRow` associated with the `DataTable`.
"""
row = self.create_row()
for i in range(len(self._columns)):
value, _ = source.value[i]
row.value[i] = value
return row
@property
def rowcount(self) -> int:
"""
Gets the total number of rows defined in the `DataTable`.
"""
return len(self._rows)
[docs]
def rowvalue_as_string(self, rowindex: int, columnindex: int) -> str:
"""
Reads the row record value at the specified column index converted to a string.
For column index out of range or any other errors, an empty string will be returned.
"""
row = self.row(rowindex)
return Empty.STRING if row is None else row.value_as_string(columnindex)
[docs]
def rowvalue_as_string_byname(self, rowindex: int, columnname: str) -> str:
"""
Reads the row record value for the specified column name converted to a string.
For column name not found or any other errors, an empty string will be returned.
"""
row = self.row(rowindex)
return Empty.STRING if row is None else row.value_as_string_byname(columnname)
def __repr__(self):
image: List[str] = [f"{self.name} ["]
for i in range(self._columns):
if i > 0:
image.append(", ")
image.append(str(self._columns[i]))
image.append(f"] x {len(self._rows,)} rows")
return "".join(image)
[docs]
def select(self, filterexpression: str, sortorder: Optional[str] = None, limit: int = -1) -> Tuple[Optional[List[DataRow]], Optional[Exception]]:
"""
Returns the rows matching the filter expression criteria in the specified sort order.
The `filterexpression` parameter should be in the syntax of a SQL WHERE expression but
should not include the WHERE keyword. The `sortorder` parameter defines field names,
separated by commas, that exist in the `DataTable` used to order the results. Each
field specified in the `sortorder` can have an `ASC` or `DESC` suffix; defaults to
`ASC` when no suffix is provided. When `sortorder` is an empty string, records will
be returned in natural order. Set limit parameter to -1 for all matching rows. When
`filterexpression` is an empty string, all records will be returned; any specified
sort order and limit will still be respected.
"""
if filterexpression is None or not filterexpression:
filterexpression = "True" # Return all records
if limit > 0:
filterexpression = f"FILTER TOP {limit} {self.name} WHERE {filterexpression}"
else:
filterexpression = f"FILTER {self.name} WHERE {filterexpression}"
if sortorder is not None and sortorder:
filterexpression += f" ORDER BY {sortorder}"
from .filterexpressionparser import FilterExpressionParser
expressiontree, err = FilterExpressionParser.generate_expressiontree(self, filterexpression, True)
return (None, err) if err is not None else expressiontree.select(self)