Skip to content

Commit

Permalink
Merge branch 'staging' into planner-improvents
Browse files Browse the repository at this point in the history
# Conflicts:
#	mindsdb_sql/planner/query_planner.py
  • Loading branch information
ea-rus committed Dec 30, 2023
2 parents a1115d4 + d52abbf commit 8aa5ed6
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 24 deletions.
2 changes: 1 addition & 1 deletion mindsdb_sql/__about__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__title__ = 'mindsdb_sql'
__package_name__ = 'mindsdb_sql'
__version__ = '0.8.0'
__version__ = '0.9.0'
__description__ = "Pure python SQL parser"
__email__ = "[email protected]"
__author__ = 'MindsDB Inc'
Expand Down
10 changes: 5 additions & 5 deletions mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class CreateKnowledgeBase(ASTNode):
def __init__(
self,
name,
model,
model=None,
storage=None,
from_select=None,
params=None,
Expand Down Expand Up @@ -37,13 +37,13 @@ def __init__(
def to_tree(self, *args, level=0, **kwargs):
ind = indent(level)
storage_str = f"{ind} storage={self.storage.to_string()},\n" if self.storage else ""
model_str = f"{ind} model={self.model.to_string()},\n" if self.model else ""
out_str = f"""
{ind}CreateKnowledgeBase(
{ind} if_not_exists={self.if_not_exists},
{ind} name={self.name.to_string()},
{ind} from_query={self.from_query.to_tree(level=level + 1) if self.from_query else None},
{ind} model={self.model.to_string()},
{storage_str}{ind} params={self.params}
{model_str}{storage_str}{ind} params={self.params}
{ind})
"""
return out_str
Expand All @@ -56,13 +56,13 @@ def get_string(self, *args, **kwargs):
f"FROM ({self.from_query.get_string()})" if self.from_query else ""
)
storage_str = f" STORAGE = {self.storage.to_string()}" if self.storage else ""
model_str = f" MODEL = {self.model.to_string()},\n" if self.model else ""

out_str = (
f"CREATE KNOWLEDGE_BASE {'IF NOT EXISTS' if self.if_not_exists else ''}{self.name.to_string()} "
f"{from_query_str} "
f"USING {using_str},"
f" MODEL = {self.model.to_string()}, "
f"{storage_str}"
f"{model_str}{storage_str}"
)

return out_str
Expand Down
3 changes: 1 addition & 2 deletions mindsdb_sql/parser/dialects/mindsdb/latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, alias=None, parentheses=False, **kwargs)

def to_tree(self, *args, level=0, **kwargs):
return '\t'*level + 'Latest()'
return '\t'*level + 'Latest()'

def get_string(self, *args, **kwargs):
return 'LATEST'

8 changes: 3 additions & 5 deletions mindsdb_sql/parser/dialects/mindsdb/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,9 @@ def create_kb(self, p):
# convert to identifier
storage = Identifier(storage)

if not model:
if isinstance(model, str):
# convert to identifier
model = Identifier(model)
raise ParsingException('Missing model parameter')
if isinstance(model, str):
# convert to identifier
model = Identifier(model)

if_not_exists = p.if_not_exists_or_empty

Expand Down
22 changes: 20 additions & 2 deletions mindsdb_sql/planner/plan_join_ts.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,33 @@ def add_order_not_null(condition):
)
integration_select.where = find_and_remove_time_filter(integration_select.where, time_filter)
integration_selects = [integration_select]
elif isinstance(time_filter, BinaryOperation) and time_filter.op == '=' and time_filter.args[1] == Latest():
elif isinstance(time_filter, BinaryOperation) and time_filter.op == '=':
integration_select = Select(targets=[Star()],
from_table=table,
where=preparation_where,
modifiers=query_modifiers,
order_by=order_by,
limit=Constant(predictor_window),
)
integration_select.where = find_and_remove_time_filter(integration_select.where, time_filter)

if type(time_filter.args[1]) is Latest:
integration_select.where = find_and_remove_time_filter(integration_select.where, time_filter)
else:
time_filter_date = time_filter.args[1]
preparation_time_filter = BinaryOperation(
'<=',
args=[
Identifier(predictor_time_column_name),
time_filter_date
]
)
integration_select.where = add_order_not_null(
replace_time_filter(
preparation_where2, time_filter, preparation_time_filter
)
)
time_filter.op = '>'

integration_selects = [integration_select]
elif isinstance(time_filter, BinaryOperation) and time_filter.op in ('>', '>='):
time_filter_date = time_filter.args[1]
Expand Down
36 changes: 31 additions & 5 deletions tests/test_parser/test_mindsdb/test_knowledgebase.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
)


def test_create_knowledeg_base():
def test_create_knowledge_base():
# create without select
sql = """
CREATE KNOWLEDGE_BASE my_knowledge_base
Expand Down Expand Up @@ -94,15 +94,24 @@ def test_create_knowledeg_base():
assert ast == expected_ast

# create without MODEL
# TODO: this should be an error
# we may allow this in the future when we have a default model
sql = """
CREATE KNOWLEDGE_BASE my_knowledge_base
USING
STORAGE = my_vector_database.some_table
"""
with pytest.raises(Exception):
ast = parse_sql(sql, dialect="mindsdb")

expected_ast = CreateKnowledgeBase(
name=Identifier("my_knowledge_base"),
if_not_exists=False,
model=None,
storage=Identifier(parts=["my_vector_database", "some_table"]),
from_select=None,
params={},
)

ast = parse_sql(sql, dialect="mindsdb")

assert ast == expected_ast

# create without STORAGE
sql = """
Expand Down Expand Up @@ -141,6 +150,23 @@ def test_create_knowledeg_base():
)
assert ast == expected_ast

# create without USING ie no storage or model
# todo currently this is not supported by the parser

# sql = """
# CREATE KNOWLEDGE_BASE my_knowledge_base;
# """
# ast = parse_sql(sql, dialect="mindsdb")
# expected_ast = CreateKnowledgeBase(
# name=Identifier("my_knowledge_base"),
# if_not_exists=False,
# model=None,
# storage=None,
# from_select=None,
# params={},
# )
# assert ast == expected_ast

# create with params
sql = """
CREATE KNOWLEDGE_BASE my_knowledge_base
Expand Down
86 changes: 82 additions & 4 deletions tests/test_planner/test_ts_predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@

import pytest

from mindsdb_sql import parse_sql
from mindsdb_sql import parse_sql, NativeQuery, OrderBy, NullConstant
from mindsdb_sql.exceptions import PlanningException
from mindsdb_sql.parser.ast import *
from mindsdb_sql.parser.ast import Select, Star, Identifier, Join, Constant, BinaryOperation, Update, BetweenOperation
from mindsdb_sql.parser.dialects.mindsdb.latest import Latest
from mindsdb_sql.planner import plan_query
from mindsdb_sql.planner.query_plan import QueryPlan
from mindsdb_sql.planner.step_result import Result
from mindsdb_sql.planner.steps import *
from mindsdb_sql.planner.steps import (
JoinStep,
SaveToTable,
ProjectStep,
InsertToTable,
MapReduceStep,
MultipleSteps,
UpdateToTable,
LimitOffsetStep,
FetchDataframeStep,
ApplyTimeseriesPredictorStep
)
from mindsdb_sql.parser.utils import JoinType


Expand Down Expand Up @@ -725,7 +736,74 @@ def test_join_predictor_timeseries_concrete_date_less_or_equal(self):

for i in range(len(plan.steps)):
assert plan.steps[i] == expected_plan.steps[i]


def test_join_predictor_timeseries_concrete_date_equal(self):
predictor_window = 10
group_by_column = 'vendor_id'

sql = """
select * from
mysql.data.ny_output as ta
join mindsdb.tp3 as tb
where
ta.pickup_hour = 10
and ta.vendor_id = 1
"""

query = parse_sql(sql, dialect='mindsdb')

expected_plan = QueryPlan(
steps=[
FetchDataframeStep(integration='mysql',
query=Select(targets=[
Identifier(parts=['ta', group_by_column], alias=Identifier(group_by_column))],
from_table=Identifier('data.ny_output', alias=Identifier('ta')),
where=BinaryOperation('=', args=[Identifier('ta.vendor_id'), Constant(1)]),
distinct=True,
)
),
MapReduceStep(
values=Result(0),
reduce='union',
step=FetchDataframeStep(
integration='mysql',
query=parse_sql("""
SELECT * FROM data.ny_output AS ta
WHERE ta.pickup_hour <= 10 AND ta.vendor_id = 1 AND ta.pickup_hour is not null and
ta.vendor_id = '$var[vendor_id]'
ORDER BY ta.pickup_hour DESC LIMIT 10
"""),
),
),
ApplyTimeseriesPredictorStep(
output_time_filter=BinaryOperation('>', args=[Identifier('ta.pickup_hour'), Constant(10)]),
namespace='mindsdb',
predictor=Identifier('tp3', alias=Identifier('tb')),
dataframe=Result(1),
),
JoinStep(left=Result(1),
right=Result(2),
query=Join(
right=Identifier('result_2'),
left=Identifier('result_1'),
join_type=JoinType.JOIN)
),
ProjectStep(dataframe=Result(3), columns=[Star()]),
],
)

plan = plan_query(query,
integrations=['mysql'],
predictor_namespace='mindsdb',
predictor_metadata={
'tp3': {'timeseries': True,
'order_by_column': 'pickup_hour',
'group_by_columns': [group_by_column],
'window': predictor_window}
})

for i in range(len(plan.steps)):
assert plan.steps[i] == expected_plan.steps[i]

def test_join_predictor_timeseries_error_on_nested_where(self):
query = Select(targets=[Identifier('pred.time'), Identifier('pred.price')],
Expand Down

0 comments on commit 8aa5ed6

Please sign in to comment.