From 03574979dfc059638bf225bcc388832b7d0ada7a Mon Sep 17 00:00:00 2001 From: Ales Teska Date: Fri, 1 Dec 2023 17:02:04 +0100 Subject: [PATCH] Fix. --- asab/library/providers/libsreg.py | 114 ++++++++++++++++-------------- 1 file changed, 62 insertions(+), 52 deletions(-) diff --git a/asab/library/providers/libsreg.py b/asab/library/providers/libsreg.py index a55738e2..a163658e 100644 --- a/asab/library/providers/libsreg.py +++ b/asab/library/providers/libsreg.py @@ -80,6 +80,7 @@ def __init__(self, library, path, layer): self.App.TaskService.schedule(self._periodic_pull(None)) self.App.PubSub.subscribe("Application.tick/60!", self._periodic_pull) + async def _periodic_pull(self, event_name): """ Changes in remote repository are being pulled every minute. @@ -103,58 +104,67 @@ async def _periodic_pull(self, event_name): url = random.choice(self.URLs) - async with aiohttp.ClientSession() as session: - async with session.get(url, headers=headers) as response: - - if response.status == 200: # The request indicates a new version that we don't have yet - - etag_incoming = response.headers.get('ETag') - - # Download new version - newtarfname = os.path.join(self.RootPath, "new.tar.xz") - with open(newtarfname, 'wb') as ftmp: - while True: - chunk = await response.content.read(16 * 1024) - if not chunk: - break - ftmp.write(chunk) - - # Extract the contents to the temporary directory - temp_extract_dir = os.path.join( - self.RootPath, - "new" - ) - - # Remove temp_extract_dir if it exists (from the last, failed run) - if os.path.exists(temp_extract_dir): - shutil.rmtree(temp_extract_dir) - - # Extract the archive into the temp_extract_dir - with tarfile.open(newtarfname, mode='r:xz') as tar: - tar.extractall(temp_extract_dir) - - # Synchronize the temp_extract_dir into the library - synchronize_dirs(self.RepoPath, temp_extract_dir) - await self._set_ready() - - if etag_incoming is not None: - with open(etag_fname, 'w') as f: - f.write(etag_incoming) - - # Remove temp_extract_dir - if os.path.exists(temp_extract_dir): - shutil.rmtree(temp_extract_dir) - - # Remove newtarfname - if os.path.exists(newtarfname): - os.remove(newtarfname) - - elif response.status == 304: - # The repository has not changed ... - await self._set_ready() - - else: - L.exception("Failed to download the library.", struct_data={"url": url, 'status': response.status}) + try: + async with aiohttp.ClientSession() as session: + async with session.get(url, headers=headers) as response: + + if response.status == 200: # The request indicates a new version that we don't have yet + + etag_incoming = response.headers.get('ETag') + + # Download new version + newtarfname = os.path.join(self.RootPath, "new.tar.xz") + with open(newtarfname, 'wb') as ftmp: + while True: + chunk = await response.content.read(16 * 1024) + if not chunk: + break + ftmp.write(chunk) + + # Extract the contents to the temporary directory + temp_extract_dir = os.path.join( + self.RootPath, + "new" + ) + + # Remove temp_extract_dir if it exists (from the last, failed run) + if os.path.exists(temp_extract_dir): + shutil.rmtree(temp_extract_dir) + + # Extract the archive into the temp_extract_dir + with tarfile.open(newtarfname, mode='r:xz') as tar: + tar.extractall(temp_extract_dir) + + # Synchronize the temp_extract_dir into the library + synchronize_dirs(self.RepoPath, temp_extract_dir) + if not self.IsReady: + await self._set_ready() + + if etag_incoming is not None: + with open(etag_fname, 'w') as f: + f.write(etag_incoming) + + # Remove temp_extract_dir + if os.path.exists(temp_extract_dir): + shutil.rmtree(temp_extract_dir) + + # Remove newtarfname + if os.path.exists(newtarfname): + os.remove(newtarfname) + + elif response.status == 304: + # The repository has not changed ... + if not self.IsReady: + await self._set_ready() + + else: + L.error("Failed to download the library.", struct_data={"url": url, 'status': response.status}) + + except aiohttp.ClientError as e: + L.error("Failed to download the library.", struct_data={"url": url, 'exception': e}) + + except asyncio.TimeoutError as e: + L.error("Failed to download the library.", struct_data={"url": url, 'exception': e}) async def subscribe(self, path):