feat: ducklake destination #3015
base: devel
Conversation
✅ Deploy Preview for dlt-hub-docs canceled.
this looks really good! here's a summary of my suggestions:
- simplify the ducklake credentials class (i.e. remove __init__, implement _conn_str())
- load extensions in borrow_conn
- we'll need to tweak how connections are opened in the ibis handover (but that's easy)
@@ -202,6 +204,7 @@ def is_partial(self) -> bool:
        return self.database == ":pipeline:"


    def on_resolved(self) -> None:
        # TODO Why don't we support `:memory:` string?
we support it. you can pass a duckdb instance instead of credentials and the destination factory will use it:
https://dlthub.com/docs/dlt-ecosystem/destinations/duckdb#destination-configuration (those docs will benefit from better section titles)
a :memory: database is wiped out when the connection is closed. during loading the connection will be opened and closed several times, e.g. to migrate schemas, and at the end all the data would be lost because we close all connections when the loader exits
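a minimal sketch of that workaround, assuming the duckdb destination factory accepts a live connection as described in the linked docs (pipeline, dataset and table names are illustrative):

import dlt
import duckdb

# keep one connection open for the whole run so the :memory: database
# survives the open/close cycles that happen during loading
db = duckdb.connect(":memory:")

pipeline = dlt.pipeline(
    pipeline_name="in_memory_demo",
    destination=dlt.destinations.duckdb(db),
    dataset_name="demo_data",
)
pipeline.run([{"id": 1}], table_name="items")

# the data is only reachable while `db` stays open
print(db.sql("SELECT * FROM demo_data.items"))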
@@ -19,6 +19,8 @@
DUCK_DB_NAME_PAT = "%s.duckdb"


# NOTE duckdb extensions are only loaded when using the dlt cursor. They are not
# loaded when using the native connection (e.g., when passing it to Ibis)
there's a mechanism to load extensions at start. it could be made easier for implementers but right now you can update extensions in on_resolved of DuckLakeCredentials(DuckDbBaseCredentials) (that you implement below).
some docs: https://dlthub.com/docs/dlt-ecosystem/destinations/duckdb#additional-configuration
another option you have is to subclass sql_client. see the base class:
class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction):
    dbapi: ClassVar[DBApi] = duckdb

    def __init__(
        self,
        dataset_name: str,
        staging_dataset_name: str,
        credentials: DuckDbBaseCredentials,
        capabilities: DestinationCapabilitiesContext,
    ) -> None:
        super().__init__(None, dataset_name, staging_dataset_name, capabilities)
        self._conn: duckdb.DuckDBPyConnection = None
        self.credentials = credentials
        # set additional connection options so derived class can change it
        # TODO: move that to methods that can be overridden, include local_config
        self._pragmas = ["enable_checkpoint_on_shutdown"]
        self._global_config: Dict[str, Any] = {
            "TimeZone": "UTC",
            "checkpoint_threshold": "1gb",
        }

    @raise_open_connection_error
    def open_connection(self) -> duckdb.DuckDBPyConnection:
        self._conn = self.credentials.borrow_conn(
            pragmas=self._pragmas,
            global_config=self._global_config,
            local_config={
                "search_path": self.fully_qualified_dataset_name(),
            },
        )
        return self._conn
and inject extensions on init or when the connection is being opened
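for that second option, a rough sketch of what such a subclass could look like (the class name is illustrative, not part of this PR; install_extension/load_extension are the duckdb Python API):

class DuckLakeSqlClient(DuckDbSqlClient):
    # illustrative subclass: load the extension right after the connection
    # is borrowed so it is available to every statement dlt issues
    def open_connection(self) -> duckdb.DuckDBPyConnection:
        conn = super().open_connection()
        conn.install_extension("ducklake")  # no-op if already installed
        conn.load_extension("ducklake")
        return conn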
@@ -546,3 +558,39 @@ def __del__(self) -> None:
        if self.memory_db:
            self.memory_db.close()
            self.memory_db = None


def _install_extension(duckdb_sql_client: DuckDbSqlClient, extension_name: LiteralString) -> None:
mhmmm I think the code that adds extensions in borrow_conn will suffice. if not, we can move those utils there?
class DuckLakeCredentials(DuckDbCredentials):
    def __init__(
        self,
        # TODO how does duckdb resolve the name of the database to the name of the dataset / pipeline
here's something that I may not fully grasp. but DuckLakeCredentials will create a :memory: instance
- to which you attach the catalog below
- to which you attach the storage
- that gets configured with extensions and settings in DuckLakeCredentials (self)
- and this instance of DuckLakeCredentials is used to borrow_conn
so what should dataset_name assume here? the catalog database if it is duckdb? pls see below
For the default case, here's what I'm currently aiming for:
pipeline = dlt.pipeline("jaffle_shop", destination="ducklake")
pipeline.run(...)
- a duckdb instance is created in :memory:; we call it the ducklake_client
- the ducklake_client installs the ducklake extension for duckdb (needs to be done once per system)
- the ducklake_client uses the ATTACH command to load a catalog and storage
- the catalog is a duckdb instance on disk (with extension .ducklake instead of .duckdb by convention)
- the default storage is completely handled by DuckDB / DuckLake
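roughly, the ducklake_client would issue something along these lines (a sketch using plain duckdb and DuckLake's documented ATTACH syntax; the catalog file name is illustrative):

import duckdb

con = duckdb.connect(":memory:")   # the ducklake_client
con.install_extension("ducklake")  # once per system
con.load_extension("ducklake")
# attach catalog + storage; DuckLake derives the data path from the catalog
# file name unless DATA_PATH is given explicitly
con.execute("ATTACH 'ducklake:jaffle_shop.ducklake' AS jaffle_shop")
con.execute("USE jaffle_shop")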
The outcome is
|- pipeline.py
|- jaffle_shop.ducklake        # catalog file (if duckdb or sqlite)
|- jaffle_shop.ducklake.files/ # storage
   |- main/                    # schema level
      |- customers/            # table level
         |- data.parquet       # data
      |- orders/
Design
- The DuckLakeCredentials inherits from DuckDbCredentials and the "main" credentials are used to define the ducklake_client
- We always use an in-memory DuckDB connection for the ducklake_client
        # TODO how does duckdb resolve the name of the database to the name of the dataset / pipeline
        ducklake_name: str = "ducklake",
        *,
        catalog_database: Optional[Union[ConnectionStringCredentials, DuckDbCredentials]] = None,
postgres, mysql, duckdb, motherduck are all ConnectionStringCredentials so maybe that's enough to put here
you can use drivername to distinguish them
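e.g. a sketch of that dispatch (the helper and the postgres attach string are illustrative; only drivername and database come from ConnectionStringCredentials):

def _catalog_attach_target(credentials: ConnectionStringCredentials) -> str:
    # illustrative helper, not part of the PR: pick the ATTACH target
    # based on the driver encoded in the connection string
    if credentials.drivername == "duckdb":
        return f"ducklake:{credentials.database}.ducklake"
    if credentials.drivername in ("postgres", "postgresql"):
        return f"ducklake:postgres:dbname={credentials.database}"
    raise ValueError(f"unsupported catalog driver: {credentials.drivername}")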
        return caps


# TODO support connecting to a snapshot
that would be amazing but we can do that later. snapshots mean reproducible local environments that you can get with zero copy
attach_statement = f"ATTACH IF NOT EXISTS 'ducklake:{ducklake_name}.ducklake'" | ||
if storage: | ||
# TODO handle storage credentials by creating secrets | ||
attach_statement += f" (DATA_PATH {storage.bucket_url})" |
you should pass storage to create_secret before you attach (after you open the connection)
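i.e. something along these lines (a sketch using DuckDB's CREATE SECRET; the secret name and fields are illustrative and would come from the storage credentials):

# after open_connection(), before ATTACH: register the storage credentials
# as a duckdb secret so the DATA_PATH bucket can be resolved
conn.execute(
    """
    CREATE OR REPLACE SECRET ducklake_storage (
        TYPE S3,
        KEY_ID 'AKIA...',
        SECRET '...',
        REGION 'eu-central-1'
    )
    """
)
conn.execute(attach_statement)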
)


def test_native_duckdb_workflow(tmp_path):
makes sense to do a few "smoke tests". the next step would be to enable ducklake to be tested against exactly the same tests as duckdb, using e.g. a local duckdb as catalog and the local filesystem as storage.
let's do another iteration of this ticket and then I'll look at this. I was able to do the same with the iceberg destination so I'm pretty sure it will work
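such a smoke test could be as small as the sketch below (assumes the standard dlt pipeline API and pytest fixtures; dev_mode and the table name are illustrative):

def test_ducklake_smoke(tmp_path, monkeypatch):
    # run in an isolated working dir so the catalog + storage files land under tmp_path
    monkeypatch.chdir(tmp_path)
    pipeline = dlt.pipeline("ducklake_smoke", destination="ducklake", dev_mode=True)
    info = pipeline.run([{"id": 1}, {"id": 2}], table_name="items")
    info.raise_on_failed_jobs()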
# TODO add connection to a specific snapshot
# TODO does it make sense for ducklake to have a staging destination?
good point see here: #1692
        return DuckLakeClient


    def _raw_capabilities(self) -> DestinationCapabilitiesContext:
note: ducklake will support upsert (MERGE INTO) so we can enable this strategy to see if it works
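if that lands, it should mostly be a matter of advertising the strategy in the capabilities, e.g. (a sketch, assuming the supported_merge_strategies field on DestinationCapabilitiesContext):

# sketch: enable upsert once ducklake supports MERGE INTO
caps.supported_merge_strategies = ["delete-insert", "upsert"]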
Related Issues

Questions / tasks
- chess.duckdb from dlt.pipeline(..., destination="duckdb")