⚡️ Speed up method PerPartitionCursor.set_initial_state by 6% in PR #45415 (tolik0/airbyte-cdk/perpartition-with-global-regression-tests) #45657

Open: codeflash-ai[bot] wants to merge 1 commit into base: tolik0/airbyte-cdk/perpartition-with-global-regression-tests

@codeflash-ai codeflash-ai bot commented Sep 18, 2024

⚡️ This pull request contains optimizations for PR #45415

If you approve this dependent PR, these changes will be merged into the original PR branch tolik0/airbyte-cdk/perpartition-with-global-regression-tests.

This PR will be automatically closed if the original PR is merged.


📄 PerPartitionCursor.set_initial_state() in airbyte-cdk/python/airbyte_cdk/sources/declarative/incremental/per_partition_cursor.py

📈 Performance improved by 6% (0.06x faster)

⏱️ Runtime went down from 292 milliseconds to 275 milliseconds
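
As a quick check of the figures above, the reported 6% is consistent with measuring the time saved relative to the optimized runtime. The report does not state its formula, so that definition is an assumption here, and the variable names are illustrative only.

```python
# Runtimes reported above, in milliseconds.
original_ms = 292
optimized_ms = 275

# Assumed definition: time saved relative to the optimized runtime.
speedup = (original_ms - optimized_ms) / optimized_ms

print(f"{speedup:.2f}x faster ({speedup:.0%})")
```

Measuring the saving relative to the original runtime instead (17 / 292) also rounds to 6%, so the headline number does not depend on which of the two definitions is used.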

Explanation and details

The optimization targets runtime performance by removing redundant operations in set_initial_state and updating the per-partition dictionary more efficiently. The individual changes are explained below.

Explanation.

  1. Removed Unnecessary Attributes: The _over_limit attribute was removed because it is not used elsewhere in the code.
  2. Optimized Dictionary Update: self._cursor_per_partition is now updated with a dictionary comprehension instead of one assignment per state.
  3. Streamlined Setup: The logic inside set_initial_state was simplified to reduce lookups. Specifically, it checks for the 'states' key directly in the state dictionary.
  4. General Cleanup: Unnecessary comments were removed and the remaining ones were clarified for readability and maintenance.

These changes should help improve the runtime performance of the function, especially in scenarios involving many partitions and states.
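
The optimized method body itself is not shown in this description. As a rough illustration of points 2 and 3 only, here is a minimal sketch built on a hypothetical stand-in class. `ToyPerPartitionCursor`, its `create_cursor` callable, the `_state_to_migrate_from` attribute, and the JSON-based partition key are all assumptions made for this example, not the airbyte-cdk implementation, and the hand-off of parent state to the partition router is omitted.

```python
import json
from typing import Any, Callable, Dict, Mapping


class ToyPerPartitionCursor:
    """Hypothetical stand-in for illustration; not the airbyte-cdk class."""

    def __init__(self, create_cursor: Callable[[Mapping[str, Any]], Any]) -> None:
        self._create_cursor = create_cursor
        self._cursor_per_partition: Dict[str, Any] = {}
        self._state_to_migrate_from: Mapping[str, Any] = {}

    @staticmethod
    def _to_partition_key(partition: Mapping[str, Any]) -> str:
        # Assumption: a stable JSON dump is a good enough key for this sketch.
        return json.dumps(partition, sort_keys=True)

    def set_initial_state(self, stream_state: Mapping[str, Any]) -> None:
        if not stream_state:
            return
        # Single membership check for "states" (point 3).
        if "states" not in stream_state:
            # Treat the whole mapping as a global-format state.
            self._state_to_migrate_from = stream_state
            return
        # One bulk update from a dictionary comprehension (point 2).
        self._cursor_per_partition.update(
            {
                self._to_partition_key(state["partition"]): self._create_cursor(state["cursor"])
                for state in stream_state["states"]
            }
        )


cursor = ToyPerPartitionCursor(create_cursor=dict)
cursor.set_initial_state(
    {"states": [{"partition": {"id": 1}, "cursor": {"last_updated": "2023-05-27T00:00:00Z"}}]}
)
print(cursor._cursor_per_partition)
```

One behavioral difference worth keeping in mind with this pattern: the comprehension builds the whole intermediate dictionary before `update()` runs, so a malformed entry (for example, one missing `"partition"`) raises before any partition is stored, whereas a plain loop would have stored the entries that preceded it.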

Correctness verification

The new optimized code was tested for correctness. The results are listed below.

✅ 8 Passed − ⚙️ Existing Unit Tests

- sources/declarative/incremental/test_per_partition_cursor.py

✅ 23 Passed − 🌀 Generated Regression Tests

# imports
from __future__ import annotations

from unittest.mock import Mock

import pytest  # used for our unit tests
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import \
    PerPartitionCursor  # class under test
from airbyte_cdk.sources.declarative.partition_routers.partition_router import \
    PartitionRouter  # used as the Mock spec for the router fixture


# unit tests

@pytest.fixture
def cursor_factory():
    return Mock()
    # Outputs were verified to be equal to the original implementation

@pytest.fixture
def partition_router():
    router = Mock(spec=PartitionRouter)
    return router
    # Outputs were verified to be equal to the original implementation

@pytest.fixture
def per_partition_cursor(cursor_factory, partition_router):
    return PerPartitionCursor(cursor_factory, partition_router)
    # Outputs were verified to be equal to the original implementation

def test_empty_state(per_partition_cursor):
    # Test with an empty stream_state
    per_partition_cursor.set_initial_state({})
    # Outputs were verified to be equal to the original implementation

def test_global_state_format(per_partition_cursor):
    # Test with a global state format
    global_state = {"global_state_format_key": "global_state_format_value"}
    per_partition_cursor.set_initial_state(global_state)
    # Outputs were verified to be equal to the original implementation

def test_single_partition_state(per_partition_cursor, cursor_factory):
    # Test with a single partition state
    stream_state = {
        "states": [
            {
                "partition": {"partition_key": "value"},
                "cursor": {"last_updated": "2023-05-27T00:00:00Z"}
            }
        ]
    }
    cursor_instance = Mock()
    cursor_factory.create.return_value = cursor_instance
    per_partition_cursor.set_initial_state(stream_state)
    partition_key = per_partition_cursor._to_partition_key({"partition_key": "value"})
    cursor_instance.set_initial_state.assert_called_with({"last_updated": "2023-05-27T00:00:00Z"})
    # Outputs were verified to be equal to the original implementation

def test_multiple_partition_states(per_partition_cursor, cursor_factory):
    # Test with multiple partition states
    stream_state = {
        "states": [
            {
                "partition": {"partition_key1": "value1"},
                "cursor": {"last_updated": "2023-05-27T00:00:00Z"}
            },
            {
                "partition": {"partition_key2": "value2"},
                "cursor": {"last_updated": "2023-05-28T00:00:00Z"}
            }
        ]
    }
    cursor_instance = Mock()
    cursor_factory.create.return_value = cursor_instance
    per_partition_cursor.set_initial_state(stream_state)
    partition_key1 = per_partition_cursor._to_partition_key({"partition_key1": "value1"})
    partition_key2 = per_partition_cursor._to_partition_key({"partition_key2": "value2"})
    cursor_instance.set_initial_state.assert_any_call({"last_updated": "2023-05-27T00:00:00Z"})
    cursor_instance.set_initial_state.assert_any_call({"last_updated": "2023-05-28T00:00:00Z"})
    # Outputs were verified to be equal to the original implementation

def test_missing_partition_key(per_partition_cursor):
    # Test with a partition state missing the partition key
    stream_state = {
        "states": [
            {
                "cursor": {"last_updated": "2023-05-27T00:00:00Z"}
            }
        ]
    }
    with pytest.raises(KeyError):
        per_partition_cursor.set_initial_state(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_single_parent_state(per_partition_cursor, partition_router):
    # Test with a single parent state
    stream_state = {
        "parent_state": {
            "parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}
        }
    }
    per_partition_cursor.set_initial_state(stream_state)
    partition_router.set_initial_state.assert_called_with(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_multiple_parent_states(per_partition_cursor, partition_router):
    # Test with multiple parent states
    stream_state = {
        "parent_state": {
            "parent_stream_name1": {"last_updated": "2023-05-27T00:00:00Z"},
            "parent_stream_name2": {"last_updated": "2023-05-28T00:00:00Z"}
        }
    }
    per_partition_cursor.set_initial_state(stream_state)
    partition_router.set_initial_state.assert_called_with(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_combined_partition_and_parent_states(per_partition_cursor, cursor_factory, partition_router):
    # Test with both partition and parent states
    stream_state = {
        "states": [
            {
                "partition": {"partition_key": "value"},
                "cursor": {"last_updated": "2023-05-27T00:00:00Z"}
            }
        ],
        "parent_state": {
            "parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}
        }
    }
    cursor_instance = Mock()
    cursor_factory.create.return_value = cursor_instance
    per_partition_cursor.set_initial_state(stream_state)
    partition_key = per_partition_cursor._to_partition_key({"partition_key": "value"})
    cursor_instance.set_initial_state.assert_called_with({"last_updated": "2023-05-27T00:00:00Z"})
    partition_router.set_initial_state.assert_called_with(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_no_states_key(per_partition_cursor, partition_router):
    # Test when the states key is missing from stream_state
    stream_state = {
        "parent_state": {
            "parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}
        }
    }
    per_partition_cursor.set_initial_state(stream_state)
    partition_router.set_initial_state.assert_called_with(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_invalid_state_format(per_partition_cursor):
    # Test with an invalid state format
    stream_state = {
        "states": "invalid_format"
    }
    with pytest.raises(TypeError):
        per_partition_cursor.set_initial_state(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_state_with_extra_keys(per_partition_cursor, cursor_factory, partition_router):
    # Test with additional keys in the state
    stream_state = {
        "states": [
            {
                "partition": {"partition_key": "value"},
                "cursor": {"last_updated": "2023-05-27T00:00:00Z"},
                "extra_key": "extra_value"
            }
        ],
        "parent_state": {
            "parent_stream_name": {"last_updated": "2023-05-27T00:00:00Z"}
        }
    }
    cursor_instance = Mock()
    cursor_factory.create.return_value = cursor_instance
    per_partition_cursor.set_initial_state(stream_state)
    partition_key = per_partition_cursor._to_partition_key({"partition_key": "value"})
    cursor_instance.set_initial_state.assert_called_with({"last_updated": "2023-05-27T00:00:00Z"})
    partition_router.set_initial_state.assert_called_with(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_large_number_of_partitions(per_partition_cursor, cursor_factory):
    # Test with a large number of partition states to assess performance
    stream_state = {
        "states": [
            {
                "partition": {"partition_key": f"value_{i}"},
                "cursor": {"last_updated": "2023-05-27T00:00:00Z"}
            } for i in range(10000)
        ]
    }
    cursor_instance = Mock()
    cursor_factory.create.return_value = cursor_instance
    per_partition_cursor.set_initial_state(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_large_parent_state(per_partition_cursor, partition_router):
    # Test with a large parent state to assess performance
    stream_state = {
        "parent_state": {
            f"parent_stream_name_{i}": {"last_updated": "2023-05-27T00:00:00Z"} for i in range(1000)
        }
    }
    per_partition_cursor.set_initial_state(stream_state)
    partition_router.set_initial_state.assert_called_with(stream_state)
    # Outputs were verified to be equal to the original implementation

def test_consistent_state_initialization(per_partition_cursor, cursor_factory):
    # Ensure the function initializes the state consistently
    stream_state = {
        "states": [
            {
                "partition": {"partition_key": "value"},
                "cursor": {"last_updated": "2023-05-27T00:00:00Z"}
            }
        ]
    }
    cursor_instance = Mock()
    cursor_factory.create.return_value = cursor_instance
    for _ in range(10):
        per_partition_cursor.set_initial_state(stream_state)
        partition_key = per_partition_cursor._to_partition_key({"partition_key": "value"})
        cursor_instance.set_initial_state.assert_called_with({"last_updated": "2023-05-27T00:00:00Z"})
    # Outputs were verified to be equal to the original implementation
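
Several of the generated tests above compute `partition_key` without asserting on it. One way such a test could be tightened is sketched below against a small hypothetical stand-in (`ToyCursor`, not the airbyte-cdk `PerPartitionCursor`), so the example runs without the CDK installed. It assumes the factory exposes `create()` and that each created cursor receives the partition's `"cursor"` state, mirroring how the mocks above are configured. Whether the real class stores cursors exactly this way is not confirmed by this page.

```python
import json
from unittest.mock import Mock


class ToyCursor:
    """Hypothetical stand-in; not the airbyte-cdk PerPartitionCursor."""

    def __init__(self, cursor_factory):
        self._cursor_factory = cursor_factory
        self._cursor_per_partition = {}

    def _to_partition_key(self, partition):
        # Assumption: a stable JSON dump serves as the partition key.
        return json.dumps(partition, sort_keys=True)

    def set_initial_state(self, stream_state):
        for state in stream_state.get("states", []):
            cursor = self._cursor_factory.create()
            cursor.set_initial_state(state["cursor"])
            self._cursor_per_partition[self._to_partition_key(state["partition"])] = cursor


def test_single_partition_state_is_stored():
    cursor_factory = Mock()
    cursor_instance = Mock()
    cursor_factory.create.return_value = cursor_instance
    toy = ToyCursor(cursor_factory)

    toy.set_initial_state(
        {"states": [{"partition": {"partition_key": "value"},
                     "cursor": {"last_updated": "2023-05-27T00:00:00Z"}}]}
    )

    partition_key = toy._to_partition_key({"partition_key": "value"})
    # Assert on the computed key instead of leaving it unused.
    assert toy._cursor_per_partition[partition_key] is cursor_instance
    cursor_instance.set_initial_state.assert_called_once_with(
        {"last_updated": "2023-05-27T00:00:00Z"}
    )


test_single_partition_state_is_stored()
```

Checking the stored mapping as well as the mock call means the test would fail if the cursor were created but filed under the wrong partition key.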

🔘 (none found) − ⏪ Replay Tests


vercel bot commented Sep 18, 2024

The latest updates on your projects.

1 Skipped Deployment

| Name | Status | Preview | Comments | Updated (UTC) |
| --- | --- | --- | --- | --- |
| airbyte-docs | ⬜️ Ignored (Inspect) | Visit Preview | | Sep 18, 2024 5:45pm |

@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Sep 18, 2024
@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

@octavia-squidington-iii octavia-squidington-iii added CDK Connector Development Kit community labels Sep 18, 2024