10 changes: 10 additions & 0 deletions pyproject.toml
@@ -2,14 +2,21 @@
dependencies = [
"alembic>=1.13",
"asyncpg>=0.29",
"cachetools>=7.0.5",
"caveclient",
"cloudpathlib[gs,s3]>=0.23.0",
"deltalake>=0.17",
"fastapi>=0.115.0",
"fsspec>=2026.3.0",
"gcsfs>=2026.3.0",
"google-auth>=2.0",
"httpx>=0.27",
"jinja2>=3.1.6",
"polars>=1.0",
"pyarrow>=23.0.1",
"pydantic-settings>=2.3",
"pydantic>=2.7",
"python-multipart>=0.0.26",
"requests>=2.31",
"sqlalchemy[asyncio]>=2.0",
"structlog>=24.2",
@@ -48,3 +55,6 @@ testpaths = ["tests"]
disallow_untyped_defs = true
ignore_missing_imports = true
packages = ["cave_catalog"]

[tool.uv.sources]
caveclient = { workspace = true }
17 changes: 15 additions & 2 deletions src/cave_catalog/app.py
@@ -1,14 +1,18 @@
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from pathlib import Path

import structlog
from fastapi import FastAPI
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from fastapi.staticfiles import StaticFiles

from cave_catalog.config import get_settings
from cave_catalog.db.session import get_engine
from cave_catalog.routers import assets, health
from cave_catalog.routers import assets, health, tables, ui
from cave_catalog.routers.ui import _RedirectException

logger = structlog.get_logger()

@@ -52,5 +56,14 @@ def create_app() -> FastAPI:

app.include_router(health.router)
app.include_router(assets.router)
app.include_router(tables.router)
app.include_router(ui.router)

@app.exception_handler(_RedirectException)
async def _handle_redirect(request: Request, exc: _RedirectException):
return RedirectResponse(url=exc.url, status_code=302)

_pkg_dir = Path(__file__).resolve().parent
app.mount("/static", StaticFiles(directory=_pkg_dir / "static"), name="static")

return app
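
A minimal sketch of how a route is expected to trigger the new redirect handler. _RedirectException is defined in routers/ui.py, which is not part of this diff; the url= constructor argument and the route path below are assumptions based only on the handler reading exc.url:

from fastapi import APIRouter

from cave_catalog.routers.ui import _RedirectException

router = APIRouter()

@router.get("/ui/login")
async def login_redirect() -> None:
    # Raising here lets the app-level exception handler in create_app()
    # answer with a 302 instead of rendering a page in this route.
    raise _RedirectException(url="https://example.org/after-login")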
5 changes: 4 additions & 1 deletion src/cave_catalog/auth/middleware.py
@@ -196,8 +196,11 @@ def create_token_cookie_response(redirect_url: str, token: str) -> Response:


def get_authorize_url(settings: Settings, redirect_url: str) -> str:
# AUTH_SERVICE_URL is e.g. "https://globalv1.daf-apis.com/auth" (for API calls)
# but the OAuth authorize endpoint lives on the sticky auth app at /sticky_auth/api/v1/authorize
auth_url = settings.auth.service_url.rstrip("/")
return f"{auth_url}/api/v1/authorize?redirect={quote(redirect_url)}"
base_url = auth_url.removesuffix("/auth")
return f"{base_url}/sticky_auth/api/v1/authorize?redirect={quote(redirect_url)}"


async def get_current_user(
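
A worked example of the URL rewrite above, using the service address quoted in the comment; the redirect_url is a placeholder:

from urllib.parse import quote

service_url = "https://globalv1.daf-apis.com/auth"    # value from the comment above
redirect_url = "https://catalog.example.org/ui/"      # placeholder caller value
base_url = service_url.rstrip("/").removesuffix("/auth")
print(f"{base_url}/sticky_auth/api/v1/authorize?redirect={quote(redirect_url)}")
# -> https://globalv1.daf-apis.com/sticky_auth/api/v1/authorize?redirect=https%3A//catalog.example.org/ui/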
15 changes: 15 additions & 0 deletions src/cave_catalog/config.py
@@ -1,3 +1,4 @@
import json
from functools import lru_cache

from pydantic import Field
@@ -24,8 +25,22 @@ class Settings(BaseSettings):
service_name: str = Field(default="cave-catalog", alias="SERVICE_NAME")
mat_engine_url: str | None = Field(default=None, alias="MAT_ENGINE_URL")
log_level: str = Field(default="INFO", alias="LOG_LEVEL")
datastacks_raw: str = Field(default="", alias="DATASTACKS")
cave_token: str | None = Field(default=None, alias="CAVE_TOKEN")
caveclient_server_address: str | None = Field(
default=None, alias="CAVECLIENT_SERVER_ADDRESS"
)
auth: AuthSettings = Field(default_factory=AuthSettings)

@property
def datastacks(self) -> list[str]:
raw = self.datastacks_raw.strip()
if not raw:
return []
if raw.startswith("["):
return json.loads(raw)
return [s.strip() for s in raw.split(",") if s.strip()]

model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
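
A small sketch of how the new datastacks property resolves the two accepted DATASTACKS forms; the stack names are placeholders and the helper simply mirrors the property above:

import json

def parse_datastacks(raw: str) -> list[str]:
    # Same rules as Settings.datastacks: empty -> [], a JSON list if the
    # value starts with "[", otherwise comma-separated with blanks dropped.
    raw = raw.strip()
    if not raw:
        return []
    if raw.startswith("["):
        return json.loads(raw)
    return [s.strip() for s in raw.split(",") if s.strip()]

assert parse_datastacks('["stack_a", "stack_b"]') == ["stack_a", "stack_b"]
assert parse_datastacks("stack_a, stack_b") == ["stack_a", "stack_b"]
assert parse_datastacks("") == []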
38 changes: 37 additions & 1 deletion src/cave_catalog/db/models.py
@@ -1,11 +1,19 @@
import uuid
from collections.abc import MutableMapping
from datetime import datetime, timezone

from sqlalchemy import JSON, Boolean, DateTime, Index, Integer, String, text
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class _FallbackPolymorphicMap(dict, MutableMapping):
"""Allow unknown ``asset_type`` values to load as the base ``Asset`` class."""

def __missing__(self, key):
return self["asset"]


class Base(DeclarativeBase):
pass

@@ -23,7 +31,7 @@ class Asset(Base):
mat_version: Mapped[int | None] = mapped_column(Integer, nullable=True)
revision: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
uri: Mapped[str] = mapped_column(String, nullable=False)
format: Mapped[str] = mapped_column(String, nullable=False)
format: Mapped[str | None] = mapped_column(String, nullable=True)
asset_type: Mapped[str] = mapped_column(String, nullable=False)
owner: Mapped[int] = mapped_column(Integer, nullable=False)
is_managed: Mapped[bool] = mapped_column(Boolean, nullable=False)
@@ -40,6 +48,20 @@ class Asset(Base):
DateTime(timezone=True), nullable=True
)

__mapper_args__ = {
"polymorphic_on": "asset_type",
"polymorphic_identity": "asset",
"with_polymorphic": "*",
}

# Table-specific nullable columns (populated only for asset_type="table")
source: Mapped[str | None] = mapped_column(String, nullable=True)
cached_metadata: Mapped[dict | None] = mapped_column(JSON, nullable=True)
metadata_cached_at: Mapped[datetime | None] = mapped_column(
DateTime(timezone=True), nullable=True
)
column_annotations: Mapped[list | None] = mapped_column(JSON, nullable=True)

__table_args__ = (
# Uniqueness when mat_version is present
Index(
@@ -61,3 +83,17 @@ class Asset(Base):
postgresql_where=text("mat_version IS NULL"),
),
)


# Install fallback polymorphic map so unknown asset_type values load as Asset
Asset.__mapper__.polymorphic_map = _FallbackPolymorphicMap(
Asset.__mapper__.polymorphic_map
)


class Table(Asset):
"""Table asset — single table inheritance subclass of Asset."""

__mapper_args__ = {
"polymorphic_identity": "table",
}
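
A minimal sketch of the behaviour the fallback polymorphic map is meant to provide; the "view" identity is a made-up example of an asset_type with no registered subclass:

from sqlalchemy.orm import configure_mappers

from cave_catalog.db.models import Asset, Table

configure_mappers()  # ensure subclass identities are registered on the base mapper
pm = Asset.__mapper__.polymorphic_map
assert pm["table"].class_ is Table   # known identity resolves to the Table subclass
assert pm["asset"].class_ is Asset   # base identity
assert pm["view"].class_ is Asset    # unknown identity falls back via __missing__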
203 changes: 203 additions & 0 deletions src/cave_catalog/extractors.py
@@ -0,0 +1,203 @@
"""Metadata extractors for table assets.

Each extractor reads lightweight metadata (schema, row count, size, partition
info) from a cloud storage URI and returns a ``TableMetadata`` instance.
"""

from __future__ import annotations

import abc
import asyncio
from typing import Any

import structlog

from cave_catalog.table_schemas import ColumnInfo, TableMetadata

logger = structlog.get_logger()


# ---------------------------------------------------------------------------
# Base interface (task 2.1)
# ---------------------------------------------------------------------------


class MetadataExtractor(abc.ABC):
"""Base interface for format-specific metadata extractors."""

@abc.abstractmethod
async def extract(
self,
uri: str,
storage_options: dict[str, Any] | None = None,
) -> TableMetadata:
"""Extract metadata from the table at *uri*.

Parameters
----------
uri
Cloud or local path to the table/file.
storage_options
Optional storage credentials (e.g. GCS token dict).

Returns
-------
TableMetadata
Discovered metadata.
"""


# ---------------------------------------------------------------------------
# Delta Lake extractor (task 2.2)
# ---------------------------------------------------------------------------


class DeltaMetadataExtractor(MetadataExtractor):
"""Extract metadata from a Delta Lake table via the transaction log."""

async def extract(
self,
uri: str,
storage_options: dict[str, Any] | None = None,
) -> TableMetadata:
from deltalake import DeltaTable

logger.debug("delta_extract_start", uri=uri)

kwargs: dict[str, Any] = {}
if storage_options:
kwargs["storage_options"] = storage_options

try:
dt = await asyncio.to_thread(lambda: DeltaTable(uri, **kwargs))
except Exception as exc:
msg = str(exc)
if "no files in log segment" in msg.lower() or "log segment" in msg.lower():
raise ValueError(
f"No Delta transaction log found at '{uri}'. "
"This path may not contain a Delta Lake table."
) from exc
raise

schema = await asyncio.to_thread(lambda: dt.schema())
columns = [
ColumnInfo(name=field.name, dtype=str(field.type))
for field in schema.fields
]

metadata = await asyncio.to_thread(lambda: dt.metadata())
partition_columns = list(metadata.partition_columns)

n_rows: int | None = None
n_bytes: int | None = None

# Try to get row count and size from file stats
try:
actions_table = await asyncio.to_thread(
lambda: dt.get_add_actions(flatten=True)
)
# Convert arro3 Table to dict-of-lists via pyarrow
import pyarrow as pa

file_actions = pa.table(actions_table).to_pydict()
if "num_records" in file_actions:
row_counts = file_actions["num_records"]
if all(r is not None for r in row_counts):
n_rows = sum(row_counts)
if "size_bytes" in file_actions:
sizes = file_actions["size_bytes"]
if all(s is not None for s in sizes):
n_bytes = sum(sizes)
except Exception:
logger.debug("delta_stats_unavailable", uri=uri)

return TableMetadata(
n_rows=n_rows,
n_columns=len(columns),
n_bytes=n_bytes,
columns=columns,
partition_columns=partition_columns,
)


# ---------------------------------------------------------------------------
# Parquet extractor (task 2.3)
# ---------------------------------------------------------------------------


class ParquetMetadataExtractor(MetadataExtractor):
"""Extract metadata from a Parquet file/dataset via polars."""

async def extract(
self,
uri: str,
storage_options: dict[str, Any] | None = None,
) -> TableMetadata:
import polars as pl

logger.debug("parquet_extract_start", uri=uri)

kwargs: dict[str, Any] = {}
if storage_options:
kwargs["storage_options"] = storage_options

lf = await asyncio.to_thread(
lambda: pl.scan_parquet(uri, **kwargs)
)
schema = await asyncio.to_thread(lambda: lf.collect_schema())
columns = [
ColumnInfo(name=name, dtype=str(dtype))
for name, dtype in schema.items()
]

# Get on-disk size via fsspec and row count from parquet metadata
n_rows: int | None = None
n_bytes: int | None = None
try:
import fsspec
import pyarrow.parquet as pq

fs, path = await asyncio.to_thread(
lambda: fsspec.core.url_to_fs(uri, **(storage_options or {}))
)
info = await asyncio.to_thread(lambda: fs.info(path))
n_bytes = info.get("size")

pq_meta = await asyncio.to_thread(
lambda: pq.read_metadata(path, filesystem=fs)
)
n_rows = pq_meta.num_rows
except Exception as exc:
logger.warning("parquet_stats_unavailable", uri=uri, error=str(exc))

return TableMetadata(
n_rows=n_rows,
n_columns=len(columns),
n_bytes=n_bytes,
columns=columns,
partition_columns=[],
)


# ---------------------------------------------------------------------------
# Extractor registry (task 2.4)
# ---------------------------------------------------------------------------

EXTRACTORS: dict[str, MetadataExtractor] = {
"delta": DeltaMetadataExtractor(),
"parquet": ParquetMetadataExtractor(),
}


def get_extractor(fmt: str) -> MetadataExtractor:
"""Look up extractor by format string.

Raises ``ValueError`` if no extractor is registered for *fmt*.
"""
try:
return EXTRACTORS[fmt.lower()]
except KeyError:
raise ValueError(
f"No metadata extractor for format '{fmt}'. "
f"Supported: {', '.join(sorted(EXTRACTORS))}"
)
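
A usage sketch tying the registry to the extractors; the URI is a placeholder and credentials would go in storage_options:

import asyncio

from cave_catalog.extractors import get_extractor

async def main() -> None:
    extractor = get_extractor("delta")               # or "parquet"
    meta = await extractor.extract(
        "gs://example-bucket/tables/cells",          # placeholder URI
        storage_options=None,                        # pass credentials here if needed
    )
    print(meta.n_rows, meta.n_columns, [c.name for c in meta.columns])

asyncio.run(main())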