diff --git a/ENV.md b/ENV.md
index 386dbdae..6ad39c02 100644
--- a/ENV.md
+++ b/ENV.md
@@ -80,6 +80,7 @@ Note that some tasks/subtasks are themselves enabled by other tasks.
 | `DS_APP_SYNC_META_URL_ADD_TASK_FLAG` | Adds new meta URLs to the Data Sources App|
 | `DS_APP_SYNC_META_URL_UPDATE_TASK_FLAG` | Updates existing meta URLs in the Data Sources App|
 | `DS_APP_SYNC_META_URL_DELETE_TASK_FLAG` | Deletes meta URLs in the Data Sources App|
+| `DS_APP_SYNC_USER_FOLLOWS_GET_TASK_FLAG` | Gets user follows from the Data Sources App|
 | `INTEGRITY_MONITOR_TASK_FLAG` | Runs integrity checks. |
 
 ### URL Task Flags
diff --git a/alembic/versions/2025_12_24_1854-e88e4e962dc7_add_link__locations__user_follows_table.py b/alembic/versions/2025_12_24_1854-e88e4e962dc7_add_link__locations__user_follows_table.py
new file mode 100644
index 00000000..a2b82ff0
--- /dev/null
+++ b/alembic/versions/2025_12_24_1854-e88e4e962dc7_add_link__locations__user_follows_table.py
@@ -0,0 +1,53 @@
+"""Add link__locations__user_follows table
+
+Revision ID: e88e4e962dc7
+Revises: 30ee666f15d1
+Create Date: 2025-12-24 18:54:38.897466
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+from src.util.alembic_helpers import add_enum_value, location_id_column, user_id_column, created_at_column
+
+# revision identifiers, used by Alembic.
+revision: str = 'e88e4e962dc7'
+down_revision: Union[str, None] = '30ee666f15d1'
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    """Create the follows link table and add the new `task_type` value."""
+    _add_link_locations_user_follows_table()
+    _add_follows_sync_task()
+
+
+def _add_link_locations_user_follows_table():
+    """Create the table linking users to the locations they follow."""
+    op.create_table(
+        "link__locations__user_follows",
+        location_id_column(),
+        user_id_column(),
+        created_at_column(),
+        sa.PrimaryKeyConstraint("location_id", "user_id"),
+    )
+
+
+def _add_follows_sync_task():
+    """Register "Sync User Follows Get" in the `task_type` enum."""
+    add_enum_value(
+        enum_name="task_type",
+        enum_value="Sync User Follows Get"
+    )
+
+
+def downgrade() -> None:
+    """Drop the follows link table created by `upgrade`.
+
+    NOTE(review): the "Sync User Follows Get" enum value is left in place;
+    PostgreSQL cannot drop a single enum value -- confirm this is acceptable.
+    """
+    op.drop_table("link__locations__user_follows")
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/__init__.py b/src/core/tasks/scheduled/impl/sync_from_ds/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/impl/__init__.py b/src/core/tasks/scheduled/impl/sync_from_ds/impl/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/__init__.py b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/core.py b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/core.py
new file mode 100644
index 00000000..c26f2525
--- /dev/null
+++ b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/core.py
@@ -0,0 +1,56 @@
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.models.user_location_pairs import UserLocationPairs
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.query import UpdateFollowsInDBQueryBuilder
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.types import UserID, LocationID
+from src.core.tasks.scheduled.templates.operator import ScheduledTaskOperatorBase
+from src.db.client.async_ import AsyncDatabaseClient
+from src.db.enums import TaskType
+from
src.external.pdap.client import PDAPClient
+from src.external.pdap.impl.sync.follows.core import GetFollowsRequestBuilder
+from src.external.pdap.impl.sync.follows.response import SyncFollowGetInnerResponse
+
+
+class DSAppSyncUserFollowsGetTaskOperator(ScheduledTaskOperatorBase):
+    """Scheduled task that syncs user location follows from the Data Sources App."""
+
+    def __init__(
+        self,
+        adb_client: AsyncDatabaseClient,
+        pdap_client: PDAPClient
+    ):
+        super().__init__(adb_client)
+        self.pdap_client = pdap_client
+
+    @property
+    def task_type(self) -> TaskType:
+        return TaskType.SYNC_USER_FOLLOWS_GET
+
+    async def inner_task_logic(self) -> None:
+        """Fetch the follows from the Data Sources App, then update the database."""
+        responses = await self._get_follows_from_ds()
+        await self._update_follows_in_db(responses)
+
+    async def _get_follows_from_ds(self) -> list[SyncFollowGetInnerResponse]:
+        """Request the follows through the PDAP client."""
+        return await self.pdap_client.run_request_builder(
+            GetFollowsRequestBuilder()
+        )
+
+    async def _update_follows_in_db(
+        self,
+        responses: list[SyncFollowGetInnerResponse]
+    ) -> None:
+        """Bring the stored follow links in line with the API responses.
+
+        Converts each response into a typed user/location pair and passes the
+        pairs to `UpdateFollowsInDBQueryBuilder`.
+        """
+        api_pairs: list[UserLocationPairs] = [
+            UserLocationPairs(
+                user_id=UserID(response.user_id),
+                location_id=LocationID(response.location_id)
+            )
+            for response in responses
+        ]
+        await self.adb_client.run_query_builder(
+            UpdateFollowsInDBQueryBuilder(api_pairs=api_pairs)
+        )
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/models/__init__.py b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/models/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/models/user_location_pairs.py b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/models/user_location_pairs.py
new file mode 100644
index 00000000..58664fbd
--- /dev/null
+++
b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/models/user_location_pairs.py
@@ -0,0 +1,24 @@
+"""Typed (user, location) follow pair shared by the API sync and the DB query.
+
+Design Notes:
+    - I contemplated having this be a simple tuple, but reasoned it'd be more
+      future-proof if I used a Pydantic Model, so it would fail loudly in case
+      the API response structure changes.
+
+"""
+
+from pydantic import BaseModel, ConfigDict
+
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.types import LocationID, UserID
+
+
+class UserLocationPairs(BaseModel):
+    """A single follow: one user following one location.
+
+    Frozen so that instances are hashable and can be compared through sets.
+    """
+    # Pydantic v2 configuration; replaces the deprecated inner `class Config`.
+    model_config = ConfigDict(frozen=True)
+
+    user_id: UserID
+    location_id: LocationID
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/query.py b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/query.py
new file mode 100644
index 00000000..0f78a3da
--- /dev/null
+++ b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/query.py
@@ -0,0 +1,83 @@
+from typing import Any, Sequence
+
+from sqlalchemy import select, RowMapping, delete, tuple_
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.models.user_location_pairs import UserLocationPairs
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.types import UserID, LocationID
+from src.db.models.impl.link.location__user_follow import LinkLocationUserFollow
+from src.db.queries.base.builder import QueryBuilderBase
+
+
+class UpdateFollowsInDBQueryBuilder(QueryBuilderBase):
+    """Reconcile the stored follow links with the follows reported by the API.
+
+    Pairs only in the API are inserted; pairs only in the database are deleted.
+    """
+
+    def __init__(self, api_pairs: list[UserLocationPairs]):
+        super().__init__()
+        self.api_pairs = api_pairs
+
+    async def run(self, session: AsyncSession) -> Any:
+        """Insert the missing links and delete the stale ones."""
+        api_pairs_set = set(self.api_pairs)
+        db_pairs_set = set(await self.get_db_pairs(session))
+
+        # Pairs that are in the API but not in the DB
+        await self.add_new_links(session, api_pairs_set - db_pairs_set)
+        # Pairs that are in the DB but not in the API
+        await self.remove_links(session, db_pairs_set - api_pairs_set)
+
+    async def get_db_pairs(self, session: AsyncSession) -> list[UserLocationPairs]:
+        """Return every follow pair currently stored in the link table."""
+        query = select(
+            LinkLocationUserFollow.user_id,
+            LinkLocationUserFollow.location_id,
+        )
+        mappings: Sequence[RowMapping] = await self.sh.mappings(session, query=query)
+        return [
+            UserLocationPairs(
+                user_id=mapping[LinkLocationUserFollow.user_id],
+                location_id=mapping[LinkLocationUserFollow.location_id],
+            )
+            for mapping in mappings
+        ]
+
+    async def add_new_links(
+        self,
+        session: AsyncSession,
+        pairs: set[UserLocationPairs]
+    ) -> None:
+        """Add one new link row per pair to the session."""
+        session.add_all([
+            LinkLocationUserFollow(
+                user_id=pair.user_id,
+                location_id=pair.location_id,
+            )
+            for pair in pairs
+        ])
+
+    async def remove_links(
+        self,
+        session: AsyncSession,
+        removed_pairs: set[UserLocationPairs]
+    ) -> None:
+        """Delete the link rows matching `removed_pairs`.
+
+        Returns early on an empty set so that no DELETE with an empty
+        IN list is issued.
+        """
+        if not removed_pairs:
+            return
+        tuples: list[tuple[UserID, LocationID]] = [
+            (pair.user_id, pair.location_id)
+            for pair in removed_pairs
+        ]
+        statement = delete(LinkLocationUserFollow).where(
+            tuple_(
+                LinkLocationUserFollow.user_id,
+                LinkLocationUserFollow.location_id,
+            ).in_(tuples)
+        )
+        await session.execute(statement)
diff --git a/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/types.py b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/types.py
new file mode 100644
index 00000000..b3dc8e5b
--- /dev/null
+++ b/src/core/tasks/scheduled/impl/sync_from_ds/impl/follows/types.py
@@ -0,0 +1,4 @@
+from typing import NewType
+
+UserID = NewType("UserID", int)
+LocationID = NewType("LocationID", int)
diff --git a/src/core/tasks/scheduled/loader.py b/src/core/tasks/scheduled/loader.py
index 61169a66..d2e96cc1 100644
--- a/src/core/tasks/scheduled/loader.py
+++ b/src/core/tasks/scheduled/loader.py
@@ -12,6 +12,7 @@
 from src.core.tasks.scheduled.impl.mark_never_completed.operator import MarkTaskNeverCompletedOperator
 from src.core.tasks.scheduled.impl.refresh_materialized_views.operator import RefreshMaterializedViewsOperator
 from
src.core.tasks.scheduled.impl.run_url_tasks.operator import RunURLTasksTaskOperator
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.core import DSAppSyncUserFollowsGetTaskOperator
 from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.add.core import DSAppSyncAgenciesAddTaskOperator
 from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.delete.core import DSAppSyncAgenciesDeleteTaskOperator
 from src.core.tasks.scheduled.impl.sync_to_ds.impl.agencies.update.core import DSAppSyncAgenciesUpdateTaskOperator
@@ -136,6 +137,15 @@ async def load_entries(self) -> list[ScheduledTaskEntry]:
                 enabled=self.setup_flag("INTEGRITY_MONITOR_TASK_FLAG")
             ),
             # Sync
+            ## Get
+            ScheduledTaskEntry(
+                operator=DSAppSyncUserFollowsGetTaskOperator(
+                    adb_client=self.adb_client,
+                    pdap_client=self.pdap_client
+                ),
+                interval_minutes=IntervalEnum.DAILY.value,
+                enabled=self.setup_flag("DS_APP_SYNC_USER_FOLLOWS_GET_TASK_FLAG")
+            ),
             ## Adds
             ### Agency
             ScheduledTaskEntry(
diff --git a/src/db/enums.py b/src/db/enums.py
index 65f446c5..97e2cc4b 100644
--- a/src/db/enums.py
+++ b/src/db/enums.py
@@ -75,6 +75,7 @@ class TaskType(PyEnum):
     SYNC_META_URLS_ADD = "Sync Meta URLs Add"
     SYNC_META_URLS_UPDATE = "Sync Meta URLs Update"
     SYNC_META_URLS_DELETE = "Sync Meta URLs Delete"
+    SYNC_USER_FOLLOWS_GET = "Sync User Follows Get"
 
 class ChangeLogOperationType(PyEnum):
     INSERT = "INSERT"
diff --git a/src/db/models/impl/link/location__user_follow.py b/src/db/models/impl/link/location__user_follow.py
new file mode 100644
index 00000000..a4f65281
--- /dev/null
+++ b/src/db/models/impl/link/location__user_follow.py
@@ -0,0 +1,23 @@
+from sqlalchemy import Integer, Column, PrimaryKeyConstraint
+
+from src.db.models.mixins import LocationDependentMixin, CreatedAtMixin
+from src.db.models.templates_.base import Base
+
+
+class LinkLocationUserFollow(
+    Base,
+    LocationDependentMixin,
+    CreatedAtMixin
+):
+    """Link row recording that a user follows a location."""
+
+    __tablename__ = "link__locations__user_follows"
+    __table_args__ = (
+        # Column order matches the primary key created by migration e88e4e962dc7.
+        PrimaryKeyConstraint(
+            "location_id",
+            "user_id"
+        ),
+    )
+
+    user_id = Column(Integer, nullable=False)
diff --git a/src/external/pdap/_templates/request_builder.py b/src/external/pdap/_templates/request_builder.py
index 2cde6c51..887e2cfd 100644
--- a/src/external/pdap/_templates/request_builder.py
+++ b/src/external/pdap/_templates/request_builder.py
@@ -1,6 +1,6 @@
 from abc import ABC, abstractmethod
 from http import HTTPStatus
-from typing import Any
+from typing import Any, TypeVar
 
 from pdap_access_manager.access_manager.async_ import AccessManagerAsync
 from pdap_access_manager.enums import RequestType
@@ -8,6 +8,7 @@
 from pdap_access_manager.models.response import ResponseInfo
 from pydantic import BaseModel
 
+T = TypeVar("T", bound=BaseModel)
 
 class PDAPRequestBuilderBase(ABC):
 
@@ -37,6 +38,31 @@ async def post(
             raise Exception(f"Failed to make request to PDAP: {response_info.data}")
         return response_info.data
 
+    async def get(
+        self,
+        url: str,
+        model: type[T]
+    ) -> T:
+        """Send a GET request with the JWT header and parse the response data.
+
+        :param url: URL to request.
+        :param model: Pydantic model class used to validate the response data.
+        :return: An instance of `model` built from the response data.
+        :raises Exception: If the response status is not 200 OK.
+        :raises pydantic.ValidationError: If the data does not match `model`.
+        """
+        request_info = RequestInfo(
+            type_=RequestType.GET,
+            url=url,
+            headers=await self.access_manager.jwt_header()
+        )
+        response_info: ResponseInfo = await self.access_manager.make_request(request_info)
+        if response_info.status_code != HTTPStatus.OK:
+            raise Exception(f"Failed to make request to PDAP: {response_info.data}")
+        # `model_validate` reports a malformed payload as a validation error
+        # instead of failing on `**` unpacking of a non-mapping value.
+        return model.model_validate(response_info.data)
+
     @abstractmethod
     async def inner_logic(self) -> Any:
         raise NotImplementedError
diff --git a/src/external/pdap/impl/sync/follows/__init__.py b/src/external/pdap/impl/sync/follows/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/src/external/pdap/impl/sync/follows/core.py b/src/external/pdap/impl/sync/follows/core.py
new file mode 100644
index 00000000..707ac8c9
--- /dev/null
+++ b/src/external/pdap/impl/sync/follows/core.py
@@ -0,0 +1,13 @@
+from src.external.pdap._templates.request_builder import PDAPRequestBuilderBase
+from src.external.pdap.impl.sync.follows.response import SyncFollowGetInnerResponse,
SyncFollowGetOuterResponse
+
+
+class GetFollowsRequestBuilder(PDAPRequestBuilderBase):
+    """Request builder that fetches follows from the `v3/sync/follows` endpoint."""
+
+    async def inner_logic(self) -> list[SyncFollowGetInnerResponse]:
+        """Return the follows contained in the endpoint's response."""
+        endpoint: str = self.build_url("v3/sync/follows")
+        # `get` parses the response data into the outer response model.
+        outer = await self.get(url=endpoint, model=SyncFollowGetOuterResponse)
+        return outer.follows
diff --git a/src/external/pdap/impl/sync/follows/response.py b/src/external/pdap/impl/sync/follows/response.py
new file mode 100644
index 00000000..abdde583
--- /dev/null
+++ b/src/external/pdap/impl/sync/follows/response.py
@@ -0,0 +1,12 @@
+from pydantic import BaseModel
+
+
+class SyncFollowGetInnerResponse(BaseModel):
+    """One follow entry: a user ID paired with a location ID."""
+    user_id: int
+    location_id: int
+
+
+class SyncFollowGetOuterResponse(BaseModel):
+    """Response envelope holding the list of follow entries."""
+    follows: list[SyncFollowGetInnerResponse]
diff --git a/tests/automated/integration/tasks/scheduled/impl/sync_from_ds/__init__.py b/tests/automated/integration/tasks/scheduled/impl/sync_from_ds/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/automated/integration/tasks/scheduled/impl/sync_from_ds/user_follows/__init__.py b/tests/automated/integration/tasks/scheduled/impl/sync_from_ds/user_follows/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/automated/integration/tasks/scheduled/impl/sync_from_ds/user_follows/test_core.py b/tests/automated/integration/tasks/scheduled/impl/sync_from_ds/user_follows/test_core.py
new file mode 100644
index 00000000..b95eb102
--- /dev/null
+++ b/tests/automated/integration/tasks/scheduled/impl/sync_from_ds/user_follows/test_core.py
@@ -0,0 +1,115 @@
+from http import HTTPStatus
+from unittest.mock import AsyncMock
+
+import pytest
+from pdap_access_manager.models.response import ResponseInfo
+
+from src.core.tasks.base.run_info import TaskOperatorRunInfo
+from src.core.tasks.scheduled.impl.sync_from_ds.impl.follows.core import DSAppSyncUserFollowsGetTaskOperator
+from src.db.client.async_ import AsyncDatabaseClient
+from
src.db.models.impl.link.location__user_follow import LinkLocationUserFollow
+from src.external.pdap.client import PDAPClient
+from src.external.pdap.impl.sync.follows.response import SyncFollowGetInnerResponse, SyncFollowGetOuterResponse
+from tests.automated.integration.conftest import MOCK_USER_ID
+from tests.helpers.asserts import assert_task_run_success
+from tests.helpers.data_creator.models.creation_info.county import CountyCreationInfo
+from tests.helpers.data_creator.models.creation_info.locality import LocalityCreationInfo
+from tests.helpers.data_creator.models.creation_info.us_state import USStateCreationInfo
+
+
+def mock_client(
+    mock_pdap_client: PDAPClient,
+    response: list[SyncFollowGetInnerResponse]
+) -> None:
+    mock_pdap_client.access_manager.make_request = AsyncMock(
+        return_value=ResponseInfo(
+            status_code=HTTPStatus.OK,
+            data=SyncFollowGetOuterResponse(
+                follows=response
+            ).model_dump(mode='json')
+        )
+    )
+
+@pytest.mark.asyncio
+async def test_core(
+    adb_client_test: AsyncDatabaseClient,
+    mock_pdap_client: PDAPClient,
+    pittsburgh_locality: LocalityCreationInfo,
+    allegheny_county: CountyCreationInfo,
+    pennsylvania: USStateCreationInfo
+):
+    operator = DSAppSyncUserFollowsGetTaskOperator(
+        adb_client=adb_client_test,
+        pdap_client=mock_pdap_client
+    )
+
+    # Mock the client so the API reports three follows for the mock user
+    mock_client(
+        mock_pdap_client,
+        response=[
+            SyncFollowGetInnerResponse(
+                user_id=MOCK_USER_ID,
+                location_id=pittsburgh_locality.location_id
+            ),
+            SyncFollowGetInnerResponse(
+                user_id=MOCK_USER_ID,
+                location_id=allegheny_county.location_id
+            ),
+            SyncFollowGetInnerResponse(
+                user_id=MOCK_USER_ID,
+                location_id=pennsylvania.location_id
+            )
+        ]
+    )
+
+    # Run the task
+    run_info: TaskOperatorRunInfo = await operator.run_task()
+    assert_task_run_success(run_info)
+
+    # Confirm the three follows were added
+    links: list[LinkLocationUserFollow] = await adb_client_test.get_all(LinkLocationUserFollow)
+    assert len(links) == 3
+    link_tuples = [(link.user_id, link.location_id) for link in links]
+    assert (MOCK_USER_ID, pittsburgh_locality.location_id) in link_tuples
+    assert (MOCK_USER_ID, allegheny_county.location_id) in link_tuples
+    assert (MOCK_USER_ID, pennsylvania.location_id) in link_tuples
+
+    # Run the task again with the same mocked response
+    run_info: TaskOperatorRunInfo = await operator.run_task()
+    assert_task_run_success(run_info)
+
+    # Confirm no new follows were added
+    links: list[LinkLocationUserFollow] = await adb_client_test.get_all(LinkLocationUserFollow)
+    assert len(links) == 3
+    link_tuples = [(link.user_id, link.location_id) for link in links]
+    assert (MOCK_USER_ID, pittsburgh_locality.location_id) in link_tuples
+    assert (MOCK_USER_ID, allegheny_county.location_id) in link_tuples
+    assert (MOCK_USER_ID, pennsylvania.location_id) in link_tuples
+
+
+    # Mock the client so the API reports only two of the three follows
+    mock_client(
+        mock_pdap_client,
+        response=[
+            SyncFollowGetInnerResponse(
+                user_id=MOCK_USER_ID,
+                location_id=pittsburgh_locality.location_id
+            ),
+            SyncFollowGetInnerResponse(
+                user_id=MOCK_USER_ID,
+                location_id=allegheny_county.location_id
+            ),
+        ]
+    )
+
+    # Run the task a third time
+    run_info: TaskOperatorRunInfo = await operator.run_task()
+    assert_task_run_success(run_info)
+    # Confirm the follow missing from the response was removed
+
+    links: list[LinkLocationUserFollow] = await adb_client_test.get_all(LinkLocationUserFollow)
+    assert len(links) == 2
+    link_tuples = [(link.user_id, link.location_id) for link in links]
+    assert (MOCK_USER_ID, pittsburgh_locality.location_id) in link_tuples
+    assert (MOCK_USER_ID, allegheny_county.location_id) in link_tuples
+
diff --git a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py
index 4e5bb551..cb70ff8c 100644
--- a/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py
+++ b/tests/automated/integration/tasks/scheduled/loader/test_happy_path.py
@@ -2,7 +2,7 @@
 
 from
src.core.tasks.scheduled.loader import ScheduledTaskOperatorLoader
 
-NUMBER_OF_ENTRIES = 21
+NUMBER_OF_ENTRIES = 22
 
 @pytest.mark.asyncio
 async def test_happy_path(
diff --git a/tests/manual/external/pdap/conftest.py b/tests/manual/external/pdap/conftest.py
index de386ad7..51c1947c 100644
--- a/tests/manual/external/pdap/conftest.py
+++ b/tests/manual/external/pdap/conftest.py
@@ -1,7 +1,8 @@
 import pytest
 import pytest_asyncio
 from aiohttp import ClientSession
-from pdap_access_manager import AccessManager
+from pdap_access_manager.access_manager.async_ import AccessManagerAsync as AccessManager
+from pdap_access_manager.models.auth import AuthInfo
 
 from src.external.pdap.client import PDAPClient
 from src.util.helper_functions import get_from_env
@@ -15,8 +16,10 @@ async def client_session():
 @pytest.fixture
 def access_manager(client_session):
     return AccessManager(
-        email=get_from_env("PDAP_PROD_EMAIL"),
-        password=get_from_env("PDAP_PROD_PASSWORD"),
+        auth=AuthInfo(
+            email=get_from_env("PDAP_PROD_EMAIL"),
+            password=get_from_env("PDAP_PROD_PASSWORD"),
+        ),
         api_key=get_from_env("PDAP_API_KEY", allow_none=True),
         session=client_session
     )
@@ -24,8 +27,10 @@ def access_manager(client_session):
 @pytest.fixture
 def access_manager_dev(client_session):
     return AccessManager(
-        email=get_from_env("PDAP_DEV_EMAIL"),
-        password=get_from_env("PDAP_DEV_PASSWORD"),
+        auth=AuthInfo(
+            email=get_from_env("PDAP_DEV_EMAIL"),
+            password=get_from_env("PDAP_DEV_PASSWORD"),
+        ),
         api_key=get_from_env("PDAP_DEV_API_KEY", allow_none=True),
         data_sources_url=get_from_env("PDAP_DEV_API_URL"),
         session=client_session
diff --git a/tests/manual/external/pdap/test_get_follows_sync.py b/tests/manual/external/pdap/test_get_follows_sync.py
new file mode 100644
index 00000000..9d62209b
--- /dev/null
+++ b/tests/manual/external/pdap/test_get_follows_sync.py
@@ -0,0 +1,10 @@
+import pytest
+
+from src.external.pdap.impl.sync.follows.core import GetFollowsRequestBuilder
+
+
+@pytest.mark.asyncio
+async def test_get_follows_sync(pdap_client_dev):
+    """Manual check: run the follows request via `pdap_client_dev` and print the result."""
+    follows = await pdap_client_dev.run_request_builder(GetFollowsRequestBuilder())
+    print(follows)