From 429124ff92392f2b425823867bf2917016f60e60 Mon Sep 17 00:00:00 2001
From: weiglszonja
Date: Tue, 22 Apr 2025 16:00:05 +0200
Subject: [PATCH 1/4] remove filtering out ip_address from mapped data

---
 src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
index 28ceb435..7c1055ef 100644
--- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
+++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
@@ -180,10 +180,10 @@ def _map_binned_logs_to_dandiset(
                 for ip_address in reduced_s3_log_binned_by_blob_id["ip_address"]
             ]
 
-            reordered_reduced_s3_log = reduced_s3_log_binned_by_blob_id.reindex(
-                columns=("timestamp", "bytes_sent", "region")
+            reordered_reduced_s3_log = reduced_s3_log_binned_by_blob_id.sort_values(
+                by="timestamp",
+                key=natsort.natsort_keygen(),
             )
-            reordered_reduced_s3_log.sort_values(by="timestamp", key=natsort.natsort_keygen(), inplace=True)
             reordered_reduced_s3_log.index = range(len(reordered_reduced_s3_log))
 
             dandiset_version_log_folder_path.mkdir(parents=True, exist_ok=True)

From b9e4747f136b851566dd1f7912e95fa6664a1ced Mon Sep 17 00:00:00 2001
From: weiglszonja
Date: Tue, 22 Apr 2025 16:00:36 +0200
Subject: [PATCH 2/4] add utility methods to aggregate by ip

---
 .../_map_binned_s3_logs_to_dandisets.py      | 28 +++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
index 7c1055ef..5a585b1e 100644
--- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
+++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
@@ -113,6 +113,8 @@ def _map_binned_logs_to_dandiset(
     all_reduced_s3_logs_per_blob_id_aggregated_by_day = dict()
     all_reduced_s3_logs_per_blob_id_aggregated_by_region = dict()
+    all_reduced_s3_logs_per_blob_id_aggregated_by_ip = dict()
+
     blob_id_to_asset_path = dict()
     total_bytes_across_versions_by_blob_id = dict()
 
     dandiset_versions = list(dandiset.get_versions())
@@ -205,6 +207,9 @@ def _map_binned_logs_to_dandiset(
             all_reduced_s3_logs_aggregated_by_region_for_version.append(aggregated_activity_by_region)
             all_reduced_s3_logs_per_blob_id_aggregated_by_region[blob_id] = aggregated_activity_by_region
 
+            aggregated_activity_by_ip = _aggregate_activity_by_ip_per_asset(reduced_s3_logs_per_asset=reordered_reduced_s3_log)
+            all_reduced_s3_logs_per_blob_id_aggregated_by_ip[blob_id] = aggregated_activity_by_ip
+
             total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"])
             total_bytes_per_asset_path[asset.path] = total_bytes
 
@@ -295,6 +300,29 @@ def _aggregate_activity_by_asset(total_bytes_per_asset_path: dict[str, int]) ->
     return aggregated_activity_by_asset
 
 
+def _aggregate_activity_by_ip_per_asset(reduced_s3_logs_per_asset: pandas.DataFrame) -> pandas.DataFrame:
+    reduced_s3_logs_clipped = reduced_s3_logs_per_asset.reindex(columns=("date", "ip_address"))
+    pre_aggregated = reduced_s3_logs_clipped.groupby(by="date", as_index=False)["ip_address"].agg([list, "nunique"])
+    pre_aggregated.rename(columns={"nunique": "num_unique_access"}, inplace=True)
+    pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
+    aggregated_activity_by_ip = pre_aggregated.reindex(columns=("date", "num_unique_access"))
+
+    return aggregated_activity_by_ip
+
+
+def _aggregate_activity_by_ip(reduced_s3_logs_per_day: Iterable[pandas.DataFrame]) -> pandas.DataFrame:
+    all_reduced_s3_logs = pandas.concat(objs=reduced_s3_logs_per_day, ignore_index=True)
+    all_reduced_s3_logs_clipped = all_reduced_s3_logs.reindex(columns=("date", "num_unique_access"))
+
+    pre_aggregated = all_reduced_s3_logs_clipped.groupby(by="date", as_index=False)["num_unique_access"].agg([list, "sum"])
+    pre_aggregated.rename(columns={"sum": "num_unique_access"}, inplace=True)
+    pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
+
+    aggregated_activity_by_ip = pre_aggregated.reindex(columns=("date", "num_unique_access"))
+
+    return aggregated_activity_by_ip
+
+
 def _write_aggregated_activity_by_day(
     reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
 ) -> None:
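A minimal sketch of what the aggregation helper added in PATCH 2/4 computes, using the same pandas pattern but with invented sample values (only the column names "date" and "ip_address" come from the patch):

    import natsort
    import pandas

    # Hypothetical per-asset reduced log; real frames come from the binned S3 logs.
    reduced_s3_logs_per_asset = pandas.DataFrame(
        {
            "date": ["2025-04-02", "2025-04-01", "2025-04-01", "2025-04-01"],
            "ip_address": ["5.6.7.8", "1.2.3.4", "1.2.3.4", "9.9.9.9"],
        }
    )

    # Mirror _aggregate_activity_by_ip_per_asset: "nunique" counts distinct IPs
    # per date; the "list" aggregation is discarded by the final column reindex.
    clipped = reduced_s3_logs_per_asset.reindex(columns=("date", "ip_address"))
    pre_aggregated = clipped.groupby(by="date", as_index=False)["ip_address"].agg([list, "nunique"])
    pre_aggregated.rename(columns={"nunique": "num_unique_access"}, inplace=True)
    pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
    aggregated = pre_aggregated.reindex(columns=("date", "num_unique_access"))
    # Expected result: 2025-04-01 -> 2 unique IPs, 2025-04-02 -> 1.

The repeated "1.2.3.4" rows illustrate why nunique is used rather than a plain row count: duplicate requests from one IP on the same day count once, and only the count is published, not the addresses themselves.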
From f3537ce1fc36b0018de66592ecf9fb967609947a Mon Sep 17 00:00:00 2001
From: weiglszonja
Date: Tue, 22 Apr 2025 16:01:05 +0200
Subject: [PATCH 3/4] write aggregate by ip to separate file

---
 .../_map_binned_s3_logs_to_dandisets.py      | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
index 5a585b1e..5d464f6e 100644
--- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
+++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
@@ -261,6 +261,12 @@ def _map_binned_logs_to_dandiset(
         total_bytes_per_asset_path=total_bytes_across_versions_by_asset, file_path=dandiset_summary_by_asset_file_path
     )
 
+    dandiset_summary_by_ip_file_path = dandiset_log_folder_path / "dandiset_summary_by_ip.tsv"
+    _write_aggregated_activity_by_ip(
+        reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_ip.values(),
+        file_path=dandiset_summary_by_ip_file_path,
+    )
+
     return None
 
 
@@ -331,6 +337,14 @@ def _write_aggregated_activity_by_day(
     return None
 
+def _write_aggregated_activity_by_ip(
+    reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
+) -> None:
+    aggregated_activity_by_ip = _aggregate_activity_by_ip(reduced_s3_logs_per_day=reduced_s3_logs_per_day)
+    aggregated_activity_by_ip.to_csv(path_or_buf=file_path, mode="w", sep="\t", header=True, index=False)
+
+    return None
+
 
 def _write_aggregated_activity_by_region(
     reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
 ) -> None:
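The dandiset_summary_by_ip.tsv written in PATCH 3/4 is the dandiset-level rollup: _aggregate_activity_by_ip concatenates the per-asset daily frames and sums num_unique_access per date, so an IP that accessed several assets on the same day contributes once per asset. A rough sketch with invented frames (the patch reaches the same totals through .agg([list, "sum"]) plus a rename):

    import pathlib
    import pandas

    # Hypothetical per-asset daily summaries, shaped like the output of
    # _aggregate_activity_by_ip_per_asset.
    per_asset_frames = [
        pandas.DataFrame({"date": ["2025-04-01", "2025-04-02"], "num_unique_access": [2, 1]}),
        pandas.DataFrame({"date": ["2025-04-01"], "num_unique_access": [3]}),
    ]

    combined = pandas.concat(objs=per_asset_frames, ignore_index=True)
    summed = combined.groupby(by="date", as_index=False)["num_unique_access"].sum()
    # Expected result: 2025-04-01 -> 5, 2025-04-02 -> 1.

    # Same write call as _write_aggregated_activity_by_ip; the path is illustrative.
    summed.to_csv(path_or_buf=pathlib.Path("dandiset_summary_by_ip.tsv"), mode="w", sep="\t", header=True, index=False)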
From 55efdebec48951f198877a385333bfecec3d255a Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 26 May 2025 20:18:53 +0000
Subject: [PATCH 4/4] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 .../_map_binned_s3_logs_to_dandisets.py      | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
index 5d464f6e..328ebac6 100644
--- a/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
+++ b/src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
@@ -207,7 +207,9 @@ def _map_binned_logs_to_dandiset(
             all_reduced_s3_logs_aggregated_by_region_for_version.append(aggregated_activity_by_region)
             all_reduced_s3_logs_per_blob_id_aggregated_by_region[blob_id] = aggregated_activity_by_region
 
-            aggregated_activity_by_ip = _aggregate_activity_by_ip_per_asset(reduced_s3_logs_per_asset=reordered_reduced_s3_log)
+            aggregated_activity_by_ip = _aggregate_activity_by_ip_per_asset(
+                reduced_s3_logs_per_asset=reordered_reduced_s3_log
+            )
             all_reduced_s3_logs_per_blob_id_aggregated_by_ip[blob_id] = aggregated_activity_by_ip
 
             total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"])
@@ -320,7 +322,9 @@ def _aggregate_activity_by_ip(reduced_s3_logs_per_day: Iterable[pandas.DataFrame
     all_reduced_s3_logs = pandas.concat(objs=reduced_s3_logs_per_day, ignore_index=True)
     all_reduced_s3_logs_clipped = all_reduced_s3_logs.reindex(columns=("date", "num_unique_access"))
 
-    pre_aggregated = all_reduced_s3_logs_clipped.groupby(by="date", as_index=False)["num_unique_access"].agg([list, "sum"])
+    pre_aggregated = all_reduced_s3_logs_clipped.groupby(by="date", as_index=False)["num_unique_access"].agg(
+        [list, "sum"]
+    )
     pre_aggregated.rename(columns={"sum": "num_unique_access"}, inplace=True)
     pre_aggregated.sort_values(by="date", key=natsort.natsort_keygen(), inplace=True)
 
@@ -337,6 +341,7 @@ def _write_aggregated_activity_by_day(
 
     return None
 
+
 def _write_aggregated_activity_by_ip(
     reduced_s3_logs_per_day: Iterable[pandas.DataFrame], file_path: pathlib.Path
 ) -> None:
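Net effect of the series: each dandiset log folder gains a dandiset_summary_by_ip.tsv alongside the existing by-day, by-region, and by-asset summaries, holding two tab-separated columns, date and num_unique_access. A hypothetical read-back for downstream analysis:

    import pandas

    # The path is illustrative; the file is written per dandiset log folder.
    summary = pandas.read_csv("dandiset_summary_by_ip.tsv", sep="\t")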