Commit 7a2cb59 (parent: 486e720)

Add import_csv_pandas and import_csv_dask utility primitives

5 files changed: 62 additions, 2 deletions

.github/workflows/main.yml (1 addition, 1 deletion)

@@ -53,7 +53,7 @@ jobs:
           pip install "setuptools>=64" --upgrade
 
           # Install package in editable mode.
-          pip install --use-pep517 --prefer-binary --editable=.[test,develop]
+          pip install --use-pep517 --prefer-binary --editable=.[io,test,develop]
 
       - name: Run linter and software tests
         run: |

CHANGES.md (1 addition, 0 deletions)

@@ -4,6 +4,7 @@
 ## Unreleased
 
 - Add SQL runner utility primitives to `io.sql` namespace
+- Add `import_csv_pandas` and `import_csv_dask` utility primitives
 
 
 ## 2023/11/06 v0.0.2

cratedb_toolkit/util/database.py (55 additions, 0 deletions)

@@ -194,6 +194,61 @@ def ensure_repository_az(
         """
         self.run_sql(sql)
 
+    def import_csv_pandas(
+        self, filepath: t.Union[str, Path], tablename: str, index=False, chunksize=1000, if_exists="replace"
+    ):
+        """
+        Import CSV data using pandas.
+        """
+        import pandas as pd
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        df = pd.read_csv(filepath)
+        with self.engine.connect() as connection:
+            return df.to_sql(
+                tablename, connection, index=index, chunksize=chunksize, if_exists=if_exists, method=insert_bulk
+            )
+
+    def import_csv_dask(
+        self,
+        filepath: t.Union[str, Path],
+        tablename: str,
+        index=False,
+        chunksize=1000,
+        if_exists="replace",
+        npartitions: int = None,
+        progress: bool = False,
+    ):
+        """
+        Import CSV data using Dask.
+        """
+        import dask.dataframe as dd
+        import pandas as pd
+        from crate.client.sqlalchemy.support import insert_bulk
+
+        # Set a few defaults.
+        # TODO: Use the number of CPU cores instead?
+        npartitions = npartitions or 4
+
+        if progress:
+            from dask.diagnostics import ProgressBar
+
+            pbar = ProgressBar()
+            pbar.register()
+
+        # Read the CSV file and split it into a partitioned Dask dataframe.
+        df = pd.read_csv(filepath)
+        ddf = dd.from_pandas(df, npartitions=npartitions)
+        return ddf.to_sql(
+            tablename,
+            uri=self.dburi,
+            index=index,
+            chunksize=chunksize,
+            if_exists=if_exists,
+            method=insert_bulk,
+            parallel=True,
+        )
+
 
 def sa_is_empty(thing):
     """

pyproject.toml (4 additions, 0 deletions)

@@ -102,6 +102,10 @@ develop = [
   "ruff==0.1.3",
   "validate-pyproject<0.16",
 ]
+io = [
+  "dask<=2023.10.1,>=2020",
+  "pandas<3,>=2",
+]
 release = [
   "build<2",
   "twine<5",

release/oci/Dockerfile (1 addition, 1 deletion)

@@ -21,7 +21,7 @@ COPY . /src
 
 # Install package.
 RUN --mount=type=cache,id=pip,target=/root/.cache/pip \
-    pip install --use-pep517 --prefer-binary '/src'
+    pip install --use-pep517 --prefer-binary '/src[io]'
 
 # Uninstall Git again.
 RUN apt-get --yes remove --purge git && apt-get --yes autoremove
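
Because the image now installs '/src[io]', the optional pandas and Dask dependencies are baked into the OCI artifact. A possible local smoke test, using a placeholder tag and assuming a Python base image:

    docker build --tag=cratedb-toolkit-local --file=release/oci/Dockerfile .
    docker run --rm cratedb-toolkit-local python -c "import dask, pandas"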
