Skip to content

Commit 8ef6b05

Browse files
authored
Parallel copy (#188)
* bulk copy of files * version bump * bulk upload * added workers to fs.copy and fs.move * bulk test fixes * coverage * test workers * version bump * changelog * improved tests * version bump
1 parent c620603 commit 8ef6b05

File tree

11 files changed

+527
-209
lines changed

11 files changed

+527
-209
lines changed

CHANGELOG.md

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,18 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](http://keepachangelog.com/)
55
and this project adheres to [Semantic Versioning](http://semver.org/).
66

7+
## [2.0.25] - 2018-07-20
8+
9+
### Added
10+
11+
- workers parameter to fs.copy, fs.move, and fs.mirror for concurrent
12+
copies
13+
714
## [2.0.24] - 2018-06-28
815

9-
### Added timeout to FTP opener
16+
### Added
17+
18+
- timeout to FTP opener
1019

1120
## [2.0.23] - 2018-05-02
1221

@@ -210,5 +219,3 @@ No changes, pushed wrong branch to PyPi.
210219
## [2.0.0] - 2016-12-07
211220

212221
New version of the PyFilesystem API.
213-
214-

fs/_bulk.py

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
"""
2+
3+
Implements a thread pool for parallel copying of files.
4+
5+
"""
6+
7+
from __future__ import unicode_literals
8+
9+
import threading
10+
11+
from six.moves.queue import Queue
12+
13+
from .copy import copy_file_internal
14+
from .errors import BulkCopyFailed
15+
from .tools import copy_file_data
16+
17+
if False: # typing.TYPE_CHECKING
18+
from .base import FS
19+
from types import TracebackType
20+
from typing import IO, Iterator, List, Optional, Mapping, Text, Type, Union
21+
22+
23+
class _Worker(threading.Thread):
24+
"""Worker thread that pulls tasks from a queue."""
25+
26+
def __init__(self, copier):
27+
# type (Copier) -> None
28+
self.copier = copier
29+
super(_Worker, self).__init__()
30+
self.daemon = True
31+
32+
def run(self):
33+
# type () -> None
34+
queue = self.copier.queue
35+
while True:
36+
task = queue.get(block=True)
37+
try:
38+
if task is None:
39+
break # Sentinel to exit thread
40+
task()
41+
except Exception as error:
42+
self.copier.add_error(error)
43+
finally:
44+
queue.task_done()
45+
46+
47+
class _Task(object):
48+
"""Base class for a task."""
49+
50+
def __call__(self):
51+
# type: () -> None
52+
"""Task implementation."""
53+
54+
55+
class _CopyTask(_Task):
56+
"""A callable that copies from one file another."""
57+
58+
def __init__(self, src_file, dst_file):
59+
# type: (IO, IO) -> None
60+
self.src_file = src_file
61+
self.dst_file = dst_file
62+
63+
def __call__(self):
64+
# type: () -> None
65+
try:
66+
copy_file_data(self.src_file, self.dst_file, chunk_size=1024 * 1024)
67+
finally:
68+
try:
69+
self.src_file.close()
70+
finally:
71+
self.dst_file.close()
72+
73+
74+
class Copier(object):
75+
"""Copy files in worker threads."""
76+
77+
def __init__(self, num_workers=4):
78+
# type: (int) -> None
79+
if num_workers < 0:
80+
raise ValueError('num_workers must be >= 0')
81+
self.num_workers = num_workers
82+
self.queue = None # type: Optional[Queue[_Task]]
83+
self.workers = [] # type: List[_Worker]
84+
self.errors = [] # type: List[Exception]
85+
self.running = False
86+
87+
def start(self):
88+
"""Start the workers."""
89+
if self.num_workers:
90+
self.queue = Queue(maxsize=self.num_workers)
91+
self.workers = [_Worker(self) for _ in range(self.num_workers)]
92+
for worker in self.workers:
93+
worker.start()
94+
self.running = True
95+
96+
def stop(self):
97+
"""Stop the workers (will block until they are finished)."""
98+
if self.running and self.num_workers:
99+
for worker in self.workers:
100+
self.queue.put(None)
101+
for worker in self.workers:
102+
worker.join()
103+
# Free up references help by workers
104+
del self.workers[:]
105+
self.queue.join()
106+
self.running = False
107+
108+
def add_error(self, error):
109+
"""Add an exception raised by a task."""
110+
self.errors.append(error)
111+
112+
def __enter__(self):
113+
self.start()
114+
return self
115+
116+
def __exit__(
117+
self,
118+
exc_type, # type: Optional[Type[BaseException]]
119+
exc_value, # type: Optional[BaseException]
120+
traceback, # type: Optional[TracebackType]
121+
):
122+
self.stop()
123+
if traceback is None and self.errors:
124+
raise BulkCopyFailed(self.errors)
125+
126+
def copy(self, src_fs, src_path, dst_fs, dst_path):
127+
# type: (FS, Text, FS, Text) -> None
128+
"""Copy a file from one fs to another."""
129+
if self.queue is None:
130+
# This should be the most performant for a single-thread
131+
copy_file_internal(src_fs, src_path, dst_fs, dst_path)
132+
else:
133+
src_file = src_fs.openbin(src_path, "r")
134+
try:
135+
dst_file = dst_fs.openbin(dst_path, "w")
136+
except Exception:
137+
src_file.close()
138+
raise
139+
task = _CopyTask(src_file, dst_file)
140+
self.queue.put(task)

fs/_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
"""Version, used in module and setup.py.
22
"""
3-
__version__ = "2.0.24"
3+
__version__ = "2.0.25"

0 commit comments

Comments
 (0)