Skip to content

Commit 7000ef9

Browse files
committed
Add support for data deduplication on catalog consolidation
1 parent 3c04ced commit 7000ef9

File tree

1 file changed

+36
-4
lines changed

1 file changed

+36
-4
lines changed

nautilus_trader/persistence/catalog/parquet.py

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,7 @@ def consolidate_catalog(
519519
start: TimestampLike | None = None,
520520
end: TimestampLike | None = None,
521521
ensure_contiguous_files: bool = True,
522+
deduplicate: bool = False,
522523
) -> None:
523524
"""
524525
Consolidate all parquet files across the entire catalog within the specified
@@ -541,6 +542,8 @@ def consolidate_catalog(
541542
up to the end of time will be considered.
542543
ensure_contiguous_files : bool, default True
543544
If True, ensures that files have contiguous timestamps before consolidation.
545+
deduplicate : bool, default False
546+
If True, removes duplicate rows from the consolidated file.
544547
545548
Notes
546549
-----
@@ -558,7 +561,13 @@ def consolidate_catalog(
558561
leaf_directories = self._find_leaf_data_directories()
559562

560563
for directory in leaf_directories:
561-
self._consolidate_directory(directory, start, end, ensure_contiguous_files)
564+
self._consolidate_directory(
565+
directory,
566+
start,
567+
end,
568+
ensure_contiguous_files,
569+
deduplicate=deduplicate,
570+
)
562571

563572
def consolidate_data(
564573
self,
@@ -567,6 +576,7 @@ def consolidate_data(
567576
start: TimestampLike | None = None,
568577
end: TimestampLike | None = None,
569578
ensure_contiguous_files: bool = True,
579+
deduplicate: bool = False,
570580
) -> None:
571581
"""
572582
Consolidate multiple parquet files for a specific data class and instrument ID
@@ -593,6 +603,8 @@ def consolidate_data(
593603
up to the end of time will be considered.
594604
ensure_contiguous_files : bool, default True
595605
If True, ensures that files have contiguous timestamps before consolidation.
606+
deduplicate : bool, default False
607+
If True, removes duplicate rows from the consolidated file.
596608
597609
Notes
598610
-----
@@ -604,14 +616,21 @@ def consolidate_data(
604616
605617
"""
606618
directory = self._make_path(data_cls, identifier)
607-
self._consolidate_directory(directory, start, end, ensure_contiguous_files)
619+
self._consolidate_directory(
620+
directory,
621+
start,
622+
end,
623+
ensure_contiguous_files,
624+
deduplicate=deduplicate,
625+
)
608626

609627
def _consolidate_directory(
610628
self,
611629
directory: str,
612630
start: TimestampLike | None = None,
613631
end: TimestampLike | None = None,
614632
ensure_contiguous_files: bool = True,
633+
deduplicate: bool = False,
615634
) -> None:
616635
parquet_files = self.fs.glob(os.path.join(directory, "*.parquet"))
617636
files_to_consolidate = []
@@ -643,20 +662,33 @@ def _consolidate_directory(
643662
_timestamps_to_filename(intervals[0][0], intervals[-1][1]),
644663
)
645664
files_to_consolidate.sort()
646-
self._combine_parquet_files(files_to_consolidate, new_file_name)
665+
self._combine_parquet_files(files_to_consolidate, new_file_name, deduplicate=deduplicate)
647666

648-
def _combine_parquet_files(self, file_list: list[str], new_file: str) -> None:
667+
def _combine_parquet_files(
668+
self,
669+
file_list: list[str],
670+
new_file: str,
671+
deduplicate: bool = False,
672+
) -> None:
649673
if len(file_list) <= 1:
650674
return
651675

652676
tables = [pq.read_table(file, memory_map=True, pre_buffer=False) for file in file_list]
653677
combined_table = pa.concat_tables(tables)
678+
679+
if deduplicate:
680+
combined_table = self._deduplicate_table(combined_table)
681+
654682
pq.write_table(combined_table, where=new_file)
655683

656684
for file in file_list:
657685
if file != new_file:
658686
self.fs.rm(file)
659687

688+
@staticmethod
689+
def _deduplicate_table(table: pa.Table) -> pa.Table:
690+
return table.group_by(table.column_names).aggregate([])
691+
660692
def consolidate_catalog_by_period(
661693
self,
662694
period: pd.Timedelta = pd.Timedelta(days=1),

0 commit comments

Comments (0)