Large file support #8

Open · wants to merge 12 commits into master

5 changes: 3 additions & 2 deletions README.md
@@ -33,10 +33,11 @@ echo YOUR_TOKEN_HERE > ~/.taiga/token
 ```

 ### Installing
-Use the package manager [pip](https://pip.pypa.io/en/stable/) to install taigapy.
+The following script will use the package manager [pip](https://pip.pypa.io/en/stable/) to install taigapy
+in "editable" mode.

 ```bash
-pip install taigapy
+./install_prereqs.sh
 ```

 ### Usage
6 changes: 6 additions & 0 deletions install_prereqs.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+set -ex
+
+pre-commit install
+
+pip install -e .
20 changes: 10 additions & 10 deletions requirements.txt
@@ -1,12 +1,12 @@
-aiobotocore==1.2.0
-boto3==1.16.39
+aiobotocore>=1.2.0
+boto3>=1.16.39
 colorful==0.5.4
-nest_asyncio==1.5.1
-pandas==1.1.5
+nest_asyncio>=1.5.1
+pandas>=1.1.5
 pre-commit==2.2.0
-progressbar2==3.50.1
-pyarrow==3.0.0
-pytest==5.4.1
-requests==2.23.0
-twine==3.1.1
-typing-extensions==3.7.4.2
+progressbar2>=3.50.1
+pyarrow>=3.0.0
+pytest>=6.2.5
+requests>=2.23.0
+twine>=3.1.1
+typing-extensions>=3.7.4.2
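
For reference, moving from `==` pins to `>=` floors means any release at or above the floor now satisfies the file, presumably to ease dependency resolution. A quick illustration using the `packaging` library (an assumption for demonstration; it is not a dependency of this repo):

```python
# Sketch: ">=" accepts any release at or above the floor, unlike "==".
from packaging.specifiers import SpecifierSet

print(SpecifierSet("==1.2.0").contains("1.3.0"))  # False
print(SpecifierSet(">=1.2.0").contains("1.3.0"))  # True
```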
2 changes: 1 addition & 1 deletion setup.py
@@ -8,7 +8,7 @@
     "nest_asyncio>=1.5.1,<2.0.0",
     "colorful",
     "progressbar2>=3.3.0<4.0.0",
-    "pyarrow>=3.0.0,<4.0.0",
+    "pyarrow>=7.0.0,<8.0.0",
 ]

 import ast
9 changes: 9 additions & 0 deletions static-checks
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+pylint --disable invalid-name,\
+line-too-long,\
+missing-function-docstring,\
+missing-class-docstring,\
+missing-module-docstring,consider-using-f-string taigapy
+
+mypy taigapy
4 changes: 2 additions & 2 deletions taigapy/__init__.py
@@ -1,12 +1,12 @@
-__version__ = "3.3.3"
+__version__ = "3.3.2"

 from .consts import DEFAULT_TAIGA_URL, DEFAULT_CACHE_DIR, CACHE_FILE
 from .client import TaigaClient

 try:
     default_tc = TaigaClient()
 except Exception as e:
-    print("default_tc could not be set for this reason: {}".format(e))
+    print(f"default_tc could not be set for this reason: {e}")
     print(
         "You can import TaigaClient and add your custom options if you would want to customize it to your settings"
     )
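
For context, `default_tc` is the shared client instance most callers import. A minimal usage sketch follows (the dataset ID is hypothetical, and the `get` call is assumed from the existing `TaigaClient` API):

```python
# Minimal sketch: fetch a datafile through the module-level client.
# The dataset ID below is hypothetical.
from taigapy import default_tc as tc

# Returns a pandas DataFrame, served from the local cache when available.
df = tc.get("some-dataset.1/some-file")
```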
124 changes: 58 additions & 66 deletions taigapy/client.py
@@ -1,7 +1,7 @@
 import asyncio
 import os
 import tempfile
-from typing import List, MutableSequence, Optional, Tuple, Union
+from typing import List, MutableSequence, Optional, Sequence, Tuple, Union

 import aiobotocore
 import boto3
@@ -31,17 +31,21 @@
     UploadS3DataFileDict,
     UploadVirtualDataFile,
     UploadVirtualDataFileDict,
+    UploadGCSDataFileDict,
+    UploadGCSDataFile,
+    UploadDataFile,
 )
 from taigapy.utils import (
     find_first_existing,
     format_datafile_id,
     format_datafile_id_from_datafile_metadata,
     get_latest_valid_version_from_metadata,
-    modify_upload_files,
+    transform_upload_args_to_upload_list,
     untangle_dataset_id_with_version,
 )
+from .consts import DEFAULT_TAIGA_URL, DEFAULT_CACHE_DIR, CACHE_FILE


 class TaigaClient:
     def __init__(
         self,
@@ -360,28 +364,29 @@ def _get_dataframe_or_path_offline(
             print(cf.red("The datafile you requested was not in the cache."))
             return None

-    def _validate_create_dataset_arguments(
+    def _preprocess_create_dataset_arguments(
         self,
         dataset_name: str,
-        upload_files: MutableSequence[UploadS3DataFileDict],
-        add_taiga_ids: MutableSequence[UploadVirtualDataFileDict],
+        upload_files: Sequence[UploadS3DataFileDict],
+        add_taiga_ids: Sequence[UploadVirtualDataFileDict],
+        add_gcs_files: Sequence[UploadGCSDataFileDict],
         folder_id: str,
     ):
         if len(dataset_name) == 0:
             raise ValueError("dataset_name must be a nonempty string.")
         if len(upload_files) == 0 and len(add_taiga_ids) == 0:
             raise ValueError("upload_files and add_taiga_ids cannot both be empty.")

-        upload_s3_datafiles, upload_virtual_datafiles = modify_upload_files(
-            upload_files, add_taiga_ids
+        (all_uploads,) = transform_upload_args_to_upload_list(
+            upload_files, add_taiga_ids, add_gcs_files
         )

         try:
             self.api.get_folder(folder_id)
         except Taiga404Exception:
             raise ValueError("No folder found with id {}.".format(folder_id))

-        return upload_s3_datafiles, upload_virtual_datafiles
+        return all_uploads

     def _validate_update_dataset_arguments(
         self,
@@ -392,9 +397,8 @@ def _validate_update_dataset_arguments(
         upload_files: MutableSequence[UploadS3DataFileDict],
         add_taiga_ids: MutableSequence[UploadVirtualDataFileDict],
         add_all_existing_files: bool,
-    ) -> Tuple[
-        List[UploadS3DataFile], List[UploadVirtualDataFile], DatasetVersionMetadataDict
-    ]:
+    ) -> Tuple[List[UploadDataFile], DatasetVersionMetadataDict]:
+        # FIXME: this needs to be corrected for new signature
         if changes_description is None or changes_description == "":
             raise ValueError("Description of changes cannot be empty.")

@@ -430,14 +434,14 @@ def _validate_update_dataset_arguments(
             self._get_dataset_metadata(dataset_permaname, dataset_version)
         )

-        upload_s3_datafiles, upload_virtual_datafiles = modify_upload_files(
+        all_uploads = transform_upload_args_to_upload_list(
             upload_files,
             add_taiga_ids,
             dataset_version_metadata,
             add_all_existing_files,
         )

-        return upload_s3_datafiles, upload_virtual_datafiles, dataset_version_metadata
+        return all_uploads, dataset_version_metadata

     async def _upload_to_s3_and_request_conversion(
         self,
@@ -494,8 +498,7 @@ async def _upload_files_async(

     def _upload_files_serial(
         self,
-        upload_s3_datafiles: List[UploadS3DataFile],
-        upload_virtual_datafiles: List[UploadVirtualDataFile],
+        uploads: List[UploadDataFile],
         upload_session_id: str,
         s3_credentials: S3Credentials,
     ):
@@ -507,29 +510,34 @@ def _upload_files_serial(
             aws_session_token=s3_credentials.session_token,
         )

-        for upload_file in upload_s3_datafiles:
-            bucket = s3_credentials.bucket
-            partial_prefix = s3_credentials.prefix
-            key = "{}{}/{}".format(
-                partial_prefix, upload_session_id, upload_file.file_name
-            )
-
-            s3_client.upload_file(upload_file.file_path, bucket, key)
-            upload_file.add_s3_upload_information(bucket, key)
-            print("Finished uploading {} to S3".format(upload_file.file_name))
-
-            print("Uploading {} to Taiga".format(upload_file.file_name))
-            self.api.upload_file_to_taiga(upload_session_id, upload_file)
-            print("Finished uploading {} to Taiga".format(upload_file.file_name))
-
-        for upload_virtual_file in upload_virtual_datafiles:
-            print("Linking virtual file {}".format(upload_virtual_file.taiga_id))
-            self.api.upload_file_to_taiga(upload_session_id, upload_virtual_file)
+        for upload in uploads:
+            if isinstance(upload, UploadS3DataFile):
+                upload_file = upload
+                bucket = s3_credentials.bucket
+                partial_prefix = s3_credentials.prefix
+                key = "{}{}/{}".format(
+                    partial_prefix, upload_session_id, upload_file.file_name
+                )
+
+                s3_client.upload_file(upload_file.file_path, bucket, key)
+                upload_file.add_s3_upload_information(bucket, key)
+                print("Finished uploading {} to S3".format(upload_file.file_name))
+
+                print("Uploading {} to Taiga".format(upload_file.file_name))
+                self.api.upload_file_to_taiga(upload_session_id, upload_file)
+                print("Finished uploading {} to Taiga".format(upload_file.file_name))
+            elif isinstance(upload, UploadVirtualDataFile) or isinstance(
+                upload, UploadGCSDataFile
+            ):
+                upload_virtual_file = upload
+                print("Linking virtual file {}".format(upload_virtual_file.taiga_id))
+                self.api.upload_file_to_taiga(upload_session_id, upload_virtual_file)
+            else:
+                raise Exception(f"Unknown upload type: {type(upload)}")

     def _upload_files(
         self,
-        upload_s3_datafiles: List[UploadS3DataFile],
-        upload_virtual_datafiles: List[UploadVirtualDataFile],
+        all_uploads: List[UploadDataFile],
         upload_async: bool,
     ) -> str:
         upload_session_id = self.api.create_upload_session()
@@ -541,29 +549,28 @@ def _upload_files(
             asyncio.set_event_loop(loop)
             loop.run_until_complete(
                 self._upload_files_async(
-                    upload_s3_datafiles,
-                    upload_virtual_datafiles,
+                    all_uploads,
                     upload_session_id,
                     s3_credentials,
                 )
             )
             loop.close()
         else:
             self._upload_files_serial(
-                upload_s3_datafiles,
-                upload_virtual_datafiles,
+                all_uploads,
                 upload_session_id,
                 s3_credentials,
             )

         return upload_session_id

     def _get_dataset_metadata(
-        self, dataset_id: str, version: Optional[DatasetVersion]
+        self, dataset_id: str, version: Optional[str]
     ) -> Optional[Union[DatasetMetadataDict, DatasetVersionMetadataDict]]:
         self._set_token_and_initialized_api()

         if "." in dataset_id:
+            assert version is None
             dataset_id, version, _ = untangle_dataset_id_with_version(dataset_id)

         return self.api.get_dataset_version_metadata(dataset_id, version)
@@ -639,8 +646,9 @@ def create_dataset(
         self,
         dataset_name: str,
         dataset_description: Optional[str] = None,
-        upload_files: Optional[MutableSequence[UploadS3DataFileDict]] = None,
-        add_taiga_ids: Optional[MutableSequence[UploadVirtualDataFileDict]] = None,
+        upload_files: Optional[List[UploadS3DataFileDict]] = None,
+        add_taiga_ids: Optional[List[UploadVirtualDataFileDict]] = None,
+        add_gcs_files: Optional[List[UploadGCSDataFileDict]] = None,
         folder_id: str = None,
         upload_async: bool = True,
     ) -> Optional[str]:
@@ -664,6 +672,9 @@ def create_dataset(
                 - "taiga_id" equal to the Taiga ID of the reference datafile in dataset_permaname.dataset_version/datafile_name format
                 - "name" (optional) for what the virtual datafile should be called in the new dataset (will use the reference datafile name if not provided).
                 (default: {None})
+            add_gcs_files {Optional[List[Dict[str, str]]]} -- List of GCS objects to add, where each dictionary has the keys
+                - "gcs_path" the GCS path (must start with "gs://...") of the object to associate with the provided name
+                - "name" for what the datafile should be called in the new dataset
             folder_id {str} -- The ID of the containing folder. If not specified, will use home folder of user. (default: {None})
             upload_async {bool} -- Whether to upload asynchronously (parallel) or in serial

@@ -676,28 +687,21 @@ def create_dataset(
             upload_files = []
         if add_taiga_ids is None:
             add_taiga_ids = []
+        if add_gcs_files is None:
+            add_gcs_files = []

         try:
             if folder_id is None:
                 folder_id = self.api.get_user()["home_folder_id"]
-            (
-                upload_s3_datafiles,
-                upload_virtual_datafiles,
-            ) = self._validate_create_dataset_arguments(
+            (all_uploads) = self._preprocess_create_dataset_arguments(
                 dataset_name, upload_files, add_taiga_ids, folder_id
             )
         except ValueError as e:
             print(cf.red(str(e)))
             return None

-        if upload_s3_datafiles is None:
-            # User declined to upload to public folder
-            return None
-
         try:
-            upload_session_id = self._upload_files(
-                upload_s3_datafiles, upload_virtual_datafiles, upload_async
-            )
+            upload_session_id = self._upload_files(all_uploads, upload_async)
         except ValueError as e:
             print(cf.red(str(e)))
             return None
@@ -756,8 +760,7 @@ def update_dataset(

         try:
             (
-                upload_s3_datafiles,
-                upload_virtual_datafiles,
+                all_uploads,
                 dataset_version_metadata,
             ) = self._validate_update_dataset_arguments(
                 dataset_id,
@@ -773,9 +776,7 @@ def update_dataset(
             return None

         try:
-            upload_session_id = self._upload_files(
-                upload_s3_datafiles, upload_virtual_datafiles, upload_async
-            )
+            upload_session_id = self._upload_files(all_uploads, upload_async)
         except ValueError as e:
             print(cf.red(str(e)))
             return None
@@ -902,12 +903,3 @@ def upload_to_gcs(self, queried_taiga_id: str, dest_gcs_path: str) -> bool:
         except (ValueError, TaigaHttpException) as e:
             print(cf.red(str(e)))
             return False
-
-
-try:
-    default_tc = TaigaClient()
-except Exception as e:
-    print("default_tc could not be set for this reason: {}".format(e))
-    print(
-        "You can import TaigaClient and add your custom options if you would want to customize it to your settings"
-    )
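
To illustrate the headline feature, here is a sketch of how the new `add_gcs_files` argument might be called, based on the docstring above; the client construction follows the public API, but the dataset name and GCS path are hypothetical:

```python
# Sketch only: create a dataset whose large file lives in GCS. Per this PR,
# Taiga records a pointer to the object rather than uploading bytes via S3.
from taigapy import TaigaClient

tc = TaigaClient()
dataset_id = tc.create_dataset(
    dataset_name="large-file-example",  # hypothetical name
    dataset_description="Demo of GCS-backed datafiles",
    add_gcs_files=[
        {
            "gcs_path": "gs://my-bucket/path/to/object.bin",  # must start with gs://
            "name": "object",  # what the datafile is called in the new dataset
        }
    ],
)
print(dataset_id)  # ID of the new dataset, or None on failure
```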
8 changes: 0 additions & 8 deletions taigapy/custom_exceptions.py
@@ -8,14 +8,10 @@ class TaigaHttpException(Exception):
 class Taiga404Exception(TaigaHttpException):
     """Exception to retrieve a NotFound returned by Taiga"""

-    pass
-

 class TaigaDeletedVersionException(TaigaHttpException):
     """Exception to retrieve a deleted dataset version"""

-    pass
-

 class TaigaServerError(TaigaHttpException):
     """500 errors"""
@@ -29,14 +25,10 @@ def __init__(self):
 class TaigaRawTypeException(Exception):
     """Exception when we are trying to get a file from a Table or Matrix format, whereas it is Raw data"""

-    pass
-

 class TaigaClientConnectionException(Exception):
     """Exception when we are unable to connect to Taiga"""

-    pass
-

 class TaigaTokenFileNotFound(Exception):
     def __init__(self, file_paths_checked: Iterable[str]):
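
For orientation, a sketch of how callers might handle one of these exceptions; it assumes `get_dataset_metadata` lets the `Taiga404Exception` propagate, and the dataset ID is hypothetical:

```python
# Sketch only: handle a missing dataset explicitly instead of crashing.
from taigapy import TaigaClient
from taigapy.custom_exceptions import Taiga404Exception

tc = TaigaClient()
try:
    metadata = tc.get_dataset_metadata("nonexistent-dataset")
except Taiga404Exception:
    print("Dataset does not exist on this Taiga server.")
```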