diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9273ecfc..b317e306 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,11 @@
+# WIP
+
+## Significant changes
+
+* `DatapipeApp` becomes the main entry point for working with a pipeline
+* `BatchTransform` metadata now carries a status: "pending"/"clean"/"failed"
+* `DatapipeApp.ingest_data` updates `BatchTransform` metadata on write
+
 # 0.14.1
 
 * Refactor metadata handling into `datapipe.meta` submodule
diff --git a/datapipe/cli.py b/datapipe/cli.py
index 79e678b4..f6a720a8 100644
--- a/datapipe/cli.py
+++ b/datapipe/cli.py
@@ -182,8 +182,9 @@ def setup_logging():
         )
         trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(cloud_trace_exporter))  # type: ignore
 
+    executor_instance: Executor
     if executor == "SingleThreadExecutor":
-        ctx.obj["executor"] = SingleThreadExecutor()
+        executor_instance = SingleThreadExecutor()
     elif executor == "RayExecutor":
         import ray
 
@@ -191,12 +192,14 @@ def setup_logging():
 
         ray_ctx = ray.init()
 
-        ctx.obj["executor"] = RayExecutor()
+        executor_instance = RayExecutor()
     else:
         raise ValueError(f"Unknown executor: {executor}")
 
+    ctx.obj["executor"] = executor_instance
+
     with tracer.start_as_current_span("init"):
-        ctx.obj["pipeline"] = load_pipeline(pipeline)
+        ctx.obj["pipeline"] = load_pipeline(pipeline).with_executor(executor_instance)
 
 
 @cli.group()
diff --git a/datapipe/compute.py b/datapipe/compute.py
index 1fe7dc3d..bae97983 100644
--- a/datapipe/compute.py
+++ b/datapipe/compute.py
@@ -1,9 +1,11 @@
 import hashlib
 import logging
+import time
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple
 
+import pandas as pd
 from opentelemetry import trace
 
 from datapipe.datatable import DataStore, DataTable
@@ -192,6 +194,15 @@ def get_change_list_process_ids(
     ) -> Tuple[int, Iterable[IndexDF]]:
         raise NotImplementedError()
 
+    def notify_change_list(
+        self,
+        ds: DataStore,
+        change_list: ChangeList,
+        now: Optional[float] = None,
+        run_config: Optional[RunConfig] = None,
+    ) -> None:
+        pass
+
     def run_full(
         self,
         ds: DataStore,
@@ -237,13 +248,66 @@ class Pipeline:
 
 
 class DatapipeApp:
-    def __init__(self, ds: DataStore, catalog: Catalog, pipeline: Pipeline):
+    def __init__(
+        self,
+        ds: DataStore,
+        catalog: Catalog,
+        pipeline: Pipeline,
+        executor: Optional[Executor] = None,
+    ):
         self.ds = ds
         self.catalog = catalog
         self.pipeline = pipeline
+        self.executor = executor
 
         self.steps = build_compute(ds, catalog, pipeline)
 
+    def with_executor(self, executor: Executor) -> "DatapipeApp":
+        self.executor = executor
+
+        return self
+
+    def consumers(self, table_name: str) -> List[ComputeStep]:
+        return [
+            step
+            for step in self.steps
+            if table_name in [i.dt.name for i in step.input_dts]
+        ]
+
+    def producers(self, table_name: str) -> List[ComputeStep]:
+        return [
+            step
+            for step in self.steps
+            if table_name in [o.name for o in step.output_dts]
+        ]
+
+    def ingest_data(
+        self,
+        table_name: str,
+        data_df: pd.DataFrame,
+        now: Optional[float] = None,
+    ) -> ChangeList:
+        table = self.ds.get_table(table_name)
+        now = now or time.time()
+        changes = table.store_chunk(data_df, now=now)
+
+        change_list = ChangeList({table_name: changes})
+
+        for step in self.consumers(table_name):
+            step.notify_change_list(self.ds, change_list, now=now)
+
+        return change_list
+
+    def ingest_and_process_data(self, table_name: str, data_df: pd.DataFrame) -> None:
+        cl = self.ingest_data(table_name, data_df)
+
+        run_steps_changelist(
+            self.ds,
+            self.steps,
+            cl,
+            executor=self.executor,
+        )
+
 
 def build_compute(
     ds: DataStore, catalog: Catalog, pipeline: Pipeline
diff --git a/datapipe/meta/sql_meta.py b/datapipe/meta/sql_meta.py
index 7374226e..c6186bcc 100644
--- a/datapipe/meta/sql_meta.py
+++ b/datapipe/meta/sql_meta.py
@@ -2,6 +2,7 @@
 import math
 import time
 from dataclasses import dataclass
+from enum import Enum
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -18,6 +19,7 @@
 import cityhash
 import pandas as pd
 import sqlalchemy as sa
+from opentelemetry import trace
 
 from datapipe.run_config import RunConfig
 from datapipe.sql_util import sql_apply_idx_filter_to_table, sql_apply_runconfig_filter
@@ -37,6 +39,9 @@
     from datapipe.datatable import DataStore
 
 
+tracer = trace.get_tracer("datapipe.meta.sql_meta")
+
+
 TABLE_META_SCHEMA: List[sa.Column] = [
     sa.Column("hash", sa.Integer),
     sa.Column("create_ts", sa.Float),  # Время создания строки
@@ -452,12 +457,10 @@ def get_agg_cte(
     return (keys, sql.cte(name=f"{tbl.name}__update"))
 
 
-TRANSFORM_META_SCHEMA: DataSchema = [
-    sa.Column("process_ts", sa.Float),  # Время последней успешной обработки
-    sa.Column("is_success", sa.Boolean),  # Успешно ли обработана строка
-    sa.Column("priority", sa.Integer),  # Приоритет обработки (чем больше, тем выше)
-    sa.Column("error", sa.String),  # Текст ошибки
-]
+class TransformStatus(Enum):
+    PENDING = "pending"
+    CLEAN = "clean"
+    FAILED = "failed"
 
 
 class TransformMetaTable:
@@ -473,7 +476,17 @@ def __init__(
         self.primary_schema = primary_schema
         self.primary_keys = [i.name for i in primary_schema]
 
-        self.sql_schema = [i._copy() for i in primary_schema + TRANSFORM_META_SCHEMA]
+        self.sql_schema = [i._copy() for i in primary_schema] + [
+            sa.Column("update_ts", sa.Float),  # Время последнего обновления
+            sa.Column("process_ts", sa.Float),  # Время последней успешной обработки
+            sa.Column("priority", sa.Integer),  # Приоритет обработки
+            sa.Column(
+                "status",
+                sa.Enum(TransformStatus, name="transform_status"),
+                index=True,
+            ),
+            sa.Column("error", sa.String),  # Текст ошибки
+        ]
 
         self.sql_table = sa.Table(
             name,
@@ -506,7 +519,7 @@ def insert_rows(
             [
                 {
                     "process_ts": 0,
-                    "is_success": False,
+                    "status": TransformStatus.PENDING.value,
                     "priority": 0,
                     "error": None,
                     **idx_dict,  # type: ignore
@@ -520,7 +533,42 @@
         with self.dbconn.con.begin() as con:
             con.execute(sql)
 
-    def mark_rows_processed_success(
+    def mark_rows_pending(
+        self,
+        idx: IndexDF,
+        update_ts: float,
+        run_config: Optional[RunConfig] = None,
+    ) -> None:
+        idx = cast(IndexDF, idx[self.primary_keys].drop_duplicates().dropna())
+        if len(idx) == 0:
+            return
+
+        insert_sql = self.dbconn.insert(self.sql_table).values(
+            [
+                {
+                    "update_ts": update_ts,
+                    "process_ts": 0,
+                    "status": TransformStatus.PENDING.value,
+                    "error": None,
+                    **idx_dict,  # type: ignore
+                }
+                for idx_dict in idx.to_dict(orient="records")
+            ]
+        )
+
+        sql = insert_sql.on_conflict_do_update(
+            index_elements=self.primary_keys,
+            set_={
+                "update_ts": update_ts,
+                "status": TransformStatus.PENDING.value,
+                "error": None,
+            },
+        )
+
+        with self.dbconn.con.begin() as con:
+            con.execute(sql)
+
+    def mark_rows_clean(
         self,
         idx: IndexDF,
         process_ts: float,
@@ -544,7 +592,7 @@ def mark_rows_processed_success(
             [
                 {
                     "process_ts": process_ts,
-                    "is_success": True,
+                    "status": TransformStatus.CLEAN.value,
                     "priority": 0,
                     "error": None,
                 }
@@ -560,7 +608,7 @@
             [
                 {
                     "process_ts": process_ts,
-                    "is_success": True,
+                    "status": TransformStatus.CLEAN.value,
                     "priority": 0,
                     "error": None,
                     **idx_dict,  # type: ignore
@@ -573,7 +621,7 @@ def mark_rows_processed_success(
             index_elements=self.primary_keys,
             set_={
                 "process_ts": process_ts,
-                "is_success": True,
+                "status": TransformStatus.CLEAN.value,
                 "error": None,
             },
         )
@@ -582,7 +630,7 @@
         with self.dbconn.con.begin() as con:
             con.execute(sql)
 
-    def mark_rows_processed_error(
+    def mark_rows_failed(
         self,
         idx: IndexDF,
         process_ts: float,
@@ -598,8 +646,9 @@
         insert_sql = self.dbconn.insert(self.sql_table).values(
             [
                 {
+                    "update_ts": 0,
                     "process_ts": process_ts,
-                    "is_success": False,
+                    "status": TransformStatus.FAILED.value,
                     "priority": 0,
                     "error": error,
                     **idx_dict,  # type: ignore
@@ -612,7 +661,7 @@
             index_elements=self.primary_keys,
             set_={
                 "process_ts": process_ts,
-                "is_success": False,
+                "status": TransformStatus.FAILED.value,
                 "error": error,
             },
         )
@@ -637,16 +686,12 @@ def mark_all_rows_unprocessed(
         self,
         run_config: Optional[RunConfig] = None,
     ) -> None:
-        update_sql = (
-            sa.update(self.sql_table)
-            .values(
-                {
-                    "process_ts": 0,
-                    "is_success": False,
-                    "error": None,
-                }
-            )
-            .where(self.sql_table.c.is_success == True)
+        update_sql = sa.update(self.sql_table).values(
+            {
+                "process_ts": 0,
+                "status": TransformStatus.PENDING.value,
+                "error": None,
+            }
         )
 
         sql = sql_apply_runconfig_filter(
@@ -657,6 +702,84 @@
         with self.dbconn.con.begin() as con:
             con.execute(sql)
 
+    def _build_changed_idx_sql(
+        self,
+        run_config: Optional[RunConfig] = None,
+    ) -> Any:
+        # Выборка ключей строк, требующих обработки (status != "clean")
+        sql = (
+            sa.select(*[self.sql_table.c[k] for k in self.primary_keys])
+            .select_from(self.sql_table)
+            .where(
+                self.sql_table.c.status != TransformStatus.CLEAN.value,
+            )
+        )
+
+        sql = sql_apply_runconfig_filter(
+            sql, self.sql_table, self.primary_keys, run_config
+        )
+
+        return sql
+
+    def get_changed_idx_count(self, run_config: Optional[RunConfig] = None) -> int:
+        sql = self._build_changed_idx_sql(run_config=run_config)
+
+        with self.dbconn.con.begin() as con:
+            res = con.execute(
+                sa.select(sa.func.count()).select_from(sql.subquery())
+            ).scalar()
+
+        return cast(int, res)
+
+    def get_full_process_ids(
+        self,
+        chunk_size: int,
+        run_config: Optional[RunConfig] = None,
+    ) -> Tuple[int, Iterable[IndexDF]]:
+        """
+        Метод для получения перечня индексов для обработки.
+
+        Returns: (idx_size, iterator)
+
+        - idx_size - количество индексов требующих обработки
+        - idx_df - датафрейм без колонок с данными, только индексная колонка
+        """
+
+        # if len(self.input_dts) == 0:
+        #     return (0, iter([]))
+
+        idx_count = self.get_changed_idx_count(run_config=run_config)
+
+        sql = self._build_changed_idx_sql(run_config=run_config)
+
+        # join_keys, u1 = build_changed_idx_sql(
+        #     ds=ds,
+        #     meta_table=self.meta_table,
+        #     input_dts=self.input_dts,
+        #     transform_keys=self.transform_keys,
+        #     run_config=run_config,
+        #     order_by=self.order_by,
+        #     order=self.order,  # type: ignore  # pylance is stupid
+        # )
+
+        # Список ключей из фильтров, которые нужно добавить в результат
+        # extra_filters: LabelDict
+        # if run_config is not None:
+        #     extra_filters = {
+        #         k: v for k, v in run_config.filters.items() if k not in join_keys
+        #     }
+        # else:
+        #     extra_filters = {}
+
+        def alter_res_df():
+            with self.dbconn.con.begin() as con:
+                for df in pd.read_sql_query(sql, con=con, chunksize=chunk_size):
+                    df = df[self.transform_keys]
+
+                    # for k, v in extra_filters.items():
+                    #     df[k] = v
+
+                    yield cast(IndexDF, df)
+
+        return math.ceil(idx_count / chunk_size), alter_res_df()
+
 
 def sql_apply_filters_idx_to_subquery(
     sql: Any,
@@ -788,7 +911,7 @@ def build_changed_idx_sql(
     out: Any = (
         sa.select(
             *[sa.column(k) for k in transform_keys]
-            + [tr_tbl.c.process_ts, tr_tbl.c.priority, tr_tbl.c.is_success]
+            + [tr_tbl.c.process_ts, tr_tbl.c.priority, tr_tbl.c.status]
         )
         .select_from(tr_tbl)
         .group_by(*[sa.column(k) for k in transform_keys])
@@ -826,11 +949,11 @@ def build_changed_idx_sql(
         .where(
             sa.or_(
                 sa.and_(
-                    out.c.is_success == True,  # noqa
+                    out.c.status == TransformStatus.CLEAN.value,
                     agg_of_aggs.c.update_ts > out.c.process_ts,
                 ),
-                out.c.is_success != True,  # noqa
-                out.c.process_ts == None,  # noqa
+                out.c.status != TransformStatus.CLEAN.value,
+                out.c.process_ts == None,
             )
         )
     )
diff --git a/datapipe/step/batch_transform.py b/datapipe/step/batch_transform.py
index 46aad4c7..e3ebd4fd 100644
--- a/datapipe/step/batch_transform.py
+++ b/datapipe/step/batch_transform.py
@@ -21,8 +21,6 @@
 import pandas as pd
 from opentelemetry import trace
-from sqlalchemy import alias, func, select
-from sqlalchemy.sql.expression import select
 from tqdm_loggable.auto import tqdm
 
 from datapipe.compute import (
@@ -193,22 +191,7 @@ def get_changed_idx_count(
         run_config: Optional[RunConfig] = None,
     ) -> int:
         run_config = self._apply_filters_to_run_config(run_config)
-        _, sql = build_changed_idx_sql(
-            ds=ds,
-            meta_table=self.meta_table,
-            input_dts=self.input_dts,
-            transform_keys=self.transform_keys,
-            run_config=run_config,
-        )
-
-        with ds.meta_dbconn.con.begin() as con:
-            idx_count = con.execute(
-                select(*[func.count()]).select_from(
-                    alias(sql.subquery(), name="union_select")
-                )
-            ).scalar()
-
-        return cast(int, idx_count)
+        return self.meta_table.get_changed_idx_count(run_config=run_config)
 
     def get_full_process_ids(
         self,
@@ -216,57 +199,13 @@ def get_full_process_ids(
         ds: DataStore,
         chunk_size: Optional[int] = None,
         run_config: Optional[RunConfig] = None,
     ) -> Tuple[int, Iterable[IndexDF]]:
-        """
-        Метод для получения перечня индексов для обработки.
-
-        Returns: (idx_size, iterator)
-
-        - idx_size - количество индексов требующих обработки
-        - idx_df - датафрейм без колонок с данными, только индексная колонка
-        """
-        run_config = self._apply_filters_to_run_config(run_config)
-        chunk_size = chunk_size or self.chunk_size
-
         with tracer.start_as_current_span("compute ids to process"):
-            if len(self.input_dts) == 0:
-                return (0, iter([]))
-
-            idx_count = self.get_changed_idx_count(
-                ds=ds,
+            run_config = self._apply_filters_to_run_config(run_config)
+            return self.meta_table.get_full_process_ids(
+                chunk_size=chunk_size or self.chunk_size,
                 run_config=run_config,
             )
 
-            join_keys, u1 = build_changed_idx_sql(
-                ds=ds,
-                meta_table=self.meta_table,
-                input_dts=self.input_dts,
-                transform_keys=self.transform_keys,
-                run_config=run_config,
-                order_by=self.order_by,
-                order=self.order,  # type: ignore  # pylance is stupid
-            )
-
-            # Список ключей из фильтров, которые нужно добавить в результат
-            extra_filters: LabelDict
-            if run_config is not None:
-                extra_filters = {
-                    k: v for k, v in run_config.filters.items() if k not in join_keys
-                }
-            else:
-                extra_filters = {}
-
-            def alter_res_df():
-                with ds.meta_dbconn.con.begin() as con:
-                    for df in pd.read_sql_query(u1, con=con, chunksize=chunk_size):
-                        df = df[self.transform_keys]
-
-                        for k, v in extra_filters.items():
-                            df[k] = v
-
-                        yield cast(IndexDF, df)
-
-            return math.ceil(idx_count / chunk_size), alter_res_df()
-
     def get_change_list_process_ids(
         self,
         ds: DataStore,
@@ -357,7 +296,7 @@ def store_batch_result(
 
             changes.append(res_dt.name, del_idx)
 
-        self.meta_table.mark_rows_processed_success(
+        self.meta_table.mark_rows_clean(
             idx, process_ts=process_ts, run_config=run_config
         )
 
@@ -386,7 +325,7 @@ def store_batch_err(
             ),
         )
 
-        self.meta_table.mark_rows_processed_error(
+        self.meta_table.mark_rows_failed(
             idx,
             process_ts=process_ts,
             error=str(e),
@@ -402,6 +341,22 @@ def fill_metadata(self, ds: DataStore) -> None:
     def reset_metadata(self, ds: DataStore) -> None:
         self.meta_table.mark_all_rows_unprocessed()
 
+    def notify_change_list(
+        self,
+        ds: DataStore,
+        change_list: ChangeList,
+        now: Optional[float] = None,
+        run_config: Optional[RunConfig] = None,
+    ) -> None:
+        now = now or time.time()
+
+        for idx in change_list.changes.values():
+            self.meta_table.mark_rows_pending(
+                idx,
+                update_ts=now,
+                run_config=run_config,
+            )
+
     def get_batch_input_dfs(
         self,
         ds: DataStore,
diff --git a/docs/source/batch-transform-task-lifecycle.drawio b/docs/source/batch-transform-task-lifecycle.drawio
new file mode 100644
index 00000000..c3ea0011
--- /dev/null
+++ b/docs/source/batch-transform-task-lifecycle.drawio
@@ -0,0 +1,88 @@
diff --git a/docs/source/batch-transform-task-lifecycle.md b/docs/source/batch-transform-task-lifecycle.md
new file mode 100644
index 00000000..3db919ca
--- /dev/null
+++ b/docs/source/batch-transform-task-lifecycle.md
@@ -0,0 +1,2 @@
+# Жизненный цикл одной задачи на трансформацию
+
diff --git a/pyproject.toml b/pyproject.toml
index f14b8fbe..d52a67ed 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "datapipe-core"
-version = "0.14.1"
+version = "0.15.0-dev.1"
 description = "`datapipe` is a realtime incremental ETL library for Python application"
 readme = "README.md"
 repository = "https://github.com/epoch8/datapipe"
diff --git a/tests/test_transform_meta.py b/tests/test_transform_meta.py
index 797bd577..3b2da299 100644
--- a/tests/test_transform_meta.py
+++ b/tests/test_transform_meta.py
@@ -1,13 +1,16 @@
 from typing import List
 
+import pandas as pd
 import pytest
 from pytest_cases import parametrize
 from sqlalchemy import Column, Integer
 
-from datapipe.datatable import MetaTable
-from datapipe.step.batch_transform import BatchTransformStep
-from datapipe.store.database import DBConn
+from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table
+from datapipe.datatable import DataStore, MetaTable
+from datapipe.step.batch_transform import BatchTransform, BatchTransformStep
+from datapipe.store.database import DBConn, TableStoreDB
 from datapipe.types import MetaSchema
+from tests.util import assert_df_equal
 
 
 def make_mt(name, dbconn, schema_keys) -> MetaTable:
@@ -103,3 +106,52 @@ def test_compute_transform_schema_fail(
         BatchTransformStep.compute_transform_schema(
             inp_mts, out_mts, transform_keys=transform_keys
         )
+
+
+TEST_SCHEMA: List[Column] = [
+    Column("id", Integer, primary_key=True),
+    Column("a", Integer),
+]
+
+
+def noop_func(df):
+    return []
+
+
+def test_transform_meta_updates_on_datatable_write(
+    dbconn: DBConn,
+):
+    ds = DataStore(dbconn, create_meta_table=True)
+
+    app = DatapipeApp(
+        ds=ds,
+        catalog=Catalog(
+            {
+                "tbl": Table(store=TableStoreDB(dbconn, "tbl", TEST_SCHEMA, True)),
+            }
+        ),
+        pipeline=Pipeline(
+            [
+                BatchTransform(
+                    func=noop_func,
+                    inputs=["tbl"],
+                    outputs=[],
+                )
+            ]
+        ),
+    )
+
+    step = app.steps[0]
+    assert isinstance(step, BatchTransformStep)
+
+    app.ingest_data(
+        "tbl",
+        pd.DataFrame.from_records(
+            [
+                {"id": 1, "a": 1},
+            ]
+        ),
+        now=1000,
+    )
+
+    assert step.meta_table.get_changed_idx_count() == 1
diff --git a/tests/util.py b/tests/util.py
index 2f21ec2b..7ba4fd6b 100644
--- a/tests/util.py
+++ b/tests/util.py
@@ -1,4 +1,5 @@
-from typing import List
+from typing import List, cast
+
 import pandas as pd
 
 from datapipe.datatable import DataTable
@@ -13,22 +14,22 @@ def assert_idx_equal(a, b):
     assert a == b
 
 
-def assert_df_equal(a: pd.DataFrame, b: pd.DataFrame, index_cols=['id']) -> bool:
+def assert_df_equal(a: pd.DataFrame, b: pd.DataFrame, index_cols=["id"]) -> bool:
     a = a.set_index(index_cols)
     b = b.set_index(index_cols)
 
     assert_idx_equal(a.index, b.index)
 
-    eq_rows = (a.sort_index() == b.sort_index()).all(axis='columns')
+    eq_rows = (a.sort_index() == b.sort_index()).all(axis="columns")
 
     if eq_rows.all():
         return True
 
     else:
-        print('Difference')
-        print('A:')
+        print("Difference")
+        print("A:")
         print(a.loc[-eq_rows])
-        print('B:')
+        print("B:")
         print(b.loc[-eq_rows])
 
         raise AssertionError
@@ -51,8 +52,8 @@ def assert_idx_no_duplicates(idx: IndexDF, index_cols: List[str]) -> bool:
     if len(duplicates) == 0:
         return True
     else:
-        idx = idx.loc[idx.index].sort_values(index_cols)
-        print('Duplicated found:')
+        idx = cast(IndexDF, idx.loc[idx.index].sort_values(index_cols))
+        print("Duplicated found:")
        print(idx)
        raise AssertionError
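Usage sketch (not part of the changeset): the snippet below mirrors the new test above and shows how `DatapipeApp` acts as the entry point — `ingest_data` stores a chunk and marks the affected transform rows "pending", after which `ingest_and_process_data` (or a regular run) would process them and mark them "clean". The SQLite connection string, table name, and schema are illustrative assumptions.

```python
# Minimal sketch of the new DatapipeApp flow; mirrors tests/test_transform_meta.py.
# The sqlite connection string, table name and schema are assumptions for illustration.
import pandas as pd
from sqlalchemy import Column, Integer

from datapipe.compute import Catalog, DatapipeApp, Pipeline, Table
from datapipe.datatable import DataStore
from datapipe.step.batch_transform import BatchTransform
from datapipe.store.database import DBConn, TableStoreDB

dbconn = DBConn("sqlite:///example.sqlite")  # assumed local sqlite database
ds = DataStore(dbconn, create_meta_table=True)

schema = [Column("id", Integer, primary_key=True), Column("a", Integer)]

app = DatapipeApp(
    ds=ds,
    catalog=Catalog({"tbl": Table(store=TableStoreDB(dbconn, "tbl", schema, True))}),
    pipeline=Pipeline([BatchTransform(func=lambda df: [], inputs=["tbl"], outputs=[])]),
)

# Writing through the app stores the chunk and marks the affected transform
# inputs as "pending" in the transform meta table.
app.ingest_data("tbl", pd.DataFrame.from_records([{"id": 1, "a": 1}]))

# One row is now pending for the BatchTransform step.
print(app.steps[0].meta_table.get_changed_idx_count())  # expected: 1

# app.ingest_and_process_data("tbl", ...) would additionally run the affected
# steps, after which the rows are expected to be marked "clean".
```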