Add scripts to run minhash deduplication for Japanese corpus #71

Status: Open. Wants to merge 12 commits into base `main`.
39 changes: 39 additions & 0 deletions corpus/llm-jp-corpus-v4/common/dedup/README-ja.md
@@ -0,0 +1,39 @@
# Near-Duplicate Removal Scripts

This directory contains scripts for removing near-duplicates from the corpora.
The deduplication is based on the MinHash-LSH implementation in [datatrove](https://github.com/huggingface/datatrove).

Deduplication is performed in the following two stages:
- Deduplication within each corpus
- Global deduplication across all corpora, after the per-corpus pass

## Script Execution Order

0. Install the required libraries
- `installer/install.sh`

1. Reshard the files to equalize file sizes and balance processing time
- `preprocess/reshard_all.sh`

2. Deduplicate within each corpus
- `minhash`
- See `minhash/README.md` for details

3. Collect the preprocessed files into a single directory using symbolic links
- `preprocess/make_links.sh`

4. Global deduplication across all corpora
- `minhash`
- See `minhash/README.md` for details

5. Reorganize the deduplicated files
- The deduplicated files are saved without preserving the directory structure.
- Reorganize them as follows:
1. Verify that the order of the texts was not randomized during deduplication
- `postprocess/check_original_path_consisitency.py`
2. Restore the original directory structure of each corpus
- `postprocess/reconstruct_stracture.py`

## Related Repository

See also: [datatrove](https://github.com/huggingface/datatrove)
39 changes: 39 additions & 0 deletions corpus/llm-jp-corpus-v4/common/dedup/README.md
@@ -0,0 +1,39 @@
# Deduplication Scripts

This directory contains scripts for near-duplicate removal from corpora.
The deduplication process is based on MinHash-LSH as implemented in [datatrove](https://github.com/huggingface/datatrove); a toy sketch of the idea follows the stage list below.

We perform deduplication in two main stages:
- Deduplication within each individual corpus
- Global deduplication across all corpora that have been locally deduplicated
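
For orientation, the sketch below is a minimal toy of the MinHash-LSH idea: each document is shingled into word n-grams, a fixed-length signature of per-hash minima is computed, and the signature is split into buckets (bands) so that near-duplicates collide in at least one bucket with high probability. It is not the datatrove implementation; all names, texts, and helper functions in it are illustrative.

```python
# Toy illustration of MinHash-LSH (plain Python, NOT the datatrove implementation).
# The bucket/hash shape matches the defaults used by the scripts in this PR
# (20 buckets x 10 hashes per bucket); everything else is illustrative.
import hashlib

N_GRAM = 5              # shingle size in words
NUM_BUCKETS = 20        # LSH bands
HASHES_PER_BUCKET = 10  # rows per band -> 200 hash functions in total

def shingles(text: str, n: int = N_GRAM) -> set[str]:
    words = text.split()
    return {" ".join(words[i:i + n]) for i in range(max(len(words) - n + 1, 1))}

def signature(text: str) -> list[int]:
    # one cheap hash function per (bucket, slot) pair, seeded by its index;
    # the signature keeps only the minimum hash value per function
    return [
        min(
            int.from_bytes(hashlib.blake2b(f"{seed}:{s}".encode(), digest_size=8).digest(), "big")
            for s in shingles(text)
        )
        for seed in range(NUM_BUCKETS * HASHES_PER_BUCKET)
    ]

def bucket_keys(sig: list[int]) -> set[tuple[int, int]]:
    # split the signature into bands; documents sharing any band become duplicate candidates
    return {
        (b, hash(tuple(sig[b * HASHES_PER_BUCKET:(b + 1) * HASHES_PER_BUCKET])))
        for b in range(NUM_BUCKETS)
    }

a = signature("the quick brown fox jumps over the lazy dog near the river bank")
b = signature("the quick brown fox jumps over the lazy dog near the river shore")
print(len(bucket_keys(a) & bucket_keys(b)), "shared buckets -> near-duplicate candidates")
```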

## Script Execution Order

Run the scripts in the order below; a consolidated command sketch follows the list.

0. Install required libraries
- `installer/install.sh`

1. Reshard files to equalize file sizes and balance processing time
- `preprocess/reshard_all.sh`

2. Perform deduplication within each corpus
- `minhash`
- See `minhash/README.md` for details

3. Collect all preprocessed files into a single directory using symbolic links
- `preprocess/make_links.sh`

4. Perform global deduplication across all corpora
- `minhash`
- See `minhash/README.md` for details

5. Reorganize the deduplicated files
- Deduplicated files are saved without preserving directory structure.
- Steps:
1. Verify that texts are not randomized during deduplication
- `postprocess/check_original_path_consisitency.py`
2. Reconstruct original directory structure for each corpus
- `postprocess/reconstruct_stracture.py`
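
Putting the steps above together, an end-to-end run looks roughly like the sketch below. Paths, the working directory, and the per-corpus looping are placeholders; argument handling of the individual scripts is documented in the scripts themselves and in `minhash/README.md`.

```bash
# Rough end-to-end sketch; all paths are placeholders.
bash installer/install.sh                # 0. install dependencies
bash preprocess/reshard_all.sh           # 1. reshard input files
# 2. per-corpus deduplication (run once per corpus, see minhash/README.md)
python minhash/slurm/minhash_dedup.py /path/to/corpusA /path/to/dedup/corpusA
bash preprocess/make_links.sh            # 3. collect deduplicated files via symlinks
# 4. global deduplication over the linked directory (see minhash/README.md)
python minhash/slurm/minhash_dedup.py /path/to/linked /path/to/dedup/global
# 5. verify text order and restore the original directory layout
python postprocess/check_original_path_consisitency.py
python postprocess/reconstruct_stracture.py
```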

## Related Repository

See also: [datatrove](https://github.com/huggingface/datatrove)
26 changes: 26 additions & 0 deletions corpus/llm-jp-corpus-v4/common/dedup/installer/datatrove_diff.patch
@@ -0,0 +1,26 @@
diff --git a/src/datatrove/pipeline/dedup/minhash.py b/src/datatrove/pipeline/dedup/minhash.py
index f644389..3f1b50b 100644
--- a/src/datatrove/pipeline/dedup/minhash.py
+++ b/src/datatrove/pipeline/dedup/minhash.py
@@ -256,6 +254,8 @@ class MinhashDedupSignature(PipelineStep):
doc_idx,
)
)
+ else:
+ logger.warning(f"No shingles {doc.text=} on {rank=}")
for file in buckets:
file.close()

diff --git a/src/datatrove/utils/text.py b/src/datatrove/utils/text.py
index 7ab7d3d..d0c9cb6 100644
--- a/src/datatrove/utils/text.py
+++ b/src/datatrove/utils/text.py
@@ -259,6 +257,8 @@ def simplify_text(text: str, config=DEF_TEXT_NORM_CONFIG) -> str:

# from https://tedboy.github.io/nlps/_modules/nltk/util.html#ngrams
def ngrams(sequence: Iterable, n: int):
+ if isinstance(sequence, list) and len(sequence) < n:
+ sequence += [""] * (n - len(sequence))
iterables = tee(sequence, n)

for i, sub_iterable in enumerate(iterables): # For each window,
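
The two hunks above address documents whose tokenized text yields fewer than `n` items: without padding, the `tee`-based `ngrams` generator produces no shingles at all, so such documents would silently get no minhash signature (hence the added warning). A minimal reproduction in plain Python, independent of datatrove:

```python
# Minimal reproduction of the behaviour the patch addresses (not datatrove code).
from itertools import tee

def ngrams(sequence, n):
    # unpatched, nltk-style implementation: offset n copies of the iterator and zip them
    iterables = tee(sequence, n)
    for i, sub_iterable in enumerate(iterables):
        for _ in range(i):
            next(sub_iterable, None)
    return zip(*iterables)

tokens = ["短い", "文書"]                    # only 2 tokens, n-gram size 5
print(list(ngrams(tokens, 5)))               # [] -> no shingles, no signature

padded = tokens + [""] * (5 - len(tokens))   # what the patched version does
print(list(ngrams(padded, 5)))               # [('短い', '文書', '', '', '')]
```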
30 changes: 30 additions & 0 deletions corpus/llm-jp-corpus-v4/common/dedup/installer/install.sh
@@ -0,0 +1,30 @@
#!/bin/bash

work_dir=/model/experiments/0118_dedup_corpusv4_ja
env_dir=${work_dir}/environment
venv_dir=${env_dir}/.venv
src_dir=${env_dir}/src
script_root=${work_dir}/scripts/corpus/llm-jp-corpus-v4/common/dedup

export UV_PROJECT_ENVIRONMENT=$venv_dir

# create environment
cd $work_dir || exit
mkdir -p $env_dir
cd $env_dir || exit

python3 -m venv $venv_dir
source $venv_dir/bin/activate
pip install --upgrade --no-cache-dir pip uv
uv init

# install requirements
uv add --no-cache -r ${script_root}/installer/requirements.txt

# install customized datatrove
mkdir -p $src_dir
cd $src_dir || exit
git clone https://github.com/huggingface/datatrove.git -b v0.4.0
cd datatrove || exit
patch -p1 <${script_root}/installer/datatrove_diff.patch
uv pip install --no-cache-dir ".[io,processing,cli]"
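
For reference, a typical run of the installer followed by activating the environment it creates (the path is the one hard-coded at the top of the script):

```bash
bash installer/install.sh
source /model/experiments/0118_dedup_corpusv4_ja/environment/.venv/bin/activate
```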
4 changes: 4 additions & 0 deletions corpus/llm-jp-corpus-v4/common/dedup/installer/requirements.txt
@@ -0,0 +1,4 @@
datatrove[io,processing,cli]==0.4.0 # will be re-installed from the patched source later, but listed here to resolve dependencies
spacy[ja] # required when running minhash (datatrove)
sudachipy==0.5.4 # required when running minhash (datatrove)
paramiko # required to run processes on local multi node
32 changes: 32 additions & 0 deletions corpus/llm-jp-corpus-v4/common/dedup/minhash/README-ja.md
@@ -0,0 +1,32 @@
# Deduplication Code

This directory contains deduplication scripts that use [datatrove](https://github.com/huggingface/datatrove).
They are set up to run both on a SLURM cluster and across multiple local servers.

## SLURM Environment

This version is intended to run on a SLURM cluster.

### Usage

```bash
python slurm/minhash_dedup.py {input_dir} {output_dir}
```

- `input_dir`: Path to the directory containing the input data. Subdirectories are scanned recursively.
- `output_dir`: Directory where the deduplicated data is saved. Output subdirectories are created automatically.
- Hyperparameters related to hashing (e.g., n-gram size, number of buckets, number of hashes) can be configured. See the comments in the script for details.

> `slurm/minhash_dedup.py` is based on [this official example](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py).

## Local Multi-Node Environment

This version is intended for distributed execution across multiple local machines over SSH.

### Structure

- `local_multi_node/submit_minhash_all_subcorpus.sh`: Shell script that runs deduplication for every sub-corpus.
- `local_multi_node/submit_minhash.py`: Launcher script that reads the node list and starts the deduplication process on each node.
- `local_multi_node/minhash_dedup.py`: Worker script executed on each node.

> Note: This code is at the prototype stage, but it is shared as a reference implementation.
33 changes: 33 additions & 0 deletions corpus/llm-jp-corpus-v4/common/dedup/minhash/README.md
@@ -0,0 +1,33 @@
# Deduplication Code

This directory provides deduplication scripts using [datatrove](https://github.com/huggingface/datatrove).
It supports execution in two environments: on a SLURM cluster and on multiple local servers.

## SLURM

This version is designed for use on a SLURM-based cluster.

### Usage

```bash
python slurm/minhash_dedup.py {input_dir} {output_dir}
```

- `input_dir`: Path to the directory containing the input data. The script recursively scans subdirectories for files.
- `output_dir`: Path where deduplicated files will be written. Subdirectories will be automatically created under this path.
- You can also configure hyperparameters related to hashing (e.g., n-gram size, number of buckets, number of hashes per bucket); see the comments in the code for details and the configuration sketch below.

> The script `slurm/minhash_dedup.py` was adapted from [this official example](https://github.com/huggingface/datatrove/blob/main/examples/minhash_deduplication.py).
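
These hyperparameters map onto datatrove's `MinhashConfig`; the sketch below mirrors the defaults used by the `minhash_dedup.py` script included in this PR (5-grams, 20 buckets, 10 hashes per bucket, 64-bit hashes).

```python
from datatrove.pipeline.dedup.minhash import MinhashConfig
from datatrove.utils.hashing import HashConfig

minhash_config = MinhashConfig(
    hash_config=HashConfig(precision=64),  # 64-bit hashes -> fewer false positives
    n_grams=5,                             # shingle size
    num_buckets=20,                        # number of LSH buckets (bands)
    hashes_per_bucket=10,                  # hashes per bucket (rows per band)
)
```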

## Local Multi-Node

This version supports deduplication across multiple local machines using distributed processing via SSH.

### Structure

- `local_multi_node/submit_minhash_all_subcorpus.sh`: Top-level shell script that runs deduplication over all sub-corpora.
- `local_multi_node/submit_minhash.py`: Main launcher that reads the node list and runs deduplication on each machine.
- `local_multi_node/minhash_dedup.py`: Worker script executed on each node.

> Note: This code is a prototype, but it is shared here for reference.
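
As a purely hypothetical sketch of the launcher pattern (this is not the actual `submit_minhash.py`; host names, paths, and task counts are placeholders, and it assumes key-based SSH plus the `--local_tasks`/`--local_rank_offset` flags of the worker script), the SSH dispatch could look like this using `paramiko` from `requirements.txt`:

```python
# Hypothetical launcher sketch; NOT the actual submit_minhash.py.
import paramiko

NODES = ["node01", "node02", "node03", "node04"]  # placeholder node list
TOTAL_TASKS = 400                                 # e.g. number of input files
PER_NODE = TOTAL_TASKS // len(NODES)
WORKER = (
    "source /model/experiments/0118_dedup_corpusv4_ja/environment/.venv/bin/activate && "
    "python minhash_dedup.py /path/to/input /path/to/output "
    "--local_tasks {n} --local_rank_offset {off}"
)

for i, host in enumerate(NODES):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host)  # assumes key-based SSH access
    # launch the worker for this node's slice of tasks and detach
    cmd = WORKER.format(n=PER_NODE, off=i * PER_NODE)
    client.exec_command(f"nohup {cmd} > minhash_node{i}.log 2>&1 &")
    client.close()
```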
@@ -0,0 +1,138 @@
from argparse import ArgumentParser
from dataclasses import dataclass
from pathlib import Path

from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import (
    MinhashConfig,
    MinhashDedupBuckets,
    MinhashDedupCluster,
    MinhashDedupFilter,
)
from datatrove.pipeline.readers import JsonlReader
from datatrove.pipeline.tokens import TokensCounter
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.hashing import HashConfig
from datatrove.utils.typeshelper import Languages


@dataclass
class Args:
input: str
output: str
ngram: int
buckets: int
hashes_per_bucket: int
local_tasks: int
local_rank_offset: int
max_worker: int
stage: int


argparser = ArgumentParser()
argparser.add_argument("input", type=str)
argparser.add_argument("output", type=str)
argparser.add_argument("--ngram", default=5, type=int)
argparser.add_argument("-r", "--buckets", default=20, type=int)
argparser.add_argument("-b", "--hashes_per_bucket", default=10, type=int)
argparser.add_argument("-task", "--local_tasks", type=int, default=-1)
argparser.add_argument("-rank", "--local_rank_offset", type=int, default=0)
argparser.add_argument("-worker", "--max_worker", type=int, default=16)
argparser.add_argument("-stage", "--stage", type=int, choices=[1, 2, 3, 4], default=4)
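# Args is used as a typed namespace: argparse sets the parsed values as attributes on it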
args = argparser.parse_args(namespace=Args)


MINHASH_DIRNAME = (
f"minhash-{args.ngram}gram-{args.buckets}buckets-{args.hashes_per_bucket}hashes"
)
MINHASH_DIR = Path(args.output) / MINHASH_DIRNAME
RESULT_DIR = f"{MINHASH_DIR}/results"
LOG_DIR = f"{MINHASH_DIR}/logs"
SLURM_LOGS_DIR = f"{MINHASH_DIR}/slurm_logs"

all_files = [_f for _f in Path(args.input).rglob("*") if _f.resolve().is_file()]
TOTAL_TASKS = len(all_files)

# this is the original data that we want to deduplicate
INPUT_READER = JsonlReader(args.input)
# you can also change ngrams or the number of buckets and their size here
minhash_config = MinhashConfig(
hash_config=HashConfig(precision=64),
n_grams=args.ngram,
num_buckets=args.buckets,
hashes_per_bucket=args.hashes_per_bucket,
) # better precision -> fewer false positives (collisions)

# stage 1 computes minhash signatures for each task (each task gets a set of files)
stage1 = LocalPipelineExecutor(
pipeline=[
INPUT_READER,
MinhashDedupSignature(
output_folder=f"{RESULT_DIR}/signatures",
config=minhash_config,
language=Languages.japanese,
skip_existing_sigs=True,
),
],
tasks=TOTAL_TASKS,
workers=args.max_worker,
logging_dir=f"{LOG_DIR}/signatures",
local_tasks=args.local_tasks,
local_rank_offset=args.local_rank_offset,
randomize_start_duration=10,
)

# stage 2 finds matches between signatures in each bucket
stage2 = LocalPipelineExecutor(
pipeline=[
MinhashDedupBuckets(
input_folder=f"{RESULT_DIR}/signatures",
output_folder=f"{RESULT_DIR}/buckets",
config=minhash_config,
),
],
tasks=minhash_config.num_buckets,
workers=args.max_worker,
logging_dir=f"{LOG_DIR}/buckets",
depends=stage1,
)

# stage 3 creates clusters of duplicates using the results from all buckets
stage3 = LocalPipelineExecutor(
pipeline=[
MinhashDedupCluster(
input_folder=f"{RESULT_DIR}/buckets",
output_folder=f"{RESULT_DIR}/remove_ids",
config=minhash_config,
save_cluster_id=True,
save_cluster_size=True,
),
],
tasks=args.max_worker,
logging_dir=f"{LOG_DIR}/clusters",
depends=stage2,
)

# stage 4 reads the original input data and removes all but 1 sample per duplicate cluster
# the data must match exactly stage 1, so number of tasks and the input source must be the same
stage4 = LocalPipelineExecutor(
pipeline=[
INPUT_READER,
# TokensCounter(), # nice way to see how many tokens we had before and after deduplication
MinhashDedupFilter(
input_folder=f"{RESULT_DIR}/remove_ids",
exclusion_writer=JsonlWriter(f"{RESULT_DIR}/removed"),
load_cluster_ids=True,
load_cluster_sizes=True,
),
JsonlWriter(output_folder=f"{RESULT_DIR}/deduplicated_output"),
],
tasks=TOTAL_TASKS,
logging_dir=f"{LOG_DIR}/filter",
depends=stage3,
workers=args.max_worker,
local_tasks=args.local_tasks,
local_rank_offset=args.local_rank_offset,
)

if __name__ == "__main__":
    stages = {1: stage1, 2: stage2, 3: stage3, 4: stage4}
    stages[args.stage].run()
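
A hedged invocation example for one node (paths are placeholders; the flags are the ones defined by the argparse block above, and `--local_tasks`/`--local_rank_offset` select which slice of the total tasks runs on this machine):

```bash
# run only the signature stage (stage 1) for tasks 0-99 on this node
python minhash_dedup.py /path/to/input /path/to/output \
    --ngram 5 --buckets 20 --hashes_per_bucket 10 \
    --stage 1 --local_tasks 100 --local_rank_offset 0 --max_worker 16
```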