Skip to content

Commit

Permalink
Revert to old remote executor api and unblock archive upload (#31)
Browse files Browse the repository at this point in the history
* Unblock uploading deployment archive

* Raise more informative exceptions

* Copy RemoteExecutor.run_async_subprocess to AWSLambdaExecutor

Revert this change later once we pin against a newer version of
Covalent.

* Update changelog

* Add FT for failing workflow

* Specify executor instance in failing_workflow FT
  • Loading branch information
cjao authored Sep 22, 2022
1 parent c90c387 commit aae9e74
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 13 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [UNRELEASED]

### Fixed

- Reverted temporarily to old RemoteExecutor API
- Moved uploading deployment archive off the main thread

## [0.8.2] - 2022-09-20

### Fixed
Expand Down
52 changes: 41 additions & 11 deletions covalent_awslambda_plugin/awslambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import tempfile
import time
from contextlib import contextmanager
from typing import Callable, Dict, List
from typing import Callable, Dict, List, Tuple
from zipfile import ZipFile

import boto3
Expand Down Expand Up @@ -102,7 +102,7 @@ async def install(self, pkg_name: str):
pkg_name,
]
)
proc, stdout, stderr = await AWSExecutor.run_async_subprocess(cmd)
proc, stdout, stderr = await AWSLambdaExecutor.run_async_subprocess(cmd)
if proc.returncode != 0:
app_log.error(stderr)
raise RuntimeError(f"Unable to install package {pkg_name}")
Expand Down Expand Up @@ -395,6 +395,17 @@ def query_result_sync(self, workdir: str, result_filename: str):

return result_object

def _upload_deployment_archive_sync(self, deployment_archive, deployment_archive_name):
    """Upload the Lambda deployment archive to the configured S3 bucket.

    Blocking call; intended to be dispatched off the event loop via
    ``run_in_executor``.

    Args:
        deployment_archive: Local path of the zip archive to upload.
        deployment_archive_name: Object key to store the archive under.

    Raises:
        botocore.exceptions.ClientError: If the S3 upload fails.
    """
    with self.get_session() as session:
        s3 = session.client("s3")
        try:
            s3.upload_file(deployment_archive, self.s3_bucket_name, deployment_archive_name)
        except botocore.exceptions.ClientError as ce:
            # Log the full traceback before propagating to the caller
            app_log.exception(ce)
            raise

async def query_result(self, workdir: str, result_filename: str):
loop = asyncio.get_running_loop()
fut = loop.run_in_executor(None, self.query_result_sync, workdir, result_filename)
Expand Down Expand Up @@ -449,15 +460,13 @@ async def setup(self, task_metadata: Dict):
)

# Upload archive to s3 bucket
with self.get_session() as session:
client = session.client("s3")
try:
client.upload_file(
deployment_archive, self.s3_bucket_name, deployment_archive_name
)
except botocore.exceptions.ClientError as ce:
app_log.exception(ce)
raise
loop = asyncio.get_running_loop()
await loop.run_in_executor(
None,
self._upload_deployment_archive_sync,
deployment_archive,
deployment_archive_name,
)

app_log.debug(f"Lambda deployment archive: {deployment_archive_name} uploaded to S3 ... ")

Expand Down Expand Up @@ -592,3 +601,24 @@ async def teardown(self, task_metadata: Dict):
app_log.debug(f"Working directory {workdir} deleted")

app_log.debug(f"Finished teardown for task - {dispatch_id} - {node_id}")

# copied from RemoteExecutor
@staticmethod
async def run_async_subprocess(cmd) -> Tuple:
"""
Invokes an async subprocess to run a command.
"""

proc = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)

stdout, stderr = await proc.communicate()

if stdout:
app_log.debug(stdout)

if stderr:
app_log.debug(stderr)

return proc, stdout, stderr
3 changes: 3 additions & 0 deletions covalent_awslambda_plugin/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def lambda_handler(event, context):
s3.download_file("{s3_bucket_name}", "{func_filename}", "/tmp/{func_filename}")
except Exception as e:
print(e)
raise e
with open("/tmp/{func_filename}", "rb") as f:
function, args, kwargs = pickle.load(f)
Expand All @@ -43,6 +44,7 @@ def lambda_handler(event, context):
result = function(*args, **kwargs)
except Exception as e:
print(e)
raise e
with open("/tmp/{result_filename}", "wb") as f:
pickle.dump(result, f)
Expand All @@ -51,4 +53,5 @@ def lambda_handler(event, context):
s3.upload_file("/tmp/{result_filename}", "{s3_bucket_name}", "{result_filename}")
except Exception as e:
print(e)
raise e
"""
23 changes: 23 additions & 0 deletions tests/functional_tests/failing_workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import covalent as ct
from covalent._shared_files import logger
from utils.executor_instance import executor

app_log = logger.app_log
log_stack_info = logger.log_stack_info


@ct.electron(executor=executor)
def failing_task(a, b):
    # Always raises: used by the functional test to exercise the
    # FAILED-status path end to end on the configured executor.
    # The arguments are accepted but deliberately unused.
    raise NotImplementedError("Not implemented!!!")


@ct.lattice
def failing_workflow(a, b):
    # Single-electron lattice; the dispatched workflow is expected to
    # terminate with FAILED status because failing_task always raises.
    failing_task(a, b)


# Dispatch the workflow and block until a terminal status is reached; the
# functional test passes only if the dispatch ends as FAILED.
dispatch_id = ct.dispatch(failing_workflow)(1, 2)
app_log.debug(f"AWS Lambda functional test `failing_workflow.py` dispatch id: {dispatch_id}")

result = ct.get_result(dispatch_id, wait=True)
assert result.status == ct.status.FAILED
48 changes: 46 additions & 2 deletions tests/lambda_executor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async def test_deployment_package_builder_install_method(mocker):
proc_mock = MagicMock()
proc_mock.returncode = 0
subprocess_mock = mocker.patch(
"covalent_awslambda_plugin.awslambda.AWSExecutor.run_async_subprocess",
"covalent_awslambda_plugin.awslambda.AWSLambdaExecutor.run_async_subprocess",
return_value=(proc_mock, "", ""),
)
mocker.patch("covalent_awslambda_plugin.awslambda.ZipFile")
Expand All @@ -228,7 +228,7 @@ async def test_deployment_package_builder_install_exceptions(mocker):
proc_mock.returncode = 1

subprocess_mock = mocker.patch(
"covalent_awslambda_plugin.awslambda.AWSExecutor.run_async_subprocess",
"covalent_awslambda_plugin.awslambda.AWSLambdaExecutor.run_async_subprocess",
return_value=(proc_mock, "", "error"),
)

Expand Down Expand Up @@ -839,3 +839,47 @@ async def test_teardown_exception(lambda_executor, mocker):
assert exit_mock.call_count == 2
os_path_exists_mock.assert_called_once()
shutil_rmtree_mock.assert_called_once()


@pytest.mark.asyncio
async def test_run_async_subprocess(lambda_executor):
    """Test awslambda executor async subprocess call.

    Verifies that run_async_subprocess (a) executes a shell pipeline that
    creates and writes a file, returning exit code 0 with empty output, and
    (b) surfaces a non-zero exit code and the stderr text when the command
    fails. The scratch directory is removed afterwards so repeated runs do
    not pollute the working tree (the original test left `file_dir` behind).
    """
    import shutil  # local import: only needed for cleanup in this test

    test_dir, test_file, non_existent_file = "file_dir", "file.txt", "non_existent_file.txt"
    create_file = (
        f"rm -rf {test_dir} && mkdir {test_dir} && cd {test_dir} && touch {test_file} && echo 'hello remote "
        f"executor' >> {test_file} "
    )
    read_non_existent_file = f"cat {non_existent_file}"

    try:
        (
            create_file_proc,
            create_file_stdout,
            create_file_stderr,
        ) = await AWSLambdaExecutor.run_async_subprocess(create_file)

        # Test that file creation works as expected
        assert create_file_proc.returncode == 0
        assert create_file_stdout.decode() == ""
        assert create_file_stderr.decode() == ""

        # Test that file was created and written to correctly.
        # Use a distinct handle name so the `test_file` filename string is
        # not shadowed (the original rebound it, garbling the fail message).
        try:
            with open(f"{test_dir}/{test_file}", "r") as fh:
                lines = fh.readlines()
                assert lines[0].strip() == "hello remote executor"

        except FileNotFoundError as fe:
            pytest.fail(f'Failed to parse {test_file} with exception "{fe}"')

        # Test that reading from a non-existent file throws an error and
        # returns a non-zero exit code
        (
            read_file_proc,
            read_file_stdout,
            read_file_stderr,
        ) = await AWSLambdaExecutor.run_async_subprocess(read_non_existent_file)

        assert read_file_proc.returncode == 1
        assert (
            read_file_stderr.decode().strip()
            == f"cat: {non_existent_file}: No such file or directory"
        )
    finally:
        # Always remove the scratch directory, even if an assertion failed
        shutil.rmtree(test_dir, ignore_errors=True)

0 comments on commit aae9e74

Please sign in to comment.