diff --git a/README.md b/README.md index 1927acd..aea6708 100644 --- a/README.md +++ b/README.md @@ -18,23 +18,23 @@ You can establish a connection to the MindsDB server using the SDK. Here are som ```python import mindsdb_sdk -server = mindsdb_sdk.connect() -server = mindsdb_sdk.connect('http://127.0.0.1:47334') +con = mindsdb_sdk.connect() +con = mindsdb_sdk.connect('http://127.0.0.1:47334') ``` #### Connect to the MindsDB Cloud ```python import mindsdb_sdk -server = mindsdb_sdk.connect(login='a@b.com', password='-') -server = mindsdb_sdk.connect('https://cloud.mindsdb.com', login='a@b.com', password='-') +con = mindsdb_sdk.connect(login='a@b.com', password='-') +con = mindsdb_sdk.connect('https://cloud.mindsdb.com', login='a@b.com', password='-') ``` #### Connect to a MindsDB Pro server ```python import mindsdb_sdk -server = mindsdb_sdk.connect('http://', login='a@b.com', password='-', is_managed=True) +con = mindsdb_sdk.connect('http://', login='a@b.com', password='-', is_managed=True) ``` ## Basic usage @@ -43,7 +43,7 @@ Once connected to the server, you can perform various operations. Here are some ```python # Get a list of databases -databases = server.list_databases() +databases = con.databases.list() # Get a specific database database = databases[0] # Database type object @@ -53,24 +53,27 @@ query = database.query('select * from table1') print(query.fetch()) # Create a table -table = database.create_table('table2', query) +table = database.tables.create('table2', query) # Get a project -project = server.get_project('proj') +project = con.projects.proj + +# or use mindsdb project +project = con # Perform an SQL query within a project query = project.query('select * from database.table join model1') # Create a view -view = project.create_view('view1', query=query) +view = project.views.create('view1', query=query) # Get a list of views -views = project.list_views() +views = project.views.list() view = views[0] df = view.fetch() # Get a list of models -models = project.list_models() +models = project.models.list() model = models[0] # Use a model for prediction @@ -83,7 +86,7 @@ timeseries_options = { 'window': 5, 'horizon': 1 } -model = project.create_model( +model = project.models.create( 'rentals_model', predict='price', query=query, @@ -98,6 +101,10 @@ You can find more examples in this [Google colab notebook]( https://colab.research.google.com/drive/1QouwAR3saFb9ffthrIs1LSH5COzyQa11#scrollTo=k6IbwsKRPQCR ) +## Examples + +https://github.com/mindsdb/mindsdb_python_sdk/tree/staging/examples + ## API Documentation The API documentation for the MindsDB SDK can be found at https://mindsdb.github.io/mindsdb_python_sdk/. You can generate the API documentation locally by following these steps: diff --git a/docs/requirements.txt b/docs/requirements.txt index 21ae0f2..7c67cc7 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,6 +1,6 @@ requests -pandas == 1.3.5 -mindsdb-sql >= 0.5.0, < 0.6.0 +pandas >= 1.3.5 +mindsdb-sql >= 0.7.0, < 0.8.0 sphinx sphinx-rtd-theme diff --git a/docs/source/database.rst b/docs/source/database.rst index 0491238..748aae7 100644 --- a/docs/source/database.rst +++ b/docs/source/database.rst @@ -1,7 +1,7 @@ -Database +Databases ---------------------------- -.. automodule:: mindsdb_sdk.database +.. automodule:: mindsdb_sdk.databases :members: :undoc-members: :show-inheritance: \ No newline at end of file diff --git a/docs/source/handlers.rst b/docs/source/handlers.rst new file mode 100644 index 0000000..c0087e6 --- /dev/null +++ b/docs/source/handlers.rst @@ -0,0 +1,7 @@ +Handlers +------------------------- + +.. automodule:: mindsdb_sdk.handlers + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/index.rst b/docs/source/index.rst index d554756..c7705ac 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -87,7 +87,12 @@ Base usage query=query, ) + More +More examples +----------- + +``_ API documentation ================= @@ -105,9 +110,15 @@ API documentation server database + handlers + ml_engines project model + tables + views query + jobs + Indices and tables ------------------ diff --git a/docs/source/jobs.rst b/docs/source/jobs.rst new file mode 100644 index 0000000..aa47c3e --- /dev/null +++ b/docs/source/jobs.rst @@ -0,0 +1,7 @@ +Jobs +------------------------- + +.. automodule:: mindsdb_sdk.jobs + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/ml_engines.rst b/docs/source/ml_engines.rst new file mode 100644 index 0000000..f1dd4e4 --- /dev/null +++ b/docs/source/ml_engines.rst @@ -0,0 +1,7 @@ +ML Engines +------------------------- + +.. automodule:: mindsdb_sdk.ml_engines + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/model.rst b/docs/source/model.rst index 2e81d2d..6b9bdf8 100644 --- a/docs/source/model.rst +++ b/docs/source/model.rst @@ -1,7 +1,7 @@ -Model +Models ------------------------- -.. automodule:: mindsdb_sdk.model +.. automodule:: mindsdb_sdk.models :members: :undoc-members: :show-inheritance: \ No newline at end of file diff --git a/docs/source/project.rst b/docs/source/project.rst index 0ce71b5..c7333fe 100644 --- a/docs/source/project.rst +++ b/docs/source/project.rst @@ -1,7 +1,7 @@ -Project +Projects --------------------------- -.. automodule:: mindsdb_sdk.project +.. automodule:: mindsdb_sdk.projects :members: :undoc-members: :show-inheritance: diff --git a/docs/source/tables.rst b/docs/source/tables.rst new file mode 100644 index 0000000..3082b55 --- /dev/null +++ b/docs/source/tables.rst @@ -0,0 +1,7 @@ +Tables +------------------------- + +.. automodule:: mindsdb_sdk.tables + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/docs/source/views.rst b/docs/source/views.rst new file mode 100644 index 0000000..aa5ff8b --- /dev/null +++ b/docs/source/views.rst @@ -0,0 +1,7 @@ +Views +------------------------- + +.. automodule:: mindsdb_sdk.views + :members: + :undoc-members: + :show-inheritance: \ No newline at end of file diff --git a/examples/home_rentals.py b/examples/home_rentals.py new file mode 100644 index 0000000..33f55a3 --- /dev/null +++ b/examples/home_rentals.py @@ -0,0 +1,37 @@ + +import mindsdb_sdk + +con = mindsdb_sdk.connect() + +# connect to database +db = con.databases.create( + 'example_db', + engine='postgres', + connection_args={ + "user": "demo_user", + "password": "demo_password", + "host": "3.220.66.106", + "port": "5432", + "database": "demo" + } +) + +# get table +# because table with schema we are using .get +tbl = db.tables.get('demo_data.home_rentals') + +# create model +model = con.models.create( + 'home_rentals_model', + predict='rental_price', + query=tbl +) + +# wait till training complete +model.wait_complete() + +# make prediction for first 3 rows +result = model.predict(tbl.limit(3)) + + + diff --git a/examples/using_openai.py b/examples/using_openai.py new file mode 100644 index 0000000..fe48b78 --- /dev/null +++ b/examples/using_openai.py @@ -0,0 +1,25 @@ + +import mindsdb_sdk + +con = mindsdb_sdk.connect() + +openai_handler = con.ml_handlers.openai + +# create ml engine +openai = con.ml_engines.create( + 'openai', + handler=openai_handler, + # handler='openai', # <-- another option to define handler + connection_data={'api_key': ''} +) + +# create model +model = con.models.create( + 'open1', + predict='answer', + engine=openai, # created ml engine + prompt_template='answer question: {{q}}' +) + +# use model +model.predict({'q': 'size of the sun'}) \ No newline at end of file diff --git a/examples/working_with_tables.py b/examples/working_with_tables.py new file mode 100644 index 0000000..73ca881 --- /dev/null +++ b/examples/working_with_tables.py @@ -0,0 +1,43 @@ +import mindsdb_sdk +import pandas as pd + +con = mindsdb_sdk.connect() + +# get user's database (connected to mindsdb as rental_db) +db = con.databases.rental_db + +# get table +table1 = db.tables.house_sales + + +# ---- create new table ---- + +# copy create table house_sales and fill it with rows with type=house +table2 = db.tables.create('house_sales2', table1.filter(type='house')) + +# create table from csv file +df = pd.read_csv('my_data.csv') +table3 = db.tables.create('my_table', df) + + +# ---- insert into table ---- + +# insert to table2 first 10 rows from table1 +table2.insert(table1.limit(10)) + + +# ---- update data in table ---- + +# get all rows with type=house from table1 and update values in table2 using key ('saledate', 'type', 'bedrooms') +table2.update( + table1.filter(type='house'), + on=['saledate', 'type', 'bedrooms'] +) + + +# ---- delete rows from table ---- + +# delete all rows where bedrooms=2 +table2.delete(bedrooms=2) + + diff --git a/mindsdb_sdk/connect.py b/mindsdb_sdk/connect.py index a7ce661..0a5c8ed 100644 --- a/mindsdb_sdk/connect.py +++ b/mindsdb_sdk/connect.py @@ -1,5 +1,7 @@ from mindsdb_sdk.server import Server +from mindsdb_sdk.connectors.rest_api import RestAPI + def connect(url: str = None, login: str = None, password: str = None, is_managed: bool = False) -> Server: """ @@ -39,4 +41,6 @@ def connect(url: str = None, login: str = None, password: str = None, is_managed # is local url = 'http://127.0.0.1:47334' - return Server(url=url, login=login, password=password, is_managed=is_managed) \ No newline at end of file + api = RestAPI(url, login, password, is_managed) + + return Server(api) \ No newline at end of file diff --git a/mindsdb_sdk/database.py b/mindsdb_sdk/database.py deleted file mode 100644 index 5245aca..0000000 --- a/mindsdb_sdk/database.py +++ /dev/null @@ -1,169 +0,0 @@ -from typing import List, Union - -import pandas as pd - -from mindsdb_sql.parser.ast import Identifier, DropTables - -from mindsdb_sdk.query import Query, Table - - -class Database: - """ - Allows to work with database (datasource): to use tables and make raw queries - - To run native query - At this moment query is just saved in Qeury object and not executed - - >>> query = database.query('select * from table1') # returns Query - - This command sends request to server to execute query and return dataframe - - >>> df = query.fetch() - - Wortking with tables: - Get table as Query object - - >>> table = database.get_table('table1') - - Filter and limit - - >>> table = table.filter(a=1, b='2') - >>> table = table.limit(3) - - Get content of table as dataframe. At that moment query will be sent on server and executed - - >>> df = table.fetch() - - Creating table - - From query: - - >>> table = database.create_table('table2', query) - - From other table - - >>> table2 = database.create_table('table2', table) - - Uploading file - - >>> db = server.get_database('files') - >>> db.create_table('filename', dataframe) - - ` Droping table - - >>> database.drop_table('table2') - - """ - - def __init__(self, server, name): - self.server = server - self.name = name - self.api = server.api - - def __repr__(self): - return f'{self.__class__.__name__}({self.name})' - - def query(self, sql: str) -> Query: - """ - Make raw query to integration - - :param sql: sql of the query - :return: Query object - """ - return Query(self.api, sql, database=self.name) - - def _list_tables(self): - df = self.query('show tables').fetch() - - # first column - return list(df[df.columns[0]]) - - def list_tables(self) -> List[Table]: - """ - Show list of tables in integration - - :return: list of Table objects - """ - return [Table(self, name) for name in self._list_tables()] - - def get_table(self, name: str) -> Table: - """ - Get table by name - - :param name: name of table - :return: Table object - """ - - if name not in self._list_tables(): - if '.' not in name: - # fixme: schemas not visible in 'show tables' - raise AttributeError("Table doesn't exist") - return Table(self, name) - - def create_table(self, name: str, query: Union[pd.DataFrame, Query], replace: bool = False) -> Table: - """ - Create new table and return it. - - On mindsdb server it executes command: - `insert into a (select ...)` - - or if replace is True - `create table a (select ...)` - - 'select ...' is extracted from input Query - - :param name: name of table - :param query: Query object - :param replace: if true, - :return: Table object - """ - - if isinstance(query, pd.DataFrame) and self.name == 'files': - # now it is only possible for file uploading - self.api.upload_file(name, query) - - return Table(self, name) - - if not isinstance(query, Query): - raise NotImplementedError - - # # query can be in different database: wrap to NativeQuery - # ast_query = CreateTable( - # name=Identifier(name), - # is_replace=is_replace, - # from_select=Select( - # targets=[Star()], - # from_table=NativeQuery( - # integration=Identifier(data.database), - # query=data.sql - # ) - # ) - # ) - # self.query(ast_query.to_string()).fetch() - - # call in query database - table = Identifier(parts=[self.name, name]) - - replace_str = '' - if replace: - replace_str = ' or replace' - - self.api.sql_query( - f'create{replace_str} table {table.to_string()} ({query.sql})', - database=query.database - ) - - return Table(self, name) - - def drop_table(self, name: str): - """ - Delete table - - :param name: name of table - """ - ast_query = DropTables( - tables=[ - Identifier(parts=[name]) - ] - ) - self.api.sql_query(ast_query.to_string(), database=self.name) diff --git a/mindsdb_sdk/databases.py b/mindsdb_sdk/databases.py new file mode 100644 index 0000000..2e69b7b --- /dev/null +++ b/mindsdb_sdk/databases.py @@ -0,0 +1,133 @@ +from typing import List + +from mindsdb_sql.parser.dialects.mindsdb import CreateDatabase +from mindsdb_sql.parser.ast import DropDatabase, Identifier + +from mindsdb_sdk.utils.objects_collection import CollectionBase + +from .query import Query +from .tables import Tables + + +class Database: + """ + Allows to work with database (datasource): to use tables and make raw queries + + To run native query + At this moment query is just saved in Qeury object and not executed + + >>> query = database.query('select * from table1') # returns Query + + This command sends request to server to execute query and return dataframe + + >>> df = query.fetch() + + Has list of tables in .tables attribute. + + """ + + def __init__(self, server, name): + self.server = server + self.name = name + self.api = server.api + + self.tables = Tables(self, self.api) + + # old api + self.get_table = self.tables.get + self.list_tables = self.tables.list + self.create_table = self.tables.create + self.drop_table = self.tables.drop + + def __repr__(self): + return f'{self.__class__.__name__}({self.name})' + + def query(self, sql: str) -> Query: + """ + Make raw query to integration + + :param sql: sql of the query + :return: Query object + """ + return Query(self.api, sql, database=self.name) + + +class Databases(CollectionBase): + """ + Databases + ---------- + + >>> databases.list() + >>> db = databases[0] # Database type object + + # create + + >>> db = databases.create('example_db', + ... type='postgres', + ... connection_args={'host': ''}) + + # drop database + + >>> databases.drop('example_db') + + # get existing + + >>> db = databases.get('example_db') + + """ + + def __init__(self, api): + self.api = api + + def _list_databases(self): + data = self.api.sql_query( + "select NAME from information_schema.databases where TYPE='data'" + ) + return list(data.NAME) + + def list(self) -> List[Database]: + """ + Show list of integrations (databases) on server + + :return: list of Database objects + """ + return [Database(self, name) for name in self._list_databases()] + + def create(self, name: str, engine: str, connection_args: dict) -> Database: + """ + Create new integration and return it + + :param name: Identifier for the integration to be created + :param engine: Engine to be selected depending on the database connection. + :param connection_args: {"key": "value"} object with the connection parameters specific for each engine + :return: created Database object + """ + ast_query = CreateDatabase( + name=Identifier(name), + engine=engine, + parameters=connection_args, + ) + self.api.sql_query(ast_query.to_string()) + return Database(self, name) + + def drop(self, name: str): + """ + Delete integration + + :param name: name of integration + """ + ast_query = DropDatabase(name=Identifier(name)) + self.api.sql_query(ast_query.to_string()) + + def get(self, name: str) -> Database: + """ + Get integration by name + + :param name: name of integration + :return: Database object + """ + if name not in self._list_databases(): + raise AttributeError("Database doesn't exist") + return Database(self, name) + + diff --git a/mindsdb_sdk/handlers.py b/mindsdb_sdk/handlers.py new file mode 100644 index 0000000..31b3472 --- /dev/null +++ b/mindsdb_sdk/handlers.py @@ -0,0 +1,93 @@ +from dataclasses import dataclass +import dataclasses +from typing import List + +from mindsdb_sql.parser.ast import Show, Identifier, BinaryOperation, Constant + +from mindsdb_sdk.utils.objects_collection import CollectionBase + + +@dataclass(init=False) +class Handler: + name: str + title: str + version: str + description: str + connection_args: dict + import_success: bool + import_error: str + + def __init__(self, **kwargs): + names = set([f.name for f in dataclasses.fields(self)]) + for k, v in kwargs.items(): + if k in names: + setattr(self, k, v) + + +class Handlers(CollectionBase): + """ + **Handlers colection** + + Examples of usage: + + ML handlers: + + Get list + >>> con.ml_handlers.list() + + Get + >>> openai_handler = con.ml_handlers.openai + + DATA handlers: + + Get list + >>> con.data_handlers.list() + + Get + >>> pg_handler = con.data_handlers.postgres + + """ + + def __init__(self, api, type): + self.api = api + self.type = type + + def list(self) -> List[Handler]: + """ + Returns list of handlers on server depending on type + :return: list of handlers + """ + + ast_query = Show( + category='HANDLERS', + where=BinaryOperation( + op='=', + args=[ + Identifier('type'), + Constant(self.type) + ] + ) + ) + + df = self.api.sql_query(ast_query.to_string()) + # columns to lower case + cols_map = {i: i.lower() for i in df.columns} + df = df.rename(columns=cols_map) + + return [ + Handler(**item) + for item in df.to_dict('records') + ] + + def get(self, name: str) -> Handler: + """ + Get handler by name + + :param name + :return: handler object + """ + name = name.lower() + for item in self.list(): + if item.name == name: + return item + raise AttributeError(f"Handler doesn't exist: {name}") diff --git a/mindsdb_sdk/jobs.py b/mindsdb_sdk/jobs.py new file mode 100644 index 0000000..222efc0 --- /dev/null +++ b/mindsdb_sdk/jobs.py @@ -0,0 +1,145 @@ +import datetime as dt +from typing import Union, List + +import pandas as pd + +from mindsdb_sql.parser.dialects.mindsdb import CreateJob, DropJob +from mindsdb_sql.parser.ast import Identifier, Star, Select + +from mindsdb_sdk.utils.sql import dict_to_binary_op +from mindsdb_sdk.utils.objects_collection import CollectionBase + + +class Job: + def __init__(self, project, data): + self.project = project + self.data = data + self._update(data) + + def _update(self, data): + self.name = data['name'] + self.query_str = data['query'] + self.start_at = data['start_at'] + self.end_at = data['end_at'] + self.next_run_at = data['next_run_at'] + self.schedule_str = data['schedule_str'] + + def __repr__(self): + return f"{self.__class__.__name__}({self.name}, query='{self.query_str}')" + + def refresh(self): + """ + Retrieve job data from mindsdb server + """ + job = self.project.get_job(self.name) + self._update(job.data) + + def get_history(self) -> pd.DataFrame: + """ + Get history of job execution + + :return: dataframe with job executions + """ + ast_query = Select( + targets=[Star()], + from_table=Identifier('jobs_history'), + where=dict_to_binary_op({ + 'name': self.name + }) + ) + return self.project.api.sql_query(ast_query.to_string(), database=self.project.name) + + +class Jobs(CollectionBase): + def __init__(self, project, api): + self.project = project + self.api = api + + def list(self, name: str = None) -> List[Job]: + """ + Show list of jobs in project + + :return: list of Job objects + """ + + ast_query = Select(targets=[Star()], from_table=Identifier('jobs')) + + if name is not None: + ast_query.where = dict_to_binary_op({'name': name}) + + df = self.api.sql_query(ast_query.to_string(), database=self.project.name) + + # columns to lower case + cols_map = {i: i.lower() for i in df.columns} + df = df.rename(columns=cols_map) + + return [ + Job(self.project, item) + for item in df.to_dict('records') + ] + + def get(self, name: str) -> Job: + """ + Get job by name from project + + :param name: name of the job + :return: Job object + """ + + jobs = self.list(name) + if len(jobs) == 1: + return jobs[0] + elif len(jobs) == 0: + raise AttributeError("Job doesn't exist") + else: + raise RuntimeError("Several jobs with the same name") + + def create(self, name: str, query_str: str, + start_at: dt.datetime = None, end_at: dt.datetime = None, + repeat_str: str = None) -> Union[Job, None]: + """ + Create new job in project and return it. + If it is not possible (job executed and not accessible anymore): return None + More info: https://docs.mindsdb.com/sql/create/jobs + + :param name: name of the job + :param query_str: str, job's query (or list of queries with ';' delimiter) which job have to execute + :param start_at: datetime, first start of job, + :param end_at: datetime, when job have to be stopped, + :param repeat_str: str, optional, how to repeat job (e.g. '1 hour', '2 weeks', '3 min') + :return: Job object or None + """ + + if start_at is not None: + start_str = start_at.strftime("%Y-%m-%d %H:%M:%S") + else: + start_str = None + + if end_at is not None: + end_str = end_at.strftime("%Y-%m-%d %H:%M:%S") + else: + end_str = None + ast_query = CreateJob( + name=Identifier(name), + query_str=query_str, + start_str=start_str, + end_str=end_str, + repeat_str=repeat_str + ) + + self.api.sql_query(ast_query.to_string(), database=self.project.name) + + # job can be executed and remove it is not repeatable + jobs = self.list(name) + if len(jobs) == 1: + return jobs[0] + + def drop(self, name: str): + """ + Drop job from project + + :param name: name of the job + """ + ast_query = DropJob(Identifier(name)) + + self.api.sql_query(ast_query.to_string(), database=self.project.name) diff --git a/mindsdb_sdk/ml_engines.py b/mindsdb_sdk/ml_engines.py new file mode 100644 index 0000000..eeb15f2 --- /dev/null +++ b/mindsdb_sdk/ml_engines.py @@ -0,0 +1,103 @@ +from dataclasses import dataclass +from typing import List, Union + +from mindsdb_sql.parser.ast import Show, Identifier +from mindsdb_sql.parser.dialects.mindsdb import CreateMLEngine, DropMLEngine + +from mindsdb_sdk.utils.objects_collection import CollectionBase + +from .handlers import Handler + +@dataclass +class MLEngine: + name: str + handler: str + connection_data: dict + + +class MLEngines(CollectionBase): + """ + + **ML engines collection** + + Examples of usage: + + Get list + >>> ml_engines = con.ml_engines.list() + + Get + >>> openai_engine = con.ml_engines.openai1 + + Create + >>> con.ml_engines.create( + ... 'openai1', + ... 'openai', + ... connection_data={'api_key': '111'} + ...) + + Drop + >>> con.ml_engines.drop('openai1') + + """ + + def __init__(self, api): + self.api = api + + def list(self) -> List[MLEngine]: + """ + Returns list of ml engines on server + :return: list of ml engines + """ + + ast_query = Show(category='ml_engines') + + df = self.api.sql_query(ast_query.to_string()) + # columns to lower case + cols_map = {i: i.lower() for i in df.columns} + df = df.rename(columns=cols_map) + + return [ + MLEngine(**item) + for item in df.to_dict('records') + ] + + def get(self, name: str) -> MLEngine: + """ + Get ml engine by name + + :param name + :return: ml engine object + """ + name = name.lower() + for item in self.list(): + if item.name == name: + return item + raise AttributeError(f"MLEngine doesn't exist {name}") + + def create(self, name: str, handler: Union[str, Handler], connection_data: dict = None) -> MLEngine: + """ + Create new ml engine and return it + :param name: ml engine name, string + :param handler: handler name, string or Handler + :param connection_data: parameters for ml engine, dict, optional + :return: created ml engine object + """ + + if isinstance(handler, Handler): + handler = handler.name + + ast_query = CreateMLEngine(Identifier(name), handler, params=connection_data) + + self.api.sql_query(ast_query.to_string()) + + return MLEngine(name, handler, connection_data) + + def drop(self, name: str): + """ + Drop ml engine by name + :param name: name + """ + ast_query = DropMLEngine(Identifier(name)) + + self.api.sql_query(ast_query.to_string()) + diff --git a/mindsdb_sdk/model.py b/mindsdb_sdk/models.py similarity index 50% rename from mindsdb_sdk/model.py rename to mindsdb_sdk/models.py index 377d2ef..ee7cb8b 100644 --- a/mindsdb_sdk/model.py +++ b/mindsdb_sdk/models.py @@ -1,19 +1,99 @@ from __future__ import annotations +import time from typing import List, Union import pandas as pd +from mindsdb_sql.parser.dialects.mindsdb import CreatePredictor, DropPredictor from mindsdb_sql.parser.dialects.mindsdb import RetrainPredictor, FinetunePredictor from mindsdb_sql.parser.ast import Identifier, Select, Star, Join, Update, Describe, Constant from mindsdb_sql import parse_sql -from mindsdb_sql.planner.utils import query_traversal -from mindsdb_sdk.utils import dict_to_binary_op -from mindsdb_sdk.query import Query +from .ml_engines import MLEngine + +from mindsdb_sdk.utils.objects_collection import CollectionBase +from mindsdb_sdk.utils.sql import dict_to_binary_op + +from .query import Query class Model: + """ + + Versions + + List model versions + + >>> model.list_versions() + + + Get info + + >>> print(model.status) + >>> print(model.data) + + Update model data from server + + >>> model.refresh() + +Usng model + + Dataframe on input + + >>> result_df = model.predict(df_rental) + >>> result_df = model.predict(df_rental, params={'a': 'q'}) + + Dict on input + + >>> result_df = model.predict({'n_rooms': 2}) + + Deferred query on input + + >>> result_df = model.predict(query, params={'': ''}) + + Time series prediction + + >>> query = database.query('select * from table1 where type="house" and saledate>latest') + >>> model.predict(query) + + The join model with table in raw query + + >>> result_df = project.query(''' + ... SELECT m.saledate as date, m.ma as forecast + ... FROM mindsdb.house_sales_model as m + ... JOIN example_db.demo_data.house_sales as t + ... WHERE t.saledate > LATEST AND t.type = 'house' + ... AND t.bedrooms=2 + ... LIMIT 4; + ...''').fetch() + + + **Model managing** + + Fine-tuning + + >>> model.finetune(query) + >>> model.finetune('select * from demo_data.house_sales', database='example_db') + >>> model.finetune(query, params={'x': 2}) + + Retraining + + >>> model.retrain(query) + >>> model.retrain('select * from demo_data.house_sales', database='example_db') + >>> model.retrain(query, params={'x': 2}) + + Describe + + >>> df_info = model.describe() + >>> df_info = model.describe('features') + + Change active version + + >>> model.set_active(version=3) + + """ + def __init__(self, project, data): self.project = project @@ -112,6 +192,19 @@ def predict(self, data: Union[pd.DataFrame, Query, dict], params: dict = None) - else: raise ValueError('Unknown input') + def wait_complete(self): + + for i in range(400): + time.sleep(0.3) + + status = self.get_status() + if status in ('generating', 'training'): + continue + elif status == 'error': + raise RuntimeError(f'Training failed: {self.data["error"]}') + else: + break + def get_status(self) -> str: """ Refresh model data and return status of model @@ -240,6 +333,15 @@ def get_version(self, num: int) -> ModelVersion: return m raise ValueError('Version is not found') + def drop_version(self, num: int) -> ModelVersion: + """ + Drop version of the model + + :param num: version to drop + """ + + return self.project.drop_model_version(self.name, num) + def set_active(self, version: int): """ Change model active version @@ -266,3 +368,220 @@ def __init__(self, project, data): super().__init__(project, data) self.version = data['version'] + + +class Models(CollectionBase): + """ + + **Models** + + Get: + + >>> all_models = models.list() + >>> model = all_models[0] + + Get version: + + >>> all_models = models.list(with_versions=True) + >>> model = all_models[0] + + By name: + + >>> model = models.get('model1') + >>> model = models.get('model1', version=2) + + Create + + Create, using params and qeury as string + + >>> model = models.create( + ... 'rentals_model', + ... predict='price', + ... engine='lightwood', + ... database='example_db', + ... query='select * from table', + ... options={ + ... 'module': 'LightGBM' + ... }, + ... timeseries_options={ + ... 'order': 'date', + ... 'group': ['a', 'b'] + ... } + ...) + + Create, using deferred query. 'query' will be executed and converted to dataframe on mindsdb backend. + + >>> query = databases.db.query('select * from table') + >>> model = models.create( + ... 'rentals_model', + ... predict='price', + ... query=query, + ...) + + + Drop + + >>> models.drop('rentals_model') + >>> models.rentals_model.drop_version(version=10) + + """ + + def __init__(self, project, api): + self.project = project + self.api = api + + def create( + self, + name: str, + predict: str = None, + engine: Union[str, MLEngine] = None, + query: Union[str, Query] = None, + database: str = None, + options: dict = None, + timeseries_options: dict = None, **kwargs + ) -> Model: + """ + Create new model in project and return it + + If query/database is passed, it will be executed on mindsdb side + + :param name: name of the model + :param predict: prediction target + :param engine: ml engine for new model, default is mindsdb + :param query: sql string or Query object to get data for training of model, optional + :param database: database to get data for training, optional + :param options: parameters for model, optional + :param timeseries_options: parameters for forecasting model + :return: created Model object, it can be still in training state + """ + if isinstance(query, Query): + database = query.database + query = query.sql + elif isinstance(query, pd.DataFrame): + raise NotImplementedError('Dataframe as input for training model is not supported yet') + + if database is not None: + database = Identifier(database) + + if predict is not None: + targets = [Identifier(predict)] + else: + targets = None + ast_query = CreatePredictor( + name=Identifier(name), + query_str=query, + integration_name=database, + targets=targets, + ) + + if timeseries_options is not None: + # check ts options + allowed_keys = ['group', 'order', 'window', 'horizon'] + for key in timeseries_options.keys(): + if key not in allowed_keys: + raise AttributeError(f"Unexpected time series option: {key}") + + if 'group' in timeseries_options: + group = timeseries_options['group'] + if not isinstance(group, list): + group = [group] + ast_query.group_by = [Identifier(i) for i in group] + if 'order' in timeseries_options: + ast_query.order_by = [Identifier(timeseries_options['order'])] + if 'window' in timeseries_options: + ast_query.window = timeseries_options['window'] + if 'horizon' in timeseries_options: + ast_query.horizon = timeseries_options['horizon'] + + if options is None: + options = {} + # options and kwargs are the same + options.update(kwargs) + + if engine is not None: + if isinstance(engine, MLEngine): + engine = engine.name + + options['engine'] = engine + ast_query.using = options + df = self.project.query(ast_query.to_string()).fetch() + if len(df) > 0: + data = dict(df.iloc[0]) + # to lowercase + data = {k.lower(): v for k,v in data.items()} + + return Model(self.project, data) + + def get(self, name: str, version: int = None) -> Union[Model, ModelVersion]: + """ + Get model by name from project + + if version is passed it returns ModelVersion object with specific version + + :param name: name of the model + :param version: version of model, optional + :return: Model or ModelVersion object + """ + if version is not None: + ret = self.list(with_versions=True, name=name, version=version) + else: + ret = self.list(name=name) + if len(ret) == 0: + raise AttributeError("Model doesn't exist") + elif len(ret) == 1: + return ret[0] + else: + raise RuntimeError('Several models with the same name/version') + + def drop(self, name: str): + """ + Drop model from project with all versions + + :param name: name of the model + """ + ast_query = DropPredictor(name=Identifier(name)) + self.project.query(ast_query.to_string()).fetch() + + + def list(self, with_versions: bool = False, + name: str = None, + version: int = None) -> List[Union[Model, ModelVersion]]: + """ + List models (or model versions) in project + + If with_versions = True it shows all models with version (executes 'select * from models_versions') + Otherwise it shows only models (executes 'select * from models') + + :param with_versions: show model versions + :param name: to show models or versions only with selected name, optional + :param version: to show model or versions only with selected version, optional + :return: list of Model or ModelVersion objects + """ + + table = 'models' + model_class = Model + if with_versions: + table = 'models_versions' + model_class = ModelVersion + + filters = { } + if name is not None: + filters['NAME'] = name + if version is not None: + filters['VERSION'] = version + + ast_query = Select( + targets=[Star()], + from_table=Identifier(table), + where=dict_to_binary_op(filters) + ) + df = self.project.query(ast_query.to_string()).fetch() + + # columns to lower case + cols_map = { i: i.lower() for i in df.columns } + df = df.rename(columns=cols_map) + + return [ + model_class(self.project, item) + for item in df.to_dict('records') + ] \ No newline at end of file diff --git a/mindsdb_sdk/project.py b/mindsdb_sdk/project.py deleted file mode 100644 index d432a42..0000000 --- a/mindsdb_sdk/project.py +++ /dev/null @@ -1,563 +0,0 @@ -import datetime as dt -from typing import Union, List - -import pandas as pd - -from mindsdb_sql.parser.dialects.mindsdb import CreatePredictor, CreateView, DropPredictor, CreateJob, DropJob -from mindsdb_sql.parser.ast import DropView, Identifier, Delete, Star, Select - -from mindsdb_sdk.utils import dict_to_binary_op -from mindsdb_sdk.model import Model, ModelVersion -from mindsdb_sdk.query import Query, View - - -class Job: - def __init__(self, project, data): - self.project = project - self.data = data - self._update(data) - - def _update(self, data): - self.name = data['name'] - self.query_str = data['query'] - self.start_at = data['start_at'] - self.end_at = data['end_at'] - self.next_run_at = data['next_run_at'] - self.schedule_str = data['schedule_str'] - - def __repr__(self): - return f"{self.__class__.__name__}({self.name}, query='{self.query_str}')" - - def refresh(self): - """ - Retrieve job data from mindsdb server - """ - job = self.project.get_job(self.name) - self._update(job.data) - - def get_history(self) -> pd.DataFrame: - """ - Get history of job execution - - :return: dataframe with job executions - """ - ast_query = Select( - targets=[Star()], - from_table=Identifier('jobs_history'), - where=dict_to_binary_op({ - 'name': self.name - }) - ) - return self.project.api.sql_query(ast_query.to_string(), database=self.project.name) - - -class Project: - """ - Allows to work with project: to manage models and views inside of it or call raw queries inside of project - - **Queries** - - Making prediciton using sql: - - >>> query = project.query('select * from database.table join model1') - >>> df = query.fetch() - - Making time series prediction: - - >>> df = project.query(''' - ... SELECT m.saledate as date, m.ma as forecast - ... FROM mindsdb.house_sales_model as m - ... JOIN example_db.demo_data.house_sales as t - ... WHERE t.saledate > LATEST AND t.type = 'house' - ... AND t.bedrooms=2 - ... LIMIT 4; - ... ''').fetch() - - **Views** - - Get: - - >>> views = project.list_views() - >>> view = views[0] - - By name: - - >>> view project.get_view('view1') - - Create: - - >>> view = project.create_view( - ... 'view1', - ... database='example_db', # optional, can also be database object - ... query='select * from table1' - ...) - - Create using query object: - - >>> view = project.create_view( - ... 'view1', - ... query=database.query('select * from table1') - ...) - - Getting data: - - >>> view = view.filter(a=1, b=2) - >>> view = view.limit(100) - >>> df = view.fetch() - - Drop view: - - >>> project.drop_view('view1') - - - **Models** - - Get: - - >>> models = project.list_models() - >>> model = models[0] - - Get version: - - >>> models = project.list_models(with_versions=True) - >>> model = models[0] - - By name: - - >>> model = project.get_model('model1') - >>> model = project.get_model('model1', version=2) - - Versions - - List model versions - - >>> models = model.list_versions() - >>> model = models[0] # Model object - - - Get info - - >>> print(model.status) - >>> print(model.data) - - Update model data from server - - >>> model.refresh() - - Create - - Create, using params and qeury as string - - >>> model = project.create_model( - ... 'rentals_model', - ... predict='price', - ... engine='lightwood', - ... database='example_db', - ... query='select * from table', - ... options={ - ... 'module': 'LightGBM' - ... }, - ... timeseries_options={ - ... 'order': 'date', - ... 'group': ['a', 'b'] - ... } - ...) - - Create, using deferred query. 'query' will be executed and converted to dataframe on mindsdb backend. - - >>> query = database.query('select * from table') - >>> model = project.create_model( - ... 'rentals_model', - ... predict='price', - ... query=query, - ...) - - Usng model - - Dataframe on input - - >>> result_df = model.predict(df_rental) - >>> result_df = model.predict(df_rental, params={'a': 'q'}) - - Dict on input - - >>> result_df = model.predict({'n_rooms': 2}) - - Deferred query on input - - >>> result_df = model.predict(query, params={'': ''}) - - Time series prediction - - >>> query = database.query('select * from table1 where type="house" and saledate>latest') - >>> model.predict(query) - - The join model with table in raw query - - >>> result_df = project.query(''' - ... SELECT m.saledate as date, m.ma as forecast - ... FROM mindsdb.house_sales_model as m - ... JOIN example_db.demo_data.house_sales as t - ... WHERE t.saledate > LATEST AND t.type = 'house' - ... AND t.bedrooms=2 - ... LIMIT 4; - ...''').fetch() - - - **Model managing** - - Fine-tuning - - >>> model.finetune(query) - >>> model.finetune('select * from demo_data.house_sales', database='example_db') - >>> model.finetune(query, params={'x': 2}) - - Retraining - - >>> model.retrain(query) - >>> model.retrain('select * from demo_data.house_sales', database='example_db') - >>> model.retrain(query, params={'x': 2}) - - Describe - - >>> df_info = model.describe() - >>> df_info = model.describe('features') - - Change active version - - >>> model.set_active(version=3) - - Drop - - >>> project.drop_model('rentals_model') - >>> project.drop_model_version('rentals_model', version=10) - - """ - - def __init__(self, server, name): - self.name = name - self.server = server - self.api = server.api - - def __repr__(self): - return f'{self.__class__.__name__}({self.name})' - - def query(self, sql: str) -> Query: - """ - Execute raw query inside of project - - :param sql: sql query - :return: Query object - """ - return Query(self.api, sql, database=self.name) - - def _list_views(self): - df = self.api.objects_tree(self.name) - df = df[df.type == 'view'] - - return list(df['name']) - - def list_views(self) -> List[View]: - """ - Show list of views in project - - :return: list of View objects - """ - return [View(self, name) for name in self._list_views()] - - def create_view(self, name: str, sql: Union[str, Query], database: str = None) -> View: - """ - Create new view in project and return it - - :param name: name of the view - :param sql: sql query as string or query object - :param database: datasource of the view (where input sql will be executed) - :return: View object - """ - if isinstance(sql, Query): - database = sql.database - sql = sql.sql - elif not isinstance(sql, str): - raise ValueError() - - if database is not None: - database = Identifier(database) - ast_query = CreateView( - name=Identifier(name), - query_str=sql, - from_table=database - ) - - self.query(ast_query.to_string()).fetch() - return View(self, name) - - def drop_view(self, name: str): - """ - Drop view from project - - :param name: name of the view - """ - - ast_query = DropView(names=[Identifier(name)]) - - self.query(ast_query.to_string()).fetch() - - def get_view(self, name: str) -> View: - """ - Get view by name from project - - :param name: name of the view - :return: View object - """ - - if name not in self._list_views(): - raise AttributeError("View doesn't exist") - return View(self, name) - - def list_models(self, with_versions: bool = False, - name: str = None, - version: int = None) -> List[Union[Model, ModelVersion]]: - """ - List models (or model versions) in project - - If with_versions = True it shows all models with version (executes 'select * from models_versions') - Otherwise it shows only models (executes 'select * from models') - - :param with_versions: show model versions - :param name: to show models or versions only with selected name, optional - :param version: to show model or versions only with selected version, optional - :return: list of Model or ModelVersion objects - """ - - table = 'models' - model_class = Model - if with_versions: - table = 'models_versions' - model_class = ModelVersion - - filters = {} - if name is not None: - filters['NAME'] = name - if version is not None: - filters['VERSION'] = version - - ast_query = Select( - targets=[Star()], - from_table=Identifier(table), - where=dict_to_binary_op(filters) - ) - df = self.query(ast_query.to_string()).fetch() - - # columns to lower case - cols_map = {i: i.lower() for i in df.columns} - df = df.rename(columns=cols_map) - - return [ - model_class(self, item) - for item in df.to_dict('records') - ] - - def create_model(self, name: str, predict: str = None, engine: str = None, - query: Union[str, Query] = None, database: str = None, - options: dict = None, timeseries_options: dict = None) -> Model: - """ - Create new model in project and return it - - If query/database is passed, it will be executed on mindsdb side - - :param name: name of the model - :param predict: prediction target - :param engine: ml engine for new model, default is mindsdb - :param query: sql string or Query object to get data for training of model, optional - :param database: database to get data for training, optional - :param options: parameters for model, optional - :param timeseries_options: parameters for forecasting model - :return: created Model object, it can be still in training state - """ - if isinstance(query, Query): - database = query.database - query = query.sql - elif isinstance(query, pd.DataFrame): - raise NotImplementedError('Dataframe as input for training model is not supported yet') - - if database is not None: - database = Identifier(database) - - if predict is not None: - targets = [Identifier(predict)] - else: - targets = None - ast_query = CreatePredictor( - name=Identifier(name), - query_str=query, - integration_name=database, - targets=targets, - ) - - if timeseries_options is not None: - # check ts options - allowed_keys = ['group', 'order', 'window', 'horizon'] - for key in timeseries_options.keys(): - if key not in allowed_keys: - raise AttributeError(f"Unexpected time series option: {key}") - - if 'group' in timeseries_options: - group = timeseries_options['group'] - if not isinstance(group, list): - group = [group] - ast_query.group_by = [Identifier(i) for i in group] - if 'order' in timeseries_options: - ast_query.order_by = [Identifier(timeseries_options['order'])] - if 'window' in timeseries_options: - ast_query.window = timeseries_options['window'] - if 'horizon' in timeseries_options: - ast_query.horizon = timeseries_options['horizon'] - if options is None: - options = {} - if engine is not None: - options['engine'] = engine - ast_query.using = options - df = self.query(ast_query.to_string()).fetch() - if len(df) > 0: - data = dict(df.iloc[0]) - # to lowercase - data = {k.lower(): v for k,v in data.items()} - - return Model(self, data) - - def get_model(self, name: str, version: int = None) -> Union[Model, ModelVersion]: - """ - Get model by name from project - - if version is passed it returns ModelVersion object with specific version - - :param name: name of the model - :param version: version of model, optional - :return: Model or ModelVersion object - """ - if version is not None: - ret = self.list_models(with_versions=True, name=name, version=version) - else: - ret = self.list_models(name=name) - if len(ret) == 0: - raise AttributeError("Model doesn't exist") - elif len(ret) == 1: - return ret[0] - else: - raise RuntimeError('Several models with the same name/version') - - def drop_model(self, name: str): - """ - Drop model from project with all versions - - :param name: name of the model - """ - ast_query = DropPredictor(name=Identifier(name)) - self.query(ast_query.to_string()).fetch() - - def drop_model_version(self, name: str, version: int): - """ - Drop version of the model - - :param name: name of the model - :param version: version to drop - """ - ast_query = Delete( - table=Identifier('models_versions'), - where=dict_to_binary_op({ - 'name': name, - 'version': version - }) - ) - self.query(ast_query.to_string()).fetch() - - def list_jobs(self, name: str = None) -> List[Job]: - """ - Show list of jobs in project - - :return: list of Job objects - """ - - ast_query = Select(targets=[Star()], from_table=Identifier('jobs')) - - if name is not None: - ast_query.where = dict_to_binary_op({'name': name}) - - df = self.api.sql_query(ast_query.to_string(), database=self.name) - - # columns to lower case - cols_map = {i: i.lower() for i in df.columns} - df = df.rename(columns=cols_map) - - return [ - Job(self, item) - for item in df.to_dict('records') - ] - - def get_job(self, name: str) -> Job: - """ - Get job by name from project - - :param name: name of the job - :return: Job object - """ - - jobs = self.list_jobs(name) - if len(jobs) == 1: - return jobs[0] - elif len(jobs) == 0: - raise AttributeError("Job doesn't exist") - else: - raise RuntimeError("Several jobs with the same name") - - def create_job(self, name: str, query_str: str, - start_at: dt.datetime = None, end_at: dt.datetime = None, - repeat_str: str = None) -> Union[Job, None]: - """ - Create new job in project and return it. - If it is not possible (job executed and not accessible anymore): return None - More info: https://docs.mindsdb.com/sql/create/jobs - - :param name: name of the job - :param query_str: str, job's query (or list of queries with ';' delimiter) which job have to execute - :param start_at: datetime, first start of job, - :param end_at: datetime, when job have to be stopped, - :param repeat_str: str, optional, how to repeat job (e.g. '1 hour', '2 weeks', '3 min') - :return: Job object or None - """ - - if start_at is not None: - start_str = start_at.strftime("%Y-%m-%d %H:%M:%S") - else: - start_str = None - - if end_at is not None: - end_str = end_at.strftime("%Y-%m-%d %H:%M:%S") - else: - end_str = None - ast_query = CreateJob( - name=Identifier(name), - query_str=query_str, - start_str=start_str, - end_str=end_str, - repeat_str=repeat_str - ) - - self.api.sql_query(ast_query.to_string(), database=self.name) - - # job can be executed and remove it is not repeatable - jobs = self.list_jobs(name) - if len(jobs) == 1: - return jobs[0] - - def drop_job(self, name: str): - """ - Drop job from project - - :param name: name of the job - """ - ast_query = DropJob(Identifier(name)) - - self.api.sql_query(ast_query.to_string(), database=self.name) - - diff --git a/mindsdb_sdk/projects.py b/mindsdb_sdk/projects.py new file mode 100644 index 0000000..d234ab5 --- /dev/null +++ b/mindsdb_sdk/projects.py @@ -0,0 +1,184 @@ +from typing import List + +from mindsdb_sql.parser.dialects.mindsdb import CreateDatabase +from mindsdb_sql.parser.ast import DropDatabase +from mindsdb_sql.parser.ast import Identifier, Delete + +from mindsdb_sdk.utils.sql import dict_to_binary_op + +from mindsdb_sdk.utils.objects_collection import CollectionBase + +from .models import Models +from .query import Query +from .views import Views +from .jobs import Jobs + + +class Project: + """ + Allows to work with project: to manage models and views inside of it or call raw queries inside of project + + Server instance allows to manipulate project and databases (integration) on mindsdb server + + Attributes for accessing to different objects: + - models + - views + - jobs + + It is possible to cal queries from project context: + + Making prediciton using sql: + + >>> query = project.query('select * from database.table join model1') + >>> df = query.fetch() + + Making time series prediction: + + >>> df = project.query(''' + ... SELECT m.saledate as date, m.ma as forecast + ... FROM mindsdb.house_sales_model as m + ... JOIN example_db.demo_data.house_sales as t + ... WHERE t.saledate > LATEST AND t.type = 'house' + ... AND t.bedrooms=2 + ... LIMIT 4; + ... ''').fetch() + + """ + + def __init__(self, api, name): + self.name = name + self.api = api + + self.models = Models(self, api) + + # old api + self.get_model = self.models.get + self.list_models = self.models.list + self.create_model = self.models.create + self.drop_model = self.models.drop + + self.views = Views(self, api) + + # old api + self.get_view = self.views.get + self.list_views = self.views.list + self.create_view = self.views.create + self.drop_view = self.views.drop + + self.jobs = Jobs(self, api) + + # old api + self.get_job = self.jobs.get + self.list_jobs = self.jobs.list + self.create_job = self.jobs.create + self.drop_job = self.jobs.drop + + def __repr__(self): + return f'{self.__class__.__name__}({self.name})' + + def query(self, sql: str) -> Query: + """ + Execute raw query inside of project + + :param sql: sql query + :return: Query object + """ + return Query(self.api, sql, database=self.name) + + + def drop_model_version(self, name: str, version: int): + """ + Drop version of the model + + :param name: name of the model + :param version: version to drop + """ + ast_query = Delete( + table=Identifier('models_versions'), + where=dict_to_binary_op({ + 'name': name, + 'version': version + }) + ) + self.query(ast_query.to_string()).fetch() + + + +class Projects(CollectionBase): + """ + Projects + ---------- + + # list of projects + + >>> projects.list() + + # create + + >>> project = projects.create('proj') + + # drop + + >>> projects.drop('proj') + + # get existing + + >>> projects.get('proj') + + # by attribute + >>> projects.proj + + """ + + def __init__(self, api): + self.api = api + + def _list_projects(self): + data = self.api.sql_query("select NAME from information_schema.databases where TYPE='project'") + return list(data.NAME) + + def list(self) -> List[Project]: + """ + Show list of project on server + + :return: list of Project objects + """ + # select * from information_schema.databases where TYPE='project' + return [Project(self.api, name) for name in self._list_projects()] + + def get(self, name: str = 'mindsdb') -> Project: + """ + Get Project by name + + :param name: name of project + :return: Project object + """ + if name not in self._list_projects(): + raise AttributeError("Project doesn't exist") + return Project(self.api, name) + + def create(self, name: str) -> Project: + """ + Create new project and return it + + :param name: name of the project + :return: Project object + """ + + ast_query = CreateDatabase( + name=Identifier(name), + engine='mindsdb', + parameters={} + ) + + self.api.sql_query(ast_query.to_string()) + return Project(self.api, name) + + def drop(self, name: str): + """ + Drop project from server + + :param name: name of the project + """ + ast_query = DropDatabase(name=Identifier(name)) + self.api.sql_query(ast_query.to_string()) \ No newline at end of file diff --git a/mindsdb_sdk/query.py b/mindsdb_sdk/query.py index eea142f..09c6a21 100644 --- a/mindsdb_sdk/query.py +++ b/mindsdb_sdk/query.py @@ -1,11 +1,5 @@ -import copy - import pandas as pd -from mindsdb_sql.parser.ast import Select, Star, Identifier, Constant - -from mindsdb_sdk.utils import dict_to_binary_op - class Query: def __init__(self, api, sql, database=None): @@ -28,79 +22,3 @@ def fetch(self) -> pd.DataFrame: """ return self.api.sql_query(self.sql, self.database) - -class Table(Query): - def __init__(self, db, name): - super().__init__(db.api, '', db.name) - self.name = name - self.db = db - self._filters = {} - self._limit = None - self._update_query() - - def _filters_repr(self): - filters = '' - if len(filters) > 0: - filters = ', '.join( - f'{k}={v}' - for k, v in self._filters - ) - filters = ', ' + filters - return filters - - def __repr__(self): - return f'{self.__class__.__name__}({self.name}{self._filters_repr()})' - - def filter(self, **kwargs): - """ - Applies filters on table - table.filter(a=1, b=2) adds where condition to table: - 'select * from table1 where a=1 and b=2' - - :param kwargs: filter - """ - # creates new object - query = copy.deepcopy(self) - query._filters.update(kwargs) - query._update_query() - return query - - def limit(self, val: int): - """ - Applies limit condition to table query - - :param val: limit size - """ - query = copy.deepcopy(self) - query._limit = val - query._update_query() - return query - - def _update_query(self): - ast_query = Select( - targets=[Star()], - from_table=Identifier(self.name), - where=dict_to_binary_op(self._filters) - ) - if self._limit is not None: - ast_query.limit = Constant(self._limit) - self.sql = ast_query.to_string() - - -class View(Table): - # The same as table - pass - -# TODO getting view sql from api not implemented yet -# class View(Table): -# def __init__(self, api, data, project): -# super().__init__(api, data['name'], project) -# self.view_sql = data['sql'] -# -# def __repr__(self): -# # -# sql = self.view_sql.replace('\n', ' ') -# if len(sql) > 40: -# sql = sql[:37] + '...' -# -# return f'{self.__class__.__name__}({self.name}{self._filters_repr()}, sql={sql})' diff --git a/mindsdb_sdk/server.py b/mindsdb_sdk/server.py index 1b92ee8..d6e6319 100644 --- a/mindsdb_sdk/server.py +++ b/mindsdb_sdk/server.py @@ -1,166 +1,51 @@ -from typing import List +from .databases import Databases +from .projects import Project, Projects +from .ml_engines import MLEngines +from .handlers import Handlers -from mindsdb_sql.parser.dialects.mindsdb import CreateDatabase -from mindsdb_sql.parser.ast import DropDatabase, Identifier -from mindsdb_sdk.connectors.rest_api import RestAPI -from .database import Database -from .project import Project - - -class Server: +class Server(Project): """ Server instance allows to manipulate project and databases (integration) on mindsdb server - Example if usage: - - Databases - ---------- - - >>> databases = server.list_databases() - >>> database = databases[0] # Database type object - - # create - - >>> database = server.create_database('example_db', - ... type='postgres', - ... connection_args={'host': ''}) - - # drop database - - >>> server.drop_database('example_db') - - # get existing - >>> database = server.get_database('example_db') + Attributes for accessing to different objects: + - projects + - databases + - ml_engines - Projects - ---------- + Server is also root(mindsdb) project and has its attributes + - models + - views + - jobs - # list of projects - - >>> projects = server.list_projects() - >>> project = projects[0] # Project type object - - - # create - - >>> project = server.create_project('proj') + """ - # drop + def __init__(self, api): + # server is also mindsdb project + super().__init__(api, 'mindsdb') - >>> server.drop_project('proj') + self.projects = Projects(api) - # get existing + # old api + self.get_project = self.projects.get + self.list_projects = self.projects.list + self.create_project = self.projects.create + self.drop_project = self.projects.drop - >>> project = server.get_project('proj') + self.databases = Databases(api) - >>> project = server.get_project() # default is mindsdb project + # old api + self.get_database = self.databases.get + self.list_databases = self.databases.list + self.create_database = self.databases.create + self.drop_database = self.databases.drop - """ + self.ml_engines = MLEngines(self.api) - def __init__(self, url: str = None, login: str = None, password: str = None, is_managed: bool = False): - self.api = RestAPI(url, login, password, is_managed) + self.ml_handlers = Handlers(self.api, 'ml') + self.data_handlers = Handlers(self.api, 'data') def __repr__(self): return f'{self.__class__.__name__}({self.api.url})' - def _list_databases(self): - data = self.api.sql_query( - "select NAME from information_schema.databases where TYPE='data'" - ) - return list(data.NAME) - - def list_databases(self) -> List[Database]: - """ - Show list of integrations (databases) on server - - :return: list of Database objects - """ - return [Database(self, name) for name in self._list_databases()] - - def create_database(self, name: str, engine: str, connection_args: dict) -> Database: - """ - Create new integration and return it - - :param name: Identifier for the integration to be created - :param engine: Engine to be selected depending on the database connection. - :param connection_args: {"key": "value"} object with the connection parameters specific for each engine - :return: created Database object - """ - ast_query = CreateDatabase( - name=Identifier(name), - engine=engine, - parameters=connection_args, - ) - self.api.sql_query(ast_query.to_string()) - return Database(self, name) - - def drop_database(self, name: str): - """ - Delete integration - - :param name: name of integration - """ - ast_query = DropDatabase(name=Identifier(name)) - self.api.sql_query(ast_query.to_string()) - - def get_database(self, name: str) -> Database: - """ - Get integration by name - - :param name: name of integration - :return: Database object - """ - if name not in self._list_databases(): - raise AttributeError("Database doesn't exist") - return Database(self, name) - - def _list_projects(self): - data = self.api.sql_query("select NAME from information_schema.databases where TYPE='project'") - return list(data.NAME) - - def list_projects(self) -> List[Project]: - """ - Show list of project on server - - :return: list of Project objects - """ - # select * from information_schema.databases where TYPE='project' - return [Project(self, name) for name in self._list_projects()] - - def create_project(self, name: str) -> Project: - """ - Create new project and return it - - :param name: name of the project - :return: Project object - """ - - ast_query = CreateDatabase( - name=Identifier(name), - engine='mindsdb', - parameters={} - ) - - self.api.sql_query(ast_query.to_string()) - return Project(self, name) - - def drop_project(self, name: str): - """ - Drop project from server - - :param name: name of the project - """ - ast_query = DropDatabase(name=Identifier(name)) - self.api.sql_query(ast_query.to_string()) - - def get_project(self, name: str = 'mindsdb') -> Project: - """ - Get Project by name - :param name: name of project - :return: Project object - """ - if name not in self._list_projects(): - raise AttributeError("Project doesn't exist") - return Project(self, name) diff --git a/mindsdb_sdk/tables.py b/mindsdb_sdk/tables.py new file mode 100644 index 0000000..78d7f3e --- /dev/null +++ b/mindsdb_sdk/tables.py @@ -0,0 +1,309 @@ +import copy +from typing import Union +from typing import List + +import pandas as pd + +from mindsdb_sql.parser.ast import DropTables +from mindsdb_sql.parser.ast import Select, Star, Identifier, Constant, Delete, Insert, Update + +from mindsdb_sdk.utils.sql import dict_to_binary_op +from mindsdb_sdk.utils.objects_collection import CollectionBase + +from .query import Query + + +class Table(Query): + def __init__(self, db, name): + super().__init__(db.api, '', db.name) + self.name = name + self.db = db + self._filters = {} + self._limit = None + self._update_query() + + def _filters_repr(self): + filters = '' + if len(self._filters) > 0: + filters = ', '.join( + f'{k}={v}' + for k, v in self._filters.items() + ) + filters = ', ' + filters + return filters + + def __repr__(self): + limit_str = '' + if self._limit is not None: + limit_str = f'; limit={self._limit}' + return f'{self.__class__.__name__}({self.name}{self._filters_repr()}{limit_str})' + + def filter(self, **kwargs): + """ + Applies filters on table + table.filter(a=1, b=2) adds where condition to table: + 'select * from table1 where a=1 and b=2' + + :param kwargs: filter + """ + # creates new object + query = copy.deepcopy(self) + query._filters.update(kwargs) + query._update_query() + return query + + def limit(self, val: int): + """ + Applies limit condition to table query + + :param val: limit size + """ + query = copy.deepcopy(self) + query._limit = val + query._update_query() + return query + + def _update_query(self): + ast_query = Select( + targets=[Star()], + from_table=Identifier(self.name), + where=dict_to_binary_op(self._filters) + ) + if self._limit is not None: + ast_query.limit = Constant(self._limit) + self.sql = ast_query.to_string() + + def insert(self, query: Union[pd.DataFrame, Query]): + """ + Insert data from query of dataframe + :param query: dataframe of + :return: + """ + + if isinstance(query, pd.DataFrame): + # insert data + data_split = query.to_dict('split') + + ast_query = Insert( + table=Identifier(self.name), + columns=data_split['columns'], + values=data_split['data'] + ) + + sql = ast_query.to_string() + self.api.sql_query(sql, self.database) + else: + # insert from select + table = Identifier(parts=[self.database, self.name]) + self.api.sql_query( + f'INSERT INTO {table.to_string()} ({query.sql})', + database=query.database + ) + + def delete(self, **kwargs): + """ + Deletes record from table using filters table.delete(a=1, b=2) + + :param kwargs: filter + """ + identifier = Identifier(self.name) + # add database + identifier.parts.insert(0, self.database) + + ast_query = Delete( + table=identifier, + where=dict_to_binary_op(kwargs) + ) + + sql = ast_query.to_string() + self.api.sql_query(sql, 'mindsdb') + + def update(self, values: Union[dict, Query], on: list = None, filters: dict = None): + ''' + Update table by condition of from other table. + If 'values' is a dict: + - it will be an update by condition + - 'filters' is required + - used command: update table set a=1 where x=1 + + If 'values' is a Query: + - it will be an update from select + - 'on' is required + - used command: update table on a,b from (query) + + :param values: input for update, can be dict or query + :param on: list of column to map subselect to table ['a', 'b', ...] + :param filters: dict to filter updated rows, {'column': 'value', ...} + + ''' + + if isinstance(values, Query): + # is update from select + if on is None: + raise ValueError('"on" parameter is required for update from query') + + # insert from select + table = Identifier(parts=[self.database, self.name]) + map_cols = ', '.join(on) + self.api.sql_query( + f'UPDATE {table.to_string()} ON {map_cols} FROM ({values.sql})', + database=values.database + ) + elif isinstance(values, dict): + # is regular update + if filters is None: + raise ValueError('"filters" parameter is required for update') + + update_columns = { + k: Constant(v) + for k, v in values.items() + } + + ast_query = Update( + table=Identifier(self.name), + update_columns=update_columns, + where=dict_to_binary_op(filters) + ) + + sql = ast_query.to_string() + self.api.sql_query(sql, self.database) + else: + raise NotImplementedError + + +class Tables(CollectionBase): + """ + Wortking with tables: + Get table as Query object + + >>> table = tables.get('table1') + + Filter and limit + + >>> table = table.filter(a=1, b='2') + >>> table = table.limit(3) + + Get content of table as dataframe. At that moment query will be sent on server and executed + + >>> df = table.fetch() + + Creating table + + From query: + + >>> table = tables.create('table2', query) + + From other table + + >>> table2 = table.create('table2', table) + + Uploading file + + >>> db = con.databases.files + >>> db.tables.create('filename', dataframe) + + ` Droping table + + >>> db.tables.drop('table2') + """ + + def __init__(self, database, api): + self.database = database + self.api = api + + def _list_tables(self): + df = self.database.query('show tables').fetch() + + # first column + return list(df[df.columns[0]]) + + def list(self) -> List[Table]: + """ + Show list of tables in integration + + :return: list of Table objects + """ + return [Table(self.database, name) for name in self._list_tables()] + + def get(self, name: str) -> Table: + """ + Get table by name + + :param name: name of table + :return: Table object + """ + + if name not in self._list_tables(): + if '.' not in name: + # fixme: schemas not visible in 'show tables' + raise AttributeError("Table doesn't exist") + return Table(self.database, name) + + def create(self, name: str, query: Union[pd.DataFrame, Query], replace: bool = False) -> Table: + """ + Create new table and return it. + + On mindsdb server it executes command: + `insert into a (select ...)` + + or if replace is True + `create table a (select ...)` + + 'select ...' is extracted from input Query + + :param name: name of table + :param query: Query object + :param replace: if true, + :return: Table object + """ + + if isinstance(query, pd.DataFrame) and self.database.name == 'files': + # now it is only possible for file uploading + self.api.upload_file(name, query) + + return Table(self.database, name) + + if not isinstance(query, Query): + raise NotImplementedError + + # # query can be in different database: wrap to NativeQuery + # ast_query = CreateTable( + # name=Identifier(name), + # is_replace=is_replace, + # from_select=Select( + # targets=[Star()], + # from_table=NativeQuery( + # integration=Identifier(data.database), + # query=data.sql + # ) + # ) + # ) + # self.query(ast_query.to_string()).fetch() + + # call in query database + table = Identifier(parts=[self.database.name, name]) + + replace_str = '' + if replace: + replace_str = ' or replace' + + self.api.sql_query( + f'create{replace_str} table {table.to_string()} ({query.sql})', + database=query.database + ) + + return Table(self.database, name) + + def drop(self, name: str): + """ + Delete table + + :param name: name of table + """ + ast_query = DropTables( + tables=[ + Identifier(parts=[name]) + ] + ) + self.api.sql_query(ast_query.to_string(), database=self.database.name) + diff --git a/mindsdb_sdk/utils/__init__.py b/mindsdb_sdk/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mindsdb_sdk/utils/objects_collection.py b/mindsdb_sdk/utils/objects_collection.py new file mode 100644 index 0000000..054c026 --- /dev/null +++ b/mindsdb_sdk/utils/objects_collection.py @@ -0,0 +1,57 @@ +import re +from typing import Iterable + + +class CollectionBase: + + def __dir__(self) -> Iterable[str]: + internal_methods = ['create', 'drop', 'get', 'list'] + + items = [item.name for item in self.list()] + + items = [i for i in items if re.match('^(?![0-9])\w+$', i)] + return internal_methods + items + + def __getattr__(self, name): + if name.startswith('__'): + raise AttributeError(name) + + return self.get(name) + + +class MethodCollection(CollectionBase): + + def __init__(self, name, methods): + self.name = name + self.methods = methods + + def __repr__(self): + return f'{self.__class__.__name__}({self.name})' + + def get(self, *args, **kwargs): + method = self.methods.get('get') + if method is None: + raise NotImplementedError() + + return method(*args, **kwargs) + + def list(self, *args, **kwargs): + method = self.methods.get('list') + if method is None: + raise NotImplementedError() + + return method(*args, **kwargs) + + def create(self, *args, **kwargs): + method = self.methods.get('create') + if method is None: + raise NotImplementedError() + + return method(*args, **kwargs) + + def drop(self, name): + method = self.methods.get('drop') + if method is None: + raise NotImplementedError() + + return method(name) diff --git a/mindsdb_sdk/utils.py b/mindsdb_sdk/utils/sql.py similarity index 100% rename from mindsdb_sdk/utils.py rename to mindsdb_sdk/utils/sql.py diff --git a/mindsdb_sdk/views.py b/mindsdb_sdk/views.py new file mode 100644 index 0000000..46ebf55 --- /dev/null +++ b/mindsdb_sdk/views.py @@ -0,0 +1,139 @@ +from typing import List, Union + +from mindsdb_sql.parser.dialects.mindsdb import CreateView +from mindsdb_sql.parser.ast import DropView +from mindsdb_sql.parser.ast import Identifier + +from mindsdb_sdk.utils.objects_collection import CollectionBase + +from .query import Query +from .tables import Table + + +class View(Table): + # The same as table + pass + + +# TODO getting view sql from api not implemented yet +# class View(Table): +# def __init__(self, api, data, project): +# super().__init__(api, data['name'], project) +# self.view_sql = data['sql'] +# +# def __repr__(self): +# # +# sql = self.view_sql.replace('\n', ' ') +# if len(sql) > 40: +# sql = sql[:37] + '...' +# +# return f'{self.__class__.__name__}({self.name}{self._filters_repr()}, sql={sql})' + +class Views(CollectionBase): + """ + **Views** + + Get: + + >>> views = views.list() + >>> view = views[0] + + By name: + + >>> view = views.get('view1') + + Create: + + >>> view = views.create( + ... 'view1', + ... database='example_db', # optional, can also be database object + ... query='select * from table1' + ...) + + Create using query object: + + >>> view = views.create( + ... 'view1', + ... query=database.query('select * from table1') + ...) + + Getting data: + + >>> view = view.filter(a=1, b=2) + >>> view = view.limit(100) + >>> df = view.fetch() + + Drop view: + + >>> views.drop('view1') + + """ + + def __init__(self, project, api): + self.project = project + self.api = api + + + # The same as table + def _list_views(self): + df = self.api.objects_tree(self.project.name) + df = df[df.type == 'view'] + + return list(df['name']) + + def list(self) -> List[View]: + """ + Show list of views in project + + :return: list of View objects + """ + return [View(self.project, name) for name in self._list_views()] + + def create(self, name: str, sql: Union[str, Query], database: str = None) -> View: + """ + Create new view in project and return it + + :param name: name of the view + :param sql: sql query as string or query object + :param database: datasource of the view (where input sql will be executed) + :return: View object + """ + if isinstance(sql, Query): + database = sql.database + sql = sql.sql + elif not isinstance(sql, str): + raise ValueError() + + if database is not None: + database = Identifier(database) + ast_query = CreateView( + name=Identifier(name), + query_str=sql, + from_table=database + ) + + self.project.query(ast_query.to_string()).fetch() + return View(self.project, name) + + def drop(self, name: str): + """ + Drop view from project + + :param name: name of the view + """ + + ast_query = DropView(names=[Identifier(name)]) + + self.project.query(ast_query.to_string()).fetch() + + def get(self, name: str) -> View: + """ + Get view by name from project + + :param name: name of the view + :return: View object + """ + + if name not in self._list_views(): + raise AttributeError("View doesn't exist") + return View(self.project, name) diff --git a/requirements.txt b/requirements.txt index bf6ac41..eb8d5b6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ requests -pandas == 1.3.5 +pandas >= 1.3.5 mindsdb-sql >= 0.7.0, < 0.8.0 diff --git a/tests/test_sdk.py b/tests/test_sdk.py index e90831b..f1013f1 100644 --- a/tests/test_sdk.py +++ b/tests/test_sdk.py @@ -7,7 +7,7 @@ import pandas as pd from mindsdb_sql import parse_sql -from mindsdb_sdk.model import ModelVersion +from mindsdb_sdk.models import ModelVersion import mindsdb_sdk from mindsdb_sdk.connectors import rest_api @@ -59,7 +59,124 @@ def check_sql_call(mock, sql, database=None, call_stack_num=None): assert database == kwargs['json']['context']['db'] -class Test: +class BaseFlow: + @patch('requests.Session.post') + def check_model(self, model, database, mock_post): + + # using dataframe on input + data_in = [{ 'a': 1 }] + df_in = pd.DataFrame(data_in) + + data_out = [{ 'z': 2 }] + response_mock(mock_post, data_out) + + params = { 'x': '1' } + pred_df = model.predict(df_in, params=params) + + model_name = model.name + if isinstance(model, ModelVersion): + model_name = f'{model_name}.{model.version}' + + call_args = mock_post.call_args + assert call_args[0][ + 0] == f'https://cloud.mindsdb.com/api/projects/{model.project.name}/models/{model_name}/predict' + assert call_args[1]['json']['data'] == data_in + assert call_args[1]['json']['params'] == params + + # check prediction + assert (pred_df == pd.DataFrame(data_out)).all().bool() + + # predict using dict + pred_df = model.predict({ 'a': 1 }) + assert (pred_df == pd.DataFrame(data_out)).all().bool() + + # using deferred query + response_mock(mock_post, pd.DataFrame(data_out)) # will be used sql/query + + query = database.query('select a from t1') + pred_df = model.predict(query, params={ 'x': '1' }) + + check_sql_call(mock_post, + f'SELECT m.* FROM (SELECT a FROM t1) as t JOIN {model.project.name}.{model_name} AS m USING x="1"') + assert (pred_df == pd.DataFrame(data_out)).all().bool() + + # time series prediction + query = database.query('select * from t1 where type="house" and saledate>latest') + model.predict(query) + + check_sql_call(mock_post, + f"SELECT m.* FROM (SELECT * FROM t1 WHERE (type = 'house') AND (saledate > LATEST)) as t JOIN {model.project.name}.{model_name} AS m") + assert (pred_df == pd.DataFrame(data_out)).all().bool() + + # ----------- model managing -------------- + response_mock( + mock_post, + pd.DataFrame([{ 'NAME': 'm1', 'VERSION': 2, 'STATUS': 'complete' }]) + ) + + model.finetune(query, options={ 'x': 2 }) + check_sql_call( + mock_post, + f'Finetune {model.project.name}.{model_name} FROM {query.database} ({query.sql}) USING x=2' + ) + + model.finetune('select a from t1', database='d1') + check_sql_call( + mock_post, + f'Finetune {model.project.name}.{model_name} FROM d1 (select a from t1)' + ) + + model.retrain(query, options={ 'x': 2 }) + check_sql_call( + mock_post, + f'RETRAIN {model.project.name}.{model_name} FROM {query.database} ({query.sql}) USING x=2' + ) + + model.retrain('select a from t1', database='d1') + check_sql_call( + mock_post, + f'RETRAIN {model.project.name}.{model_name} FROM d1 (select a from t1)' + ) + + # describe + if not isinstance(model, ModelVersion): # not working (DESCRIBE db1.m1.2.ensemble not parsed) + + info = model.describe() # dataframe on json. need to discuss + check_sql_call(mock_post, f'DESCRIBE {model.project.name}.{model_name}') + + info = model.describe('ensemble') # dataframe on json. need to discuss + check_sql_call(mock_post, f'DESCRIBE {model.project.name}.{model_name}.ensemble') + + # ----------- versions -------------- + + # list all versions + models = model.list_versions() + check_sql_call(mock_post, f"SELECT * FROM models_versions WHERE NAME = '{model.name}'", + database=model.project.name) + model2 = models[0] # Model object + + model2 = model.get_version(2) + + # change active version + model2.set_active(version=3) + + # get call before last call + mock_call = mock_post.call_args_list[-2] + assert mock_call[1]['json'][ + 'query'] == f"update models_versions set active=1 where (name = '{model2.name}') AND (version = 3)" + + @patch('requests.Session.post') + def check_table(self, table, mock_post): + response_mock(mock_post, pd.DataFrame([{'x': 'a'}])) + + table = table.filter(a=3, b='2') + table = table.limit(3) + table.fetch() + str(table) + check_sql_call(mock_post, f'SELECT * FROM {table.name} WHERE (a = 3) AND (b = \'2\') LIMIT 3') + + +class Test(BaseFlow): @patch('requests.Session.put') @patch('requests.Session.post') @@ -278,7 +395,7 @@ def check_project_models(self, project, database, mock_post): LIMIT 4; ''' result_df = project.query(sql).fetch() - # TODO + check_sql_call(mock_post, sql) # check ts params @@ -328,105 +445,431 @@ def check_project_models_versions(self, project, database, mock_post): project.drop_model_version('m1', 1) check_sql_call(mock_post, f"delete from models_versions where name='m1' and version=1") + @patch('requests.Session.post') - def check_model(self, model, database, mock_post): + def check_database(self, database, mock_post): - # using dataframe on input - data_in = [{'a': 1}] - df_in = pd.DataFrame(data_in) + # test query + sql = 'select * from tbl1' + query = database.query(sql) + assert query.sql == sql - data_out = [{'z': 2}] - response_mock(mock_post, data_out) + result = pd.DataFrame([{'s': '1'}, {'s': 'a'}]) + response_mock(mock_post, result) - params = {'x': '1'} - pred_df = model.predict(df_in, params=params) + data = query.fetch() - model_name = model.name - if isinstance(model, ModelVersion): - model_name = f'{model_name}.{model.version}' + check_sql_call(mock_post, sql) - call_args = mock_post.call_args - assert call_args[0][0] == f'https://cloud.mindsdb.com/api/projects/{model.project.name}/models/{model_name}/predict' - assert call_args[1]['json']['data'] == data_in - assert call_args[1]['json']['params'] == params + assert (data == result).all().bool() - # check prediction - assert (pred_df == pd.DataFrame(data_out)).all().bool() + # test tables + response_mock(mock_post, pd.DataFrame([{'name': 't1'}])) + tables = database.list_tables() + table = tables[0] - # predict using dict - pred_df = model.predict({'a': 1}) - assert (pred_df == pd.DataFrame(data_out)).all().bool() + self.check_table(table) - # using deferred query - response_mock(mock_post, pd.DataFrame(data_out)) # will be used sql/query + table = database.get_table('t1') + assert table.name == 't1' + self.check_table(table) - query = database.query('select a from t1') - pred_df = model.predict(query, params={'x': '1'}) + # create from query + table2 = database.create_table('t2', query) + check_sql_call(mock_post, f'create table {database.name}.t2 (select * from tbl1)') - check_sql_call(mock_post, f'SELECT m.* FROM (SELECT a FROM t1) as t JOIN {model.project.name}.{model_name} AS m USING x="1"') - assert (pred_df == pd.DataFrame(data_out)).all().bool() + assert table2.name == 't2' + self.check_table(table2) - # time series prediction - query = database.query('select * from t1 where type="house" and saledate>latest') - model.predict(query) + # create from table + table1 = database.get_table('t1') + table1 = table1.filter(b=2) + table3 = database.create_table('t3', table1) + check_sql_call(mock_post, f'create table {database.name}.t3 (SELECT * FROM t1 WHERE b = 2)') - check_sql_call(mock_post, f"SELECT m.* FROM (SELECT * FROM t1 WHERE (type = 'house') AND (saledate > LATEST)) as t JOIN {model.project.name}.{model_name} AS m") - assert (pred_df == pd.DataFrame(data_out)).all().bool() + assert table3.name == 't3' + self.check_table(table3) - # ----------- model managing -------------- - response_mock( + # drop table + database.drop_table('t3') + check_sql_call(mock_post, f'drop table t3') + + + @patch('requests.Session.post') + def check_project_jobs(self, project, mock_post): + + response_mock(mock_post, pd.DataFrame([{ + 'NAME': 'job1', + 'QUERY': 'select 1', + 'start_at': None, + 'end_at': None, + 'next_run_at': None, + 'schedule_str': None, + }])) + + jobs = project.list_jobs() + + check_sql_call(mock_post, "select * from jobs") + + job = jobs[0] + assert job.name == 'job1' + assert job.query_str == 'select 1' + + job.refresh() + check_sql_call( mock_post, - pd.DataFrame([{'NAME': 'm1', 'VERSION': 2, 'STATUS': 'complete'}]) + f"select * from jobs where name = 'job1'" + ) + + project.create_job( + name='job2', + query_str='retrain m1', + repeat_str='1 min', + start_at=dt.datetime(2025, 2, 5, 11, 22), + end_at=dt.date(2030, 1, 2) ) - model.finetune(query, options={'x': 2}) check_sql_call( mock_post, - f'Finetune {model.project.name}.{model_name} FROM {query.database} ({query.sql}) USING x=2' + f"CREATE JOB job2 (retrain m1) START '2025-02-05 11:22:00' END '2030-01-02 00:00:00' EVERY 1 min", + call_stack_num=-2 ) - model.finetune('select a from t1', database='d1') + project.drop_job('job2') + check_sql_call( mock_post, - f'Finetune {model.project.name}.{model_name} FROM d1 (select a from t1)' + f"DROP JOB job2" + ) + + +class TestSimplify(BaseFlow): + + @patch('requests.Session.put') + @patch('requests.Session.post') + def test_flow(self, mock_post, mock_put): + + con = mindsdb_sdk.connect(login='a@b.com') + + # check login + call_args = mock_post.call_args + assert call_args[0][0] == 'https://cloud.mindsdb.com/cloud/login' + assert call_args[1]['json']['email'] == 'a@b.com' + + # --------- databases ------------- + response_mock(mock_post, pd.DataFrame([{'NAME': 'db1'}])) + + databases = con.databases.list() + + check_sql_call(mock_post, "select NAME from information_schema.databases where TYPE='data'") + + database = databases[0] + assert database.name == 'db1' + self.check_database(database) + + database = con.databases.get('db1') + database = con.databases.db1 + self.check_database(database) + + database = con.databases.create( + 'pg1', + engine='postgres', + connection_args={'host': 'localhost'} + ) + check_sql_call(mock_post, 'CREATE DATABASE pg1 WITH ENGINE = "postgres", PARAMETERS = {"host": "localhost"}') + + self.check_database(database) + + con.databases.drop('pg1-a') + check_sql_call(mock_post, 'DROP DATABASE `pg1-a`') + + # --------- projects ------------- + # connection is also default project, check it + self.check_project(con, database) + + projects = con.projects.list() + check_sql_call(mock_post, "select NAME from information_schema.databases where TYPE='project'") + + project = projects[0] + assert project.name == 'db1' + self.check_project(project, database) + + project = con.projects.get('db1') + project = con.projects.db1 + self.check_project(project, database) + + project = con.projects.create('proj1') + check_sql_call( + mock_post, 'CREATE DATABASE proj1 WITH ENGINE = "mindsdb", PARAMETERS = {}') + self.check_project(project, database) + + con.projects.drop('proj1-1') + check_sql_call(mock_post, 'DROP DATABASE `proj1-1`') + + # test upload file + response_mock(mock_post, pd.DataFrame([{'NAME': 'files'}])) + database = con.databases.files + # create file + df = pd.DataFrame([{'s': '1'}, {'s': 'a'}]) + database.tables.create('my_file', df) + + call_args = mock_put.call_args + assert call_args[0][0] == 'https://cloud.mindsdb.com/api/files/my_file' + assert call_args[1]['data']['name'] == 'my_file' + assert 'file' in call_args[1]['files'] + + # --------- handlers ------------- + # data + response_mock(mock_post, + pd.DataFrame([{'NAME': 'mysql', 'TYPE': 'data', 'TITLE': 'MySQL', + 'DESCRIPTION': "MindsDB handler for MySQL", + 'CONNECTION_ARGS': {'a': 1}}])) + + handlers = con.data_handlers.list() + + check_sql_call(mock_post, "show handlers WHERE type = 'data'") + + handler = handlers[0] + assert handler.name == 'mysql' + assert handler.title == 'MySQL' + + _ = con.ml_handlers.get('mysql') + _ = con.ml_handlers.mysql + + # ml + response_mock(mock_post, + pd.DataFrame([{'NAME': 'openai', 'TYPE': 'ml', 'TITLE': 'OpenAI', + 'DESCRIPTION': "MindsDB handler for OpenAI", + 'CONNECTION_ARGS': {'a': 1}}])) + + handlers = con.ml_handlers.list() + + check_sql_call(mock_post, "show handlers WHERE type = 'ml'") + + handler = handlers[0] + assert handler.name == 'openai' + assert handler.title == 'OpenAI' + + _ = con.ml_handlers.get('openai') + openai_handler = con.ml_handlers.openai + + # --------- ml_engines ------------- + response_mock(mock_post, pd.DataFrame([{ 'NAME': 'openai1', 'HANDLER': 'openai', 'CONNECTION_DATA': {'a': 1}}])) + + ml_engines = con.ml_engines.list() + + check_sql_call(mock_post, "show ml_engines") + + ml_engine = ml_engines[0] + assert ml_engine.name == 'openai1' + assert ml_engine.handler == 'openai' + + _ = con.ml_engines.get('openai1') + _ = con.ml_engines.openai1 + + con.ml_engines.create( + 'openai1', + openai_handler, + connection_data={'api_key': '111'} ) + check_sql_call(mock_post, 'CREATE ML_ENGINE openai1 FROM openai USING api_key = "111"') + + con.ml_engines.create( + 'openai1', + 'openai', + connection_data={'api_key': '111'} + ) + check_sql_call(mock_post, 'CREATE ML_ENGINE openai1 FROM openai USING api_key = "111"') + + con.ml_engines.drop('openai1') + check_sql_call(mock_post, 'DROP ML_ENGINE openai1') + + def check_project(self, project, database): + self.check_project_views( project, database) + + self.check_project_models(project, database) + + self.check_project_models_versions(project, database) + + self.check_project_jobs(project) + + @patch('requests.Session.get') + @patch('requests.Session.post') + def check_project_views(self, project, database, mock_post, mock_get): + # ----------- views -------------- + + response_mock(mock_get, [ + {'name': 'v1', 'type': 'view'}, + ]) + + views = project.views.list() + view = views[0] # View object - model.retrain(query, options={'x': 2}) + assert view.name == 'v1' + + # view has the same behaviour as table + self.check_table(view) + + # get existing + view = project.views.get('v1') + view = project.views.v1 + + assert view.name == 'v1' + self.check_table(view) + + # create + view = project.views.create( + 'v2', + database='example_db', # optional, can also be database object + sql='select * from t1' + ) + check_sql_call(mock_post, 'CREATE VIEW v2 from example_db (select * from t1)') + + assert view.name == 'v2' + self.check_table(view) + + # using query object + view = project.views.create( + 'v2', + sql=project.query('select * from t1') + ) + check_sql_call(mock_post, f'CREATE VIEW v2 from {project.name} (select * from t1)') + + assert view.name == 'v2' + self.check_table(view) + + # drop + project.views.drop('v2') + check_sql_call(mock_post, 'DROP VIEW v2') + + project.views.drop('v2-v') + check_sql_call(mock_post, 'DROP VIEW `v2-v`') + + @patch('requests.Session.post') + def check_project_models(self, project, database, mock_post): + # ----------- models -------------- + response_mock( + mock_post, + pd.DataFrame([{'NAME': 'm1', 'VERSION': 1, 'STATUS': 'complete'}]) + ) + + models = project.models.list() + model = models[0] # Model object + + assert model.name == 'm1' + assert model.get_status() == 'complete' + + self.check_model(model, database) + + model = project.models.get('m1') + model = project.models.m1 + + assert model.name == 'm1' + self.check_model(model, database) + + # create, using params + response_mock( + mock_post, + pd.DataFrame([{'NAME': 'm2', 'VERSION': 1, 'STATUS': 'complete'}]) + ) + model = project.models.create( + 'm2', + predict='price', + engine='lightwood', + database='example_db', + query='select * from t1', + timeseries_options={ + 'order': 'date', + 'group': ['a', 'b'], + 'window': 10, + 'horizon': 2 + }, + module = 'LightGBM', # has to be in options + ) check_sql_call( mock_post, - f'RETRAIN {model.project.name}.{model_name} FROM {query.database} ({query.sql}) USING x=2' + f'CREATE PREDICTOR m2 FROM example_db (select * from t1) PREDICT price ORDER BY date GROUP BY a, b WINDOW 10 HORIZON 2 USING module="LightGBM", `engine`="lightwood"' + ) + assert model.name == 'm2' + self.check_model(model, database) + + # create, using deferred query. + query = database.query('select * from t2') + model = project.models.create( + 'm2', + predict='price', + query=query, ) - model.retrain('select a from t1', database='d1') check_sql_call( mock_post, - f'RETRAIN {model.project.name}.{model_name} FROM d1 (select a from t1)' + f'CREATE PREDICTOR m2 FROM {database.name} (select * from t2) PREDICT price' ) - # describe - if not isinstance(model, ModelVersion): # not working (DESCRIBE db1.m1.2.ensemble not parsed) + assert model.name == 'm2' + self.check_model(model, database) - info = model.describe() # dataframe on json. need to discuss - check_sql_call( mock_post, f'DESCRIBE {model.project.name}.{model_name}') + project.models.drop('m3-a') + check_sql_call(mock_post, f'DROP PREDICTOR `m3-a`') - info = model.describe('ensemble') # dataframe on json. need to discuss - check_sql_call(mock_post, f'DESCRIBE {model.project.name}.{model_name}.ensemble') + # the old way of join model with table + sql = ''' + SELECT m.saledate as date, m.ma as forecast + FROM mindsdb.house_sales_model as m + JOIN example_db.demo_data.house_sales as t + WHERE t.saledate > LATEST AND t.type = 'house' + AND t.bedrooms=2 + LIMIT 4; + ''' + result_df = project.query(sql).fetch() - # ----------- versions -------------- + check_sql_call(mock_post, sql) - # list all versions - models = model.list_versions() - check_sql_call(mock_post, f"SELECT * FROM models_versions WHERE NAME = '{model.name}'", database=model.project.name) - model2 = models[0] # Model object + # check ts params + with pytest.raises(AttributeError): + project.models.create( + 'm2', + predict='price', + engine='lightwood', + database='example_db', + query='select * from t1', + options={ + 'module': 'LightGBM' + }, + timeseries_options={ + 'order': 'date', + 'group1': ['a', 'b'], + } + ) - model2 = model.get_version(2) + @patch('requests.Session.post') + def check_project_models_versions(self, project, database, mock_post): + # ----------- model version -------------- + response_mock( + mock_post, + pd.DataFrame([{'NAME': 'm1', 'VERSION': 2, 'STATUS': 'complete'}]) + ) - # change active version - model2.set_active(version=3) + # list + models = project.models.list(with_versions=True) + model = models[0] + assert isinstance(model, ModelVersion) - # get call before last call - mock_call = mock_post.call_args_list[-2] - assert mock_call[1]['json']['query'] == f"update models_versions set active=1 where (name = '{model2.name}') AND (version = 3)" + assert model.name == 'm1' + assert model.version == 2 + + self.check_model(model, database) + + # get + model = project.models.get('m1', version=1) + + assert model.name == 'm1' + assert model.version == 2 + + self.check_model(model, database) + + project.models.m1.drop_version(1) + check_sql_call(mock_post, f"delete from models_versions where name='m1' and version=1") @patch('requests.Session.post') def check_database(self, database, mock_post): @@ -447,46 +890,58 @@ def check_database(self, database, mock_post): # test tables response_mock(mock_post, pd.DataFrame([{'name': 't1'}])) - tables = database.list_tables() + tables = database.tables.list() table = tables[0] self.check_table(table) - table = database.get_table('t1') + table = database.tables.get('t1') + table = database.tables.t1 assert table.name == 't1' self.check_table(table) # create from query - table2 = database.create_table('t2', query) + table2 = database.tables.create('t2', query) check_sql_call(mock_post, f'create table {database.name}.t2 (select * from tbl1)') assert table2.name == 't2' self.check_table(table2) + # -- insert into table -- + # from dataframe + table2.insert(pd.DataFrame([{'s': '1', 'x': 1}, {'s': 'a', 'x': 2}])) + check_sql_call(mock_post, "INSERT INTO t2(s, x) VALUES ('1', 1), ('a', 2)") + + # from query + table2.insert(query) + check_sql_call(mock_post, f"INSERT INTO {database.name}.t2 (select * from tbl1)") + + # -- delete in table -- + table2.delete(a=1, b='2') + check_sql_call(mock_post, f"DELETE FROM {database.name}.t2 WHERE (a = 1) AND (b = '2')") + + # -- update table -- + # from query + table2.update(query, on=['a', 'b']) + check_sql_call(mock_post, f"UPDATE {database.name}.t2 ON a, b FROM (select * from tbl1)") + + # from dict + table2.update({'a': '1', 'b': 1}, filters={'x': 3}) + check_sql_call(mock_post, f"UPDATE t2 SET a='1', b=1 WHERE x=3") + # create from table - table1 = database.get_table('t1') + table1 = database.tables.t1 table1 = table1.filter(b=2) - table3 = database.create_table('t3', table1) + table3 = database.tables.create('t3', table1) check_sql_call(mock_post, f'create table {database.name}.t3 (SELECT * FROM t1 WHERE b = 2)') assert table3.name == 't3' self.check_table(table3) # drop table - database.drop_table('t3') + database.tables.drop('t3') check_sql_call(mock_post, f'drop table t3') - - @patch('requests.Session.post') - def check_table(self, table, mock_post): - response_mock(mock_post, pd.DataFrame([{'x': 'a'}])) - - table = table.filter(a=3, b='2') - table = table.limit(3) - table.fetch() - - check_sql_call(mock_post, f'SELECT * FROM {table.name} WHERE (a = 3) AND (b = \'2\') LIMIT 3') - @patch('requests.Session.post') def check_project_jobs(self, project, mock_post): @@ -499,7 +954,7 @@ def check_project_jobs(self, project, mock_post): 'schedule_str': None, }])) - jobs = project.list_jobs() + jobs = project.jobs.list() check_sql_call(mock_post, "select * from jobs") @@ -507,13 +962,17 @@ def check_project_jobs(self, project, mock_post): assert job.name == 'job1' assert job.query_str == 'select 1' + job = project.jobs.job1 + assert job.name == 'job1' + assert job.query_str == 'select 1' + job.refresh() check_sql_call( mock_post, f"select * from jobs where name = 'job1'" ) - project.create_job( + project.jobs.create( name='job2', query_str='retrain m1', repeat_str='1 min', @@ -527,7 +986,7 @@ def check_project_jobs(self, project, mock_post): call_stack_num=-2 ) - project.drop_job('job2') + project.jobs.drop('job2') check_sql_call( mock_post,