
Commit 8123cf2

ytivy and Yuma Tsuta authored
Add scripts to run minhash deduplication for Japanese corpus (#71)
Co-authored-by: Yuma Tsuta <[email protected]>
1 parent d784412 commit 8123cf2

File tree

16 files changed: +1131 -0 lines changed
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
# Near-Duplicate Removal Scripts

This directory contains scripts for removing near-duplicates from the corpus.
Deduplication is based on the Minhash-LSH implementation in [datatrove](https://github.com/huggingface/datatrove).

Deduplication is performed in two stages:
- Deduplication within each corpus
- Global deduplication across all corpora, after each corpus has been deduplicated

## Script Execution Order

0. Install the required libraries
   - `installer/install.sh`

1. Reshard files to equalize file sizes and balance processing time
   - `preprocess/reshard_all.sh`

2. Deduplicate within each corpus
   - `minhash`
   - See `minhash/README.md` for details

3. Collect the preprocessed files into a single directory using symbolic links
   - `preprocess/make_links.sh`

4. Global deduplication across all corpora
   - `minhash`
   - See `minhash/README.md` for details

5. Relocate the deduplicated files
   - Deduplicated files are saved without preserving the directory structure.
   - Relocate them with the following steps:
     1. Verify that the order of texts was not randomized during deduplication
        - `postprocess/check_original_path_consisitency.py`
     2. Reconstruct the original directory structure of each corpus
        - `postprocess/reconstruct_stracture.py`

## Related Repository

See also: [datatrove](https://github.com/huggingface/datatrove)
Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
# Deduplication Scripts

This directory contains scripts for near-duplicate removal from corpora.
The deduplication process is based on Minhash-LSH implemented in [datatrove](https://github.com/huggingface/datatrove).

We perform deduplication in two main stages:
- Deduplication within each individual corpus
- Global deduplication across all corpora that have been locally deduplicated

## Script Execution Order

0. Install required libraries
   - `installer/install.sh`

1. Reshard files to equalize file sizes and balance processing time
   - `preprocess/reshard_all.sh`

2. Perform deduplication within each corpus
   - `minhash`
   - See `minhash/README.md` for details

3. Collect all preprocessed files into a single directory using symbolic links
   - `preprocess/make_links.sh`

4. Perform global deduplication across all corpora
   - `minhash`
   - See `minhash/README.md` for details

5. Reorganize the deduplicated files
   - Deduplicated files are saved without preserving directory structure.
   - Steps:
     1. Verify that the order of texts was not randomized during deduplication
        - `postprocess/check_original_path_consisitency.py`
     2. Reconstruct the original directory structure for each corpus
        - `postprocess/reconstruct_stracture.py`

## Related Repository

See also: [datatrove](https://github.com/huggingface/datatrove)
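
As background for the steps above, the following self-contained toy sketch illustrates the MinHash-LSH idea the pipeline relies on; it is illustrative only (not the datatrove implementation, and all names, parameters, and sample texts are made up for the example). Documents whose character n-gram sets largely overlap agree, with high probability, on every hash value in at least one LSH bucket, so candidate duplicate pairs can be found without comparing all pairs of documents.

```python
# Toy MinHash-LSH sketch (illustrative only; not the datatrove implementation).
import hashlib
import random
from collections import defaultdict

def char_ngrams(text, n=5):
    """Character n-gram 'shingles' of a document."""
    return {text[i:i + n] for i in range(max(len(text) - n + 1, 1))}

def minhash_signature(shingles, num_hashes=200):
    """One minimum 64-bit hash per seeded hash function."""
    return [
        min(
            int.from_bytes(
                hashlib.blake2b(s.encode(), digest_size=8,
                                salt=seed.to_bytes(2, "little")).digest(),
                "little",
            )
            for s in shingles
        )
        for seed in range(num_hashes)
    ]

def candidate_pairs(signatures, num_buckets=20, hashes_per_bucket=10):
    """LSH: documents sharing every hash in at least one bucket become candidates."""
    index = defaultdict(list)
    for doc_id, sig in signatures.items():
        for b in range(num_buckets):
            band = tuple(sig[b * hashes_per_bucket:(b + 1) * hashes_per_bucket])
            index[(b, band)].append(doc_id)
    return {tuple(ids) for ids in index.values() if len(ids) > 1}

random.seed(0)
base = "".join(random.choices("あいうえおかきくけこさしすせそたちつてと", k=400))
docs = {
    "a": base,
    "b": base[:200] + "※" + base[201:],  # near-duplicate: one character changed
    "c": "".join(random.choices("なにぬねのはひふへほ", k=400)),  # unrelated document
}
sigs = {doc_id: minhash_signature(char_ngrams(text)) for doc_id, text in docs.items()}
print(candidate_pairs(sigs))  # "a" and "b" collide in at least one bucket with high probability
```

datatrove's `MinhashDedupSignature`, `MinhashDedupBuckets`, `MinhashDedupCluster`, and `MinhashDedupFilter` stages, used by the scripts in this commit, implement the same idea at scale by splitting signature computation, bucket matching, clustering, and filtering into separate steps.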
Lines changed: 26 additions & 0 deletions
@@ -0,0 +1,26 @@
diff --git a/src/datatrove/pipeline/dedup/minhash.py b/src/datatrove/pipeline/dedup/minhash.py
index f644389..3f1b50b 100644
--- a/src/datatrove/pipeline/dedup/minhash.py
+++ b/src/datatrove/pipeline/dedup/minhash.py
@@ -256,6 +254,8 @@ class MinhashDedupSignature(PipelineStep):
doc_idx,
)
)
+ else:
+ logger.warning(f"No singles {doc.text=} on {rank=}")
for file in buckets:
file.close()

diff --git a/src/datatrove/utils/text.py b/src/datatrove/utils/text.py
index 7ab7d3d..d0c9cb6 100644
--- a/src/datatrove/utils/text.py
+++ b/src/datatrove/utils/text.py
@@ -259,6 +257,8 @@ def simplify_text(text: str, config=DEF_TEXT_NORM_CONFIG) -> str:

# from https://tedboy.github.io/nlps/_modules/nltk/util.html#ngrams
def ngrams(sequence: Iterable, n: int):
+ if isinstance(sequence, list) and len(sequence) < n:
+ sequence += [""] * (n - len(sequence))
iterables = tee(sequence, n)

for i, sub_iterable in enumerate(iterables): # For each window,
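
The second hunk pads short token lists so that a document with fewer than n tokens still yields a single, padded n-gram instead of none; the first hunk logs a warning whenever a document produces no shingles at all. Below is a minimal self-contained sketch of the padded behavior, assuming an NLTK-style `ngrams` helper like the one referenced in the comment above (illustrative, not the patched file itself):

```python
from itertools import tee

def ngrams(sequence, n):
    # Patched behavior: pad short lists with empty strings so that a document
    # with fewer than n tokens still yields one n-gram (and hence a signature).
    if isinstance(sequence, list) and len(sequence) < n:
        sequence += [""] * (n - len(sequence))
    iterables = tee(sequence, n)
    for i, sub_iterable in enumerate(iterables):  # advance the i-th copy by i elements
        for _ in range(i):
            next(sub_iterable, None)
    return zip(*iterables)

print(list(ngrams(["短い", "文書"], 5)))
# With the padding: [('短い', '文書', '', '', '')]
# Without it, zip() would yield nothing, so the document would get no minhash signature.
```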
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
#!/bin/bash

work_dir=/model/experiments/0118_dedup_corpusv4_ja
env_dir=${work_dir}/environment
venv_dir=${env_dir}/.venv
src_dir=${env_dir}/src
script_root=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup

export UV_PROJECT_ENVIRONMENT=$venv_dir

# create environment
cd $work_dir || exit
mkdir -p $env_dir
cd $env_dir || exit

python3 -m venv $venv_dir
source $venv_dir/bin/activate
pip install --upgrade --no-cache-dir pip uv
uv init

# install requirements
uv add --no-cache -r ${script_root}/installer/requirements.txt

# install customized datatrove
mkdir -p $src_dir
cd $src_dir || exit
git clone https://github.com/huggingface/datatrove.git -b v0.4.0
cd datatrove || exit
patch -p1 <${script_root}/installer/datatrove_diff.patch
uv pip install --no-cache-dir ".[io,processing,cli]"
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
datatrove[io,processing,cli]==0.4.0 # re-installed later from source, but listed here to resolve dependencies
spacy[ja] # required when running minhash (datatrove)
sudachipy==0.5.4 # required when running minhash (datatrove)
paramiko # required to run processes on local multi-node setups
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
# Deduplication Code

This directory contains deduplication scripts that use [datatrove](https://github.com/huggingface/datatrove).
They are set up to run both on a SLURM cluster and across multiple local servers.

## SLURM Environment

This version is intended to run on a SLURM cluster.

### Usage

```bash
python slurm/minhash_dedup.py {input_dir} {output_dir}
```

- `input_dir`: Path to the directory containing the input data. Subdirectories are scanned recursively.
- `output_dir`: Directory where the deduplicated data is saved. Output subdirectories are created automatically.
- Hyperparameters related to hashing (e.g., n-gram size, number of buckets, number of hashes) can also be configured. See the comments in the script for details.

> `slurm/minhash_dedup.py` is based on [this official example](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py).

## Local Multi-Node Environment

This version is intended for distributed execution across multiple local machines over SSH.

### Structure

- `local_multi_node/submit_minhash_all_subcorpus.sh`: Shell script that runs deduplication for all sub-corpora.
- `local_multi_node/submit_minhash.py`: Launcher script that reads the node list and starts the deduplication process on each node.
- `local_multi_node/minhash_dedup.py`: Worker script executed on each node.

> Note: This code is at the prototype stage, but it is shared as a reference implementation.
Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
# Deduplication Code

This directory provides deduplication scripts using [datatrove](https://github.com/huggingface/datatrove).
It supports execution in two environments: on a SLURM cluster and on multiple local servers.

## SLURM

This version is designed for use on a SLURM-based cluster.

### Usage

```bash
python slurm/minhash_dedup.py {input_dir} {output_dir}
```

- `input_dir`: Path to the directory containing the input data. The script recursively scans subdirectories for files.
- `output_dir`: Path where deduplicated files will be written. Subdirectories will be automatically created under this path.
- You can also configure hyperparameters related to hashing (e.g., n-gram size, number of buckets, number of hashes per bucket).
  Please refer to the comments in the code for details.

> The script `slurm/minhash_dedup.py` was adapted from [this official example](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py).

## Local Multi-Node

This version supports deduplication across multiple local machines using distributed processing via SSH.

### Structure

- `local_multi_node/submit_minhash_all_subcorpus.sh`: Main launcher shell script to deduplicate all sub-corpora.
- `local_multi_node/submit_minhash.py`: Main launcher that reads the node list and runs deduplication on each machine.
- `local_multi_node/minhash_dedup.py`: Worker script executed on each node.

> Note: This code is a prototype, but it is shared here for reference.
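
For reference, these hashing hyperparameters map onto datatrove's `MinhashConfig` as in the `minhash_dedup.py` script included later in this commit; the values below are that script's defaults (5-gram shingles, 20 buckets, 10 hashes per bucket, 64-bit hashes):

```python
from datatrove.pipeline.dedup.minhash import MinhashConfig
from datatrove.utils.hashing import HashConfig

# Two documents become duplicate candidates when all 10 hashes in at least one
# of the 20 buckets collide; higher hash precision means fewer false positives.
minhash_config = MinhashConfig(
    hash_config=HashConfig(precision=64),
    n_grams=5,
    num_buckets=20,
    hashes_per_bucket=10,
)
```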
Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
from argparse import ArgumentParser
from dataclasses import dataclass
from pathlib import Path

from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import (
    MinhashConfig,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
)
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.hashing import HashConfig
from datatrove.utils.typeshelper import Languages


@dataclass
class Args:
    input: str
    output: str
    ngram: int
    buckets: int
    hashes_per_bucket: int
    local_tasks: int
    local_rank_offset: int
    max_worker: int
    stage: int


argparser = ArgumentParser()
argparser.add_argument("input", type=str)
argparser.add_argument("output", type=str)
argparser.add_argument("--ngram", default=5, type=int)
argparser.add_argument("-r", "--buckets", default=20, type=int)
argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int)
argparser.add_argument("-task", "--local_tasks", type=int, default=-1)
argparser.add_argument("-rank", "--local_rank_offset", type=int, default=0)
argparser.add_argument("-worker", "--max_worker", type=int, default=16)
argparser.add_argument("-stage", "--stage", type=int, choices=[1, 2, 3, 4], default=4)
args = argparser.parse_args(namespace=Args)


MINHASH_DIRNAME = (
    f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes"
)
MINHASH_DIR = Path(args.output) / MINHASH_DIRNAME
RESULT_DIR = f"{MINHASH_DIR}/results"
LOG_DIR = f"{MINHASH_DIR}/logs"
SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs"

all_files = [_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()]
TOTAL_TASKS = len(all_files)

# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(args.input)
# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
    hash_config=HashConfig(precision=64),
    n_grams=args.ngram,
    num_buckets=args.buckets,
    hashes_per_bucket=args.hashes_per_bucket,
)  # better precision -> fewer false positives (collisions)

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = LocalPipelineExecutor(
    pipeline=[
        INPUT_READER,
        MinhashDedupSignature(
            output_folder=f"{RESULT_DIR}/signatures",
            config=minhash_config,
            language=Languages.japanese,
            skip_existing_sigs=True,
        ),
    ],
    tasks=TOTAL_TASKS,
    workers=args.max_worker,
    logging_dir=f"{LOG_DIR}/signatures",
    local_tasks=args.local_tasks,
    local_rank_offset=args.local_rank_offset,
    randomize_start_duration=10,
)

# stage 2 finds matches between signatures in each bucket
stage2 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupBuckets(
            input_folder=f"{RESULT_DIR}/signatures",
            output_folder=f"{RESULT_DIR}/buckets",
            config=minhash_config,
        ),
    ],
    tasks=minhash_config.num_buckets,
    workers=args.max_worker,
    logging_dir=f"{LOG_DIR}/buckets",
    depends=stage1,
)

# stage 3 creates clusters of duplicates using the results from all buckets
stage3 = LocalPipelineExecutor(
    pipeline=[
        MinhashDedupCluster(
            input_folder=f"{RESULT_DIR}/buckets",
            output_folder=f"{RESULT_DIR}/remove_ids",
            config=minhash_config,
            save_cluster_id=True,
            save_cluster_size=True,
        ),
    ],
    tasks=args.max_worker,
    logging_dir=f"{LOG_DIR}/clusters",
    depends=stage2,
)

# stage 4 reads the original input data and removes all but 1 sample per duplicate cluster
# the data must match exactly stage 1, so number of tasks and the input source must be the same
stage4 = LocalPipelineExecutor(
    pipeline=[
        INPUT_READER,
        # TokensCounter(),  # nice way to see how many tokens we had before and after deduplication
        MinhashDedupFilter(
            input_folder=f"{RESULT_DIR}/remove_ids",
            exclusion_writer=JsonlWriter(f"{RESULT_DIR}/removed"),
            load_cluster_ids=True,
            load_cluster_sizes=True,
        ),
        JsonlWriter(output_folder=f"{RESULT_DIR}/deduplicated_output"),
    ],
    tasks=TOTAL_TASKS,
    logging_dir=f"{LOG_DIR}/filter",
    depends=stage3,
    workers=args.max_worker,
    local_tasks=args.local_tasks,
    local_rank_offset=args.local_rank_offset,
)

if __name__ == "__main__":
    exec(f"stage{args.stage}.run()")

0 commit comments
