Skip to content

Commit 3625e30

Browse files
committed
[FLINK-38190][python] Support unordered mode of async function in Python DataStream API
1 parent 5b9e453 commit 3625e30

File tree

10 files changed

+1041
-17
lines changed

10 files changed

+1041
-17
lines changed

flink-python/pyflink/datastream/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@
256256
- :class:`OutputTag`:
257257
Tag with a name and type for identifying side output of an operator
258258
"""
259+
from pyflink.datastream.async_data_stream import AsyncDataStream
259260
from pyflink.datastream.checkpoint_config import CheckpointConfig
260261
from pyflink.datastream.externalized_checkpoint_retention import ExternalizedCheckpointRetention
261262
from pyflink.datastream.checkpointing_mode import CheckpointingMode
@@ -268,7 +269,8 @@
268269
SinkFunction, CoProcessFunction, KeyedProcessFunction,
269270
KeyedCoProcessFunction, AggregateFunction, WindowFunction,
270271
ProcessWindowFunction, BroadcastProcessFunction,
271-
KeyedBroadcastProcessFunction)
272+
KeyedBroadcastProcessFunction, AsyncFunction,
273+
ResultFuture)
272274
from pyflink.datastream.slot_sharing_group import SlotSharingGroup, MemorySize
273275
from pyflink.datastream.state_backend import (StateBackend, CustomStateBackend,
274276
PredefinedOptions, HashMapStateBackend,
@@ -292,6 +294,7 @@
292294
'ConnectedStreams',
293295
'BroadcastStream',
294296
'BroadcastConnectedStream',
297+
'AsyncDataStream',
295298
'DataStreamSink',
296299
'MapFunction',
297300
'CoMapFunction',
@@ -308,6 +311,7 @@
308311
'AggregateFunction',
309312
'BroadcastProcessFunction',
310313
'KeyedBroadcastProcessFunction',
314+
'AsyncFunction',
311315
'RuntimeContext',
312316
'TimerService',
313317
'CheckpointingMode',
@@ -338,5 +342,6 @@
338342
'SinkFunction',
339343
'SlotSharingGroup',
340344
'MemorySize',
341-
'OutputTag'
345+
'OutputTag',
346+
'ResultFuture'
342347
]
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
import inspect
19+
20+
from pyflink.common import Time, TypeInformation
21+
from pyflink.datastream.data_stream import DataStream, _get_one_input_stream_operator
22+
from pyflink.datastream.functions import AsyncFunctionDescriptor, AsyncFunction
23+
from pyflink.java_gateway import get_gateway
24+
from pyflink.util.java_utils import get_j_env_configuration
25+
26+
27+
class AsyncDataStream(object):
28+
"""
29+
A helper class to apply :class:`~AsyncFunction` to a data stream.
30+
"""
31+
32+
@staticmethod
33+
def unordered_wait(
34+
data_stream: DataStream,
35+
async_function: AsyncFunction,
36+
timeout: Time,
37+
capacity: int = 100,
38+
output_type: TypeInformation = None) -> 'DataStream':
39+
"""
40+
Adds an async function to the data stream. The order of output stream records may be
41+
reordered.
42+
43+
:param data_stream: The input data stream.
44+
:param async_function: The async function.
45+
:param timeout: The timeout for the asynchronous operation to complete.
46+
:param capacity: The max number of async i/o operation that can be triggered.
47+
:param output_type: The output data type.
48+
:return: The transformed DataStream.
49+
"""
50+
AsyncDataStream._validate(data_stream, async_function)
51+
52+
from pyflink.fn_execution import flink_fn_execution_pb2
53+
j_python_data_stream_function_operator, j_output_type_info = \
54+
_get_one_input_stream_operator(
55+
data_stream,
56+
AsyncFunctionDescriptor(
57+
async_function, timeout, capacity,
58+
AsyncFunctionDescriptor.OutputMode.UNORDERED),
59+
flink_fn_execution_pb2.UserDefinedDataStreamFunction.PROCESS, # type: ignore
60+
output_type)
61+
return DataStream(data_stream._j_data_stream.transform(
62+
"async wait operator",
63+
j_output_type_info,
64+
j_python_data_stream_function_operator))
65+
66+
@staticmethod
67+
def _validate(data_stream: DataStream, async_function: AsyncFunction):
68+
if not inspect.iscoroutinefunction(async_function.async_invoke):
69+
raise Exception("Method 'async_invoke' of class '%s' should be declared as 'async def'."
70+
% type(async_function))
71+
72+
gateway = get_gateway()
73+
j_conf = get_j_env_configuration(data_stream._j_data_stream.getExecutionEnvironment())
74+
python_execution_mode = (
75+
j_conf.get(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE))
76+
if python_execution_mode == 'thread':
77+
raise Exception("AsyncFunction is still not supported for 'thread' mode.")

flink-python/pyflink/datastream/data_stream.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@
4646
KeyedBroadcastProcessFunction,
4747
InternalSingleValueAllWindowFunction,
4848
PassThroughAllWindowFunction,
49-
InternalSingleValueProcessAllWindowFunction)
49+
InternalSingleValueProcessAllWindowFunction,
50+
AsyncFunctionDescriptor)
5051
from pyflink.datastream.output_tag import OutputTag
5152
from pyflink.datastream.slot_sharing_group import SlotSharingGroup
5253
from pyflink.datastream.state import (ListStateDescriptor, StateDescriptor, ReducingStateDescriptor,
@@ -2757,7 +2758,8 @@ def _is_keyed_stream(self):
27572758
def _get_one_input_stream_operator(data_stream: DataStream,
27582759
func: Union[Function,
27592760
FunctionWrapper,
2760-
WindowOperationDescriptor],
2761+
WindowOperationDescriptor,
2762+
AsyncFunctionDescriptor],
27612763
func_type: int,
27622764
output_type: Union[TypeInformation, List] = None):
27632765
"""
@@ -2891,7 +2893,8 @@ def _get_two_input_stream_operator(connected_streams: ConnectedStreams,
28912893

28922894

28932895
def _create_j_data_stream_python_function_info(
2894-
func: Union[Function, FunctionWrapper, WindowOperationDescriptor], func_type: int
2896+
func: Union[Function, FunctionWrapper, WindowOperationDescriptor, AsyncFunctionDescriptor],
2897+
func_type: int
28952898
) -> bytes:
28962899
gateway = get_gateway()
28972900

flink-python/pyflink/datastream/functions.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
################################################################################
1818

1919
from abc import ABC, abstractmethod
20+
from enum import Enum
21+
2022
from py4j.java_gateway import JavaObject
21-
from typing import Union, Any, Generic, TypeVar, Iterable
23+
from typing import Union, Any, Generic, TypeVar, Iterable, List
2224

2325
from pyflink.datastream.state import ValueState, ValueStateDescriptor, ListStateDescriptor, \
2426
ListState, MapStateDescriptor, MapState, ReducingStateDescriptor, ReducingState, \
@@ -53,6 +55,9 @@
5355
'BaseBroadcastProcessFunction',
5456
'BroadcastProcessFunction',
5557
'KeyedBroadcastProcessFunction',
58+
'AsyncFunction',
59+
'AsyncFunctionDescriptor',
60+
'ResultFuture'
5661
]
5762

5863

@@ -897,6 +902,83 @@ def on_timer(self, timestamp: int, ctx: 'KeyedCoProcessFunction.OnTimerContext')
897902
pass
898903

899904

905+
class ResultFuture(Generic[OUT]):
906+
"""
907+
Collects data / error in user codes while processing async i/o.
908+
"""
909+
910+
@abstractmethod
911+
def complete(self, result: List[OUT]):
912+
"""
913+
Completes the result future with a collection of result objects.
914+
915+
Note that it should be called for exactly one time in the user code. Calling this function
916+
for multiple times will cause data lose.
917+
918+
Put all results in a collection and then emit output.
919+
920+
:param result: A list of results.
921+
"""
922+
pass
923+
924+
@abstractmethod
925+
def complete_exceptionally(self, error: Exception):
926+
"""
927+
Completes the result future exceptionally with an exception.
928+
929+
:param error: An Exception object.
930+
"""
931+
pass
932+
933+
934+
class AsyncFunction(Function, Generic[IN, OUT]):
935+
"""
936+
A function to trigger Async I/O operation.
937+
938+
For each #async_invoke, an async io operation can be triggered, and once it has been done, the
939+
result can be collected by calling :func:`~ResultFuture.complete`. For each async operation, its
940+
context is stored in the operator immediately after invoking #async_invoke, avoiding blocking
941+
for each stream input as long as the internal buffer is not full.
942+
943+
:class:`~ResultFuture` can be passed into callbacks or futures to collect the result data. An
944+
error can also be propagated to the async IO operator by
945+
:func:`~ResultFuture.complete_exceptionally`.
946+
"""
947+
948+
@abstractmethod
949+
async def async_invoke(self, value: IN, result_future: ResultFuture[OUT]):
950+
"""
951+
Trigger async operation for each stream input.
952+
In case of a user code error. You can raise an exception to make the task fail and
953+
trigger fail-over process.
954+
955+
:param value: Input element coming from an upstream task.
956+
:param result_future: A future to be completed with the result data.
957+
"""
958+
pass
959+
960+
def timeout(self, value: IN, result_future: ResultFuture[OUT]):
961+
"""
962+
In case :func:`~ResultFuture.async_invoke` timeout occurred. By default, the result future
963+
is exceptionally completed with a timeout exception.
964+
"""
965+
result_future.complete_exceptionally(
966+
TimeoutError("Async function call has timed out for input: " + str(value)))
967+
968+
969+
class AsyncFunctionDescriptor(object):
970+
971+
class OutputMode(Enum):
972+
ORDERED = 0
973+
UNORDERED = 1
974+
975+
def __init__(self, async_function, timeout, capacity, output_mode):
976+
self.async_function = async_function
977+
self.timeout = timeout
978+
self.capacity = capacity
979+
self.output_mode = output_mode
980+
981+
900982
class WindowFunction(Function, Generic[IN, OUT, KEY, W]):
901983
"""
902984
Base interface for functions that are evaluated over keyed (grouped) windows.

0 commit comments

Comments
 (0)