Airflow: Add tutorials from community forum #279
Conversation
Walkthrough

Adds multiple new integration guides for Apache Airflow and Apache Arrow, introduces hot→cold and delete-based data-retention examples, updates Airflow/Arrow index pages and toctrees, and provides example DAGs, SQL, and Docker configs for orchestration and storage allocation.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor User
    participant Airflow as Airflow DAG
    participant Catalog as retention_policies
    participant CrateDB as CrateDB
    Note over Airflow,Catalog: Hot→Cold reallocation + eventual deletion
    User->>Airflow: Trigger scheduled DAG
    Airflow->>Catalog: SELECT active policies
    Catalog-->>Airflow: policy rows
    loop per policy
        Airflow->>CrateDB: ALTER TABLE ... PARTITION ... SET ("routing.allocation.require.storage"='cold')
        CrateDB-->>Airflow: async shard relocation started/logs
    end
    alt deletion stage reached
        Airflow->>CrateDB: DELETE FROM table WHERE partition_column = value
        CrateDB-->>Airflow: deletion result/logs
    end
```
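A minimal sketch of the SQL this first flow issues against CrateDB; the table name, partition column, and value below are illustrative placeholders, not names taken from the PR:

```sql
-- Reallocate one partition to nodes tagged as "cold" storage (placeholder names).
ALTER TABLE doc.raw_metrics PARTITION (part_day = '2024-01-01')
  SET ("routing.allocation.require.storage" = 'cold');

-- Later retention stage: drop the partition's data entirely.
DELETE FROM doc.raw_metrics WHERE part_day = '2024-01-01';
```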
```mermaid
sequenceDiagram
    autonumber
    actor User
    participant Airflow as Airflow DAG
    participant CrateDB as CrateDB
    participant S3 as Amazon S3
    Note over Airflow,CrateDB: Daily export to S3 using COPY TO
    User->>Airflow: Schedule run (logical date)
    Airflow->>CrateDB: COPY (SELECT ... WHERE ts in previous day) TO 's3://bucket/prefix/{{ ds }}/...'
    CrateDB->>S3: upload JSON lines
    S3-->>Airflow: write complete
```
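The export step in this second flow comes down to a `COPY ... TO` statement of roughly the following shape; the table, timestamp column, and bucket are assumed for illustration:

```sql
-- Export one day's rows to S3; CrateDB writes JSON lines by default.
-- Credentials can be embedded in the URI, e.g. s3://access_key:secret_key@bucket/path.
COPY doc.raw_metrics
  WHERE ts >= '2024-01-01' AND ts < '2024-01-02'
  TO DIRECTORY 's3://my-bucket/exports/2024-01-01/';
```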
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
@coderabbitai review and please converge to active voice.
@amotl I'll review the PR with a focus on converting the documentation to active voice as requested. This aligns with the learning I have about using active voice instead of first-person or passive voice in documentation reviews.
✅ Actions performed: Review triggered.
Actionable comments posted: 2
🧹 Nitpick comments (17)
docs/integrate/airflow/import-parquet.md (5)
4-10: Tighten intro; use active voice and remove filler

Streamline, avoid first‑person, and clarify the CSV fallback.

```diff
-## Introduction
-Using Airflow to import the NYC Taxi and Limousine dataset in Parquet format.
-
-CrateDB does not support `COPY FROM` for Parquet. It supports CSV and JSON. Therefore, this tutorial uses an alternative approach rather than switching the previous CSV workflow to Parquet.
-
-First and foremost, keep in mind the strategy presented here for importing Parquet files into CrateDB, we have already covered this topic in a previous tutorial using a different approach from the one introduced in this tutorial, so feel free to have a look at the tutorial about {ref}`arrow-import-parquet` and explore with the different possibilities out there.
+## Introduction
+Use Airflow to import the NYC Taxi and Limousine dataset provided in Parquet format.
+
+CrateDB supports `COPY FROM` for CSV and JSON, not Parquet. This tutorial converts Parquet to CSV before loading.
+
+For an alternative Parquet ingestion approach, see {ref}`arrow-import-parquet`.
```
13-18: Remove passive voice; simplify prerequisites and staging explanation

Make instructions direct and consistent.

```diff
-Before getting started, you need to have some knowledge of Airflow and an instance of Airflow already running. Besides that, a CrateDB instance should already be set up before moving on with this tutorial. This SQL is also available in the setup folder in our [GitHub repository](https://github.com/crate/crate-airflow-tutorial).
+Before you start, have Airflow and CrateDB running. The SQL shown below also resides in the setup folder of the [GitHub repository](https://github.com/crate/crate-airflow-tutorial).
-We start by creating the two tables in CrateDB: A temporary staging table (`nyc_taxi.load_trips_staging`) and the final destination table (`nyc_taxi.trips`).
+Create two tables in CrateDB: a temporary staging table (`nyc_taxi.load_trips_staging`) and the final table (`nyc_taxi.trips`).
-In this case, the staging table is a primary insertion point, which was later used to cast data to their final types. For example, the `passenger_count` column is defined as `REAL` in the staging table, while it is defined as `INTEGER` in the `nyc_taxi.trips` table.
+Insert into the staging table first, then cast values into their final types when inserting into `nyc_taxi.trips`. For example, `passenger_count` is `REAL` in staging and `INTEGER` in `nyc_taxi.trips`.
```
77-78: Drop filler phrases

Avoid “Ok!” and conversational tone.

```diff
-Ok! So, once the tools are already set up with the corresponding tables created, we should be good to go.
+With the tools set up and tables created, proceed to the DAG.
```
88-93: Fix list indentation and `parquet-tools` formatting (markdownlint MD007)

Indent sublist items by two spaces and keep `parquet-tools` as code, not a link.

```diff
-* **process_parquet:** afterward, the name is used to download the file to local storage and then transform it from Parquet to CSV using [`parquet-tools`] (Apache Parquet CLI, see [Apache Arrow])
-   * `curl -o "<LOCAL-PARQUET-FILE-PATH>" "<REMOTE-PARQUET-FILE>"`
-   * `parquet-tools csv <LOCAL-PARQUET-FILE-PATH> > <CSV-FILE-PATH>`
-  Both tasks are executed within one Bash Operator.
+* **process_parquet:** afterward, use the name to download the file to local storage and convert it from Parquet to CSV using `parquet-tools` (Apache Parquet CLI; see [Apache Arrow]).
+  * `curl -o "<LOCAL-PARQUET-FILE-PATH>" "<REMOTE-PARQUET-FILE>"`
+  * `parquet-tools csv <LOCAL-PARQUET-FILE-PATH> > <CSV-FILE-PATH>`
+  Both commands run within one `BashOperator`.
```
104-106: Clarify the “two-month delay” explanation

The paragraph is hard to parse. Tighten and use explicit dates.

```diff
-In this case, according to the NYC TLC website “Trip data is published monthly (with two months delay)”. So, the DAG is set up to run monthly, and given the first file was made available in January 2009, the start date was set to March 2009. But why March and not January? As previously mentioned, the files are made available with 2 months of delay, so the first DAG instance, which has a logical execution date equal to "March 2009" will retrieve March as the current month minus 2, corresponding to January 2009, the very first file ever published.
+The NYC TLC publishes trip data monthly with a two‑month delay. Set the DAG to run monthly with a start date of March 2009. The first run (logical date March 2009) downloads the file for January 2009 (logical date minus two months), which is the first available dataset.
```

docs/integrate/airflow/import-stock-market-data.md (6)
4-7: Avoid bare URL and broken image; add proper link and asset
- Convert the YouTube URL to proper markdown.
- The “upload://…” image path won’t resolve in Sphinx. Replace with a public URL or remove.
```diff
-Watch this tutorial on Youtube: https://www.youtube.com/watch?v=YTTUzeaYUgQ&t=685s
-
-
+Watch this tutorial on YouTube: [Automating stock data with Airflow and CrateDB](https://www.youtube.com/watch?v=YTTUzeaYUgQ&t=685s).
+
+<!-- Replace with a valid image URL or remove -->
+<!--  -->
```
13-17: Use active voice; remove narration

Tighten the overview and avoid “you can… you want to…”.

```diff
-You have a goal: regularly update stock market data.
-To achieve your goal, you can divide it into tasks: download, prepare, and store data. You want to turn these tasks into a workflow, run it and observe the results; in other words, you want to orchestrate your workflow, and Airflow is the tool for that.
-
-So the first thing to do is to start CrateDB and set up a table to store your data. Then, to orchestrate the process of regular data updates, you will create an Airflow project and establish the connection to CrateDB. Once you set up your Airflow project, you will write your tasks in Python as an Airflow DAG workflow (more details later). Finally, you will set a schedule for your workflow, and it's done!
+Goal: update stock market data regularly.
+Approach: define tasks to download, prepare, and store data; orchestrate them with Airflow.
+
+Steps: start CrateDB and create a table; create an Airflow project and set the CrateDB connection; implement the DAG in Python; schedule it.
```
24-31: Simplify tone and use active voice

Drop “It’s easy” and conversational phrasing.

```diff
-The first to do is to run CrateDB with Docker. It's easy: once you have Docker Desktop running, copy the Docker command from the CrateDB installation page and run it in your terminal.
+First, run CrateDB with Docker. With Docker Desktop running, copy the command from the CrateDB installation page and run it:
```
32-41: Avoid first-person and clarify PK

Use neutral tone and consistent terms (“S&P 500”).

```diff
-Let’s now create a table to store your financial data. I'm particularly interested in the "adjusted-close" value for the stocks, so I will create a table that stores the date, the stock ticker, and the adjusted-close value. I will set the `closing_date` and `ticker` as primary keys. The final statement looks like this:
+Create a table to store financial data. Focus on the adjusted close value (“adjusted_close”) per ticker per day. Use a composite primary key on (`closing_date`, `ticker`):
```
77-81: Don’t hardcode credentials; avoid “null” as a password

Using `crate:null@...` sets the literal password “null”. Prefer no password (default), environment‑specific secrets, or Docker secrets. Also call out non‑production use.

```diff
-AIRFLOW_CONN_CRATEDB_CONNECTION=postgresql://crate:[email protected]:5433/doc?sslmode=disable
+# For local development only; do not commit real credentials
+AIRFLOW_CONN_CRATEDB_CONNECTION=postgresql://[email protected]:5433/doc?sslmode=disable
```
41-41: Fix markdownlint MD052 (missing reference definition)

A reference label “astronomer” is reported missing. Either add the definition at the bottom or convert to an inline link.
docs/integrate/airflow/data-retention-policy.md (1)
111-144: Keep `parameters` consistently across the full DAG snippet

Mirror the change above in the final DAG code block for accuracy.

```diff
-SQLExecuteQueryOperator.partial(
+SQLExecuteQueryOperator.partial(
     task_id="delete_partition",
     conn_id="cratedb_connection",
-    sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
-).expand(params=get_policies().map(map_policy))
+    sql="DELETE FROM {{ params.table_fqn }} WHERE {{ params.column }} = %(value)s;",
+).expand(parameters=get_policies().map(map_policy))
```
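For orientation, once Jinja renders the identifiers and the driver binds the value, the executed statement looks roughly like this (placeholder names and value):

```sql
-- Identifiers come from Jinja rendering; the literal value is bound by the DB-API driver.
DELETE FROM doc.raw_metrics WHERE part_day = '2024-01-01';
```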
docs/integrate/airflow/index.md (1)

67-73: Card copy fits, minor nit: broaden “Getting started” description

If the Getting Started page isn’t limited to stock data, consider a neutral description.

```diff
-Define an Airflow DAG that downloads, processes, and stores stock market data in CrateDB.
+Define an Airflow DAG that downloads, processes, and stores data in CrateDB.
```

docs/integrate/airflow/getting-started.md (4)
62-71: Fix list style (markdownlint MD004) and keep consistent with dashes

Switch asterisks to dashes.

```diff
-* Create a project directory:
+- Create a project directory:
@@
-* Initialize the project with the following command:
+- Initialize the project with the following command:
@@
-* This will create a skeleton project directory as follows:
+- This creates a skeleton project directory:
```
84-89: Unify bullet style and clarify Triggerer description

Consistent dashes and small grammar tweak.

```diff
-* PostgreSQL server (for configuration/runtime data)
-* Airflow scheduler
-* Web server for rendering Airflow UI
-* Triggerer (running an event loop for deferrable tasks)
+- PostgreSQL server (for configuration and runtime metadata)
+- Airflow scheduler
+- Web server for rendering the Airflow UI
+- Triggerer (event loop for deferrable tasks)
```
89-91: Avoid bare URLs (markdownlint MD034) and keep active voice

Wrap URLs in angle brackets and tighten text.

```diff
-The PostgreSQL server is configured to listen on port 5432. The web server is listening on port 8080 and can be accessed via http://localhost:8080/ with `admin` for both username and password.
+The PostgreSQL server listens on port 5432. The web server listens on port 8080 and is available at <http://localhost:8080/> with `admin`/`admin`.
-Start the project with `astro dev start`. After the containers start, access the Airflow UI at `http://localhost:8081`:
+Start the project with `astro dev start`. After the containers start, access the Airflow UI at <http://localhost:8081>:
```

Also applies to: 101-106
128-141: Credentials guidance: avoid committing secrets; show TLS variant

Slightly tighten phrasing; keep active voice.

```diff
-To configure the connection to CrateDB we need to set up a corresponding environment variable. On Astronomer the environment variable can be set up via the Astronomer UI, via `Dockerfile`, or via a `.env` file which is automatically generated during project initialization.
+To configure the CrateDB connection, set an environment variable. On Astronomer, set it via the UI, `Dockerfile`, or the `.env` file (generated during initialization).
@@
-In case a TLS connection is required, change `sslmode=require`. To confirm that a new variable is applied, first, start the Airflow project and then create a bash session in the scheduler container by running `docker exec -it <scheduler_container_name> /bin/bash`.
+For TLS, set `sslmode=require`. To confirm that the variable is applied, start the project and open a bash session in the scheduler container: `docker exec -it <scheduler_container_name> /bin/bash`.
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- docs/integrate/airflow/data-retention-hot-cold.md (1 hunks)
- docs/integrate/airflow/data-retention-policy.md (1 hunks)
- docs/integrate/airflow/export-s3.md (1 hunks)
- docs/integrate/airflow/getting-started.md (1 hunks)
- docs/integrate/airflow/import-parquet.md (1 hunks)
- docs/integrate/airflow/import-stock-market-data.md (1 hunks)
- docs/integrate/airflow/index.md (3 hunks)
- docs/integrate/arrow/import-parquet.md (1 hunks)
- docs/integrate/arrow/index.md (1 hunks)
- docs/integrate/index.md (1 hunks)
✅ Files skipped from review due to trivial changes (2)
- docs/integrate/index.md
- docs/integrate/arrow/index.md
🚧 Files skipped from review as they are similar to previous changes (3)
- docs/integrate/airflow/export-s3.md
- docs/integrate/arrow/import-parquet.md
- docs/integrate/airflow/data-retention-hot-cold.md
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-14T11:40:39.584Z
Learnt from: amotl
PR: crate/cratedb-guide#249
File: docs/ingest/etl/index.md:30-31
Timestamp: 2025-08-14T11:40:39.584Z
Learning: In the CrateDB Guide documentation, the project policy is to prune ALL legacy aliases (like `(apache-airflow)=`, `(aws-*)=`) and fix forward if anything breaks, rather than maintaining backward compatibility with dual anchors.
Applied to files:
docs/integrate/airflow/data-retention-policy.md
🪛 markdownlint-cli2 (0.17.2)
docs/integrate/airflow/getting-started.md
62-62: Unordered list style
Expected: dash; Actual: asterisk
(MD004, ul-style)
66-66: Unordered list style
Expected: dash; Actual: asterisk
(MD004, ul-style)
70-70: Unordered list style
Expected: dash; Actual: asterisk
(MD004, ul-style)
84-84: Unordered list style
Expected: dash; Actual: asterisk
(MD004, ul-style)
85-85: Unordered list style
Expected: dash; Actual: asterisk
(MD004, ul-style)
86-86: Unordered list style
Expected: dash; Actual: asterisk
(MD004, ul-style)
87-87: Unordered list style
Expected: dash; Actual: asterisk
(MD004, ul-style)
89-89: Bare URL used
(MD034, no-bare-urls)
docs/integrate/airflow/import-parquet.md
90-90: Unordered list indentation
Expected: 2; Actual: 3
(MD007, ul-indent)
91-91: Unordered list indentation
Expected: 2; Actual: 3
(MD007, ul-indent)
docs/integrate/airflow/import-stock-market-data.md
41-41: Reference links and images should use a label that is defined
Missing link or image reference definition: "astronomer"
(MD052, reference-links-images)
143-143: Link and image reference definitions should be needed
Unused link or image reference definition: "cratedb and apache airflow"
(MD053, link-image-reference-definitions)
144-144: Link and image reference definitions should be needed
Unused link or image reference definition: "etl pipeline using apache airflow with cratedb (source)"
(MD053, link-image-reference-definitions)
145-145: Link and image reference definitions should be needed
Unused link or image reference definition: "etl with astro and cratedb cloud in 30min - fully up in the cloud"
(MD053, link-image-reference-definitions)
146-146: Link and image reference definitions should be needed
Unused link or image reference definition: "run an etl pipeline with cratedb and data quality checks"
(MD053, link-image-reference-definitions)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Build docs
🔇 Additional comments (4)
docs/integrate/airflow/import-parquet.md (1)
47-73: Request validation: generated partition column types

Partitioning by a generated `TIMESTAMP WITH TIME ZONE` column should work on current CrateDB versions, but confirm against the version you target.

Would you like me to run a quick check against your ReadTheDocs preview or a local CrateDB to confirm the DDL executes as written?
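For reference, a generated partition column of the kind discussed here looks roughly as follows; the table and column names are illustrative, not the guide's exact DDL:

```sql
CREATE TABLE nyc_taxi.trips_example (
  pickup_datetime TIMESTAMP WITH TIME ZONE,
  fare_amount REAL,
  -- Month bucket derived from the pickup time, used as the partition key.
  pickup_month TIMESTAMP WITH TIME ZONE
    GENERATED ALWAYS AS DATE_TRUNC('month', pickup_datetime)
) PARTITIONED BY (pickup_month);
```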
docs/integrate/airflow/import-stock-market-data.md (2)
121-130: Pin dependencies thoughtfully; verify current versions

`yfinance==0.1.87` is dated. Recommend updating to a current, tested version and aligning provider versions with your Airflow version.

Would you like me to check the latest compatible versions for Airflow 2.x and propose a minimal, tested set?
189-197: CrateDB supports INSERT ... ON CONFLICT ... DO UPDATE (since CrateDB 3.0.0) — verify runtime version & PK

This syntax was added in CrateDB 3.0.0 and is valid; keep as‑is only if your deployed CrateDB is >= 3.0.0 and (closing_date, ticker) are declared as the table PRIMARY KEY or a unique index; otherwise use an idempotent INSERT+UPDATE pattern.

Applies to: docs/integrate/airflow/import-stock-market-data.md lines 189–197 and 213–217
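For context, an upsert of the shape under discussion, assuming the composite primary key named above (table name and values are illustrative):

```sql
INSERT INTO doc.financial_data (closing_date, ticker, adjusted_close)
VALUES ('2024-01-02', 'AAPL', 185.64)
ON CONFLICT (closing_date, ticker)
DO UPDATE SET adjusted_close = excluded.adjusted_close;
```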
docs/integrate/airflow/index.md (1)
119-129: Good use of See also block and internal refs

Links and references look consistent.
```python
SQLExecuteQueryOperator.partial(
    task_id="delete_partition",
    conn_id="cratedb_connection",
    sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
).expand(params=get_policies().map(map_policy))
```
Prefer DB‑API parameters over Jinja for values (quoting/injection safety)
Use `parameters=` (DB binding) while keeping table and column names via Jinja.
```diff
-SQLExecuteQueryOperator.partial(
-    task_id="delete_partition",
-    conn_id="cratedb_connection",
-    sql="DELETE FROM {{params.table_fqn}} WHERE {{params.column}} = {{params.value}};",
-).expand(params=get_policies().map(map_policy))
+SQLExecuteQueryOperator.partial(
+    task_id="delete_partition",
+    conn_id="cratedb_connection",
+    sql="DELETE FROM {{ params.table_fqn }} WHERE {{ params.column }} = %(value)s;",
+).expand(parameters=get_policies().map(map_policy))
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In docs/integrate/airflow/data-retention-policy.md around lines 100 to 105, the
SQL inlines the value into the Jinja template, which risks quoting/injection
issues; change the operator to pass the value via DB-API parameter binding (use
the operator's parameters= argument) while keeping table and column names
produced by Jinja; update the SQL string to use a parameter placeholder
(DB-specific, e.g. %s or ?), and pass the corresponding value(s) via
parameters=get_policies().map(map_policy) or equivalent so the value is bound by
the DB driver instead of being interpolated into the SQL.
Can someone review/validate this suggestion by @coderabbitai?
About
Continue adding integration guides from the community forum.
Preview
Outlook
Further improvements can happen in subsequent iterations. Let us know what you would like to see fixed or improved immediately, or later when needed.
References