From e7024cf3a283d57c4f5bdca2367f8ded322dcca4 Mon Sep 17 00:00:00 2001 From: Edmund Mills Date: Sun, 11 May 2025 18:25:56 +0000 Subject: [PATCH] Proof of concept for async pipe --- expression/extra/__init__.py | 4 +- expression/extra/async_result/__init__.py | 6 + expression/extra/async_result/async_result.py | 19 ++ expression/extra/pipe/__init__.py | 6 + expression/extra/pipe/async_pipe.py | 168 ++++++++++++++++++ tests/test_async_pipe.py | 83 +++++++++ 6 files changed, 284 insertions(+), 2 deletions(-) create mode 100644 expression/extra/async_result/__init__.py create mode 100644 expression/extra/async_result/async_result.py create mode 100644 expression/extra/pipe/__init__.py create mode 100644 expression/extra/pipe/async_pipe.py create mode 100644 tests/test_async_pipe.py diff --git a/expression/extra/__init__.py b/expression/extra/__init__.py index a0f725e5..9b337b43 100644 --- a/expression/extra/__init__.py +++ b/expression/extra/__init__.py @@ -4,7 +4,7 @@ functions. """ -from . import option, result +from . import option, result, pipe, async_result -__all__ = ["option", "result"] +__all__ = ["option", "result", "pipe", "async_result"] diff --git a/expression/extra/async_result/__init__.py b/expression/extra/async_result/__init__.py new file mode 100644 index 00000000..379b69a9 --- /dev/null +++ b/expression/extra/async_result/__init__.py @@ -0,0 +1,6 @@ +"""Extra async_result functions.""" + +from .async_result import bind + + +__all__ = ["bind"] diff --git a/expression/extra/async_result/async_result.py b/expression/extra/async_result/async_result.py new file mode 100644 index 00000000..53b5485b --- /dev/null +++ b/expression/extra/async_result/async_result.py @@ -0,0 +1,19 @@ +from typing import Callable, Coroutine, Any, TypeVar +from expression import Result +from expression.effect.async_result import async_result + +_TSource = TypeVar("_TSource") +_TResult = TypeVar("_TResult") +_TError = TypeVar("_TError") + + +def bind( + mapper: Callable[[_TSource], Coroutine[Any, Any, Result[_TResult, Any]]], +) -> Callable[[Result[_TSource, _TError]], Coroutine[Any, Any, Result[_TResult, _TError]]]: + async def wrapped(result: Result[_TSource, _TError]) -> Result[_TResult, _TError]: + return await async_result.bind(result, mapper) + + return wrapped + + +__all__ = ["bind"] diff --git a/expression/extra/pipe/__init__.py b/expression/extra/pipe/__init__.py new file mode 100644 index 00000000..1b52a648 --- /dev/null +++ b/expression/extra/pipe/__init__.py @@ -0,0 +1,6 @@ +"""Extra pipe functions.""" + +from .async_pipe import async_pipe + + +__all__ = ["async_pipe"] diff --git a/expression/extra/pipe/async_pipe.py b/expression/extra/pipe/async_pipe.py new file mode 100644 index 00000000..e1a41c31 --- /dev/null +++ b/expression/extra/pipe/async_pipe.py @@ -0,0 +1,168 @@ +"""Async Pipe module + +The pipe handles both synchronous and asynchronous functions; if a function +returns a Coroutine, it is awaited before its result is passed to the next function. +""" + +from collections.abc import Callable +from typing import Any, TypeVar, overload, Union, Coroutine +import inspect + +_A = TypeVar("_A") +_B = TypeVar("_B") +_C = TypeVar("_C") +_D = TypeVar("_D") +_E = TypeVar("_E") +_F = TypeVar("_F") +_G = TypeVar("_G") +_H = TypeVar("_H") +_T = TypeVar("_T") +_J = TypeVar("_J") + +_X = TypeVar("_X") +_Y = TypeVar("_Y") + +SyncCallable = Callable[[_X], _Y] +AsyncCallable = Callable[[_X], Coroutine[Any, Any, _Y]] +SyncOrAsyncCallable = Union[SyncCallable[_X, _Y], AsyncCallable[_X, _Y]] + + +@overload +async def async_pipe(value: _A, /) -> _A: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + /, +) -> _B: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + /, +) -> _C: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + fn3: SyncOrAsyncCallable[_C, _D], + /, +) -> _D: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + fn3: SyncOrAsyncCallable[_C, _D], + fn4: SyncOrAsyncCallable[_D, _E], + /, +) -> _E: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + fn3: SyncOrAsyncCallable[_C, _D], + fn4: SyncOrAsyncCallable[_D, _E], + fn5: SyncOrAsyncCallable[_E, _F], + /, +) -> _F: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + fn3: SyncOrAsyncCallable[_C, _D], + fn4: SyncOrAsyncCallable[_D, _E], + fn5: SyncOrAsyncCallable[_E, _F], + fn6: SyncOrAsyncCallable[_F, _G], + /, +) -> _G: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + fn3: SyncOrAsyncCallable[_C, _D], + fn4: SyncOrAsyncCallable[_D, _E], + fn5: SyncOrAsyncCallable[_E, _F], + fn6: SyncOrAsyncCallable[_F, _G], + fn7: SyncOrAsyncCallable[_G, _H], + /, +) -> _H: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + fn3: SyncOrAsyncCallable[_C, _D], + fn4: SyncOrAsyncCallable[_D, _E], + fn5: SyncOrAsyncCallable[_E, _F], + fn6: SyncOrAsyncCallable[_F, _G], + fn7: SyncOrAsyncCallable[_G, _H], + fn8: SyncOrAsyncCallable[_H, _T], + /, +) -> _T: ... + + +@overload +async def async_pipe( + value: _A, + fn1: SyncOrAsyncCallable[_A, _B], + fn2: SyncOrAsyncCallable[_B, _C], + fn3: SyncOrAsyncCallable[_C, _D], + fn4: SyncOrAsyncCallable[_D, _E], + fn5: SyncOrAsyncCallable[_E, _F], + fn6: SyncOrAsyncCallable[_F, _G], + fn7: SyncOrAsyncCallable[_G, _H], + fn8: SyncOrAsyncCallable[_H, _T], + fn9: SyncOrAsyncCallable[_T, _J], + /, +) -> _J: ... + + +async def async_pipe(value: Any, *functions: SyncOrAsyncCallable[Any, Any]) -> Any: + """Functional async pipe (`|>`). + + Passes the `value` to the first function in `functions`, then the result of that + to the second function, and so on, recursively. If any function in the sequence + returns a coroutine, it is `await`ed before its result is passed to the next function. + + Args: + value: The initial value for the pipeline. + *functions: A sequence of functions to apply. Each function should + accept the output of the previous function (or the initial + `value` for the first function) as its input. Functions can be + synchronous or asynchronous. + + Returns: + The result of passing the value through all functions in the sequence. + """ + if inspect.iscoroutine(value): + value = await value + if not functions: + return value + next_func, *remaining_functions = functions + result_or_awaitable = next_func(value) + return await async_pipe(result_or_awaitable, *remaining_functions) + + +__all__ = ["async_pipe"] diff --git a/tests/test_async_pipe.py b/tests/test_async_pipe.py new file mode 100644 index 00000000..ead63589 --- /dev/null +++ b/tests/test_async_pipe.py @@ -0,0 +1,83 @@ +from collections.abc import Callable +import asyncio + +from hypothesis import given +from hypothesis import strategies as st + +from expression.extra.pipe import async_pipe +from expression.extra import async_result +from expression import Result, Ok, result + + +@given(st.integers()) +def test_pipe_id(x: int): + value = asyncio.run(async_pipe(x)) + assert value == x + + +@given(st.integers()) +def test_pipe_awaitable_only(x: int): + async def awaitable_value() -> int: + return x + + value = asyncio.run(async_pipe(awaitable_value())) + assert value == x + + +@given(st.integers()) +def test_pipe_fn(x: int): + async def gn(x: int) -> int: + return x + 1 + + value = asyncio.run(async_pipe(x, gn)) + assert value == asyncio.run(gn(x)) + + +@given(st.integers(), st.integers(), st.integers()) +def test_pipe_fn_gn(x: int, y: int, z: int): + fn: Callable[[int], int] = lambda x: x + z + + async def gn(x: int) -> int: + return x * y + + value = asyncio.run(async_pipe(x, fn, gn)) + + assert value == asyncio.run(gn(fn(x))) + + value = asyncio.run( + async_pipe( + x, + gn, + fn, + ) + ) + + assert value == fn(asyncio.run(gn(x))) + + +@given(st.integers(), st.integers(), st.integers()) +def test_pipe_async_result(x: int, y: int, z: int): + fn: Callable[[int], Result[int, str]] = lambda x: Ok(x + z) + + async def gn(x: int) -> Result[int, str]: + return Ok(x * y) + + value = asyncio.run( + async_pipe( + Ok(x), + result.bind(fn), + async_result.bind(gn), + ) + ) + + assert value == Ok((x + z) * y) + + value = asyncio.run( + async_pipe( + Ok(x), + async_result.bind(gn), + result.bind(fn), + ) + ) + + assert value == Ok((x * y) + z)