2727 _get_blob_encryptor_and_padder ,
2828)
2929from .models import BlobBlock
30+ from ._constants import (
31+ _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
32+ )
3033
3134
3235def _upload_blob_chunks (blob_service , container_name , blob_name ,
@@ -342,6 +345,7 @@ def __init__(self, wrapped_stream, stream_begin_index, length, lockObj):
342345 # derivations of io.IOBase and thus do not implement seekable().
343346 # Python > 3.0: file-like objects created with open() are derived from io.IOBase.
344347 try :
348+ # only the main thread runs this, so there's no need grabbing the lock
345349 wrapped_stream .seek (0 , SEEK_CUR )
346350 except :
347351 raise ValueError ("Wrapped stream must support seek()." )
@@ -351,9 +355,14 @@ def __init__(self, wrapped_stream, stream_begin_index, length, lockObj):
351355 self ._position = 0
352356 self ._stream_begin_index = stream_begin_index
353357 self ._length = length
354- self ._count = 0
355358 self ._buffer = BytesIO ()
356- self ._read_buffer_size = 4 * 1024 * 1024
359+
360+ # we must avoid buffering more than necessary, and also not use up too much memory
361+ # so the max buffer size is capped at 4MB
362+ self ._max_buffer_size = length if length < _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE \
363+ else _LARGE_BLOB_UPLOAD_MAX_READ_BUFFER_SIZE
364+ self ._current_buffer_start = 0
365+ self ._current_buffer_size = 0
357366
358367 def __len__ (self ):
359368 return self ._length
@@ -382,35 +391,45 @@ def read(self, n):
382391 if n is 0 or self ._buffer .closed :
383392 return b''
384393
385- # attempt first read from the read buffer
394+ # attempt first read from the read buffer and update position
386395 read_buffer = self ._buffer .read (n )
387396 bytes_read = len (read_buffer )
388397 bytes_remaining = n - bytes_read
398+ self ._position += bytes_read
389399
390400 # repopulate the read buffer from the underlying stream to fulfill the request
391401 # ensure the seek and read operations are done atomically (only if a lock is provided)
392402 if bytes_remaining > 0 :
393403 with self ._buffer :
404+ # either read in the max buffer size specified on the class
405+ # or read in just enough data for the current block/sub stream
406+ current_max_buffer_size = min (self ._max_buffer_size , self ._length - self ._position )
407+
394408 # lock is only defined if max_connections > 1 (parallel uploads)
395409 if self ._lock :
396410 with self ._lock :
397- # reposition the underlying stream to match the start of the substream
411+ # reposition the underlying stream to match the start of the data to read
398412 absolute_position = self ._stream_begin_index + self ._position
399413 self ._wrapped_stream .seek (absolute_position , SEEK_SET )
400414 # If we can't seek to the right location, our read will be corrupted so fail fast.
401415 if self ._wrapped_stream .tell () != absolute_position :
402416 raise IOError ("Stream failed to seek to the desired location." )
403- buffer_from_stream = self ._wrapped_stream .read (self . _read_buffer_size )
417+ buffer_from_stream = self ._wrapped_stream .read (current_max_buffer_size )
404418 else :
405- buffer_from_stream = self ._wrapped_stream .read (self . _read_buffer_size )
419+ buffer_from_stream = self ._wrapped_stream .read (current_max_buffer_size )
406420
407421 if buffer_from_stream :
422+ # update the buffer with new data from the wrapped stream
423+ # we need to note down the start position and size of the buffer, in case seek is performed later
408424 self ._buffer = BytesIO (buffer_from_stream )
425+ self ._current_buffer_start = self ._position
426+ self ._current_buffer_size = len (buffer_from_stream )
427+
428+ # read the remaining bytes from the new buffer and update position
409429 second_read_buffer = self ._buffer .read (bytes_remaining )
410- bytes_read += len (second_read_buffer )
411430 read_buffer += second_read_buffer
431+ self ._position += len (second_read_buffer )
412432
413- self ._position += bytes_read
414433 return read_buffer
415434
416435 def readable (self ):
@@ -437,6 +456,15 @@ def seek(self, offset, whence=0):
437456 elif pos < 0 :
438457 pos = 0
439458
459+ # check if buffer is still valid
460+ # if not, drop buffer
461+ if pos < self ._current_buffer_start or pos >= self ._current_buffer_start + self ._current_buffer_size :
462+ self ._buffer .close ()
463+ self ._buffer = BytesIO ()
464+ else : # if yes seek to correct position
465+ delta = pos - self ._current_buffer_start
466+ self ._buffer .seek (delta , SEEK_SET )
467+
440468 self ._position = pos
441469 return pos
442470
0 commit comments