Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions backend/lib/github/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class GithubTriggerState(pydantic_utils.BaseModel):


@dataclasses.dataclass(frozen=True)
class GithubTriggerProcessor(task_base.TriggerProcessor[GithubTriggerConfig]):
class GithubTriggerProcessor(task_base.BaseTriggerProcessor[GithubTriggerConfig]):
config: GithubTriggerConfig
raw_state: lib.task.protocols.StateProtocol
gql_github_client: github_clients.GqlGithubClient
Expand Down Expand Up @@ -258,26 +258,33 @@ async def _acquire_state(self) -> typing.AsyncIterator[GithubTriggerState]:
finally:
await self.raw_state.set(state.model_dump(mode="json"))

async def produce_events(self) -> typing.AsyncGenerator[task_base.Event, None]:
repositories: list[github_models.Repository] = []
async def _get_repositories(self) -> list[github_models.Repository]:
request = github_clients.GetRepositoriesRequest(
owner=self.config.owner,
)

async for repository in self.gql_github_client.get_repositories(
github_clients.GetRepositoriesRequest(
owner=self.config.owner,
)
):
result: list[github_models.Repository] = []

async for repository in self.gql_github_client.get_repositories(request):
if self.config.is_repository_applicable(repository):
repositories.append(repository)
result.append(repository)

return result

async def produce_events(self) -> typing.AsyncGenerator[task_base.Event, None]:
repositories = await self._get_repositories()

async with self._acquire_state() as state:
async for event in asyncio_utils.GatherIterators(
iterators = (
self._process_subtrigger_factory(
subtrigger=subtrigger,
state=state,
repositories=repositories,
)
for subtrigger in self.config.subtriggers
):
)

async for event in asyncio_utils.GatherIterators(iterators):
yield event

def _process_subtrigger_factory(
Expand Down
8 changes: 4 additions & 4 deletions backend/lib/task/base/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from .action import (
ActionProcessor,
ActionProcessorProtocol,
BaseActionConfig,
BaseActionProcessor,
action_config_factory,
action_processor_factory,
register_action,
Expand All @@ -16,26 +16,26 @@
from .task import BaseTaskConfig, CronTaskConfig, OncePerRunTaskConfig
from .trigger import (
BaseTriggerConfig,
TriggerProcessor,
BaseTriggerProcessor,
TriggerProcessorProtocol,
register_trigger,
trigger_config_factory,
trigger_processor_factory,
)

__all__ = [
"ActionProcessor",
"ActionProcessorProtocol",
"BaseActionConfig",
"BaseActionProcessor",
"BaseSecretConfig",
"BaseTaskConfig",
"BaseTriggerConfig",
"BaseTriggerProcessor",
"CronTaskConfig",
"EnvSecretConfig",
"Event",
"OncePerRunTaskConfig",
"RootConfig",
"TriggerProcessor",
"TriggerProcessorProtocol",
"action_config_factory",
"action_processor_factory",
Expand Down
8 changes: 4 additions & 4 deletions backend/lib/task/base/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def factory(cls, data: typing.Any) -> "BaseActionConfig":
return action_config_factory(data)


class ActionProcessor[ConfigT: BaseActionConfig](abc.ABC):
class BaseActionProcessor[ConfigT: BaseActionConfig](abc.ABC):
@classmethod
@abc.abstractmethod
def from_config(
Expand All @@ -35,7 +35,7 @@ async def process(self, event: task_configs_event.Event) -> None: ...
@dataclasses.dataclass(frozen=True)
class RegistryRecord[ConfigT: BaseActionConfig]:
config_class: type[ConfigT]
processor_class: type[ActionProcessor[ConfigT]]
processor_class: type[BaseActionProcessor[ConfigT]]


_REGISTRY: dict[str, RegistryRecord[typing.Any]] = {}
Expand All @@ -44,7 +44,7 @@ class RegistryRecord[ConfigT: BaseActionConfig]:
def register_action[ConfigT: BaseActionConfig](
name: str,
config_class: type[ConfigT],
processor_class: type[ActionProcessor[ConfigT]],
processor_class: type[BaseActionProcessor[ConfigT]],
) -> None:
_REGISTRY[name] = RegistryRecord(config_class=config_class, processor_class=processor_class)

Expand All @@ -68,9 +68,9 @@ def action_processor_factory(


__all__ = [
"ActionProcessor",
"ActionProcessorProtocol",
"BaseActionConfig",
"BaseActionProcessor",
"action_config_factory",
"action_processor_factory",
"register_action",
Expand Down
9 changes: 9 additions & 0 deletions backend/lib/task/base/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ class BaseSecretConfig(pydantic_utils.TypedBaseModel):
def value(self) -> typing.Any: ...


class PlainSecretConfig(BaseSecretConfig):
plain_value: str

@property
def value(self) -> str:
return self.plain_value


class EnvSecretConfig(BaseSecretConfig):
key: str

Expand All @@ -29,6 +37,7 @@ def value(self) -> str:
def register_default_plugins() -> None:
logger.info("Registering default task base secret plugins")
BaseSecretConfig.register("env", EnvSecretConfig)
BaseSecretConfig.register("plain", PlainSecretConfig)


__all__ = [
Expand Down
8 changes: 4 additions & 4 deletions backend/lib/task/base/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def factory(cls, data: typing.Any) -> "BaseTriggerConfig":
return trigger_config_factory(data)


class TriggerProcessor[ConfigT: BaseTriggerConfig](abc.ABC):
class BaseTriggerProcessor[ConfigT: BaseTriggerConfig](abc.ABC):
@classmethod
@abc.abstractmethod
def from_config(
Expand All @@ -36,7 +36,7 @@ async def dispose(self) -> None: ...
@dataclasses.dataclass(frozen=True)
class RegistryRecord[ConfigT: BaseTriggerConfig]:
config_class: type[ConfigT]
processor_class: type[TriggerProcessor[ConfigT]]
processor_class: type[BaseTriggerProcessor[ConfigT]]


_REGISTRY: dict[str, RegistryRecord[typing.Any]] = {}
Expand All @@ -45,7 +45,7 @@ class RegistryRecord[ConfigT: BaseTriggerConfig]:
def register_trigger[ConfigT: BaseTriggerConfig](
name: str,
config_class: type[ConfigT],
processor_class: type[TriggerProcessor[ConfigT]],
processor_class: type[BaseTriggerProcessor[ConfigT]],
) -> None:
_REGISTRY[name] = RegistryRecord(config_class=config_class, processor_class=processor_class)

Expand All @@ -71,7 +71,7 @@ def trigger_processor_factory(

__all__ = [
"BaseTriggerConfig",
"TriggerProcessor",
"BaseTriggerProcessor",
"TriggerProcessorProtocol",
"register_trigger",
"trigger_config_factory",
Expand Down
2 changes: 1 addition & 1 deletion backend/lib/telegram/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class TelegramWebhookActionConfig(task_base.BaseActionConfig):


@dataclasses.dataclass(frozen=True)
class TelegramWebhookProcessor(task_base.ActionProcessor[TelegramWebhookActionConfig]):
class TelegramWebhookProcessor(task_base.BaseActionProcessor[TelegramWebhookActionConfig]):
config: TelegramWebhookActionConfig
aiohttp_client: aiohttp.ClientSession
telegram_client: telegram_clients.RestTelegramClient
Expand Down
6 changes: 6 additions & 0 deletions backend/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import pytest

import lib.plugin_registration as plugin_registration
import tests.settings as test_settings


@pytest.fixture(name="register_default_plugins", autouse=True, scope="session")
def register_default_plugins_fixture() -> None:
plugin_registration.register_default_plugins()


@pytest.fixture(name="settings")
def settings_fixture() -> test_settings.Settings:
return test_settings.Settings()
8 changes: 0 additions & 8 deletions backend/tests/unit/github/triggers/conftest.py

This file was deleted.

Empty file.
Empty file.
Empty file.
22 changes: 22 additions & 0 deletions backend/tests/unit/telegram/actions/webhook/test_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import lib.task.base as task_base
import lib.telegram.actions as telegram_actions
import tests.settings as test_settings


def test_config_factory(settings: test_settings.Settings):
config = task_base.action_config_factory(
data={
"id": "test_telegram_webhook",
"type": "telegram_webhook",
"chat_id_secret": {
"type": "env",
"key": "TELEGRAM_CHAT_ID",
},
"token_secret": {
"type": "env",
"key": "TELEGRAM_TOKEN",
},
},
)

assert isinstance(config, telegram_actions.TelegramWebhookActionConfig)
75 changes: 75 additions & 0 deletions backend/tests/unit/telegram/actions/webhook/test_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import aiohttp
import pytest
import pytest_mock

import lib.task.base as task_base
import lib.telegram.actions as telegram_actions
import lib.telegram.clients as telegram_clients


@pytest.fixture(name="config")
def config_fixture() -> telegram_actions.TelegramWebhookActionConfig:
result = task_base.action_config_factory(
data={
"id": "test_telegram_webhook",
"type": "telegram_webhook",
"chat_id_secret": {
"type": "plain",
"plain_value": "1234567890",
},
"token_secret": {
"type": "plain",
"plain_value": "1234567890",
},
},
)

assert isinstance(result, telegram_actions.TelegramWebhookActionConfig)
return result


@pytest.mark.asyncio
async def test_factory(config: telegram_actions.TelegramWebhookActionConfig):
processor = task_base.action_processor_factory(config=config)

assert isinstance(processor, telegram_actions.TelegramWebhookProcessor)
assert processor.config == config
assert isinstance(processor.aiohttp_client, aiohttp.ClientSession)
assert isinstance(processor.telegram_client, telegram_clients.RestTelegramClient)
assert processor.telegram_client.token == config.token_secret.value
assert processor.telegram_client.aiohttp_client == processor.aiohttp_client

await processor.dispose()


@pytest.mark.asyncio
async def test_process(
mocker: pytest_mock.MockerFixture,
config: telegram_actions.TelegramWebhookActionConfig,
):
aiohttp_client = mocker.MagicMock(spec=aiohttp.ClientSession)
telegram_client = mocker.MagicMock(spec=telegram_clients.RestTelegramClient)

processor = telegram_actions.TelegramWebhookProcessor(
config=config,
aiohttp_client=aiohttp_client,
telegram_client=telegram_client,
)

event = task_base.Event(
id="test_event",
title="test_title",
body="test_body",
url="test_url",
)

await processor.process(
event=event,
)

assert telegram_client.send_message.awaited_once_with(
request=telegram_clients.SendMessageRequest(
chat_id=config.chat_id_secret.value,
text="<b>test_title</b>\ntest_body\ntest_url",
),
)