Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize join tables from different databases: executor #10146

Merged
merged 6 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions mindsdb/api/executor/sql_query/steps/join_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np

from mindsdb_sql.parser.ast import (
Identifier,
Identifier, BinaryOperation, Constant
)
from mindsdb_sql.planner.steps import (
JoinStep,
Expand Down Expand Up @@ -74,7 +74,11 @@ def adapt_condition(node, **kwargs):
return Identifier(parts=['table_b', col_name])

if step.query.condition is None:
raise NotSupportedYet('Unable to join table without condition')
# prevent memory overflow
if len(left_data) * len(right_data) < 10 ** 7:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are left_data and right_data DataFrames? If so, it may be better to get their real size (df.memory_usage(index=True, deep=True).sum()) and compare it with the free memory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are ResultSets

step.query.condition = BinaryOperation(op='=', args=[Constant(0), Constant(0)])
else:
raise NotSupportedYet('Unable to join table without condition')

condition = copy.deepcopy(step.query.condition)
query_traversal(condition, adapt_condition)
Expand Down
17 changes: 16 additions & 1 deletion mindsdb/api/executor/sql_query/steps/subselect_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
from mindsdb_sql.parser.ast import (
Identifier,
Select,
Star
Star,
Constant,
Parameter
)
from mindsdb_sql.planner.step_result import Result
from mindsdb_sql.planner.steps import SubSelectStep, QueryStep
from mindsdb_sql.planner.utils import query_traversal

Expand Down Expand Up @@ -41,6 +44,9 @@ def call(self, step):
def f_all_cols(node, **kwargs):
if isinstance(node, Identifier):
query_cols.add(node.parts[-1])
elif isinstance(node, Result):
prev_result = self.steps_data[node.step_num]
return Constant(prev_result.get_column_values(col_idx=0)[0])

query_traversal(query.where, f_all_cols)

Expand All @@ -50,6 +56,15 @@ def f_all_cols(node, **kwargs):
if col_name not in result_cols:
result.add_column(Column(col_name))

# inject previous step values
if isinstance(query, Select):

def inject_values(node, **kwargs):
if isinstance(node, Parameter) and isinstance(node.value, Result):
prev_result = self.steps_data[node.value.step_num]
return Constant(prev_result.get_column_values(col_idx=0)[0])
query_traversal(query, inject_values)

df = result.to_df()
res = query_df(df, query, session=self.session)

Expand Down
2 changes: 1 addition & 1 deletion requirements/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ redis >=5.0.0, < 6.0.0
walrus==0.9.3
flask-compress >= 1.0.0
appdirs >= 1.0.0
mindsdb-sql ~= 0.20.0
mindsdb-sql ~= 0.21.0
pydantic >= 2.7.0
mindsdb-evaluator >= 0.0.7, < 0.1.0
checksumdir >= 1.2.0
Expand Down
54 changes: 54 additions & 0 deletions tests/unit/executor/test_base_queires.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,60 @@ def test_complex_joins(self, data_handler):

assert row['t3a'] == 6

@patch('mindsdb.integrations.handlers.postgres_handler.Handler')
def test_joins_different_db(self, data_handler):
    """Join ``dummy_data.tbl1`` with ``pg.tbl2`` served by a patched handler.

    One frame is registered through ``self.set_data`` as ``tbl1`` and the
    other through ``self.set_handler`` as table ``tbl2`` of a handler named
    ``pg``. The test checks that the join on ``c`` yields two rows and that
    the first query recorded on the patched handler is a SELECT on ``tbl2``
    carrying a ``c IN (...)`` filter.
    """
    # Two rows on the left side, three on the right; only c=1 and c=2 match.
    left_frame = pd.DataFrame([[1, 1], [3, 2]], columns=['a', 'c'])
    right_frame = pd.DataFrame([[6, 1], [4, 2], [2, 3]], columns=['a', 'c'])

    self.set_data('tbl1', left_frame)
    self.set_handler(data_handler, name='pg', tables={'tbl2': right_frame})

    # --- test join table-table ---
    join_sql = (
        '\n'
        'SELECT *\n'
        'FROM dummy_data.tbl1 as t1\n'
        'JOIN pg.tbl2 as t2 on t1.c=t2.c\n'
    )
    joined = self.run_sql(join_sql)

    # Exactly the two matching rows are expected.
    assert len(joined) == 2

    # The query sent for the second table must already carry the filter.
    recorded_calls = data_handler().query.call_args_list
    first_call = recorded_calls[0]
    positional_args = first_call[0]
    rendered = positional_args[0].to_string()
    assert rendered.strip() == 'SELECT * FROM tbl2 AS t2 WHERE c IN (2, 1)'

@patch('mindsdb.integrations.handlers.postgres_handler.Handler')
def test_implicit_join(self, data_handler):
    """Join two tables listed in FROM with the condition placed in WHERE.

    One frame is registered through ``self.set_data`` as ``tbl1`` and the
    other through ``self.set_handler`` as table ``tbl2`` of a handler named
    ``pg``. The query lists both tables comma-separated and filters with
    ``t1.c=t2.c``; two rows are expected back.
    """
    # Two rows on the left side, three on the right; only c=1 and c=2 match.
    left_frame = pd.DataFrame([[1, 1], [3, 2]], columns=['a', 'c'])
    right_frame = pd.DataFrame([[6, 1], [4, 2], [2, 3]], columns=['a', 'c'])

    self.set_data('tbl1', left_frame)
    self.set_handler(data_handler, name='pg', tables={'tbl2': right_frame})

    # --- test implicit join: comma-separated tables, condition in WHERE ---
    implicit_sql = (
        '\n'
        'SELECT * FROM dummy_data.tbl1 as t1, pg.tbl2 as t2\n'
        'where t1.c=t2.c\n'
    )
    joined = self.run_sql(implicit_sql)

    # Exactly the two matching rows are expected.
    assert len(joined) == 2

def test_complex_queries(self):

# -- set up data --
Expand Down
Loading