Skip to content

Commit

Permalink
plan union in single integration query
Browse files Browse the repository at this point in the history
  • Loading branch information
ea-rus committed Nov 11, 2024
1 parent fbc4315 commit c6983a2
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 50 deletions.
98 changes: 58 additions & 40 deletions mindsdb_sql/planner/query_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ def find_objects(node, is_table, **kwargs):
query_traversal(query, find_objects)

# cte names are not mdb objects
if query.cte:
if isinstance(query, Select) and query.cte:
cte_names = [
cte.name.parts[-1]
for cte in query.cte
Expand Down Expand Up @@ -271,21 +271,21 @@ def find_selects(node, **kwargs):
return find_selects

def plan_select_identifier(self, query):
query_info = self.get_query_info(query)

if len(query_info['integrations']) == 0 and len(query_info['predictors']) >= 1:
# select from predictor
return self.plan_select_from_predictor(query)
elif (
len(query_info['integrations']) == 1
and len(query_info['mdb_entities']) == 0
and len(query_info['user_functions']) == 0
):

int_name = list(query_info['integrations'])[0]
if self.integrations.get(int_name, {}).get('class_type') != 'api':
# one integration without predictors, send all query to integration
return self.plan_integration_select(query)
# query_info = self.get_query_info(query)
#
# if len(query_info['integrations']) == 0 and len(query_info['predictors']) >= 1:
# # select from predictor
# return self.plan_select_from_predictor(query)
# elif (
# len(query_info['integrations']) == 1
# and len(query_info['mdb_entities']) == 0
# and len(query_info['user_functions']) == 0
# ):
#
# int_name = list(query_info['integrations'])[0]
# if self.integrations.get(int_name, {}).get('class_type') != 'api':
# # one integration without predictors, send all query to integration
# return self.plan_integration_select(query)

# find subselects
main_integration, _ = self.resolve_database_table(query.from_table)
Expand Down Expand Up @@ -380,21 +380,21 @@ def plan_api_db_select(self, query):

def plan_nested_select(self, select):

query_info = self.get_query_info(select)
# get all predictors

if (
len(query_info['mdb_entities']) == 0
and len(query_info['integrations']) == 1
and len(query_info['user_functions']) == 0
and 'files' not in query_info['integrations']
and 'views' not in query_info['integrations']
):
int_name = list(query_info['integrations'])[0]
if self.integrations.get(int_name, {}).get('class_type') != 'api':

# if no predictor inside = run as is
return self.plan_integration_nested_select(select, int_name)
# query_info = self.get_query_info(select)
# # get all predictors
#
# if (
# len(query_info['mdb_entities']) == 0
# and len(query_info['integrations']) == 1
# and len(query_info['user_functions']) == 0
# and 'files' not in query_info['integrations']
# and 'views' not in query_info['integrations']
# ):
# int_name = list(query_info['integrations'])[0]
# if self.integrations.get(int_name, {}).get('class_type') != 'api':
#
# # if no predictor inside = run as is
# return self.plan_integration_nested_select(select, int_name)

return self.plan_mdb_nested_select(select)

Expand Down Expand Up @@ -685,22 +685,38 @@ def plan_delete(self, query: Delete):
))

def plan_cte(self, query):
query_info = self.get_query_info(query)

if (
len(query_info['integrations']) == 1
and len(query_info['mdb_entities']) == 0
and len(query_info['user_functions']) == 0
):
# single integration, will be planned later
return

for cte in query.cte:
step = self.plan_select(cte.query)
name = cte.name.parts[-1]
self.cte_results[name] = step.result

def check_single_integration(self, query):
query_info = self.get_query_info(query)

# can we send all query to integration?

# one integration and not mindsdb objects in query
if (
len(query_info['mdb_entities']) == 0
and len(query_info['integrations']) == 1
and 'files' not in query_info['integrations']
and 'views' not in query_info['integrations']
and len(query_info['user_functions']) == 0
):

int_name = list(query_info['integrations'])[0]
# if is sql database
if self.integrations.get(int_name, {}).get('class_type') != 'api':

# send to this integration
self.prepare_integration_select(int_name, query)

last_step = self.plan.add_step(FetchDataframeStep(integration=int_name, query=query))
return last_step

def plan_select(self, query, integration=None):

if isinstance(query, (Union, Except, Intersect)):
return self.plan_union(query, integration=integration)

Expand Down Expand Up @@ -775,6 +791,8 @@ def from_query(self, query=None):
query = self.query

if isinstance(query, (Select, Union, Except, Intersect)):
if self.check_single_integration(query):
return self.plan
self.plan_select(query)
elif isinstance(query, CreateTable):
self.plan_create_table(query)
Expand Down
26 changes: 16 additions & 10 deletions tests/test_planner/test_integration_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ def test_integration_select_subquery_in_from(self):
steps=[
FetchDataframeStep(integration='int',
query=Select(
targets=[Identifier('column1')],
targets=[Identifier('column1', alias=Identifier('column1'))],
from_table=Select(
targets=[Identifier('column1', alias=Identifier('column1'))],
from_table=Identifier('tab'),
Expand Down Expand Up @@ -378,7 +378,7 @@ def test_integration_select_default_namespace_subquery_in_from(self):
steps=[
FetchDataframeStep(integration='int',
query=Select(
targets=[Identifier('column1')],
targets=[Identifier('column1', alias=Identifier('column1')),],
from_table=Select(
targets=[Identifier('column1', alias=Identifier('column1'))],
from_table=Identifier('tab'),
Expand Down Expand Up @@ -588,20 +588,26 @@ def test_select_from_single_integration(self):
with tab2 as (
select * from int1.tabl2
)
select x from tab2
join int1.tab1 on 0=0
where x1 in (select id from int1.tab1)
limit 1
select a from (
select x from tab2
union
select y from int1.tab1
where x1 in (select id from int1.tab1)
limit 1
)
'''

sql_integration = '''
with tab2 as (
select * from tabl2
)
select x from tab2
join tab1 on 0=0
where x1 in (select id as id from tab1)
limit 1
select a as a from (
select x as x from tab2
union
select y as y from tab1
where x1 in (select id as id from tab1)
limit 1
)
'''
query = parse_sql(sql_parsed, dialect='mindsdb')

Expand Down

0 comments on commit c6983a2

Please sign in to comment.