Skip to content

Commit 1a29b6c

Browse files
committed
make_async w run_in_threadpool wrapper for resolvers
1 parent 904e47b commit 1a29b6c

File tree

10 files changed

+44
-15
lines changed

10 files changed

+44
-15
lines changed

orchestrator/graphql/resolvers/helpers.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
from collections.abc import Sequence
2+
from functools import wraps
3+
from typing import Callable, Coroutine
24

5+
import structlog
36
from sqlalchemy import CompoundSelect, Select, select
47
from sqlalchemy.orm.strategy_options import _AbstractLoad
8+
from starlette.concurrency import run_in_threadpool
59

610
from orchestrator.db import db
711
from orchestrator.db.database import BaseModel
812

13+
logger = structlog.get_logger(__name__)
14+
915

1016
def rows_from_statement(
1117
stmt: Select | CompoundSelect,
@@ -19,3 +25,12 @@ def rows_from_statement(
1925
result = db.session.scalars(from_stmt)
2026
uresult = result.unique() if unique else result
2127
return uresult.all()
28+
29+
30+
def make_async(f: Callable): # type: ignore
31+
@wraps(f)
32+
async def wrapper(*args, **kwargs) -> Coroutine: # type: ignore
33+
logger.debug(f"**async, calling fn {f.__name__}")
34+
return await run_in_threadpool(f, *args, **kwargs)
35+
36+
return wrapper

orchestrator/graphql/resolvers/process.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from orchestrator.db.sorting import Sort
2626
from orchestrator.db.sorting.process import process_sort_fields, sort_processes
2727
from orchestrator.graphql.pagination import Connection
28-
from orchestrator.graphql.resolvers.helpers import rows_from_statement
28+
from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement
2929
from orchestrator.graphql.schemas.process import ProcessType
3030
from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo
3131
from orchestrator.graphql.utils import (
@@ -55,7 +55,8 @@ def _enrich_process(process: ProcessTable, with_details: bool = False) -> Proces
5555
return ProcessSchema(**process_data)
5656

5757

58-
async def resolve_process(info: OrchestratorInfo, process_id: UUID) -> ProcessType | None:
58+
@make_async
59+
def resolve_process(info: OrchestratorInfo, process_id: UUID) -> ProcessType | None:
5960
query_loaders = get_query_loaders_for_gql_fields(ProcessTable, info)
6061
stmt = select(ProcessTable).options(*query_loaders).where(ProcessTable.process_id == process_id)
6162
if process := db.session.scalar(stmt):
@@ -64,7 +65,8 @@ async def resolve_process(info: OrchestratorInfo, process_id: UUID) -> ProcessTy
6465
return None
6566

6667

67-
async def resolve_processes(
68+
@make_async
69+
def resolve_processes(
6870
info: OrchestratorInfo,
6971
filter_by: list[GraphqlFilter] | None = None,
7072
sort_by: list[GraphqlSort] | None = None,

orchestrator/graphql/resolvers/product.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from orchestrator.db.sorting import Sort
1010
from orchestrator.db.sorting.product import product_sort_fields, sort_products
1111
from orchestrator.graphql.pagination import Connection
12-
from orchestrator.graphql.resolvers.helpers import rows_from_statement
12+
from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement
1313
from orchestrator.graphql.schemas.product import ProductType
1414
from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo
1515
from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page
@@ -19,7 +19,8 @@
1919
logger = structlog.get_logger(__name__)
2020

2121

22-
async def resolve_products(
22+
@make_async
23+
def resolve_products(
2324
info: OrchestratorInfo,
2425
filter_by: list[GraphqlFilter] | None = None,
2526
sort_by: list[GraphqlSort] | None = None,

orchestrator/graphql/resolvers/product_block.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from orchestrator.db.sorting import Sort
1414
from orchestrator.db.sorting.product_block import product_block_sort_fields, sort_product_blocks
1515
from orchestrator.graphql.pagination import Connection
16-
from orchestrator.graphql.resolvers.helpers import rows_from_statement
16+
from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement
1717
from orchestrator.graphql.schemas.product_block import ProductBlock
1818
from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo
1919
from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page
@@ -23,7 +23,8 @@
2323
logger = structlog.get_logger(__name__)
2424

2525

26-
async def resolve_product_blocks(
26+
@make_async
27+
def resolve_product_blocks(
2728
info: OrchestratorInfo,
2829
filter_by: list[GraphqlFilter] | None = None,
2930
sort_by: list[GraphqlSort] | None = None,

orchestrator/graphql/resolvers/resource_type.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from orchestrator.db.sorting import Sort
1414
from orchestrator.db.sorting.resource_type import resource_type_sort_fields, sort_resource_types
1515
from orchestrator.graphql.pagination import Connection
16-
from orchestrator.graphql.resolvers.helpers import rows_from_statement
16+
from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement
1717
from orchestrator.graphql.schemas.resource_type import ResourceType
1818
from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo
1919
from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page
@@ -23,7 +23,8 @@
2323
logger = structlog.get_logger(__name__)
2424

2525

26-
async def resolve_resource_types(
26+
@make_async
27+
def resolve_resource_types(
2728
info: OrchestratorInfo,
2829
filter_by: list[GraphqlFilter] | None = None,
2930
sort_by: list[GraphqlSort] | None = None,

orchestrator/graphql/resolvers/scheduled_tasks.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from orchestrator.db.filters import Filter
44
from orchestrator.db.sorting import Sort
55
from orchestrator.graphql.pagination import Connection
6+
from orchestrator.graphql.resolvers.helpers import make_async
67
from orchestrator.graphql.schemas.scheduled_task import ScheduledTaskGraphql
78
from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo
89
from orchestrator.graphql.utils import create_resolver_error_handler, to_graphql_result_page
@@ -12,7 +13,8 @@
1213
logger = structlog.get_logger(__name__)
1314

1415

15-
async def resolve_scheduled_tasks(
16+
@make_async
17+
def resolve_scheduled_tasks(
1618
info: OrchestratorInfo,
1719
filter_by: list[GraphqlFilter] | None = None,
1820
sort_by: list[GraphqlSort] | None = None,

orchestrator/graphql/resolvers/settings.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from oauth2_lib.strawberry import authenticated_mutation_field
66
from orchestrator.api.api_v1.endpoints.settings import generate_engine_status_response
7+
from orchestrator.graphql.resolvers.helpers import make_async
78
from orchestrator.graphql.schemas.errors import Error
89
from orchestrator.graphql.schemas.settings import (
910
CACHE_FLUSH_OPTIONS,
@@ -27,6 +28,7 @@
2728

2829

2930
# Queries
31+
@make_async
3032
def resolve_settings(info: OrchestratorInfo) -> StatusType:
3133
selected_fields = get_selected_fields(info)
3234

orchestrator/graphql/resolvers/subscription.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from pydantic.alias_generators import to_camel as to_lower_camel
1919
from sqlalchemy import Select, func, select
2020
from sqlalchemy.orm import contains_eager
21+
from starlette.concurrency import run_in_threadpool
2122
from strawberry.experimental.pydantic.conversion_types import StrawberryTypeFromPydantic
2223

2324
from nwastdlib.asyncio import gather_nice
@@ -101,7 +102,7 @@ async def format_subscription(info: OrchestratorInfo, subscription: Subscription
101102
async def resolve_subscription(info: OrchestratorInfo, id: UUID) -> SubscriptionInterface | None:
102103
stmt = select(SubscriptionTable).where(SubscriptionTable.subscription_id == id)
103104

104-
if subscription := db.session.scalar(stmt):
105+
if subscription := await run_in_threadpool(db.session.scalar, stmt):
105106
return await format_subscription(info, subscription)
106107
return None
107108

@@ -141,12 +142,13 @@ async def resolve_subscriptions(
141142
stmt = filter_by_query_string(stmt, query)
142143

143144
stmt = cast(Select, sort_subscriptions(stmt, pydantic_sort_by, _error_handler))
144-
total = db.session.scalar(select(func.count()).select_from(stmt.subquery()))
145+
total = await run_in_threadpool(db.session.scalar, select(func.count()).select_from(stmt.subquery()))
145146
stmt = apply_range_to_statement(stmt, after, after + first + 1)
146147

147148
graphql_subscriptions: list[SubscriptionInterface] = []
148149
if is_querying_page_data(info):
149-
subscriptions = db.session.scalars(stmt).all()
150+
scalars = await run_in_threadpool(db.session.scalars, stmt)
151+
subscriptions = scalars.all()
150152
graphql_subscriptions = list(await gather_nice((format_subscription(info, p) for p in subscriptions))) # type: ignore
151153
logger.info("Resolve subscriptions", filter_by=filter_by, total=total)
152154

orchestrator/graphql/resolvers/version.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from structlog import get_logger
22

33
from orchestrator import __version__
4+
from orchestrator.graphql.resolvers.helpers import make_async
45
from orchestrator.graphql.schemas.version import VersionType
56
from orchestrator.graphql.types import OrchestratorInfo
67
from orchestrator.graphql.utils import create_resolver_error_handler
@@ -11,6 +12,7 @@
1112
VERSIONS = [f"orchestrator-core: {__version__}"]
1213

1314

15+
@make_async
1416
def resolve_version(info: OrchestratorInfo) -> VersionType | None:
1517
logger.debug("resolve_version() called")
1618
_error_handler = create_resolver_error_handler(info)

orchestrator/graphql/resolvers/workflow.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from orchestrator.db.sorting import Sort
1010
from orchestrator.db.sorting.workflow import sort_workflows, workflow_sort_fields
1111
from orchestrator.graphql.pagination import Connection
12-
from orchestrator.graphql.resolvers.helpers import rows_from_statement
12+
from orchestrator.graphql.resolvers.helpers import make_async, rows_from_statement
1313
from orchestrator.graphql.schemas.workflow import Workflow
1414
from orchestrator.graphql.types import GraphqlFilter, GraphqlSort, OrchestratorInfo
1515
from orchestrator.graphql.utils import create_resolver_error_handler, is_querying_page_data, to_graphql_result_page
@@ -19,7 +19,8 @@
1919
logger = structlog.get_logger(__name__)
2020

2121

22-
async def resolve_workflows(
22+
@make_async
23+
def resolve_workflows(
2324
info: OrchestratorInfo,
2425
filter_by: list[GraphqlFilter] | None = None,
2526
sort_by: list[GraphqlSort] | None = None,

0 commit comments

Comments
 (0)