From 856847ca17aad7baa466a4821956a1b1f7890e5b Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Thu, 6 Feb 2025 17:04:59 +0900 Subject: [PATCH 01/11] Add minhash scripts for all japanese corpus --- .../common/dedup/installer/install.sh | 19 +++ .../common/dedup/installer/requirements.txt | 3 + .../common/dedup/subcorpus/minhash_dedup.py | 156 ++++++++++++++++++ .../common/dedup/subcorpus/reshard.sh | 68 ++++++++ .../common/dedup/subcorpus/run_all.sh | 86 ++++++++++ 5 files changed, 332 insertions(+) create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/subcorpus/minhash_dedup.py create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/subcorpus/run_all.sh diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh new file mode 100644 index 00000000..23a01722 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +work_dir=/home/shared/experiments/0118_dedup_corpusv4_ja +env_dir=environment +venv_dir=.venv +script_dir=${work_dir}/scripts + +# pyenv: python version 3.10.14 + +cd $work_dir || exit + +mkdir -p $env_dir +cd $env_dir || exit +python3.10 -m venv $venv_dir + +source $venv_dir/bin/activate +pip install --upgrade --no-cache-dir pip uv +uv init +uv add --no-cache -r ${script_dir}/installer/requirements.txt diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt new file mode 100644 index 00000000..b0e7acd5 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt @@ -0,0 +1,3 @@ +datatrove[io,processing,cli]==0.4.0 # all w/o s3 +spacy[ja] # required when running minhash (data-trove) +sudachipy==0.5.4 \ No newline at end of file diff --git a/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/minhash_dedup.py new file mode 100644 index 00000000..202faf3b --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/minhash_dedup.py @@ -0,0 +1,156 @@ +from argparse import ArgumentParser +from dataclasses import dataclass +from pathlib import Path + +from datatrove.executor.slurm import SlurmPipelineExecutor +from datatrove.pipeline.dedup import MinhashDedupSignature +from datatrove.pipeline.dedup.minhash import (MinhashConfig, + MinhashDedupBuckets, + MinhashDedupCluster, + MinhashDedupFilter) +from datatrove.pipeline.readers import JsonlReader +from datatrove.pipeline.tokens import TokensCounter +from datatrove.pipeline.writers.jsonl import JsonlWriter +from datatrove.utils.hashing import HashConfig +from datatrove.utils.typeshelper import Languages + +WORK_DIR = "/home/shared/experiments/0118_dedup_corpusv4_ja" +VENV_PATH = f"{WORK_DIR}/environment/.venv/bin/activate" + +@dataclass +class Args: + ngram: int + buckets: int + hashes_per_bucket: int + input: str + input_pattern:str + output: str + venv:str + +argparser = ArgumentParser() +argparser.add_argument("--ngram", default=5, type=int) +argparser.add_argument("-r", "--buckets", default=20, type=int) +argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int) +argparser.add_argument("--input", type=str) +argparser.add_argument("--output", type=str) +argparser.add_argument("--venv", 
default=VENV_PATH,type=str)
+args = argparser.parse_args(namespace=Args)
+
+
+
+MINHASH_DIRNAME = f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes"
+MINHASH_DIR=Path(args.output)/MINHASH_DIRNAME
+RESULT_DIR = f"{MINHASH_DIR}/results"
+LOG_DIR = f"{MINHASH_DIR}/logs"
+SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs"
+
+all_files=[_f for _f in Path(args.input).rglob("*") if _f.is_file()]
+TOTAL_TASKS = len(all_files)
+
+# this is the original data that we want to deduplicate
+INPUT_READER = JsonlReader(args.input)
+# you can also change ngrams or the number of buckets and their size here
+minhash_config = MinhashConfig(
+    hash_config=HashConfig(precision=64),
+    n_grams=args.ngram,
+    num_buckets=args.buckets,
+    hashes_per_bucket=args.hashes_per_bucket,
+)  # better precision -> fewer false positives (collisions)
+
+
+# stage 1 computes minhash signatures for each task (each task gets a set of files)
+stage1 = SlurmPipelineExecutor(
+    job_name="calc_minhash",
+    pipeline=[
+        INPUT_READER,
+        MinhashDedupSignature(
+            output_folder=f"{RESULT_DIR}/signatures",
+            config=minhash_config,
+            language=Languages.japanese,
+            skip_existing_sigs=True,
+        ),
+    ],
+    tasks=TOTAL_TASKS,
+    time="48:00:00",
+    partition="cpu",
+    logging_dir=f"{LOG_DIR}/signatures",
+    slurm_logs_folder=SLURM_LOGS_DIR,
+    venv_path=VENV_PATH,
+    max_array_launch_parallel=True,
+    stagger_max_array_jobs=5,
+)
+
+# stage 2 finds matches between signatures in each bucket
+stage2 = SlurmPipelineExecutor(
+    job_name="match_minhash",
+    pipeline=[
+        MinhashDedupBuckets(
+            input_folder=f"{RESULT_DIR}/signatures",
+            output_folder=f"{RESULT_DIR}/buckets",
+            config=minhash_config,
+        ),
+    ],
+    tasks=minhash_config.num_buckets,
+    time="120:00:00",
+    partition="cpu",
+    logging_dir=f"{LOG_DIR}/buckets",
+    depends=stage1,
+    slurm_logs_folder=SLURM_LOGS_DIR,
+    venv_path=VENV_PATH,
+    max_array_launch_parallel=True,
+    stagger_max_array_jobs=5,
+)
+
+# stage 3 creates clusters of duplicates using the results from all buckets
+stage3 = SlurmPipelineExecutor(
+    job_name="calc_cluster",
+    pipeline=[
+        MinhashDedupCluster(
+            input_folder=f"{RESULT_DIR}/buckets",
+            output_folder=f"{RESULT_DIR}/remove_ids",
+            config=minhash_config,
+            save_cluster_id=True,
+            save_cluster_size=True,
+        ),
+    ],
+    tasks=1,
+    time="120:00:00",
+    partition="cpu",
+    logging_dir=f"{LOG_DIR}/clusters",
+    mem_per_cpu_gb=70,
+    cpus_per_task=2,
+    depends=stage2,
+    slurm_logs_folder=SLURM_LOGS_DIR,
+    venv_path=VENV_PATH,
+    max_array_launch_parallel=True,
+    stagger_max_array_jobs=5,
+)
+
+# stage 4 reads the original input data and removes all but 1 sample per duplicate cluster
+# the data must match exactly stage 1, so number of tasks and the input source must be the same
+stage4 = SlurmPipelineExecutor(
+    job_name="remove_duplicate",
+    pipeline=[
+        INPUT_READER,
+        TokensCounter(),  # nice way to see how many tokens we had before and after deduplication
+        MinhashDedupFilter(
+            input_folder=f"{RESULT_DIR}/remove_ids",
+            exclusion_writer=JsonlWriter(f"{RESULT_DIR}/removed"),
+            load_cluster_ids=True,
+            load_cluster_sizes=True,
+        ),
+        JsonlWriter(output_folder=f"{RESULT_DIR}/deduplicated_output"),
+    ],
+    tasks=TOTAL_TASKS,
+    time="120:00:00",
+    partition="cpu",
+    logging_dir=f"{LOG_DIR}/filter",
+    depends=stage3,
+    slurm_logs_folder=SLURM_LOGS_DIR,
+    venv_path=VENV_PATH,
+    max_array_launch_parallel=True,
+    stagger_max_array_jobs=5,
+)
+
+
+stage4.run()
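The four stages above are submitted from a login node; run_all.sh below drives this per subcorpus, and a single-corpus invocation would look roughly like the following sketch. The activation path comes from install.sh and the option names from the argument parser above; the corpus paths are placeholders, so treat this as an assumed example rather than a documented command line.

```bash
# Sketch of a manual run for one resharded subcorpus; paths are illustrative.
source /home/shared/experiments/0118_dedup_corpusv4_ja/environment/.venv/bin/activate
python minhash_dedup.py \
    --input /path/to/subcorpus/reshard_1GB \
    --output /path/to/subcorpus \
    --ngram 5 -r 20 -b 10   # defaults shown explicitly: 5-grams, 20 buckets, 10 hashes per bucket
```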
diff --git a/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh b/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh
new file mode 100644
index 00000000..f0e97001
--- /dev/null
+++ b/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh
@@ -0,0 +1,68 @@
+#!/bin/bash
+#SBATCH --job-name=0118_reshard_corpus
+#SBATCH --partition=cpu
+#SBATCH --exclusive
+#SBATCH --mem=0
+#SBATCH --output=slurm_logs/%j-%x.out
+#SBATCH --error=slurm_logs/%j-%x.err
+
+set -eux
+
+input_dir=$1
+output_dir=$2
+unit_size=${3:-"1G"}
+pattern=${4:-""}
+
+input_dir=$(realpath "$input_dir")
+mkdir -p "$output_dir"
+
+# Collect the file list (keeping the directory structure in mind)
+all_files=$(find -L "$input_dir" -type f)
+
+# Filter by pattern if one is given
+if [[ -n "$pattern" ]]; then
+    all_files=$(echo "$all_files" | grep -E "$pattern" || true)
+fi
+
+# Handle the case where no files are found
+if [[ -z "$all_files" ]]; then
+    echo "No matching files found. Exiting."
+    exit 1
+fi
+
+# Group files by directory so each directory is processed separately
+declare -A dir_files_map
+while IFS= read -r file; do
+    # Get the directory the file belongs to (relative to input_dir)
+    relative_dir=$(dirname "${file#$input_dir/}")
+    output_subdir="$output_dir/$relative_dir"
+
+    # Reproduce the directory structure
+    mkdir -p "$output_subdir"
+
+    # Store the file list for each directory
+    dir_files_map["$output_subdir"]+="$file "
+done <<< "$all_files"
+
+# Run the split for each directory
+for subdir in "${!dir_files_map[@]}"; do
+    file_list=${dir_files_map["$subdir"]}
+
+    split_args=(
+        --suffix-length=4
+        --additional-suffix=.jsonl
+        --line-bytes="$unit_size"
+        -d
+        --filter='zstd -T0 -o $FILE.zst'
+        --verbose
+    )
+
+    # Feed each file appropriately (decompress .gz files)
+    for file in $file_list; do
+        if [[ "$file" == *.gz ]]; then
+            gunzip -c "$file"
+        else
+            cat "$file"
+        fi
+    done | split "${split_args[@]}" - "$subdir/"
+done
\ No newline at end of file
diff --git a/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/run_all.sh b/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/run_all.sh
new file mode 100644
index 00000000..a89e70fe
--- /dev/null
+++ b/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/run_all.sh
@@ -0,0 +1,86 @@
+#!/bin/bash
+
+set -eux
+
+work_dir="/home/shared/experiments/0118_dedup_corpusv4_ja"
+reshard_script=${work_dir}/scripts/subcorpus/reshard.sh
+minhash_script=${work_dir}/scripts/subcorpus/minhash_dedup.py
+python_env=${work_dir}/environment/.venv
+
+trg_dirs=(
+    aozorabunko
+#    cc
+    ceek_news
+    e-gov
+    fineweb-2
+    kaken
+    kokkai_giji
+    nwc2010
+    nwjc
+    sip_comprehensive_html
+    sip_comprehensive_pdf-pdf2text
+    sip_comprehensive_pdf-surya
+    warp_html
+    warp_pdf_e0
+    warp_pdf_e0.2
+    wiki
+)
+
+job_ids=()
+
+wait_for_jobs() {
+    local job_ids=("$@")
+    set +x
+    for job_id in "${job_ids[@]}"; do
+        while squeue -j "$job_id" 2>/dev/null | grep -q "$job_id"; do
+            sleep 10
+        done
+    done
+    set -x
+}
+
+# reshard
+reshard_script=${work_dir}/scripts/subcorpus/reshard.sh
+unit_size=1G
+
+declare -A patterns=(
+    ["kaken"]="train_*"
+    ["wiki"]="*train*"
+)
+
+for _dir in "${trg_dirs[@]}"; do
+    trg_dir=$work_dir/data/subcorpus/${_dir}
+    if [ ! -d "$trg_dir" ]; then
+        echo "Directory does not exist. Skip: $trg_dir"
+        continue
+    fi
+    continue
+
+    pattern=${patterns[$_dir]:-""}
+
+    job_id=$(
+        sbatch $reshard_script \
+            "${trg_dir}/raw" \
+            "${trg_dir}/reshard_${unit_size}B" \
+            "$unit_size" \
+            "$pattern"
+    )
+    job_ids+=("$job_id")
+done
+wait_for_jobs "${job_ids[@]}"
+job_ids=()
+
+# minhash
+source ${python_env}/bin/activate
+for _dir in "${trg_dirs[@]}"; do
+    trg_dir=$work_dir/data/subcorpus/${_dir}
+    if [ ! -d "$trg_dir" ]; then
+        echo "Directory does not exist. 
Skip: $trg_dir" + continue + fi + #continue + + python $minhash_script \ + --input "${trg_dir}/reshard_${unit_size}B" \ + --output "${trg_dir}" +done From 778c040332660b74bb371fa9934ba5fec0dd56ac Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Sat, 8 Feb 2025 20:34:11 +0900 Subject: [PATCH 02/11] Update script worked on local servers --- .../common/dedup/installer/install.sh | 6 +- .../common/dedup/installer/requirements.txt | 3 +- .../local_multi_node/minhash_dedup.py | 132 ++++++++++++++++++ .../propcess/local_multi_node/run_tasks.py | 74 ++++++++++ .../common/dedup/propcess/make_links.sh | 12 ++ .../dedup/{subcorpus => propcess}/reshard.sh | 0 .../dedup/{subcorpus => propcess}/run_all.sh | 3 +- .../slum}/minhash_dedup.py | 0 8 files changed, 225 insertions(+), 5 deletions(-) create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/run_tasks.py create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/propcess/make_links.sh rename corpus/llm-jp-corpus-v4/common/dedup/{subcorpus => propcess}/reshard.sh (100%) rename corpus/llm-jp-corpus-v4/common/dedup/{subcorpus => propcess}/run_all.sh (94%) rename corpus/llm-jp-corpus-v4/common/dedup/{subcorpus => propcess/slum}/minhash_dedup.py (100%) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh index 23a01722..ee86f780 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh @@ -1,9 +1,9 @@ #!/bin/bash -work_dir=/home/shared/experiments/0118_dedup_corpusv4_ja +work_dir=/model/experiments/0118_dedup_corpusv4_ja env_dir=environment venv_dir=.venv -script_dir=${work_dir}/scripts +script_root=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup # pyenv: python version 3.10.14 @@ -16,4 +16,4 @@ python3.10 -m venv $venv_dir source $venv_dir/bin/activate pip install --upgrade --no-cache-dir pip uv uv init -uv add --no-cache -r ${script_dir}/installer/requirements.txt +uv add --no-cache -r ${script_root}/installer/requirements.txt diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt index b0e7acd5..648bddbc 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt @@ -1,3 +1,4 @@ datatrove[io,processing,cli]==0.4.0 # all w/o s3 spacy[ja] # required when running minhash (data-trove) -sudachipy==0.5.4 \ No newline at end of file +sudachipy==0.5.4 # required when running minhash (data-trove) +paramiko # for local multi node \ No newline at end of file diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py new file mode 100644 index 00000000..80955747 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py @@ -0,0 +1,132 @@ +from argparse import ArgumentParser +from dataclasses import dataclass +from pathlib import Path + +from datatrove.executor.local import LocalPipelineExecutor +from datatrove.pipeline.dedup import MinhashDedupSignature +from datatrove.pipeline.dedup.minhash import (MinhashConfig, + MinhashDedupBuckets, + MinhashDedupCluster, + MinhashDedupFilter) +from datatrove.pipeline.readers import JsonlReader +from 
datatrove.pipeline.tokens import TokensCounter +from datatrove.pipeline.writers.jsonl import JsonlWriter +from datatrove.utils.hashing import HashConfig +from datatrove.utils.typeshelper import Languages + +WORK_DIR = "/home/shared/experiments/0118_dedup_corpusv4_ja" +VENV_PATH = f"{WORK_DIR}/environment/.venv/bin/activate" + +@dataclass +class Args: + ngram: int + buckets: int + hashes_per_bucket: int + input: str + output: str + local_tasks:int + local_rank_offset:int + + +argparser = ArgumentParser() +argparser.add_argument("input", type=str) +argparser.add_argument("output", type=str) +argparser.add_argument("--ngram", default=5, type=int) +argparser.add_argument("-r", "--buckets", default=20, type=int) +argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int) +argparser.add_argument("-task","--local_tasks", type=int) +argparser.add_argument("-rank","--local_rank_offset", type=int) +args = argparser.parse_args(namespace=Args) + + +MINHASH_DIRNAME = f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes" +MINHASH_DIR=Path(args.output)/MINHASH_DIRNAME +RESULT_DIR = f"{MINHASH_DIR}/results" +LOG_DIR = f"{MINHASH_DIR}/logs" +SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs" + +all_files=[_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()] +TOTAL_TASKS = len(all_files) +# this is the original data that we want to deduplicate +INPUT_READER = JsonlReader(args.input) +# you can also change ngrams or the number of buckets and their size here +minhash_config = MinhashConfig( + hash_config=HashConfig(precision=64), + n_grams=args.ngram, + num_buckets=args.buckets, + hashes_per_bucket=args.hashes_per_bucket, +) # better precision -> fewer false positives (collisions) + + +# stage 1 computes minhash signatures for each task (each task gets a set of files) +stage1 = LocalPipelineExecutor( + pipeline=[ + INPUT_READER, + MinhashDedupSignature( + output_folder=f"{RESULT_DIR}/signatures", + config=minhash_config, + language=Languages.japanese, + skip_existing_sigs=True, + ), + ], + tasks=TOTAL_TASKS, + workers=args.local_tasks, + logging_dir=f"{LOG_DIR}/signatures", + local_tasks=args.local_tasks, + local_rank_offset=args.local_rank_offset, + randomize_start_duration=10, +) + +# stage 2 finds matches between signatures in each bucket +stage2 = LocalPipelineExecutor( + pipeline=[ + MinhashDedupBuckets( + input_folder=f"{RESULT_DIR}/signatures", + output_folder=f"{RESULT_DIR}/buckets", + config=minhash_config, + ), + ], + tasks=minhash_config.num_buckets, + logging_dir=f"{LOG_DIR}/buckets", + depends=stage1, +) + +# stage 3 creates clusters of duplicates using the results from all buckets +stage3 = LocalPipelineExecutor( + pipeline=[ + MinhashDedupCluster( + input_folder=f"{RESULT_DIR}/buckets", + output_folder=f"{RESULT_DIR}/remove_ids", + config=minhash_config, + save_cluster_id=True, + save_cluster_size=True, + ), + ], + tasks=1, + logging_dir=f"{LOG_DIR}/clusters", + depends=stage2, +) + +# stage 4 reads the original input data and removes all but 1 sample per duplicate cluster +# the data must match exactly stage 1, so number of tasks and the input source must be the same +stage4 = LocalPipelineExecutor( + pipeline=[ + INPUT_READER, + TokensCounter(), # nice way to see how many tokens we had before and after deduplication + MinhashDedupFilter( + input_folder=f"{RESULT_DIR}/remove_ids", + exclusion_writer=JsonlWriter(f"{RESULT_DIR}/removed"), + load_cluster_ids=True, + load_cluster_sizes=True, + ), + 
JsonlWriter(output_folder=f"{RESULT_DIR}/deduplicated_output"), + ], + tasks=TOTAL_TASKS, + logging_dir=f"{LOG_DIR}/filter", + depends=stage3, + #local_tasks=args.local_tasks, + #local_rank_offset=args.local_rank_offset, +) + +if __name__ == "__main__": + stage1.run() diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/run_tasks.py b/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/run_tasks.py new file mode 100644 index 00000000..55c784c8 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/run_tasks.py @@ -0,0 +1,74 @@ +import getpass +from concurrent.futures import ThreadPoolExecutor, as_completed + +import paramiko +from tqdm import tqdm + +# === 設定項目 === +servers = [f"z-cpu{i}" for i in range(63)] +username = getpass.getuser() +target_dir = "/model/experiments/0118_dedup_corpusv4_ja" +python_script = "scripts/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py" +task_per_server=100 +python_args=[ + "data/all/deduped_subcorpus", + "data/all", + "--local_tasks", + f"{task_per_server}", + "--local_rank_offset", +] +log_dir = "local_logs/all-stage1" +max_workers = 10 +# =========================== + + +def run_task(server, task_number): + try: + # SSHクライアントの準備 + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy( + paramiko.AutoAddPolicy() + ) # ホストキーの確認を無効化 + + # SSHエージェントから鍵を取得 + agent = paramiko.Agent() + keys = agent.get_keys() + if len(keys) == 0: + raise Exception( + "SSHエージェントに鍵が登録されていません。`ssh-add`で鍵を追加してください。" + ) + + # 実行ユーザー名でSSH接続 + ssh.connect(server, username=username, pkey=keys[0]) # エージェントの鍵を使用 + + # 実行コマンド + args_concat=" ".join(python_args) + args_concat+=f" {task_per_server*task_number}" + + python_commands=f"python {python_script} {args_concat}" + python_logging=f"> {log_dir}/{server}.out 2>&1" + + command = f""" + cd {target_dir} && \ + source environment/.venv/bin/activate && \ + nohup bash -c '{python_commands} {python_logging}' > nohup.out 2>&1 & + """ + #command = "pkill -u ytsuta python" + ssh.exec_command(command) + ssh.close() + + return f"{server}: ✅ Task Started (Task #{task_number}) as {username}" + except Exception as e: + return f"{server}: ❌ FAILED - {str(e)}" + + +# 並列実行と進捗バー表示 +if __name__ == "__main__": + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [ + executor.submit(run_task, server, i) for i, server in enumerate(servers) + ] + for future in tqdm( + as_completed(futures), total=len(servers), desc="Running Tasks" + ): + print(future.result()) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/make_links.sh b/corpus/llm-jp-corpus-v4/common/dedup/propcess/make_links.sh new file mode 100644 index 00000000..65968bf0 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/propcess/make_links.sh @@ -0,0 +1,12 @@ +#!/bin/bash +data_dir=/model/experiments/0118_dedup_corpusv4_ja/data + +for dir in "${data_dir}"/subcorpus/*; do + dir_name=$(basename "$dir") + + for file in "$dir/minhash-5gram-20buckets-10hashes/results/deduplicated_output/"*; do + file_name=$(basename "$file") + mkdir -p "$data_dir/all/deduped_subcorpus/$dir_name" + ln -s "$file" "$data_dir/all/deduped_subcorpus/$dir_name/$file_name" + done +done \ No newline at end of file diff --git a/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh b/corpus/llm-jp-corpus-v4/common/dedup/propcess/reshard.sh similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh rename to 
corpus/llm-jp-corpus-v4/common/dedup/propcess/reshard.sh diff --git a/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/run_all.sh b/corpus/llm-jp-corpus-v4/common/dedup/propcess/run_all.sh similarity index 94% rename from corpus/llm-jp-corpus-v4/common/dedup/subcorpus/run_all.sh rename to corpus/llm-jp-corpus-v4/common/dedup/propcess/run_all.sh index a89e70fe..10c2fb4b 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/run_all.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/propcess/run_all.sh @@ -17,6 +17,7 @@ trg_dirs=( kokkai_giji nwc2010 nwjc + patent sip_comprehensive_html sip_comprehensive_pdf-pdf2text sip_comprehensive_pdf-surya @@ -40,7 +41,7 @@ wait_for_jobs() { } # reshard -reshard_script=${work_dir}/scripts/subcorpus/reshard.sh +reshard_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh unit_size=1G declare -A patterns=( diff --git a/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/propcess/slum/minhash_dedup.py similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/subcorpus/minhash_dedup.py rename to corpus/llm-jp-corpus-v4/common/dedup/propcess/slum/minhash_dedup.py From 09c5f1b8a9a74c6250e31ac7b49f766030cdeb6f Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Sat, 8 Feb 2025 20:46:34 +0900 Subject: [PATCH 03/11] rename folder --- .../{propcess => preprocess}/local_multi_node/minhash_dedup.py | 0 .../dedup/{propcess => preprocess}/local_multi_node/run_tasks.py | 0 .../common/dedup/{propcess => preprocess}/make_links.sh | 0 .../common/dedup/{propcess => preprocess}/reshard.sh | 0 .../common/dedup/{propcess => preprocess}/run_all.sh | 0 .../common/dedup/{propcess => preprocess}/slum/minhash_dedup.py | 0 6 files changed, 0 insertions(+), 0 deletions(-) rename corpus/llm-jp-corpus-v4/common/dedup/{propcess => preprocess}/local_multi_node/minhash_dedup.py (100%) rename corpus/llm-jp-corpus-v4/common/dedup/{propcess => preprocess}/local_multi_node/run_tasks.py (100%) rename corpus/llm-jp-corpus-v4/common/dedup/{propcess => preprocess}/make_links.sh (100%) rename corpus/llm-jp-corpus-v4/common/dedup/{propcess => preprocess}/reshard.sh (100%) rename corpus/llm-jp-corpus-v4/common/dedup/{propcess => preprocess}/run_all.sh (100%) rename corpus/llm-jp-corpus-v4/common/dedup/{propcess => preprocess}/slum/minhash_dedup.py (100%) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/minhash_dedup.py similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py rename to corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/minhash_dedup.py diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/run_tasks.py b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/run_tasks.py similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/run_tasks.py rename to corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/run_tasks.py diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/make_links.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/propcess/make_links.sh rename to corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/reshard.sh 
b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/propcess/reshard.sh rename to corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/run_all.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/run_all.sh similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/propcess/run_all.sh rename to corpus/llm-jp-corpus-v4/common/dedup/preprocess/run_all.sh diff --git a/corpus/llm-jp-corpus-v4/common/dedup/propcess/slum/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/slum/minhash_dedup.py similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/propcess/slum/minhash_dedup.py rename to corpus/llm-jp-corpus-v4/common/dedup/preprocess/slum/minhash_dedup.py From 65132f399c9cc811a11e510faccb3400f4012510 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Tue, 25 Feb 2025 13:48:43 +0900 Subject: [PATCH 04/11] Update install scripts --- .../dedup/installer/datatrove_diff.patch | 26 +++++++++++++++++++ .../common/dedup/installer/install.sh | 21 +++++++++++---- .../common/dedup/installer/requirements.txt | 8 +++--- 3 files changed, 46 insertions(+), 9 deletions(-) create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch b/corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch new file mode 100644 index 00000000..52054816 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch @@ -0,0 +1,26 @@ +diff --git a/src/datatrove/pipeline/dedup/minhash.py b/src/datatrove/pipeline/dedup/minhash.py +index f644389..3f1b50b 100644 +--- a/src/datatrove/pipeline/dedup/minhash.py ++++ b/src/datatrove/pipeline/dedup/minhash.py +@@ -256,6 +254,8 @@ class MinhashDedupSignature(PipelineStep): + doc_idx, + ) + ) ++ else: ++ logger.warning(f"No singles {doc.text=} on {rank=}") + for file in buckets: + file.close() + +diff --git a/src/datatrove/utils/text.py b/src/datatrove/utils/text.py +index 7ab7d3d..d0c9cb6 100644 +--- a/src/datatrove/utils/text.py ++++ b/src/datatrove/utils/text.py +@@ -259,6 +257,8 @@ def simplify_text(text: str, config=DEF_TEXT_NORM_CONFIG) -> str: + + # from https://tedboy.github.io/nlps/_modules/nltk/util.html#ngrams + def ngrams(sequence: Iterable, n: int): ++ if isinstance(sequence, list) and len(sequence) < n: ++ sequence += [""] * (n - len(sequence)) + iterables = tee(sequence, n) + + for i, sub_iterable in enumerate(iterables): # For each window, diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh index ee86f780..287daa92 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh @@ -1,19 +1,30 @@ #!/bin/bash work_dir=/model/experiments/0118_dedup_corpusv4_ja -env_dir=environment -venv_dir=.venv +env_dir=${work_dir}/environment +src_dir=${env_dir}/src +venv_dir=${env_dir}/.venv script_root=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup -# pyenv: python version 3.10.14 +# pyenv: python version >= 3.10 +# create environment cd $work_dir || exit - mkdir -p $env_dir cd $env_dir || exit -python3.10 -m venv $venv_dir +python3.10 -m venv $venv_dir source $venv_dir/bin/activate pip install --upgrade --no-cache-dir pip uv uv init + +# install customized datatrove +mkdir -p $src_dir +cd $src_dir || exit +git 
clone https://github.com/huggingface/datatrove.git -b v0.4.0 +cd datatrove || exit +patch -p1 < ${script_root}/installer/datatrove_diff.patch +uv pip install --no-cache-dir ".[io,processing,cli]" + +# install others uv add --no-cache -r ${script_root}/installer/requirements.txt diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt index 648bddbc..53ed4648 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt @@ -1,4 +1,4 @@ -datatrove[io,processing,cli]==0.4.0 # all w/o s3 -spacy[ja] # required when running minhash (data-trove) -sudachipy==0.5.4 # required when running minhash (data-trove) -paramiko # for local multi node \ No newline at end of file +# datatrove[io,processing,cli]==0.4.0 # all w/o s3 # install from source +spacy[ja] # required when running minhash (datatrove) +sudachipy==0.5.4 # required when running minhash (datatrove) +paramiko # required to run processes on local multi node \ No newline at end of file From 7cebae1fd06988c59cb3dbddede7faf1c3f0b8a6 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Tue, 25 Feb 2025 13:59:35 +0900 Subject: [PATCH 05/11] Add last line --- corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt index 53ed4648..14fc031a 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt @@ -1,4 +1,4 @@ # datatrove[io,processing,cli]==0.4.0 # all w/o s3 # install from source spacy[ja] # required when running minhash (datatrove) sudachipy==0.5.4 # required when running minhash (datatrove) -paramiko # required to run processes on local multi node \ No newline at end of file +paramiko # required to run processes on local multi node From 96ed3e0846dfb0928f5e42e5481ac12e5f14aa10 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Tue, 25 Feb 2025 15:00:00 +0900 Subject: [PATCH 06/11] Update to correctly work --- .../common/dedup/installer/install.sh | 18 +++++++++--------- .../common/dedup/installer/requirements.txt | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh index 287daa92..4a44a698 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh @@ -1,30 +1,30 @@ #!/bin/bash -work_dir=/model/experiments/0118_dedup_corpusv4_ja +work_dir=/model/ytsuta/workspace-model/space-dedup_analysis env_dir=${work_dir}/environment -src_dir=${env_dir}/src venv_dir=${env_dir}/.venv +src_dir=${env_dir}/src script_root=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup -# pyenv: python version >= 3.10 +export UV_PROJECT_ENVIRONMENT=$venv_dir # create environment cd $work_dir || exit mkdir -p $env_dir cd $env_dir || exit -python3.10 -m venv $venv_dir +python3 -m venv $venv_dir source $venv_dir/bin/activate pip install --upgrade --no-cache-dir pip uv uv init +# install requirement +uv add --no-cache -r ${script_root}/installer/requirements.txt + # install customized datatrove mkdir -p $src_dir cd $src_dir || exit git clone https://github.com/huggingface/datatrove.git -b v0.4.0 cd datatrove || exit 
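# Note on the patch applied just below: installer/datatrove_diff.patch (added earlier in
# this series) adjusts datatrove v0.4.0 in two places. MinhashDedupSignature logs a warning
# when a document yields no signature shingles (previously this happened silently), and
# ngrams() pads token lists shorter than n with empty strings so that very short documents
# still produce a minhash signature.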
-patch -p1 < ${script_root}/installer/datatrove_diff.patch -uv pip install --no-cache-dir ".[io,processing,cli]" - -# install others -uv add --no-cache -r ${script_root}/installer/requirements.txt +patch -p1 <${script_root}/installer/datatrove_diff.patch +uv pip install --no-cache-dir ".[io,processing,cli]" diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt index 14fc031a..eee707e2 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt @@ -1,4 +1,4 @@ -# datatrove[io,processing,cli]==0.4.0 # all w/o s3 # install from source +datatrove[io,processing,cli]==0.4.0 # will be re-installed, but install to resolve dependency spacy[ja] # required when running minhash (datatrove) sudachipy==0.5.4 # required when running minhash (datatrove) paramiko # required to run processes on local multi node From e65c9cff6655470ae66006a5df5e256a4bf3cb97 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Tue, 25 Feb 2025 15:08:34 +0900 Subject: [PATCH 07/11] Change variable --- corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh index 4a44a698..c2a00f75 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh @@ -1,6 +1,6 @@ #!/bin/bash -work_dir=/model/ytsuta/workspace-model/space-dedup_analysis +work_dir=/model/experiments/0118_dedup_corpusv4_ja env_dir=${work_dir}/environment venv_dir=${env_dir}/.venv src_dir=${env_dir}/src From b335c4b7fb484a3d5f3db73a754f9bdacc8f5c09 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Mon, 31 Mar 2025 17:48:05 +0900 Subject: [PATCH 08/11] Refactor minhash dedup code on local multi-node and --- .../local_multi_node/minhash_dedup.py | 44 ++-- .../local_multi_node/submit_minhash.py | 204 ++++++++++++++++++ .../submit_minhash_all_subcorpus.sh | 59 +++++ .../slum/minhash_dedup.py | 32 +-- .../preprocess/local_multi_node/run_tasks.py | 74 ------- .../common/dedup/preprocess/reshard_all.sh | 53 +++++ .../common/dedup/preprocess/run_all.sh | 87 -------- 7 files changed, 358 insertions(+), 195 deletions(-) rename corpus/llm-jp-corpus-v4/common/dedup/{preprocess => minhash}/local_multi_node/minhash_dedup.py (79%) create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh rename corpus/llm-jp-corpus-v4/common/dedup/{preprocess => minhash}/slum/minhash_dedup.py (90%) delete mode 100644 corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/run_tasks.py create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh delete mode 100644 corpus/llm-jp-corpus-v4/common/dedup/preprocess/run_all.sh diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py similarity index 79% rename from corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/minhash_dedup.py rename to corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py index 80955747..4435b42c 100644 --- 
a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/minhash_dedup.py +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py @@ -14,19 +14,19 @@ from datatrove.utils.hashing import HashConfig from datatrove.utils.typeshelper import Languages -WORK_DIR = "/home/shared/experiments/0118_dedup_corpusv4_ja" -VENV_PATH = f"{WORK_DIR}/environment/.venv/bin/activate" @dataclass class Args: + input: str + output: str ngram: int buckets: int hashes_per_bucket: int - input: str - output: str - local_tasks:int - local_rank_offset:int - + local_tasks: int + local_rank_offset: int + max_worker: int + stage:int + argparser = ArgumentParser() argparser.add_argument("input", type=str) @@ -34,19 +34,24 @@ class Args: argparser.add_argument("--ngram", default=5, type=int) argparser.add_argument("-r", "--buckets", default=20, type=int) argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int) -argparser.add_argument("-task","--local_tasks", type=int) -argparser.add_argument("-rank","--local_rank_offset", type=int) +argparser.add_argument("-task", "--local_tasks", type=int, default=-1) +argparser.add_argument("-rank", "--local_rank_offset", type=int, default=0) +argparser.add_argument("-worker", "--max_worker", type=int, default=16) +argparser.add_argument("-stage", "--stage", type=int, choices=[1,2,3,4],default=4) args = argparser.parse_args(namespace=Args) -MINHASH_DIRNAME = f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes" -MINHASH_DIR=Path(args.output)/MINHASH_DIRNAME +MINHASH_DIRNAME = ( + f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes" +) +MINHASH_DIR = Path(args.output) / MINHASH_DIRNAME RESULT_DIR = f"{MINHASH_DIR}/results" LOG_DIR = f"{MINHASH_DIR}/logs" SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs" -all_files=[_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()] +all_files = [_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()] TOTAL_TASKS = len(all_files) + # this is the original data that we want to deduplicate INPUT_READER = JsonlReader(args.input) # you can also change ngrams or the number of buckets and their size here @@ -57,7 +62,6 @@ class Args: hashes_per_bucket=args.hashes_per_bucket, ) # better precision -> fewer false positives (collisions) - # stage 1 computes minhash signatures for each task (each task gets a set of files) stage1 = LocalPipelineExecutor( pipeline=[ @@ -70,7 +74,7 @@ class Args: ), ], tasks=TOTAL_TASKS, - workers=args.local_tasks, + workers=args.max_worker, logging_dir=f"{LOG_DIR}/signatures", local_tasks=args.local_tasks, local_rank_offset=args.local_rank_offset, @@ -87,6 +91,7 @@ class Args: ), ], tasks=minhash_config.num_buckets, + workers=args.max_worker, logging_dir=f"{LOG_DIR}/buckets", depends=stage1, ) @@ -102,7 +107,7 @@ class Args: save_cluster_size=True, ), ], - tasks=1, + tasks=args.max_worker, logging_dir=f"{LOG_DIR}/clusters", depends=stage2, ) @@ -112,7 +117,7 @@ class Args: stage4 = LocalPipelineExecutor( pipeline=[ INPUT_READER, - TokensCounter(), # nice way to see how many tokens we had before and after deduplication + # TokensCounter(), # nice way to see how many tokens we had before and after deduplication MinhashDedupFilter( input_folder=f"{RESULT_DIR}/remove_ids", exclusion_writer=JsonlWriter(f"{RESULT_DIR}/removed"), @@ -124,9 +129,10 @@ class Args: tasks=TOTAL_TASKS, logging_dir=f"{LOG_DIR}/filter", depends=stage3, - #local_tasks=args.local_tasks, - #local_rank_offset=args.local_rank_offset, + 
workers=args.max_worker, + local_tasks=args.local_tasks, + local_rank_offset=args.local_rank_offset, ) if __name__ == "__main__": - stage1.run() + exec(f"stage{args.stage}.run()") diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py new file mode 100644 index 00000000..71d0ad06 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py @@ -0,0 +1,204 @@ +import argparse +import getpass +import logging +from pathlib import Path + +import paramiko + +# Args default +WORK_DIR_DEFAULT = "/model/experiments/0118_dedup_corpusv4_ja/data" +PYTHON_SCRIPT_PATH_DEFAULT = ( + WORK_DIR_DEFAULT + + "/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py" +) +VENV_PATH_DEFAULT = WORK_DIR_DEFAULT + "/environment/.venv/bin/activate" + +# server settings +USER_NAME = getpass.getuser() + +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +class Args: + # For runner + input_dir: str + output_dir: str + stage: int + + # About paths + log_dir: str + venv_path: str + python_script: str + + # About server + node_list: list[str] + max_node_worker: int + + +def setup_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser() + runner_parser = parser.add_argument_group("Arguments for running scripts") + runner_parser.add_argument( + "--input_dir", + required=True, + type=str, + ) + runner_parser.add_argument( + "--output_dir", + required=True, + type=str, + ) + runner_parser.add_argument( + "-stage", "--stage", type=int, choices=[1, 2, 3, 4], default=4 + ) + path_parser = parser.add_argument_group("Arguments about paths") + path_parser.add_argument( + "--log_dir", + required=True, + type=str, + ) + path_parser.add_argument( + "--venv_path", + default=VENV_PATH_DEFAULT, + type=str, + ) + path_parser.add_argument( + "--python_script", + default=PYTHON_SCRIPT_PATH_DEFAULT, + type=str, + ) + server_parser = parser.add_argument_group("Arguments about server") + server_parser.add_argument( + "--node_list", + nargs="+", + type=str, + ) + server_parser.add_argument( + "--max_node_worker", + type=int, + ) + + return parser + + +# command = "kill -9 -- -1" + + +def submit_task(node, command): + try: + # SSHクライアントの準備 + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy( + paramiko.AutoAddPolicy() + ) # ホストキーの確認を無効化 + + # SSHエージェントから鍵を取得 + agent = paramiko.Agent() + keys = agent.get_keys() + if len(keys) == 0: + raise Exception( + "SSHエージェントに鍵が登録されていません。`ssh-add`で鍵を追加してください。" + ) + + # 実行ユーザー名でSSH接続 + ssh.connect(node, username=USER_NAME, pkey=keys[0]) # エージェントの鍵を使用 + + ssh.exec_command(command) + ssh.close() + + logger.info(f"{node}: ✅ Task Started as {USER_NAME}") + except Exception as e: + logger.info(f"{node}: ❌ FAILED - {str(e)}") + + +def prepare_command_prefix( + input_dir: Path, + output_dir: Path, + venv_path: Path, + python_script: Path, +): + python_args = [input_dir, output_dir] + concat_args = " ".join(python_args) + python_commands = f"python {python_script} {concat_args}" + command_prefix = "&&".join( + [ + "ulimit -n 65536 1048576", + f"source {venv_path}", + f"nohup bash -c '{python_commands}", + ] + ) + return command_prefix + + +def submit_minhash_job( + node_list: list[str], + input_dir: Path, + command_prefix, + stage: int, + log_dir: Path, + max_node_worker: int, +): + all_files = [_f for _f in 
input_dir.rglob("*") if _f.resolve().is_file()] + total_tasks = len(all_files) + + for i, finish_tasks in enumerate(range(0, total_tasks, max_node_worker)): + rest_tasks = total_tasks - finish_tasks + local_tasks = min(rest_tasks, max_node_worker) + node = node_list[i] + + # complete commannd + log_path = log_dir / (node + ".out") + logging_command = f"> {log_path} 2>&1" + nohup_sufix = "> nohup.out 2>&1 &" + command = "".join( + [ + command_prefix, + "--stage", + f"{stage}", + "--local_tasks", + f"{local_tasks}", + "--local_rank_offset", + f"{finish_tasks}", + f"{logging_command}", + f"{nohup_sufix}", + ] + ) + + submit_task(node, command) + + if stage in [2,3]: + # On stage2 and stage3, process is not distributed + break + + +def main( + input_dir: str, + output_dir: str, + stage: int, + log_dir: str, + venv_path: str, + python_script: str, + node_list: list[str], + max_node_worker: int, +): + command_prefix = prepare_command_prefix( + input_dir, output_dir, venv_path, python_script + ) + submit_minhash_job(node_list, input_dir, command_prefix,stage, log_dir, max_node_worker) + + +if __name__ == "__main__": + args = setup_parser().parse_args(namespace=Args) + main( + args.input_dir, + args.output_dir, + args.stage, + args.log_dir, + args.venv_path, + args.python_script, + args.node_list, + args.max_node_worker, + ) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh new file mode 100644 index 00000000..d880859e --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh @@ -0,0 +1,59 @@ +#!/bin/bash + +set -eux + +stage=$1 +work_dir="/model/experiments/0118_dedup_corpusv4_ja" +env_path="${work_dir}/environment/.venv/bin/activate" +log_root="${work_dir}/local_logs" + +node_list=() +for i in $(seq 0 99); do + node_list+=("z-cpu$i") +done + +source $env_path + +target_dirs=( + aozorabunko + cc + ceek_news + e-gov + fineweb-2 + kaken + kokkai_giji + nwc2010 + nwjc + patent + sip_comprehensive_html + sip_comprehensive_pdf-pdf2text + sip_comprehensive_pdf-surya + warp_html + warp_pdf_e0 + warp_pdf_e0.2 + wiki +) + + +# reshard +python_submit_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py +python_minhash_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py + +for _dirname in "${target_dirs[@]}"; do + _target_dir=$work_dir/data/subcorpus/${_dirname} + if [ ! -d "$_target_dir" ]; then + echo "Directory does not exit. 
Skip: $_target_dir" + continue + fi + continue + + python $python_submit_script \ + --input_dir "${_target_dir}/reshard_1B" \ + --output_dir "$_target_dir" \ + --stage "${stage}" \ + --log_dir "${log_root}/${stage}/${_dirname}"\ + --venv_path $env_path \ + --python_script $python_minhash_script \ + --node_list "${node_list[@]}" \ + --max_node_worker 150 +done diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/slum/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/minhash/slum/minhash_dedup.py similarity index 90% rename from corpus/llm-jp-corpus-v4/common/dedup/preprocess/slum/minhash_dedup.py rename to corpus/llm-jp-corpus-v4/common/dedup/minhash/slum/minhash_dedup.py index 202faf3b..0fa00d68 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/slum/minhash_dedup.py +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/slum/minhash_dedup.py @@ -17,34 +17,36 @@ WORK_DIR = "/home/shared/experiments/0118_dedup_corpusv4_ja" VENV_PATH = f"{WORK_DIR}/environment/.venv/bin/activate" + @dataclass class Args: + input: str + output: str ngram: int buckets: int hashes_per_bucket: int - input: str - input_pattern:str - output: str - venv:str + venv: str + argparser = ArgumentParser() +argparser.add_argument("input", type=str) +argparser.add_argument("output", type=str) argparser.add_argument("--ngram", default=5, type=int) argparser.add_argument("-r", "--buckets", default=20, type=int) argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int) -argparser.add_argument("--input", type=str) -argparser.add_argument("--output", type=str) -argparser.add_argument("--venv", default=VENV_PATH,type=str) +argparser.add_argument("--venv", default=VENV_PATH, type=str) args = argparser.parse_args(namespace=Args) - -MINHASH_DIRNAME = f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes" -MINHASH_DIR=Path(args.output)/MINHASH_DIRNAME +MINHASH_DIRNAME = ( + f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes" +) +MINHASH_DIR = Path(args.output) / MINHASH_DIRNAME RESULT_DIR = f"{MINHASH_DIR}/results" LOG_DIR = f"{MINHASH_DIR}/logs" SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs" -all_files=[_f for _f in Path(args.input).rglob("*") if _f.is_file()] +all_files = [_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()] TOTAL_TASKS = len(all_files) # this is the original data that we want to deduplicate @@ -75,7 +77,7 @@ class Args: partition="cpu", logging_dir=f"{LOG_DIR}/signatures", slurm_logs_folder=SLURM_LOGS_DIR, - venv_path=VENV_PATH, + venv_path=args.venv, max_array_launch_parallel=True, stagger_max_array_jobs=5, ) @@ -96,7 +98,7 @@ class Args: logging_dir=f"{LOG_DIR}/buckets", depends=stage1, slurm_logs_folder=SLURM_LOGS_DIR, - venv_path=VENV_PATH, + venv_path=args.venv, max_array_launch_parallel=True, stagger_max_array_jobs=5, ) @@ -121,7 +123,7 @@ class Args: cpus_per_task=2, depends=stage2, slurm_logs_folder=SLURM_LOGS_DIR, - venv_path=VENV_PATH, + venv_path=args.venv, max_array_launch_parallel=True, stagger_max_array_jobs=5, ) @@ -147,7 +149,7 @@ class Args: logging_dir=f"{LOG_DIR}/filter", depends=stage3, slurm_logs_folder=SLURM_LOGS_DIR, - venv_path=VENV_PATH, + venv_path=args.venv, max_array_launch_parallel=True, stagger_max_array_jobs=5, ) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/run_tasks.py b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/run_tasks.py deleted file mode 100644 index 55c784c8..00000000 --- 
a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/local_multi_node/run_tasks.py +++ /dev/null @@ -1,74 +0,0 @@ -import getpass -from concurrent.futures import ThreadPoolExecutor, as_completed - -import paramiko -from tqdm import tqdm - -# === 設定項目 === -servers = [f"z-cpu{i}" for i in range(63)] -username = getpass.getuser() -target_dir = "/model/experiments/0118_dedup_corpusv4_ja" -python_script = "scripts/corpus/llm-jp-corpus-v4/common/dedup/propcess/local_multi_node/minhash_dedup.py" -task_per_server=100 -python_args=[ - "data/all/deduped_subcorpus", - "data/all", - "--local_tasks", - f"{task_per_server}", - "--local_rank_offset", -] -log_dir = "local_logs/all-stage1" -max_workers = 10 -# =========================== - - -def run_task(server, task_number): - try: - # SSHクライアントの準備 - ssh = paramiko.SSHClient() - ssh.set_missing_host_key_policy( - paramiko.AutoAddPolicy() - ) # ホストキーの確認を無効化 - - # SSHエージェントから鍵を取得 - agent = paramiko.Agent() - keys = agent.get_keys() - if len(keys) == 0: - raise Exception( - "SSHエージェントに鍵が登録されていません。`ssh-add`で鍵を追加してください。" - ) - - # 実行ユーザー名でSSH接続 - ssh.connect(server, username=username, pkey=keys[0]) # エージェントの鍵を使用 - - # 実行コマンド - args_concat=" ".join(python_args) - args_concat+=f" {task_per_server*task_number}" - - python_commands=f"python {python_script} {args_concat}" - python_logging=f"> {log_dir}/{server}.out 2>&1" - - command = f""" - cd {target_dir} && \ - source environment/.venv/bin/activate && \ - nohup bash -c '{python_commands} {python_logging}' > nohup.out 2>&1 & - """ - #command = "pkill -u ytsuta python" - ssh.exec_command(command) - ssh.close() - - return f"{server}: ✅ Task Started (Task #{task_number}) as {username}" - except Exception as e: - return f"{server}: ❌ FAILED - {str(e)}" - - -# 並列実行と進捗バー表示 -if __name__ == "__main__": - with ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [ - executor.submit(run_task, server, i) for i, server in enumerate(servers) - ] - for future in tqdm( - as_completed(futures), total=len(servers), desc="Running Tasks" - ): - print(future.result()) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh new file mode 100644 index 00000000..9549cf88 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +set -eux + +work_dir="/model/experiments/0118_dedup_corpusv4_ja" +reshard_script=${work_dir}/scripts/subcorpus/reshard.sh + +target_dirs=( + aozorabunko + cc + ceek_news + e-gov + fineweb-2 + kaken + kokkai_giji + nwc2010 + nwjc + patent + sip_comprehensive_html + sip_comprehensive_pdf-pdf2text + sip_comprehensive_pdf-surya + warp_html + warp_pdf_e0 + warp_pdf_e0.2 + wiki +) + + +# reshard +reshard_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh +unit_size=1G + +declare -A patterns=( + ["kaken"]="train_*" + ["wiki"]="*train*" +) + +for _dir in "${target_dirs[@]}"; do + trg_dir=$work_dir/data/subcorpus/${_dir} + if [ ! -d "$trg_dir" ]; then + echo "Directory does not exit. 
Skip: $trg_dir" + continue + fi + continue + + pattern=${patterns[$_dir]:-""} + + bash $reshard_script \ + "${trg_dir}/raw" \ + "${trg_dir}/reshard_${unit_size}B" \ + "$unit_size" \ + "$pattern" +done diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/run_all.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/run_all.sh deleted file mode 100644 index 10c2fb4b..00000000 --- a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/run_all.sh +++ /dev/null @@ -1,87 +0,0 @@ -#!/bin/bash - -set -eux - -work_dir="/home/shared/experiments/0118_dedup_corpusv4_ja" -reshard_script=${work_dir}/scripts/subcorpus/reshard.sh -minhash_script=${work_dir}/scripts/subcorpus/minhash_dedup.py -python_env=${work_dir}/environment/.venv - -trg_dirs=( - aozorabunko -# cc - ceek_news - e-gov - fineweb-2 - kaken - kokkai_giji - nwc2010 - nwjc - patent - sip_comprehensive_html - sip_comprehensive_pdf-pdf2text - sip_comprehensive_pdf-surya - warp_html - warp_pdf_e0 - warp_pdf_e0.2 - wiki -) - -job_ids=() - -wait_for_jobs() { - local job_ids=("$@") - set +x - for job_id in "${job_ids[@]}"; do - while squeue -j "$job_id" 2>/dev/null | grep -q "$job_id"; do - sleep 10 - done - done - set -x -} - -# reshard -reshard_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/subcorpus/reshard.sh -unit_size=1G - -declare -A patterns=( - ["kaken"]="train_*" - ["wiki"]="*train*" -) - -for _dir in "${trg_dirs[@]}"; do - trg_dir=$work_dir/data/subcorpus/${_dir} - if [ ! -d "$trg_dir" ]; then - echo "Directory does not exit. Skip: $trg_dir" - continue - fi - continue - - pattern=${patterns[$_dir]:-""} - - job_id=$( - sbatch $reshard_script \ - "${trg_dir}/raw" \ - "${trg_dir}/reshard_${unit_size}B" \ - "$unit_size" \ - "$pattern" - ) - job_ids+=("$job_id") -done -wait_for_jobs "${job_ids[@]}" -job_ids=() - -# minhash -source ${python_env}/bin/activate -for _dir in "${trg_dirs[@]}"; do - trg_dir=$work_dir/data/subcorpus/${_dir} - if [ ! -d "$trg_dir" ]; then - echo "Directory does not exit. 
Skip: $trg_dir" - continue - fi - #continue - - python $minhash_script \ - --input "${trg_dir}/reshard_${unit_size}B" \ - --output "${trg_dir}" -done From 69ab366abe0e4fad45a69a8d3aba481314f91ea8 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Mon, 31 Mar 2025 17:48:57 +0900 Subject: [PATCH 09/11] Add postporcess scripts to split files by data types --- .../check_original_path_consisitency.py | 59 +++++++ .../postprocess/reconstruct_stracture.py | 145 ++++++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py diff --git a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py new file mode 100644 index 00000000..c4afadd0 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py @@ -0,0 +1,59 @@ +import gzip +import json +import multiprocessing +import sys +from pathlib import Path + +from tqdm import tqdm + +input_dir = "/model/experiments/0118_dedup_corpusv4_ja/data/all/minhash-5gram-20buckets-10hashes/results/deduplicated_output" +parallel_jobs = 32 + +# fineweb-2 preserve original path on original fineweb-2 +patterns = ["s3://commoncrawl/crawl-data", "/fsx/guilherme/cc2023-50"] + + +def convert_patterns(path): + for _pat in patterns: + if _pat in path: + return _pat + return path + + +def process_file(file): + unique_paths = set() + + try: + with gzip.open(file,"rt") as f: + for line in f: + try: + data = json.loads(line) + file_path = data.get("metadata").get("file_path") + converted_path = convert_patterns(file_path) + unique_paths.add(converted_path) + except json.JSONDecodeError: + continue + except Exception as e: + print(f"Error processing {file}: {e}", file=sys.stderr) + return None + + if len(unique_paths) != 1: + print(f"Warning: {file} has {len(unique_paths)} unique values!") + + +def main(): + files = list(Path(input_dir).rglob("*.jsonl.gz")) + + with multiprocessing.Pool(parallel_jobs) as pool: + list( + tqdm( + pool.imap_unordered(process_file, files), + total=len(files), + desc="Processing files", + ncols=0, + ), + ) + + +if __name__ == "__main__": + main() diff --git a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py new file mode 100644 index 00000000..633babd3 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py @@ -0,0 +1,145 @@ +import argparse +import gzip +import json +from concurrent.futures import ProcessPoolExecutor, as_completed +from pathlib import Path +from typing import NamedTuple + +from tqdm import tqdm + + +class Args: + worker: int + input_dir: str + output_dir: str + + +def setup_parser(): + parser = argparse.ArgumentParser() + parser.add_argument("input_dir") + parser.add_argument("output_dir") + parser.add_argument("--worker", type=int, default=32) + return parser + + +class FilePathCreator: + def __init__(self) -> str: + self.counter = 0 + self.prev_mid_path = None + + def get_mid_path(self, path: str) -> str: + if "s3://commoncrawl/crawl-data" in path or "/fsx/guilherme/cc2023-50" in path: + return "ja_fineweb-2" + + original_file_prefix = ( + "/model/experiments/0118_dedup_corpusv4_ja/data/subcorpus/" + ) + path_sufix = path.replace(original_file_prefix, "") + 
path_parts = Path(path_sufix).parts + assert len(path_parts) >= 3, f"Input path is invalid format: {path}" + + path_root = "ja_" + path_parts[0] + if "sip_comprehensive_pdf" in path_root: + return path_root.replace("-", "/") + elif "warp_pdf" in path_root: + return path_root.replace("_e", "/e") + elif len(path_parts) == 3: + return path_root + else: + # len(path_parts)>3 + return "/".join([path_root] + list(path_parts[2:-1])) + + def get_file_path(self, path: str) -> Path: + mid_path = self.get_mid_path(path) + if mid_path != self.prev_mid_path: + self.counter = 0 + self.prev_mid_path = mid_path + new_file = f"{self.counter:04d}.jsonl.gz" + self.counter += 1 + return Path(mid_path) / new_file + + +def normalize_jsonl(data: dict, add_file_path: bool = False): + + # original meta info is stored in data["metadata"]["meta"] if exist + meta: dict = data.get("metadata", {}).get("meta", {}) + + # root keys except "text" and "metadata" and "id" + # "metadata" and "id" are automatically added on minhash deduplication + meta_other1 = {k: v for k, v in data.items() if k not in ["text", "metadata", "id"]} + + dedup_meta_keys = ["minhash_cluster_id", "minhash_cluster_size", "token_count"] + # keys in "metadata", but except {dedup_meta_keys} and "file_path" and "" + # Omitted keys are automatically added on minhash deduplication + meta_other2 = { + k: v + for k, v in data["metadata"].items() + if k not in (["file_path", "meta"] + dedup_meta_keys) + } + # all keys are assumed to be different + assert len(set(meta.keys()) & set(meta_other1.keys())) == 0 + assert len(set(meta.keys()) & set(meta_other2.keys())) == 0 + assert len(set(meta_other1.keys()) & set(meta_other2.keys())) == 0 + + # store meta info on deduplication in other keys + if add_file_path: + dedup_meta_keys.append("file_path") + dedup_meta = {k: v for k, v in data["metadata"].items() if k in dedup_meta_keys} + + new_meta = meta | meta_other1 | meta_other2 | {"dedup_meta": dedup_meta} + + return {"text": data["text"], "meta": new_meta} + + +def convert_file(input_file: Path, output_file: Path): + output_file.parent.mkdir(parents=True, exist_ok=True) + assert not output_file.exists(), f"{output_file} exists!" 
+ # Add file_path keys if original data is from ja_fineweb + # This is because the meta info is comes from original data + add_file_path = False + if output_file.parts[3] == "ja_fineweb-2": + add_file_path = True + with gzip.open(input_file, "rt") as f_read, gzip.open(output_file, "wt") as f_write: + for line in f_read: + data = json.loads(line) + normalized = normalize_jsonl(data, add_file_path) + f_write.write(json.dumps(normalized, ensure_ascii=False) + "\n") + + +class IO_File(NamedTuple): + input_file: Path + output_file: Path + + +def setup_io(input_files: Path, output_dir: Path) -> list[IO_File]: + io_list = [] + file_path_creator = FilePathCreator() + for _file in tqdm(input_files, ncols=0): + with gzip.open(_file, "rt") as f: + line = f.readline() + data = json.loads(line) + original_file_path = data["metadata"]["file_path"] + output_file = file_path_creator.get_file_path(str(original_file_path)) + output_file = Path(output_dir) / output_file + io_list.append(IO_File(_file, output_file)) + return io_list + + +def main(input_dir: str, output_dir: str, worker: int): + input_files = list(sorted(Path(input_dir).rglob("*.jsonl.gz"))) + io_list = setup_io(input_files, Path(output_dir)) + + with ProcessPoolExecutor(max_workers=worker) as executor: + futures = [executor.submit(convert_file, *_io) for _io in io_list] + for future in tqdm(as_completed(futures), total=len(io_list), ncols=0): + try: + future.result() + except Exception as e: + print(f"Worker error: {e}") + raise + + +if __name__ == "__main__": + args = setup_parser().parse_args(namespace=Args) + assert not Path(args.output_dir).exists(), f"{args.output_dir} is exists!" + main(args.input_dir, args.output_dir, args.worker) From 02024d5cd55bbcd1ad87ef0b0858f2ae3fbdf203 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Mon, 31 Mar 2025 19:30:38 +0900 Subject: [PATCH 10/11] Add comments and readmes --- .../common/dedup/README-ja.md | 39 +++++++++ .../llm-jp-corpus-v4/common/dedup/README.md | 39 +++++++++ .../common/dedup/minhash/README-ja.md | 32 ++++++++ .../common/dedup/minhash/README.md | 33 ++++++++ .../minhash/{slum => slurm}/minhash_dedup.py | 0 .../check_original_path_consisitency.py | 12 ++- .../postprocess/reconstruct_stracture.py | 82 ++++++++++++------- .../common/dedup/preprocess/reshard.sh | 28 ++++--- 8 files changed, 221 insertions(+), 44 deletions(-) create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/README-ja.md create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/README.md create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md create mode 100644 corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md rename corpus/llm-jp-corpus-v4/common/dedup/minhash/{slum => slurm}/minhash_dedup.py (100%) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/README-ja.md b/corpus/llm-jp-corpus-v4/common/dedup/README-ja.md new file mode 100644 index 00000000..00a025fe --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/README-ja.md @@ -0,0 +1,39 @@ +# 類似重複除去スクリプト + +このディレクトリには、コーパスからの類似重複を除去するためのスクリプトが含まれています。 +重複除去は、[datatrove](https://github.com/huggingface/datatrove) に実装された Minhash-LSH をベースとしています。 + +重複除去は以下の2段階で行います: +- 各コーパス内での重複除去 +- 各コーパスでの重複除去後、全体での重複除去 + +## スクリプト実行順 + +0. 必要なライブラリのインストール + - `installer/install.sh` + +1. ファイルサイズを均一化して処理時間のバランスを取るためのリシャーディング + - `preprocess/reshard_all.sh` + +2. 各コーパスごとの重複除去 + - `minhash` + - 詳細は `minhash/README.md` を参照 + +3. シンボリックリンクを用いて、前処理済みのファイルを1つのディレクトリに集約 + - `preprocess/make_links.sh` + +4. 
全コーパスを対象としたグローバルな重複除去 + - `minhash` + - 詳細は `minhash/README.md` を参照 + +5. 重複除去されたファイルの再配置 + - 重複除去後のファイルはディレクトリ構造を保持せずに保存されます。 + - 以下の手順で再配置を行います: + 1. 重複除去中にテキストの順序がランダム化されていないことを確認 + - `postprocess/check_original_path_consisitency.py` + 2. 各コーパスの元のディレクトリ構造を復元 + - `postprocess/reconstruct_stracture.py` + +## 関連リポジトリ + +参考:[datatrove](https://github.com/huggingface/datatrove) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/README.md b/corpus/llm-jp-corpus-v4/common/dedup/README.md new file mode 100644 index 00000000..1ea8ca60 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/README.md @@ -0,0 +1,39 @@ +# Deduplication Scripts + +This directory contains scripts for near-duplicate removal from corpora. +The deduplication process is based on Minhash-LSH implemented in [datatrove](https://github.com/huggingface/datatrove). + +We perform deduplication in two main stages: +- Deduplication within each individual corpus +- Global deduplication across all corpora that have been locally deduplicated + +## Script Execution Order + +0. Install required libraries + - `installer/install.sh` + +1. Reshard files to equalize file sizes and balance processing time + - `preprocess/reshard_all.sh` + +2. Perform deduplication within each corpus + - `minhash` + - See `minhash/README.md` for details + +3. Collect all preprocessed files into a single directory using symbolic links + - `preprocess/make_links.sh` + +4. Perform global deduplication across all corpora + - `minhash` + - See `minhash/README.md` for details + +5. Reorganize the deduplicated files + - Deduplicated files are saved without preserving directory structure. + - Steps: + 1. Verify that texts are not randomized during deduplication + - `postprocess/check_original_path_consisitency.py` + 2. Reconstruct original directory structure for each corpus + - `postprocess/reconstruct_stracture.py` + +## Related Repository + +See also: [datatrove](https://github.com/huggingface/datatrove) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md new file mode 100644 index 00000000..4c082a39 --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md @@ -0,0 +1,32 @@ +# 重複除去コード + +このディレクトリには、[datatrove](https://github.com/huggingface/datatrove) を使用した重複除去スクリプトが含まれています。 +SLURMクラスタおよび複数のローカルサーバーの両方で実行できるように構成されています。 + +## SLURM 環境 + +このバージョンは SLURM クラスタ上での実行を想定しています。 + +### 使い方 + +```bash +python slurm/minhash_dedup.py {input_dir} {output_dir} +``` + +- `input_dir`: 入力データが格納されたディレクトリのパス。サブディレクトリも再帰的に走査されます。 +- `output_dir`: 重複除去後のデータを保存するディレクトリ。出力サブディレクトリは自動で作成されます。 +- ハッシュ処理に関するハイパーパラメータ(例:n-gram サイズ、バケット数、ハッシュ数)を設定可能です。詳細はスクリプト内のコメントを参照してください。 + +> `slurm/minhash_dedup.py` は [こちらの公式サンプル](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py) をもとに作成されています。 + +## ローカルマルチノード環境 + +このバージョンは、SSH 経由で複数のローカルマシンにまたがって分散実行することを想定しています。 + +### 構成 + +- `local_multi_node/submit_minhash_all_subcorpus.sh`: 全てのサブコーパスに対して重複除去を実行するシェルスクリプト。 +- `local_multi_node/submit_minhash.py`: ノードリストを読み込み、各ノードで重複除去処理を起動するランチャースクリプト。 +- `local_multi_node/minhash_dedup.py`: 各ノード上で実行されるワーカースクリプト。 + +> ※ このコードはプロトタイプ段階ですが、参考実装として共有します。 diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md new file mode 100644 index 00000000..ee7b317e --- /dev/null +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md @@ -0,0 +1,33 @@ +# Deduplication Code + +This directory provides deduplication scripts 
using [datatrove](https://github.com/huggingface/datatrove). +It supports execution in two environments: on a SLURM cluster and on multiple local servers. + +## SLURM + +This version is designed for use on a SLURM-based cluster. + +### Usage + +```bash +python slurm/minhash_dedup.py {input_dir} {output_dir} +``` + +- `input_dir`: Path to the directory containing the input data. The script recursively scans subdirectories for files. +- `output_dir`: Path where deduplicated files will be written. Subdirectories will be automatically created under this path. +- You can also configure hyperparameters related to hashing (e.g., n-gram size, number of buckets, number of hashes per bucket). + Please refer to the comments in the code for details. + +> The script `slurm/minhash_dedup.py` was adapted from [this official example](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py). + +## Local Multi-Node + +This version supports deduplication across multiple local machines using distributed processing via SSH. + +### Structure + +- `local_multi_node/submit_minhash_all_subcorpus.sh`: Main launcher shell script to deduplicate all sub-corpus. +- `local_multi_node/submit_minhash.py`: Main launcher that reads the node list and runs deduplication on each machine. +- `local_multi_node/minhash_dedup.py`: Worker script executed on each node. + +> Note: This code is a prototype, but it is shared here for reference. diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/slum/minhash_dedup.py b/corpus/llm-jp-corpus-v4/common/dedup/minhash/slurm/minhash_dedup.py similarity index 100% rename from corpus/llm-jp-corpus-v4/common/dedup/minhash/slum/minhash_dedup.py rename to corpus/llm-jp-corpus-v4/common/dedup/minhash/slurm/minhash_dedup.py diff --git a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py index c4afadd0..01ae3c97 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py +++ b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/check_original_path_consisitency.py @@ -13,7 +13,17 @@ patterns = ["s3://commoncrawl/crawl-data", "/fsx/guilherme/cc2023-50"] -def convert_patterns(path): +def convert_patterns(path: str) -> str: + """ + Normalize the file path based on known prefix patterns. + + Examples: + >>> convert_patterns("s3://commoncrawl/crawl-data/CC-MAIN-2023/file1") + "s3://commoncrawl/crawl-data" + + >>> convert_patterns("/data/local/custom_corpus/file3") + "/data/local/custom_corpus/file3" + """ for _pat in patterns: if _pat in path: return _pat diff --git a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py index 633babd3..f0113d3b 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py +++ b/corpus/llm-jp-corpus-v4/common/dedup/postprocess/reconstruct_stracture.py @@ -1,7 +1,12 @@ +# Description: +# This script reconstructs the original directory structure and metadata for deduplicated JSONL.gz files. +# It uses the original file paths (stored in metadata) to group and rename the deduplicated files into subfolders +# using 0-based sequential filenames (e.g., 0000.jsonl.gz, 0001.jsonl.gz, ...). +# The metadata is also normalized to preserve both original and deduplication-related information. 
+ import argparse import gzip import json -from concurrent.futures import ProcessPoolExecutor, as_completed from pathlib import Path from typing import NamedTuple @@ -9,12 +14,14 @@ class Args: + """Argument container for command-line parsing.""" worker: int input_dir: str output_dir: str def setup_parser(): + """Set up and return the command-line argument parser.""" parser = argparse.ArgumentParser() parser.add_argument("input_dir") parser.add_argument("output_dir") @@ -23,11 +30,30 @@ def setup_parser(): class FilePathCreator: + """ + Helper class to create output file paths that mirror original corpus structure. + + It groups files using metadata information and creates sequential file names. + """ def __init__(self) -> str: self.counter = 0 self.prev_mid_path = None def get_mid_path(self, path: str) -> str: + """ + Convert original input file path into a logical middle path for grouping. + Some known datasets are mapped to a fixed identifier like "ja_fineweb-2". + + Examples: + >>> get_mid_path("s3://commoncrawl/crawl-data/CC-MAIN-2023/file1") + "ja_fineweb-2" + + >>> get_mid_path("/model/experiments/0118_dedup_corpusv4_ja/data/subcorpus/warp_pdf_e0/metadata/sample.jsonl.gz") + "ja_warp_pdf/e0" + + >>> get_mid_path("/model/experiments/0118_dedup_corpusv4_ja/data/subcorpus/sip_comprehensive_pdf/section/sample.jsonl.gz") + "ja_sip/comprehensive/pdf" + """ if "s3://commoncrawl/crawl-data" in path or "/fsx/guilherme/cc2023-50" in path: return "ja_fineweb-2" @@ -50,6 +76,10 @@ def get_mid_path(self, path: str) -> str: return "/".join([path_root] + list(path_parts[2:-1])) def get_file_path(self, path: str) -> Path: + """ + Generate a new file path using the normalized middle path and a counter-based filename. + The counter resets when the middle path changes. + """ mid_path = self.get_mid_path(path) if mid_path != self.prev_mid_path: self.counter = 0 @@ -60,28 +90,28 @@ def get_file_path(self, path: str) -> Path: def normalize_jsonl(data: dict, add_file_path: bool = False): + """ + Normalize the metadata format of a JSONL entry. - # original meta info is stored in data["metadata"]["meta"] if exist + Combines metadata from various levels and relocates deduplication-related fields under `meta["dedup_meta"]`. 
+ """ meta: dict = data.get("metadata", {}).get("meta", {}) - - # root keys except "text" and "metadata" and "id" - # "metadata" and "id" are automatically added on minhash deduplication meta_other1 = {k: v for k, v in data.items() if k not in ["text", "metadata", "id"]} dedup_meta_keys = ["minhash_cluster_id", "minhash_cluster_size", "token_count"] - # keys in "metadata", but except {dedup_meta_keys} and "file_path" and "" - # Omitted keys are automatically added on minhash deduplication + # Extract metadata keys excluding the deduplication-related ones meta_other2 = { k: v for k, v in data["metadata"].items() if k not in (["file_path", "meta"] + dedup_meta_keys) } - # all keys are assumed to be different + + # Ensure no overlapping keys between different metadata sections assert len(set(meta.keys()) & set(meta_other1.keys())) == 0 assert len(set(meta.keys()) & set(meta_other2.keys())) == 0 assert len(set(meta_other1.keys()) & set(meta_other2.keys())) == 0 - # store meta info on deduplication in other keys + # Store deduplication metadata if required if add_file_path: dedup_meta_keys.append("file_path") dedup_meta = {k: v for k, v in data["metadata"].items() if k in dedup_meta_keys} @@ -92,13 +122,18 @@ def normalize_jsonl(data: dict, add_file_path: bool = False): def convert_file(input_file: Path, output_file: Path): + """ + Read a gzipped JSONL file, normalize each line's metadata, and write to a new gzipped file. + If the file is from ja_fineweb-2, include the original file path in dedup metadata. + """ output_file.parent.mkdir(parents=True, exist_ok=True) assert not output_file.exists(), f"{output_file} exists!" - # Add file_path keys if original data is from ja_fineweb - # This is because the meta info is comes from original data + + # Determine if the original file is from ja_fineweb-2 to include additional metadata add_file_path = False if output_file.parts[3] == "ja_fineweb-2": add_file_path = True + with gzip.open(input_file, "rt") as f_read, gzip.open(output_file, "wt") as f_write: for line in f_read: data = json.loads(line) @@ -107,11 +142,16 @@ def convert_file(input_file: Path, output_file: Path): class IO_File(NamedTuple): + """Simple tuple that pairs an input file with its output file path.""" input_file: Path output_file: Path def setup_io(input_files: Path, output_dir: Path) -> list[IO_File]: + """ + Prepare a list of IO_File pairs by inspecting metadata from each input file. + Determines the correct output location and file name based on metadata. + """ io_list = [] file_path_creator = FilePathCreator() for _file in tqdm(input_files, ncols=0): @@ -123,23 +163,3 @@ def setup_io(input_files: Path, output_dir: Path) -> list[IO_File]: output_file = Path(output_dir) / output_file io_list.append(IO_File(_file, output_file)) return io_list - - -def main(input_dir: str, output_dir: str, worker: int): - input_files = list(sorted(Path(input_dir).rglob("*.jsonl.gz"))) - io_list = setup_io(input_files, Path(output_dir)) - - with ProcessPoolExecutor(max_workers=worker) as executor: - futures = [executor.submit(convert_file, *_io) for _io in io_list] - for future in tqdm(as_completed(futures), total=len(io_list), ncols=0): - try: - future.result() - except Exception as e: - print(f"Worker error: {e}") - raise - - -if __name__ == "__main__": - args = setup_parser().parse_args(namespace=Args) - assert not Path(args.output_dir).exists(), f"{args.output_dir} is exists!" 
- main(args.input_dir, args.output_dir, args.worker) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh index f0e97001..a9a32e42 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh @@ -6,45 +6,49 @@ #SBATCH --output=slurm_logs/%j-%x.out #SBATCH --error=slurm_logs/%j-%x.err +# This script splits files from an input directory into smaller compressed chunks, +# preserving directory structure. Supports optional file pattern filtering and .gz input. +# +# Usage: +# sbatch this_script.sh [unit_size] [pattern] +# Example: +# sbatch this_script.sh ./data ./sharded 500M '\.jsonl$' + set -eux input_dir=$1 output_dir=$2 -unit_size=${3:-"1G"} -pattern=${4:-""} +unit_size=${3:-"1G"} # Target size per split chunk (default: 1G) +pattern=${4:-""} # Optional pattern to filter files input_dir=$(realpath "$input_dir") mkdir -p "$output_dir" -# ファイル一覧取得(フォルダ構造を考慮) +# Get list of all files (respecting directory structure) all_files=$(find -L "$input_dir" -type f) -# パターンが指定された場合はフィルタリング +# Filter files if a pattern is specified if [[ -n "$pattern" ]]; then all_files=$(echo "$all_files" | grep -E "$pattern" || true) fi -# ファイルが見つからない場合の処理 +# Exit if no files match the pattern if [[ -z "$all_files" ]]; then echo "No matching files found. Exiting." exit 1 fi -# 各フォルダごとに処理するため、ディレクトリ単位でグループ化 +# Group files by their parent directory (relative to input_dir) declare -A dir_files_map while IFS= read -r file; do - # ファイルの属するディレクトリ(input_dir からの相対パス)を取得 relative_dir=$(dirname "${file#$input_dir/}") output_subdir="$output_dir/$relative_dir" - # ディレクトリ構造を再現 mkdir -p "$output_subdir" - - # ディレクトリごとにファイルリストを格納 dir_files_map["$output_subdir"]+="$file " done <<< "$all_files" -# 各ディレクトリごとに分割処理を実行 +# For each group of files, perform splitting for subdir in "${!dir_files_map[@]}"; do file_list=${dir_files_map["$subdir"]} @@ -57,7 +61,7 @@ for subdir in "${!dir_files_map[@]}"; do --verbose ) - # ファイルを適切に処理(.gz ファイルは解凍) + # Concatenate and split each file; decompress if .gz for file in $file_list; do if [[ "$file" == *.gz ]]; then gunzip -c "$file" From 89b1beb3ec3033b50583a4be49b94d9d14cbc6e8 Mon Sep 17 00:00:00 2001 From: Yuma Tsuta Date: Mon, 31 Mar 2025 19:32:38 +0900 Subject: [PATCH 11/11] format scripts --- .../local_multi_node/submit_minhash_all_subcorpus.sh | 3 +-- .../common/dedup/preprocess/make_links.sh | 2 +- .../common/dedup/preprocess/reshard.sh | 10 +++++----- .../common/dedup/preprocess/reshard_all.sh | 1 - 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh index d880859e..6ae79447 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash_all_subcorpus.sh @@ -34,7 +34,6 @@ target_dirs=( wiki ) - # reshard python_submit_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/submit_minhash.py python_minhash_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/minhash/local_multi_node/minhash_dedup.py @@ -51,7 +50,7 @@ for _dirname in "${target_dirs[@]}"; do --input_dir "${_target_dir}/reshard_1B" \ --output_dir "$_target_dir" \ --stage "${stage}" \ - --log_dir 
"${log_root}/${stage}/${_dirname}"\ + --log_dir "${log_root}/${stage}/${_dirname}" \ --venv_path $env_path \ --python_script $python_minhash_script \ --node_list "${node_list[@]}" \ diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh index 65968bf0..bb9cfa8b 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/make_links.sh @@ -9,4 +9,4 @@ for dir in "${data_dir}"/subcorpus/*; do mkdir -p "$data_dir/all/deduped_subcorpus/$dir_name" ln -s "$file" "$data_dir/all/deduped_subcorpus/$dir_name/$file_name" done -done \ No newline at end of file +done diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh index a9a32e42..1f3d46fa 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh @@ -8,7 +8,7 @@ # This script splits files from an input directory into smaller compressed chunks, # preserving directory structure. Supports optional file pattern filtering and .gz input. -# +# # Usage: # sbatch this_script.sh [unit_size] [pattern] # Example: @@ -18,8 +18,8 @@ set -eux input_dir=$1 output_dir=$2 -unit_size=${3:-"1G"} # Target size per split chunk (default: 1G) -pattern=${4:-""} # Optional pattern to filter files +unit_size=${3:-"1G"} # Target size per split chunk (default: 1G) +pattern=${4:-""} # Optional pattern to filter files input_dir=$(realpath "$input_dir") mkdir -p "$output_dir" @@ -46,7 +46,7 @@ while IFS= read -r file; do mkdir -p "$output_subdir" dir_files_map["$output_subdir"]+="$file " -done <<< "$all_files" +done <<<"$all_files" # For each group of files, perform splitting for subdir in "${!dir_files_map[@]}"; do @@ -69,4 +69,4 @@ for subdir in "${!dir_files_map[@]}"; do cat "$file" fi done | split "${split_args[@]}" - "$subdir/" -done \ No newline at end of file +done diff --git a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh index 9549cf88..559b5d36 100644 --- a/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh +++ b/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard_all.sh @@ -25,7 +25,6 @@ target_dirs=( wiki ) - # reshard reshard_script=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup/preprocess/reshard.sh unit_size=1G