Skip to content

Commit 10a5e72

Browse files
authored
Merge pull request #310 from Backblaze/big-copy
Fix big copy
2 parents 5310ebd + 5d9e607 commit 10a5e72

File tree

10 files changed

+276
-93
lines changed

10 files changed

+276
-93
lines changed

CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,22 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
77
## [Unreleased]
88

99
### Changed
10-
* Introduce a common thread worker pool for all downloads
1110
* Don't run coverage in pypy in CI
11+
* Introduce a common thread worker pool for all downloads
12+
* Increase HTTP timeout to 20 minutes (for copy using 5 GB parts)
1213

1314
### Added
1415
* Add pypy-3.8 to test matrix
1516
* Add support for unverified checksum upload mode
1617
* Add dedicated exception for unverified email
1718
* Add a parameter to customize `sync_policy_manager`
19+
* Add parameters to set the min/max part size for large file upload/copy methods
20+
* Add CopySourceTooBig exception
21+
* Add an option to set a custom file version class in `FileVersionFactory`
1822

1923
### Fixed
2024
* Fix downloading files with unverified checksum
25+
* Fix copying objects larger than 1TB
2126

2227
## [1.14.1] - 2022-02-23
2328

b2sdk/b2http.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class B2Http(object):
154154
"""
155155

156156
# timeout for HTTP GET/POST requests
157-
TIMEOUT = 900 # 15 minutes as server-side copy can take time
157+
TIMEOUT = 1200 # 20 minutes as server-side copy can take time
158158

159159
def __init__(self, api_config: B2HttpApiConfig = DEFAULT_HTTP_API_CONFIG):
160160
"""
@@ -346,6 +346,11 @@ def _translate_errors(cls, fcn, post_params=None):
346346
if response.status_code not in [200, 206]:
347347
# Decode the error object returned by the service
348348
error = json.loads(response.content.decode('utf-8')) if response.content else {}
349+
extra_error_keys = error.keys() - ('code', 'status', 'message')
350+
if extra_error_keys:
351+
logger.debug(
352+
'received error has extra (unsupported) keys: %s', extra_error_keys
353+
)
349354
raise interpret_b2_error(
350355
int(error.get('status', response.status_code)),
351356
error.get('code'),

b2sdk/bucket.py

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from .encryption.setting import EncryptionSetting, EncryptionSettingFactory
1515
from .encryption.types import EncryptionMode
16-
from .exception import BucketIdNotFound, FileNotPresent, FileOrBucketNotFound, UnexpectedCloudBehaviour, UnrecognizedBucketType
16+
from .exception import BucketIdNotFound, CopySourceTooBig, FileNotPresent, FileOrBucketNotFound, UnexpectedCloudBehaviour, UnrecognizedBucketType
1717
from .file_lock import (
1818
BucketRetentionSetting,
1919
FileLockConfiguration,
@@ -550,6 +550,8 @@ def create_file(
550550
encryption: Optional[EncryptionSetting] = None,
551551
file_retention: Optional[FileRetentionSetting] = None,
552552
legal_hold: Optional[LegalHold] = None,
553+
min_part_size=None,
554+
max_part_size=None,
553555
):
554556
"""
555557
Creates a new file in this bucket using an iterable (list, tuple etc) of remote or local sources.
@@ -574,6 +576,8 @@ def create_file(
574576
:param b2sdk.v2.EncryptionSetting encryption: encryption settings (``None`` if unknown)
575577
:param b2sdk.v2.FileRetentionSetting file_retention: file retention setting
576578
:param bool legal_hold: legal hold setting
579+
:param int min_part_size: lower limit of part size for the transfer planner, in bytes
580+
:param int max_part_size: upper limit of part size for the transfer planner, in bytes
577581
"""
578582
return self._create_file(
579583
self.api.services.emerger.emerge,
@@ -587,6 +591,8 @@ def create_file(
587591
encryption=encryption,
588592
file_retention=file_retention,
589593
legal_hold=legal_hold,
594+
min_part_size=min_part_size,
595+
max_part_size=max_part_size,
590596
)
591597

592598
def create_file_stream(
@@ -601,6 +607,8 @@ def create_file_stream(
601607
encryption: Optional[EncryptionSetting] = None,
602608
file_retention: Optional[FileRetentionSetting] = None,
603609
legal_hold: Optional[LegalHold] = None,
610+
min_part_size=None,
611+
max_part_size=None,
604612
):
605613
"""
606614
Creates a new file in this bucket using a stream of multiple remote or local sources.
@@ -627,6 +635,8 @@ def create_file_stream(
627635
:param b2sdk.v2.EncryptionSetting encryption: encryption settings (``None`` if unknown)
628636
:param b2sdk.v2.FileRetentionSetting file_retention: file retention setting
629637
:param bool legal_hold: legal hold setting
638+
:param int min_part_size: lower limit of part size for the transfer planner, in bytes
639+
:param int max_part_size: upper limit of part size for the transfer planner, in bytes
630640
"""
631641
return self._create_file(
632642
self.api.services.emerger.emerge_stream,
@@ -640,6 +650,8 @@ def create_file_stream(
640650
encryption=encryption,
641651
file_retention=file_retention,
642652
legal_hold=legal_hold,
653+
min_part_size=min_part_size,
654+
max_part_size=max_part_size,
643655
)
644656

645657
def _create_file(
@@ -655,6 +667,8 @@ def _create_file(
655667
encryption: Optional[EncryptionSetting] = None,
656668
file_retention: Optional[FileRetentionSetting] = None,
657669
legal_hold: Optional[LegalHold] = None,
670+
min_part_size=None,
671+
max_part_size=None,
658672
):
659673
validate_b2_file_name(file_name)
660674
progress_listener = progress_listener or DoNothingProgressListener()
@@ -671,6 +685,8 @@ def _create_file(
671685
encryption=encryption,
672686
file_retention=file_retention,
673687
legal_hold=legal_hold,
688+
min_part_size=min_part_size,
689+
max_part_size=max_part_size,
674690
)
675691

676692
def concatenate(
@@ -685,6 +701,8 @@ def concatenate(
685701
encryption: Optional[EncryptionSetting] = None,
686702
file_retention: Optional[FileRetentionSetting] = None,
687703
legal_hold: Optional[LegalHold] = None,
704+
min_part_size=None,
705+
max_part_size=None,
688706
):
689707
"""
690708
Creates a new file in this bucket by concatenating multiple remote or local sources.
@@ -706,6 +724,8 @@ def concatenate(
706724
:param b2sdk.v2.EncryptionSetting encryption: encryption settings (``None`` if unknown)
707725
:param b2sdk.v2.FileRetentionSetting file_retention: file retention setting
708726
:param bool legal_hold: legal hold setting
727+
:param int min_part_size: lower limit of part size for the transfer planner, in bytes
728+
:param int max_part_size: upper limit of part size for the transfer planner, in bytes
709729
"""
710730
return self.create_file(
711731
WriteIntent.wrap_sources_iterator(outbound_sources),
@@ -718,6 +738,8 @@ def concatenate(
718738
encryption=encryption,
719739
file_retention=file_retention,
720740
legal_hold=legal_hold,
741+
min_part_size=min_part_size,
742+
max_part_size=max_part_size,
721743
)
722744

723745
def concatenate_stream(
@@ -806,6 +828,8 @@ def copy(
806828
source_content_type: Optional[str] = None,
807829
file_retention: Optional[FileRetentionSetting] = None,
808830
legal_hold: Optional[LegalHold] = None,
831+
min_part_size=None,
832+
max_part_size=None,
809833
):
810834
"""
811835
Creates a new file in this bucket by (server-side) copying from an existing file.
@@ -831,6 +855,8 @@ def copy(
831855
:param str,None source_content_type: source file's content type, useful when copying files with SSE-C
832856
:param b2sdk.v2.FileRetentionSetting file_retention: file retention setting for the new file.
833857
:param bool legal_hold: legal hold setting for the new file.
858+
:param int min_part_size: lower limit of part size for the transfer planner, in bytes
859+
:param int max_part_size: upper limit of part size for the transfer planner, in bytes
834860
"""
835861

836862
copy_source = CopySource(
@@ -844,30 +870,38 @@ def copy(
844870
if not length:
845871
# TODO: it feels like this should be checked on lower level - eg. RawApi
846872
validate_b2_file_name(new_file_name)
847-
progress_listener = progress_listener or DoNothingProgressListener()
848-
return self.api.services.copy_manager.copy_file(
849-
copy_source,
850-
new_file_name,
851-
content_type=content_type,
852-
file_info=file_info,
853-
destination_bucket_id=self.id_,
854-
progress_listener=progress_listener,
855-
destination_encryption=destination_encryption,
856-
source_encryption=source_encryption,
857-
file_retention=file_retention,
858-
legal_hold=legal_hold,
859-
).result()
860-
else:
861-
return self.create_file(
862-
[WriteIntent(copy_source)],
863-
new_file_name,
864-
content_type=content_type,
865-
file_info=file_info,
866-
progress_listener=progress_listener,
867-
encryption=destination_encryption,
868-
file_retention=file_retention,
869-
legal_hold=legal_hold,
870-
)
873+
try:
874+
progress_listener = progress_listener or DoNothingProgressListener()
875+
return self.api.services.copy_manager.copy_file(
876+
copy_source,
877+
new_file_name,
878+
content_type=content_type,
879+
file_info=file_info,
880+
destination_bucket_id=self.id_,
881+
progress_listener=progress_listener,
882+
destination_encryption=destination_encryption,
883+
source_encryption=source_encryption,
884+
file_retention=file_retention,
885+
legal_hold=legal_hold,
886+
).result()
887+
except CopySourceTooBig as e:
888+
copy_source.length = e.size
889+
progress_listener = DoNothingProgressListener()
890+
logger.warning(
891+
'a copy of large object of unknown size is upgraded to the large file interface. No progress report will be provided.'
892+
)
893+
return self.create_file(
894+
[WriteIntent(copy_source)],
895+
new_file_name,
896+
content_type=content_type,
897+
file_info=file_info,
898+
progress_listener=progress_listener,
899+
encryption=destination_encryption,
900+
file_retention=file_retention,
901+
legal_hold=legal_hold,
902+
min_part_size=min_part_size,
903+
max_part_size=max_part_size,
904+
)
871905

872906
def delete_file_version(self, file_id, file_name):
873907
"""

b2sdk/exception.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
r'^more than one upload using auth token (?P<token>[^)]+)$'
2020
)
2121

22+
COPY_SOURCE_TOO_BIG_ERROR_MESSAGE_RE = re.compile(r'^Copy source too big: (?P<size>[\d]+)$')
23+
2224

2325
class B2Error(Exception, metaclass=ABCMeta):
2426
def __init__(self, *args, **kwargs):
@@ -312,6 +314,12 @@ def __str__(self):
312314
return '%s (%s)' % (self.message, self.code)
313315

314316

317+
class CopySourceTooBig(BadRequest):
    """
    A ``BadRequest`` that also carries the size reported for the copy source.

    :param message: error message, forwarded unchanged to ``BadRequest``
    :param code: error code, forwarded unchanged to ``BadRequest``
    :param int size: value stored on the instance as ``size``; ``Bucket.copy``
                     assigns it to the copy source's ``length`` before retrying
                     (presumably a byte count - confirm against the server message)
    """

    # NOTE(review): all three arguments are required. The construction site in
    # interpret_b2_error passes only ``size`` in this revision - confirm it is
    # updated to pass ``message`` and ``code`` as well.
    def __init__(self, message, code, size: int):
        super(CopySourceTooBig, self).__init__(message, code)
        self.size = size
321+
322+
315323
class Unauthorized(B2Error):
316324
def __init__(self, message, code):
317325
super(Unauthorized, self).__init__()
@@ -543,11 +551,19 @@ def interpret_b2_error(
543551
return PartSha1Mismatch(post_params.get('fileId'))
544552
elif status == 400 and code == "bad_bucket_id":
545553
return BucketIdNotFound(post_params.get('bucketId'))
546-
elif status == 400 and code == "bad_request":
554+
elif status == 400 and code in ('bad_request', 'auth_token_limit', 'source_too_large'):
555+
# it's "bad_request" on 2022-03-29, but will become 'auth_token_limit' in 2022-04 # TODO: cleanup after 2022-05-01
547556
matcher = UPLOAD_TOKEN_USED_CONCURRENTLY_ERROR_MESSAGE_RE.match(message)
548557
if matcher is not None:
549558
token = matcher.group('token')
550559
return UploadTokenUsedConcurrently(token)
560+
561+
# it's "bad_request" on 2022-03-29, but will become 'source_too_large' in 2022-04 # TODO: cleanup after 2022-05-01
562+
matcher = COPY_SOURCE_TOO_BIG_ERROR_MESSAGE_RE.match(message)
563+
if matcher is not None:
564+
size = int(matcher.group('size'))
565+
return CopySourceTooBig(size)
566+
551567
return BadRequest(message, code)
552568
elif status == 400:
553569
return BadRequest(message, code)

b2sdk/file_version.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,8 @@ class FileVersionFactory(object):
352352
Construct :py:class:`b2sdk.v2.FileVersion` objects from api responses.
353353
"""
354354

355+
FILE_VERSION_CLASS = FileVersion
356+
355357
def __init__(self, api: 'B2Api'):
356358
self.api = api
357359

@@ -408,8 +410,7 @@ def from_api_response(self, file_version_dict, force_action=None):
408410
file_retention = FileRetentionSetting.from_file_version_dict(file_version_dict)
409411

410412
legal_hold = LegalHold.from_file_version_dict(file_version_dict)
411-
412-
return FileVersion(
413+
return self.FILE_VERSION_CLASS(
413414
self.api,
414415
id_,
415416
file_name,

0 commit comments

Comments
 (0)