Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions .github/workflows/facilities_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ jobs:
AWS_SECRET_ACCESS_KEY: "op://Data Engineering/DO_keys/AWS_SECRET_ACCESS_KEY"
AWS_ACCESS_KEY_ID: "op://Data Engineering/DO_keys/AWS_ACCESS_KEY_ID"
BUILD_ENGINE_SERVER: "op://Data Engineering/EDM_DATA/server_url"
BUILD_ENGINE_HOST: "op://Data Engineering/EDM_DATA/server"
BUILD_ENGINE_USER: "op://Data Engineering/EDM_DATA/username"
BUILD_ENGINE_PASSWORD: "op://Data Engineering/EDM_DATA/password"
BUILD_ENGINE_PORT: "op://Data Engineering/EDM_DATA/port"

- name: Run container setup
working-directory: ./
Expand Down Expand Up @@ -82,8 +86,14 @@ jobs:
- name: Modify facdb table
run: python -m facdb.cli reformat_facdb

- name: Test build tables
run: |
dbt deps
dbt debug
dbt test

- name: Export facdb
run: ./facdb.sh export
run: python -m facdb.cli export

- name: Upload Artifacts
run: ./facdb.sh upload
run: python3 -m dcpy.connectors.edm.publishing upload --product db-facilities --acl public-read
210 changes: 173 additions & 37 deletions notebooks/marimo/lifecycle/build_qa/s3_compare.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import marimo

__generated_with = "0.23.1"
__generated_with = "0.23.3"
app = marimo.App(width="full")

with app.setup:
Expand Down Expand Up @@ -32,12 +32,12 @@ def _():
def _():
bucket_input = mo.ui.text(value="edm-publishing", label="Bucket")
path_a_input = mo.ui.text(
value="db-cpdb/draft/26prelim/2/",
value="db-xxx/xxx/",
label="Path A — baseline (e.g. published draft)",
full_width=True,
)
path_b_input = mo.ui.text(
value="db-cpdb/build/dm-cpdb-26prelim/",
value="db-xxx/xxx/",
label="Path B — new build",
full_width=True,
)
Expand Down Expand Up @@ -112,6 +112,31 @@ def _(df_a, df_b, path_a_input, path_b_input):
return (comparison,)


@app.cell(hide_code=True)
def _(comparison):
    # Headline verdict for the listing comparison: are the same files present
    # under both prefixes, and do the files present on both sides have the
    # same size?
    _present_a = comparison["in_a"]
    _present_b = comparison["in_b"]

    # Rows missing from B exist only in A, and vice versa.
    _n_only_a = len(comparison[~_present_b])
    _n_only_b = len(comparison[~_present_a])
    _both = comparison[_present_a & _present_b]

    # With no files in common there are no sizes that could disagree.
    if len(_both):
        _same_sizes = (_both["size_a"] == _both["size_b"]).all()
    else:
        _same_sizes = True

    _same_file_set = _n_only_a == 0 and _n_only_b == 0
    _identical = _same_file_set and _same_sizes

    if _identical:
        _text = "**Directory equal:** ✅ Yes — same files and sizes."
    elif _same_file_set:
        _text = "**Directory equal:** ❌ No — same file set but sizes differ for some files."
    else:
        _tail = "." if _same_sizes else ", and sizes differ."
        _text = (
            f"**Directory equal:** ❌ No — {_n_only_a} file(s) only in A, {_n_only_b} file(s) only in B"
            + _tail
        )

    _tone = "success" if _identical else "danger"
    mo.callout(mo.md(_text), kind=_tone)
    return


@app.cell(hide_code=True)
def _():
mo.md("""
Expand Down Expand Up @@ -170,7 +195,7 @@ def _(comparison):
@app.cell(hide_code=True)
def _():
mo.md("""
## Table Comparison
## File Comparison
""")
return

Expand All @@ -180,17 +205,17 @@ def _(comparison):
_files_in_both = sorted(
comparison[comparison["in_a"] & comparison["in_b"]]["filename"].tolist()
)
table_selector = mo.ui.dropdown(
file_selector = mo.ui.dropdown(
options=_files_in_both,
label="Select a file to compare",
searchable=True,
)
mo.vstack([table_selector], align="center")
return (table_selector,)
mo.vstack([file_selector], align="center")
return (file_selector,)


@app.cell(hide_code=True)
def _(bucket_input, path_a_input, path_b_input, table_selector):
def _(bucket_input, file_selector, path_a_input, path_b_input):
from io import BytesIO

def _load_file(prefix: str, filename: str) -> pd.DataFrame:
Expand All @@ -207,56 +232,139 @@ def _load_file(prefix: str, filename: str) -> pd.DataFrame:
else:
raise ValueError(f"Unsupported file extension: .{ext}")

if table_selector.value:
with mo.status.spinner(title=f"Loading {table_selector.value}…"):
tbl_a = _load_file(path_a_input.value, table_selector.value)
tbl_b = _load_file(path_b_input.value, table_selector.value)
if file_selector.value:
with mo.status.spinner(title=f"Loading {file_selector.value}…"):
file_df_a = _load_file(path_a_input.value, file_selector.value)
file_df_b = _load_file(path_b_input.value, file_selector.value)
else:
tbl_a = tbl_b = None
return tbl_a, tbl_b
file_df_a = file_df_b = None
return file_df_a, file_df_b


@app.cell(hide_code=True)
def _(table_selector, tbl_a, tbl_b):
def _(file_df_a, file_df_b):
    # One-line verdict on whether the two loaded files have identical contents.
    # Halt silently until both sides have been loaded.
    _not_loaded = file_df_a is None or file_df_b is None
    mo.stop(_not_loaded)

    # Drop index labels so the comparison is positional, row by row.
    _left = file_df_a.reset_index(drop=True)
    _right = file_df_b.reset_index(drop=True)
    _identical = _left.equals(_right)

    if _identical:
        _verdict = "**File equal:** ✅ Yes — contents are identical."
        _tone = "success"
    else:
        _verdict = "**File equal:** ❌ No — contents differ."
        _tone = "danger"

    mo.callout(mo.md(_verdict), kind=_tone)
    return


@app.cell(hide_code=True)
def _(file_df_a, file_df_b, file_selector):
mo.stop(
tbl_a is None or tbl_b is None,
file_df_a is None or file_df_b is None,
mo.md("_Select a file above to load and compare._"),
)
_cols_only_a = sorted(set(tbl_a.columns) - set(tbl_b.columns))
_cols_only_b = sorted(set(tbl_b.columns) - set(tbl_a.columns))
_cols_only_a = sorted(set(file_df_a.columns) - set(file_df_b.columns))
_cols_only_b = sorted(set(file_df_b.columns) - set(file_df_a.columns))
_col_status = (
"Column sets match."
if not _cols_only_a and not _cols_only_b
else f"Only in A: `{'`, `'.join(_cols_only_a) or '—'}` · Only in B: `{'`, `'.join(_cols_only_b) or '—'}`"
else (
("Only in A: " + ", ".join(_cols_only_a) if _cols_only_a else "")
+ (" · Only in B: " + ", ".join(_cols_only_b) if _cols_only_b else "")
)
)

def _with_row_num(df):
out = df.reset_index(drop=True)
out.insert(0, "row", range(len(out)))
return out

_summary = (
f"| | A | B |\n"
f"|---|---|---|\n"
f"| Rows | {len(file_df_a):,} | {len(file_df_b):,} |\n"
f"| Columns | {len(file_df_a.columns)} | {len(file_df_b.columns)} |\n"
f"\n**Columns:** {_col_status}"
)

mo.vstack(
[
mo.md(f"""
| | A | B |
|---|---|---|
| Rows | {len(tbl_a):,} | {len(tbl_b):,} |
| Columns | {len(tbl_a.columns)} | {len(tbl_b.columns)} |

**Columns:** {_col_status}
"""),
mo.ui.tabs(
{
f"A — {table_selector.value}": mo.ui.table(tbl_a, selection=None),
f"B — {table_selector.value}": mo.ui.table(tbl_b, selection=None),
}
mo.md(_summary),
mo.hstack(
[
mo.vstack(
[
mo.md(f"**A — {file_selector.value}**"),
mo.ui.table(_with_row_num(file_df_a), selection=None),
]
),
mo.vstack(
[
mo.md(f"**B — {file_selector.value}**"),
mo.ui.table(_with_row_num(file_df_b), selection=None),
]
),
],
gap=2,
align="start",
),
]
)
return


@app.cell(hide_code=True)
def _():
_df = mo.sql(
"""
select * from
"""
def _(file_df_a, file_df_b):
    # Cell-by-cell diff of the two loaded files.
    #
    # Only columns present in both frames are compared, and rows are matched
    # by position after the index is reset. When the row counts differ, only
    # the first min(len(A), len(B)) rows are compared and a warning is shown.
    # The cell displays either a "no differences" message or a table with one
    # record per differing (row, column) pair.
    mo.stop(
        file_df_a is None or file_df_b is None,
        mo.md("_Select a file above to load and compare._"),
    )

    # Shared columns, kept in A's column order.
    _shared_cols = [c for c in file_df_a.columns if c in file_df_b.columns]
    _a = file_df_a[_shared_cols].reset_index(drop=True)
    _b = file_df_b[_shared_cols].reset_index(drop=True)
    _min_len = min(len(_a), len(_b))
    _a_cmp = _a.iloc[:_min_len]
    _b_cmp = _b.iloc[:_min_len]

    # Loop variables carry a leading underscore so marimo keeps them local to
    # this cell instead of registering them as notebook-wide definitions.
    _records = []
    for _col in _shared_cols:
        _col_a = _a_cmp[_col]
        _col_b = _b_cmp[_col]
        # NaN never equals NaN, so "missing on both sides" is counted as unchanged.
        _changed = ~((_col_a == _col_b) | (_col_a.isna() & _col_b.isna()))
        _records.extend(
            {
                "row": int(_idx),
                "column": _col,
                "value_a": _col_a.at[_idx],
                "value_b": _col_b.at[_idx],
            }
            for _idx in _changed[_changed].index
        )

    _rows_differ = len(_a) != len(_b)
    _row_count_note = (
        f"\n\n> \u26a0\ufe0f Row counts differ (A: {len(_a):,}, B: {len(_b):,}) \u2014 only first {_min_len:,} rows compared."
        if _rows_differ
        else ""
    )

    if not _records and not _rows_differ:
        _diff_out = mo.md("_No differences found in shared columns._")
    elif not _records:
        # Close the italic span before the note: the note starts a new
        # blockquote paragraph, which cannot live inside `_..._` emphasis.
        _diff_out = mo.md(
            f"_Shared columns match across first {_min_len:,} rows._{_row_count_note}"
        )
    else:
        _n_diff_rows = len({r["row"] for r in _records})
        _n_diff_cols = len({r["column"] for r in _records})
        _diff_out = mo.vstack(
            [
                mo.md(
                    f"**{_n_diff_rows:,} row(s)** with differences across **{_n_diff_cols:,}** column(s).{_row_count_note}"
                ),
                mo.ui.table(pd.DataFrame(_records), selection=None),
            ]
        )
    _diff_out
    return


Expand Down Expand Up @@ -330,6 +438,34 @@ def _load_shapefile(prefix: str, filename: str) -> gpd.GeoDataFrame:
return geo_gdf_a, geo_gdf_b


@app.cell(hide_code=True)
def _(geo_gdf_a, geo_gdf_b):
    # Headline verdict on whether the two loaded GeoDataFrames match, checking
    # attribute columns and geometries separately so the message can say which
    # side of the comparison failed.

    # NOTE(review): assumes the geo loader may yield None when nothing is
    # selected, as the tabular loader does — the guard is a no-op otherwise.
    mo.stop(geo_gdf_a is None or geo_gdf_b is None)

    # Each frame's attribute columns are everything except its own geometry column.
    _geom_col_a = geo_gdf_a.geometry.name
    _geom_col_b = geo_gdf_b.geometry.name
    _attr_cols_a = [c for c in geo_gdf_a.columns if c != _geom_col_a]
    _attr_cols_b = [c for c in geo_gdf_b.columns if c != _geom_col_b]

    # Differing column sets already mean the attributes differ. Checking this
    # first avoids a KeyError when B lacks one of A's columns, and stops extra
    # columns in B from being silently ignored. Column order is deliberately
    # not significant: B is read in A's order.
    _same_attr_cols = set(_attr_cols_a) == set(_attr_cols_b)
    _attrs_equal = _same_attr_cols and (
        geo_gdf_a[_attr_cols_a]
        .reset_index(drop=True)
        .equals(geo_gdf_b[_attr_cols_a].reset_index(drop=True))
    )
    _geom_equal = geo_gdf_a.geometry.reset_index(drop=True).equals(
        geo_gdf_b.geometry.reset_index(drop=True)
    )
    _equal = _attrs_equal and _geom_equal

    if _equal:
        _msg = "**GeoDataFrame equal:** ✅ Yes — features and geometries match."
    else:
        _reasons = []
        if not _attrs_equal:
            _reasons.append("attributes differ")
        if not _geom_equal:
            _reasons.append("geometries differ")
        _msg = "**GeoDataFrame equal:** ❌ No — " + " and ".join(_reasons) + "."

    mo.callout(
        mo.md(_msg),
        kind="success" if _equal else "danger",
    )
    return


@app.cell(hide_code=True)
def _(geo_gdf_a, geo_selector, path_a_input):
_geom_col = geo_gdf_a.geometry.name
Expand Down
21 changes: 21 additions & 0 deletions products/facilities/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: facilities

profile: dcp-de-postgres

model-paths: [ "models" ]

tests:
+store_failures: true
schema: "_tests"

models:
facilities:
staging:
+materialized: view
intermediate:
+materialized: view
product:
+materialized: table

flags:
fail-fast: true
15 changes: 0 additions & 15 deletions products/facilities/facdb.sh

This file was deleted.

Loading
Loading