From 17598f790423581e073031261591f34a54e783a5 Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Thu, 31 Aug 2023 23:06:13 -0700 Subject: [PATCH 1/9] Add sql syntax for knowledge base --- .../parser/dialects/mindsdb/__init__.py | 2 + .../parser/dialects/mindsdb/knowledge_base.py | 78 +++++ mindsdb_sql/parser/dialects/mindsdb/lexer.py | 4 + mindsdb_sql/parser/dialects/mindsdb/parser.py | 39 +++ .../test_mindsdb/test_knowledgebase.py | 331 ++++++++++++++++++ 5 files changed, 454 insertions(+) create mode 100644 mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py create mode 100644 tests/test_parser/test_mindsdb/test_knowledgebase.py diff --git a/mindsdb_sql/parser/dialects/mindsdb/__init__.py b/mindsdb_sql/parser/dialects/mindsdb/__init__.py index ef92b64f..565ea39d 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/__init__.py +++ b/mindsdb_sql/parser/dialects/mindsdb/__init__.py @@ -16,6 +16,8 @@ from .drop_job import DropJob from .chatbot import CreateChatBot, UpdateChatBot, DropChatBot from .trigger import CreateTrigger, DropTrigger +from .knowledge_base import CreateKnowledgeBase, DropKnowledgeBase # remove it in next release CreateDatasource = CreateDatabase + diff --git a/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py b/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py new file mode 100644 index 00000000..a483e870 --- /dev/null +++ b/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py @@ -0,0 +1,78 @@ +from mindsdb_sql.parser.ast.base import ASTNode +from mindsdb_sql.parser.utils import indent + + +class CreateKnowledgeBase(ASTNode): + def __init__( + self, + name, + model, + storage, + from_query=None, + params=None, + if_not_exists=False, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.name = name + self.model = model + self.storage = storage + self.params = params + self.if_not_exists = if_not_exists + self.from_query = from_query + + def to_tree(self, *args, level=0, **kwargs): + ind = indent(level) + out_str = f""" + {ind}CreateKnowledgeBase( + {ind} if_not_exists={self.if_not_exists}, + {ind} name={self.name.to_string()}, + {ind} from_query={self.from_query.to_tree(level=level+1) if self.from_query else None}, + {ind} model={self.model.to_string()}, + {ind} storage={self.storage.to_string()}, + {ind} params={self.params} + {ind}) + """ + return out_str + + def get_string(self, *args, **kwargs): + params = self.params.copy() + using_ar = [f"{k}={repr(v)}" for k, v in params.items()] + using_str = ", ".join(using_ar) + from_query_str = ( + f"FROM ({self.from_query.get_string()})" if self.from_query else "" + ) + + out_str = ( + f"CREATE KNOWLEDGE_BASE {'IF NOT EXISTS' if self.if_not_exists else ''}{self.name.to_string()} " + f"{from_query_str} " + f"MODEL {self.model.to_string()} " + f"STORAGE {self.storage.to_string()} " + f"USING {using_str}" + ) + + return out_str + + def __repr__(self) -> str: + return self.to_tree() + + +class DropKnowledgeBase(ASTNode): + def __init__(self, name, if_exists=False, *args, **kwargs): + super().__init__(*args, **kwargs) + self.name = name + self.if_exists = if_exists + + def to_tree(self, *args, level=0, **kwargs): + ind = indent(level) + out_str = ( + f"{ind}DropKnowledgeBase(" + f"{ind} if_exists={self.if_exists}," + f"name={self.name.to_string()})" + ) + return out_str + + def get_string(self, *args, **kwargs): + out_str = f'DROP KNOWLEDGE_BASE {"IF EXISTS" if self.if_exists else ""}{self.name.to_string()}' + return out_str diff --git a/mindsdb_sql/parser/dialects/mindsdb/lexer.py b/mindsdb_sql/parser/dialects/mindsdb/lexer.py index 5bc5b40b..b02ea72f 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/lexer.py +++ b/mindsdb_sql/parser/dialects/mindsdb/lexer.py @@ -30,6 +30,7 @@ class MindsDBLexer(Lexer): LATEST, HORIZON, USING, ENGINE, TRAIN, PREDICT, PARAMETERS, JOB, CHATBOT, EVERY,PROJECT, ANOMALY, DETECTION, + KNOWLEDGE_BASE, KNOWLEDGE_BASES, # SHOW/DDL Keywords @@ -115,6 +116,9 @@ class MindsDBLexer(Lexer): ANOMALY = r'\bANOMALY\b' DETECTION = r'\bDETECTION\b' + KNOWLEDGE_BASE = r'\bKNOWLEDGE[_|\s]BASE\b' + KNOWLEDGE_BASES = r'\bKNOWLEDGE[_|\s]BASES\b' + # Misc SET = r'\bSET\b' START = r'\bSTART\b' diff --git a/mindsdb_sql/parser/dialects/mindsdb/parser.py b/mindsdb_sql/parser/dialects/mindsdb/parser.py index 81058dda..509a1ea0 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/parser.py +++ b/mindsdb_sql/parser/dialects/mindsdb/parser.py @@ -16,6 +16,7 @@ from mindsdb_sql.parser.dialects.mindsdb.latest import Latest from mindsdb_sql.parser.dialects.mindsdb.evaluate import Evaluate from mindsdb_sql.parser.dialects.mindsdb.create_file import CreateFile +from mindsdb_sql.parser.dialects.mindsdb.knowledge_base import CreateKnowledgeBase, DropKnowledgeBase from mindsdb_sql.exceptions import ParsingException from mindsdb_sql.parser.dialects.mindsdb.lexer import MindsDBLexer from mindsdb_sql.parser.dialects.mindsdb.retrain_predictor import RetrainPredictor @@ -80,10 +81,47 @@ class MindsDBParser(Parser): 'update_chat_bot', 'create_trigger', 'drop_trigger', + 'create_kb', + 'drop_kb', ) def query(self, p): return p[0] + # -- Knowledge Base -- + @_( + 'CREATE KNOWLEDGE_BASE identifier MODEL identifier STORAGE identifier', + 'CREATE KNOWLEDGE_BASE identifier MODEL identifier STORAGE identifier USING kw_parameter_list', + # from select + 'CREATE KNOWLEDGE_BASE identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier', + 'CREATE KNOWLEDGE_BASE identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier USING kw_parameter_list', + 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier MODEL identifier STORAGE identifier', + 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier MODEL identifier STORAGE identifier USING kw_parameter_list', + 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier', + 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier USING kw_parameter_list', + ) + def create_kb(self, p): + params = getattr(p, 'kw_parameter_list', {}) + from_query = getattr(p, 'select', None) + name = p.identifier0 + model = p.identifier1 + storage = p.identifier2 + if_not_exists = hasattr(p, 'IF_NOT_EXISTS') + + return CreateKnowledgeBase( + name=name, + model=model, + storage=storage, + from_query=from_query, + params=params, + if_not_exists=if_not_exists + ) + + @_('DROP KNOWLEDGE_BASE identifier', + 'DROP KNOWLEDGE_BASE IF_EXISTS identifier') + def drop_kb(self, p): + if_exists = hasattr(p, 'IF_EXISTS') + return DropKnowledgeBase(name=p.identifier, if_exists=if_exists) + # -- ChatBot -- @_('CREATE CHATBOT identifier USING kw_parameter_list') def create_chat_bot(self, p): @@ -461,6 +499,7 @@ def show(self, p): 'ML_ENGINES', 'HANDLERS', 'SEARCH_PATH', + 'KNOWLEDGE_BASES', 'ALL') def show_category(self, p): return ' '.join([x for x in p]) diff --git a/tests/test_parser/test_mindsdb/test_knowledgebase.py b/tests/test_parser/test_mindsdb/test_knowledgebase.py new file mode 100644 index 00000000..fe9e7c80 --- /dev/null +++ b/tests/test_parser/test_mindsdb/test_knowledgebase.py @@ -0,0 +1,331 @@ +import pytest +from mindsdb_sql import parse_sql +from mindsdb_sql.parser.dialects.mindsdb.knowledge_base import ( + CreateKnowledgeBase, + DropKnowledgeBase, +) +from mindsdb_sql.parser.ast import ( + Select, + Identifier, + Join, + Show, + BinaryOperation, + Constant, + Star, + Delete, + Insert, + OrderBy, +) + + +def test_create_knowledeg_base(): + # create without select + sql = """ + CREATE KNOWLEDGE_BASE my_knowledge_base + MODEL mindsdb.my_embedding_model + STORAGE my_vector_database.some_table + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = CreateKnowledgeBase( + name=Identifier("my_knowledge_base"), + if_not_exists=False, + model=Identifier(parts=["mindsdb", "my_embedding_model"]), + storage=Identifier(parts=["my_vector_database", "some_table"]), + from_query=None, + params={}, + ) + assert ast == expected_ast + + # the order of MODEL and STORAGE should not matter + # TODO: the current syntax is sensitive to the order + sql = """ + CREATE KNOWLEDGE_BASE my_knowledge_base + STORAGE my_vector_database.some_table + MODEL mindsdb.my_embedding_model + """ + with pytest.raises(Exception): + ast = parse_sql(sql, dialect="mindsdb") + + # create from a query + sql = """ + CREATE KNOWLEDGE_BASE my_knowledge_base + FROM ( + SELECT id, content, embeddings, metadata + FROM my_table + JOIN my_embedding_model + ) + MODEL mindsdb.my_embedding_model + STORAGE my_vector_database.some_table + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = CreateKnowledgeBase( + name=Identifier("my_knowledge_base"), + if_not_exists=False, + model=Identifier(parts=["mindsdb", "my_embedding_model"]), + storage=Identifier(parts=["my_vector_database", "some_table"]), + from_query=Select( + targets=[ + Identifier("id"), + Identifier("content"), + Identifier("embeddings"), + Identifier("metadata"), + ], + from_table=Join( + left=Identifier("my_table"), + right=Identifier("my_embedding_model"), + join_type="JOIN", + ), + ), + params={}, + ) + + assert ast == expected_ast + + # create without MODEL + # TODO: this should be an error + # we may allow this in the future when we have a default model + sql = """ + CREATE KNOWLEDGE_BASE my_knowledge_base + STORAGE my_vector_database.some_table + """ + with pytest.raises(Exception): + ast = parse_sql(sql, dialect="mindsdb") + + # create without STORAGE + # TODO: this should be an error + # we may allow this in the future when we have a default storage + sql = """ + CREATE KNOWLEDGE_BASE my_knowledge_base + MODEL mindsdb.my_embedding_model + """ + with pytest.raises(Exception): + ast = parse_sql(sql, dialect="mindsdb") + + # create if not exists + sql = """ + CREATE KNOWLEDGE_BASE IF NOT EXISTS my_knowledge_base + MODEL mindsdb.my_embedding_model + STORAGE my_vector_database.some_table + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = CreateKnowledgeBase( + name=Identifier("my_knowledge_base"), + if_not_exists=True, + model=Identifier(parts=["mindsdb", "my_embedding_model"]), + storage=Identifier(parts=["my_vector_database", "some_table"]), + from_query=None, + params={}, + ) + assert ast == expected_ast + + # create with params + sql = """ + CREATE KNOWLEDGE_BASE my_knowledge_base + MODEL mindsdb.my_embedding_model + STORAGE my_vector_database.some_table + USING + some_param = 'some value', + other_param = 'other value' + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = CreateKnowledgeBase( + name=Identifier("my_knowledge_base"), + if_not_exists=False, + model=Identifier(parts=["mindsdb", "my_embedding_model"]), + storage=Identifier(parts=["my_vector_database", "some_table"]), + from_query=None, + params={"some_param": "some value", "other_param": "other value"}, + ) + assert ast == expected_ast + + +def test_drop_knowledge_base(): + # drop if exists + sql = """ + DROP KNOWLEDGE_BASE IF EXISTS my_knowledge_base + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = DropKnowledgeBase( + name=Identifier("my_knowledge_base"), if_exists=True + ) + assert ast == expected_ast + + # drop without if exists + sql = """ + DROP KNOWLEDGE_BASE my_knowledge_base + """ + ast = parse_sql(sql, dialect="mindsdb") + + expected_ast = DropKnowledgeBase( + name=Identifier("my_knowledge_base"), if_exists=False + ) + assert ast == expected_ast + + +@pytest.mark.skip(reason="not implemented") +def test_alter_knowledge_base(): + pass + + +def test_show_knowledge_base(): + sql = """ + SHOW KNOWLEDGE_BASES + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = Show( + category="KNOWLEDGE_BASES", + ) + assert ast == expected_ast + + # without underscore shall also work + sql = """ + SHOW KNOWLEDGE BASES + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = Show( + category="KNOWLEDGE BASES", + ) + assert ast == expected_ast + + +def test_select_from_knowledge_base(): + # this is no different from a regular select + sql = """ + SELECT * FROM my_knowledge_base + WHERE + query = 'some text in natural query' + AND + metadata.some_column = 'some value' + ORDER BY + distances DESC + LIMIT 10 + """ + ast = parse_sql(sql, dialect="mindsdb") + + expected_ast = Select( + targets=[Star()], + from_table=Identifier("my_knowledge_base"), + where=BinaryOperation( + op="AND", + args=[ + BinaryOperation( + op="=", + args=[Identifier("query"), Constant("some text in natural query")], + ), + BinaryOperation( + op="=", + args=[Identifier("metadata.some_column"), Constant("some value")], + ), + ], + ), + order_by=[OrderBy(field=Identifier("distances"), direction="DESC")], + limit=Constant(10), + ) + assert ast == expected_ast + + +def test_delete_from_knowledge_base(): + # this is no different from a regular delete + sql = """ + DELETE FROM my_knowledge_base + WHERE + id = 'some id' + AND + metadata.some_column = 'some value' + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = Delete( + table=Identifier("my_knowledge_base"), + where=BinaryOperation( + op="AND", + args=[ + BinaryOperation(op="=", args=[Identifier("id"), Constant("some id")]), + BinaryOperation( + op="=", + args=[Identifier("metadata.some_column"), Constant("some value")], + ), + ], + ), + ) + assert ast == expected_ast + + +def test_insert_into_knowledge_base(): + # this is no different from a regular insert + sql = """ + INSERT INTO my_knowledge_base ( + id, content, embeddings, metadata + ) + VALUES ( + 'some id', + 'some text', + '[1,2,3,4,5]', + '{"some_column": "some value"}' + ), + ( + 'some other id', + 'some other text', + '[1,2,3,4,5]', + '{"some_column": "some value"}' + ) + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = Insert( + table=Identifier("my_knowledge_base"), + columns=[ + Identifier("id"), + Identifier("content"), + Identifier("embeddings"), + Identifier("metadata"), + ], + values=[ + [ + Constant("some id"), + Constant("some text"), + Constant("[1,2,3,4,5]"), + Constant('{"some_column": "some value"}'), + ], + [ + Constant("some other id"), + Constant("some other text"), + Constant("[1,2,3,4,5]"), + Constant('{"some_column": "some value"}'), + ], + ], + ) + assert ast == expected_ast + + # insert from a select + sql = """ + INSERT INTO my_knowledge_base ( + id, content, embeddings, metadata + ) + SELECT id, content, embeddings, metadata + FROM my_table + WHERE + metadata.some_column = 'some value' + """ + ast = parse_sql(sql, dialect="mindsdb") + expected_ast = Insert( + table=Identifier("my_knowledge_base"), + columns=[ + Identifier("id"), + Identifier("content"), + Identifier("embeddings"), + Identifier("metadata"), + ], + from_select=Select( + targets=[ + Identifier("id"), + Identifier("content"), + Identifier("embeddings"), + Identifier("metadata"), + ], + from_table=Identifier("my_table"), + where=BinaryOperation( + op="=", + args=[Identifier("metadata.some_column"), Constant("some value")], + ), + ), + ) + assert ast == expected_ast From 111e5e1f4cb00d85a77232e1874fbafa710f48cb Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Fri, 1 Sep 2023 15:56:43 -0700 Subject: [PATCH 2/9] change from_query to from_select --- mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py | 4 ++-- mindsdb_sql/parser/dialects/mindsdb/parser.py | 2 +- tests/test_parser/test_mindsdb/test_knowledgebase.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py b/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py index a483e870..66f58b68 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py +++ b/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py @@ -8,7 +8,7 @@ def __init__( name, model, storage, - from_query=None, + from_select=None, params=None, if_not_exists=False, *args, @@ -20,7 +20,7 @@ def __init__( self.storage = storage self.params = params self.if_not_exists = if_not_exists - self.from_query = from_query + self.from_query = from_select def to_tree(self, *args, level=0, **kwargs): ind = indent(level) diff --git a/mindsdb_sql/parser/dialects/mindsdb/parser.py b/mindsdb_sql/parser/dialects/mindsdb/parser.py index 509a1ea0..5a573610 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/parser.py +++ b/mindsdb_sql/parser/dialects/mindsdb/parser.py @@ -111,7 +111,7 @@ def create_kb(self, p): name=name, model=model, storage=storage, - from_query=from_query, + from_select=from_query, params=params, if_not_exists=if_not_exists ) diff --git a/tests/test_parser/test_mindsdb/test_knowledgebase.py b/tests/test_parser/test_mindsdb/test_knowledgebase.py index fe9e7c80..cfe68a3c 100644 --- a/tests/test_parser/test_mindsdb/test_knowledgebase.py +++ b/tests/test_parser/test_mindsdb/test_knowledgebase.py @@ -31,7 +31,7 @@ def test_create_knowledeg_base(): if_not_exists=False, model=Identifier(parts=["mindsdb", "my_embedding_model"]), storage=Identifier(parts=["my_vector_database", "some_table"]), - from_query=None, + from_select=None, params={}, ) assert ast == expected_ast @@ -63,7 +63,7 @@ def test_create_knowledeg_base(): if_not_exists=False, model=Identifier(parts=["mindsdb", "my_embedding_model"]), storage=Identifier(parts=["my_vector_database", "some_table"]), - from_query=Select( + from_select=Select( targets=[ Identifier("id"), Identifier("content"), @@ -113,7 +113,7 @@ def test_create_knowledeg_base(): if_not_exists=True, model=Identifier(parts=["mindsdb", "my_embedding_model"]), storage=Identifier(parts=["my_vector_database", "some_table"]), - from_query=None, + from_select=None, params={}, ) assert ast == expected_ast @@ -133,7 +133,7 @@ def test_create_knowledeg_base(): if_not_exists=False, model=Identifier(parts=["mindsdb", "my_embedding_model"]), storage=Identifier(parts=["my_vector_database", "some_table"]), - from_query=None, + from_select=None, params={"some_param": "some value", "other_param": "other value"}, ) assert ast == expected_ast From f5d058a7fee33a9af9ed3e1d6d09e79a00ecef9c Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Wed, 6 Sep 2023 12:39:22 -0700 Subject: [PATCH 3/9] planner for knowledgebase --- mindsdb_sql/planner/query_planner.py | 192 ++++++++++++++++++- tests/test_planner/test_knowledege_base.py | 213 +++++++++++++++++++++ 2 files changed, 403 insertions(+), 2 deletions(-) create mode 100644 tests/test_planner/test_knowledege_base.py diff --git a/mindsdb_sql/planner/query_planner.py b/mindsdb_sql/planner/query_planner.py index b7b2b0ab..24fef651 100644 --- a/mindsdb_sql/planner/query_planner.py +++ b/mindsdb_sql/planner/query_planner.py @@ -32,7 +32,9 @@ def __init__(self, integrations: list = None, predictor_namespace=None, predictor_metadata: list = None, - default_namespace: str = None): + default_namespace: str = None, + additional_metadata: list = None, + ): self.query = query self.plan = QueryPlan() @@ -90,8 +92,46 @@ def __init__(self, self.projects = list(_projects) self.databases = list(self.integrations.keys()) + self.projects + # additional metadata -- knowledge base + self.additional_metadata = {} + additional_metadata = additional_metadata or [] + for metadata in additional_metadata: + if 'integration_name' not in metadata: + metadata['integration_name'] = self.predictor_namespace + idx = f'{metadata["integration_name"]}.{metadata["name"]}'.lower() + + self._validate_knowledge_base_meta(metadata) + self.additional_metadata[idx] = metadata + self.statement = None + def _validate_knowledge_base_meta(self, metadata): + """ + Verify the entry for knowledge base metadata is valid + """ + TYPE_FIELD = "type" + if TYPE_FIELD not in metadata: + return + elif metadata[TYPE_FIELD] != "knowledge_base": + return + MODEL_FIELD = "model" + STORAGE_FIELD = "storage" + if MODEL_FIELD not in metadata: + raise PlanningException(f"Knowledge base metadata must contain a {MODEL_FIELD} field") + else: + # we enforce to specify a full qualified name for the model + # e.g., integration_name.model_name + model_name = metadata[MODEL_FIELD] + if len(model_name.split(".")) != 2: + raise PlanningException(f"Knowledge base model name must be in the format of integration_name.model_name") + + if STORAGE_FIELD not in metadata: + raise PlanningException(f"Knowledge base metadata must contain a {STORAGE_FIELD} field") + else: + storage_name = metadata[STORAGE_FIELD] + if len(storage_name.split(".")) != 2: + raise PlanningException(f"Knowledge base storage name must be in the format of integration_name.table_name") + def is_predictor(self, identifier): return self.get_predictor(identifier) is not None @@ -124,6 +164,30 @@ def get_predictor(self, identifier): info['name'] = name return info + def get_knowledge_base(self, identifier): + name_parts = list(identifier.parts) + name = name_parts[-1] + namespace = None + if len(name_parts) > 1: + namespace = name_parts[-2] + else: + if self.default_namespace is not None: + namespace = self.default_namespace + + idx_ar = [name] + if namespace is not None: + idx_ar.insert(0, namespace) + + idx = '.'.join(idx_ar).lower() + info = self.additional_metadata.get(idx) + if info is not None and info.get("type") == "knowledge_base": + return info + else: + return + + def is_knowledge_base(self, identifier): + return self.get_knowledge_base(identifier) is not None + def prepare_integration_select(self, database, query): # replacement for 'utils.recursively_disambiguate_*' functions from utils # main purpose: make tests working (don't change planner outputs) @@ -1155,6 +1219,10 @@ def plan_create_table(self, query): def plan_insert(self, query): table = query.table + if self.is_knowledge_base(table): + # knowledgebase table + return self.plan_insert_knowledge_base(query) + if query.from_select is not None: integration_name = query.table.parts[0] @@ -1210,7 +1278,12 @@ def plan_select(self, query, integration=None): from_table = query.from_table if isinstance(from_table, Identifier): - return self.plan_select_identifier(query) + # decide from_table is a knowledgebase table or a table from integration + if self.is_knowledge_base(from_table): + # knowledgebase table + return self.plan_select_knowledege_base(query) + else: + return self.plan_select_identifier(query) elif isinstance(from_table, Select): return self.plan_nested_select(query) elif isinstance(from_table, Join): @@ -1260,6 +1333,121 @@ def plan_union(self, query): return self.plan.add_step(UnionStep(left=query1.result, right=query2.result, unique=query.unique)) + + def plan_select_knowledege_base(self, query): + SEARCH_QUERY = "search_query" # TODO: need to make it as a constant + MODEL_FIELD = "model" # TODO: need to make it as a constant + STORAGE_FIELD = "storage" # TODO: need to make it as a constant + + + knowledegebase_metadata = self.get_knowledge_base(query.from_table) + vector_database_table = knowledegebase_metadata[STORAGE_FIELD] + model_name = knowledegebase_metadata[MODEL_FIELD] + + CONTENT_FIELD = knowledegebase_metadata.get("content_field") or "content" + EMBEDDINGS_FIELD = knowledegebase_metadata.get("embeddings_field") or "embeddings" + SEARCH_VECTOR_FIELD = knowledegebase_metadata.get("search_vector_field") or "search_vector" + + is_search_query_present = False + def find_search_query(node, **kwargs): + nonlocal is_search_query_present + if isinstance(node, Identifier) and node.parts[-1] == SEARCH_QUERY: + is_search_query_present = True + + # decide predictor is needed in the query + # by detecting if a where clause involving field SEARCH_QUERY is present + # if yes, then we need to add additional step to the plan + # to apply the predictor to the search query + utils.query_traversal( + query.where, + callback=find_search_query + ) + + if not is_search_query_present: + # dispatch to the underlying storage table + query.from_table = Identifier(vector_database_table) + return self.plan_select(query) + else: + # rewrite the where clause + # search_query = 'some text' + # -> + # search_vector = (select embeddings from model_name where content = 'some text') + def rewrite_search_query_clause(node, **kwargs): + if isinstance(node, BinaryOperation): + if node.args[0] == Identifier(SEARCH_QUERY): + node.args[0] = Identifier(SEARCH_VECTOR_FIELD) + node.args[1] = Select( + targets=[Identifier(EMBEDDINGS_FIELD)], + from_table=Identifier(model_name), + where=BinaryOperation( + op="=", + args=[ + Identifier(CONTENT_FIELD), + node.args[1] + ] + ) + ) + + utils.query_traversal( + query.where, + callback=rewrite_search_query_clause + ) + + # dispatch to the underlying storage table + query.from_table = Identifier(vector_database_table) + return self.plan_select(query) + + def plan_insert_knowledge_base(self, query: Insert): + metadata = self.get_knowledge_base(query.table) + STORAGE_FIELD = "storage" # TODO: need to make it as a constant + MODEL_FIELD = "model" # TODO: need to make it as a constant + EMBEDDINGS_FIELD = metadata.get("embeddings_field") or "embeddings" + + vector_database_table = metadata[STORAGE_FIELD] + model_name = metadata[MODEL_FIELD] + + query.table = Identifier(vector_database_table) + + if query.from_select is not None: + # detect if embeddings field is present in the columns list + # if so, we do not need to apply the predictor + # if not, we need to join the select with the model table + is_embeddings_field_present = False + def find_embeddings_field(node, **kwargs): + nonlocal is_embeddings_field_present + if isinstance(node, Identifier) and node.parts[-1] == EMBEDDINGS_FIELD: + is_embeddings_field_present = True + + utils.query_traversal( + query.columns, + callback=find_embeddings_field + ) + + if is_embeddings_field_present: + return self.plan_insert(query) + + # rewrite the select statement + # to join with the model table + + select: Select = query.from_select + select.targets.append(Identifier(EMBEDDINGS_FIELD)) + select.from_table = Select( + targets=copy.deepcopy(select.targets), + from_table=Join( + left=select.from_table, + right=Identifier(model_name), + join_type="JOIN" + ) + ) + + # append the embeddings field to the columns list + if query.columns: + query.columns.append(Identifier(EMBEDDINGS_FIELD)) + + return self.plan_insert(query) + else: + raise NotImplementedError("Not implemented insert without select") + # method for compatibility def from_query(self, query=None): if query is None: diff --git a/tests/test_planner/test_knowledege_base.py b/tests/test_planner/test_knowledege_base.py new file mode 100644 index 00000000..806cf6ef --- /dev/null +++ b/tests/test_planner/test_knowledege_base.py @@ -0,0 +1,213 @@ +# test planning for knowledge base related queries + +import pytest +from mindsdb_sql.parser.ast import * +from mindsdb_sql.planner import plan_query +from mindsdb_sql import parse_sql +from mindsdb_sql.planner.step_result import Result +from mindsdb_sql.planner.steps import * +from functools import partial + + +@pytest.fixture +def planner_context(): + integrations = [ + { + "name": "my_chromadb", + "type": "data", + }, + { + "name": "my_database", + "type": "data", + }, + ] + + predictors = [ + { + "name": "my_model", + "integration_name": "mindsdb", + }, + ] + + additional_context = [ + { + "name": "my_kb", + "type": "knowledge_base", + "model": "mindsdb.my_model", + "storage": "my_chromadb.my_table", + "search_vector_field": "search_vector", + "embeddings_field": "embeddings", + "content_field": "content", + } + ] + + return integrations, predictors, additional_context + + +def plan_sql(sql, *args, **kwargs): + return plan_query(parse_sql(sql, dialect="mindsdb"), *args, **kwargs) + + +def test_insert_into_kb(planner_context): + integration_context, predictor_context, additional_context = planner_context + _plan_sql = partial( + plan_sql, + default_namespace="mindsdb", + integrations=integration_context, + predictor_metadata=predictor_context, + additional_metadata=additional_context, + ) + + # insert into kb with values + sql = """ + INSERT INTO my_kb + (id, content, metadata) + VALUES + (1, 'hello world', '{"a": 1, "b": 2}'), + (2, 'hello world', '{"a": 1, "b": 2}'), + (3, 'hello world', '{"a": 1, "b": 2}'); + """ + # this will dispatch the underlying dataframes to the underlying model + # then it will dispatch the query to the underlying storage + # TODO: need to figure out what to do with this situation + + # insert into kb with select + sql = """ + INSERT INTO my_kb + (id, content, metadata) + SELECT + id, content, metadata + FROM my_database.my_table + """ + # this will join the subselect with the underlying model + # then it will dispatch the query to the underlying storage + equivalent_sql = """ + INSERT INTO my_chromadb.my_table + (id, content, metadata, embeddings) + SELECT + id, content, metadata, embeddings + FROM ( + SELECT + id, content, metadata, embeddings + FROM my_database.my_table + JOIN mindsdb.my_model + ) + """ + plan = _plan_sql(sql) + expected_plan = _plan_sql(equivalent_sql) + + assert plan.steps == expected_plan.steps + + +def test_select_from_kb(planner_context): + integration_context, predictor_context, additional_context = planner_context + _plan_sql = partial( + plan_sql, + default_namespace="mindsdb", + integrations=integration_context, + predictor_metadata=predictor_context, + additional_metadata=additional_context, + ) + + # select from kb without where + sql = """ + SELECT + id, content, embeddings, metadata + FROM my_kb + """ + # this will dispatch the query to the underlying storage + equivalent_sql = """ + SELECT + id, content, embeddings, metadata + FROM my_chromadb.my_table + """ + plan = _plan_sql(sql) + expected_plan = _plan_sql(equivalent_sql) + + assert plan.steps == expected_plan.steps + + # select from kb with search_query + sql = """ + SELECT + id, content, embeddings, metadata + FROM my_kb + WHERE + search_query = 'hello world' + """ + # this will dispatch the search_query to the underlying model + # then it will dispatch the query to the underlying storage + equivalent_sql = """ + SELECT + id, content, embeddings, metadata + FROM my_chromadb.my_table + WHERE + search_vector = ( + SELECT + embeddings + FROM mindsdb.my_model + WHERE + content = 'hello world' + ) + """ + plan = _plan_sql(sql) + expected_plan = _plan_sql(equivalent_sql) + + assert plan.steps == expected_plan.steps + + # select from kb with no search_query and just metadata query + sql = """ + SELECT + id, content, embeddings, metadata + FROM my_kb + WHERE + `metadata.a` = 1 + """ + # this will dispatch the whole query to the underlying storage + equivalent_sql = """ + SELECT + id, content, embeddings, metadata + FROM my_chromadb.my_table + WHERE + `metadata.a` = 1 + """ + plan = _plan_sql(sql) + expected_plan = _plan_sql(equivalent_sql) + + assert plan.steps == expected_plan.steps + + # select from kb with search_query and metadata query + sql = """ + SELECT + id, content, embeddings, metadata + FROM my_kb + WHERE + search_query = 'hello world' + AND + `metadata.a` = 1 + """ + # this will dispatch the search_query to the underlying model + # then it will dispatch the query to the underlying storage + equivalent_sql = """ + SELECT + id, content, embeddings, metadata + FROM my_chromadb.my_table + WHERE + search_vector = ( + SELECT + embeddings + FROM mindsdb.my_model + WHERE + content = 'hello world' + ) + AND + `metadata.a` = 1 + """ + plan = _plan_sql(sql) + expected_plan = _plan_sql(equivalent_sql) + + assert plan.steps == expected_plan.steps + + +@pytest.mark.skip(reason="not implemented") +def test_update_kb(): + ... From 24c3c8b7e3bf246d8a5b45323b8f703fc605533e Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Wed, 6 Sep 2023 14:56:23 -0700 Subject: [PATCH 4/9] support insert from values --- mindsdb_sql/planner/query_planner.py | 45 +++++++++++++++++++++- tests/test_planner/test_knowledege_base.py | 5 +-- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/mindsdb_sql/planner/query_planner.py b/mindsdb_sql/planner/query_planner.py index 24fef651..aaffa516 100644 --- a/mindsdb_sql/planner/query_planner.py +++ b/mindsdb_sql/planner/query_planner.py @@ -1446,7 +1446,50 @@ def find_embeddings_field(node, **kwargs): return self.plan_insert(query) else: - raise NotImplementedError("Not implemented insert without select") + if not query.columns: + raise PlanningException("Columns list is empty when using values") + + keys = [column.name for column in query.columns] + is_embeddings_field_present = EMBEDDINGS_FIELD in keys + + query.table = Identifier(vector_database_table) + # directly dispatch to the underlying storage table + if is_embeddings_field_present: + return self.plan_insert(query) + + # if the embeddings field is not present in the columns list + # we need to wrap values in ast.Data + # join it with a model table + # modify the query using from_table + # and dispatch to the underlying storage table + + records = [] + _unwrap_constant_or_self = lambda node: node.value if isinstance(node, Constant) else node + for row in query.values: + records.append( + dict( + zip( + keys, + map(_unwrap_constant_or_self, row) + ) + ) + ) + + data = ast.Data(records, alias=Identifier("data")) + predictor_select = Select( + targets=[Identifier(col.name) for col in query.columns] + [Identifier(EMBEDDINGS_FIELD)], + from_table=Join( + left=data, + right=Identifier(model_name), + join_type="JOIN" + ) + ) + + query.columns += [ast.TableColumn(name=EMBEDDINGS_FIELD)] + query.from_select = predictor_select + query.values = None + + return self.plan_insert(query) # method for compatibility def from_query(self, query=None): diff --git a/tests/test_planner/test_knowledege_base.py b/tests/test_planner/test_knowledege_base.py index 806cf6ef..5017a3bc 100644 --- a/tests/test_planner/test_knowledege_base.py +++ b/tests/test_planner/test_knowledege_base.py @@ -67,9 +67,8 @@ def test_insert_into_kb(planner_context): (2, 'hello world', '{"a": 1, "b": 2}'), (3, 'hello world', '{"a": 1, "b": 2}'); """ - # this will dispatch the underlying dataframes to the underlying model - # then it will dispatch the query to the underlying storage - # TODO: need to figure out what to do with this situation + plan = _plan_sql(sql) + assert len(plan.steps) > 0 # TODO: better to specify t the detail of the plan # insert into kb with select sql = """ From c33adc5ac5cf157e57d9dc398805279edf7e1ced Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Tue, 12 Sep 2023 11:33:50 -0700 Subject: [PATCH 5/9] Add delete planning --- mindsdb_sql/planner/query_planner.py | 12 ++++++ tests/test_planner/test_knowledege_base.py | 43 ++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/mindsdb_sql/planner/query_planner.py b/mindsdb_sql/planner/query_planner.py index aaffa516..d9d3ca75 100644 --- a/mindsdb_sql/planner/query_planner.py +++ b/mindsdb_sql/planner/query_planner.py @@ -1258,6 +1258,9 @@ def plan_update(self, query): )) def plan_delete(self, query: Delete): + if self.is_knowledge_base(query.table): + # knowledgebase table + return self.plan_delete_knowledge_base(query) # find subselects main_integration, _ = self.resolve_database_table(query.table) @@ -1491,6 +1494,15 @@ def find_embeddings_field(node, **kwargs): return self.plan_insert(query) + def plan_delete_knowledge_base(self, query: Delete): + metadata = self.get_knowledge_base(query.table) + STORAGE_FIELD = "storage" + + vector_database_table = metadata[STORAGE_FIELD] + query.table = Identifier(vector_database_table) + + return self.plan_delete(query) + # method for compatibility def from_query(self, query=None): if query is None: diff --git a/tests/test_planner/test_knowledege_base.py b/tests/test_planner/test_knowledege_base.py index 5017a3bc..9af4ec65 100644 --- a/tests/test_planner/test_knowledege_base.py +++ b/tests/test_planner/test_knowledege_base.py @@ -210,3 +210,46 @@ def test_select_from_kb(planner_context): @pytest.mark.skip(reason="not implemented") def test_update_kb(): ... + + +def test_delete_from_kb(planner_context): + integration_context, predictor_context, additional_context = planner_context + _plan_sql = partial( + plan_sql, + default_namespace="mindsdb", + integrations=integration_context, + predictor_metadata=predictor_context, + additional_metadata=additional_context, + ) + + sql = """ + DELETE FROM my_kb + WHERE + id = 1 + """ + # this will dispatch the delete to the underlying storage + equivalent_sql = """ + DELETE FROM my_chromadb.my_table + WHERE + id = 1 + """ + plan = _plan_sql(sql) + expected_plan = _plan_sql(equivalent_sql) + + assert plan.steps == expected_plan.steps + + sql = """ + DELETE FROM my_kb + WHERE + `metadata.a` = 1 + """ + # this will dispatch the delete to the underlying storage + equivalent_sql = """ + DELETE FROM my_chromadb.my_table + WHERE + `metadata.a` = 1 + """ + plan = _plan_sql(sql) + expected_plan = _plan_sql(equivalent_sql) + + assert plan.steps == expected_plan.steps From a91c9bbebcbf0b6a42091efb73f5737093024c1c Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Mon, 18 Sep 2023 15:52:35 -0700 Subject: [PATCH 6/9] change create kb syntax + docstring --- .../parser/dialects/mindsdb/knowledge_base.py | 26 ++++++++- mindsdb_sql/parser/dialects/mindsdb/parser.py | 56 ++++++++++++++----- .../test_mindsdb/test_knowledgebase.py | 45 ++++++++++----- 3 files changed, 95 insertions(+), 32 deletions(-) diff --git a/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py b/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py index 66f58b68..f7a462c1 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py +++ b/mindsdb_sql/parser/dialects/mindsdb/knowledge_base.py @@ -3,6 +3,9 @@ class CreateKnowledgeBase(ASTNode): + """ + Create a new knowledge base + """ def __init__( self, name, @@ -14,6 +17,15 @@ def __init__( *args, **kwargs, ): + """ + Args: + name: Identifier -- name of the knowledge base + model: Identifier -- name of the model to use + storage: Identifier -- name of the storage to use + from_select: SelectStatement -- select statement to use as the source of the knowledge base + params: dict -- additional parameters to pass to the knowledge base. E.g., chunking strategy, etc. + if_not_exists: bool -- if True, do not raise an error if the knowledge base already exists + """ super().__init__(*args, **kwargs) self.name = name self.model = model @@ -47,9 +59,9 @@ def get_string(self, *args, **kwargs): out_str = ( f"CREATE KNOWLEDGE_BASE {'IF NOT EXISTS' if self.if_not_exists else ''}{self.name.to_string()} " f"{from_query_str} " - f"MODEL {self.model.to_string()} " - f"STORAGE {self.storage.to_string()} " - f"USING {using_str}" + f"USING {using_str}," + f" MODEL = {self.model.to_string()}, " + f" STORAGE {self.storage.to_string()} " ) return out_str @@ -59,7 +71,15 @@ def __repr__(self) -> str: class DropKnowledgeBase(ASTNode): + """ + Delete a knowledge base + """ def __init__(self, name, if_exists=False, *args, **kwargs): + """ + Args: + name: Identifier -- name of the knowledge base + if_exists: bool -- if True, do not raise an error if the knowledge base does not exist + """ super().__init__(*args, **kwargs) self.name = name self.if_exists = if_exists diff --git a/mindsdb_sql/parser/dialects/mindsdb/parser.py b/mindsdb_sql/parser/dialects/mindsdb/parser.py index 5a573610..3f665223 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/parser.py +++ b/mindsdb_sql/parser/dialects/mindsdb/parser.py @@ -89,23 +89,29 @@ def query(self, p): # -- Knowledge Base -- @_( - 'CREATE KNOWLEDGE_BASE identifier MODEL identifier STORAGE identifier', - 'CREATE KNOWLEDGE_BASE identifier MODEL identifier STORAGE identifier USING kw_parameter_list', + 'CREATE KNOWLEDGE_BASE if_not_exists_or_empty identifier USING kw_parameter_list', # from select - 'CREATE KNOWLEDGE_BASE identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier', - 'CREATE KNOWLEDGE_BASE identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier USING kw_parameter_list', - 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier MODEL identifier STORAGE identifier', - 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier MODEL identifier STORAGE identifier USING kw_parameter_list', - 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier', - 'CREATE KNOWLEDGE_BASE IF_NOT_EXISTS identifier FROM LPAREN select RPAREN MODEL identifier STORAGE identifier USING kw_parameter_list', + 'CREATE KNOWLEDGE_BASE if_not_exists_or_empty identifier FROM LPAREN select RPAREN USING kw_parameter_list', + 'CREATE KNOWLEDGE_BASE if_not_exists_or_empty identifier FROM LPAREN select RPAREN', ) def create_kb(self, p): params = getattr(p, 'kw_parameter_list', {}) from_query = getattr(p, 'select', None) - name = p.identifier0 - model = p.identifier1 - storage = p.identifier2 - if_not_exists = hasattr(p, 'IF_NOT_EXISTS') + name = p.identifier + # check model and storage are in params + model = params.pop('model', None) or params.pop('MODEL', None) # case insensitive + storage = params.pop('storage', None) or params.pop('STORAGE', None) # case insensitive + if not model: + if isinstance(model, str): + # convert to identifier + model = Identifier(model) + raise ParsingException('Missing model parameter') + if not storage: + if isinstance(storage, str): + # convert to identifier + storage = Identifier(storage) + raise ParsingException('Missing storage parameter') + if_not_exists = p.if_not_exists_or_empty return CreateKnowledgeBase( name=name, @@ -116,6 +122,7 @@ def create_kb(self, p): if_not_exists=if_not_exists ) + @_('DROP KNOWLEDGE_BASE identifier', 'DROP KNOWLEDGE_BASE IF_EXISTS identifier') def drop_kb(self, p): @@ -1465,9 +1472,12 @@ def kw_parameter_list(self, p): return params @_('identifier EQUALS object', - 'identifier EQUALS json_value') + 'identifier EQUALS json_value', + 'identifier EQUALS identifier') def kw_parameter(self, p): - key = '.'.join(p.identifier.parts) + key = getattr(p, 'identifier', None) or getattr(p, 'identifier0', None) + assert key is not None + key = '.'.join(key.parts) return {key: p[2]} # json @@ -1661,6 +1671,24 @@ def raw_query(self, p): def raw_query(self, p): return p[0] + p[1] + @_( + 'IF_NOT_EXISTS', + 'empty' + ) + def if_not_exists_or_empty(self, p): + if hasattr(p, 'IF_NOT_EXISTS'): + return True + return False + + @_( + 'IF_EXISTS', + 'empty' + ) + def if_exists_or_empty(self, p): + if hasattr(p, 'IF_EXISTS'): + return True + return False + @_(*all_tokens_list) def raw_query(self, p): return p._slice diff --git a/tests/test_parser/test_mindsdb/test_knowledgebase.py b/tests/test_parser/test_mindsdb/test_knowledgebase.py index cfe68a3c..50812217 100644 --- a/tests/test_parser/test_mindsdb/test_knowledgebase.py +++ b/tests/test_parser/test_mindsdb/test_knowledgebase.py @@ -22,8 +22,9 @@ def test_create_knowledeg_base(): # create without select sql = """ CREATE KNOWLEDGE_BASE my_knowledge_base - MODEL mindsdb.my_embedding_model - STORAGE my_vector_database.some_table + USING + MODEL=mindsdb.my_embedding_model, + STORAGE = my_vector_database.some_table """ ast = parse_sql(sql, dialect="mindsdb") expected_ast = CreateKnowledgeBase( @@ -36,15 +37,25 @@ def test_create_knowledeg_base(): ) assert ast == expected_ast + # using the alias KNOWLEDGE BASE without underscore shall also work + sql = """ + CREATE KNOWLEDGE BASE my_knowledge_base + USING + MODEL=mindsdb.my_embedding_model, + STORAGE = my_vector_database.some_table + """ + ast = parse_sql(sql, dialect="mindsdb") + assert ast == expected_ast + # the order of MODEL and STORAGE should not matter - # TODO: the current syntax is sensitive to the order sql = """ CREATE KNOWLEDGE_BASE my_knowledge_base - STORAGE my_vector_database.some_table - MODEL mindsdb.my_embedding_model + USING + STORAGE = my_vector_database.some_table, + MODEL = mindsdb.my_embedding_model """ - with pytest.raises(Exception): - ast = parse_sql(sql, dialect="mindsdb") + ast = parse_sql(sql, dialect="mindsdb") + assert ast == expected_ast # create from a query sql = """ @@ -54,8 +65,9 @@ def test_create_knowledeg_base(): FROM my_table JOIN my_embedding_model ) - MODEL mindsdb.my_embedding_model - STORAGE my_vector_database.some_table + USING + MODEL = mindsdb.my_embedding_model, + STORAGE = my_vector_database.some_table """ ast = parse_sql(sql, dialect="mindsdb") expected_ast = CreateKnowledgeBase( @@ -86,7 +98,8 @@ def test_create_knowledeg_base(): # we may allow this in the future when we have a default model sql = """ CREATE KNOWLEDGE_BASE my_knowledge_base - STORAGE my_vector_database.some_table + USING + STORAGE = my_vector_database.some_table """ with pytest.raises(Exception): ast = parse_sql(sql, dialect="mindsdb") @@ -96,7 +109,8 @@ def test_create_knowledeg_base(): # we may allow this in the future when we have a default storage sql = """ CREATE KNOWLEDGE_BASE my_knowledge_base - MODEL mindsdb.my_embedding_model + USING + MODEL = mindsdb.my_embedding_model """ with pytest.raises(Exception): ast = parse_sql(sql, dialect="mindsdb") @@ -104,8 +118,9 @@ def test_create_knowledeg_base(): # create if not exists sql = """ CREATE KNOWLEDGE_BASE IF NOT EXISTS my_knowledge_base - MODEL mindsdb.my_embedding_model - STORAGE my_vector_database.some_table + USING + MODEL = mindsdb.my_embedding_model, + STORAGE = my_vector_database.some_table """ ast = parse_sql(sql, dialect="mindsdb") expected_ast = CreateKnowledgeBase( @@ -121,9 +136,9 @@ def test_create_knowledeg_base(): # create with params sql = """ CREATE KNOWLEDGE_BASE my_knowledge_base - MODEL mindsdb.my_embedding_model - STORAGE my_vector_database.some_table USING + MODEL = mindsdb.my_embedding_model, + STORAGE = my_vector_database.some_table, some_param = 'some value', other_param = 'other value' """ From 9f577abb3ee4f797adc83f300db9791beab1b6be Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Wed, 20 Sep 2023 13:16:41 -0700 Subject: [PATCH 7/9] revert planner --- mindsdb_sql/planner/query_planner.py | 247 +-------------------------- 1 file changed, 2 insertions(+), 245 deletions(-) diff --git a/mindsdb_sql/planner/query_planner.py b/mindsdb_sql/planner/query_planner.py index d9d3ca75..b7b2b0ab 100644 --- a/mindsdb_sql/planner/query_planner.py +++ b/mindsdb_sql/planner/query_planner.py @@ -32,9 +32,7 @@ def __init__(self, integrations: list = None, predictor_namespace=None, predictor_metadata: list = None, - default_namespace: str = None, - additional_metadata: list = None, - ): + default_namespace: str = None): self.query = query self.plan = QueryPlan() @@ -92,46 +90,8 @@ def __init__(self, self.projects = list(_projects) self.databases = list(self.integrations.keys()) + self.projects - # additional metadata -- knowledge base - self.additional_metadata = {} - additional_metadata = additional_metadata or [] - for metadata in additional_metadata: - if 'integration_name' not in metadata: - metadata['integration_name'] = self.predictor_namespace - idx = f'{metadata["integration_name"]}.{metadata["name"]}'.lower() - - self._validate_knowledge_base_meta(metadata) - self.additional_metadata[idx] = metadata - self.statement = None - def _validate_knowledge_base_meta(self, metadata): - """ - Verify the entry for knowledge base metadata is valid - """ - TYPE_FIELD = "type" - if TYPE_FIELD not in metadata: - return - elif metadata[TYPE_FIELD] != "knowledge_base": - return - MODEL_FIELD = "model" - STORAGE_FIELD = "storage" - if MODEL_FIELD not in metadata: - raise PlanningException(f"Knowledge base metadata must contain a {MODEL_FIELD} field") - else: - # we enforce to specify a full qualified name for the model - # e.g., integration_name.model_name - model_name = metadata[MODEL_FIELD] - if len(model_name.split(".")) != 2: - raise PlanningException(f"Knowledge base model name must be in the format of integration_name.model_name") - - if STORAGE_FIELD not in metadata: - raise PlanningException(f"Knowledge base metadata must contain a {STORAGE_FIELD} field") - else: - storage_name = metadata[STORAGE_FIELD] - if len(storage_name.split(".")) != 2: - raise PlanningException(f"Knowledge base storage name must be in the format of integration_name.table_name") - def is_predictor(self, identifier): return self.get_predictor(identifier) is not None @@ -164,30 +124,6 @@ def get_predictor(self, identifier): info['name'] = name return info - def get_knowledge_base(self, identifier): - name_parts = list(identifier.parts) - name = name_parts[-1] - namespace = None - if len(name_parts) > 1: - namespace = name_parts[-2] - else: - if self.default_namespace is not None: - namespace = self.default_namespace - - idx_ar = [name] - if namespace is not None: - idx_ar.insert(0, namespace) - - idx = '.'.join(idx_ar).lower() - info = self.additional_metadata.get(idx) - if info is not None and info.get("type") == "knowledge_base": - return info - else: - return - - def is_knowledge_base(self, identifier): - return self.get_knowledge_base(identifier) is not None - def prepare_integration_select(self, database, query): # replacement for 'utils.recursively_disambiguate_*' functions from utils # main purpose: make tests working (don't change planner outputs) @@ -1219,10 +1155,6 @@ def plan_create_table(self, query): def plan_insert(self, query): table = query.table - if self.is_knowledge_base(table): - # knowledgebase table - return self.plan_insert_knowledge_base(query) - if query.from_select is not None: integration_name = query.table.parts[0] @@ -1258,9 +1190,6 @@ def plan_update(self, query): )) def plan_delete(self, query: Delete): - if self.is_knowledge_base(query.table): - # knowledgebase table - return self.plan_delete_knowledge_base(query) # find subselects main_integration, _ = self.resolve_database_table(query.table) @@ -1281,12 +1210,7 @@ def plan_select(self, query, integration=None): from_table = query.from_table if isinstance(from_table, Identifier): - # decide from_table is a knowledgebase table or a table from integration - if self.is_knowledge_base(from_table): - # knowledgebase table - return self.plan_select_knowledege_base(query) - else: - return self.plan_select_identifier(query) + return self.plan_select_identifier(query) elif isinstance(from_table, Select): return self.plan_nested_select(query) elif isinstance(from_table, Join): @@ -1336,173 +1260,6 @@ def plan_union(self, query): return self.plan.add_step(UnionStep(left=query1.result, right=query2.result, unique=query.unique)) - - def plan_select_knowledege_base(self, query): - SEARCH_QUERY = "search_query" # TODO: need to make it as a constant - MODEL_FIELD = "model" # TODO: need to make it as a constant - STORAGE_FIELD = "storage" # TODO: need to make it as a constant - - - knowledegebase_metadata = self.get_knowledge_base(query.from_table) - vector_database_table = knowledegebase_metadata[STORAGE_FIELD] - model_name = knowledegebase_metadata[MODEL_FIELD] - - CONTENT_FIELD = knowledegebase_metadata.get("content_field") or "content" - EMBEDDINGS_FIELD = knowledegebase_metadata.get("embeddings_field") or "embeddings" - SEARCH_VECTOR_FIELD = knowledegebase_metadata.get("search_vector_field") or "search_vector" - - is_search_query_present = False - def find_search_query(node, **kwargs): - nonlocal is_search_query_present - if isinstance(node, Identifier) and node.parts[-1] == SEARCH_QUERY: - is_search_query_present = True - - # decide predictor is needed in the query - # by detecting if a where clause involving field SEARCH_QUERY is present - # if yes, then we need to add additional step to the plan - # to apply the predictor to the search query - utils.query_traversal( - query.where, - callback=find_search_query - ) - - if not is_search_query_present: - # dispatch to the underlying storage table - query.from_table = Identifier(vector_database_table) - return self.plan_select(query) - else: - # rewrite the where clause - # search_query = 'some text' - # -> - # search_vector = (select embeddings from model_name where content = 'some text') - def rewrite_search_query_clause(node, **kwargs): - if isinstance(node, BinaryOperation): - if node.args[0] == Identifier(SEARCH_QUERY): - node.args[0] = Identifier(SEARCH_VECTOR_FIELD) - node.args[1] = Select( - targets=[Identifier(EMBEDDINGS_FIELD)], - from_table=Identifier(model_name), - where=BinaryOperation( - op="=", - args=[ - Identifier(CONTENT_FIELD), - node.args[1] - ] - ) - ) - - utils.query_traversal( - query.where, - callback=rewrite_search_query_clause - ) - - # dispatch to the underlying storage table - query.from_table = Identifier(vector_database_table) - return self.plan_select(query) - - def plan_insert_knowledge_base(self, query: Insert): - metadata = self.get_knowledge_base(query.table) - STORAGE_FIELD = "storage" # TODO: need to make it as a constant - MODEL_FIELD = "model" # TODO: need to make it as a constant - EMBEDDINGS_FIELD = metadata.get("embeddings_field") or "embeddings" - - vector_database_table = metadata[STORAGE_FIELD] - model_name = metadata[MODEL_FIELD] - - query.table = Identifier(vector_database_table) - - if query.from_select is not None: - # detect if embeddings field is present in the columns list - # if so, we do not need to apply the predictor - # if not, we need to join the select with the model table - is_embeddings_field_present = False - def find_embeddings_field(node, **kwargs): - nonlocal is_embeddings_field_present - if isinstance(node, Identifier) and node.parts[-1] == EMBEDDINGS_FIELD: - is_embeddings_field_present = True - - utils.query_traversal( - query.columns, - callback=find_embeddings_field - ) - - if is_embeddings_field_present: - return self.plan_insert(query) - - # rewrite the select statement - # to join with the model table - - select: Select = query.from_select - select.targets.append(Identifier(EMBEDDINGS_FIELD)) - select.from_table = Select( - targets=copy.deepcopy(select.targets), - from_table=Join( - left=select.from_table, - right=Identifier(model_name), - join_type="JOIN" - ) - ) - - # append the embeddings field to the columns list - if query.columns: - query.columns.append(Identifier(EMBEDDINGS_FIELD)) - - return self.plan_insert(query) - else: - if not query.columns: - raise PlanningException("Columns list is empty when using values") - - keys = [column.name for column in query.columns] - is_embeddings_field_present = EMBEDDINGS_FIELD in keys - - query.table = Identifier(vector_database_table) - # directly dispatch to the underlying storage table - if is_embeddings_field_present: - return self.plan_insert(query) - - # if the embeddings field is not present in the columns list - # we need to wrap values in ast.Data - # join it with a model table - # modify the query using from_table - # and dispatch to the underlying storage table - - records = [] - _unwrap_constant_or_self = lambda node: node.value if isinstance(node, Constant) else node - for row in query.values: - records.append( - dict( - zip( - keys, - map(_unwrap_constant_or_self, row) - ) - ) - ) - - data = ast.Data(records, alias=Identifier("data")) - predictor_select = Select( - targets=[Identifier(col.name) for col in query.columns] + [Identifier(EMBEDDINGS_FIELD)], - from_table=Join( - left=data, - right=Identifier(model_name), - join_type="JOIN" - ) - ) - - query.columns += [ast.TableColumn(name=EMBEDDINGS_FIELD)] - query.from_select = predictor_select - query.values = None - - return self.plan_insert(query) - - def plan_delete_knowledge_base(self, query: Delete): - metadata = self.get_knowledge_base(query.table) - STORAGE_FIELD = "storage" - - vector_database_table = metadata[STORAGE_FIELD] - query.table = Identifier(vector_database_table) - - return self.plan_delete(query) - # method for compatibility def from_query(self, query=None): if query is None: From ce5bd2971620723c604362091daac658c1780a59 Mon Sep 17 00:00:00 2001 From: Yuhui Shi Date: Wed, 20 Sep 2023 13:16:50 -0700 Subject: [PATCH 8/9] delete planne test --- tests/test_planner/test_knowledege_base.py | 255 --------------------- 1 file changed, 255 deletions(-) delete mode 100644 tests/test_planner/test_knowledege_base.py diff --git a/tests/test_planner/test_knowledege_base.py b/tests/test_planner/test_knowledege_base.py deleted file mode 100644 index 9af4ec65..00000000 --- a/tests/test_planner/test_knowledege_base.py +++ /dev/null @@ -1,255 +0,0 @@ -# test planning for knowledge base related queries - -import pytest -from mindsdb_sql.parser.ast import * -from mindsdb_sql.planner import plan_query -from mindsdb_sql import parse_sql -from mindsdb_sql.planner.step_result import Result -from mindsdb_sql.planner.steps import * -from functools import partial - - -@pytest.fixture -def planner_context(): - integrations = [ - { - "name": "my_chromadb", - "type": "data", - }, - { - "name": "my_database", - "type": "data", - }, - ] - - predictors = [ - { - "name": "my_model", - "integration_name": "mindsdb", - }, - ] - - additional_context = [ - { - "name": "my_kb", - "type": "knowledge_base", - "model": "mindsdb.my_model", - "storage": "my_chromadb.my_table", - "search_vector_field": "search_vector", - "embeddings_field": "embeddings", - "content_field": "content", - } - ] - - return integrations, predictors, additional_context - - -def plan_sql(sql, *args, **kwargs): - return plan_query(parse_sql(sql, dialect="mindsdb"), *args, **kwargs) - - -def test_insert_into_kb(planner_context): - integration_context, predictor_context, additional_context = planner_context - _plan_sql = partial( - plan_sql, - default_namespace="mindsdb", - integrations=integration_context, - predictor_metadata=predictor_context, - additional_metadata=additional_context, - ) - - # insert into kb with values - sql = """ - INSERT INTO my_kb - (id, content, metadata) - VALUES - (1, 'hello world', '{"a": 1, "b": 2}'), - (2, 'hello world', '{"a": 1, "b": 2}'), - (3, 'hello world', '{"a": 1, "b": 2}'); - """ - plan = _plan_sql(sql) - assert len(plan.steps) > 0 # TODO: better to specify t the detail of the plan - - # insert into kb with select - sql = """ - INSERT INTO my_kb - (id, content, metadata) - SELECT - id, content, metadata - FROM my_database.my_table - """ - # this will join the subselect with the underlying model - # then it will dispatch the query to the underlying storage - equivalent_sql = """ - INSERT INTO my_chromadb.my_table - (id, content, metadata, embeddings) - SELECT - id, content, metadata, embeddings - FROM ( - SELECT - id, content, metadata, embeddings - FROM my_database.my_table - JOIN mindsdb.my_model - ) - """ - plan = _plan_sql(sql) - expected_plan = _plan_sql(equivalent_sql) - - assert plan.steps == expected_plan.steps - - -def test_select_from_kb(planner_context): - integration_context, predictor_context, additional_context = planner_context - _plan_sql = partial( - plan_sql, - default_namespace="mindsdb", - integrations=integration_context, - predictor_metadata=predictor_context, - additional_metadata=additional_context, - ) - - # select from kb without where - sql = """ - SELECT - id, content, embeddings, metadata - FROM my_kb - """ - # this will dispatch the query to the underlying storage - equivalent_sql = """ - SELECT - id, content, embeddings, metadata - FROM my_chromadb.my_table - """ - plan = _plan_sql(sql) - expected_plan = _plan_sql(equivalent_sql) - - assert plan.steps == expected_plan.steps - - # select from kb with search_query - sql = """ - SELECT - id, content, embeddings, metadata - FROM my_kb - WHERE - search_query = 'hello world' - """ - # this will dispatch the search_query to the underlying model - # then it will dispatch the query to the underlying storage - equivalent_sql = """ - SELECT - id, content, embeddings, metadata - FROM my_chromadb.my_table - WHERE - search_vector = ( - SELECT - embeddings - FROM mindsdb.my_model - WHERE - content = 'hello world' - ) - """ - plan = _plan_sql(sql) - expected_plan = _plan_sql(equivalent_sql) - - assert plan.steps == expected_plan.steps - - # select from kb with no search_query and just metadata query - sql = """ - SELECT - id, content, embeddings, metadata - FROM my_kb - WHERE - `metadata.a` = 1 - """ - # this will dispatch the whole query to the underlying storage - equivalent_sql = """ - SELECT - id, content, embeddings, metadata - FROM my_chromadb.my_table - WHERE - `metadata.a` = 1 - """ - plan = _plan_sql(sql) - expected_plan = _plan_sql(equivalent_sql) - - assert plan.steps == expected_plan.steps - - # select from kb with search_query and metadata query - sql = """ - SELECT - id, content, embeddings, metadata - FROM my_kb - WHERE - search_query = 'hello world' - AND - `metadata.a` = 1 - """ - # this will dispatch the search_query to the underlying model - # then it will dispatch the query to the underlying storage - equivalent_sql = """ - SELECT - id, content, embeddings, metadata - FROM my_chromadb.my_table - WHERE - search_vector = ( - SELECT - embeddings - FROM mindsdb.my_model - WHERE - content = 'hello world' - ) - AND - `metadata.a` = 1 - """ - plan = _plan_sql(sql) - expected_plan = _plan_sql(equivalent_sql) - - assert plan.steps == expected_plan.steps - - -@pytest.mark.skip(reason="not implemented") -def test_update_kb(): - ... - - -def test_delete_from_kb(planner_context): - integration_context, predictor_context, additional_context = planner_context - _plan_sql = partial( - plan_sql, - default_namespace="mindsdb", - integrations=integration_context, - predictor_metadata=predictor_context, - additional_metadata=additional_context, - ) - - sql = """ - DELETE FROM my_kb - WHERE - id = 1 - """ - # this will dispatch the delete to the underlying storage - equivalent_sql = """ - DELETE FROM my_chromadb.my_table - WHERE - id = 1 - """ - plan = _plan_sql(sql) - expected_plan = _plan_sql(equivalent_sql) - - assert plan.steps == expected_plan.steps - - sql = """ - DELETE FROM my_kb - WHERE - `metadata.a` = 1 - """ - # this will dispatch the delete to the underlying storage - equivalent_sql = """ - DELETE FROM my_chromadb.my_table - WHERE - `metadata.a` = 1 - """ - plan = _plan_sql(sql) - expected_plan = _plan_sql(equivalent_sql) - - assert plan.steps == expected_plan.steps From 673afc776a9c5bbbfc640e50b0b4812a6a7e8377 Mon Sep 17 00:00:00 2001 From: andrew Date: Thu, 21 Sep 2023 15:02:23 +0300 Subject: [PATCH 9/9] more wide usage of if_not_exists_or_empty if_exists_or_empty --- mindsdb_sql/parser/dialects/mindsdb/parser.py | 137 +++++++----------- 1 file changed, 50 insertions(+), 87 deletions(-) diff --git a/mindsdb_sql/parser/dialects/mindsdb/parser.py b/mindsdb_sql/parser/dialects/mindsdb/parser.py index 3f665223..c8e91070 100644 --- a/mindsdb_sql/parser/dialects/mindsdb/parser.py +++ b/mindsdb_sql/parser/dialects/mindsdb/parser.py @@ -123,11 +123,9 @@ def create_kb(self, p): ) - @_('DROP KNOWLEDGE_BASE identifier', - 'DROP KNOWLEDGE_BASE IF_EXISTS identifier') + @_('DROP KNOWLEDGE_BASE if_exists_or_empty identifier') def drop_kb(self, p): - if_exists = hasattr(p, 'IF_EXISTS') - return DropKnowledgeBase(name=p.identifier, if_exists=if_exists) + return DropKnowledgeBase(name=p.identifier, if_exists=p.if_exists_or_empty) # -- ChatBot -- @_('CREATE CHATBOT identifier USING kw_parameter_list') @@ -175,14 +173,10 @@ def drop_trigger(self, p): # -- Jobs -- - @_('CREATE JOB identifier LPAREN raw_query RPAREN job_schedule', - 'CREATE JOB identifier AS LPAREN raw_query RPAREN job_schedule', - 'CREATE JOB identifier LPAREN raw_query RPAREN', - 'CREATE JOB identifier AS LPAREN raw_query RPAREN', - 'CREATE JOB IF_NOT_EXISTS identifier LPAREN raw_query RPAREN job_schedule', - 'CREATE JOB IF_NOT_EXISTS identifier AS LPAREN raw_query RPAREN job_schedule', - 'CREATE JOB IF_NOT_EXISTS identifier LPAREN raw_query RPAREN', - 'CREATE JOB IF_NOT_EXISTS identifier AS LPAREN raw_query RPAREN') + @_('CREATE JOB if_not_exists_or_empty identifier LPAREN raw_query RPAREN job_schedule', + 'CREATE JOB if_not_exists_or_empty identifier AS LPAREN raw_query RPAREN job_schedule', + 'CREATE JOB if_not_exists_or_empty identifier LPAREN raw_query RPAREN', + 'CREATE JOB if_not_exists_or_empty identifier AS LPAREN raw_query RPAREN') def create_job(self, p): query_str = tokens_to_string(p.raw_query) @@ -209,7 +203,7 @@ def create_job(self, p): start_str=start_str, end_str=end_str, repeat_str=repeat_str, - if_not_exists=hasattr(p, 'IF_NOT_EXISTS') + if_not_exists=p.if_not_exists_or_empty ) @_('START string', @@ -241,10 +235,9 @@ def job_schedule(self, p): schedule = {param: value} return schedule - @_('DROP JOB identifier', - 'DROP JOB IF_EXISTS identifier') + @_('DROP JOB if_exists_or_empty identifier') def drop_job(self, p): - return DropJob(name=p.identifier, if_exists=hasattr(p, 'IF_EXISTS')) + return DropJob(name=p.identifier, if_exists=p.if_exists_or_empty) # Explain @@ -259,27 +252,20 @@ def alter_table(self, p): arg=' '.join([p.id0, p.id1])) # DROP VEW - @_('DROP VIEW identifier', - 'DROP VIEW IF_EXISTS identifier') + @_('DROP VIEW if_exists_or_empty identifier') def drop_view(self, p): - if_exists = hasattr(p, 'IF_EXISTS') - return DropView([p.identifier], if_exists=if_exists) + return DropView([p.identifier], if_exists=p.if_exists_or_empty) - @_('DROP VIEW enumeration', - 'DROP VIEW IF_EXISTS enumeration') + @_('DROP VIEW if_exists_or_empty enumeration') def drop_view(self, p): - if_exists = hasattr(p, 'IF_EXISTS') - return DropView(p.enumeration, if_exists=if_exists) + return DropView(p.enumeration, if_exists=p.if_exists_or_empty) # DROP DATABASE - @_('DROP DATABASE identifier', - 'DROP DATABASE IF_EXISTS identifier', - 'DROP PROJECT identifier', - 'DROP SCHEMA identifier', - 'DROP SCHEMA IF_EXISTS identifier') + @_('DROP DATABASE if_exists_or_empty identifier', + 'DROP PROJECT if_exists_or_empty identifier', + 'DROP SCHEMA if_exists_or_empty identifier') def drop_database(self, p): - if_exists = hasattr(p, 'IF_EXISTS') - return DropDatabase(name=p.identifier, if_exists=if_exists) + return DropDatabase(name=p.identifier, if_exists=p.if_exists_or_empty) # Transactions @@ -630,17 +616,15 @@ def use(self, p): return Use(value=p.identifier) # CREATE VIEW - @_('CREATE VIEW identifier create_view_from_table_or_nothing AS LPAREN raw_query RPAREN', - 'CREATE VIEW identifier create_view_from_table_or_nothing LPAREN raw_query RPAREN', - 'CREATE VIEW IF_NOT_EXISTS identifier create_view_from_table_or_nothing AS LPAREN raw_query RPAREN', - 'CREATE VIEW IF_NOT_EXISTS identifier create_view_from_table_or_nothing LPAREN raw_query RPAREN') + @_('CREATE VIEW if_not_exists_or_empty identifier create_view_from_table_or_nothing AS LPAREN raw_query RPAREN', + 'CREATE VIEW if_not_exists_or_empty identifier create_view_from_table_or_nothing LPAREN raw_query RPAREN') def create_view(self, p): query_str = tokens_to_string(p.raw_query) return CreateView(name=p.identifier, from_table=p.create_view_from_table_or_nothing, query_str=query_str, - if_not_exists=hasattr(p, 'IF_NOT_EXISTS')) + if_not_exists=p.if_not_exists_or_empty) @_('FROM identifier') def create_view_from_table_or_nothing(self, p): @@ -651,38 +635,30 @@ def create_view_from_table_or_nothing(self, p): pass # DROP PREDICTOR - @_('DROP PREDICTOR identifier', - 'DROP MODEL identifier', - 'DROP PREDICTOR IF_EXISTS identifier', - 'DROP MODEL IF_EXISTS identifier') + @_('DROP PREDICTOR if_exists_or_empty identifier', + 'DROP MODEL if_exists_or_empty identifier') def drop_predictor(self, p): - if_exists = hasattr(p, 'IF_EXISTS') - return DropPredictor(p.identifier, if_exists=if_exists) + return DropPredictor(p.identifier, if_exists=p.if_exists_or_empty) # DROP DATASOURCE - @_('DROP DATASOURCE identifier', - 'DROP DATASOURCE IF_EXISTS identifier') + @_('DROP DATASOURCE if_exists_or_empty identifier') def drop_datasource(self, p): - return DropDatasource(p.identifier, if_exists=hasattr(p, 'IF_EXISTS')) + return DropDatasource(p.identifier, if_exists=p.if_exists_or_empty) # DROP DATASET - @_('DROP DATASET identifier', - 'DROP DATASET IF_EXISTS identifier') + @_('DROP DATASET if_exists_or_empty identifier') def drop_dataset(self, p): - return DropDataset(p.identifier, if_exists=hasattr(p, 'IF_EXISTS')) + return DropDataset(p.identifier, if_exists=p.if_exists_or_empty) # DROP TABLE - @_('DROP TABLE IF_EXISTS identifier', - 'DROP TABLE identifier') + @_('DROP TABLE if_exists_or_empty identifier') def drop_table(self, p): - if_exists = hasattr(p, 'IF_EXISTS') - return DropTables(tables=[p.identifier], if_exists=if_exists) + return DropTables(tables=[p.identifier], if_exists=p.if_exists_or_empty) # create table - @_('CREATE TABLE identifier select', - 'CREATE TABLE identifier LPAREN select RPAREN', - 'CREATE TABLE IF_NOT_EXISTS identifier select', - 'CREATE TABLE IF_NOT_EXISTS identifier LPAREN select RPAREN', + @_('CREATE TABLE identifier select', # TODO tests failing without it + 'CREATE TABLE if_not_exists_or_empty identifier select', + 'CREATE TABLE if_not_exists_or_empty identifier LPAREN select RPAREN', 'CREATE OR REPLACE TABLE identifier select', 'CREATE OR REPLACE TABLE identifier LPAREN select RPAREN') def create_table(self, p): @@ -694,7 +670,7 @@ def create_table(self, p): name=p.identifier, is_replace=is_replace, from_select=p.select, - if_not_exists=hasattr(p, 'IF_NOT_EXISTS') + if_not_exists=getattr(p, 'if_not_exists_or_empty', False) ) @_('CREATE TABLE identifier USING kw_parameter_list') @@ -733,14 +709,10 @@ def create_predictor(self, p): p.create_predictor.order_by = p.ordering_terms return p.create_predictor - @_('CREATE PREDICTOR identifier FROM identifier LPAREN raw_query RPAREN PREDICT result_columns', - 'CREATE PREDICTOR identifier PREDICT result_columns', - 'CREATE PREDICTOR IF_NOT_EXISTS identifier FROM identifier LPAREN raw_query RPAREN PREDICT result_columns', - 'CREATE PREDICTOR IF_NOT_EXISTS identifier PREDICT result_columns', - 'CREATE MODEL identifier FROM identifier LPAREN raw_query RPAREN PREDICT result_columns', - 'CREATE MODEL identifier PREDICT result_columns', - 'CREATE MODEL IF_NOT_EXISTS identifier FROM identifier LPAREN raw_query RPAREN PREDICT result_columns', - 'CREATE MODEL IF_NOT_EXISTS identifier PREDICT result_columns') + @_('CREATE PREDICTOR if_not_exists_or_empty identifier FROM identifier LPAREN raw_query RPAREN PREDICT result_columns', + 'CREATE PREDICTOR if_not_exists_or_empty identifier PREDICT result_columns', + 'CREATE MODEL if_not_exists_or_empty identifier FROM identifier LPAREN raw_query RPAREN PREDICT result_columns', + 'CREATE MODEL if_not_exists_or_empty identifier PREDICT result_columns') def create_predictor(self, p): query_str = None if hasattr(p, 'raw_query'): @@ -757,7 +729,7 @@ def create_predictor(self, p): integration_name=getattr(p, 'identifier1', None), query_str=query_str, targets=p.result_columns, - if_not_exists=hasattr(p, 'IF_NOT_EXISTS') + if_not_exists=p.if_not_exists_or_empty ) # Typed models @@ -861,21 +833,18 @@ def evaluate(self, p): # ML ENGINE # CREATE - @_('CREATE ML_ENGINE identifier FROM id USING kw_parameter_list', - 'CREATE ML_ENGINE identifier FROM id', - 'CREATE ML_ENGINE IF_NOT_EXISTS identifier FROM id USING kw_parameter_list', - 'CREATE ML_ENGINE IF_NOT_EXISTS identifier FROM id') + @_('CREATE ML_ENGINE if_not_exists_or_empty identifier FROM id USING kw_parameter_list', + 'CREATE ML_ENGINE if_not_exists_or_empty identifier FROM id') def create_integration(self, p): return CreateMLEngine(name=p.identifier, handler=p.id, params=getattr(p, 'kw_parameter_list', None), - if_not_exists=hasattr(p, 'IF_NOT_EXISTS')) + if_not_exists=p.if_not_exists_or_empty) # DROP - @_('DROP ML_ENGINE identifier', - 'DROP ML_ENGINE IF_EXISTS identifier') + @_('DROP ML_ENGINE if_exists_or_empty identifier') def create_integration(self, p): - return DropMLEngine(name=p.identifier, if_exists=hasattr(p, 'IF_EXISTS')) + return DropMLEngine(name=p.identifier, if_exists=p.if_exists_or_empty) # CREATE INTEGRATION @_('CREATE database_engine', @@ -902,23 +871,17 @@ def create_integration(self, p): parameters=parameters, if_not_exists=p.database_engine['if_not_exists']) - @_('DATABASE identifier', - 'PROJECT identifier', - 'DATABASE identifier ENGINE string', - 'DATABASE identifier ENGINE EQUALS string', - 'DATABASE identifier WITH ENGINE string', - 'DATABASE identifier WITH ENGINE EQUALS string', - 'DATABASE IF_NOT_EXISTS identifier', - 'DATABASE IF_NOT_EXISTS identifier ENGINE string', - 'DATABASE IF_NOT_EXISTS identifier ENGINE EQUALS string', - 'DATABASE IF_NOT_EXISTS identifier WITH ENGINE string', - 'DATABASE IF_NOT_EXISTS identifier WITH ENGINE EQUALS string', - 'PROJECT IF_NOT_EXISTS identifier') + @_('DATABASE if_not_exists_or_empty identifier', + 'DATABASE if_not_exists_or_empty identifier ENGINE string', + 'DATABASE if_not_exists_or_empty identifier ENGINE EQUALS string', + 'DATABASE if_not_exists_or_empty identifier WITH ENGINE string', + 'DATABASE if_not_exists_or_empty identifier WITH ENGINE EQUALS string', + 'PROJECT if_not_exists_or_empty identifier') def database_engine(self, p): engine = None if hasattr(p, 'string'): engine = p.string - return {'identifier': p.identifier, 'engine': engine, 'if_not_exists': hasattr(p, 'IF_NOT_EXISTS')} + return {'identifier': p.identifier, 'engine': engine, 'if_not_exists': p.if_not_exists_or_empty} # UNION / UNION ALL @_('select UNION select')