Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci_checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ jobs:
- name: Run Ruff linter
run: |
uv tool run ruff check
uv tool run ruff format --check

type-check:
name: Type Safety Check
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ generate:
lint:
@echo "Running Ruff linter and fixing lint issues..."
uv tool run ruff check --fix
uv tool run ruff format

# Run type checker
type-check:
Expand Down Expand Up @@ -52,4 +53,3 @@ help:
@echo " make integration-test - Run integration tests"
@echo " make clean - Remove generated files and caches"
@echo " make help - Show this help message"

2 changes: 0 additions & 2 deletions cadence/_internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,3 @@
"""

__all__: list[str] = []


6 changes: 1 addition & 5 deletions cadence/_internal/activity/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@


from ._activity_executor import (
ActivityExecutor
)
from ._activity_executor import ActivityExecutor

__all__ = [
"ActivityExecutor",
Expand Down
54 changes: 36 additions & 18 deletions cadence/_internal/activity/_activity_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,33 @@
from cadence._internal.activity._context import _Context, _SyncContext
from cadence.activity import ActivityInfo, ActivityDefinition, ExecutionStrategy
from cadence.api.v1.common_pb2 import Failure
from cadence.api.v1.service_worker_pb2 import PollForActivityTaskResponse, RespondActivityTaskFailedRequest, \
RespondActivityTaskCompletedRequest
from cadence.api.v1.service_worker_pb2 import (
PollForActivityTaskResponse,
RespondActivityTaskFailedRequest,
RespondActivityTaskCompletedRequest,
)
from cadence.client import Client

_logger = getLogger(__name__)


class ActivityExecutor:
def __init__(self, client: Client, task_list: str, identity: str, max_workers: int, registry: Callable[[str], ActivityDefinition]):
def __init__(
self,
client: Client,
task_list: str,
identity: str,
max_workers: int,
registry: Callable[[str], ActivityDefinition],
):
self._client = client
self._data_converter = client.data_converter
self._registry = registry
self._identity = identity
self._task_list = task_list
self._thread_pool = ThreadPoolExecutor(max_workers=max_workers,
thread_name_prefix=f'{task_list}-activity-')
self._thread_pool = ThreadPoolExecutor(
max_workers=max_workers, thread_name_prefix=f"{task_list}-activity-"
)

async def execute(self, task: PollForActivityTaskResponse):
try:
Expand All @@ -46,27 +58,33 @@ def _create_context(self, task: PollForActivityTaskResponse) -> _Context:
else:
return _SyncContext(self._client, info, activity_def, self._thread_pool)

async def _report_failure(self, task: PollForActivityTaskResponse, error: Exception):
async def _report_failure(
self, task: PollForActivityTaskResponse, error: Exception
):
try:
await self._client.worker_stub.RespondActivityTaskFailed(RespondActivityTaskFailedRequest(
task_token=task.task_token,
failure=_to_failure(error),
identity=self._identity,
))
await self._client.worker_stub.RespondActivityTaskFailed(
RespondActivityTaskFailedRequest(
task_token=task.task_token,
failure=_to_failure(error),
identity=self._identity,
)
)
except Exception:
_logger.exception('Exception reporting activity failure')
_logger.exception("Exception reporting activity failure")

async def _report_success(self, task: PollForActivityTaskResponse, result: Any):
as_payload = await self._data_converter.to_data([result])

try:
await self._client.worker_stub.RespondActivityTaskCompleted(RespondActivityTaskCompletedRequest(
task_token=task.task_token,
result=as_payload,
identity=self._identity,
))
await self._client.worker_stub.RespondActivityTaskCompleted(
RespondActivityTaskCompletedRequest(
task_token=task.task_token,
result=as_payload,
identity=self._identity,
)
)
except Exception:
_logger.exception('Exception reporting activity complete')
_logger.exception("Exception reporting activity complete")

def _create_info(self, task: PollForActivityTaskResponse) -> ActivityInfo:
return ActivityInfo(
Expand Down
17 changes: 14 additions & 3 deletions cadence/_internal/activity/_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@


class _Context(ActivityContext):
def __init__(self, client: Client, info: ActivityInfo, activity_fn: ActivityDefinition[[Any], Any]):
def __init__(
self,
client: Client,
info: ActivityInfo,
activity_fn: ActivityDefinition[[Any], Any],
):
self._client = client
self._info = info
self._activity_fn = activity_fn
Expand All @@ -28,8 +33,15 @@ def client(self) -> Client:
def info(self) -> ActivityInfo:
return self._info


class _SyncContext(_Context):
def __init__(self, client: Client, info: ActivityInfo, activity_fn: ActivityDefinition[[Any], Any], executor: ThreadPoolExecutor):
def __init__(
self,
client: Client,
info: ActivityInfo,
activity_fn: ActivityDefinition[[Any], Any],
executor: ThreadPoolExecutor,
):
super().__init__(client, info, activity_fn)
self._executor = executor

Expand All @@ -44,4 +56,3 @@ def _run(self, args: list[Any]) -> Any:

def client(self) -> Client:
raise RuntimeError("client is only supported in async activities")

Loading