Merge branch 'RESTAPI-1396-example-for-transfer-s3-v1' into 'master'
Example for large data transfer on S3

See merge request firecrest/firecrest!334
Juan Pablo Dorsch committed Jan 28, 2025
2 parents f25ea0c + 2603da6 commit 3c500f1
Showing 2 changed files with 308 additions and 0 deletions.
63 changes: 63 additions & 0 deletions examples/download-large-files/README.md
# How to download large files using the FirecREST S3 backend

## Motivation

FirecREST uses [Amazon S3](https://aws.amazon.com/s3/) as a staging area for large data transfers.

When downloading data from a remote system (e.g., an HPC cluster) to a local machine (e.g., your laptop), the endpoint `POST /storage/xfer-external/download` internally uploads the remote file as an object to an S3 bucket and then creates a unique, temporary presigned URL from which the client can download the object.

This works fine, but S3 imposes [a 5 GB limit on the size of a single object transfer](https://aws.amazon.com/s3/faqs/#:~:text=The%20largest%20object%20that%20can%20be%20uploaded%20in%20a%20single%20PUT%20is%205%20GB.).
This means that if the file you are trying to download is larger than 5 GB, the data transfer cannot be used directly.

## Solution 1

You can use the `POST /storage/xfer-internal/compress` [endpoint](https://firecrest.readthedocs.io/en/latest/reference.html#post--storage-xfer-internal-compress) to compress the file on the remote system, which may bring it below the 5 GB limit.
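If you are calling the API directly rather than through pyFirecREST, the request can be sketched roughly as follows. This is a hypothetical, untested sketch: the header and form-field names (`X-Machine-Name`, `sourcePath`, `targetPath`) and the token placeholder are assumptions based on other FirecREST storage endpoints, not taken from this repository.

```python
# Hedged sketch of building a request for POST /storage/xfer-internal/compress.
# Field and header names below are ASSUMPTIONS; check the FirecREST API
# reference before relying on them.

FIRECREST_URL = "<replace_with_firecrest_uri>"
ACCESS_TOKEN = "<replace_with_access_token>"


def compress_request(system: str, source_path: str, target_path: str) -> dict:
    """Assemble URL, headers, and form payload; performs no network call."""
    return {
        "url": f"{FIRECREST_URL}/storage/xfer-internal/compress",
        "headers": {
            "Authorization": f"Bearer {ACCESS_TOKEN}",
            "X-Machine-Name": system,
        },
        "data": {"sourcePath": source_path, "targetPath": target_path},
    }


# With the `requests` library the call would then be roughly:
# resp = requests.post(**compress_request(system, src, dst))
```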

If this isn't enough, you can try Solution 2.

## Solution 2

For this case, we've created an [example](f7t_split_transfer.py) showing how to use pyFirecREST (the same could also be done directly with the API) to overcome the restriction.

### How does it work?

The steps reproduced in the example are:

1. an sbatch script (in this case, for Slurm) is created with the function `create_sbatch_script`; the submitted job splits the file into chunks smaller than 5 GB
2. using FirecREST, the job is submitted; on success, the parts are created with the suffix `.part.<xy>`, where `<xy>` is an integer counter (`00`, `01`, etc.)
3. looping over the parts, FirecREST downloads each one
4. after each part is downloaded, the function `remove_remote_part` deletes it on the remote server using FirecREST
5. once all parts are local, the Python function `join_parts` joins them back into a single file

*Note: this is just an example; there are several different ways to reproduce the steps listed above.*
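The split/join mechanics of steps 1 and 5 can be sketched in pure Python (the example itself delegates the splitting to the `split` command inside an sbatch job; the function names below are illustrative, not from the example):

```python
# Minimal, self-contained sketch of splitting a file into fixed-size parts
# named <file>.part.00, <file>.part.01, ... and joining them back in order.
import os


def split_file(path: str, part_size: int) -> list[str]:
    """Write the file's contents into numbered part files; return their paths."""
    parts = []
    with open(path, "rb") as src:
        idx = 0
        while True:
            chunk = src.read(part_size)
            if not chunk:
                break
            part = f"{path}.part.{idx:02d}"
            with open(part, "wb") as dst:
                dst.write(chunk)
            parts.append(part)
            idx += 1
    return parts


def join_file(parts: list[str], target: str) -> None:
    """Concatenate the part files, in sorted order, into the target file."""
    with open(target, "wb") as dst:
        for part in sorted(parts):
            with open(part, "rb") as src:
                dst.write(src.read())
```

Sorting the part names works because the zero-padded numeric suffix makes lexicographic order match numeric order, which is also why the real example relies on `split -d`.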

### Sample output

```
Split the file /store/f7tuser/test-split-file/large_file in chunks of 4999MB
JobID[247754] -> Status: Job submitted
JobID[247754] -> Status: RUNNING
JobID[247754] -> Status: COMPLETED
File divided correctly
Files created:
- large_file.part.00 (size: 4999MB)
- large_file.part.01 (size: 1145MB)
Start downloading parts
Downloading /store/f7tuser/test-split-file/large_file.part.00 into /home/localuser/partdir
Part large_file.part.00 ready to be downloaded
Download to local storage started
Download to local storage finished
Removing part file large_file.part.00
Downloading /store/f7tuser/test-split-file/large_file.part.01 into /home/localuser/partdir
Part large_file.part.01 ready to be downloaded
Download to local storage started
Download to local storage finished
Removing part file large_file.part.01
Joining part large_file to /home/localuser/partdir/large_file
Finished
Joining part large_file.part.00 to /home/localuser/partdir/large_file
Finished
Joining part large_file.part.01 to /home/localuser/partdir/large_file
Finished
File /home/localuser/partdir/large_file joined
```
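Since the transfer happens part by part, it can be worth verifying the joined file against the original, e.g. by comparing SHA-256 checksums. A hedged sketch (an addition to the example, not part of it; the remote reference checksum would have to be obtained separately, for instance via FirecREST's checksum utility endpoint):

```python
# Optional integrity check after joining: stream the local file through
# SHA-256 so that multi-GB files don't have to fit in memory.
import hashlib


def sha256_of(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of the file at `path`."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            digest.update(chunk)
    return digest.hexdigest()
```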
245 changes: 245 additions & 0 deletions examples/download-large-files/f7t_split_transfer.py
#
# Copyright (c) 2019-2025, ETH Zurich. All rights reserved.
#
# Please, refer to the LICENSE file in the root directory.
# SPDX-License-Identifier: BSD-3-Clause
#
import firecrest as fc
import time
import re
import sys
import os
import glob


system = "<replace_with_remote_system>"
dirname = "<replace_with_remote_dir>"
filename = "<replace_with_remote_large_file>"
client_id = "<replace_with_client_id>"
client_secret = "<replace_with_client_secret>"
token_uri = "<replace_with_token_uri>"
firecrest_url = "<replace_with_firecrest_uri>"
partition = "<replace_with_scheduler_queue>"
part_size_mb = 4999
script_path = "./sbatch_split_file.sh"
localdir = "<replace_your_local_dir>"

f7t_auth = fc.ClientCredentialsAuth(client_id, client_secret, token_uri)

client = fc.Firecrest(
    firecrest_url=firecrest_url,
    authorization=f7t_auth
)


def remove_remote_part(dirname: str, part: str) -> bool:
'''
Removes a part file created on the remote server by the split job
- Parameters
- `dirname (str)`: absolute path of the directory where the parts are
stored
- `part (str)`: name of the part file created in the directory
- Returns (`bool`)
- `True` if the part was removed correctly
- `False` if the part wasn't removed correctly
'''

try:
print(f"Removing part file {part}")
client.submit_delete_job(system, f"{dirname}/{part}")
return True
except fc.FirecrestException as fe:
print(f"Removal failed {fe}")
return False


def join_parts(sourcedir: str, targetpath: str) -> bool:
'''
Joins the parts after they have been downloaded to the local directory
- Parameters
- `sourcedir (str)`: directory where the parts are downloaded on local
system
- `targetpath (str)`: absolute path (including file name) where the
file is restored on local system
- Returns (`bool`)
- `True` if file was reconstructed correctly
- `False` if file wasn't reconstructed correctly
'''

    try:
        # Parts were downloaded into sourcedir, so glob there (globbing the
        # bare basename only works if the CWD happens to be sourcedir)
        pattern = os.path.join(sourcedir,
                               f"{os.path.basename(targetpath)}.part.*")
        parts = sorted(glob.glob(pattern))

        with open(targetpath, "wb") as targetfile:
            for partpath in parts:
                print(f"Joining part {os.path.basename(partpath)} "
                      f"to {targetpath}")

                with open(partpath, "rb") as inputfile:
                    while True:
                        chunk = inputfile.read(1024 * 1024)  # 1 MiB buffer
                        if not chunk:
                            break
                        targetfile.write(chunk)
                print("Finished")

        print(f"File {targetpath} joined")
        return True

except IOError as ioe:
print(f"Error writing file {targetpath} ({ioe})")
return False
except Exception as e:
print(f"Error writing file {targetpath} ({e})")
return False


def download_part(remotepath: str, localdir: str) -> bool:
'''
Downloads one part of a large file from a remote path to a local directory
- Parameters
- `remotepath (str)`: absolute path of a part of a remote file to
download
- `localdir (str)`: local directory where to store the part file
(directory must exist)
- Returns (`bool`):
- `True`: the part was downloaded successfully
- `False`: the part wasn't downloaded successfully
'''
print(f"Downloading {remotepath} into {localdir}")
part_dwn = client.external_download(system, remotepath)

file_part = remotepath.split("/")[-1]

    while True:
        time.sleep(30)  # poll the download status every 30 seconds

        # FirecREST storage task statuses: "117" means staging to Object
        # Storage finished (the signed URL is ready), "118" means an error
        if part_dwn.status == "117":
            print(f"\tPart {file_part} ready to be downloaded")
            print("\tDownload to local storage started")
            part_dwn.finish_download(f"{localdir}/{file_part}")
            print("\tDownload to local storage finished")
            return True
        elif part_dwn.status == "118":
            print(f"\tDownload of {file_part} failed")
            return False


def create_sbatch_script(script_path: str, filename: str, dirname: str,
                         part_size_mb: int = 4999,
                         partition: str = "debug") -> bool:
'''
Creates a sbatch script to be used to divide a large file in 4.99G chunks
- Parameters:
- `script_path (str)`: absolute or relative path in your system where the
sbatch file is created
- `filename (str)`: name of the file (incl. extension) to be divided into
chunks (example: `bigfile.tar`)
- `dirname (str)`: path to the directory where the file to be divided is
stored in the remote system
- `part_size_mb (int|None)`: size in MB of the part
- `partition (str|None)`: partition where the job will be run on the
remote system
- Returns (`bool`):
- `True` if file has been created correctly
- `False` if file couldn't be created
'''

    try:

        with open(script_path, "w") as sbatch_file:
            sbatch_file.write("#!/bin/bash -l\n")
            sbatch_file.write("#SBATCH --job-name=f7t_split_file\n")
            sbatch_file.write("#SBATCH --output=f7t_split_file_%j.out\n")
            sbatch_file.write("#SBATCH --error=f7t_split_file_%j.err\n")
            sbatch_file.write("#SBATCH --ntasks=1\n")
            sbatch_file.write("#SBATCH --ntasks-per-node=1\n")
            sbatch_file.write(f"#SBATCH --partition={partition}\n")
            sbatch_file.write(f"echo 'Changing to directory {dirname}'\n")
            sbatch_file.write(f"cd {dirname}\n")
            sbatch_file.write(f"echo 'Splitting file {filename} "
                              f"into {filename}.part.[00..N]'\n")
            sbatch_file.write(f"split -b {part_size_mb}M -d {filename} "
                              f"{filename}.part.\n")
            sbatch_file.write(f"ls -lh {filename}.part.*\n")

        return True

except IOError as ioe:
print(f"Error writing file {script_path} ({ioe})")
return False
except Exception as e:
print(f"Error writing file {script_path} ({e})")
return False


try:
if not create_sbatch_script(script_path, filename, dirname, part_size_mb,
partition):
sys.exit(1)

    print(f"Split the file {dirname}/{filename} into chunks of {part_size_mb}MB")

fc_job = client.submit(system, script_local_path=script_path) # submit job

job_id = fc_job["jobid"]
job_status = fc_job["result"]

print(f"JobID[{job_id}] -> Status: {job_status}")

while True:
time.sleep(10)
fc_job = client.poll(system, [job_id])
job_status = fc_job[0]["state"]
print(f"JobID[{job_id}] -> Status: {job_status}")

if job_status in ["COMPLETED", "FAILED"]:
break

if job_status == "FAILED":
print("Job Failed. Exiting")
sys.exit(1)

else:
print("File divided correctly")

file_list = client.list_files(system, dirname) # list remote files

    filenames = [row["name"] for row in file_list
                 if re.search(rf"{re.escape(filename)}\.part\.", row["name"])]
    filesizes = [int(row["size"]) // 1048576 for row in file_list
                 if re.search(rf"{re.escape(filename)}\.part\.", row["name"])]

print("Files created:")
    for part_name, part_size in zip(filenames, filesizes):
        print(f"\t- {part_name} (size: {part_size}MB)")

print("Start downloading parts")

for part in filenames:
if download_part(f"{dirname}/{part}", localdir):
remove_remote_part(dirname, part)
else:
sys.exit(1)

except fc.FirecrestException as fe:
    print(f"Error during split or download: {fe}")
    sys.exit(1)
except Exception as e:
    print(f"Error during split or download: {e}")
    sys.exit(1)


if join_parts(localdir, f"{localdir}/{filename}"):
    print(f"Download of {dirname}/{filename} to {localdir} completed")
else:
    print(f"Error downloading {dirname}/{filename} to {localdir}")
