# ******************************************************************************************************
# expressiontree.py - Gbtc
#
# Copyright © 2022, Grid Protection Alliance. All Rights Reserved.
#
# Licensed to the Grid Protection Alliance (GPA) under one or more contributor license agreements. See
# the NOTICE file distributed with this work for additional information regarding copyright ownership.
# The GPA licenses this file to you under the MIT License (MIT), the "License"; you may not use this
# file except in compliance with the License. You may obtain a copy of the License at:
#
# http://opensource.org/licenses/MIT
#
# Unless agreed to in writing, the subject software distributed under the License is distributed on an
# "AS-IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Refer to the
# License for the specific language governing permissions and limitations.
#
# Code Modification History:
# ----------------------------------------------------------------------------------------------------
# 09/04/2022 - J. Ritchie Carroll
# Generated original version of source code.
#
# ******************************************************************************************************
from gsf import Empty, Limits, normalize_enumname
from .datarow import DataRow
from .datatable import DataTable
from .datatype import DataType
from .orderbyterm import OrderByTerm
from .expression import Expression
from .valueexpression import ValueExpression, \
TRUEVALUE, FALSEVALUE, EMPTYSTRINGVALUE, \
NULLBOOLVALUE, NULLSTRINGVALUE, \
NULLINT32VALUE, NULLDATETIMEVALUE
from .unaryexpression import UnaryExpression
from .columnexpression import ColumnExpression
from .inlistexpression import InListExpression
from .functionexpression import FunctionExpression
from .operatorexpression import OperatorExpression
from .constants import ExpressionType, \
ExpressionValueType, ExpressionFunctionType, \
ExpressionOperatorType, TimeInterval, \
is_integertype, is_numerictype, \
derive_operationvaluetype, \
derive_comparison_operationvaluetype, \
derive_arithmetic_operationvaluetype
from .errors import EvaluateError
from typing import Callable, List, Optional, Tuple, Union
from functools import cmp_to_key
from datetime import datetime, timezone
from dateutil import parser
from dateutil.relativedelta import relativedelta
from uuid import UUID
import numpy as np
import math
import re
import sys
INTSIZE = 64 if sys.maxsize > 2**32 else 32
UMAXINT64 = np.uint64(Limits.MAXINT64)
def _find_nthindex(source: str, test: str, index: int) -> int:
result = 0
for _ in range(index + 1):
location = source.find(test, result)
if location == -1:
result = 0
break
result = location + 1
return result - 1
def _split_nthindex(source: str, test: str, index: int) -> Tuple[int, int, bool]:
firstindex = _find_nthindex(source, test, index - 1)
secondindex = _find_nthindex(source, test, index)
if firstindex <= 0:
return (-1, -1, False) if secondindex <= 0 else (0, secondindex, True)
if secondindex <= 0:
return firstindex + len(test), len(source), True
return firstindex + len(test), secondindex, True
[docs]
class ExpressionTree:
"""
Represents a tree of expressions for evaluation.
"""
def __init__(self):
"""
Creates a new expression tree.
"""
self._currentrow: Optional[DataRow] = None
self.tablename: str = Empty.STRING
"""
Defines the associated table name parsed from "FILTER" statement, if any.
"""
self.toplimit: int = -1
"""
Defined the parsed value associated with the "TOP" keyword, if any.
"""
self.orderbyterms: List[OrderByTerm] = []
"""
Represents the order by elements parsed from the "ORDER BY" keyword, if any.
"""
self.root: Optional[Expression] = None
"""
Defines the starting `Expression` for evaluation of the expression tree, or None if there
is not one. This is the root expression of the `ExpressionTree`. Value is automatically
managed by the `FilterExpressionParser`.
"""
[docs]
def select(self, table: DataTable) -> Tuple[List[DataRow], Optional[Exception]]:
"""
Returns the rows matching the the `ExpressionTree`.
The expression tree result type is expected to be a boolean for this filtering operation.
This works like the "WHERE" clause of a SQL expression. Any "TOP" limit and "ORDER BY"
sorting clauses found in filter expressions will be respected. An error will be returned
if the table parameter is None, the expression tree does not yield a boolean value or any
row expresssion evaluation fails.
"""
def predicate(result_expression: ValueExpression) -> Tuple[bool, Optional[Exception]]:
# Final expression should have a boolean data type (operates as a WHERE clause)
if result_expression.valuetype != ExpressionValueType.BOOLEAN:
return False, EvaluateError(f"cannot execute select operation, final expression tree evaluation result must be a boolean value, not \"{normalize_enumname(result_expression.valuetype)}\"")
# If final result is Null, i.e., has no value due to Null propagation, treat result as False
return result_expression._booleanvalue(), None
return self.selectwhere(table, predicate, True, True)
[docs]
def selectwhere(self, table: DataTable, predicate: Callable[[ValueExpression], Tuple[bool, Optional[Exception]]], applylimit: bool, applysort: bool) -> Tuple[List[DataRow], Optional[Exception]]:
"""
Returns each table row evaluated from the `ExpressionTree` that matches the specified predicate.
The `applylimit` and `applysort` parameters determine if any encountered "TOP" limit and "ORDER BY"
sorting clauses will be respected. An error will be returned if the table parameter is nil or any
row expresssion evaluation fails.
"""
if table is None:
return [], TypeError("cannot execute select operation, table parameter cannot be None")
matchedrows: List[DataRow] = []
# Find rows that match the expression tree
for row in table:
if applylimit and self.toplimit > -1 and len(matchedrows) >= self.toplimit:
break
if row is None:
continue
result_expression, err = self.evaluate(row)
if err is not None:
return [], err
# If value result for row matches predicate expression, add it to matching rows
result, err = predicate(result_expression)
if err is not None:
return [], err
if result:
matchedrows.append(row)
# Sort matching rows if requested
if applysort and matchedrows and self.orderbyterms:
def compare_rows(leftmatchedrow: DataRow, rightmatchedrow: DataRow) -> int:
for orderbyterm in self.orderbyterms:
if orderbyterm.ascending:
leftrow = leftmatchedrow
rightrow = rightmatchedrow
else:
leftrow = rightmatchedrow
rightrow = leftmatchedrow
result, err = DataRow.compare_datarowcolumns(leftrow, rightrow, orderbyterm.column.index, orderbyterm.extactmatch)
if err is not None:
raise EvaluateError(f"cannot execute select operation, failed while comparing rows for sorting: {err}")
# If last compare result was equal, continue sort based on next order-by term
if result != 0:
return result
return 0
matchedrows.sort(key=cmp_to_key(compare_rows))
return matchedrows, None
[docs]
def evaluate(self, row: Optional[DataRow] = None) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
"""
Traverses the the `ExpressionTree` for the provided data row to produce a `ValueExpression`.
Root expression should be assigned before calling `evaluate`; otherwise result will be a Null
`ValueExpression`. The `datarow` parameter can be None if there are no columns referenced in
expression tree. An error will be returned if the expresssion evaluation fails.
"""
self._currentrow = row
return self._evaluate(self.root)
def _evaluate(self, expression: Optional[Union[Expression, ValueExpression]], target_valuetype: ExpressionValueType = ExpressionValueType.BOOLEAN) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if expression is None:
return ValueExpression.nullvalue(target_valuetype), None
expressiontype = expression.expressiontype
if expressiontype == ExpressionType.VALUE:
# Change Undefined NULL values to Nullable of target type
if expression.valuetype == ExpressionValueType.UNDEFINED:
return ValueExpression.nullvalue(target_valuetype), None
return expression, None
if expressiontype == ExpressionType.UNARY:
return self._evaluate_unary(expression)
if expressiontype == ExpressionType.COLUMN:
return self._evaluate_column(expression)
if expressiontype == ExpressionType.INLIST:
return self._evaluate_in_list(expression)
if expressiontype == ExpressionType.FUNCTION:
return self._evaluate_function(expression)
if expressiontype == ExpressionType.OPERATOR:
return self._evaluate_operator(expression)
return None, TypeError("unexpected expression type encountered")
def _evaluate_unary(self, unary_expression: UnaryExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
unary_value, err = self._evaluate(unary_expression.value)
if err is not None:
return None, EvaluateError(f"failed while evaluating unary expression value: {err}")
unary_valuetype = unary_value.valuetype
# If unary value is Null, result is Null
if unary_value.is_null():
return ValueExpression.nullvalue(unary_valuetype), None
if unary_valuetype == ExpressionValueType.BOOLEAN:
return unary_expression.applyto_bool(unary_value._booleanvalue())
if unary_valuetype == ExpressionValueType.INT32:
return unary_expression.applyto_int32(unary_value._int32value())
if unary_valuetype == ExpressionValueType.INT64:
return unary_expression.applyto_int64(unary_value._int64value())
if unary_valuetype == ExpressionValueType.DECIMAL:
return unary_expression.applyto_decimal(unary_value._decimalvalue())
if unary_valuetype == ExpressionValueType.DOUBLE:
return unary_expression.applyto_double(unary_value._doublevalue())
return None, EvaluateError(f"cannot apply unary \"{normalize_enumname(unary_expression.unarytype)}\" operator to \"{normalize_enumname(unary_valuetype)}\"")
def _evaluate_column(self, column_expression: ColumnExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]: # sourcery skip
if self._currentrow is None:
return None, TypeError("cannot evaluate column expression, current data row reference is not defined")
column = column_expression.datacolumn
if column is None:
return None, TypeError("cannot evaluate column expression, data column reference is not defined")
columnindex = column.index
columndatatype = column.datatype
if columndatatype == DataType.STRING:
valuetype = ExpressionValueType.STRING
value, isnull, err = self._currentrow.stringvalue(columnindex)
elif columndatatype == DataType.BOOLEAN:
valuetype = ExpressionValueType.BOOLEAN
value, isnull, err = self._currentrow.booleanvalue(columnindex)
elif columndatatype == DataType.DATETIME:
valuetype = ExpressionValueType.DATETIME
value, isnull, err = self._currentrow.datetimevalue(columnindex)
elif columndatatype == DataType.SINGLE:
valuetype = ExpressionValueType.DOUBLE
f32, isnull, err = self._currentrow.singlevalue(columnindex)
value = np.float64(f32)
elif columndatatype == DataType.DOUBLE:
valuetype = ExpressionValueType.DOUBLE
value, isnull, err = self._currentrow.doublevalue(columnindex)
elif columndatatype == DataType.DECIMAL:
valuetype = ExpressionValueType.DECIMAL
value, isnull, err = self._currentrow.decimalvalue(columnindex)
elif columndatatype == DataType.GUID:
valuetype = ExpressionValueType.GUID
value, isnull, err = self._currentrow.guidvalue(columnindex)
elif columndatatype == DataType.INT8:
valuetype = ExpressionValueType.INT32
i8, isnull, err = self._currentrow.int8value(columnindex)
value = np.int32(i8)
elif columndatatype == DataType.INT16:
valuetype = ExpressionValueType.INT32
i16, isnull, err = self._currentrow.int16value(columnindex)
value = np.int32(i16)
elif columndatatype == DataType.INT32:
valuetype = ExpressionValueType.INT32
value, isnull, err = self._currentrow.int32value(columnindex)
elif columndatatype == DataType.INT64:
valuetype = ExpressionValueType.INT64
value, isnull, err = self._currentrow.int64value(columnindex)
elif columndatatype == DataType.UINT8:
valuetype = ExpressionValueType.INT32
ui8, isnull, err = self._currentrow.uint8value(columnindex)
value = np.int32(ui8)
elif columndatatype == DataType.UINT16:
valuetype = ExpressionValueType.INT32
ui16, isnull, err = self._currentrow.uint16value(columnindex)
value = np.int32(ui16)
elif columndatatype == DataType.UINT32:
valuetype = ExpressionValueType.INT64
ui32, isnull, err = self._currentrow.uint32value(columnindex)
value = np.int64(ui32)
elif columndatatype == DataType.UINT64:
ui64, isnull, err = self._currentrow.uint64value(columnindex)
if ui64 > UMAXINT64:
valuetype = ExpressionValueType.DOUBLE
value = np.float64(ui64)
else:
valuetype = ExpressionValueType.INT64
value = np.int64(ui64)
else:
return None, TypeError("unexpected column data type encountered")
if err is not None:
return None, EvaluateError(f"failed while evaluating \"{column.name}\" column \"{normalize_enumname(columndatatype)}\" value for current row: {err}")
if isnull:
return ValueExpression.nullvalue(valuetype), None
return ValueExpression(valuetype, value), None
def _evaluate_in_list(self, inlist_expression: InListExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
inlist_value, err = self._evaluate(inlist_expression.value)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IN\" expression source value: {err}")
# If in-list test value is Null, result is Null
if inlist_value.is_null():
return ValueExpression.nullvalue(inlist_value.valuetype), None
has_notkeyword = inlist_expression.has_notkeyword
exactmatch = inlist_expression.exactmatch
arguments = inlist_expression.arguments
for i in range(len(arguments)):
argument_value, err = self._evaluate(arguments[i])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IN\" expression argument {i} value: {err}")
valuetype, err = derive_comparison_operationvaluetype(ExpressionOperatorType.EQUAL, inlist_value.valuetype, argument_value.valuetype)
if err is not None:
return None, EvaluateError(f"failed while deriving \"IN\" expression argument {i} comparison operation value type: {err}")
result, err = self._equal_op(inlist_value, argument_value, valuetype, exactmatch)
if err is not None:
return None, EvaluateError(f"failed while comparing \"IN\" expression source value to argument {i} for equality: {err}")
if result._booleanvalue():
return FALSEVALUE if has_notkeyword else TRUEVALUE, None
return TRUEVALUE if has_notkeyword else FALSEVALUE, None
def _evaluate_function(self, function_expression: FunctionExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
functiontype = function_expression.functiontype
arguments = function_expression.arguments
if functiontype == ExpressionFunctionType.ABS:
return self._evaluate_abs(arguments)
if functiontype == ExpressionFunctionType.CEILING:
return self._evaluate_ceiling(arguments)
if functiontype == ExpressionFunctionType.COALESCE:
return self._evaluate_coalesce(arguments)
if functiontype == ExpressionFunctionType.CONVERT:
return self._evaluate_convert(arguments)
if functiontype == ExpressionFunctionType.CONTAINS:
return self._evaluate_contains(arguments)
if functiontype == ExpressionFunctionType.DATEADD:
return self._evaluate_dateadd(arguments)
if functiontype == ExpressionFunctionType.DATEDIFF:
return self._evaluate_datediff(arguments)
if functiontype == ExpressionFunctionType.DATEPART:
return self._evaluate_datepart(arguments)
if functiontype == ExpressionFunctionType.ENDSWITH:
return self._evaluate_endswith(arguments)
if functiontype == ExpressionFunctionType.FLOOR:
return self._evaluate_floor(arguments)
if functiontype == ExpressionFunctionType.IIF:
return self._evaluate_iif(arguments)
if functiontype == ExpressionFunctionType.INDEXOF:
return self._evaluate_indexof(arguments)
if functiontype == ExpressionFunctionType.ISDATE:
return self._evaluate_is_date(arguments)
if functiontype == ExpressionFunctionType.ISINTEGER:
return self._evaluate_is_integer(arguments)
if functiontype == ExpressionFunctionType.ISGUID:
return self._evaluate_is_guid(arguments)
if functiontype == ExpressionFunctionType.ISNULL:
return self._evaluate_is_null(arguments)
if functiontype == ExpressionFunctionType.ISNUMERIC:
return self._evaluate_is_numeric(arguments)
if functiontype == ExpressionFunctionType.LASTINDEXOF:
return self._evaluate_lastindexof(arguments)
if functiontype == ExpressionFunctionType.LEN:
return self._evaluate_len(arguments)
if functiontype == ExpressionFunctionType.LOWER:
return self._evaluate_lower(arguments)
if functiontype == ExpressionFunctionType.MAXOF:
return self._evaluate_maxof(arguments)
if functiontype == ExpressionFunctionType.MINOF:
return self._evaluate_minof(arguments)
if functiontype == ExpressionFunctionType.NTHINDEXOF:
return self._evaluate_nthindexof(arguments)
if functiontype == ExpressionFunctionType.NOW:
return self._evaluate_now(arguments)
if functiontype == ExpressionFunctionType.POWER:
return self._evaluate_power(arguments)
if functiontype == ExpressionFunctionType.REGEXMATCH:
return self._evaluate_regexmatch(arguments)
if functiontype == ExpressionFunctionType.REGEXVAL:
return self._evaluate_regexval(arguments)
if functiontype == ExpressionFunctionType.REPLACE:
return self._evaluate_replace(arguments)
if functiontype == ExpressionFunctionType.REVERSE:
return self._evaluate_reverse(arguments)
if functiontype == ExpressionFunctionType.ROUND:
return self._evaluate_round(arguments)
if functiontype == ExpressionFunctionType.SPLIT:
return self._evaluate_split(arguments)
if functiontype == ExpressionFunctionType.SQRT:
return self._evaluate_sqrt(arguments)
if functiontype == ExpressionFunctionType.STARTSWITH:
return self._evaluate_startswith(arguments)
if functiontype == ExpressionFunctionType.STRCOUNT:
return self._evaluate_strcount(arguments)
if functiontype == ExpressionFunctionType.STRCMP:
return self._evaluate_strcmp(arguments)
if functiontype == ExpressionFunctionType.SUBSTR:
return self._evaluate_substr(arguments)
if functiontype == ExpressionFunctionType.TRIM:
return self._evaluate_trim(arguments)
if functiontype == ExpressionFunctionType.TRIMLEFT:
return self._evaluate_trimleft(arguments)
if functiontype == ExpressionFunctionType.TRIMRIGHT:
return self._evaluate_trimright(arguments)
if functiontype == ExpressionFunctionType.UPPER:
return self._evaluate_upper(arguments)
if functiontype == ExpressionFunctionType.UTCNOW:
return self._evaluate_utcnow(arguments)
return None, TypeError("unexpected function type encountered")
def _evaluate_abs(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Abs\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DOUBLE)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Abs\" function source value, first argument: {err}")
return self._abs(sourcevalue)
def _evaluate_ceiling(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Ceiling\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DOUBLE)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Ceiling\" function source value, first argument: {err}")
return self._ceiling(sourcevalue)
def _evaluate_coalesce(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2:
return None, ValueError(f"\"Coalesce\" function expects at least 2 arguments, received {len(arguments)}")
# Not pre-evaluating Coalesce arguments - arguments will be evaluated only up to first non-null value
return self._coalesce(arguments)
def _evaluate_convert(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 2:
return None, ValueError(f"\"Convert\" function expects 2 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Convert\" function source value, first argument: {err}")
targettype, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Convert\" function target type, second argument: {err}")
return self._convert(sourcevalue, targettype)
def _evaluate_contains(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"Contains\" function expects 2 or 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Contains\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Contains\" function test value, second argument: {err}")
if len(arguments) == 2:
return self._contains(sourcevalue, testvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[2], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Contains\" function optional ignore case value, third argument: {err}")
return self._contains(sourcevalue, testvalue, ignorecase)
def _evaluate_dateadd(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 3:
return None, ValueError(f"\"DateAdd\" function expects 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DateAdd\" function source value, first argument: {err}")
addvalue, err = self._evaluate(arguments[1], ExpressionValueType.INT32)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DateAdd\" function add value, second argument: {err}")
intervaltype, err = self._evaluate(arguments[2], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DateAdd\" function interval type value, third argument: {err}")
return self._dateadd(sourcevalue, addvalue, intervaltype)
def _evaluate_datediff(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 3:
return None, ValueError(f"\"DateDiff\" function expects 3 arguments, received {len(arguments)}")
leftvalue, err = self._evaluate(arguments[0], ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DateDiff\" function left value, first argument: {err}")
rightvalue, err = self._evaluate(arguments[1], ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DateDiff\" function right value, second argument: {err}")
intervaltype, err = self._evaluate(arguments[2], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DateDiff\" function interval type value, third argument: {err}")
return self._datediff(leftvalue, rightvalue, intervaltype)
def _evaluate_datepart(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 2:
return None, ValueError(f"\"DatePart\" function expects 2 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DatePart\" function source value, first argument: {err}")
intervaltype, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"DatePart\" function interval type value, second argument: {err}")
return self._datepart(sourcevalue, intervaltype)
def _evaluate_endswith(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"EndsWith\" function expects 2 or 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"EndsWith\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"EndsWith\" function test value, second argument: {err}")
if len(arguments) == 2:
return self._endswith(sourcevalue, testvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[2], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"EndsWith\" function optional ignore case value, third argument: {err}")
return self._endswith(sourcevalue, testvalue, ignorecase)
def _evaluate_floor(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Floor\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DOUBLE)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Floor\" function source value, first argument: {err}")
return self._floor(sourcevalue)
def _evaluate_iif(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 3:
return None, ValueError(f"\"IIf\" function expects 3 arguments, received {len(arguments)}")
testvalue, err = self._evaluate(arguments[0], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IIf\" function test value, first argument: {err}")
# Not pre-evaluating IIf result value arguments - only evaluating desired path
return self._iif(testvalue, arguments[1], arguments[2])
def _evaluate_indexof(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"IndexOf\" function expects 2 or 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IndexOf\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IndexOf\" function test value, second argument: {err}")
if len(arguments) == 2:
return self._indexof(sourcevalue, testvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[2], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IndexOf\" function optional ignore case value, third argument: {err}")
return self._indexof(sourcevalue, testvalue, ignorecase)
def _evaluate_is_date(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"IsDate\" function expects 1 argument, received {len(arguments)}")
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IsDate\" function test value, first argument: {err}")
return self._is_date(testvalue)
def _evaluate_is_integer(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"IsInteger\" function expects 1 argument, received {len(arguments)}")
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IsInteger\" function test value, first argument: {err}")
return self._is_integer(testvalue)
def _evaluate_is_guid(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"IsGuid\" function expects 1 argument, received {len(arguments)}")
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IsGuid\" function test value, first argument: {err}")
return self._is_guid(testvalue)
def _evaluate_is_null(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 2:
return None, ValueError(f"\"IsNull\" function expects 2 arguments, received {len(arguments)}")
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IsNull\" function test value, first argument: {err}")
defaultvalue, err = self._evaluate(arguments[1])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IsNull\" function default value, second argument: {err}")
return self._is_null(testvalue, defaultvalue)
def _evaluate_is_numeric(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"IsNumeric\" function expects 1 argument, received {len(arguments)}")
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IsNumeric\" function test value, first argument: {err}")
return self._is_numeric(testvalue)
def _evaluate_lastindexof(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"LastIndexOf\" function expects 2 or 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"LastIndexOf\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"LastIndexOf\" function test value, second argument: {err}")
if len(arguments) == 2:
return self._lastindexof(sourcevalue, testvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[2], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"LastIndexOf\" function optional ignore case value, third argument: {err}")
return self._lastindexof(sourcevalue, testvalue, ignorecase)
def _evaluate_len(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Len\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Len\" function source value, first argument: {err}")
return self._len(sourcevalue)
def _evaluate_lower(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Lower\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Lower\" function source value, first argument: {err}")
return self._lower(sourcevalue)
def _evaluate_maxof(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2:
return None, ValueError(f"\"MaxOf\" function expects at least 2 arguments, received {len(arguments)}")
return self._maxof(arguments)
def _evaluate_minof(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2:
return None, ValueError(f"\"MinOf\" function expects at least 2 arguments, received {len(arguments)}")
return self._minof(arguments)
def _evaluate_nthindexof(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 3 or len(arguments) > 4:
return None, ValueError(f"\"NthIndexOf\" function expects 3 or 4 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"NthIndexOf\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"NthIndexOf\" function test value, second argument: {err}")
indexvalue, err = self._evaluate(arguments[2], ExpressionValueType.INT32)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"NthIndexOf\" function index value, third argument: {err}")
if len(arguments) == 3:
return self._nthindexof(sourcevalue, testvalue, indexvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[3], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"NthIndexOf\" function optional ignore case value, fourth argument: {err}")
return self._nthindexof(sourcevalue, testvalue, indexvalue, ignorecase)
def _evaluate_now(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if arguments:
return None, ValueError(f"\"Now\" function expects 0 arguments, received {len(arguments)}")
return self._now()
def _evaluate_power(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 2:
return None, ValueError(f"\"Power\" function expects 2 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DOUBLE)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Power\" function source value, first argument: {err}")
exponentvalue, err = self._evaluate(arguments[1], ExpressionValueType.INT32)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Power\" function exponent value, second argument: {err}")
return self._power(sourcevalue, exponentvalue)
def _evaluate_regexmatch(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 2:
return None, ValueError(f"\"RegExMatch\" function expects 2 arguments, received {len(arguments)}")
regexvalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"RegExMatch\" function expression value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"RegExMatch\" function test value, second argument: {err}")
return self._regexmatch(regexvalue, testvalue)
def _evaluate_regexval(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 2:
return None, ValueError(f"\"RegExValue\" function expects 2 arguments, received {len(arguments)}")
regexvalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"RegExValue\" function expression value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"RegExValue\" function test value, second argument: {err}")
return self._regexval(regexvalue, testvalue)
def _evaluate_replace(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 3 or len(arguments) > 4:
return None, ValueError(f"\"Replace\" function expects 3 or 4 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Replace\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Replace\" function test value, second argument: {err}")
replacevalue, err = self._evaluate(arguments[2], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Replace\" function replace value, third argument: {err}")
if len(arguments) == 3:
return self._replace(sourcevalue, testvalue, replacevalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[3], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Replace\" function optional ignore case value, fourth argument: {err}")
return self._replace(sourcevalue, testvalue, replacevalue, ignorecase)
def _evaluate_reverse(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Reverse\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Reverse\" function source value, first argument: {err}")
return self._reverse(sourcevalue)
def _evaluate_round(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Round\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DOUBLE)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Round\" function source value, first argument: {err}")
return self._round(sourcevalue)
def _evaluate_split(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 3 or len(arguments) > 4:
return None, ValueError(f"\"Split\" function expects 3 or 4 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Split\" function source value, first argument: {err}")
delimitervalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Split\" function delimiter value, second argument: {err}")
indexvalue, err = self._evaluate(arguments[2], ExpressionValueType.INT32)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Split\" function index value, third argument: {err}")
if len(arguments) == 3:
return self._split(sourcevalue, delimitervalue, indexvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[3], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Split\" function optional ignore case value, fourth argument: {err}")
return self._split(sourcevalue, delimitervalue, indexvalue, ignorecase)
def _evaluate_sqrt(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Sqrt\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.DOUBLE)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Sqrt\" function source value, first argument: {err}")
return self._sqrt(sourcevalue)
def _evaluate_startswith(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"StartsWith\" function expects 2 or 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StartsWith\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StartsWith\" function test value, second argument: {err}")
if len(arguments) == 2:
return self._startswith(sourcevalue, testvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[2], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StartsWith\" function optional ignore case value, third argument: {err}")
return self._startswith(sourcevalue, testvalue, ignorecase)
def _evaluate_strcount(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"StrCount\" function expects 2 or 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StrCount\" function source value, first argument: {err}")
testvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StrCount\" function test value, second argument: {err}")
if len(arguments) == 2:
return self._strcount(sourcevalue, testvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[2], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StrCount\" function optional ignore case value, third argument: {err}")
return self._strcount(sourcevalue, testvalue, ignorecase)
def _evaluate_strcmp(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"StrCmp\" function expects 2 or 3 arguments, received {len(arguments)}")
leftvalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StrCmp\" function left value, first argument: {err}")
rightvalue, err = self._evaluate(arguments[1], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StrCmp\" function right value, second argument: {err}")
if len(arguments) == 2:
return self._strcmp(leftvalue, rightvalue, NULLBOOLVALUE)
ignorecase, err = self._evaluate(arguments[2], ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"StrCmp\" function optional ignore case value, third argument: {err}")
return self._strcmp(leftvalue, rightvalue, ignorecase)
def _evaluate_substr(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) < 2 or len(arguments) > 3:
return None, ValueError(f"\"SubStr\" function expects 2 or 3 arguments, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"SubStr\" function source value, first argument: {err}")
indexvalue, err = self._evaluate(arguments[1], ExpressionValueType.INT32)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"SubStr\" function index value, second argument: {err}")
if len(arguments) == 2:
return self._substr(sourcevalue, indexvalue, NULLINT32VALUE)
lengthvalue, err = self._evaluate(arguments[2], ExpressionValueType.INT32)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"SubStr\" function optional length value, third argument: {err}")
return self._substr(sourcevalue, indexvalue, lengthvalue)
def _evaluate_trim(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Trim\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Trim\" function source value, first argument: {err}")
return self._trim(sourcevalue)
def _evaluate_trimleft(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"TrimLeft\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"TrimLeft\" function source value, first argument: {err}")
return self._trimleft(sourcevalue)
def _evaluate_trimright(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"TrimRight\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"TrimRight\" function source value, first argument: {err}")
return self._trimright(sourcevalue)
def _evaluate_upper(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if len(arguments) != 1:
return None, ValueError(f"\"Upper\" function expects 1 argument, received {len(arguments)}")
sourcevalue, err = self._evaluate(arguments[0], ExpressionValueType.STRING)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Upper\" function source value, first argument: {err}")
return self._upper(sourcevalue)
def _evaluate_utcnow(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if arguments:
return None, ValueError(f"\"UtcNow\" function expects 0 arguments, received {len(arguments)}")
return self._utcnow()
def _evaluate_operator(self, operator_expression: OperatorExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
operatortype = operator_expression.operatortype
leftvalue, err = self._evaluate(operator_expression.leftvalue)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"{normalize_enumname(operatortype)}\" operator left operand: {err}")
rightvalue, err = self._evaluate(operator_expression.rightvalue)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"{normalize_enumname(operatortype)}\" operator right operand: {err}")
valuetype, err = derive_operationvaluetype(operatortype, leftvalue.valuetype, rightvalue.valuetype)
if err is not None:
return None, EvaluateError(f"failed while deriving \"{normalize_enumname(operatortype)}\" operator value type: {err}")
if operatortype == ExpressionOperatorType.MULTIPLY:
return self._multiply_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.DIVIDE:
return self._divide_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.MODULUS:
return self._modulus_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.ADD:
return self._add_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.SUBTRACT:
return self._subtract_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.BITSHIFTLEFT:
return self._bitshiftleft_op(leftvalue, rightvalue)
if operatortype == ExpressionOperatorType.BITSHIFTRIGHT:
return self._bitshiftright_op(leftvalue, rightvalue)
if operatortype == ExpressionOperatorType.BITWISEAND:
return self._bitwiseand_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.BITWISEOR:
return self._bitwiseor_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.BITWISEXOR:
return self._bitwisexor_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.LESSTHAN:
return self._lessthan_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.LESSTHANOREQUAL:
return self._lessthanorequal_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.GREATERTHAN:
return self._greaterthan_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.GREATERTHANOREQUAL:
return self._greaterthanorequal_op(leftvalue, rightvalue, valuetype)
if operatortype == ExpressionOperatorType.EQUAL:
return self._equal_op(leftvalue, rightvalue, valuetype, False)
if operatortype == ExpressionOperatorType.EQUALEXACTMATCH:
return self._equal_op(leftvalue, rightvalue, valuetype, True)
if operatortype == ExpressionOperatorType.NOTEQUAL:
return self._notequal_op(leftvalue, rightvalue, valuetype, False)
if operatortype == ExpressionOperatorType.NOTEQUALEXACTMATCH:
return self._notequal_op(leftvalue, rightvalue, valuetype, True)
if operatortype == ExpressionOperatorType.ISNULL:
return self._isnull_op(leftvalue), None
if operatortype == ExpressionOperatorType.ISNOTNULL:
return self._isnotnull_op(leftvalue), None
if operatortype == ExpressionOperatorType.LIKE:
return self._like_op(leftvalue, rightvalue, False)
if operatortype == ExpressionOperatorType.LIKEEXACTMATCH:
return self._like_op(leftvalue, rightvalue, True)
if operatortype == ExpressionOperatorType.NOTLIKE:
return self._notlike_op(leftvalue, rightvalue, False)
if operatortype == ExpressionOperatorType.NOTLIKEEXACTMATCH:
return self._notlike_op(leftvalue, rightvalue, True)
if operatortype == ExpressionOperatorType.AND:
return self._and_op(leftvalue, rightvalue)
if operatortype == ExpressionOperatorType.OR:
return self._or_op(leftvalue, rightvalue)
return None, TypeError("unexpected operator type encountered")
# Filter Expression Function Implementations
def _abs(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
sourcevaluetype = sourcevalue.valuetype
if not is_numerictype(sourcevaluetype):
return None, TypeError("\"Abs\" function source value, first argument, must be a numeric type")
# If source value is Null, result is Null
if sourcevalue.is_null():
return ValueExpression.nullvalue(sourcevaluetype), None
if sourcevaluetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, sourcevalue._booleanvalue()), None
if sourcevaluetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, abs(sourcevalue._int32value())), None
if sourcevaluetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, abs(sourcevalue._int64value())), None
if sourcevaluetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, abs(sourcevalue._decimalvalue())), None
if sourcevaluetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, abs(sourcevalue._doublevalue())), None
return None, TypeError("unexpected expression value type encountered")
def _ceiling(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
sourcevaluetype = sourcevalue.valuetype
if not is_numerictype(sourcevaluetype):
return None, TypeError("\"Ceiling\" function source value, first argument, must be a numeric type")
# If source value is Null, result is Null
if sourcevalue.is_null():
return ValueExpression.nullvalue(sourcevaluetype), None
if is_integertype(sourcevaluetype):
return sourcevalue, None
if sourcevaluetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, math.ceil(sourcevalue._decimalvalue())), None
if sourcevaluetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, math.ceil(sourcevalue._doublevalue())), None
return None, TypeError("unexpected expression value type encountered")
def _coalesce(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Coalesce\" function argument 0: {err}")
if not testvalue.is_null():
return testvalue, None
for i in range(1, len(arguments)):
listvalue, err = self._evaluate(arguments[i])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"Coalesce\" function argument {i}: {err}")
if not listvalue.is_null():
return listvalue, None
return testvalue, None
def _convert(self, sourcevalue: ValueExpression, targettype: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if targettype.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Convert\" function target type, second argument, must be a \"String\"")
if targettype.is_null():
return None, TypeError("\"Convert\" function target type, second argument, is null")
targettypename = targettype._stringvalue().upper()
# Remove any "System." prefix:
targettypename = targettypename.removeprefix("SYSTEM.")
targetvaluetype = ExpressionValueType.UNDEFINED
foundvaluetype = False
for valuetype in ExpressionValueType:
if targettypename == valuetype.name:
targetvaluetype = valuetype
foundvaluetype = True
break
if not foundvaluetype:
# Handle a few common aliases
if targettypename == "SINGLE" or targettypename.startswith("FLOAT"):
targetvaluetype = ExpressionValueType.DOUBLE
foundvaluetype = True
elif targettypename == "BOOL":
targetvaluetype = ExpressionValueType.BOOLEAN
foundvaluetype = True
elif targettypename.startswith("INT") or targettypename.startswith("UINT"):
targetvaluetype = ExpressionValueType.INT64
foundvaluetype = True
elif targettypename in ["DATE", "TIME"]:
targetvaluetype = ExpressionValueType.DATETIME
foundvaluetype = True
elif targettypename == "UUID":
targetvaluetype = ExpressionValueType.GUID
foundvaluetype = True
if not foundvaluetype or targetvaluetype == ExpressionValueType.UNDEFINED:
return None, EvaluateError(f"specified \"Convert\" function target type \"{targettype._stringvalue()}\", second argument, is not supported")
return sourcevalue.convert(targetvaluetype)
def _contains(self, sourcevalue: ValueExpression, testvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Contains\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Contains\" function test value, second argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLBOOLVALUE, None
if testvalue.is_null():
return FALSEVALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"Contains\" function ignore case, third argument, to a \"Boolean\": {err}")
if ignorecase._booleanvalue():
return ValueExpression(ExpressionValueType.BOOLEAN, testvalue._stringvalue().upper() in sourcevalue._stringvalue().upper()), None
return ValueExpression(ExpressionValueType.BOOLEAN, testvalue._stringvalue() in sourcevalue._stringvalue()), None
def _dateadd(self, sourcevalue: ValueExpression, addvalue: ValueExpression, intervaltype: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype not in [ExpressionValueType.DATETIME, ExpressionValueType.STRING]:
return None, TypeError("\"DateAdd\" function source value, first argument, must be a \"DateTime\" or a \"String\"")
if not is_integertype(addvalue.valuetype):
return None, TypeError("\"DateAdd\" function add value, second argument, must be an integer type")
if intervaltype.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"DateAdd\" function interval type, third argument, must be a \"String\"")
if addvalue.is_null():
return None, TypeError("\"DateAdd\" function add value second argument, is null")
if intervaltype.is_null():
return None, TypeError("\"DateAdd\" function interval type, third argument, is null")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLDATETIMEVALUE, None
# DateTime parameters should support strings as well as literals
sourcevalue, err = sourcevalue.convert(ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while converting \"DateAdd\" function source value, first argument, to a \"DateTime\": {err}")
interval = TimeInterval.parse(intervaltype._stringvalue())
if interval is None:
return None, EvaluateError("failed while parsing \"DateAdd\" function interval type, third argument, as a valid time interval")
value: int = addvalue.integervalue()
if interval == TimeInterval.YEAR:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(years=value)), None
if interval == TimeInterval.MONTH:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(months=value)), None
if interval in [TimeInterval.DAYOFYEAR, TimeInterval.DAY]:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(days=value)), None
if interval == TimeInterval.WEEKDAY:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(weekday=value)), None
if interval == TimeInterval.WEEK:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(weeks=value)), None
if interval == TimeInterval.HOUR:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(hours=value)), None
if interval == TimeInterval.MINUTE:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(minutes=value)), None
if interval == TimeInterval.SECOND:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(seconds=value)), None
if interval == TimeInterval.MILLISECOND:
return ValueExpression(ExpressionValueType.DATETIME, sourcevalue._datetimevalue() + relativedelta(microseconds=value * 1000)), None
return None, TypeError("unexpected time interval encountered")
def _datediff(self, leftvalue: ValueExpression, rightvalue: ValueExpression, intervaltype: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# sourcery skip
if leftvalue.valuetype not in [ExpressionValueType.DATETIME, ExpressionValueType.STRING]:
return None, TypeError("\"DateDiff\" function left value, first argument, must be a \"DateTime\" or a \"String\"")
if rightvalue.valuetype not in [ExpressionValueType.DATETIME, ExpressionValueType.STRING]:
return None, TypeError("\"DateDiff\" function right value, second argument, must be a \"DateTime\" or a \"String\"")
if intervaltype.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"DateDiff\" function interval type, third argument, must be a \"String\"")
if intervaltype.is_null():
return None, TypeError("\"DateDiff\" function interval type, third argument, is null")
# If either test value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return NULLINT32VALUE, None
# DateTime parameters should support strings as well as literals
leftvalue, err = leftvalue.convert(ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while converting \"DateDiff\" function left value, first argument, to a \"DateTime\": {err}")
rightvalue, err = rightvalue.convert(ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while converting \"DateDiff\" function right value, second argument, to a \"DateTime\": {err}")
interval = TimeInterval.parse(intervaltype._stringvalue())
if interval is None:
return None, EvaluateError("failed while parsing \"DateDiff\" function interval type, third argument, as a valid time interval")
if interval in [TimeInterval.YEAR, TimeInterval.MONTH]:
delta = relativedelta(rightvalue._datetimevalue(), leftvalue._datetimevalue())
if interval == TimeInterval.YEAR:
return ValueExpression(ExpressionValueType.INT32, delta.years), None
return ValueExpression(ExpressionValueType.INT32, delta.months + (12 * delta.years)), None
delta = rightvalue._datetimevalue() - leftvalue._datetimevalue()
if interval in [TimeInterval.DAYOFYEAR, TimeInterval.DAY, TimeInterval.WEEKDAY]:
return ValueExpression(ExpressionValueType.INT32, delta.days), None
if interval == TimeInterval.WEEK:
return ValueExpression(ExpressionValueType.INT32, delta.days // 7), None
if interval == TimeInterval.HOUR:
return ValueExpression(ExpressionValueType.INT32, round(delta.total_seconds()) // 3600), None
if interval == TimeInterval.MINUTE:
return ValueExpression(ExpressionValueType.INT32, round(delta.total_seconds()) // 60), None
if interval == TimeInterval.SECOND:
return ValueExpression(ExpressionValueType.INT32, round(delta.total_seconds())), None
if interval == TimeInterval.MILLISECOND:
return ValueExpression(ExpressionValueType.INT32, int(delta.total_seconds()) * 1000 + delta.microseconds // 1000), None
return None, TypeError("unexpected time interval encountered")
def _datepart(self, sourcevalue: ValueExpression, intervaltype: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype not in [ExpressionValueType.DATETIME, ExpressionValueType.STRING]:
return None, TypeError("\"DatePart\" function source value, first argument, must be a \"DateTime\" or a \"String\"")
if intervaltype.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"DatePart\" function interval type, second argument, must be a \"String\"")
if intervaltype.is_null():
return None, TypeError("\"DatePart\" function interval type, second argument, is null")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLINT32VALUE, None
# DateTime parameters should support strings as well as literals
sourcevalue, err = sourcevalue.convert(ExpressionValueType.DATETIME)
if err is not None:
return None, EvaluateError(f"failed while converting \"DatePart\" function source value, first argument, to a \"DateTime\": {err}")
interval = TimeInterval.parse(intervaltype._stringvalue())
if interval is None:
return None, EvaluateError(f"failed while parsing \"DatePart\" function interval type, second argument, as a valid time interval")
if interval == TimeInterval.YEAR:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().year), None
if interval == TimeInterval.MONTH:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().month), None
if interval == TimeInterval.DAYOFYEAR:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().timetuple().tm_yday), None
if interval == TimeInterval.DAY:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().day), None
if interval == TimeInterval.WEEKDAY:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().weekday() + 2), None # Starts on Monday at zero
if interval == TimeInterval.WEEK:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().isocalendar()[1]), None
if interval == TimeInterval.HOUR:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().hour), None
if interval == TimeInterval.MINUTE:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().minute), None
if interval == TimeInterval.SECOND:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().second), None
if interval == TimeInterval.MILLISECOND:
return ValueExpression(ExpressionValueType.INT32, sourcevalue._datetimevalue().microsecond / 1000), None
return None, TypeError("unexpected time interval encountered")
def _endswith(self, sourcevalue: ValueExpression, testvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"EndsWith\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"EndsWith\" function test value, second argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLBOOLVALUE, None
if testvalue.is_null():
return FALSEVALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"EndsWith\" function ignore case, third argument, to a \"Boolean\": {err}")
if ignorecase._booleanvalue():
return ValueExpression(ExpressionValueType.BOOLEAN, sourcevalue._stringvalue().upper().endswith(testvalue._stringvalue().upper())), None
return ValueExpression(ExpressionValueType.BOOLEAN, sourcevalue._stringvalue().endswith(testvalue._stringvalue())), None
def _floor(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
sourcevaluetype = sourcevalue.valuetype
if not is_numerictype(sourcevaluetype):
return None, TypeError("\"Floor\" function source value, first argument, must be a numeric type")
# If source value is Null, result is Null
if sourcevalue.is_null():
return ValueExpression.nullvalue(sourcevaluetype), None
if is_integertype(sourcevaluetype):
return sourcevalue, None
if sourcevaluetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, math.floor(sourcevalue._decimalvalue())), None
if sourcevaluetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, math.floor(sourcevalue._doublevalue())), None
return None, TypeError("unexpected expression value type encountered")
def _iif(self, testvalue: ValueExpression, truevalue: Expression, falsevalue: Expression) -> Tuple[Optional[ValueExpression], Optional[Exception]]: # sourcery skip
if testvalue.valuetype != ExpressionValueType.BOOLEAN:
return None, TypeError("\"IIf\" function test value, first argument, must be a \"Boolean\"")
# Null test expression evaluates to false, that is, false value expression
if testvalue._booleanvalue():
result, err = self._evaluate(truevalue)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IIf\" function true value, second argument: {err}")
return result, None
result, err = self._evaluate(falsevalue)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"IIf\" function false value, third argument: {err}")
return result, None
def _indexof(self, sourcevalue: ValueExpression, testvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"IndexOf\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"IndexOf\" function test value, second argument, must be a \"String\"")
if testvalue.is_null():
return None, TypeError("\"IndexOf\" function test value, second argument, is null")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLINT32VALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"IndexOf\" function ignore case, third argument, to a \"Boolean\": {err}")
if ignorecase._booleanvalue():
return ValueExpression(ExpressionValueType.INT32, sourcevalue._stringvalue().upper().find(testvalue._stringvalue().upper())), None
return ValueExpression(ExpressionValueType.INT32, sourcevalue._stringvalue().find(testvalue._stringvalue())), None
def _is_date(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.is_null():
return FALSEVALUE, None
if sourcevalue.valuetype == ExpressionValueType.DATETIME:
return TRUEVALUE, None
if sourcevalue.valuetype == ExpressionValueType.STRING:
try:
parser.parse(sourcevalue._stringvalue())
return TRUEVALUE, None
except Exception:
return FALSEVALUE, None
return FALSEVALUE, None
def _is_integer(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.is_null():
return FALSEVALUE, None
if is_integertype(sourcevalue.valuetype):
return TRUEVALUE, None
if sourcevalue.valuetype == ExpressionValueType.STRING:
value = sourcevalue._stringvalue()
# Shortcut for unsigned ints
if value.isnumeric():
return TRUEVALUE, None
try:
if "X" in value.upper():
int(value, base=16)
return TRUEVALUE, None
int(value)
return TRUEVALUE, None
except Exception:
return FALSEVALUE, None
return FALSEVALUE, None
def _is_guid(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.is_null():
return FALSEVALUE, None
if sourcevalue.valuetype == ExpressionValueType.GUID:
return TRUEVALUE, None
if sourcevalue.valuetype == ExpressionValueType.STRING:
try:
UUID(sourcevalue._stringvalue())
return TRUEVALUE, None
except Exception:
return FALSEVALUE, None
return FALSEVALUE, None
def _is_null(self, testvalue: ValueExpression, defaultvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if defaultvalue.is_null():
return None, TypeError("\"IsNull\" function default value, second argument, is null")
return (defaultvalue, None) if testvalue.is_null() else (testvalue, None)
def _is_numeric(self, testvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if testvalue.is_null():
return FALSEVALUE, None
if is_numerictype(testvalue.valuetype):
return TRUEVALUE, None
if testvalue.valuetype == ExpressionValueType.STRING:
try:
float(testvalue._stringvalue())
return TRUEVALUE, None
except Exception:
return FALSEVALUE, None
return FALSEVALUE, None
def _lastindexof(self, sourcevalue: ValueExpression, testvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"LastIndexOf\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"LastIndexOf\" function test value, second argument, must be a \"String\"")
if testvalue.is_null():
return None, TypeError("\"LastIndexOf\" function test value, second argument, is null")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLINT32VALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"LastIndexOf\" function ignore case, third argument, to a \"Boolean\": {err}")
if ignorecase._booleanvalue():
return ValueExpression(ExpressionValueType.INT32, sourcevalue._stringvalue().upper().rfind(testvalue._stringvalue().upper())), None
return ValueExpression(ExpressionValueType.INT32, sourcevalue._stringvalue().rfind(testvalue._stringvalue())), None
def _len(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Len\" function source value, first argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLINT32VALUE, None
return ValueExpression(ExpressionValueType.INT32, len(sourcevalue._stringvalue())), None
def _lower(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Lower\" function source value, first argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
return ValueExpression(ExpressionValueType.STRING, sourcevalue._stringvalue().lower()), None
def _maxof(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"MaxOf\" function argument 0: {err}")
for i in range(1, len(arguments)):
nextvalue, err = self._evaluate(arguments[i])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"MaxOf\" function argument {i}: {err}")
valuetype, err = derive_comparison_operationvaluetype(ExpressionOperatorType.GREATERTHAN, testvalue.valuetype, nextvalue.valuetype)
if err is not None:
return None, EvaluateError(f"failed while deriving \"MaxOf\" function greater than comparison operator value type: {err}")
result, err = self._greaterthan_op(nextvalue, testvalue, valuetype)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"MaxOf\" function greater than comparison operator: {err}")
if result._booleanvalue() or (testvalue.is_null() and not nextvalue.is_null()):
testvalue = nextvalue
return testvalue, None
def _minof(self, arguments: List[Expression]) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
testvalue, err = self._evaluate(arguments[0])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"MinOf\" function argument 0: {err}")
for i in range(1, len(arguments)):
nextvalue, err = self._evaluate(arguments[i])
if err is not None:
return None, EvaluateError(f"failed while evaluating \"MinOf\" function argument {i}: {err}")
valuetype, err = derive_comparison_operationvaluetype(ExpressionOperatorType.LESSTHAN, testvalue.valuetype, nextvalue.valuetype)
if err is not None:
return None, EvaluateError(f"failed while deriving \"MinOf\" function greater than comparison operator value type: {err}")
result, err = self._lessthan_op(nextvalue, testvalue, valuetype)
if err is not None:
return None, EvaluateError(f"failed while evaluating \"MinOf\" function greater than comparison operator: {err}")
if result._booleanvalue() or (testvalue.is_null() and not nextvalue.is_null()):
testvalue = nextvalue
return testvalue, None
def _now(self) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
return ValueExpression(ExpressionValueType.DATETIME, datetime.now()), None
def _nthindexof(self, sourcevalue: ValueExpression, testvalue: ValueExpression, indexvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"NthIndexOf\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"NthIndexOf\" function test value, second argument, must be a \"String\"")
if testvalue.is_null():
return None, TypeError("\"NthIndexOf\" function test value, second argument, is null")
if indexvalue.is_null():
return None, TypeError("\"NthIndexOf\" function index value, third argument, is null")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLINT32VALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"NthIndexOf\" function ignore case, fourth argument, to a \"Boolean\": {err}")
if ignorecase._booleanvalue():
source = sourcevalue._stringvalue().upper()
test = testvalue._stringvalue().upper()
else:
source = sourcevalue._stringvalue()
test = testvalue._stringvalue()
return ValueExpression(ExpressionValueType.INT32, ExpressionTree._find_nthindex(source, test, indexvalue.integervalue(-1))), None
def _power(self, sourcevalue: ValueExpression, exponentvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if not is_numerictype(sourcevalue.valuetype):
return None, TypeError("\"Power\" function source value, first argument, must be a numeric type")
if not is_numerictype(exponentvalue.valuetype):
return None, TypeError("\"Power\" function exponent value, second argument, must be a numeric type")
# If source value or exponent value is Null, result is Null
if sourcevalue.is_null() or exponentvalue.is_null():
return ValueExpression.nullvalue(sourcevalue.valuetype), None
valuetype, err = derive_arithmetic_operationvaluetype(ExpressionOperatorType.MULTIPLY, sourcevalue.valuetype, exponentvalue.valuetype)
if err is not None:
return None, EvaluateError(f"failed while deriving \"Power\" function multiplicative arithmetic operation value type: {err}")
sourcevalue, err = sourcevalue.convert(valuetype)
if err is not None:
return None, EvaluateError(f"failed while converting \"Power\" function source value, first argument, to \"{valuetype}\": {err}")
exponentvalue, err = exponentvalue.convert(valuetype)
if err is not None:
return None, EvaluateError(f"failed while converting \"Power\" function exponent value, second argument, to \"{valuetype}\": {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, math.pow(sourcevalue._booleanvalue_asint(), exponentvalue._booleanvalue_asint()) != 0.0), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, math.pow(sourcevalue._int32value(), exponentvalue._int32value())), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, math.pow(sourcevalue._int64value(), exponentvalue._int64value())), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, math.pow(sourcevalue._decimalvalue(), exponentvalue._decimalvalue())), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, math.pow(sourcevalue._doublevalue(), exponentvalue._doublevalue())), None
return None, TypeError("unexpected expression value type encountered")
def _regexmatch(self, regexvalue: ValueExpression, testvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
return self._evaluateregex("RegExMatch", regexvalue, testvalue, False)
def _regexval(self, regexvalue: ValueExpression, testvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
return self._evaluateregex("RegExVal", regexvalue, testvalue, True)
def _evaluateregex(self, functionname: str, regexvalue: ValueExpression, testvalue: ValueExpression, return_matchedvalue: bool) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if regexvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError(f"\"{functionname}\" function regular expression value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError(f"\"{functionname}\" function test value, second argument, must be a \"String\"")
# If regular expression value or test value is Null, result is Null
if regexvalue.is_null() or testvalue.is_null():
return (NULLSTRINGVALUE, None) if return_matchedvalue else (NULLBOOLVALUE, None)
try:
match = re.search(regexvalue._stringvalue(), testvalue._stringvalue())
if return_matchedvalue:
return (EMPTYSTRINGVALUE, None) if match is None else (ValueExpression(ExpressionValueType.STRING, match[0]), None)
return (FALSEVALUE, None) if match is None else (TRUEVALUE, None)
except Exception as ex:
return None, EvaluateError(f"failed while evaluating \"{functionname}\" function expression value, first argument: {ex}")
def _replace(self, sourcevalue: ValueExpression, testvalue: ValueExpression, replacevalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Replace\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Replace\" function test value, second argument, must be a \"String\"")
if replacevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Replace\" function replace value, third argument, must be a \"String\"")
if testvalue.is_null():
return None, TypeError("\"Replace\" function test value, second argument, is null")
if replacevalue.is_null():
return None, TypeError("\"Replace\" function replace value, third argument, is null")
# If source value, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"Replace\" function ignore case value, fourth argument, to \"Boolean\": {err}")
if ignorecase._booleanvalue():
try:
regex = re.compile(re.escape(testvalue._stringvalue()), re.IGNORECASE)
except Exception as ex:
return None, EvaluateError(f"failed while compiling \"Replace\" function case-insensitive RegEx replace expression for test value, second argument: {ex}")
return ValueExpression(ExpressionValueType.STRING, regex.sub(replacevalue._stringvalue(), sourcevalue._stringvalue())), None
return ValueExpression(ExpressionValueType.STRING, sourcevalue._stringvalue().replace(testvalue._stringvalue(), replacevalue._stringvalue())), None
def _reverse(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Reverse\" function source value, first argument, must be a \"String\"")
# If source value, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
return ValueExpression(ExpressionValueType.STRING, sourcevalue._stringvalue()[::-1]), None
def _round(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
sourcevaluetype = sourcevalue.valuetype
if not is_numerictype(sourcevaluetype):
return None, TypeError("\"Round\" function source value, first argument, must be a numeric type")
# If source value is Null, result is Null
if sourcevalue.is_null():
return ValueExpression.nullvalue(sourcevaluetype), None
if is_integertype(sourcevaluetype):
return sourcevalue, None
if sourcevaluetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, sourcevalue._decimalvalue().quantize(0)), None
if sourcevaluetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, round(sourcevalue._doublevalue())), None
return None, TypeError("unexpected expression value type encountered")
def _split(self, sourcevalue: ValueExpression, delimitervalue: ValueExpression, indexvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Split\" function source value, first argument, must be a \"String\"")
if delimitervalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Split\" function delimiter value, second argument, must be a \"String\"")
if not is_integertype(indexvalue.valuetype):
return None, TypeError("\"Split\" function index value, third argument, must be an integer type")
if delimitervalue.is_null():
return None, TypeError("\"Split\" function delimiter value, second argument, is null")
if indexvalue.is_null():
return None, TypeError("\"Split\" function index value, third argument, is null")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"Split\" function ignore case value, fourth argument, to \"Boolean\": {err}")
sourcevalue = sourcevalue._stringvalue()
index = indexvalue._integervalue()
if ignorecase._booleanvalue():
start, stop, success = _split_nthindex(sourcevalue.upper(), delimitervalue._stringvalue().upper(), index)
else:
start, stop, success = _split_nthindex(sourcevalue, delimitervalue._stringvalue(), index)
return (ValueExpression(ExpressionValueType.STRING, sourcevalue[start:stop]), None) if success else (EMPTYSTRINGVALUE, None)
def _sqrt(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
sourcevaluetype = sourcevalue.valuetype
if not is_numerictype(sourcevaluetype):
return None, TypeError("\"Sqrt\" function source value, first argument, must be a numeric type")
# If source value is Null, result is Null
if sourcevalue.is_null():
return ValueExpression.nullvalue(sourcevaluetype), None
if is_integertype(sourcevaluetype):
return ValueExpression(ExpressionValueType.DOUBLE, math.sqrt(sourcevalue.integervalue())), None
if sourcevaluetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, sourcevalue._decimalvalue().sqrt()), None
if sourcevaluetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, math.sqrt(sourcevalue._doublevalue())), None
return None, TypeError("unexpected expression value type encountered")
def _startswith(self, sourcevalue: ValueExpression, testvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"StartsWith\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"StartsWith\" function test value, second argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
if testvalue.is_null():
return FALSEVALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"StartsWith\" function ignore case value, third argument, to \"Boolean\": {err}")
if ignorecase._booleanvalue():
return ValueExpression(ExpressionValueType.BOOLEAN, sourcevalue._stringvalue().upper().startswith(testvalue._stringvalue().upper())), None
return ValueExpression(ExpressionValueType.BOOLEAN, sourcevalue._stringvalue().startswith(testvalue._stringvalue())), None
def _strcount(self, sourcevalue: ValueExpression, testvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"StrCount\" function source value, first argument, must be a \"String\"")
if testvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"StrCount\" function test value, second argument, must be a \"String\"")
if sourcevalue.is_null() or testvalue.is_null():
return ValueExpression(ExpressionValueType.INT32, 0), None
findvalue = testvalue._stringvalue()
if len(findvalue) == 0:
return ValueExpression(ExpressionValueType.INT32, 0), None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"StrCount\" function ignore case value, third argument, to \"Boolean\": {err}")
if ignorecase._booleanvalue():
return ValueExpression(ExpressionValueType.INT32, sourcevalue._stringvalue().upper().count(findvalue.upper())), None
return ValueExpression(ExpressionValueType.INT32, sourcevalue._stringvalue().count(findvalue)), None
def _strcmp(self, leftvalue: ValueExpression, rightvalue: ValueExpression, ignorecase: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if leftvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"StrCmp\" function left value, first argument, must be a \"String\"")
if rightvalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"StrCmp\" function right value, second argument, must be a \"String\"")
if leftvalue.is_null() or rightvalue.is_null():
return NULLSTRINGVALUE, None
ignorecase, err = ignorecase.convert(ExpressionValueType.BOOLEAN)
if err is not None:
return None, EvaluateError(f"failed while converting \"StrCmp\" function ignore case value, third argument, to \"Boolean\": {err}")
def compare_str(left: str, right: str) -> int:
return -1 if left < right else 1 if left > right else 0
if ignorecase._booleanvalue():
return ValueExpression(ExpressionValueType.INT32, compare_str(leftvalue._stringvalue().upper(), rightvalue._stringvalue().upper())), None
return ValueExpression(ExpressionValueType.INT32, compare_str(leftvalue._stringvalue(), rightvalue._stringvalue())), None
def _substr(self, sourcevalue: ValueExpression, indexvalue: ValueExpression, lengthvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"SubStr\" function source value, first argument, must be a \"String\"")
if not is_integertype(indexvalue.valuetype):
return None, TypeError("\"SubStr\" function index value, second argument, must be an integer type")
if not is_integertype(lengthvalue.valuetype):
return None, TypeError("\"SubStr\" function length value, third argument, must be an integer type")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
sourcetext = sourcevalue._stringvalue()
index = indexvalue.integervalue()
if index < 0 or index >= len(sourcetext):
return EMPTYSTRINGVALUE, None
if not lengthvalue.is_null():
length = lengthvalue.integervalue()
if length <= 0:
return EMPTYSTRINGVALUE, None
if index + length < len(sourcetext):
return ValueExpression(ExpressionValueType.STRING, sourcetext[index:index + length]), None
return ValueExpression(ExpressionValueType.STRING, sourcetext[index:]), None
def _trim(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Trim\" function source value, first argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
return ValueExpression(ExpressionValueType.STRING, sourcevalue._stringvalue().strip()), None
def _trimleft(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"TrimLeft\" function source value, first argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
return ValueExpression(ExpressionValueType.STRING, sourcevalue._stringvalue().lstrip()), None
def _trimright(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"TrimRight\" function source value, first argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
return ValueExpression(ExpressionValueType.STRING, sourcevalue._stringvalue().rstrip()), None
def _upper(self, sourcevalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
if sourcevalue.valuetype != ExpressionValueType.STRING:
return None, TypeError("\"Upper\" function source value, first argument, must be a \"String\"")
# If source value is Null, result is Null
if sourcevalue.is_null():
return NULLSTRINGVALUE, None
return ValueExpression(ExpressionValueType.STRING, sourcevalue._stringvalue().upper()), None
def _utcnow(self) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
return ValueExpression(ExpressionValueType.DATETIME, datetime.now(timezone.utc)), None
# Filter Expression Operator Implementations
def _convert_operands(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[ValueExpression], Optional[Exception]]:
# sourcery skip
leftvalue, err = leftvalue.convert(valuetype)
if err is not None:
return None, None, err
rightvalue, err = rightvalue.convert(valuetype)
if err is not None:
return None, None, err
return leftvalue, rightvalue, None
def _multiply_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"multiplication \"*\" operator {err}")
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() * rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() * rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, leftvalue._decimalvalue() * rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, leftvalue._doublevalue() * rightvalue._doublevalue()), None
return None, EvaluateError(f"cannot apply multiplication \"*\" operator to \"{valuetype}\"")
def _divide_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"division \"/\" operator {err}")
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() / rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() / rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, leftvalue._decimalvalue() / rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, leftvalue._doublevalue() / rightvalue._doublevalue()), None
return None, EvaluateError(f"cannot apply division \"/\" operator to \"{valuetype}\"")
def _modulus_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"modulus \"%\" operator {err}")
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() % rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() % rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, leftvalue._decimalvalue() % rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, leftvalue._doublevalue() % rightvalue._doublevalue()), None
return None, EvaluateError(f"cannot apply modulus \"%\" operator to \"{valuetype}\"")
def _add_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"addition \"+\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, (leftvalue._booleanvalue_asint() + rightvalue._booleanvalue_asint()) != 0), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() + rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() + rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, leftvalue._decimalvalue() + rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, leftvalue._doublevalue() + rightvalue._doublevalue()), None
if valuetype == ExpressionValueType.STRING:
return ValueExpression(ExpressionValueType.STRING, leftvalue._stringvalue() + rightvalue._stringvalue()), None
return None, EvaluateError(f"cannot apply addition \"+\" operator to \"{valuetype}\"")
def _subtract_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"subtraction \"-\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, (leftvalue._booleanvalue_asint() - rightvalue._booleanvalue_asint()) != 0), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() - rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() - rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.DECIMAL, leftvalue._decimalvalue() - rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.DOUBLE, leftvalue._doublevalue() - rightvalue._doublevalue()), None
return None, EvaluateError(f"cannot apply subtraction \"-\" operator to \"{valuetype}\"")
def _bitshiftleft_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left is Null, result is Null
if leftvalue.is_null():
return leftvalue, None
if not is_integertype(rightvalue.valuetype):
return None, TypeError(f"left bit-shift \"<<\" operator right operand, shift value, must be an integer")
if rightvalue.is_null():
return None, TypeError(f"left bit-shift \"<<\" operator right operand, shift value, is Null")
shiftamount = rightvalue.integervalue()
if leftvalue.valuetype == ExpressionValueType.BOOLEAN:
if shiftamount < 0:
shiftamount = INTSIZE - (abs(shiftamount) % INTSIZE)
return ValueExpression(ExpressionValueType.BOOLEAN, (leftvalue._booleanvalue_asint() << shiftamount) != 0), None
if leftvalue.valuetype == ExpressionValueType.INT32:
if shiftamount < 0:
shiftamount = 32 - (abs(shiftamount) % 32)
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() << shiftamount), None
if leftvalue.valuetype == ExpressionValueType.INT64:
if shiftamount < 0:
shiftamount = 64 - (abs(shiftamount) % 64)
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() << shiftamount), None
return None, EvaluateError(f"cannot apply left bit-shift \"<<\" operator to \"{leftvalue.valuetype}\"")
def _bitshiftright_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left is Null, result is Null
if leftvalue.is_null():
return leftvalue, None
if not is_integertype(rightvalue.valuetype):
return None, TypeError(f"right bit-shift \">>\" operator right operand, shift value, must be an integer")
if rightvalue.is_null():
return None, TypeError(f"right bit-shift \">>\" operator right operand, shift value, is Null")
shiftamount = rightvalue.integervalue()
if leftvalue.valuetype == ExpressionValueType.BOOLEAN:
if shiftamount < 0:
shiftamount = INTSIZE - (abs(shiftamount) % INTSIZE)
return ValueExpression(ExpressionValueType.BOOLEAN, (leftvalue._booleanvalue_asint() >> shiftamount) != 0), None
if leftvalue.valuetype == ExpressionValueType.INT32:
if shiftamount < 0:
shiftamount = 32 - (abs(shiftamount) % 32)
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() >> shiftamount), None
if leftvalue.valuetype == ExpressionValueType.INT64:
if shiftamount < 0:
shiftamount = 64 - (abs(shiftamount) % 64)
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() >> shiftamount), None
return None, EvaluateError(f"cannot apply right bit-shift \">>\" operator to \"{leftvalue.valuetype}\"")
def _bitwiseand_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"bitwise \"&\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, (leftvalue._booleanvalue_asint() & rightvalue._booleanvalue_asint()) != 0), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() & rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() & rightvalue._int64value()), None
return None, EvaluateError(f"cannot apply bitwise \"&\" operator to \"{valuetype}\"")
def _bitwiseor_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"bitwise \"|\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, (leftvalue._booleanvalue_asint() | rightvalue._booleanvalue_asint()) != 0), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() | rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() | rightvalue._int64value()), None
return None, EvaluateError(f"cannot apply bitwise \"|\" operator to \"{valuetype}\"")
def _bitwisexor_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(valuetype), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"bitwise \"^\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, (leftvalue._booleanvalue_asint() ^ rightvalue._booleanvalue_asint()) != 0), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.INT32, leftvalue._int32value() ^ rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.INT64, leftvalue._int64value() ^ rightvalue._int64value()), None
return None, EvaluateError(f"cannot apply bitwise \"^\" operator to \"{valuetype}\"")
def _lessthan_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(ExpressionValueType.BOOLEAN), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"less-than \"<\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue_asint() < rightvalue._booleanvalue_asint()), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int32value() < rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int64value() < rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._decimalvalue() < rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._doublevalue() < rightvalue._doublevalue()), None
if valuetype == ExpressionValueType.STRING:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue().upper() < rightvalue._stringvalue().upper()), None
if valuetype == ExpressionValueType.GUID:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._guidvalue() < rightvalue._guidvalue()), None
if valuetype == ExpressionValueType.DATETIME:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._datetimevalue() < rightvalue._datetimevalue()), None
return None, EvaluateError(f"cannot apply less-than \"<\" operator to \"{valuetype}\"")
def _lessthanorequal_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(ExpressionValueType.BOOLEAN), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"less-than-or-equal \"<=\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue_asint() <= rightvalue._booleanvalue_asint()), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int32value() <= rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int64value() <= rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._decimalvalue() <= rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._doublevalue() <= rightvalue._doublevalue()), None
if valuetype == ExpressionValueType.STRING:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue().upper() <= rightvalue._stringvalue().upper()), None
if valuetype == ExpressionValueType.GUID:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._guidvalue() <= rightvalue._guidvalue()), None
if valuetype == ExpressionValueType.DATETIME:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._datetimevalue() <= rightvalue._datetimevalue()), None
return None, EvaluateError(f"cannot apply less-than-or-equal \"<=\" operator to \"{valuetype}\"")
def _greaterthan_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(ExpressionValueType.BOOLEAN), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"greater-than \">\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue_asint() > rightvalue._booleanvalue_asint()), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int32value() > rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int64value() > rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._decimalvalue() > rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._doublevalue() > rightvalue._doublevalue()), None
if valuetype == ExpressionValueType.STRING:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue().upper() > rightvalue._stringvalue().upper()), None
if valuetype == ExpressionValueType.GUID:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._guidvalue() > rightvalue._guidvalue()), None
if valuetype == ExpressionValueType.DATETIME:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._datetimevalue() > rightvalue._datetimevalue()), None
return None, EvaluateError(f"cannot apply greater-than \">\" operator to \"{valuetype}\"")
def _greaterthanorequal_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(ExpressionValueType.BOOLEAN), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"greater-than-or-equal \">=\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue_asint() >= rightvalue._booleanvalue_asint()), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int32value() >= rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int64value() >= rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._decimalvalue() >= rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._doublevalue() >= rightvalue._doublevalue()), None
if valuetype == ExpressionValueType.STRING:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue().upper() >= rightvalue._stringvalue().upper()), None
if valuetype == ExpressionValueType.GUID:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._guidvalue() >= rightvalue._guidvalue()), None
if valuetype == ExpressionValueType.DATETIME:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._datetimevalue() >= rightvalue._datetimevalue()), None
return None, EvaluateError(f"cannot apply greater-than-or-equal \">=\" operator to \"{valuetype}\"")
def _equal_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType, exactmatch: bool) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(ExpressionValueType.BOOLEAN), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"equal \"=\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue_asint() == rightvalue._booleanvalue_asint()), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int32value() == rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int64value() == rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._decimalvalue() == rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._doublevalue() == rightvalue._doublevalue()), None
if valuetype == ExpressionValueType.STRING:
if exactmatch:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue() == rightvalue._stringvalue()), None
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue().upper() == rightvalue._stringvalue().upper()), None
if valuetype == ExpressionValueType.GUID:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._guidvalue() == rightvalue._guidvalue()), None
if valuetype == ExpressionValueType.DATETIME:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._datetimevalue() == rightvalue._datetimevalue()), None
return None, EvaluateError(f"cannot apply equal \"=\" operator to \"{valuetype}\"")
def _notequal_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType, exactmatch: bool) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return ValueExpression.nullvalue(ExpressionValueType.BOOLEAN), None
leftvalue, rightvalue, err = self._convert_operands(leftvalue, rightvalue, valuetype)
if err is not None:
return None, EvaluateError(f"not-equal \"!=\" operator {err}")
if valuetype == ExpressionValueType.BOOLEAN:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue_asint() != rightvalue._booleanvalue_asint()), None
if valuetype == ExpressionValueType.INT32:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int32value() != rightvalue._int32value()), None
if valuetype == ExpressionValueType.INT64:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._int64value() != rightvalue._int64value()), None
if valuetype == ExpressionValueType.DECIMAL:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._decimalvalue() != rightvalue._decimalvalue()), None
if valuetype == ExpressionValueType.DOUBLE:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._doublevalue() != rightvalue._doublevalue()), None
if valuetype == ExpressionValueType.STRING:
if exactmatch:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue() != rightvalue._stringvalue()), None
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._stringvalue().upper() != rightvalue._stringvalue().upper()), None
if valuetype == ExpressionValueType.GUID:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._guidvalue() != rightvalue._guidvalue()), None
if valuetype == ExpressionValueType.DATETIME:
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._datetimevalue() != rightvalue._datetimevalue()), None
return None, EvaluateError(f"cannot apply not-equal \"!=\" operator to \"{valuetype}\"")
def _isnull_op(self, value: ValueExpression) -> Optional[ValueExpression]:
return ValueExpression(ExpressionValueType.BOOLEAN, value.is_null())
def _isnotnull_op(self, value: ValueExpression) -> Optional[ValueExpression]:
return ValueExpression(ExpressionValueType.BOOLEAN, not value.is_null())
def _like_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, exactmatch: bool) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# sourcery skip
# If left is Null, result is Null
if leftvalue.is_null():
return NULLBOOLVALUE, None
if leftvalue.valuetype != ExpressionValueType.STRING or rightvalue.valuetype != ExpressionValueType.STRING:
return None, EvaluateError(f"cannot perform \"LIKE\" operation on \"{leftvalue.valuetype}\" and \"{rightvalue.valuetype}\"")
if rightvalue.is_null():
return None, TypeError(f"right operand of \"LIKE\" operation is Null")
leftoperand = leftvalue._stringvalue()
rightoperand = rightvalue._stringvalue()
testexpression = rightoperand.replace("%", "*")
startswith_wildcard = testexpression.startswith("*")
endswith_wildcard = testexpression.endswith("*")
if startswith_wildcard:
testexpression = testexpression[1:]
if endswith_wildcard and len(testexpression) > 0:
testexpression = testexpression[:-1]
# "*" or "**" expression means match everything
if len(testexpression) == 0:
return TRUEVALUE, None
# Wild cards in the middle of the string are not supported
if "*" in testexpression:
return None, EvaluateError(f"right operand of \"LIKE\" expression \"{rightoperand}\" has an invalid pattern")
if startswith_wildcard:
if exactmatch:
if leftoperand.endswith(testexpression):
return TRUEVALUE, None
else:
if leftoperand.upper().endswith(testexpression.upper()):
return TRUEVALUE, None
if endswith_wildcard:
if exactmatch:
if leftoperand.startswith(testexpression):
return TRUEVALUE, None
else:
if leftoperand.upper().startswith(testexpression.upper()):
return TRUEVALUE, None
if startswith_wildcard and endswith_wildcard:
if exactmatch:
if testexpression in leftoperand:
return TRUEVALUE, None
else:
if testexpression.upper() in leftoperand.upper():
return TRUEVALUE, None
return FALSEVALUE, None
def _notlike_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression, valuetype: ExpressionValueType, exactmatch: bool) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left is Null, result is Null
if leftvalue.is_null():
return NULLBOOLVALUE, None
likeresult, err = self._like_op(leftvalue, rightvalue, valuetype, exactmatch)
if err is not None:
return None, err
return (FALSEVALUE, None) if likeresult._booleanvalue() else (TRUEVALUE, None)
def _and_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return NULLBOOLVALUE, None
if leftvalue.valuetype != ExpressionValueType.BOOLEAN or rightvalue.valuetype != ExpressionValueType.BOOLEAN:
return None, EvaluateError(f"cannot perform \"AND\" operation on \"{leftvalue.valuetype}\" and \"{rightvalue.valuetype}\"")
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue() and rightvalue._booleanvalue()), None
def _or_op(self, leftvalue: ValueExpression, rightvalue: ValueExpression) -> Tuple[Optional[ValueExpression], Optional[Exception]]:
# If left or right value is Null, result is Null
if leftvalue.is_null() or rightvalue.is_null():
return NULLBOOLVALUE, None
if leftvalue.valuetype != ExpressionValueType.BOOLEAN or rightvalue.valuetype != ExpressionValueType.BOOLEAN:
return None, EvaluateError(f"cannot perform \"OR\" operation on \"{leftvalue.valuetype}\" and \"{rightvalue.valuetype}\"")
return ValueExpression(ExpressionValueType.BOOLEAN, leftvalue._booleanvalue() or rightvalue._booleanvalue()), None