-
Notifications
You must be signed in to change notification settings - Fork 3.2k
fix(ingestion/dremio): handle dremio oom errors when ingesting large amount of metadata #14883
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
def test_api_streaming_methods_exist(self): | ||
"""Test that API has all required streaming methods""" | ||
# Verify streaming methods exist | ||
assert hasattr(DremioAPIOperations, "_fetch_results_iter") | ||
assert hasattr(DremioAPIOperations, "execute_query_iter") | ||
assert hasattr(DremioAPIOperations, "get_all_tables_and_columns_iter") | ||
assert hasattr(DremioAPIOperations, "extract_all_queries_iter") | ||
|
||
def test_backward_compatibility(self, mock_dremio_source): | ||
"""Test that legacy methods still work for backward compatibility""" | ||
# Verify legacy methods still exist | ||
assert hasattr(DremioAPIOperations, "_fetch_all_results") | ||
assert hasattr(DremioAPIOperations, "execute_query") | ||
assert hasattr(DremioAPIOperations, "get_all_tables_and_columns") | ||
assert hasattr(DremioAPIOperations, "extract_all_queries") | ||
|
||
# Test that legacy batch methods still exist for backward compatibility | ||
mock_dataset = Mock(spec=DremioDataset) | ||
mock_dataset.path = ["test"] | ||
mock_dataset.resource_name = "table" | ||
|
||
mock_datasets = deque([mock_dataset]) | ||
mock_dremio_source.dremio_catalog.get_datasets.return_value = mock_datasets | ||
mock_dremio_source.dremio_catalog.get_containers.return_value = [] | ||
mock_dremio_source.dremio_catalog.get_glossary_terms.return_value = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing whether class has some functions, which are defined statically, is not really needed. Also comment in line 168th uses the verb "test", whereas there are no tests there. All those lines should be removed.
def test_iterator_memory_efficiency_concept(self): | ||
"""Conceptual test for memory efficiency of iterator approach""" | ||
# This test demonstrates the concept - in iterator approach, we process one item at a time | ||
# rather than loading all items into memory first | ||
|
||
def mock_iterator_generator(): | ||
"""Simulate streaming data source""" | ||
for i in range(1000): # Large dataset | ||
yield {"id": i, "data": f"row_{i}"} | ||
|
||
def batch_processing(data_source): | ||
"""Simulate batch processing (loads all data first)""" | ||
all_data = list(data_source) # This would use more memory | ||
processed = [] | ||
for item in all_data: | ||
processed.append(f"processed_{item['id']}") | ||
return processed | ||
|
||
def streaming_processing(data_source): | ||
"""Simulate streaming processing (one item at a time)""" | ||
for item in data_source: # This uses constant memory | ||
yield f"processed_{item['id']}" | ||
|
||
# Test streaming approach | ||
data_stream = mock_iterator_generator() | ||
streaming_results = list(streaming_processing(data_stream)) | ||
|
||
# Test batch approach | ||
data_batch = mock_iterator_generator() | ||
batch_results = batch_processing(data_batch) | ||
|
||
# Results should be the same | ||
assert streaming_results == batch_results | ||
|
||
# But streaming approach would use less memory for large datasets | ||
# (This is conceptual - actual memory measurement would require profiling tools) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this code testing?
def test_catalog_iterator_method_exists(self): | ||
"""Test that DremioCatalog has iterator method""" | ||
# Verify the method exists | ||
assert hasattr(DremioCatalog, "get_datasets_iter") | ||
|
||
# Verify it's a method | ||
assert inspect.ismethod(DremioCatalog.get_datasets_iter) or inspect.isfunction( | ||
DremioCatalog.get_datasets_iter | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think tests like those are not really meaningful.
class TestDremioIteratorIntegration: | ||
@pytest.fixture | ||
def mock_config(self): | ||
"""Create mock configuration""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to reduce the amount of comments stating the obvious.
|
||
@pytest.fixture | ||
def mock_dremio_source(self, mock_config, monkeypatch): | ||
"""Create mock Dremio source""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest to reduce the amount of comments stating the obvious.
def test_memory_usage_comparison(self, dremio_api): | ||
"""Test that streaming uses less memory than batch processing""" | ||
# This is a conceptual test - in practice, we'd need memory profiling tools | ||
# Here we just verify that streaming yields results one at a time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you rename that test to not indicate memory, as we are not really testing that (and certainly not comparing anything). The test itself is fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Beside the comments on particular details here. I would ask you to rename this file, current name is very generic. I would even consider merging it with the other test file introduced by this PR.
I would also reduce verbosity of comments stating the obvious.
username="user", | ||
password="pass", | ||
) | ||
assert config.hostname == "test" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't seem like we have any custom validators for those fields, why are we making assertions on them?
rows.extend(result["rows"]) | ||
|
||
# Handle cases where API response doesn't contain 'rows' key | ||
# This can happen with OOM errors or when no rows are returned |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is OOM happening on the client or server side?
logger.debug(f"Query returned no rows for job {job_id}") | ||
break | ||
|
||
offset = offset + limit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether we should rather use the length of received rows here instead of the limit
we have requested?
rows.extend(result_rows) | ||
|
||
# Check row count to determine if we should continue | ||
row_count = result.get("rowCount", 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like we have 3 values, which should logically be the equal:
results['rowCount']
len(results_rows)
limit
I think we should commit to using just one and I lean towards getting length of actual results. I see it as defensive measures to potential problems. WDYT?
} | ||
|
||
# Yield tables one at a time | ||
for table_path, table_info in table_metadata.items(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do I understand correctly that we can not be yielding directly in previous loop? It would help us reduce memory footprint as well. I am not aware of exact pattern of results in container_results
though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@acrylJonny it seems that this change introduces *_iter
versions of several functions but keeps the other ones in the code, completely unused. I think we should delete them if we are not using them. Also we might want to drop that suffix as well, just acknowledge the fact that those functions are now streaming in their nature.
Many comments seem to be very much redundant, I think we should ask ourselves whether they bring any meaningful information, for example:
Check row count to determine if we should continue
placed above get
and simple if
condition, is not really needed, in my opinion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happened to this file causing the same lines to be recognized as changes? Could we somehow fix it so it shows actual changes? It is very hard to review the actual changes in this file at the moment.
No description provided.