Perpartition with global - Regression Tests #45415

Draft: wants to merge 44 commits into base: master.

Commits (44):
- 05c256d (tolik0, May 23, 2024): Migrate `IssueComments` and `IssueWorklogs` streams to low code
- 9b95d9e (tolik0, Jul 16, 2024): Add Global Parent Cursor
- 49e75bf (tolik0, Sep 6, 2024): Add PerPartitionWithGlobalCursor
- 6874c76 (tolik0, Sep 10, 2024): Add switch between partitions
- 16847c1 (tolik0, Sep 10, 2024): Update tests
- 10c09a8 (tolik0, Sep 11, 2024): Fix incremental tests
- 01a55a7 (tolik0, Sep 11, 2024): Add docs
- 5c90376 (tolik0, Sep 11, 2024): Refactor per partition with global
- baa8af9 (tolik0, Sep 12, 2024): Refactor
- 7c5fe66 (tolik0, Sep 12, 2024): Merge branch 'tolik0/source-jira/migrate-incremental-substreams' into…
- 766bd72 (tolik0, Sep 12, 2024): Fix timer
- 405bb54 (tolik0, May 23, 2024): Migrate `IssueComments` and `IssueWorklogs` streams to low code
- bd15d98 (tolik0, Jul 16, 2024): Add Global Parent Cursor
- 7b26c3a (tolik0, Sep 12, 2024): Delete state migration
- 9065d46 (tolik0, Sep 12, 2024): Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
- f45c5ee (tolik0, Sep 12, 2024): Merge branch 'tolik0/source-jira/migrate-incremental-substreams' into…
- e18d430 (tolik0, Sep 12, 2024): Add new test
- bfd4a45 (tolik0, Sep 13, 2024): Refactor and update tests
- 2c60ef0 (tolik0, Sep 13, 2024): Update ClientSideIncrementalRecordFilterDecorator
- 506d296 (tolik0, Sep 13, 2024): Fix formatting
- d8e27d4 (tolik0, Sep 17, 2024): Fix mypy errors
- 2b629e0 (tolik0, Sep 17, 2024): Fix formatting
- 563b54c (tolik0, Sep 17, 2024): Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
- 69a84b5 (octavia-squidington-iii, Sep 17, 2024): chore: auto-fix lint and format issues
- 0dc55e1 (tolik0, Sep 17, 2024): Fix typo in test
- 4fc0eb0 (tolik0, Sep 17, 2024): Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
- fadb1fd (tolik0, Sep 17, 2024): Merge branch 'master' into tolik0/airbyte-cdk/add-per-partition-with-…
- 9316f95 (tolik0, Sep 17, 2024): Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
- dee105f (tolik0, Sep 18, 2024): Fix migration from previous format
- 1c0fb26 (tolik0, Sep 18, 2024): Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
- 379f11d (tolik0, Sep 18, 2024): Fix conflicts
- 6f68b1b (tolik0, Sep 18, 2024): Delete wrong changes
- c1c2d3e (tolik0, Sep 18, 2024): Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
- 96f1150 (tolik0, Sep 19, 2024): Update PerPartitionCursor docs
- 3042d28 (tolik0, Oct 4, 2024): Simplified iterate_with_last_flag
- 8cfc0ba (tolik0, Oct 7, 2024): Merge branch 'master' into tolik0/airbyte-cdk/add-per-partition-with-…
- bd507e0 (tolik0, Oct 7, 2024): Fix formatting
- e6976fc (tolik0, Oct 7, 2024): Fix mypy error
- c998f5c (tolik0, Oct 18, 2024): Merge branch 'master' into tolik0/airbyte-cdk/add-per-partition-with-…
- 58c96f2 (tolik0, Oct 22, 2024): Align with new SubstreamPartitionRouter
- 1fb01a9 (tolik0, Oct 23, 2024): Update the state management
- 855a69b (tolik0, Oct 23, 2024): Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
- 8ad8ffa (tolik0, Oct 23, 2024): Set DEFAULT_MAX_PARTITIONS_NUMBER for PerPartitionCursor to 5 for tes…
- 031c294 (tolik0, Oct 23, 2024): Update version
Files changed (seven diffs were captured; GitHub's per-file headers did not survive this snapshot, so each path below is inferred from the code and should be treated as a best guess).

File 1: likely `airbyte_cdk/sources/declarative/async_job/job.py`. The `StreamSlice` import moves off the package root onto its canonical module:

```diff
@@ -4,8 +4,8 @@
 from datetime import timedelta
 from typing import Optional
 
-from airbyte_cdk import StreamSlice
 from airbyte_cdk.sources.declarative.async_job.timer import Timer
+from airbyte_cdk.sources.types import StreamSlice
 
 from .status import AsyncJobStatus
```
File 2: likely `airbyte_cdk/sources/declarative/async_job/repository.py`. The same import fix:

```diff
@@ -3,8 +3,8 @@
 from abc import abstractmethod
 from typing import Any, Iterable, Mapping, Set
 
-from airbyte_cdk import StreamSlice
 from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
+from airbyte_cdk.sources.types import StreamSlice
 
 
 class AsyncJobRepository:
```
File 3: likely `airbyte_cdk/sources/declarative/declarative_stream.py`. The checkpoint reader must treat the new `PerPartitionWithGlobalCursor` like the other multi-slice cursors:

```diff
@@ -6,7 +6,7 @@
 from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
 
 from airbyte_cdk.models import SyncMode
-from airbyte_cdk.sources.declarative.incremental import GlobalSubstreamCursor, PerPartitionCursor
+from airbyte_cdk.sources.declarative.incremental import GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor
 from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
 from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
 from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
@@ -200,7 +200,7 @@ def _get_checkpoint_reader(
         cursor = self.get_cursor()
         checkpoint_mode = self._checkpoint_mode
 
-        if isinstance(cursor, (GlobalSubstreamCursor, PerPartitionCursor)):
+        if isinstance(cursor, (GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor)):
            self.has_multiple_slices = True
            return CursorBasedCheckpointReader(
                stream_slices=mappings_or_slices,
```
File 4: likely `airbyte_cdk/sources/declarative/extractors/record_filter.py`. The client-side incremental filter previously special-cased per-partition and global substream cursors; it now delegates to whichever cursor is present:

```diff
@@ -3,9 +3,9 @@
 #
 import datetime
 from dataclasses import InitVar, dataclass
-from typing import Any, Iterable, Mapping, Optional
+from typing import Any, Iterable, Mapping, Optional, Union
 
-from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor
+from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, GlobalSubstreamCursor, PerPartitionWithGlobalCursor
 from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
 from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
 
@@ -55,14 +55,12 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
     def __init__(
         self,
         date_time_based_cursor: DatetimeBasedCursor,
-        per_partition_cursor: Optional[PerPartitionCursor] = None,
-        is_global_substream_cursor: bool = False,
+        substream_cursor: Optional[Union[PerPartitionWithGlobalCursor, GlobalSubstreamCursor]],
         **kwargs: Any,
     ):
         super().__init__(**kwargs)
         self._date_time_based_cursor = date_time_based_cursor
-        self._per_partition_cursor = per_partition_cursor
-        self.is_global_substream_cursor = is_global_substream_cursor
+        self._substream_cursor = substream_cursor
 
     @property
     def _cursor_field(self) -> str:
@@ -108,15 +106,9 @@ def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice)
         :param StreamSlice stream_slice: Current Stream slice
         :return Optional[str]: cursor_value in case it was found, otherwise None.
         """
-        if self._per_partition_cursor:
-            # self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state
-            partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
-            return partition_state.get(self._cursor_field) if partition_state else None
-
-        if self.is_global_substream_cursor:
-            return stream_state.get("state", {}).get(self._cursor_field)  # type: ignore # state is inside a dict for GlobalSubstreamCursor
-
-        return stream_state.get(self._cursor_field)
+        state = (self._substream_cursor or self._date_time_based_cursor).select_state(stream_slice)
+
+        return state.get(self._cursor_field) if state else None
 
     def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
         start_date_parsed = self._start_date_from_config
```
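The net effect is that state selection is uniform across cursor types. A minimal runnable sketch of the pattern (fake cursor classes and an invented `updated_at` field, not the CDK types):

```python
from typing import Any, Mapping, Optional


class FakeGlobalCursor:
    """Global-style cursor: select_state already unwraps its nested layout."""

    def select_state(self, stream_slice: Optional[Any] = None) -> Optional[Mapping[str, str]]:
        return {"updated_at": "2024-01-01"}


class FakeDatetimeCursor:
    """Plain datetime cursor: flat state."""

    def select_state(self, stream_slice: Optional[Any] = None) -> Optional[Mapping[str, str]]:
        return {"updated_at": "2023-06-01"}


def get_state_value(substream_cursor, datetime_cursor, stream_slice) -> Optional[str]:
    # Single code path: whichever cursor exists answers select_state().
    state = (substream_cursor or datetime_cursor).select_state(stream_slice)
    return state.get("updated_at") if state else None


print(get_state_value(FakeGlobalCursor(), FakeDatetimeCursor(), None))  # 2024-01-01
print(get_state_value(None, FakeDatetimeCursor(), None))                # 2023-06-01
```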
File 5: likely `airbyte_cdk/sources/declarative/incremental/__init__.py`. The new cursor is exported:

```diff
@@ -6,14 +6,19 @@
 from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
 from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor
 from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor
-from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor, ChildPartitionResumableFullRefreshCursor
+from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import PerPartitionWithGlobalCursor
+from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import (
+    ChildPartitionResumableFullRefreshCursor,
+    ResumableFullRefreshCursor,
+)
 
 __all__ = [
     "CursorFactory",
     "DatetimeBasedCursor",
     "DeclarativeCursor",
     "GlobalSubstreamCursor",
     "PerPartitionCursor",
+    "PerPartitionWithGlobalCursor",
     "ResumableFullRefreshCursor",
-    "ChildPartitionResumableFullRefreshCursor"
+    "ChildPartitionResumableFullRefreshCursor",
 ]
```
File 6: likely `airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py`, where most of the work happens. A generic `iterate_with_last_flag` helper is introduced:

```diff
@@ -4,13 +4,33 @@
 
 import threading
 import time
-from typing import Any, Iterable, Mapping, Optional, Union
+from typing import Any, Iterable, Mapping, Optional, TypeVar, Union
 
 from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
 from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
 from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
 from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
 
+T = TypeVar("T")
+
+
+def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[T, bool]]:
+    """
+    Iterates over the given generator and returns a tuple containing the element and a flag
+    indicating whether it's the last element in the generator. If the generator is empty,
+    it returns an empty iterator.
+    """
+    iterator = iter(generator)
+    try:
+        current = next(iterator)
+    except StopIteration:
+        return  # Return an empty iterator
+
+    for next_item in iterator:
+        yield current, False
+        current = next_item
+    yield current, True
+
 
 class Timer:
     """
```
The timer now rounds the elapsed seconds up instead of truncating (commit 766bd72, "Fix timer"):

```diff
@@ -25,7 +45,7 @@ def start(self) -> None:
 
     def finish(self) -> int:
         if self._start:
-            return int((time.perf_counter_ns() - self._start) // 1e9)
+            return ((time.perf_counter_ns() - self._start) / 1e9).__ceil__()
         else:
             raise RuntimeError("Global substream cursor timer not started")
```

A reviewer attached a suggested change to the new line, replacing the float round-trip with pure integer ceiling division:

```diff
-            return ((time.perf_counter_ns() - self._start) / 1e9).__ceil__()
+            return (time.perf_counter_ns() - self._start + 999_999_999) // 1_000_000_000
```

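Both expressions compute a ceiling of nanoseconds-to-seconds; the suggestion simply avoids going through a float. A small check (values invented):

```python
import math

elapsed_ns = 1_500_000_001  # 1.5 s plus 1 ns; must round up to 2 s

# Merged code: float division, then ceil (float.__ceil__ returns an int).
assert (elapsed_ns / 1e9).__ceil__() == 2

# Suggested change: integer ceiling division, exact for arbitrarily large counts.
assert (elapsed_ns + 999_999_999) // 1_000_000_000 == 2
assert (elapsed_ns + 999_999_999) // 1_000_000_000 == math.ceil(elapsed_ns / 1e9)
```

Rounding up matters because the result feeds the lookback window: truncating could make the re-read window a second too short.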
The cursor now tracks the in-flight partition, and timer startup moves behind a small hook:

```diff
@@ -52,6 +72,11 @@
         self._slice_semaphore = threading.Semaphore(0)  # Start with 0, indicating no slices being tracked
         self._all_slices_yielded = False
         self._lookback_window: Optional[int] = None
+        self._current_partition: Optional[Mapping[str, Any]] = None
+        self._last_slice: bool = False
+
+    def start_slices_generation(self) -> None:
+        self._timer.start()
 
     def stream_slices(self) -> Iterable[StreamSlice]:
         """
```
`stream_slices` drops the previous-slice look-ahead in favor of the new helper, per-partition slice generation is split out, and the semaphore bookkeeping moves into `register_slice`:

```diff
@@ -68,32 +93,40 @@ def stream_slices(self) -> Iterable[StreamSlice]:
         * Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
         * Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
         """
-        previous_slice = None
-
         slice_generator = (
             StreamSlice(partition=partition, cursor_slice=cursor_slice)
             for partition in self._partition_router.stream_slices()
             for cursor_slice in self._stream_cursor.stream_slices()
         )
-        self._timer.start()
-
-        for slice in slice_generator:
-            if previous_slice is not None:
-                # Release the semaphore to indicate that a slice has been yielded
-                self._slice_semaphore.release()
-                yield previous_slice
+        self.start_slices_generation()
+        for slice, last in iterate_with_last_flag(slice_generator):
+            self._current_partition = slice.partition
+            self.register_slice(last)
+            yield slice
+        self._current_partition = None
+        self._last_slice = True
 
-            # Store the current slice as the previous slice for the next iteration
-            previous_slice = slice
+    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
+        slice_generator = (
+            StreamSlice(partition=partition, cursor_slice=cursor_slice) for cursor_slice in self._stream_cursor.stream_slices()
+        )
 
-        # After all slices have been generated, release the semaphore one final time
-        # and flag that all slices have been yielded
-        self._slice_semaphore.release()
-        self._all_slices_yielded = True
+        yield from slice_generator
 
-        # Yield the last slice
-        if previous_slice is not None:
-            yield previous_slice
+    def register_slice(self, last: bool) -> None:
+        """
+        Tracks the processing of a stream slice.
+
+        Releases the semaphore for each slice. If it's the last slice (`last=True`),
+        sets `_all_slices_yielded` to `True` to indicate no more slices will be processed.
+
+        Args:
+            last (bool): True if the current slice is the last in the sequence.
+        """
+        self._slice_semaphore.release()
+        if last:
+            self._all_slices_yielded = True
 
     def set_initial_state(self, stream_state: StreamState) -> None:
         """
```
`set_initial_state` learns to recognize state blobs that lack the new `state` wrapper:

```diff
@@ -125,7 +158,12 @@ def set_initial_state(self, stream_state: StreamState) -> None:
             self._lookback_window = stream_state["lookback_window"]
             self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])
 
-        self._stream_cursor.set_initial_state(stream_state["state"])
+        if "state" in stream_state:
+            self._stream_cursor.set_initial_state(stream_state["state"])
+        elif "states" not in stream_state:
+            # We assume that `stream_state` is in the old global format
+            # Example: {"global_state_format_key": "global_state_format_value"}
+            self._stream_cursor.set_initial_state(stream_state)
 
         # Set parent state for partition routers based on parent streams
         self._partition_router.set_initial_state(stream_state)
```
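For reference, the three incoming state shapes this branch distinguishes (all values invented for illustration):

```python
# 1) New format with a global component: the stream cursor seeds from "state".
new_format = {
    "state": {"updated_at": "2024-08-01"},
    "states": [{"partition": {"id": "1"}, "cursor": {"updated_at": "2024-08-02"}}],
    "lookback_window": 86400,
}

# 2) Pure per-partition format: "states" only, so the stream cursor is left alone.
per_partition_format = {
    "states": [{"partition": {"id": "1"}, "cursor": {"updated_at": "2024-08-02"}}],
}

# 3) Old global format: neither key present, so the whole dict is cursor state.
old_global_format = {"updated_at": "2024-07-15"}
```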
`get_stream_state` accepts the partition being processed so that parent state can be scoped to it:

```diff
@@ -169,10 +207,12 @@ def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
             self._lookback_window = self._timer.finish()
             self._stream_cursor.close_slice(StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice), *args)
 
-    def get_stream_state(self) -> StreamState:
+    def get_stream_state(self, partition: Optional[Mapping[str, Any]] = None, last: bool = True) -> StreamState:
         state: dict[str, Any] = {"state": self._stream_cursor.get_stream_state()}
 
-        parent_state = self._partition_router.get_stream_state()
+        parent_state = self._partition_router.get_stream_state(
+            partition=partition or self._current_partition, last=self._last_slice or last
+        )
         if parent_state:
             state["parent_state"] = parent_state
```
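The new `partition`/`last` parameters line up with the updated SubstreamPartitionRouter (commit 58c96f2): parent state can be checkpointed per in-flight partition instead of only once at the end. A toy sketch of how the parameters thread through (assumed behavior, not the router's real code):

```python
from typing import Any, Mapping, Optional


class ToyRouter:
    """Stand-in for the partition router; names and behavior are assumptions."""

    def get_stream_state(self, partition: Optional[Mapping[str, Any]] = None, last: bool = True) -> Mapping[str, Any]:
        # A real router would return parent-stream state checkpointed only up
        # to `partition` unless `last` indicates the sync has finished.
        scope = "full" if last else f"up to partition {partition}"
        return {"parent": {"checkpoint": scope}}


router = ToyRouter()
print(router.get_stream_state(partition={"id": "42"}, last=False))  # mid-sync checkpoint
print(router.get_stream_state())                                    # final, complete parent state
```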
File 7: likely `airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py`. A named logger replaces the module-level `logging` call used further down:

```diff
@@ -11,6 +11,8 @@
 from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import PerPartitionKeySerializer
 from airbyte_cdk.sources.types import Record, StreamSlice, StreamState
 
+logger = logging.getLogger("airbyte")
+
 
 class CursorFactory:
     def __init__(self, create_function: Callable[[], DeclarativeCursor]):
```
The class docstring is rewritten around the new partition-limit semantics, and `DEFAULT_MAX_PARTITIONS_NUMBER` drops from 10,000 to 5 (per the commit message, for regression testing):

```diff
@@ -22,26 +24,23 @@ def create(self) -> DeclarativeCursor:
 
 class PerPartitionCursor(DeclarativeCursor):
     """
-    Given a stream has many partitions, it is important to provide a state per partition.
-
-    Record | Stream Slice | Last Record | DatetimeCursorBased cursor
-    -- | -- | -- | --
-    1 | {"start_time": "2021-01-01","end_time": "2021-01-31","owner_resource": "1"''} | cursor_field: “2021-01-15” | 2021-01-15
-    2 | {"start_time": "2021-02-01","end_time": "2021-02-28","owner_resource": "1"''} | cursor_field: “2021-02-15” | 2021-02-15
-    3 | {"start_time": "2021-01-01","end_time": "2021-01-31","owner_resource": "2"''} | cursor_field: “2021-01-03” | 2021-01-03
-    4 | {"start_time": "2021-02-01","end_time": "2021-02-28","owner_resource": "2"''} | cursor_field: “2021-02-14” | 2021-02-14
-
-    Given the following errors, this can lead to some loss or duplication of records:
-    When | Problem | Affected Record
-    -- | -- | --
-    Between record #1 and #2 | Loss | #3
-    Between record #2 and #3 | Loss | #3, #4
-    Between record #3 and #4 | Duplication | #1, #2
-
-    Therefore, we need to manage state per partition.
+    Manages state per partition when a stream has many partitions, to prevent data loss or duplication.
+
+    **Partition Limitation and Limit Reached Logic**
+
+    - **DEFAULT_MAX_PARTITIONS_NUMBER**: The maximum number of partitions to keep in memory (default is 10,000).
+    - **_cursor_per_partition**: An ordered dictionary that stores cursors for each partition.
+    - **_over_limit**: A counter that increments each time an oldest partition is removed when the limit is exceeded.
+
+    The class ensures that the number of partitions tracked does not exceed the `DEFAULT_MAX_PARTITIONS_NUMBER` to prevent excessive memory usage.
+
+    - When the number of partitions exceeds the limit, the oldest partitions are removed from `_cursor_per_partition`, and `_over_limit` is incremented accordingly.
+    - The `limit_reached` method returns `True` when `_over_limit` exceeds `DEFAULT_MAX_PARTITIONS_NUMBER`, indicating that the global cursor should be used instead of per-partition cursors.
+
+    This approach avoids unnecessary switching to a global cursor due to temporary spikes in partition counts, ensuring that switching is only done when a sustained high number of partitions is observed.
     """
 
-    DEFAULT_MAX_PARTITIONS_NUMBER = 10000
+    DEFAULT_MAX_PARTITIONS_NUMBER = 5
     _NO_STATE: Mapping[str, Any] = {}
     _NO_CURSOR_STATE: Mapping[str, Any] = {}
     _KEY = 0
```
Slice generation is split out so `PerPartitionWithGlobalCursor` can drive it per partition, and evictions are now counted so `limit_reached` can trigger the global fallback:

```diff
@@ -54,30 +53,41 @@
         # The dict is ordered to ensure that once the maximum number of partitions is reached,
         # the oldest partitions can be efficiently removed, maintaining the most recent partitions.
         self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
+        self._over_limit = 0
         self._partition_serializer = PerPartitionKeySerializer()
+        self._current_partition: Optional[Mapping[str, Any]] = None
 
     def stream_slices(self) -> Iterable[StreamSlice]:
         slices = self._partition_router.stream_slices()
         for partition in slices:
-            # Ensure the maximum number of partitions is not exceeded
-            self._ensure_partition_limit()
+            yield from self.generate_slices_from_partition(partition)
+
+    def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
+        # Ensure the maximum number of partitions is not exceeded
+        self._ensure_partition_limit()
 
-            cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
-            if not cursor:
-                partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
-                cursor = self._create_cursor(partition_state)
-                self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
+        cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
+        if not cursor:
+            partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
+            cursor = self._create_cursor(partition_state)
+            self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
 
-            for cursor_slice in cursor.stream_slices():
-                yield StreamSlice(partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields)
+        for cursor_slice in cursor.stream_slices():
+            yield StreamSlice(partition=partition, cursor_slice=cursor_slice, extra_fields=partition.extra_fields)
 
     def _ensure_partition_limit(self) -> None:
         """
         Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
         """
         while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
+            self._over_limit += 1
             oldest_partition = self._cursor_per_partition.popitem(last=False)[0]  # Remove the oldest partition
-            logging.warning(f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}.")
+            logger.warning(
+                f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
+            )
+
+    def limit_reached(self) -> bool:
+        return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER
 
     def set_initial_state(self, stream_state: StreamState) -> None:
         """
```
When the incoming state carries a global component, it seeds cursors for partitions that have no saved state yet:

```diff
@@ -121,6 +131,10 @@ def set_initial_state(self, stream_state: StreamState) -> None:
         for state in stream_state["states"]:
             self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])
 
+        # set default state for missing partitions if it is per partition with fallback to global
+        if "state" in stream_state:
+            self._state_to_migrate_from = stream_state["state"]
+
         # Set parent state for partition routers based on parent streams
         self._partition_router.set_initial_state(stream_state)
```

`get_stream_state` gains the same `partition`/`last` parameters as the global cursor:

```diff
@@ -140,7 +154,7 @@ def close_slice(self, stream_slice: StreamSlice, *args: Any) -> None:
             f"we should only update state for partitions that were emitted during `stream_slices`"
         )
 
-    def get_stream_state(self) -> StreamState:
+    def get_stream_state(self, partition: Optional[Mapping[str, Any]] = None, last: bool = True) -> StreamState:
         states = []
         for partition_tuple, cursor in self._cursor_per_partition.items():
             cursor_state = cursor.get_stream_state()
@@ -153,7 +167,7 @@ def get_stream_state(self) -> StreamState:
             )
         state: dict[str, Any] = {"states": states}
 
-        parent_state = self._partition_router.get_stream_state()
+        parent_state = self._partition_router.get_stream_state(partition=partition, last=last)
         if parent_state:
             state["parent_state"] = parent_state
         return state
```