Skip to content

Commit e2cd2fb

Browse files
committed
Merge pull request #75 from ischneider/cancel
implement cancellation of async processes
2 parents 346cd1e + 0b28267 commit e2cd2fb

File tree

7 files changed

+112
-16
lines changed

7 files changed

+112
-16
lines changed

planet/api/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@
1414

1515
from .exceptions import (APIException, BadQuery, InvalidAPIKey)
1616
from .exceptions import (NoPermission, MissingResource, OverQuota)
17-
from .exceptions import (ServerError,)
17+
from .exceptions import (ServerError, RequestCancelled)
1818
from .client import Client
1919
from .utils import write_to_file
2020

2121
__all__ = [
2222
Client, APIException, BadQuery, InvalidAPIKey,
23-
NoPermission, MissingResource, OverQuota, ServerError,
23+
NoPermission, MissingResource, OverQuota, ServerError, RequestCancelled,
2424
write_to_file
2525
]

planet/api/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ def __init__(self, api_key=None, base_url='https://api.planet.com/v0/',
3535
self.base_url = base_url
3636
self.dispatcher = RequestsDispatcher(workers)
3737

38+
def shutdown(self):
39+
self.dispatcher.session.executor.shutdown(wait=False)
40+
3841
def _url(self, path):
3942
if path.startswith('http'):
4043
url = path

planet/api/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,7 @@ class ServerError(APIException):
4545
class InvalidIdentity(APIException):
4646
'''raised when logging in with identity'''
4747
pass
48+
49+
50+
class RequestCancelled(Exception):
51+
'''When requests get cancelled'''

planet/api/models.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
from ._fatomic import atomic_open
16+
from .exceptions import RequestCancelled
1617
from .utils import get_filename
1718
from .utils import check_status
1819
from .utils import GeneratorAdapter
@@ -30,6 +31,7 @@ def __init__(self, request, dispatcher):
3031
self._dispatcher = dispatcher
3132
self._body = None
3233
self._future = None
34+
self._cancel = False
3335

3436
def _create_body(self, response):
3537
return self.request.body_type(self.request, response, self._dispatcher)
@@ -45,6 +47,8 @@ def get_body(self):
4547
return self._body
4648

4749
def _async_callback(self, session, response):
50+
if self._cancel:
51+
raise RequestCancelled()
4852
check_status(response)
4953
self._body = self._create_body(response)
5054
self._handler(self._body)
@@ -68,6 +72,13 @@ def await(self):
6872
self._future.result()
6973
return self._body
7074

75+
def cancel(self):
76+
'''Cancel any request.'''
77+
if self._body:
78+
self._body._cancel = True
79+
else:
80+
self._cancel = True
81+
7182

7283
class Request(object):
7384

@@ -86,6 +97,7 @@ def __init__(self, request, http_response, dispatcher):
8697
self._dispatcher = dispatcher
8798
self.size = int(self.response.headers.get('content-length', 0))
8899
self.name = get_filename(self.response)
100+
self._cancel = False
89101

90102
def __len__(self):
91103
return self.size
@@ -108,6 +120,8 @@ def _write(self, fp, callback):
108120
callback = lambda x: None
109121
callback(self)
110122
for chunk in self:
123+
if self._cancel:
124+
raise RequestCancelled()
111125
fp.write(chunk)
112126
size = len(chunk)
113127
total += size

planet/api/sync.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,19 @@
1313
# limitations under the License.
1414
import itertools
1515
import json
16+
import logging
1617
import os
1718
from os import path
1819
import threading
1920
from ._fatomic import atomic_open
20-
from .utils import write_to_file
21+
from . import exceptions
22+
from .utils import complete
2123
from .utils import strp_timestamp
2224
from .utils import strf_timestamp
25+
from .utils import write_to_file
26+
27+
28+
_logger = logging.getLogger(__name__)
2329

2430

2531
class _SyncTool(object):
@@ -35,6 +41,8 @@ def __init__(self, client, destination, aoi, scene_type, products,
3541
self.workspace = filters.get('workspace', None)
3642
self._init()
3743
self.sync_file = path.join(self.destination, 'sync.json')
44+
self.error_handler = _logger.exception
45+
self._cancel = False
3846

3947
def _init(self):
4048
dest = self.destination
@@ -87,7 +95,7 @@ def sync(self, callback):
8795
summary = _SyncSummary(self._scene_count * len(self.products))
8896

8997
all_scenes = self.get_scenes_to_sync()
90-
while True:
98+
while not self._cancel:
9199
# bite of chunks of work to not bog down on too many queued jobs
92100
scenes = list(itertools.islice(all_scenes, 100))
93101
if not scenes:
@@ -100,17 +108,26 @@ def sync(self, callback):
100108
for h in handlers:
101109
h.run(self.client, self.scene_type, self.products)
102110
# synchronously await them and then write metadata
103-
for h in handlers:
104-
h.finish()
111+
complete(handlers, self._future_handler, self.client)
105112

106-
if summary.latest:
113+
if summary.latest and not self._cancel:
107114
sync = self._read_sync_file()
108115
sync['latest'] = strf_timestamp(summary.latest)
109116
with atomic_open(self.sync_file, 'wb') as fp:
110117
fp.write(json.dumps(sync, indent=2).encode('utf-8'))
111118

112119
return summary
113120

121+
def _future_handler(self, futures):
122+
for f in futures:
123+
try:
124+
f.finish()
125+
except exceptions.RequestCancelled:
126+
self._cancel = True
127+
break
128+
except:
129+
self.error_handler('Unexpected error')
130+
114131

115132
class _SyncSummary(object):
116133
'''Keep track of summary state, thread safe.'''
@@ -137,19 +154,40 @@ def __init__(self, destination, summary, metadata, user_callback):
137154
self.summary = summary
138155
self.metadata = metadata
139156
self.user_callback = user_callback or (lambda *args: None)
157+
self._cancel = False
158+
self.futures = []
140159

141160
def run(self, client, scene_type, products):
142-
self.futures = []
161+
'''start asynchronous execution, must call finish to await'''
162+
if self._cancel:
163+
return
143164
for product in products:
144165
self.futures.extend(client.fetch_scene_geotiffs(
145166
[self.metadata['id']],
146167
scene_type, product,
147168
callback=self))
148169

170+
def cancel(self):
171+
'''cancel pending downloads'''
172+
self._cancel = True
173+
futures = getattr(self, 'futures', [])
174+
for f in futures:
175+
f.cancel()
176+
149177
def finish(self):
178+
'''await pending downloads and write out metadata
179+
@todo this is not an atomic operation - it's possible that one
180+
product gets downloaded and the other fails.
181+
'''
182+
if self._cancel:
183+
return
184+
150185
for f in self.futures:
151186
f.await()
152187

188+
if self._cancel:
189+
return
190+
153191
# write out metadata
154192
metadata = os.path.join(self.destination,
155193
'%s_metadata.json' % self.metadata['id'])

planet/api/utils.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import json
1818
import os
1919
import re
20+
import threading
2021
from ._fatomic import atomic_open
2122

2223
_ISO_FMT = '%Y-%m-%dT%H:%M:%S.%f+00:00'
@@ -187,3 +188,33 @@ def probably_geojson(input):
187188
])
188189
valid = typename in supported_types
189190
return input if valid else None
191+
192+
193+
def complete(futures, check, client):
194+
'''Wait for the future requests to complete without blocking the main
195+
thread. This is a means to intercept a KeyboardInterrupt and gracefully
196+
shutdown current requests without waiting for them to complete.
197+
198+
The cancel function on each future object should abort processing - any
199+
blocking functions/IO will not be interrupted and this function should
200+
return immediately.
201+
202+
:param futures: sequence of objects with a cancel function
203+
:param check: function that will be called with the provided futures from
204+
a background thread
205+
:param client: the Client to termintate on interrupt
206+
'''
207+
# start a background thread to not block main (otherwise hangs on 2.7)
208+
def run():
209+
check(futures)
210+
t = threading.Thread(target=run)
211+
t.start()
212+
# poll (or we miss the interrupt) and await completion
213+
try:
214+
while t.isAlive():
215+
t.join(.1)
216+
except KeyboardInterrupt:
217+
for f in futures:
218+
f.cancel()
219+
client.shutdown()
220+
raise

planet/scripts/__init__.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import planet
2626
from planet.api.sync import _SyncTool
2727
from planet import api
28+
from planet.api.utils import complete
2829

2930
from requests.packages.urllib3 import exceptions as urllib3exc
3031

@@ -110,6 +111,8 @@ def check_futures(futures):
110111
click_exception(invalid)
111112
except api.APIException as other:
112113
click.echo('WARNING %s' % other.message)
114+
except api.RequestCancelled:
115+
pass
113116

114117

115118
def summarize_throughput(bytes, start_time):
@@ -303,9 +306,10 @@ def fetch_scene_geotiff(scene_ids, scene_type, product, dest):
303306
return
304307

305308
start_time = time.time()
306-
futures = client().fetch_scene_geotiffs(scene_ids, scene_type, product,
307-
api.utils.write_to_file(dest))
308-
check_futures(futures)
309+
cl = client()
310+
futures = cl.fetch_scene_geotiffs(scene_ids, scene_type, product,
311+
api.utils.write_to_file(dest))
312+
complete(futures, check_futures, cl)
309313
summarize_throughput(total_bytes(futures), start_time)
310314

311315

@@ -324,9 +328,10 @@ def fetch_scene_thumbnails(scene_ids, scene_type, size, fmt, dest):
324328
if not scene_ids:
325329
return
326330

327-
futures = client().fetch_scene_thumbnails(scene_ids, scene_type, size, fmt,
328-
api.write_to_file(dest))
329-
check_futures(futures)
331+
cl = client()
332+
futures = cl.fetch_scene_thumbnails(scene_ids, scene_type, size, fmt,
333+
api.write_to_file(dest))
334+
complete(futures, check_futures, cl)
330335

331336

332337
@scene_type
@@ -426,9 +431,10 @@ def download_quads(mosaic_name, quad_ids, dest):
426431
Download quad geotiffs
427432
"""
428433
quad_ids = read(quad_ids, split=True)
429-
futures = call_and_wrap(client().fetch_mosaic_quad_geotiffs, mosaic_name,
434+
cl = client()
435+
futures = call_and_wrap(cl.fetch_mosaic_quad_geotiffs, mosaic_name,
430436
quad_ids, api.write_to_file(dest))
431-
check_futures(futures)
437+
complete(futures, check_futures, cl)
432438

433439

434440
@pretty

0 commit comments

Comments
 (0)