# ******************************************************************************************************
# filterexpressionparser.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 09/07/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
# ******************************************************************************************************
from gsf import Convert, Empty, Limits
from .dataset import DataSet
from .datatable import DataTable
from .datarow import DataRow
from .tableidfields import TableIDFields, DEFAULT_TABLEIDFIELDS
from .expressiontree import ExpressionTree
from .callbackerrorlistener import CallbackErrorListener
from .expression import Expression
from .columnexpression import ColumnExpression
from .inlistexpression import InListExpression
from .functionexpression import FunctionExpression
from .operatorexpression import OperatorExpression
from .unaryexpression import UnaryExpression
from .valueexpression import ValueExpression, TRUEVALUE, FALSEVALUE, NULLVALUE
from .constants import ExpressionValueType, ExpressionUnaryType
from .constants import ExpressionFunctionType, ExpressionOperatorType
from .orderbyterm import OrderByTerm
from .errors import EvaluateError
from .parser.FilterExpressionSyntaxListener import FilterExpressionSyntaxListener as ExpressionListener
from .parser.FilterExpressionSyntaxLexer import FilterExpressionSyntaxLexer as ExpressionLexer
from .parser.FilterExpressionSyntaxParser import FilterExpressionSyntaxParser as ExpressionParser
from antlr4 import CommonTokenStream, InputStream, ParseTreeWalker
from antlr4.ParserRuleContext import ParserRuleContext
from typing import Callable, Dict, List, Optional, Set, Tuple
from decimal import Decimal
from datetime import datetime
from uuid import UUID
[docs]
class FilterExpressionParser(ExpressionListener):
"""
Represents a parser for STTP filter expressions.
"""
def __init__(self,
filterexpression: str,
suppress_console_erroroutput: bool = False,
):
self._inputstream = InputStream(filterexpression)
self._lexer = ExpressionLexer(self._inputstream)
self._tokens = CommonTokenStream(self._lexer)
self._parser = ExpressionParser(self._tokens)
self._errorlistener: CallbackErrorListener = CallbackErrorListener()
self._filtered_rows: List[DataRow] = []
self._filtered_rowset: Optional[Set[DataRow]] = None
self._filtered_signalids: List[UUID] = []
self._filtered_signalidset: Optional[Set[UUID]] = None
self._filterexpression_statementcount: int = 0
self._active_expressiontree: Optional[ExpressionTree] = None
self._expressiontrees: List[ExpressionTree] = []
self._expressions: Dict[ParserRuleContext, Expression] = {}
self.dataset: DataSet = None
"""
Defines the source metadata used for parsing the filter expression.
"""
self.primary_tablename: str = Empty.STRING
"""
Defines the name of the table to use in the DataSet when filter expressions do not specify
a table name, e.g., direct signal identification. See:
https://sttp.github.io/documentation/filter-expressions/#direct-signal-identification
"""
self.tableidfields_map: Dict[str, TableIDFields] = {}
"""
Defines a map of table ID fields associated with table names.
"""
self.track_filteredrows = True
"""
Defines a flag that enables tracking of matching rows during filter expression evaluation.
Value defaults to True. Set value to False and set `track_filteredsignalids` to True if
only signal IDs are needed post filter expression evaluation.
"""
self.track_filteredsignalids = False
"""
Defines a flag that enables tracking of matching signal IDs during filter expression evaluation.
"""
if suppress_console_erroroutput:
self._parser.removeErrorListeners()
self._parser.addErrorListener(self._errorlistener)
[docs]
@staticmethod
def from_dataset(dataset: DataSet,
filterexpression: str,
primary_table: str,
tableidfields: Optional[TableIDFields] = None,
suppress_console_erroroutput: bool = False,
) -> Tuple[Optional["FilterExpressionParser"], Optional[Exception]]:
"""
Creates a new filter expression parser associated with the provided `dataSet`
and provided table details. Error will be returned if `dataset` parameter is
None or the `filterexpression` is empty.
"""
if dataset is None:
return None, ValueError("dataset parameter is None")
if not filterexpression:
return None, ValueError("filterexpression parameter is empty")
parser = FilterExpressionParser(filterexpression, suppress_console_erroroutput)
parser.dataset = dataset
if primary_table:
parser.primary_tablename = primary_table
if tableidfields is None:
parser.tableidfields_map[primary_table] = DEFAULT_TABLEIDFIELDS
else:
parser.tableidfields_map[primary_table] = tableidfields
return parser, None
[docs]
def set_parsingexception_callback(self, callback: Callable[[str], None]):
"""
Registers a callback for receiving parsing exception messages.
"""
self._errorlistener.parsingexception_callback = callback
@property
def expressiontrees(self) -> Tuple[List[ExpressionTree], Optional[Exception]]:
"""
Returns the list of expression trees parsed from the filter expression.
"""
if len(self._expressiontrees) == 0:
err = self._visit_parsetreenodes()
if err is not None:
return None, err
return self._expressiontrees, None
@property
def filtered_rows(self) -> List[DataRow]:
"""
Gets the rows matching the parsed filter expression.
Results could contain duplicates if source `DataSet` has duplicated rows.
"""
return self._filtered_rows
@property
def filtered_rowset(self) -> Set[DataRow]:
"""
Gets the unique row set matching the parsed filter expression.
"""
self._initialize_set_operations()
return self._filtered_rowset
@property
def filtered_signalids(self) -> List[UUID]:
"""
Gets the Guid-based signal IDs matching the parsed filter expression.
Results could contain duplicates if source `DataSet` has duplicated rows.
"""
return self._filtered_signalids
@property
def filtered_signalidset(self) -> Set[UUID]:
"""
Gets the unique Guid-based signal ID set matching the parsed filter expression.
"""
self._initialize_set_operations()
return self._filtered_signalidset
@property
def filterexpression_statementcount(self) -> int:
"""
Gets the number of filter expression statements encountered while parsing.
"""
return self._filterexpression_statementcount
[docs]
def table(self, tablename: str) -> Tuple[Optional[DataTable], Optional[Exception]]:
"""
Gets the DataTable for the specified tableName from the FilterExpressionParser DataSet.
An error will be returned if no DataSet has been defined or the tableName cannot be found.
"""
if self.dataset is None:
return None, ValueError("no DataSet has been defined")
table = self.dataset.table(tablename)
if table is None:
return None, ValueError(f"failed to find table \"{tablename}\" in DataSet")
return table, None
[docs]
def evaluate(self, applylimit: bool, applysort: bool) -> Optional[Exception]:
"""
Evaluate parses each statement in the filter expression and tracks the results.
Filter expressions can contain multiple statements, separated by semi-colons, where each
statement results in a unique expression tree; this function returns the combined results
of each encountered filter expression statement, yielding all filtered rows and/or signal
IDs that match the target filter expression. The `applylimit` and `applysort` flags
determine if any encountered "TOP" limit and "ORDER BY" sorting clauses will be respected.
Access matching results via `filtered_rows` and/or `filtered_signalids`, or related set
functions. An error will be returned if expression fails to parse or any row expression
evaluation fails.
"""
if self.dataset is None:
return None, ValueError("no DataSet has been defined")
if not self.track_filteredrows and not self.track_filteredsignalids:
return None, ValueError("no use in evaluating filter expression, neither filtered rows nor signal IDs have been set for tracking")
self._filterexpression_statementcount = 0
self._filtered_rows = []
self._filtered_rowset = None
self._filtered_signalids = []
self._filtered_signalidset = None
self._expressiontrees = []
self._expressions = {}
# Visiting tree nodes will automatically add literals to the the filtered results
err = self._visit_parsetreenodes()
if err is not None:
return err
# Each statement in the filter expression will have its own expression tree, evaluate each
for expressiontree in self._expressiontrees:
tablename = expressiontree.tablename
if len(tablename) == 0:
if len(self.primary_tablename) == 0:
return ValueError("no table name defined for expression tree nor is any PrimaryTableName defined")
tablename = self.primary_tablename
table, err = self.table(tablename)
if err is not None:
return err
def where_predicate(result_expression: ValueExpression) -> Tuple[bool, Optional[Exception]]:
if result_expression.valuetype == ExpressionValueType.BOOLEAN:
return result_expression._booleanvalue(), None
# Filtered results will already have any matched literals
return FALSEVALUE._booleanvalue(), None
# Select all matching boolean results from expression tree evaluated for each table row
matchedrows, err = expressiontree.selectwhere(table, where_predicate, applylimit, applysort)
if err is not None:
return err
signalid_columnindex = -1
if self.track_filteredsignalids:
primary_tableidfields = self.tableidfields_map.get(table.name)
if primary_tableidfields is None:
return EvaluateError(f"failed to find ID fields record for table \"{table.name}\"")
signalid_column = table.column_byname(primary_tableidfields.signalid_fieldname)
if signalid_column is None:
return EvaluateError(f"failed to find signal ID column \"{primary_tableidfields.signalid_fieldname}\" in table \"{table.name}\"")
signalid_columnindex = signalid_column.index
for matchedrow in matchedrows:
self._add_matchedrow(matchedrow, signalid_columnindex)
return None
def _visit_parsetreenodes(self) -> Optional[Exception]:
err: Optional[Exception] = None
try:
# Create a parse tree and start visiting listener methods
walker = ParseTreeWalker()
parsetree = self._parser.parse()
walker.walk(self, parsetree)
except Exception as ex:
err = ex
return err
def _initialize_set_operations(self):
# As an optimization, set operations are not engaged until second filter expression statement
# is encountered, only then will duplicate results be a concern. Note that only using a set
# is not an option because results can be sorted with the "ORDER BY" clause.
if self.track_filteredrows and self._filtered_rowset is None:
self._filtered_rowset = set(self._filtered_rows)
if self.track_filteredsignalids and self._filtered_signalidset is None:
self._filtered_signalidset = set(self._filtered_signalids)
def _add_matchedrow(self, matchedrow: DataRow, signalid_columnindex: int):
if self._filterexpression_statementcount > 1:
# Set operations
if self.track_filteredrows:
startlen = len(self._filtered_rowset)
self._filtered_rowset.add(matchedrow)
if len(self._filtered_rowset) > startlen:
self._filtered_rows.append(matchedrow)
if self.track_filteredsignalids:
signalidfield, null, err = matchedrow.guidvalue(signalid_columnindex)
if not null and err is None and signalidfield != Empty.GUID:
startlen = len(self._filtered_signalidset)
self._filtered_signalidset.add(signalidfield)
if len(self._filtered_signalidset) > startlen:
self._filtered_signalids.append(signalidfield)
else:
# Vector only operations
if self.track_filteredrows:
self._filtered_rows.append(matchedrow)
if self.track_filteredsignalids:
signalidfield, null, err = matchedrow.guidvalue(signalid_columnindex)
if not null and err is None and signalidfield != Empty.GUID:
self._filtered_signalids.append(signalidfield)
def _map_matchedfieldrow(self, primarytable: DataTable, columnname: str, matchvalue: str, signalid_columnindex: int):
column = primarytable.column_byname(columnname)
if column is None:
return
matchvalue = matchvalue.upper()
columnindex = column.index
for row in primarytable:
if row is None:
continue
value, null, err = row.stringvalue(columnindex)
if not null and err is None and value.upper() == matchvalue:
self._add_matchedrow(row, signalid_columnindex)
return
def _get_expr(self, ctx: ParserRuleContext) -> Optional[Expression]:
return self._expressions.get(ctx)
def _add_expr(self, ctx: ParserRuleContext, expression: Expression):
# Track expression in parser rule context map
self._expressions[ctx] = expression
# Update active expression tree root
self._active_expressiontree.root = expression
# filterExpressionStatement
# : identifierStatement
# | filterStatement
# | expression
# ;
[docs]
def enterFilterExpressionStatement(self, ctx: ExpressionParser.FilterExpressionStatementContext):
# One filter expression can contain multiple filter statements separated by semi-colon,
# so we track each as an independent expression tree
self._expressions = {}
self._active_expressiontree = None
self._filterexpression_statementcount += 1
# Encountering second filter expression statement necessitates the use of set operations
# to prevent possible result duplications
if self._filterexpression_statementcount == 2:
self._initialize_set_operations()
# filterStatement
# : K_FILTER ( K_TOP topLimit )? tableName K_WHERE expression ( K_ORDER K_BY orderingTerm ( ',' orderingTerm )* )?
# ;
# topLimit
# : ( '-' | '+' )? INTEGER_LITERAL
# ;
# orderingTerm
# : exactMatchModifier? columnName ( K_ASC | K_DESC )?
# ;
[docs]
def enterFilterStatement(self, ctx: ExpressionParser.FilterStatementContext):
tablename: str = ctx.tableName().getText()
table, err = self.table(tablename)
if err is not None:
raise EvaluateError(f"cannot parse filter expression statement, {err}")
self._active_expressiontree = ExpressionTree()
self._active_expressiontree.tablename = tablename
self._expressiontrees.append(self._active_expressiontree)
if ctx.K_TOP() is not None:
self._active_expressiontree.toplimit = int(ctx.topLimit().getText())
if ctx.K_ORDER() is not None and ctx.K_BY() is not None:
orderingterms = ctx.orderingTerm()
for i in range(len(orderingterms)):
orderingterm: ExpressionParser.OrderingTermContext = orderingterms[i]
orderby_columnname: str = orderingterm.orderByColumnName().getText()
orderby_column = table.column_byname(orderby_columnname)
if orderby_column is None:
raise EvaluateError(f"cannot parse filter expression statement, failed to find order by field \"{orderby_columnname}\" for table \"{table.name}\"")
self._active_expressiontree.orderbyterms.append(OrderByTerm(
orderby_column,
orderingterm.K_DESC() is None,
orderingterm.exactMatchModifier() is not None))
# identifierStatement
# : GUID_LITERAL
# | MEASUREMENT_KEY_LITERAL
# | POINT_TAG_LITERAL
# ;
[docs]
def exitIdentifierStatement(self, ctx: ExpressionParser.IdentifierStatementContext):
# sourcery skip
signalid = Empty.GUID
if ctx.GUID_LITERAL() is not None:
signalid = FilterExpressionParser._parse_guidliteral(ctx.GUID_LITERAL().getText())
if not self.track_filteredrows and not self.track_filteredsignalids:
# Handle edge case of encountering standalone Guid when not tracking rows or table identifiers.
# In this scenario the filter expression parser would only be used to generate expression trees
# for general expression parsing, e.g., for a DataColumn expression, so here the Guid should be
# treated as a literal expression value instead of an identifier to track:
self.enterExpression(None)
self._active_expressiontree.root = ValueExpression(ExpressionValueType.GUID, signalid)
return
if self.track_filteredsignalids and signalid != Empty.GUID:
if self._filterexpression_statementcount > 1:
startlen = len(self._filtered_signalidset)
self._filtered_signalidset.add(signalid)
if len(self._filtered_signalidset) > startlen:
self._filtered_signalids.append(signalid)
else:
self._filtered_signalids.append(signalid)
if not self.track_filteredrows:
return
if self.dataset is None:
return
primary_table = self.dataset[self.primary_tablename]
if primary_table is None:
return
primary_tableidfields = self.tableidfields_map.get(self.primary_tablename)
if primary_tableidfields is None:
return
signalidcolumn = primary_table.column_byname(primary_tableidfields.signalid_fieldname)
if signalidcolumn is None:
return
signalid_columnindex = signalidcolumn.index
if self.track_filteredrows and signalid != Empty.GUID:
# Map matching row for manually specified Guid
for row in primary_table:
if row is None:
continue
value, null, err = row.guidvalue(signalid_columnindex)
if not null and err is None and value == signalid:
if self.filterexpression_statementcount > 1:
startlen = len(self._filtered_rowset)
self._filtered_rowset.add(row)
if len(self._filtered_rowset) > startlen:
self._filtered_rows.append(row)
else:
self._filtered_rows.append(row)
return
return
if ctx.MEASUREMENT_KEY_LITERAL() is not None:
self._map_matchedfieldrow(primary_table, primary_tableidfields.measurementkey_fieldname, ctx.MEASUREMENT_KEY_LITERAL().getText(), signalid_columnindex)
return
if ctx.POINT_TAG_LITERAL() is not None:
self._map_matchedfieldrow(primary_table, primary_tableidfields.pointtag_fieldname, ctx.POINT_TAG_LITERAL().getText(), signalid_columnindex)
return
# expression
# : notOperator expression
# | expression logicalOperator expression
# | predicateExpression
# ;
[docs]
def enterExpression(self, ctx: ExpressionParser.ExpressionContext):
# Handle case of encountering a standalone expression, i.e., an expression not
# within a filter statement context
if self._active_expressiontree is None:
self._active_expressiontree = ExpressionTree()
self._expressiontrees.append(self._active_expressiontree)
# expression
# : notOperator expression
# | expression logicalOperator expression
# | predicateExpression
# ;
[docs]
def exitExpression(self, ctx: ExpressionParser.ExpressionContext):
value: Optional[Expression] = None
# Check for predicate expressions (see explicit visit function)
predicate_expression: ExpressionParser.PredicateExpressionContext = ctx.predicateExpression()
if predicate_expression is not None:
value = self._get_expr(predicate_expression)
if value is None:
raise EvaluateError(f"failed to parse predicate expression \"{predicate_expression.getText()}\"")
self._add_expr(ctx, value)
return
# Check for not operator expressions
not_operator = ctx.notOperator()
if not_operator is not None:
expressions: List[ExpressionParser.ExpressionContext] = ctx.expression()
if len(expressions) != 1:
raise EvaluateError(f"not operator expression is malformed: \"{ctx.getText()}\"")
value = self._get_expr(expressions[0])
if value is None:
raise EvaluateError(f"failed to find not operator expression \"{ctx.getText()}\"")
self._add_expr(ctx, UnaryExpression(ExpressionUnaryType.NOT, value))
return
# Check for logical operator expressions
logical_operator: ExpressionParser.LogicalOperatorContext = ctx.logicalOperator()
if logical_operator is not None:
expressions: List[ExpressionParser.ExpressionContext] = ctx.expression()
if len(expressions) != 2:
raise EvaluateError(f"operator expression, in logical operator expression context, is malformed: \"{ctx.getText()}\"")
left = self._get_expr(expressions[0])
if left is None:
raise EvaluateError(f"failed to find left logical operator expression \"{ctx.getText()}\"")
right = self._get_expr(expressions[1])
if right is None:
raise EvaluateError(f"failed to find right logical operator expression \"{ctx.getText()}\"")
operator_symbol = logical_operator.getText()
if logical_operator.K_AND() is not None or operator_symbol == "&&":
operatortype = ExpressionOperatorType.AND
elif logical_operator.K_OR() is not None or operator_symbol == "||":
operatortype = ExpressionOperatorType.OR
else:
raise EvaluateError(f"unexpected logical operator \"{operator_symbol}\"")
self._add_expr(ctx, OperatorExpression(operatortype, left, right))
return
raise EvaluateError(f"unexpected expression \"{ctx.getText()}\"")
# predicateExpression
# : predicateExpression notOperator? K_IN exactMatchModifier? '(' expressionList ')'
# | predicateExpression K_IS notOperator? K_NULL
# | predicateExpression comparisonOperator predicateExpression
# | predicateExpression notOperator? K_LIKE exactMatchModifier? predicateExpression
# | valueExpression
# ;
[docs]
def exitPredicateExpression(self, ctx: ExpressionParser.PredicateExpressionContext):
# sourcery skip
value_expression: ExpressionParser.ValueExpressionContext = ctx.valueExpression()
# Check for value expressions (see explicit visit function)
if value_expression is not None:
value = self._get_expr(value_expression)
if value is None:
raise EvaluateError(f"failed to find value expression \"{value_expression.getText()}\"")
self._add_expr(ctx, value)
return
has_notkeyword = ctx.notOperator() is not None
exactmatch = ctx.exactMatchModifier() is not None
# Check for IN expressions
if ctx.K_IN() is not None:
predicates: List[ExpressionParser.PredicateExpressionContext] = ctx.predicateExpression()
# IN expression expects one predicate
if len(predicates) != 1:
raise EvaluateError(f"\"IN\" expression is malformed: \"{ctx.getText()}\"")
value = self._get_expr(predicates[0])
if value is None:
raise EvaluateError(f"failed to find \"IN\" predicate expression \"{ctx.getText()}\"")
expressionlist: ExpressionParser.ExpressionListContext = ctx.expressionList()
expressions: List[ExpressionParser.ExpressionContext] = expressionlist.expression()
argumentcount = len(expressions)
if argumentcount < 1:
raise EvaluateError("not enough expressions found for \"IN\" operation")
arguments: List[Expression] = []
for i in range(argumentcount):
argument = self._get_expr(expressions[i])
if argument is None:
raise EvaluateError(f"failed to find argument expression {i} \"{expressions[i].getText()}\" for \"IN\" operation")
arguments.append(argument)
self._add_expr(ctx, InListExpression(value, arguments, has_notkeyword, exactmatch))
return
# Check for IS NULL expressions
if ctx.K_IS() is not None and ctx.K_NULL() is not None:
if has_notkeyword:
operatortype = ExpressionOperatorType.ISNOTNULL
else:
operatortype = ExpressionOperatorType.ISNULL
predicates: List[ExpressionParser.PredicateExpressionContext] = ctx.predicateExpression()
# IS NULL expression expects one predicate
if len(predicates) != 1:
raise EvaluateError(f"\"IS NULL\" expression is malformed: \"{ctx.getText()}\"")
value = self._get_expr(predicates[0])
if value is None:
raise EvaluateError(f"failed to find \"IS NULL\" predicate expression \"{ctx.getText()}\"")
self._add_expr(ctx, OperatorExpression(operatortype, value, None))
return
# Remaining operators require two predicate expressions
predicates: List[ExpressionParser.PredicateExpressionContext] = ctx.predicateExpression()
if len(predicates) != 2:
raise EvaluateError(f"operator expression, in predicate expression context, is malformed: \"{ctx.getText()}\"")
left = self._get_expr(predicates[0])
if left is None:
raise EvaluateError(f"failed to find left operator predicate expression \"{ctx.getText()}\"")
right = self._get_expr(predicates[1])
if right is None:
raise EvaluateError(f"failed to find right operator predicate expression \"{ctx.getText()}\"")
# Check for comparison operator expressions
comparison_operator: ExpressionParser.ComparisonOperatorContext = ctx.comparisonOperator()
if comparison_operator is not None:
operatorsymbol = comparison_operator.getText()
if operatorsymbol == "<":
operatortype = ExpressionOperatorType.LESSTHAN
elif operatorsymbol == "<=":
operatortype = ExpressionOperatorType.LESSTHANOREQUAL
elif operatorsymbol == ">":
operatortype = ExpressionOperatorType.GREATERTHAN
elif operatorsymbol == ">=":
operatortype = ExpressionOperatorType.GREATERTHANOREQUAL
elif operatorsymbol in ["=", "=="]:
operatortype = ExpressionOperatorType.EQUAL
elif operatorsymbol == "===":
operatortype = ExpressionOperatorType.EQUALEXACTMATCH
elif operatorsymbol in ["!=", "<>"]:
operatortype = ExpressionOperatorType.NOTEQUAL
elif operatorsymbol == "!==":
operatortype = ExpressionOperatorType.NOTEQUALEXACTMATCH
else:
raise EvaluateError(f"unexpected comparison operator \"{operatorsymbol}\"")
self._add_expr(ctx, OperatorExpression(operatortype, left, right))
return
# Check for LIKE expressions
if ctx.K_LIKE() is not None:
if exactmatch:
operatortype = ExpressionOperatorType.NOTLIKEEXACTMATCH if has_notkeyword is None else ExpressionOperatorType.LIKEEXACTMATCH
else:
operatortype = ExpressionOperatorType.NOTLIKE if has_notkeyword is None else ExpressionOperatorType.LIKE
self._add_expr(ctx, OperatorExpression(operatortype, left, right))
return
raise EvaluateError(f"unexpected predicate expression \"{ctx.getText()}\"")
# valueExpression
# : literalValue
# | columnName
# | functionExpression
# | unaryOperator valueExpression
# | '(' expression ')'
# | valueExpression mathOperator valueExpression
# | valueExpression bitwiseOperator valueExpression
# ;
[docs]
def exitValueExpression(self, ctx: ExpressionParser.ValueExpressionContext):
# sourcery skip
literal_value: ExpressionParser.LiteralValueContext = ctx.literalValue()
# Check for literal values (see explicit visit function)
if literal_value is not None:
value = self._get_expr(literal_value)
if value is None:
raise EvaluateError(f"failed to find literal value \"{literal_value.getText()}\"")
self._add_expr(ctx, value)
return
# Check for column names (see explicit visit function)
columnname: ExpressionParser.ColumnNameContext = ctx.columnName()
if columnname is not None:
value = self._get_expr(columnname)
if value is None:
raise EvaluateError(f"failed to find column name \"{columnname.getText()}\"")
self._add_expr(ctx, value)
return
# Check for function expressions (see explicit visit function)
function_expression: ExpressionParser.FunctionExpressionContext = ctx.functionExpression()
if function_expression is not None:
value = self._get_expr(function_expression)
if value is None:
raise EvaluateError(f"failed to find function expression \"{function_expression.getText()}\"")
self._add_expr(ctx, value)
return
# Check for unary operators
unary_operator: ExpressionParser.UnaryOperatorContext = ctx.unaryOperator()
if unary_operator is not None:
values: List[ExpressionParser.ValueExpressionContext] = ctx.valueExpression()
# Unary operator expects one value expression
if len(values) != 1:
raise EvaluateError(f"unary operator value expression is malformed: \"{ctx.getText()}\"")
value = self._get_expr(values[0])
if value is None:
raise EvaluateError(f"failed to find unary operator value expression \"{ctx.getText()}\"")
if unary_operator.K_NOT() is None:
operatorsymbol = unary_operator.getText()
if operatorsymbol == "+":
unarytype = ExpressionUnaryType.PLUS
elif operatorsymbol == "-":
unarytype = ExpressionUnaryType.MINUS
elif operatorsymbol in ["~", "!"]:
unarytype = ExpressionUnaryType.NOT
else:
raise EvaluateError(f"unexpected unary type \"{operatorsymbol}\"")
else:
unarytype = ExpressionUnaryType.NOT
self._add_expr(ctx, UnaryExpression(unarytype, value))
return
# Check for sub-expressions, i.e., "(" expression ")"
expression: ExpressionParser.ExpressionContext = ctx.expression()
if expression is not None:
value = self._get_expr(expression)
if value is None:
raise EvaluateError(f"failed to find sub-expression \"{ctx.getText()}\"")
self._add_expr(ctx, value)
return
# Remaining operators require two value expressions
values: List[ExpressionParser.ValueExpressionContext] = ctx.valueExpression()
if len(values) != 2:
raise EvaluateError(f"operator expression, in value expression context, is malformed: \"{ctx.getText()}\"")
left = self._get_expr(values[0])
if left is None:
raise EvaluateError(f"failed to find left operator value expression \"{ctx.getText()}\"")
right = self._get_expr(values[1])
if right is None:
raise EvaluateError(f"failed to find right operator value expression \"{ctx.getText()}\"")
# Check for math operator expressions
math_operator: ExpressionParser.MathOperatorContext = ctx.mathOperator()
if math_operator is not None:
operatorsymbol = math_operator.getText()
if operatorsymbol == "+":
operatortype = ExpressionOperatorType.ADD
elif operatorsymbol == "-":
operatortype = ExpressionOperatorType.SUBTRACT
elif operatorsymbol == "*":
operatortype = ExpressionOperatorType.MULTIPLY
elif operatorsymbol == "/":
operatortype = ExpressionOperatorType.DIVIDE
elif operatorsymbol == "%":
operatortype = ExpressionOperatorType.MODULUS
else:
raise EvaluateError(f"unexpected math operator \"{operatorsymbol}\"")
self._add_expr(ctx, OperatorExpression(operatortype, left, right))
return
# Check for bitwise operator expressions
bitwise_operator: ExpressionParser.BitwiseOperatorContext = ctx.bitwiseOperator()
if bitwise_operator is not None:
# Check for bitwise operators
if bitwise_operator.K_XOR() is None:
operatorsymbol = bitwise_operator.getText()
if operatorsymbol == "&":
operatortype = ExpressionOperatorType.BITWISEAND
elif operatorsymbol == "|":
operatortype = ExpressionOperatorType.BITWISEOR
elif operatorsymbol == "^":
operatortype = ExpressionOperatorType.BITWISEXOR
elif operatorsymbol == "<<":
operatortype = ExpressionOperatorType.BITSHIFTLEFT
elif operatorsymbol == ">>":
operatortype = ExpressionOperatorType.BITSHIFTRIGHT
else:
raise EvaluateError(f"unexpected bitwise operator \"{operatorsymbol}\"")
else:
operatortype = ExpressionOperatorType.BITWISEXOR
self._add_expr(ctx, OperatorExpression(operatortype, left, right))
return
raise EvaluateError(f"unexpected value expression \"{ctx.getText()}\"")
# literalValue
# : INTEGER_LITERAL
# | NUMERIC_LITERAL
# | STRING_LITERAL
# | DATETIME_LITERAL
# | GUID_LITERAL
# | BOOLEAN_LITERAL
# | K_NULL
# ;
[docs]
def exitLiteralValue(self, ctx: ExpressionParser.LiteralValueContext):
result: Optional[ValueExpression] = None
# Literal numeric values will not be negative, unary operators will handle negative values
if ctx.INTEGER_LITERAL() is not None:
literal: str = ctx.INTEGER_LITERAL().getText()
try:
value = Convert.from_str(literal, int)
if value > Limits.MAXINT32:
if value > Limits.MAXINT64:
result = FilterExpressionParser._parse_numericliteral(literal)
else:
result = ValueExpression(ExpressionValueType.INT64, value)
else:
result = ValueExpression(ExpressionValueType.INT32, value)
except Exception:
result = FilterExpressionParser._parse_numericliteral(literal)
elif ctx.NUMERIC_LITERAL() is not None:
literal: str = ctx.NUMERIC_LITERAL().getText()
try:
# Real literals using scientific notation are parsed as double
if "E" in literal.upper():
result = ValueExpression(ExpressionValueType.DOUBLE, float(literal))
else:
result = FilterExpressionParser._parse_numericliteral(literal)
except Exception:
result = FilterExpressionParser._parse_numericliteral(literal)
elif ctx.STRING_LITERAL() is not None:
result = ValueExpression(ExpressionValueType.STRING, FilterExpressionParser._parse_stringliteral(ctx.STRING_LITERAL().getText()))
elif ctx.DATETIME_LITERAL() is not None:
result = ValueExpression(ExpressionValueType.DATETIME, FilterExpressionParser._parse_datetimeliteral(ctx.DATETIME_LITERAL().getText()))
elif ctx.GUID_LITERAL() is not None:
result = ValueExpression(ExpressionValueType.GUID, FilterExpressionParser._parse_guidliteral(ctx.GUID_LITERAL().getText()))
elif ctx.BOOLEAN_LITERAL() is not None:
literal: str = ctx.BOOLEAN_LITERAL().getText()
result = TRUEVALUE if literal.upper() == "TRUE" else FALSEVALUE
elif ctx.K_NULL() is not None:
result = NULLVALUE
if result is not None:
self._add_expr(ctx, result)
@staticmethod
def _parse_numericliteral(literal: str) -> ValueExpression:
try:
value = Decimal(literal)
return ValueExpression(ExpressionValueType.DECIMAL, value)
except Exception:
try:
value = float(literal)
return ValueExpression(ExpressionValueType.DOUBLE, value)
except Exception:
return ValueExpression(ExpressionValueType.STRING, literal)
@staticmethod
def _parse_stringliteral(literal: str) -> str:
# Remove any surrounding quotes from string, ANTLR grammar already
# ensures strings starting with quote also ends with one
return literal[1:-1] if literal[0] == "'" else literal
@staticmethod
def _parse_guidliteral(literal: str) -> UUID:
# Remove any quotes from GUID (boost currently only handles optional braces),
# ANTLR grammar already ensures GUID starting with quote also ends with one
return UUID(literal[1:-1] if literal[0] in ["{", "'"] else literal)
@staticmethod
def _parse_datetimeliteral(literal: str) -> datetime:
# Remove any surrounding '#' symbols from date/time, ANTLR grammar already
# ensures date/time starting with '#' symbol will also end with one
literal = literal[1:-1] if literal[0] == "#" else literal
try:
return Convert.from_str(literal, datetime)
except Exception as ex:
raise EvaluateError(f"failed to parse datetime literal #{literal}#: {ex}") from ex
# columnName
# : IDENTIFIER
# ;
[docs]
def exitColumnName(self, ctx: ExpressionParser.ColumnNameContext):
tablename = self._active_expressiontree.tablename
if tablename is None or len(tablename) == 0:
if self.primary_tablename is None or len(self.primary_tablename) == 0:
raise EvaluateError("cannot parse column name in filter expression, no table name defined for expression tree nor is any PrimaryTableName defined.")
tablename = self.primary_tablename
table, err = self.table(tablename)
if err is not None:
raise EvaluateError(f"cannot parse column name in filter expression, {err}")
columnname: str = ctx.IDENTIFIER().getText()
datacolumn = table.column_byname(columnname)
if datacolumn is None:
raise EvaluateError(f"cannot parse column name in filter expression, failed to find column \"{columnname}\" in table \"{tablename}\"")
self._add_expr(ctx, ColumnExpression(datacolumn))
# functionExpression
# : functionName '(' expressionList? ')'
# ;
[docs]
def exitFunctionExpression(self, ctx: ExpressionParser.FunctionExpressionContext):
# sourcery skip
functionname: ExpressionParser.FunctionNameContext = ctx.functionName()
if functionname.K_ABS() is not None:
functiontype = ExpressionFunctionType.ABS
elif functionname.K_CEILING() is not None:
functiontype = ExpressionFunctionType.CEILING
elif functionname.K_COALESCE() is not None:
functiontype = ExpressionFunctionType.COALESCE
elif functionname.K_CONVERT() is not None:
functiontype = ExpressionFunctionType.CONVERT
elif functionname.K_CONTAINS() is not None:
functiontype = ExpressionFunctionType.CONTAINS
elif functionname.K_DATEADD() is not None:
functiontype = ExpressionFunctionType.DATEADD
elif functionname.K_DATEDIFF() is not None:
functiontype = ExpressionFunctionType.DATEDIFF
elif functionname.K_DATEPART() is not None:
functiontype = ExpressionFunctionType.DATEPART
elif functionname.K_ENDSWITH() is not None:
functiontype = ExpressionFunctionType.ENDSWITH
elif functionname.K_FLOOR() is not None:
functiontype = ExpressionFunctionType.FLOOR
elif functionname.K_IIF() is not None:
functiontype = ExpressionFunctionType.IIF
elif functionname.K_INDEXOF() is not None:
functiontype = ExpressionFunctionType.INDEXOF
elif functionname.K_ISDATE() is not None:
functiontype = ExpressionFunctionType.ISDATE
elif functionname.K_ISINTEGER() is not None:
functiontype = ExpressionFunctionType.ISINTEGER
elif functionname.K_ISGUID() is not None:
functiontype = ExpressionFunctionType.ISGUID
elif functionname.K_ISNULL() is not None:
functiontype = ExpressionFunctionType.ISNULL
elif functionname.K_ISNUMERIC() is not None:
functiontype = ExpressionFunctionType.ISNUMERIC
elif functionname.K_LASTINDEXOF() is not None:
functiontype = ExpressionFunctionType.LASTINDEXOF
elif functionname.K_LEN() is not None:
functiontype = ExpressionFunctionType.LEN
elif functionname.K_LOWER() is not None:
functiontype = ExpressionFunctionType.LOWER
elif functionname.K_MAXOF() is not None:
functiontype = ExpressionFunctionType.MAXOF
elif functionname.K_MINOF() is not None:
functiontype = ExpressionFunctionType.MINOF
elif functionname.K_NOW() is not None:
functiontype = ExpressionFunctionType.NOW
elif functionname.K_NTHINDEXOF() is not None:
functiontype = ExpressionFunctionType.NTHINDEXOF
elif functionname.K_POWER() is not None:
functiontype = ExpressionFunctionType.POWER
elif functionname.K_REGEXMATCH() is not None:
functiontype = ExpressionFunctionType.REGEXMATCH
elif functionname.K_REGEXVAL() is not None:
functiontype = ExpressionFunctionType.REGEXVAL
elif functionname.K_REPLACE() is not None:
functiontype = ExpressionFunctionType.REPLACE
elif functionname.K_REVERSE() is not None:
functiontype = ExpressionFunctionType.REVERSE
elif functionname.K_ROUND() is not None:
functiontype = ExpressionFunctionType.ROUND
elif functionname.K_SPLIT() is not None:
functiontype = ExpressionFunctionType.SPLIT
elif functionname.K_SQRT() is not None:
functiontype = ExpressionFunctionType.SQRT
elif functionname.K_STARTSWITH() is not None:
functiontype = ExpressionFunctionType.STARTSWITH
elif functionname.K_STRCOUNT() is not None:
functiontype = ExpressionFunctionType.STRCOUNT
elif functionname.K_STRCMP() is not None:
functiontype = ExpressionFunctionType.STRCMP
elif functionname.K_SUBSTR() is not None:
functiontype = ExpressionFunctionType.SUBSTR
elif functionname.K_TRIM() is not None:
functiontype = ExpressionFunctionType.TRIM
elif functionname.K_TRIMLEFT() is not None:
functiontype = ExpressionFunctionType.TRIMLEFT
elif functionname.K_TRIMRIGHT() is not None:
functiontype = ExpressionFunctionType.TRIMRIGHT
elif functionname.K_UPPER() is not None:
functiontype = ExpressionFunctionType.UPPER
elif functionname.K_UTCNOW() is not None:
functiontype = ExpressionFunctionType.UTCNOW
else:
raise EvaluateError(f"unknown function \"{functionname.getText()}\"")
expressionlist: ExpressionParser.ExpressionListContext = ctx.expressionList()
arguments: List[Expression] = []
if expressionlist is not None:
expressions: List[ExpressionParser.ExpressionContext] = expressionlist.expression()
argumentcount = len(expressions)
for i in range(argumentcount):
argument = self._get_expr(expressions[i])
if argument is None:
raise EvaluateError(f"failed to find argument expression {i} \"{expressions[i].getText()}\" for function \"{functionname.getText()}\"")
arguments.append(argument)
self._add_expr(ctx, FunctionExpression(functiontype, arguments))
[docs]
@staticmethod
def generate_expressiontrees(dataset: DataSet, primarytable: str, filterexpression: str, suppress_console_erroroutput: bool = False) -> Tuple[Optional[List[ExpressionTree]], Optional[Exception]]:
"""
Produces a set of expression trees for the provided `filterexpression` and `dataset`.
One expression tree will be produced per filter expression statement encountered in the specified `filterexpression`.
If `primarytable` parameter is not defined, then filter expression should not contain directly defined signal IDs.
An error will be returned if `dataSet` parameter is None, the `filterexpression` is empty or expression fails to parse.
"""
parser, err = FilterExpressionParser.from_dataset(dataset, filterexpression, primarytable, None, suppress_console_erroroutput)
if err is not None:
return None, err
parser.track_filteredrows = False
return parser.expressiontrees
[docs]
@staticmethod
def generate_expressiontrees_fromtable(datatable: DataTable, filterexpression: str, suppress_console_erroroutput: bool = False) -> Tuple[Optional[List[ExpressionTree]], Optional[Exception]]:
"""
Produces a set of expression trees for the provided `filterexpression` and `datatable`.
One expression tree will be produced per filter expression statement encountered in the specified `filterexpression`.
An error will be returned if `datatable` parameter is None, the `filterexpression` is empty or expression fails to parse.
"""
if datatable is None:
return None, ValueError("datatable parameter cannot be None")
return FilterExpressionParser.generate_expressiontrees(datatable.parent, datatable.name, filterexpression, suppress_console_erroroutput)
[docs]
@staticmethod
def generate_expressiontree(datatable: DataTable, filterexpression: str, suppress_console_erroroutput: bool = False) -> Tuple[Optional[ExpressionTree], Optional[Exception]]:
"""
Gets the first produced expression tree for the provided `filterexpression` and `datatable`.
If `filterexpression` contains multiple semi-colon separated statements, only the first expression is returned.
An error will be returned if `datatable` parameter is None, the `filterexpression` is empty or expression fails to parse.
"""
expressiontrees, err = FilterExpressionParser.generate_expressiontrees_fromtable(datatable, filterexpression, suppress_console_erroroutput)
if err is not None:
return None, err
if expressiontrees is not None and len(expressiontrees) > 0:
return expressiontrees[0], None
return None, EvaluateError(f"no expression trees generated with filter expression \"{filterexpression}\" for table \"{datatable.name}\"")
[docs]
@staticmethod
def evaluate_expression(filterexpression: str, suppress_console_erroroutput: bool = False) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
"""
Returns the result of the evaluated `filterexpression`.
This expression evaluation function is only for simple expressions that do not reference any `DataSet` columns.
Use `evaluate_datarowexpression` for evaluating filter expressions that contain column references.
If `filterexpression` contains multiple semi-colon separated statements, only the first expression is evaluated.
An error will be returned if the `filterexpression` is empty or expression fails to parse.
"""
if filterexpression is None or not filterexpression:
return None, EvaluateError("filterexpression is empty")
parser = FilterExpressionParser(filterexpression, suppress_console_erroroutput)
parser.track_filteredrows = False
expressiontrees, err = parser.expressiontrees
if err is not None:
return None, err
if expressiontrees is not None and len(expressiontrees) > 0:
return expressiontrees[0].evaluate()
return None, EvaluateError(f"no expression trees generated with filter expression \"{filterexpression}\"")
[docs]
@staticmethod
def evaluate_datarowexpression(datarow: DataRow, filterexpression: str, suppress_console_erroroutput: bool = False) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
"""
Returns the result of the evaluated `filterexpression` using the specified `datarow`.
If `filterexpression` contains multiple semi-colon separated statements, only the first expression is evaluated.
An error will be returned if `datarow` parameter is None, the `filterexpression` is empty, expression fails to
parse or row expression evaluation fails.
"""
if datarow is None:
return None, ValueError("datarow is None")
if filterexpression is None or not filterexpression:
return None, EvaluateError("filterexpression is empty")
expressiontree, err = FilterExpressionParser.generate_expressiontree(datarow.parent, filterexpression, suppress_console_erroroutput)
return (None, err) if err is not None else expressiontree.evaluate(datarow)
[docs]
@staticmethod
def select_datarows(dataset: DataSet, filterexpression: str, primarytable: str, tableidfields: Optional[TableIDFields] = None, suppress_console_erroroutput: bool = False) -> Tuple[Optional[List[DataRow]], Optional[Exception]]:
"""
Returns all rows matching the provided `filterexpression` and `dataset`.
Filter expressions can contain multiple statements, separated by semi-colons, where each statement results in a
unique expression tree; this function returns the combined results of each encountered filter expression statement.
Returned `DataRow` list will contain all matching rows, order preserved. If `dataset` includes duplicated rows, it
will be possible for the result set to contain duplicates. Any encountered "TOP" limit or "ORDER BY" clauses will
be respected. An error will be returned if `dataset` parameter is None, the `filterexpression` is empty, expression
fails to parse or any row expression evaluation fails.
"""
parser, err = FilterExpressionParser.from_dataset(dataset, filterexpression, primarytable, tableidfields, suppress_console_erroroutput)
if err is not None:
return None, err
err = parser.evaluate(True, True)
return (None, err) if err is not None else (parser.filtered_rows, None)
[docs]
@staticmethod
def select_datarows_fromtable(datatable: DataTable, filterexpression: str, tableidfields: Optional[TableIDFields] = None, suppress_console_erroroutput: bool = False) -> Tuple[Optional[List[DataRow]], Optional[Exception]]:
"""
Returns all rows matching the provided `filterexpression` and `datatable`.
Filter expressions can contain multiple statements, separated by semi-colons, where each statement results
in a unique expression tree; this function returns the combined results of each encountered filter expression
statement. Returned `DataRow` list will contain all matching rows, order preserved. If dataSet includes
duplicated rows, it will be possible for the result set to contain duplicates. Any encountered "TOP" limit or
"ORDER BY" clauses will be respected. An error will be returned if `datatable` parameter (or its parent DataSet)
is None, the `filterexpression` is empty, expression fails to parse or any row expression evaluation fails.
"""
if datatable is None:
return None, ValueError("datatable is None")
return FilterExpressionParser.select_datarows(datatable.parent, filterexpression, datatable.name, tableidfields, suppress_console_erroroutput)
[docs]
@staticmethod
def select_datarowset(dataset: DataSet, filterexpression: str, primarytable: str, tableidfields: Optional[TableIDFields] = None, suppress_console_erroroutput: bool = False) -> Tuple[Optional[Set[DataRow]], Optional[Exception]]:
"""
Returns all unique rows matching the provided `filterexpression` and `dataset`.
Filter expressions can contain multiple statements, separated by semi-colons, where each statement results
in a unique expression tree; this function returns the combined results of each encountered filter expression
statement. Returned `DataRow` set will contain only unique rows, in arbitrary order. Any encountered "TOP" limit
clauses for individual filter expression statements will be respected, but "ORDER BY" clauses will be ignored.
An error will be returned if `dataset` parameter is None, the `filterexpression` is empty, expression fails to
parse or any row expression evaluation fails.
"""
parser, err = FilterExpressionParser.from_dataset(dataset, filterexpression, primarytable, tableidfields, suppress_console_erroroutput)
if err is not None:
return None, err
err = parser.evaluate(True, False)
return (None, err) if err is not None else (parser.filtered_rowset, None)
[docs]
@staticmethod
def select_datarowset_fromtable(datatable: DataTable, filterexpression: str, tableidfields: Optional[TableIDFields] = None, suppress_console_erroroutput: bool = False) -> Tuple[Optional[Set[DataRow]], Optional[Exception]]:
"""
Returns all unique rows matching the provided `filterexpression` and `datatable`.
Filter expressions can contain multiple statements, separated by semi-colons, where each statement results
in a unique expression tree; this function returns the combined results of each encountered filter expression
statement. Returned `DataRow` set will contain only unique rows, in arbitrary order. Any encountered "TOP" limit
clauses for individual filter expression statements will be respected, but "ORDER BY" clauses will be ignored.
An error will be returned if `datatable` parameter (or its parent DataSet) is None, the `filterexpression` is
empty, expression fails to parse or any row expression evaluation fails.
"""
if datatable is None:
return None, ValueError("datatable is None")
return FilterExpressionParser.select_datarowset(datatable.parent, filterexpression, datatable.name, tableidfields, suppress_console_erroroutput)
[docs]
@staticmethod
def select_signalidset(dataset: DataSet, filterexpression: str, primarytable: str, tableidfields: Optional[TableIDFields] = None, suppress_console_erroroutput: bool = False) -> Tuple[Optional[Set[UUID]], Optional[Exception]]:
"""
Returns all unique signal IDs matching the provided `filterexpression` and `dataset`.
Filter expressions can contain multiple statements, separated by semi-colons, where each statement results
in a unique expression tree; this function returns the combined results of each encountered filter expression
statement. Returned `UUID` set will contain only unique values, in arbitrary order. Any encountered "TOP" limit
clauses for individual filter expression statements will be respected, but "ORDER BY" clauses will be ignored.
An error will be returned if `dataset` parameter is None, the `filterexpression` is empty, expression fails to
parse or any row expression evaluation fails.
"""
parser, err = FilterExpressionParser.from_dataset(dataset, filterexpression, primarytable, tableidfields, suppress_console_erroroutput)
if err is not None:
return None, err
parser.track_filteredrows = False
parser.track_filteredsignalids = True
err = parser.evaluate(True, False)
return (None, err) if err is not None else (parser.filtered_signalidset, None)
[docs]
@staticmethod
def select_signalidset_fromtable(datatable: DataTable, filterexpression: str, tableidfields: Optional[TableIDFields] = None, suppress_console_erroroutput: bool = False) -> Tuple[Optional[Set[UUID]], Optional[Exception]]:
"""
Returns all unique signal IDs matching the provided `filterexpression` and `datatable`.
Filter expressions can contain multiple statements, separated by semi-colons, where each statement results
in a unique expression tree; this function returns the combined results of each encountered filter expression
statement. Returned `UUID` set will contain only unique values, in arbitrary order. Any encountered "TOP" limit
clauses for individual filter expression statements will be respected, but "ORDER BY" clauses will be ignored.
An error will be returned if `datatable` parameter (or its parent DataSet) is None, the `filterexpression` is
empty, expression fails to parse or any row expression evaluation fails.
"""
if datatable is None:
return None, ValueError("datatable is None")
return FilterExpressionParser.select_signalidset(datatable.parent, filterexpression, datatable.name, tableidfields, suppress_console_erroroutput)