diff --git a/README.md b/README.md
index 0b9d167c..c4f23d77 100644
--- a/README.md
+++ b/README.md
@@ -12,6 +12,7 @@ hugging_face | Utilities for interacting with our machine learning space at [Hug
 identification_pipeline.py | The core python script uniting this modular pipeline. More details below.
 openai-playground | Scripts for accessing the openai API on PDAP's shared account
 source_collectors| Tools for extracting metadata from different sources, including CKAN data portals and Common Crawler
+collector_db | Database for storing data from source collectors
 
 ## How to use
 
diff --git a/collector_db/BatchInfo.py b/collector_db/BatchInfo.py
new file mode 100644
index 00000000..3e8c5ca8
--- /dev/null
+++ b/collector_db/BatchInfo.py
@@ -0,0 +1,22 @@
+from typing import Optional
+
+from pydantic import BaseModel
+
+
+class BatchInfo(BaseModel):
+    """Validated description of a collector batch.
+
+    The optional fields default to ``None`` and accept ``None`` explicitly,
+    matching the nullable columns of the ``batches`` table.
+    """
+
+    strategy: str
+    status: str
+    count: int = 0
+    strategy_success_rate: Optional[float] = None
+    metadata_success_rate: Optional[float] = None
+    agency_match_rate: Optional[float] = None
+    record_type_match_rate: Optional[float] = None
+    record_category_match_rate: Optional[float] = None
+    compute_time: Optional[int] = None
+    parameters: Optional[dict] = None
diff --git a/collector_db/DatabaseClient.py b/collector_db/DatabaseClient.py
new file mode 100644
index 00000000..cba053ee
--- /dev/null
+++ b/collector_db/DatabaseClient.py
@@ -0,0 +1,164 @@
+from functools import wraps
+
+from sqlalchemy import create_engine, Column, Integer, String, Float, Text, JSON, ForeignKey, CheckConstraint, TIMESTAMP, UniqueConstraint, text
+from sqlalchemy.orm import declarative_base, sessionmaker, relationship
+from typing import Optional, Dict, Any, List
+
+from collector_db.BatchInfo import BatchInfo
+from collector_db.URLInfo import URLInfo
+
+# Base class for SQLAlchemy ORM models
+Base = declarative_base()
+
+
+# SQLAlchemy ORM models
+class Batch(Base):
+    """ORM model for the ``batches`` table."""
+
+    __tablename__ = 'batches'
+
+    id = Column(Integer, primary_key=True)
+    strategy = Column(String, nullable=False)
+    status = Column(String, CheckConstraint("status IN ('in-process', 'complete', 'error')"), nullable=False)
+    count = Column(Integer, nullable=False)
+    # text() renders CURRENT_TIMESTAMP as a SQL expression; a plain str
+    # server_default would be emitted as the quoted literal 'CURRENT_TIMESTAMP'.
+    date_generated = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
+    strategy_success_rate = Column(Float)
+    metadata_success_rate = Column(Float)
+    agency_match_rate = Column(Float)
+    record_type_match_rate = Column(Float)
+    record_category_match_rate = Column(Float)
+    compute_time = Column(Integer)
+    parameters = Column(JSON)
+
+    urls = relationship("URL", back_populates="batch")
+    missings = relationship("Missing", back_populates="batch")
+
+
+class URL(Base):
+    """ORM model for the ``urls`` table; each row belongs to one batch."""
+
+    __tablename__ = 'urls'
+
+    id = Column(Integer, primary_key=True)
+    batch_id = Column(Integer, ForeignKey('batches.id'), nullable=False)
+    url = Column(Text, unique=True)
+    url_metadata = Column(JSON)
+    outcome = Column(String)
+    # SQL expression default, not a string literal (see Batch.date_generated).
+    created_at = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
+
+    batch = relationship("Batch", back_populates="urls")
+
+
+class Missing(Base):
+    """ORM model for the ``missing`` table."""
+
+    __tablename__ = 'missing'
+
+    id = Column(Integer, primary_key=True)
+    place_id = Column(Integer, nullable=False)
+    record_type = Column(String, nullable=False)
+    batch_id = Column(Integer, ForeignKey('batches.id'))
+    strategy_used = Column(Text, nullable=False)
+    # SQL expression default, not a string literal (see Batch.date_generated).
+    date_searched = Column(TIMESTAMP, nullable=False, server_default=text("CURRENT_TIMESTAMP"))
+
+    batch = relationship("Batch", back_populates="missings")
+
+
+# Database Client
+class DatabaseClient:
+    """Thin wrapper around a SQLAlchemy engine for batch and URL records.
+
+    Methods decorated with ``session_manager`` run inside their own
+    session, which is committed on success and rolled back on error.
+    """
+
+    def __init__(self, db_url: str = "sqlite:///database.db", echo: bool = True):
+        """Create the engine, ensure all tables exist and prepare a session factory.
+
+        :param db_url: SQLAlchemy database URL.
+        :param echo: Whether the engine logs every SQL statement it emits.
+        """
+        self.engine = create_engine(db_url, echo=echo)
+        Base.metadata.create_all(self.engine)
+        # expire_on_commit=False keeps already-loaded attributes readable on
+        # objects returned to callers after the session has been closed.
+        self.session_maker = sessionmaker(bind=self.engine, expire_on_commit=False)
+        self.session = None
+
+    def session_manager(method):
+        """Decorator: run ``method`` inside a session stored on ``self.session``.
+
+        The session is committed when ``method`` returns, rolled back if it
+        raises, and always closed afterwards.
+        """
+        @wraps(method)
+        def wrapper(self, *args, **kwargs):
+            self.session = self.session_maker()
+            try:
+                result = method(self, *args, **kwargs)
+                self.session.commit()
+                return result
+            except Exception:
+                self.session.rollback()
+                raise
+            finally:
+                self.session.close()
+                self.session = None
+
+        # Without this the decorator evaluates to None and every decorated
+        # method is replaced by None.
+        return wrapper
+
+    @session_manager
+    def insert_batch(self, batch_info: BatchInfo) -> Batch:
+        """Insert a new batch and return the persisted row.
+
+        The row is flushed and refreshed before returning so that its
+        generated ``id`` and ``date_generated`` are loaded while the session
+        is still open.
+        """
+        batch = Batch(**batch_info.model_dump())
+        self.session.add(batch)
+        self.session.flush()
+        self.session.refresh(batch)
+        return batch
+
+    @session_manager
+    def get_batch_by_id(self, batch_id: int) -> Optional[BatchInfo]:
+        """Retrieve a batch by ID, or ``None`` if no such batch exists."""
+        batch = self.session.query(Batch).filter_by(id=batch_id).first()
+        if batch is None:
+            return None
+        # Copy only the fields BatchInfo declares rather than the raw __dict__,
+        # which also carries SQLAlchemy's internal instance state.
+        return BatchInfo(**{name: getattr(batch, name) for name in BatchInfo.model_fields})
+
+    @session_manager
+    def insert_url(self, url_info: URLInfo):
+        """Insert a new URL into the database."""
+        url_entry = URL(**url_info.model_dump())
+        self.session.add(url_entry)
+
+    @session_manager
+    def get_urls_by_batch(self, batch_id: int) -> List[URLInfo]:
+        """Retrieve all URLs associated with a batch."""
+        urls = self.session.query(URL).filter_by(batch_id=batch_id).all()
+        return [
+            URLInfo(**{name: getattr(url, name) for name in URLInfo.model_fields})
+            for url in urls
+        ]
+
+    @session_manager
+    def is_duplicate_url(self, url: str) -> bool:
+        """Return True if a row with this exact URL is already stored."""
+        result = self.session.query(URL).filter_by(url=url).first()
+        return result is not None
+
+
+if __name__ == "__main__":
+    client = DatabaseClient()
+    print("Database client initialized.")
diff --git a/collector_db/URLInfo.py b/collector_db/URLInfo.py
new file mode 100644
index 00000000..df479eae
--- /dev/null
+++ b/collector_db/URLInfo.py
@@ -0,0 +1,10 @@
+from pydantic import BaseModel
+
+
+class URLInfo(BaseModel):
+    """Validated description of one collected URL and the batch it belongs to."""
+
+    batch_id: int
+    url: str
+    url_metadata: dict
+    outcome: str
diff --git a/collector_db/__init__.py b/collector_db/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/collector_db/database.db b/collector_db/database.db
new file mode 100644
index 00000000..a12d1cf6
Binary files /dev/null and b/collector_db/database.db differ
diff --git a/requirements.txt b/requirements.txt
index 85b8ed8d..d4f5869f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -24,3 +24,5 @@ requests_html>=0.10.0
 lxml~=5.1.0
 pyppeteer>=2.0.0
 beautifulsoup4>=4.12.3
+
+sqlalchemy~=2.0.36