Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perpartition with global - Regression Tests #45415

Draft
wants to merge 44 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
05c256d
Migrate `IssueComments` and `IssueWorklogs` streams to low code
tolik0 May 23, 2024
9b95d9e
Add Global Parent Cursor
tolik0 Jul 16, 2024
49e75bf
Add PerPartitionWithGlobalCursor
tolik0 Sep 6, 2024
6874c76
Add switch between partitions
tolik0 Sep 10, 2024
16847c1
Update tests
tolik0 Sep 10, 2024
10c09a8
Fix incremental tests
tolik0 Sep 11, 2024
01a55a7
Add docs
tolik0 Sep 11, 2024
5c90376
Refactor per partition with global
tolik0 Sep 11, 2024
baa8af9
Refactor
tolik0 Sep 12, 2024
7c5fe66
Merge branch 'tolik0/source-jira/migrate-incremental-substreams' into…
tolik0 Sep 12, 2024
766bd72
Fix timer
tolik0 Sep 12, 2024
405bb54
Migrate `IssueComments` and `IssueWorklogs` streams to low code
tolik0 May 23, 2024
bd15d98
Add Global Parent Cursor
tolik0 Jul 16, 2024
7b26c3a
Delete state migration
tolik0 Sep 12, 2024
9065d46
Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
tolik0 Sep 12, 2024
f45c5ee
Merge branch 'tolik0/source-jira/migrate-incremental-substreams' into…
tolik0 Sep 12, 2024
e18d430
Add new test
tolik0 Sep 12, 2024
bfd4a45
Refactor and update tests
tolik0 Sep 13, 2024
2c60ef0
Update ClientSideIncrementalRecordFilterDecorator
tolik0 Sep 13, 2024
506d296
Fix formatting
tolik0 Sep 13, 2024
d8e27d4
Fix mypy errors
tolik0 Sep 17, 2024
2b629e0
Fix formatting
tolik0 Sep 17, 2024
563b54c
Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
tolik0 Sep 17, 2024
69a84b5
chore: auto-fix lint and format issues
octavia-squidington-iii Sep 17, 2024
0dc55e1
Fix typo in test
tolik0 Sep 17, 2024
4fc0eb0
Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
tolik0 Sep 17, 2024
fadb1fd
Merge branch 'master' into tolik0/airbyte-cdk/add-per-partition-with-…
tolik0 Sep 17, 2024
9316f95
Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
tolik0 Sep 17, 2024
dee105f
Fix migration from previous format
tolik0 Sep 18, 2024
1c0fb26
Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
tolik0 Sep 18, 2024
379f11d
Fix conflicts
tolik0 Sep 18, 2024
6f68b1b
Delete wrong changes
tolik0 Sep 18, 2024
c1c2d3e
Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
tolik0 Sep 18, 2024
96f1150
Update PerPartitionCursor docs
tolik0 Sep 19, 2024
3042d28
Simplified iterate_with_last_flag
tolik0 Oct 4, 2024
8cfc0ba
Merge branch 'master' into tolik0/airbyte-cdk/add-per-partition-with-…
tolik0 Oct 7, 2024
bd507e0
Fix formatting
tolik0 Oct 7, 2024
e6976fc
Fix mypy error
tolik0 Oct 7, 2024
c998f5c
Merge branch 'master' into tolik0/airbyte-cdk/add-per-partition-with-…
tolik0 Oct 18, 2024
58c96f2
Align with new SubstreamPartitionRouter
tolik0 Oct 22, 2024
1fb01a9
Update the state management
tolik0 Oct 23, 2024
855a69b
Merge branch 'tolik0/airbyte-cdk/add-per-partition-with-global-fallba…
tolik0 Oct 23, 2024
8ad8ffa
Set DEFAULT_MAX_PARTITIONS_NUMBER for PerPartitionCursor to 5 for tes…
tolik0 Oct 23, 2024
031c294
Update version
tolik0 Oct 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from datetime import timedelta
from typing import Optional

from airbyte_cdk import StreamSlice
from airbyte_cdk.sources.declarative.async_job.timer import Timer
from airbyte_cdk.sources.types import StreamSlice

from .status import AsyncJobStatus

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import time
from typing import Any, Generator, Iterable, List, Mapping, Optional, Set

from airbyte_cdk import StreamSlice
from airbyte_cdk.logger import lazy_log
from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
from airbyte_cdk.sources.types import StreamSlice
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

LOGGER = logging.getLogger("airbyte")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from abc import abstractmethod
from typing import Any, Iterable, Mapping, Set

from airbyte_cdk import StreamSlice
from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
from airbyte_cdk.sources.types import StreamSlice


class AsyncJobRepository:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
#
import datetime
from dataclasses import InitVar, dataclass
from typing import Any, Iterable, Mapping, Optional
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, PerPartitionCursor
from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor, GlobalSubstreamCursor, PerPartitionWithGlobalCursor
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

Expand Down Expand Up @@ -50,14 +50,12 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
def __init__(
self,
date_time_based_cursor: DatetimeBasedCursor,
per_partition_cursor: Optional[PerPartitionCursor] = None,
is_global_substream_cursor: bool = False,
substream_cursor: Optional[Union[PerPartitionWithGlobalCursor, GlobalSubstreamCursor]],
**kwargs: Any,
):
super().__init__(**kwargs)
self._date_time_based_cursor = date_time_based_cursor
self._per_partition_cursor = per_partition_cursor
self.is_global_substream_cursor = is_global_substream_cursor
self._substream_cursor = substream_cursor

@property
def _cursor_field(self) -> str:
Expand Down Expand Up @@ -103,15 +101,9 @@ def _get_state_value(self, stream_state: StreamState, stream_slice: StreamSlice)
:param StreamSlice stream_slice: Current Stream slice
:return Optional[str]: cursor_value in case it was found, otherwise None.
"""
if self._per_partition_cursor:
# self._per_partition_cursor is the same object that DeclarativeStream uses to save/update stream_state
partition_state = self._per_partition_cursor.select_state(stream_slice=stream_slice)
return partition_state.get(self._cursor_field) if partition_state else None
state = (self._substream_cursor or self._date_time_based_cursor).select_state(stream_slice)

if self.is_global_substream_cursor:
return stream_state.get("state", {}).get(self._cursor_field) # type: ignore # state is inside a dict for GlobalSubstreamCursor

return stream_state.get(self._cursor_field)
return state.get(self._cursor_field) if state else None

def _get_filter_date(self, state_value: Optional[str]) -> datetime.datetime:
start_date_parsed = self._start_date_from_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import GlobalSubstreamCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import CursorFactory, PerPartitionCursor
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ResumableFullRefreshCursor, ChildPartitionResumableFullRefreshCursor
from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import PerPartitionWithGlobalCursor
from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import (
ChildPartitionResumableFullRefreshCursor,
ResumableFullRefreshCursor,
)

__all__ = [
"CursorFactory",
"DatetimeBasedCursor",
"DeclarativeCursor",
"GlobalSubstreamCursor",
"PerPartitionCursor",
"PerPartitionWithGlobalCursor",
"ResumableFullRefreshCursor",
"ChildPartitionResumableFullRefreshCursor"
"ChildPartitionResumableFullRefreshCursor",
]
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,43 @@

import threading
import time
from typing import Any, Iterable, Mapping, Optional, Union
from typing import Any, Iterable, Mapping, Optional, TypeVar, Union

from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState

T = TypeVar("T")


def iterate_with_last_flag(generator: Iterable[T]) -> Iterable[tuple[Optional[T], bool]]:
    """
    Yield each element of the input paired with a flag that is True only for
    the final element.

    An empty input still produces exactly one pair, ``(None, True)``, so the
    consumer is always guaranteed to observe a "last" marker.

    Args:
        generator: Any iterable to walk through.

    Yields:
        ``(element, is_last)`` tuples; ``(None, True)`` when the input is empty.
    """
    buffered: Optional[T] = None
    has_buffered = False

    for element in generator:
        if has_buffered:
            # A newer element exists, so the buffered one is provably not last.
            yield buffered, False
        buffered = element
        has_buffered = True

    if has_buffered:
        yield buffered, True  # Whatever remains buffered is the final element.
    else:
        # The iterable produced nothing: emit the sentinel pair.
        yield None, True


class Timer:
"""
Expand All @@ -25,7 +55,7 @@ def start(self) -> None:

def finish(self) -> int:
    """
    Stop timing and return the elapsed time in whole seconds, rounded up.

    Returns:
        int: Seconds elapsed since ``start()``, rounded up so that any
        partially elapsed second still counts as a full second.

    Raises:
        RuntimeError: If ``start()`` was never called (``self._start`` is unset/falsy).
    """
    if self._start:
        # Pure integer ceiling division. Avoids the float precision loss of
        # `(delta / 1e9).__ceil__()` for very large nanosecond deltas and the
        # unidiomatic direct dunder call.
        return (time.perf_counter_ns() - self._start + 999_999_999) // 1_000_000_000
    else:
        raise RuntimeError("Global substream cursor timer not started")

Expand Down Expand Up @@ -53,6 +83,9 @@ def __init__(self, stream_cursor: DatetimeBasedCursor, partition_router: Partiti
self._all_slices_yielded = False
self._lookback_window: Optional[int] = None

def start_slices_generation(self) -> None:
    """Start the cursor's internal timer; invoked by stream_slices() before any slices are yielded."""
    self._timer.start()

def stream_slices(self) -> Iterable[StreamSlice]:
"""
Generates stream slices, ensuring the last slice is properly flagged and processed.
Expand All @@ -68,32 +101,39 @@ def stream_slices(self) -> Iterable[StreamSlice]:
* Setting `self._all_slices_yielded = True`. We do that before actually yielding the last slice as the caller of `stream_slices` might stop iterating at any point and hence the code after `yield` might not be executed
* Yield the last slice. At that point, once there are as many slices yielded as closes, the global slice will be closed too
"""
previous_slice = None

slice_generator = (
StreamSlice(partition=partition, cursor_slice=cursor_slice)
for partition in self._partition_router.stream_slices()
for cursor_slice in self._stream_cursor.stream_slices()
)
self._timer.start()

for slice in slice_generator:
if previous_slice is not None:
# Release the semaphore to indicate that a slice has been yielded
self._slice_semaphore.release()
yield previous_slice
self.start_slices_generation()
for slice, last in iterate_with_last_flag(slice_generator):
self.register_slice(last)

# Store the current slice as the previous slice for the next iteration
previous_slice = slice
if slice is not None:
yield slice

# After all slices have been generated, release the semaphore one final time
# and flag that all slices have been yielded
self._slice_semaphore.release()
self._all_slices_yielded = True
def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
    """
    Pair the given parent partition with every slice of the wrapped stream cursor.

    Args:
        partition: The parent-stream partition to combine with each cursor slice.

    Yields:
        StreamSlice: One slice per cursor slice, all sharing *partition*.
    """
    for cursor_slice in self._stream_cursor.stream_slices():
        yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

# Yield the last slice
if previous_slice is not None:
yield previous_slice
def register_slice(self, last: bool) -> None:
    """
    Record that one stream slice has been handed out to the caller.

    Every call releases the slice semaphore so slice yields can be balanced
    against slice closes. When the final slice is registered (``last=True``),
    the cursor is flagged so no further slices are expected.

    Args:
        last (bool): True when the slice being registered is the final one.
    """
    self._slice_semaphore.release()
    # Latch the completion flag on the final slice; it is never reset here.
    self._all_slices_yielded = self._all_slices_yielded or last

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Expand Down Expand Up @@ -125,7 +165,12 @@ def set_initial_state(self, stream_state: StreamState) -> None:
self._lookback_window = stream_state["lookback_window"]
self._inject_lookback_into_stream_cursor(stream_state["lookback_window"])

self._stream_cursor.set_initial_state(stream_state["state"])
if "state" in stream_state:
self._stream_cursor.set_initial_state(stream_state["state"])
elif "states" not in stream_state:
# We assume that `stream_state` is in the old global format
# Example: {"global_state_format_key": "global_state_format_value"}
self._stream_cursor.set_initial_state(stream_state)

# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import PerPartitionKeySerializer
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState

logger = logging.getLogger("airbyte")


class CursorFactory:
def __init__(self, create_function: Callable[[], DeclarativeCursor]):
Expand Down Expand Up @@ -54,30 +56,40 @@ def __init__(self, cursor_factory: CursorFactory, partition_router: PartitionRou
# The dict is ordered to ensure that once the maximum number of partitions is reached,
# the oldest partitions can be efficiently removed, maintaining the most recent partitions.
self._cursor_per_partition: OrderedDict[str, DeclarativeCursor] = OrderedDict()
self._over_limit = 0
self._partition_serializer = PerPartitionKeySerializer()

def stream_slices(self) -> Iterable[StreamSlice]:
slices = self._partition_router.stream_slices()
for partition in slices:
# Ensure the maximum number of partitions is not exceeded
self._ensure_partition_limit()
yield from self.generate_slices_from_partition(partition)

def generate_slices_from_partition(self, partition: StreamSlice) -> Iterable[StreamSlice]:
# Ensure the maximum number of partitions is not exceeded
self._ensure_partition_limit()

cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
cursor = self._create_cursor(partition_state)
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor
cursor = self._cursor_per_partition.get(self._to_partition_key(partition.partition))
if not cursor:
partition_state = self._state_to_migrate_from if self._state_to_migrate_from else self._NO_CURSOR_STATE
cursor = self._create_cursor(partition_state)
self._cursor_per_partition[self._to_partition_key(partition.partition)] = cursor

for cursor_slice in cursor.stream_slices():
yield StreamSlice(partition=partition, cursor_slice=cursor_slice)
for cursor_slice in cursor.stream_slices():
yield StreamSlice(partition=partition, cursor_slice=cursor_slice)

def _ensure_partition_limit(self) -> None:
"""
Ensure the maximum number of partitions is not exceeded. If so, the oldest added partition will be dropped.
"""
while len(self._cursor_per_partition) > self.DEFAULT_MAX_PARTITIONS_NUMBER - 1:
self._over_limit += 1
oldest_partition = self._cursor_per_partition.popitem(last=False)[0] # Remove the oldest partition
logging.warning(f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}.")
logger.warning(
f"The maximum number of partitions has been reached. Dropping the oldest partition: {oldest_partition}. Over limit: {self._over_limit}."
)

def limit_reached(self) -> bool:
    """
    Return True once the number of evicted partitions (``_over_limit``) has
    itself exceeded ``DEFAULT_MAX_PARTITIONS_NUMBER``.

    ``_over_limit`` is incremented each time ``_ensure_partition_limit`` drops
    the oldest partition, so this signals sustained overflow rather than a
    single eviction.
    """
    return self._over_limit > self.DEFAULT_MAX_PARTITIONS_NUMBER

def set_initial_state(self, stream_state: StreamState) -> None:
"""
Expand Down Expand Up @@ -121,6 +133,10 @@ def set_initial_state(self, stream_state: StreamState) -> None:
for state in stream_state["states"]:
self._cursor_per_partition[self._to_partition_key(state["partition"])] = self._create_cursor(state["cursor"])

# set default state for missing partitions if it is per partition with fallback to global
if "state" in stream_state:
self._state_to_migrate_from = stream_state["state"]

# Set parent state for partition routers based on parent streams
self._partition_router.set_initial_state(stream_state)

Expand Down
Loading
Loading