diff --git a/t5x/configs/dataset/pile/download_all_pile.py b/t5x/configs/dataset/pile/download_all_pile.py
new file mode 100644
index 000000000..2851d7e50
--- /dev/null
+++ b/t5x/configs/dataset/pile/download_all_pile.py
@@ -0,0 +1,80 @@
+import argparse
+import functools
+import subprocess
+from multiprocessing import Pool
+
+import wget
+
+
+def get_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--procs", type=int, required=True, help="Number of processes."
+    )
+    parser.add_argument(
+        "--local-base-dir", type=str, required=True, help="Folder to download the documents to."
+    )
+    return parser.parse_args()
+
+
+def download_unzstd_and_send_to_gcloud(relative_path, local_base_dir, gcp_base):
+    BASE_PILE_URL = "https://the-eye.eu/public/AI/pile"
+    local_path = f"{local_base_dir}/{relative_path}"
+
+    # Create the destination folder.
+    process = subprocess.Popen(["mkdir", "-p", local_path.rsplit("/", 1)[0]])
+    process.wait()
+
+    # Download the compressed shard (wget.download blocks until the download finishes).
+    wget.download(f"{BASE_PILE_URL}/{relative_path}", local_path)
+
+    # Decompress it.
+    process = subprocess.Popen(
+        ["zstd", "-d", local_path],
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE)
+    process.wait()
+
+    assert local_path.endswith(".zst")
+    local_uncompressed_path = local_path[:-4]
+    assert relative_path.endswith(".zst")
+    gcp_uncompressed_path = f"{gcp_base}/{relative_path[:-4]}"
+
+    # Upload to GCP.
+    process = subprocess.Popen(["gsutil", "cp", local_uncompressed_path, gcp_uncompressed_path])
+    process.wait()
+
+    # Delete the local copies.
+    process = subprocess.Popen(["rm", local_path])
+    process.wait()
+    process = subprocess.Popen(["rm", local_uncompressed_path])
+    process.wait()
+
+
+def main():
+    args = get_args()
+
+    pile_urls = {
+        "train": [f"train/{i:02d}.jsonl.zst" for i in range(30)],
+        "test": ["test.jsonl.zst"],
+        "val": ["val.jsonl.zst"],
+    }
+    local_base_dir = args.local_base_dir
+    gcp_base = "gs://bigscience/pile/raw"
+
+    process = subprocess.Popen(["mkdir", "-p", local_base_dir])
+    process.wait()
+
+    relative_paths = [
+        relative_path
+        for paths in pile_urls.values()
+        for relative_path in paths
+    ]
+    # pool = Pool(args.procs)
+    # pool.map(
+    #     functools.partial(download_unzstd_and_send_to_gcloud, local_base_dir=local_base_dir, gcp_base=gcp_base),
+    #     relative_paths
+    # )
+    for relative_path in relative_paths:
+        download_unzstd_and_send_to_gcloud(relative_path, local_base_dir=local_base_dir, gcp_base=gcp_base)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/t5x/configs/dataset/pile/pile/__init__.py b/t5x/configs/dataset/pile/pile/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/t5x/configs/dataset/pile/pile/task.py b/t5x/configs/dataset/pile/pile/task.py
new file mode 100644
index 000000000..60b150a08
--- /dev/null
+++ b/t5x/configs/dataset/pile/pile/task.py
@@ -0,0 +1,74 @@
+import functools
+
+import seqio
+from t5.data import preprocessors, utils
+import tensorflow as tf
+
+vocabulary = seqio.SentencePieceVocabulary(
+    'gs://t5-data/vocabs/cc_all.32000/sentencepiece.model', extra_ids=100)
+output_features = {
+    'inputs': seqio.Feature(vocabulary=vocabulary),
+    'targets': seqio.Feature(vocabulary=vocabulary)
+}
+
+DEFAULT_OUTPUT_FEATURES = {
+    "inputs": seqio.Feature(
+        vocabulary=vocabulary, add_eos=True,
+        required=False),
+    "targets": seqio.Feature(
+        vocabulary=vocabulary, add_eos=True)
+}
+
+DATASET_FOLDER = "gs://bigscience/pile/raw"
+DATASET_SPLITS_TO_FILEPATTERN = {
+    "train": f"{DATASET_FOLDER}/train/*.jsonl",
f"{DATASET_FOLDER}/train/*.jsonl", + "val": f"{DATASET_FOLDER}/val.jsonl", + "test": f"{DATASET_FOLDER}/test.jsonl" +} + +@utils.map_over_dataset +def extract_text_from_json_tf(json: str): + output = tf.strings.split(json, '{"text": "', maxsplit=1)[1] + output = tf.strings.split(output, '", "meta": {', maxsplit=1)[0] + return {"text": output} + +seqio.TaskRegistry.add( + 'pile_t2t_span_corruption', + source=seqio.TextLineDataSource( + split_to_filepattern=DATASET_SPLITS_TO_FILEPATTERN, + ), + preprocessors=[ + extract_text_from_json_tf, + functools.partial( + preprocessors.rekey, key_map={ + "inputs": None, + "targets": "text" + }), + seqio.preprocessors.tokenize, + seqio.CacheDatasetPlaceholder(), + preprocessors.span_corruption, + seqio.preprocessors.append_eos_after_trim, + ], + output_features=DEFAULT_OUTPUT_FEATURES, + metric_fns=[] +) + +seqio.TaskRegistry.add( + "pile_t2t_prefix_lm", + source=seqio.TextLineDataSource( + split_to_filepattern=DATASET_SPLITS_TO_FILEPATTERN, + ), + preprocessors=[ + extract_text_from_json_tf, + functools.partial( + preprocessors.rekey, key_map={ + "inputs": None, + "targets": "text" + }), + seqio.preprocessors.tokenize, + seqio.CacheDatasetPlaceholder(), + preprocessors.prefix_lm, + seqio.preprocessors.append_eos_after_trim, + ], + output_features=DEFAULT_OUTPUT_FEATURES, + metric_fns=[] +) diff --git a/t5x/configs/dataset/pile/run_cache_tasks_main.sh b/t5x/configs/dataset/pile/run_cache_tasks_main.sh new file mode 100644 index 000000000..e479da942 --- /dev/null +++ b/t5x/configs/dataset/pile/run_cache_tasks_main.sh @@ -0,0 +1,16 @@ +# Need to install seqio +# gcloud auth application-default login + + +MODULE_IMPORT=pile.task +TASK_NAME=pile_t2t_span_corruption +JOB_NAME=pilet2tspancorruption # the name must consist of only the characters [-a-z0-9], starting with a letter and ending with a letter or number +BUCKET=gs://bigscience/seqio_cached_tasks/$TASK_NAME # Don't know is cache needs to be task specific or not ... +PROJECT=bigscience +REGION=europe-west1 + +seqio_cache_tasks \ + --module_import=$MODULE_IMPORT \ + --tasks=${TASK_NAME} \ + --output_cache_dir=${BUCKET}/cache \ + --pipeline_options="--runner=DataflowRunner,--project=$PROJECT,--region=$REGION,--job_name=$JOB_NAME,--staging_location=$BUCKET/binaries,--temp_location=$BUCKET/tmp,--setup_file=$PWD/setup.py,--num_workers=32,--autoscaling_algorithm=NONE,--machine_type=n1-highmem-2" diff --git a/t5x/configs/dataset/pile/setup.py b/t5x/configs/dataset/pile/setup.py new file mode 100644 index 000000000..e314f7e61 --- /dev/null +++ b/t5x/configs/dataset/pile/setup.py @@ -0,0 +1,125 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Setup.py module for the workflow's worker utilities. 
+
+All the workflow related code is gathered in a package that will be built as a
+source distribution, staged in the staging area for the workflow being run and
+then installed in the workers when they start running.
+
+This behavior is triggered by specifying the --setup_file command line option
+when running the workflow for remote execution.
+"""
+
+# pytype: skip-file
+
+import subprocess
+from distutils.command.build import build as _build  # type: ignore
+
+import setuptools
+
+
+# This class handles the pip install mechanism.
+class build(_build):  # pylint: disable=invalid-name
+    """A build command class that will be invoked during package install.
+
+    The package built using the current setup.py will be staged and later
+    installed in the worker using `pip install package'. This class will be
+    instantiated during install for this specific scenario and will trigger
+    running the custom commands specified.
+    """
+    sub_commands = _build.sub_commands + [('CustomCommands', None)]
+
+
+# Some custom command to run during setup. The command is not essential for this
+# workflow. It is used here as an example. Each command will spawn a child
+# process. Typically, these commands will include steps to install non-Python
+# packages. For instance, to install a C++-based library libjpeg62 the following
+# two commands will have to be added:
+#
+#     ['apt-get', 'update'],
+#     ['apt-get', '--assume-yes', 'install', 'libjpeg62'],
+#
+# First, note that there is no need to use the sudo command because the setup
+# script runs with appropriate access.
+# Second, if apt-get tool is used then the first command needs to be 'apt-get
+# update' so the tool refreshes itself and initializes links to download
+# repositories. Without this initial step the other apt-get install commands
+# will fail with package not found errors. Note also --assume-yes option which
+# shortcuts the interactive confirmation.
+#
+# Note that in this example custom commands will run after installing required
+# packages. If you have a PyPI package that depends on one of the custom
+# commands, move installation of the dependent package to the list of custom
+# commands, e.g.:
+#
+#     ['pip', 'install', 'my_package'],
+#
+# TODO(BEAM-3237): Output from the custom commands are missing from the logs.
+# The output of custom commands (including failures) will be logged in the
+# worker-startup log.
+CUSTOM_COMMANDS = [
+    ['echo', 'Custom command worked!'],
+    ['pip', 'install', 'seqio'],
+    ['pip', 'install', 't5[cache-tasks]']
+]
+
+
+class CustomCommands(setuptools.Command):
+    """A setuptools Command class able to run arbitrary commands."""
+
+    def initialize_options(self):
+        pass
+
+    def finalize_options(self):
+        pass
+
+    def RunCustomCommand(self, command_list):
+        print('Running command: %s' % command_list)
+        p = subprocess.Popen(
+            command_list,
+            stdin=subprocess.PIPE,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT)
+        # Can use communicate(input='y\n'.encode()) if the command run requires
+        # some confirmation.
+        stdout_data, _ = p.communicate()
+        print('Command output: %s' % stdout_data)
+        if p.returncode != 0:
+            raise RuntimeError(
+                'Command %s failed: exit code: %s' % (command_list, p.returncode))
+
+    def run(self):
+        for command in CUSTOM_COMMANDS:
+            self.RunCustomCommand(command)
+
+
+# Configure the required packages and scripts to install.
+# Note that the Python Dataflow containers come with numpy already installed
+# so this dependency will not trigger anything to be installed unless a version
+# restriction is specified.
+REQUIRED_PACKAGES = [
+    'numpy',
+]
+
+setuptools.setup(
+    name='pile',
+    version='0.0.1',
+    description='Cache Pile dataset workflow package.',
+    install_requires=REQUIRED_PACKAGES,
+    packages=setuptools.find_packages(),
+    cmdclass={
+        # Command class instantiated and run during pip install scenarios.
+        'build': build,
+        'CustomCommands': CustomCommands,
+    })
\ No newline at end of file
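As a quick local sanity check of the tasks this diff registers, something along these lines could be run before launching the Dataflow caching job. This is a minimal sketch, not part of the diff: it assumes the `pile` package above is importable from the working directory, that GCS access is already configured, and the sequence lengths are purely illustrative.

```python
import seqio

import pile.task  # noqa: F401 -- importing the module registers the Pile tasks above

# Illustrative sequence lengths; use values matching the model being trained.
task = seqio.get_mixture_or_task("pile_t2t_span_corruption")
ds = task.get_dataset(
    sequence_length={"inputs": 512, "targets": 114},
    split="val",
    shuffle=False,
)

# Inspect the first preprocessed example (token ids for "inputs" and "targets").
for ex in ds.take(1):
    print({k: v.shape for k, v in ex.items()})
```

Swapping in `pile_t2t_prefix_lm` exercises the second task the same way; running this once locally surfaces file-pattern or JSON-extraction problems without paying for a Dataflow job.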