-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[source-tiktok-marketing] - Removes stream_state
interpolation, custom cursor
#53645
base: master
Are you sure you want to change the base?
[source-tiktok-marketing] - Removes stream_state
interpolation, custom cursor
#53645
Conversation
The latest updates on your projects. Learn more about Vercel for Git ↗︎
|
stream_state
interpolation, custom cursor
end_datetime: | ||
type: MinMaxDatetime | ||
datetime: "{{ config.get('end_date', today_utc()) }}" | ||
datetime_format: "%Y-%m-%d" | ||
datetime: "{{ format_datetime((str_to_datetime(config.get('end_date')) if config.get('end_date') else now_utc()) + duration('PT23H'), '%Y-%m-%d %H:%M:%S') }}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@maxi297 @brianjlai This duration (PT23H) added differs from the GH issue (P1D) but I think this solved the issue we ran into with mock server tests.
I noticed the partitions that were generated looked like this, assuming start date of 2024-01-01 and end date of 2024-01-02:
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-01 00:00:00', 'end_time': '2024-01-01 23:00:00'}
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-02 00:00:00', 'end_time': '2024-01-03 00:00:00'}
When changing it to "PT23H", the partitions changed to the following, and the mock server tests' second request was mocked and matched correctly:
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-01 00:00:00', 'end_time': '2024-01-01 23:00:00'}
{'advertiser_id': '872746382648', 'parent_slice': {}, 'start_time': '2024-01-02 00:00:00', 'end_time': '2024-01-02 23:00:00'}
I'll need to dive a bit deeper into the concurrent code but I think it's because the end date of the last partition was out of range relative to the actual end date and therefore the last request was not made. That's why the mock server test was previously trying to match the same request to two separate mocked requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cursor granularity is not applied on the last slice. I'm not sure why exactly. I would have assumed that it was to keep the same behavior as the datetime based cursor but this is not what I see here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The overall direction looks good but I would like to confirm the change on the tests, see the CATs pass and have regression test results as well
@@ -4314,7 +4319,7 @@ spec: | |||
# If time allows it, we can definitely try to scope this number a bit better empirically. | |||
concurrency_level: | |||
type: ConcurrencyLevel | |||
default_concurrency: 3 | |||
default_concurrency: 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
default_concurrency == 1
is dangerous because threads are shared between partition generator and partition reader with a limit in the number of partition generator so that we can read those partitions and they don't just grow over time. With only one thread, there would be no partition reader and this might lead to deadlock where we can't enqueue partitions anymore but nobody reads to clean the queue. Can we put 2
there?
@@ -1,6 +1,8 @@ | |||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | |||
|
|||
import asyncio |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: what are those imports? They don't seem used
@@ -167,7 +169,7 @@ def test_read_with_state(self, http_mocker: HttpMocker): | |||
state=self.state(), | |||
) | |||
|
|||
assert len(output.records) == 1 | |||
assert len(output.records) == 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have more records being returned? It would seem to indicate that the filtering is not working fine or that the setup/given part of this test is not valid anymore
end_datetime: | ||
type: MinMaxDatetime | ||
datetime: "{{ config.get('end_date', today_utc()) }}" | ||
datetime_format: "%Y-%m-%d" | ||
datetime: "{{ format_datetime((str_to_datetime(config.get('end_date')) if config.get('end_date') else now_utc()) + duration('PT23H'), '%Y-%m-%d %H:%M:%S') }}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cursor granularity is not applied on the last slice. I'm not sure why exactly. I would have assumed that it was to keep the same behavior as the datetime based cursor but this is not what I see here
What
stream_state
as an interpolation variable. Now referencesstream_interval
How
Review guide
manifest.yaml
unit_tests/integration/test_reports_hourly.py
User Impact
Can this PR be safely reverted and rolled back?