From f59d38d5fabbde15b79c0ea8072a262430d4bc90 Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 23 Nov 2022 09:08:42 -0800 Subject: [PATCH 1/3] Add Runner.add_periodic_callback --- adaptive/runner.py | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index ecc7fe14b..21d6804fe 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -565,6 +565,7 @@ def goal(_): self.task = self.ioloop.create_task(self._run()) self.saving_task = None + self.callbacks = [] if in_ipynb() and not self.ioloop.is_running(): warnings.warn( "The runner has been scheduled, but the asyncio " @@ -669,6 +670,31 @@ def elapsed_time(self): end_time = time.time() return end_time - self.start_time + def add_periodic_callback( + self, + method: Callable[[AsyncRunner]], + interval: int = 30, + ): + """Start a periodic callback that calls the given method on the runner. + + Parameters + ---------- + method : callable + The method to call periodically. + interval : int + The interval in seconds between the calls. + """ + + async def _callback(): + while self.status() == "running": + method(self) + await asyncio.sleep(interval) + method(self) # one last time + + task = self.ioloop.create_task(_callback()) + self.callbacks.append(task) + return task + def start_periodic_saving( self, save_kwargs: dict[str, Any] | None = None, @@ -697,6 +723,8 @@ def start_periodic_saving( ... save_kwargs=dict(fname='data/test.pickle'), ... interval=600) """ + if self.saving_task is not None: + raise RuntimeError("Already saving.") def default_save(learner): learner.save(**save_kwargs) @@ -706,13 +734,7 @@ def default_save(learner): if save_kwargs is None: raise ValueError("Must provide `save_kwargs` if method=None.") - async def _saver(): - while self.status() == "running": - method(self.learner) - await asyncio.sleep(interval) - method(self.learner) # one last time - - self.saving_task = self.ioloop.create_task(_saver()) + self.saving_task = self.add_periodic_callback(method, interval=interval) return self.saving_task From 2fc9f8583e33586fbe2ee6119c434d96357b358b Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 23 Nov 2022 09:42:07 -0800 Subject: [PATCH 2/3] Add cancel_point to the Runner --- adaptive/runner.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/adaptive/runner.py b/adaptive/runner.py index 21d6804fe..79ae4a399 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -223,6 +223,11 @@ def _process_futures(self, done_futs): try: y = fut.result() t = time.time() - fut.start_time # total execution time + except asyncio.CancelledError: + # Cleanup + self._to_retry.pop(pid, None) + self._tracebacks.pop(pid, None) + self._id_to_point.pop(pid, None) except Exception as e: self._tracebacks[pid] = traceback.format_exc() self._to_retry[pid] = self._to_retry.get(pid, 0) + 1 @@ -670,6 +675,22 @@ def elapsed_time(self): end_time = time.time() return end_time - self.start_time + def cancel_point( + self, point: Any | None = None, future: asyncio.Future | None = None + ): + """Cancel a point that is currently being evaluated. + + Parameters + ---------- + point + The point that should be cancelled. + """ + if point is None and future is None: + raise ValueError("Either point or future must be given") + if future is None: + future = next(fut for fut, p in self.pending_points if p == point) + future.cancel() + def add_periodic_callback( self, method: Callable[[AsyncRunner]], From bf1a2c61d3067c6b3ec41f0b8452060a989ffe1d Mon Sep 17 00:00:00 2001 From: Bas Nijholt Date: Wed, 23 Nov 2022 15:32:14 -0800 Subject: [PATCH 3/3] doc-string fixes --- adaptive/runner.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/adaptive/runner.py b/adaptive/runner.py index 79ae4a399..66825b82c 100644 --- a/adaptive/runner.py +++ b/adaptive/runner.py @@ -676,12 +676,16 @@ def elapsed_time(self): return end_time - self.start_time def cancel_point( - self, point: Any | None = None, future: asyncio.Future | None = None + self, future: asyncio.Future | None = None, point: Any | None = None ): - """Cancel a point that is currently being evaluated. + """Cancel a future or point that is currently being evaluated. + + Either the ``future`` or the ``point`` must be provided. Parameters ---------- + future : asyncio.Future + The future that is currently being evaluated. point The point that should be cancelled. """ @@ -691,9 +695,9 @@ def cancel_point( future = next(fut for fut, p in self.pending_points if p == point) future.cancel() - def add_periodic_callback( + def start_periodic_callback( self, - method: Callable[[AsyncRunner]], + method: Callable[[AsyncRunner], None], interval: int = 30, ): """Start a periodic callback that calls the given method on the runner. @@ -753,9 +757,11 @@ def default_save(learner): if method is None: method = default_save if save_kwargs is None: - raise ValueError("Must provide `save_kwargs` if method=None.") + raise ValueError("Must provide `save_kwargs` if `method=None`.") - self.saving_task = self.add_periodic_callback(method, interval=interval) + self.saving_task = self.start_periodic_callback( + lambda r: method(r.learner), interval=interval + ) return self.saving_task