From cd2bf554711f1120dab46aa52e006c8cfcf02f06 Mon Sep 17 00:00:00 2001 From: andrew Date: Thu, 31 Oct 2024 11:48:36 +0300 Subject: [PATCH 1/4] fix order with constant in join --- mindsdb_sql/planner/plan_join.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mindsdb_sql/planner/plan_join.py b/mindsdb_sql/planner/plan_join.py index 3089dc6..9f25460 100644 --- a/mindsdb_sql/planner/plan_join.py +++ b/mindsdb_sql/planner/plan_join.py @@ -145,7 +145,8 @@ def resolve_table(self, table): return TableInfo(integration, table, aliases, conditions=[], sub_select=sub_select) def get_table_for_column(self, column: Identifier): - + if not isinstance(column, Identifier): + return # to lowercase parts = tuple(map(str.lower, column.parts[:-1])) if parts in self.tables_idx: @@ -381,7 +382,8 @@ def process_table(self, item, query_in): order_by = [] # all order column be from this table for col in query_in.order_by: - if self.get_table_for_column(col.field).table != item.table: + table_info = self.get_table_for_column(col.field) + if table_info is None or table_info.table != item.table: order_by = False break col = copy.deepcopy(col) From b735f13caaeafec5ef939f4757c4b033c432238b Mon Sep 17 00:00:00 2001 From: andrew Date: Thu, 31 Oct 2024 17:39:28 +0300 Subject: [PATCH 2/4] get filters for fetching tables from previous table --- mindsdb_sql/planner/plan_join.py | 71 +++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) diff --git a/mindsdb_sql/planner/plan_join.py b/mindsdb_sql/planner/plan_join.py index 9f25460..09bb983 100644 --- a/mindsdb_sql/planner/plan_join.py +++ b/mindsdb_sql/planner/plan_join.py @@ -21,7 +21,7 @@ class TableInfo: sub_select: ast.ASTNode = None predictor_info: dict = None join_condition = None - + index: int = None class PlanJoin: @@ -85,12 +85,15 @@ def __init__(self, planner): # index to lookup tables self.tables_idx = None + self.tables = [] + self.tables_fetch_step = {} self.step_stack = None self.query_context = {} self.partition = None + def plan(self, query): self.tables_idx = {} join_step = self.plan_join_tables(query) @@ -161,6 +164,9 @@ def get_join_sequence(self, node, condition=None): for alias in table_info.aliases: self.tables_idx[alias] = table_info + table_info.index = len(self.tables) + self.tables.append(table_info) + table_info.predictor_info = self.planner.get_predictor(node) if condition is not None: @@ -376,6 +382,8 @@ def process_table(self, item, query_in): # not use conditions conditions = [] + conditions += self.get_filters_from_join_conditions(item) + if self.query_context['use_limit']: order_by = None if query_in.order_by is not None: @@ -408,6 +416,8 @@ def process_table(self, item, query_in): # step = self.planner.get_integration_select_step(query2) step = FetchDataframeStep(integration=item.integration, query=query2) + self.tables_fetch_step[item.index] = step + self.add_plan_step(step) self.step_stack.append(step) @@ -442,6 +452,65 @@ def _check_conditions(node, **kwargs): query_traversal(model_table.join_condition, _check_conditions) return columns_map + def get_filters_from_join_conditions(self, fetch_table): + + binary_ops = set() + conditions = [] + data_conditions = [] + + def _check_conditions(node, **kwargs): + if not isinstance(node, BinaryOperation): + return + + if node.op != '=': + binary_ops.add(node.op.lower()) + return + + arg1, arg2 = node.args + table1 = self.get_table_for_column(arg1) if isinstance(arg1, Identifier) else None + table2 = self.get_table_for_column(arg2) if isinstance(arg2, Identifier) else None + + if table1 is not fetch_table: + if table2 is not fetch_table: + return + # set our table first + table1, table2 = table2, table1 + arg1, arg2 = arg2, arg1 + + if isinstance(arg2, Constant): + conditions.append(node) + elif table2 is not None: + node.args = [arg1, arg2] + data_conditions.append(node) + + query_traversal(fetch_table.join_condition, _check_conditions) + + binary_ops.discard('and') + if len(binary_ops) > 0: + # other operations exists, skip + return + + for condition in data_conditions: + # is fetched? + arg1, arg2 = condition.args + table2 = self.get_table_for_column(arg2) + fetch_step = self.tables_fetch_step.get(table2.index) + + if fetch_step is None: + continue + + # extract distinct values + # remove alias + arg2 = Identifier(parts=[arg2.parts[-1]]) + query2 = Select(targets=[arg2], distinct=True) + subselect_step = SubSelectStep(query2, fetch_step.result) + subselect_step = self.add_plan_step(subselect_step) + + condition.args[1] = Parameter(subselect_step.result) + conditions.append(condition) + + return conditions + def process_predictor(self, item, query_in): if len(self.step_stack) == 0: raise NotImplementedError("Predictor can't be first element of join syntax") From 589b1f0d1e910e60c7e91ffec6efb2be0e87746c Mon Sep 17 00:00:00 2001 From: andrew Date: Fri, 1 Nov 2024 11:40:27 +0300 Subject: [PATCH 3/4] test fixes --- mindsdb_sql/planner/plan_join.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/mindsdb_sql/planner/plan_join.py b/mindsdb_sql/planner/plan_join.py index 09bb983..bc4bd4a 100644 --- a/mindsdb_sql/planner/plan_join.py +++ b/mindsdb_sql/planner/plan_join.py @@ -481,6 +481,7 @@ def _check_conditions(node, **kwargs): conditions.append(node) elif table2 is not None: node.args = [arg1, arg2] + node = copy.deepcopy(node) data_conditions.append(node) query_traversal(fetch_table.join_condition, _check_conditions) @@ -501,12 +502,13 @@ def _check_conditions(node, **kwargs): # extract distinct values # remove alias - arg2 = Identifier(parts=[arg2.parts[-1]]) + arg2.parts = arg2.parts[-1:] query2 = Select(targets=[arg2], distinct=True) subselect_step = SubSelectStep(query2, fetch_step.result) subselect_step = self.add_plan_step(subselect_step) condition.args[1] = Parameter(subselect_step.result) + condition.op = 'in' conditions.append(condition) return conditions From ef30b127ac87142224acbe1781105e351388527b Mon Sep 17 00:00:00 2001 From: andrew Date: Fri, 1 Nov 2024 14:35:00 +0300 Subject: [PATCH 4/4] testing fixes --- mindsdb_sql/planner/plan_join.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/mindsdb_sql/planner/plan_join.py b/mindsdb_sql/planner/plan_join.py index bc4bd4a..ea86784 100644 --- a/mindsdb_sql/planner/plan_join.py +++ b/mindsdb_sql/planner/plan_join.py @@ -480,20 +480,17 @@ def _check_conditions(node, **kwargs): if isinstance(arg2, Constant): conditions.append(node) elif table2 is not None: - node.args = [arg1, arg2] - node = copy.deepcopy(node) - data_conditions.append(node) + data_conditions.append([arg1, arg2]) query_traversal(fetch_table.join_condition, _check_conditions) binary_ops.discard('and') if len(binary_ops) > 0: # other operations exists, skip - return + return [] - for condition in data_conditions: + for arg1, arg2 in data_conditions: # is fetched? - arg1, arg2 = condition.args table2 = self.get_table_for_column(arg2) fetch_step = self.tables_fetch_step.get(table2.index) @@ -501,15 +498,21 @@ def _check_conditions(node, **kwargs): continue # extract distinct values - # remove alias - arg2.parts = arg2.parts[-1:] + # remove aliases + arg1 = Identifier(parts=[arg1.parts[-1]]) + arg2 = Identifier(parts=[arg2.parts[-1]]) + query2 = Select(targets=[arg2], distinct=True) subselect_step = SubSelectStep(query2, fetch_step.result) subselect_step = self.add_plan_step(subselect_step) - condition.args[1] = Parameter(subselect_step.result) - condition.op = 'in' - conditions.append(condition) + conditions.append(BinaryOperation( + op='in', + args=[ + arg1, + Parameter(subselect_step.result) + ] + )) return conditions