diff --git a/.gitignore b/.gitignore index 14ed0260..cc8c68f9 100644 --- a/.gitignore +++ b/.gitignore @@ -6,8 +6,10 @@ scripts/*.sh # exclude vmms/id_rsa* +vmms/id_ed25519* courselabs/* dockerTmp/* + # config config.py @@ -27,3 +29,10 @@ pip-selfcheck.json # Backup files *.bak + +# Redis +*.rdb + +# Tests +tests/* +!tests/*.py diff --git a/Dockerfile b/Dockerfile index c120e15f..1fd31c81 100644 --- a/Dockerfile +++ b/Dockerfile @@ -32,13 +32,25 @@ RUN apt-get update && apt-get install -y \ ca-certificates \ lxc \ iptables \ + iputils-ping \ && apt-get clean \ && rm -rf /var/lib/apt/lists/* WORKDIR /opt/TangoService/Tango/ -# Install Docker from Docker Inc. repositories. -RUN curl -sSL https://get.docker.com/ -o get_docker.sh && sh get_docker.sh +# Install Docker +RUN set -eux; \ + apt-get update; \ + DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends ca-certificates curl gnupg; \ + install -m 0755 -d /etc/apt/keyrings; \ + curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc; \ + chmod a+r /etc/apt/keyrings/docker.asc; \ + . /etc/os-release; \ + echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu ${VERSION_CODENAME} stable" > /etc/apt/sources.list.d/docker.list; \ + apt-get update; \ + DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends \ + docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin; \ + apt-get clean; rm -rf /var/lib/apt/lists/* # Install the magic wrapper. ADD ./wrapdocker /usr/local/bin/wrapdocker @@ -69,6 +81,7 @@ RUN mkdir -p /var/log/docker /var/log/supervisor # Move custom config file to proper location RUN cp /opt/TangoService/Tango/deployment/config/nginx.conf /etc/nginx/nginx.conf RUN cp /opt/TangoService/Tango/deployment/config/supervisord.conf /etc/supervisor/supervisord.conf +RUN if [ -f /opt/TangoService/Tango/boto.cfg ]; then cp /opt/TangoService/Tango/boto.cfg ~/.boto; fi # Reload new config scripts CMD ["/usr/bin/supervisord", "-c", "/etc/supervisor/supervisord.conf"] diff --git a/README.md b/README.md index d6bcb348..2da007a5 100644 --- a/README.md +++ b/README.md @@ -2,8 +2,7 @@ -Tango -====== +# Tango Tango is a standalone RESTful Web service that runs and manages jobs. A job is a set of files that must satisfy the following constraints: @@ -16,13 +15,13 @@ Upon receiving a job, Tango will copy all of the job's input files into a VM, ru A brief overview of the Tango respository: -* `tango.py` - Main tango server -* `jobQueue.py` - Manages the job queue -* `jobManager.py` - Assigns jobs to free VMs -* `worker.py` - Shepherds a job through its execution -* `preallocator.py` - Manages pools of VMs -* `vmms/` - VMMS library implementations -* `restful_tango/` - HTTP server layer on the main Tango +- `tango.py` - Main tango server +- `jobQueue.py` - Manages the job queue +- `jobManager.py` - Assigns jobs to free VMs +- `worker.py` - Shepherds a job through its execution +- `preallocator.py` - Manages pools of VMs +- `vmms/` - VMMS library implementations +- `restful_tango/` - HTTP server layer on the main Tango Tango was developed as a distributed grading system for [Autolab](https://github.com/autolab/Autolab) at Carnegie Mellon University and has been extensively used for autograding programming assignments in CMU courses. @@ -35,7 +34,63 @@ Please feel free to use Tango at your school/organization. If you run into any p 3. [Read the documentation for the VMMS API](https://docs.autolabproject.com/tango-vmms/). 4. [Test whether Tango is set up properly and can process jobs](https://docs.autolabproject.com/tango-cli/). +## Stress Testing Tango + +To stress test Tango by running a large number of submissions, use `stressTest.py`. Currently, this is not a feature on the master branch. To use this feature, go on the `copy-in`. + +### Setting up the testing directory + +Create your testing directory by copying the  `sample_test` directory into the `my_tests` directory. + +``` +cp -r sample_tests my_tests/ +``` + +A brief overview of the testing directory + +- `input` - Directory to put your input files +- `output` - Directory for the autograder output for each of the test submissions +- `.yaml` - Yaml file to configure the stress test +- `expected_output.txt` - Expected JSON output of the autograder +- `summary.txt` - Summary of the autograder outputs +- `log.txt` - Log of the submissions + +First, rename the `sample_test.yaml` to be `.yaml` + +``` +mv sample_test.yaml .yaml +``` + +Next, update the Yaml file. + +```yaml +num_submissions: 5 +submission_delay: 0.1 +autograder_image: +output_file: log.txt +tango_port: 3001 +cli_path: /clients/tango-cli.py +instance_type: +timeout: 180 +ec2: True +expected_output: expected_output.txt +stop_before: +``` + +After creating the Yaml file, copy the `autograde-Makefile`, `autograde.tar` and the file to submit in the `input` directory. + +### Running the stress test + +``` +virtualenv env +source env/bin/activate +pip install -r requirements.txt +cd /tests +python3 stressTest.py --test_dir my_tests/ +``` + ## Python 2 Support + Tango now runs on Python 3. However, there is a legacy branch [master-python2](https://github.com/autolab/Tango/tree/master-python2) which is a snapshot of the last Python 2 Tango commit for legacy reasons. You are strongly encouraged to upgrade to the current Python 3 version of Tango if you are still on the Python 2 version, as future enhancements and bug fixes will be focused on the current master. We will not be backporting new features from `master` to `master-python2`. diff --git a/clients/tango-cli.py b/clients/tango-cli.py index 83dd12ce..701e66cd 100755 --- a/clients/tango-cli.py +++ b/clients/tango-cli.py @@ -12,10 +12,45 @@ import argparse import sys import os +from dataclasses import dataclass, asdict +from typing import Optional sys.path.append("/usr/lib/python2.7/site-packages/") +def get_arg(name, default=None): + """Helper function to safely get arguments using dictionary .get() method""" + if not hasattr(args, name): + raise KeyError(f"Argument '{name}' not found in args") + return vars(args).get(name, default) + + +@dataclass +class RequestObj: + """Dataclass for job request objects""" + image: str + files: str + timeout: int + max_kb: int + output_file: str + jobName: str + accessKeyId: str + accessKey: str + disable_network: bool + instanceType: str + stopBefore: str + notifyURL: Optional[str] = None + callback_url: Optional[str] = None + + +@dataclass +class VmObj: + """Dataclass for VM allocation objects""" + vmms: str + cores: int + memory: int + + # # # Set up the command line parser @@ -71,11 +106,6 @@ parser.add_argument("--runJob", help="Run a job from a specific directory") parser.add_argument("--numJobs", type=int, default=1, help="Number of jobs to run") -parser.add_argument( - "--vmms", - default="localDocker", - help="Choose vmms between ec2SSH, tashiSSH, localDocker, and distDocker", -) parser.add_argument( "--image", default="", help='VM image name (default "autograding_image")' ) @@ -116,6 +146,10 @@ "--notifyURL", help="Complete URL for Tango to give callback to once job is complete.", ) +parser.add_argument( + "--callbackURL", + help="Complete URL for Tango to give callback to once job is complete.", +) parser.add_argument( "--disableNetwork", action="store_true", @@ -127,45 +161,45 @@ parser.add_argument("--accessKeyId", default="", help="AWS account access key ID") parser.add_argument("--accessKey", default="", help="AWS account access key content") parser.add_argument("--instanceType", default="", help="AWS EC2 instance type") - +parser.add_argument("--stopBefore", default="", help="Stops the worker before a function is executed") def checkKey(): - if args.key is None: + if get_arg('key') is None: print("Key must be specified with -k") return -1 return 0 def checkCourselab(): - if args.courselab is None: + if get_arg('courselab') is None: print("Courselab must be specified with -l") return -1 return 0 def checkFilename(): - if args.filename is None: + if get_arg('filename') is None: print("Filename must be specified with --filename") return -1 return 0 def checkInfiles(): - if args.infiles is None: + if get_arg('infiles') is None: print("Input files must be specified with --infiles") return -1 return 0 def checkDeadjobs(): - if args.deadJobs is None: + if get_arg('deadJobs') is None: print("Deadjobs must be specified with --deadJobs") return -1 return 0 def checkImageName(): - if args.imageName is None: + if get_arg('imageName') is None: print("Image name must be specified with --imageName") return -1 return 0 @@ -184,18 +218,18 @@ def tango_open(): response = requests.get( "%s://%s:%d/open/%s/%s/" - % (_tango_protocol, args.server, args.port, args.key, args.courselab) + % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')) ) print( "Sent request to %s:%d/open/%s/%s/" - % (args.server, args.port, args.key, args.courselab) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/open/%s/%s/" - % (args.server, args.port, args.key, args.courselab) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')) ) print(str(err)) sys.exit(0) @@ -210,28 +244,28 @@ def tango_upload(): if res != 0: raise Exception("Invalid usage: [upload] " + upload_help) - f = open(args.filename) - dirs = args.filename.split("/") + dirs = get_arg('filename').split("/") filename = dirs[len(dirs) - 1] header = {"Filename": filename} + f = open(get_arg('filename'), 'rb') response = requests.post( "%s://%s:%d/upload/%s/%s/" - % (_tango_protocol, args.server, args.port, args.key, args.courselab), + % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')), data=f.read(), headers=header, ) f.close() print( "Sent request to %s:%d/upload/%s/%s/ filename=%s" - % (args.server, args.port, args.key, args.courselab, args.filename) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), get_arg('filename')) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/upload/%s/%s/ filename=%s" - % (args.server, args.port, args.key, args.courselab, args.filename) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), get_arg('filename')) ) print(str(err)) sys.exit(0) @@ -247,36 +281,37 @@ def tango_addJob(): if res != 0: raise Exception("Invalid usage: [addJob] " + addJob_help) - requestObj["image"] = args.image - requestObj["files"] = args.infiles - requestObj["timeout"] = args.timeout - requestObj["max_kb"] = args.maxsize - requestObj["output_file"] = args.outputFile - requestObj["jobName"] = args.jobname - - if args.notifyURL: - requestObj["notifyURL"] = args.notifyURL - - requestObj["accessKeyId"] = args.accessKeyId - requestObj["accessKey"] = args.accessKey - requestObj["disable_network"] = args.disableNetwork - requestObj["instanceType"] = args.instanceType + requestObj = RequestObj( + image=get_arg('image'), + files=get_arg('infiles'), + timeout=get_arg('timeout'), + max_kb=get_arg('maxsize'), + output_file=get_arg('outputFile'), + jobName=get_arg('jobname'), + accessKeyId=get_arg('accessKeyId'), + accessKey=get_arg('accessKey'), + disable_network=get_arg('disableNetwork'), + instanceType=get_arg('instanceType'), + stopBefore=get_arg('stopBefore'), + notifyURL=get_arg('notifyURL'), + callback_url=get_arg('callbackURL'), + ) response = requests.post( "%s://%s:%d/addJob/%s/%s/" - % (_tango_protocol, args.server, args.port, args.key, args.courselab), - data=json.dumps(requestObj), + % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab')), + data=json.dumps(asdict(requestObj)), ) print( "Sent request to %s:%d/addJob/%s/%s/ \t jobObj=%s" - % (args.server, args.port, args.key, args.courselab, json.dumps(requestObj)) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), json.dumps(asdict(requestObj))) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/addJob/%s/%s/ \t jobObj=%s" - % (args.server, args.port, args.key, args.courselab, json.dumps(requestObj)) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('courselab'), json.dumps(asdict(requestObj)) if 'requestObj' in locals() else 'N/A') ) print(str(err)) sys.exit(0) @@ -291,19 +326,19 @@ def tango_getPartialOutput(): "%s://%s:%d/getPartialOutput/%s/%s/" % ( _tango_protocol, - args.server, - args.port, - args.key, - args.jobid, + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('jobid'), ) ) print( "Sent request to %s:%d/getPartialOutput/%s/%s/" % ( - args.server, - args.port, - args.key, - args.jobid, + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('jobid'), ) ) print(response.text) @@ -311,10 +346,10 @@ def tango_getPartialOutput(): print( "Failed to send request to %s:%d/getPartialOutput/%s/%s/" % ( - args.server, - args.port, - args.key, - args.jobid, + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('jobid'), ) ) print(str(err)) @@ -334,21 +369,21 @@ def tango_poll(): "%s://%s:%d/poll/%s/%s/%s/" % ( _tango_protocol, - args.server, - args.port, - args.key, - args.courselab, - urllib.parse.quote(args.outputFile), + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('courselab'), + urllib.parse.quote(get_arg('outputFile')), ) ) print( "Sent request to %s:%d/poll/%s/%s/%s/" % ( - args.server, - args.port, - args.key, - args.courselab, - urllib.parse.quote(args.outputFile), + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('courselab'), + urllib.parse.quote(get_arg('outputFile')), ) ) print(response.text) @@ -357,11 +392,11 @@ def tango_poll(): print( "Failed to send request to %s:%d/poll/%s/%s/%s/" % ( - args.server, - args.port, - args.key, - args.courselab, - urllib.parse.quote(args.outputFile), + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('courselab'), + urllib.parse.quote(get_arg('outputFile')), ) ) print(str(err)) @@ -378,15 +413,15 @@ def tango_info(): raise Exception("Invalid usage: [info] " + info_help) response = requests.get( - "%s://%s:%d/info/%s/" % (_tango_protocol, args.server, args.port, args.key) + "%s://%s:%d/info/%s/" % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key')) ) - print("Sent request to %s:%d/info/%s/" % (args.server, args.port, args.key)) + print("Sent request to %s:%d/info/%s/" % (get_arg('server'), get_arg('port'), get_arg('key'))) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/info/%s/" - % (args.server, args.port, args.key) + % (get_arg('server'), get_arg('port'), get_arg('key')) ) print(str(err)) sys.exit(0) @@ -403,18 +438,18 @@ def tango_jobs(): response = requests.get( "%s://%s:%d/jobs/%s/%d/" - % (_tango_protocol, args.server, args.port, args.key, args.deadJobs) + % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('deadJobs')) ) print( "Sent request to %s:%d/jobs/%s/%d/" - % (args.server, args.port, args.key, args.deadJobs) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('deadJobs')) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/jobs/%s/%d/" - % (args.server, args.port, args.key, args.deadJobs) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('deadJobs')) ) print(str(err)) sys.exit(0) @@ -431,18 +466,18 @@ def tango_pool(): response = requests.get( "%s://%s:%d/pool/%s/%s/" - % (_tango_protocol, args.server, args.port, args.key, args.image) + % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image')) ) print( "Sent request to %s:%d/pool/%s/%s/" - % (args.server, args.port, args.key, args.image) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image')) ) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/pool/%s/%s/" - % (args.server, args.port, args.key, args.image) + % (get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image')) ) print(str(err)) sys.exit(0) @@ -458,23 +493,23 @@ def tango_prealloc(): if res != 0: raise Exception("Invalid usage: [prealloc] " + prealloc_help) - vmObj["vmms"] = args.vmms - vmObj["cores"] = args.cores - vmObj["memory"] = args.memory + vmObj["vmms"] = get_arg('vmms') + vmObj["cores"] = get_arg('cores') + vmObj["memory"] = get_arg('memory') response = requests.post( "%s://%s:%d/prealloc/%s/%s/%s/" - % (_tango_protocol, args.server, args.port, args.key, args.image, args.num), + % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key'), get_arg('image'), get_arg('num')), data=json.dumps(vmObj), ) print( "Sent request to %s:%d/prealloc/%s/%s/%s/ \t vmObj=%s" % ( - args.server, - args.port, - args.key, - args.image, - args.num, + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('image'), + get_arg('num'), json.dumps(vmObj), ) ) @@ -484,11 +519,11 @@ def tango_prealloc(): print( "Failed to send request to %s:%d/prealloc/%s/%s/%s/ \t vmObj=%s" % ( - args.server, - args.port, - args.key, - args.image, - args.num, + get_arg('server'), + get_arg('port'), + get_arg('key'), + get_arg('image'), + get_arg('num'), json.dumps(vmObj), ) ) @@ -514,21 +549,21 @@ def tango_build(): if res != 0: raise Exception("Invalid usage: [build] " + build_help) - f = open(args.filename, "rb") - header = {"imageName": args.imageName} + f = open(get_arg('filename'), "rb") + header = {"imageName": get_arg('imageName')} response = requests.post( "%s://%s:%d/build/%s/" - % (_tango_protocol, args.server, args.port, args.key), + % (_tango_protocol, get_arg('server'), get_arg('port'), get_arg('key')), data=f.read(), headers=header, ) - print("Sent request to %s:%d/build/%s/" % (args.server, args.port, args.key)) + print("Sent request to %s:%d/build/%s/" % (get_arg('server'), get_arg('port'), get_arg('key'))) print(response.text) except Exception as err: print( "Failed to send request to %s:%d/build/%s/" - % (args.server, args.port, args.key) + % (get_arg('server'), get_arg('port'), get_arg('key')) ) print(str(err)) sys.exit(0) @@ -538,11 +573,11 @@ def tango_build(): def tango_runJob(): - if args.runJob is None: + if get_arg('runJob') is None: print("Invalid usage: [runJob]") sys.exit(0) - dir = args.runJob + dir = get_arg('runJob') infiles = [ file for file in os.listdir(dir) if os.path.isfile(os.path.join(dir, file)) ] @@ -551,7 +586,7 @@ def tango_runJob(): args.jobname += "-0" args.outputFile += "-0" - for i in range(1, args.numJobs + 1): + for i in range(1, get_arg('numJobs') + 1): print( "----------------------------------------- STARTING JOB " + str(i) @@ -574,27 +609,27 @@ def tango_runJob(): def router(): - if args.open: + if get_arg('open'): tango_open() - elif args.upload: + elif get_arg('upload'): tango_upload() - elif args.addJob: + elif get_arg('addJob'): tango_addJob() - elif args.poll: + elif get_arg('poll'): tango_poll() - elif args.info: + elif get_arg('info'): tango_info() - elif args.jobs: + elif get_arg('jobs'): tango_jobs() - elif args.pool: + elif get_arg('pool'): tango_pool() - elif args.prealloc: + elif get_arg('prealloc'): tango_prealloc() - elif args.runJob: + elif get_arg('runJob'): tango_runJob() - elif args.getPartialOutput: + elif get_arg('getPartialOutput'): tango_getPartialOutput() - elif args.build: + elif get_arg('build'): tango_build() @@ -603,32 +638,32 @@ def router(): # args = parser.parse_args() if ( - not args.open - and not args.upload - and not args.addJob - and not args.poll - and not args.info - and not args.jobs - and not args.pool - and not args.prealloc - and not args.runJob - and not args.getPartialOutput - and not args.build + not get_arg('open') + and not get_arg('upload') + and not get_arg('addJob') + and not get_arg('poll') + and not get_arg('info') + and not get_arg('jobs') + and not get_arg('pool') + and not get_arg('prealloc') + and not get_arg('runJob') + and not get_arg('getPartialOutput') + and not get_arg('build') ): parser.print_help() sys.exit(0) -if args.ssl: +if get_arg('ssl'): _tango_protocol = "https" - if args.port == 3000: + if get_arg('port') == 3000: args.port = 443 try: - response = requests.get("%s://%s:%d/" % (_tango_protocol, args.server, args.port)) + response = requests.get("%s://%s:%d/" % (_tango_protocol, get_arg('server'), get_arg('port'))) response.raise_for_status() except BaseException: - print("Tango not reachable on %s:%d!\n" % (args.server, args.port)) + print("Tango not reachable on %s:%d!\n" % (get_arg('server'), get_arg('port'))) sys.exit(0) router() diff --git a/jobManager.py b/jobManager.py index c31b9bc4..752c670a 100644 --- a/jobManager.py +++ b/jobManager.py @@ -11,9 +11,9 @@ # import copy -import time import logging import threading +import time import traceback from datetime import datetime @@ -21,8 +21,6 @@ from config import Config from tangoObjects import TangoQueue from worker import Worker -from preallocator import Preallocator -from jobQueue import JobQueue class JobManager(object): @@ -74,10 +72,11 @@ def __manage(self): try: # if the job is a ec2 vmms job # spin up an ec2 instance for that job - if job.vm.ec2_vmms: + if Config.VMMS_NAME == "ec2SSH": from vmms.ec2SSH import Ec2SSH vmms = Ec2SSH(job.accessKeyId, job.accessKey) + newVM = copy.deepcopy(job.vm) newVM.id = self._getNextID() try: @@ -86,7 +85,9 @@ def __manage(self): self.log.error("ERROR initialization VM: %s", e) self.log.error(traceback.format_exc()) if preVM is None: - raise Exception("EC2 SSH VM initialization failed: see log") + raise Exception( + "EC2 SSH VM initialization failed: see log" + ) else: # Try to find a vm on the free list and allocate it to # the worker if successful. @@ -113,17 +114,24 @@ def __manage(self): ) # Mark the job assigned self.jobQueue.assignJob(job.id, preVM) - Worker(job, vmms, self.jobQueue, self.preallocator, preVM).start() + Worker( + job, vmms, self.jobQueue, self.preallocator, preVM + ).start() except Exception as err: - self.log.error("job failed during creation %d %s" % (job.id, str(err))) - self.jobQueue.makeDead(job.id, str(err)) + if job is None: + self.log.info("job_manager: job is None") + else: + self.log.error( + "job failed during creation %d %s" % (job.id, str(err)) + ) + self.jobQueue.makeDead(job.id, str(err)) if __name__ == "__main__": if not Config.USE_REDIS: - print( + tango.log.error( "You need to have Redis running to be able to initiate stand-alone\ JobManager" ) @@ -133,7 +141,14 @@ def __manage(self): tango.resetTango(tango.preallocator.vmms) for key in tango.preallocator.machines.keys(): tango.preallocator.machines.set(key, [[], TangoQueue(key)]) + + # The above call sets the total pool empty. But the free pool which + # is a queue in redis, may not be empty. When the job manager restarts, + # resetting the free queue using the key doesn't change its content. + # Therefore we empty the queue, thus the free pool, to keep it consistent + # with the total pool. + tango.preallocator.machines.get(key)[1].make_empty() jobs = JobManager(tango.jobQueue) - print("Starting the stand-alone Tango JobManager") + tango.log.info("Starting the stand-alone Tango JobManager") jobs.run() diff --git a/preallocator.py b/preallocator.py index 9d7670e9..93e9c822 100644 --- a/preallocator.py +++ b/preallocator.py @@ -47,6 +47,7 @@ def update(self, vm, num): self.lock.acquire() if vm.name not in self.machines: self.machines.set(vm.name, [[], TangoQueue(vm.name)]) + self.machines.get(vm.name)[1].make_empty() self.log.debug("Creating empty pool of %s instances" % (vm.name)) self.lock.release() diff --git a/requirements.txt b/requirements.txt index 0591f3e7..548d9625 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ urllib3==1.26.19 docker==5.0.3 backoff==2.2.1 pytz +pyyaml diff --git a/restful_tango/tangoREST.py b/restful_tango/tangoREST.py index 39774111..c6a13686 100644 --- a/restful_tango/tangoREST.py +++ b/restful_tango/tangoREST.py @@ -164,9 +164,11 @@ def convertJobObj(self, dirName, jobObj): ) input.append(handinfile) - ec2_vmms = False - if "ec2Vmms" in jobObj: - ec2_vmms = True + ec2_vmms = Config.VMMS_NAME == "ec2SSH" + + stopBefore = "" + if "stopBefore" in jobObj: + stopBefore = jobObj["stopBefore"] instance_type = None if "instanceType" in jobObj and len(jobObj["instanceType"]) > 0: @@ -198,6 +200,7 @@ def convertJobObj(self, dirName, jobObj): accessKey=accessKey, accessKeyId=accessKeyId, disableNetwork=disableNetwork, + stopBefore=stopBefore ) self.log.debug("inputFiles: %s" % [file.localFile for file in input]) diff --git a/tangoObjects.py b/tangoObjects.py index 2dc03dbe..899c3181 100644 --- a/tangoObjects.py +++ b/tangoObjects.py @@ -100,6 +100,7 @@ def __init__( accessKeyId=None, accessKey=None, disableNetwork=None, + stopBefore="", ): self.assigned = False self.retries = 0 @@ -120,6 +121,7 @@ def __init__( self.accessKeyId = accessKeyId self.accessKey = accessKey self.disableNetwork = disableNetwork + self.stopBefore = "stopBefore" def __repr__(self): self.syncRemote() @@ -319,6 +321,9 @@ def remove(self, item): def _clean(self): self.__db.delete(self.key) + def make_empty(self): + self.__db.delete(self.key) + # This is an abstract class that decides on # if we should initiate a TangoRemoteDictionary or TangoNativeDictionary diff --git a/tests/sample_test/expected_output.txt b/tests/sample_test/expected_output.txt new file mode 100644 index 00000000..70c379b6 --- /dev/null +++ b/tests/sample_test/expected_output.txt @@ -0,0 +1 @@ +Hello world \ No newline at end of file diff --git a/tests/sample_test/input/autograde-Makefile b/tests/sample_test/input/autograde-Makefile new file mode 100644 index 00000000..10870823 --- /dev/null +++ b/tests/sample_test/input/autograde-Makefile @@ -0,0 +1,2 @@ +autograde: + bash hello.sh diff --git a/tests/sample_test/input/hello.sh b/tests/sample_test/input/hello.sh new file mode 100755 index 00000000..46c0d0e9 --- /dev/null +++ b/tests/sample_test/input/hello.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "Hello world" diff --git a/tests/sample_test/sample_test.yaml b/tests/sample_test/sample_test.yaml new file mode 100644 index 00000000..43dd5030 --- /dev/null +++ b/tests/sample_test/sample_test.yaml @@ -0,0 +1,11 @@ +num_submissions: 5 +submission_delay: 0.1 +autograder_image: ec2_213_llvm_14_s25 +output_file: log.txt +tango_port: 3001 +cli_path: /home/snarita/Autolab/Tango/clients/tango-cli.py +instance_type: t2.micro +timeout: 180 +ec2: True +expected_output: expected_output.txt +stop_before: diff --git a/tests/stressTest.py b/tests/stressTest.py new file mode 100644 index 00000000..6494a8c4 --- /dev/null +++ b/tests/stressTest.py @@ -0,0 +1,161 @@ +import argparse +import subprocess +import time +import os +import sys + +import tornado.web + +import yaml + +test_dir = "" +sub_num = 0 +finished_tests = dict() +start_time = time.time() +expected_output = "" + +def printProgressBar (iteration, total, prefix = '', suffix = '', decimals = 1, length = 100, fill = '█', printEnd = "\r"): + """ + Call in a loop to create terminal progress bar + @params: + iteration - Required : current iteration (Int) + total - Required : total iterations (Int) + prefix - Optional : prefix string (Str) + suffix - Optional : suffix string (Str) + decimals - Optional : positive number of decimals in percent complete (Int) + length - Optional : character length of bar (Int) + fill - Optional : bar fill character (Str) + printEnd - Optional : end character (e.g. "\r", "\r\n") (Str) + """ + percent = ("{0:." + str(decimals) + "f}").format(100 * (iteration / float(total))) + filledLength = int(length * iteration // total) + bar = fill * filledLength + '-' * (length - filledLength) + print(f'\r{prefix} |{bar}| {percent}% {suffix}', end = printEnd) + # Print New Line on Complete + if iteration == total: + print() + +def run_stress_test(num_submissions, submission_delay, autograder_image, output_file, tango_port, cli_path, + job_name, job_path, instance_type, timeout, ec2): + printProgressBar(0, num_submissions, prefix = 'Jobs Added:', suffix = 'Complete', length = 50) + with open(output_file, 'a') as f: + f.write(f"Stress testing with {num_submissions} submissions\n") + + for i in range(1, num_submissions + 1): + command = [ + 'python3', cli_path, + '-P', str(tango_port), + '-k', 'test', + '-l', job_name, + '--runJob', job_path, + '--image', autograder_image, + '--instanceType', instance_type, + '--timeout', str(timeout), + '--callbackURL', ("http://localhost:8888/autograde_done?id=%d" % (i)) + ] + if ec2: + command += ['--ec2'] + subprocess.run(command, stdout=f, stderr=f) + f.write(f"Submission {i} completed\n") + printProgressBar(i, num_submissions, prefix = 'Jobs Added:', suffix = 'Complete', length = 50) + if submission_delay > 0: + time.sleep(submission_delay) + print() + +class AutogradeDoneHandler(tornado.web.RequestHandler): + def post(self): + global finished_tests + global test_dir + global sub_num + global start_time + id = self.get_query_argument("id") + fileBody = self.request.files["file"][0]["body"].decode() + scoreJson = fileBody.split("\n")[-2] + with open(os.path.join(test_dir, "output", "output%s.txt" % id), 'w') as f: + f.write(fileBody) + finished_tests[str(id)] = scoreJson + printProgressBar(len(finished_tests), sub_num, prefix = 'Tests Done:', suffix = 'Complete', length = 50) + self.write("ok") + self.flush() + + def on_finish(self): + global finished_tests, sub_num, shutdown_event + if len(finished_tests) == sub_num: + print("\nAll tests completed. Generating summary...") + create_summary() + print("Test Summary in summary.txt") + print("\nShutting down server...") + tornado.ioloop.IOLoop.current().stop() + +def create_summary(): + success = [] + failed = [] + for i in range(1, sub_num + 1): + if expected_output == finished_tests[str(i)]: + success.append(i) + else: + failed.append(i) + with open(os.path.join(test_dir, "summary.txt"), 'w') as f: + f.write("Total Time: %d seconds\n" % (time.time() - start_time)) + f.write("Total Succeeded: %d / %d\n" % (len(success), sub_num)) + f.write("Total Failed: %d / %d\n" % (len(failed), sub_num)) + f.write("\n===========================================================\n") + f.write("The expected value is:\n") + f.write(expected_output) + f.write("\n\n===========================================================\n") + f.write("Failed Cases:\n") + for i in range(0, len(failed)): + f.write("Test Case #%d: %s\n" % (failed[i], finished_tests[str(failed[i])])) + +def make_app(): + return tornado.web.Application([ + (r"/autograde_done", AutogradeDoneHandler), + ]) + +def notifyServer(): + global shutdown_event + app = make_app() + app.listen(8888) + printProgressBar(0, sub_num, prefix = 'Tests Done:', suffix = 'Complete', length = 50) + tornado.ioloop.IOLoop.current().start() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Stress test script for Tango") + parser.add_argument('--test_dir', type=str, required=True, help="Directory to run the test in") + + args = parser.parse_args() + + dirname = os.path.basename(args.test_dir) + + test_dir = args.test_dir + + with open(os.path.join(args.test_dir, dirname + '.yaml'), 'r') as f: + data = yaml.load(f, Loader=yaml.SafeLoader) + + with open(os.path.join(args.test_dir, data["expected_output"]), 'r') as f: + expected_output = f.read() + + sub_num = data["num_submissions"] + finished_tests = dict() + start_time = time.time() + + subprocess.run("rm -rf %s/output" % args.test_dir, shell=True) + subprocess.run("mkdir %s/output" % args.test_dir, shell=True) + + print() + + run_stress_test( + data["num_submissions"], + data["submission_delay"], + data["autograder_image"], + os.path.join(args.test_dir, data["output_file"]), + data["tango_port"], + data["cli_path"], + dirname, + os.path.join(args.test_dir, 'input'), + data["instance_type"], + data["timeout"], + data["ec2"] + ) + + notifyServer() \ No newline at end of file diff --git a/vmms/ec2SSH.py b/vmms/ec2SSH.py index c6616afc..e3b7607c 100644 --- a/vmms/ec2SSH.py +++ b/vmms/ec2SSH.py @@ -8,21 +8,19 @@ # Ec2Exception - EC2 raises this if it encounters any problem # ec2CallError - raised by ec2Call() function # -# TODO: this currently probably does not work on Python 3 yet -import subprocess +import logging import os import re -import time -import logging +import subprocess import threading -import config +import time + import backoff import boto3 from botocore.exceptions import ClientError -import pytz - +import config from tangoObjects import TangoMachine # suppress most boto logging @@ -222,13 +220,6 @@ def __init__(self, accessKeyId=None, accessKey=None): taggedAMIs = [self.img2ami[key].id for key in self.img2ami] ignoredAMIs = list(set(imageAMIs) - set(taggedAMIs)) - self.log.info("imageAMIs") - self.log.info(imageAMIs) - self.log.info("taggedAMIs") - self.log.info(taggedAMIs) - self.log.info("ignoredAMIs") - self.log.info(ignoredAMIs) - if len(ignoredAMIs) > 0: self.log.info( "Ignored images %s for lack of or ill-formed name tag" @@ -381,23 +372,34 @@ def initializeVM(self, vm): start_time = time.time() while True: - filters = [{"Name": "instance-state-name", "Values": ["running"]}] + filters = [ + {"Name": "instance-state-name", "Values": ["running"]} + ] instances = self.boto3resource.instances.filter(Filters=filters) instanceRunning = False # reload the state of the new instance try_load_instance(newInstance) for inst in instances.filter(InstanceIds=[newInstance.id]): - self.log.debug("VM %s %s: is running" % (vm.name, newInstance.id)) + self.log.debug( + "VM %s %s: is running" % (vm.name, newInstance.id) + ) instanceRunning = True if instanceRunning: break - if time.time() - start_time > config.Config.INITIALIZEVM_TIMEOUT: + if ( + time.time() - start_time + > config.Config.INITIALIZEVM_TIMEOUT + ): raise ValueError( "VM %s %s: timeout (%d seconds) before reaching 'running' state" - % (vm.name, newInstance.id, config.Config.TIMER_POLL_INTERVAL) + % ( + vm.name, + newInstance.id, + config.Config.TIMER_POLL_INTERVAL, + ) ) self.log.debug( @@ -408,7 +410,8 @@ def initializeVM(self, vm): # Assign name to EC2 instance self.boto3resource.create_tags( - Resources=[newInstance.id], Tags=[{"Key": "Name", "Value": vm.name}] + Resources=[newInstance.id], + Tags=[{"Key": "Name", "Value": vm.name}], ) self.log.info( @@ -510,18 +513,28 @@ def waitVM(self, vm, max_secs): # Sleep a bit before trying again time.sleep(config.Config.TIMER_POLL_INTERVAL) - def copyIn(self, vm, inputFiles): - """copyIn - Copy input files to VM""" - self.log.info("copyIn %s - writing files" % self.instanceName(vm.id, vm.name)) + def copyIn(self, vm, inputFiles, job_id=None): + """copyIn - Copy input files to VM + Args: + - vm is a TangoMachine object + - inputFiles is a list of objects with attributes localFile and destFile. + localFile is the file on the host, destFile is the file on the VM. + - job_id is the job id of the job being run on the VM. + It is used for logging purposes only. + """ + self.log.info( + "copyIn %s - writing files" % self.instanceName(vm.id, vm.name) + ) domain_name = self.domainName(vm) + # Creates directory and add permissions result = subprocess.run( ["ssh"] + self.ssh_flags + [ "%s@%s" % (self.ec2User, domain_name), - "(mkdir autolab)", + "(mkdir -p autolab && chmod 775 autolab)", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -530,11 +543,22 @@ def copyIn(self, vm, inputFiles): # Print the output and error for line in result.stdout: - self.log.info("%s" % line) - self.log.info("Standard Error: %s" % result.stderr) - self.log.info("Return Code: %s" % result.returncode) + self.log.info("%s for job %s" % (line, job_id)) + self.log.info("Return Code: %s, job: %s" % (result.returncode, job_id)) + if result.stderr != 0: + self.log.info( + "Standard Error: %s, job: %s" % (result.stderr, job_id) + ) + + # Validate inputFiles structure + if not inputFiles or not all( + hasattr(file, "localFile") and hasattr(file, "destFile") + for file in inputFiles + ): + self.log.info( + "Error: Invalid inputFiles Structure, job: %s" % job_id + ) - # Copy the input files to the input directory for file in inputFiles: self.log.info("%s - %s" % (file.localFile, file.destFile)) ret = timeout_with_retries( @@ -542,11 +566,13 @@ def copyIn(self, vm, inputFiles): + self.ssh_flags + [ file.localFile, - "%s@%s:~/autolab/%s" % (self.ec2User, domain_name, file.destFile), + "%s@%s:~/autolab/%s" + % (self.ec2User, domain_name, file.destFile), ], config.Config.COPYIN_TIMEOUT, ) if ret != 0: + self.log.info("Copy-in Error: SCP failure, job: %s" % job_id) return ret return 0 @@ -574,7 +600,9 @@ def runJob(self, vm, runTimeout, maxOutputFileSize, disableNetwork): # no logging for now ret = timeout( - ["ssh"] + self.ssh_flags + ["%s@%s" % (self.ec2User, domain_name), runcmd], + ["ssh"] + + self.ssh_flags + + ["%s@%s" % (self.ec2User, domain_name), runcmd], runTimeout * 2, ) @@ -586,7 +614,9 @@ def copyOut(self, vm, destFile): outputFile on the Tango host. """ self.log.info( - "copyOut %s - writing to %s", self.instanceName(vm.id, vm.name), destFile + "copyOut %s - writing to %s", + self.instanceName(vm.id, vm.name), + destFile, ) domain_name = self.domainName(vm) @@ -631,7 +661,10 @@ def copyOut(self, vm, destFile): return timeout( ["scp"] + self.ssh_flags - + ["%s@%s:output" % (config.Config.EC2_USER_NAME, domain_name), destFile], + + [ + "%s@%s:output" % (config.Config.EC2_USER_NAME, domain_name), + destFile, + ], config.Config.COPYOUT_TIMEOUT, ) @@ -647,7 +680,9 @@ def destroyVM(self, vm): InstanceIds=[vm.instance_id] ) if not instances: - self.log.debug("no instances found with instance id %s", vm.instance_id) + self.log.debug( + "no instances found with instance id %s", vm.instance_id + ) # Keep the vm and mark with meaningful tags for debugging if ( hasattr(config.Config, "KEEP_VM_AFTER_FAILURE") @@ -674,7 +709,9 @@ def destroyVM(self, vm): if not self.useDefaultKeyPair: self.deleteKeyPair() except Exception as e: - self.log.error("destroyVM failed: %s for vm %s" % (e, vm.instance_id)) + self.log.error( + "destroyVM failed: %s for vm %s" % (e, vm.instance_id) + ) Ec2SSH.release_vm_semaphore() @@ -688,7 +725,10 @@ def getVMs(self): try: vms = list() filters = [ - {"Name": "instance-state-name", "Values": ["running", "pending"]} + { + "Name": "instance-state-name", + "Values": ["running", "pending"], + } ] # gets all running instances instances = self.boto3resource.instances.filter(Filters=filters) @@ -705,8 +745,13 @@ def getVMs(self): instance.tags, "Name" ) # inst name PREFIX-serial-IMAGE # Name tag is the standard form of prefix-serial-image - if not (instName and re.match("%s-" % config.Config.PREFIX, instName)): - self.log.debug("getVMs: Instance id %s skipped" % vm.instance_id) + if not ( + instName + and re.match("%s-" % config.Config.PREFIX, instName) + ): + self.log.debug( + "getVMs: Instance id %s skipped" % vm.instance_id + ) continue # instance without name tag or proper prefix vm.name = instName @@ -720,7 +765,8 @@ def getVMs(self): vms.append(vm) self.log.debug( - "getVMs: Instance id %s, name %s" % (vm.instance_id, vm.name) + "getVMs: Instance id %s, name %s" + % (vm.instance_id, vm.name) ) except Exception as e: self.log.debug("getVMs Failed: %s" % e) @@ -762,11 +808,13 @@ def getPartialOutput(self, vm): ) sshcmd = ( - ["ssh"] + self.ssh_flags + ["%s@%s" % (self.ec2User, domain_name), runcmd] + ["ssh"] + + self.ssh_flags + + ["%s@%s" % (self.ec2User, domain_name), runcmd] ) - output = subprocess.check_output(sshcmd, stderr=subprocess.STDOUT).decode( - "utf-8" - ) + output = subprocess.check_output( + sshcmd, stderr=subprocess.STDOUT + ).decode("utf-8") return output diff --git a/worker.py b/worker.py index 1d197337..b49489f0 100644 --- a/worker.py +++ b/worker.py @@ -49,7 +49,7 @@ def detachVM(self, return_vm=False, replace_vm=False): this function before returning. """ # job-owned instance, simply destroy after job is completed - if self.job.vm.ec2_vmms: + if Config.VMMS_NAME == "ec2SSH": self.vmms.safeDestroyVM(self.job.vm) elif return_vm: self.preallocator.freeVM(self.job.vm) @@ -254,6 +254,12 @@ def run(self): ) ) self.log.debug("Waiting for VM") + if self.job.stopBefore == "waitvm": + msg = "Execution stopped before %s" % self.job.stopBefore + returnVM = True + self.job.vm.keep_for_debugging = True + self.afterJobExecution(hdrfile, msg, returnVM) + return ret["waitvm"] = self.vmms.waitVM(vm, Config.WAITVM_TIMEOUT) self.log.debug("Waited for VM") @@ -287,8 +293,17 @@ def run(self): ) ) + if (self.job.stopBefore == "copyin"): + msg = "Execution stopped before %s" % self.job.stopBefore + returnVM = True + self.job.vm.keep_for_debugging = True + self.afterJobExecution(hdrfile, msg, returnVM) + return # Copy input files to VM - ret["copyin"] = self.vmms.copyIn(vm, self.job.input) + self.log.debug(f"Before copyIn: ret[copyin] = {ret['copyin']}, job_id: {str(self.job.id)}") + ret["copyin"] = self.vmms.copyIn(vm, self.job.input, self.job.id) + self.log.debug(f"After copyIn: ret[copyin] = {ret['copyin']}, job_id: {str(self.job.id)}") + if ret["copyin"] != 0: Config.copyin_errors += 1 msg = "Error: Copy in to VM failed (status=%d)" % (ret["copyin"]) @@ -306,6 +321,12 @@ def run(self): % (datetime.now().ctime(), self.job.name, self.job.id, ret["copyin"]) ) + if (self.job.stopBefore == "runjob"): + msg = "Execution stopped before %s" % self.job.stopBefore + returnVM = True + self.job.vm.keep_for_debugging = True + self.afterJobExecution(hdrfile, msg, returnVM) + return # Run the job on the virtual machine ret["runjob"] = self.vmms.runJob( vm, @@ -326,6 +347,12 @@ def run(self): % (datetime.now().ctime(), self.job.name, self.job.id, ret["runjob"]) ) + if (self.job.stopBefore == "copyout"): + msg = "Execution stopped before %s" % self.job.stopBefore + returnVM = True + self.job.vm.keep_for_debugging = True + self.afterJobExecution(hdrfile, msg, returnVM) + return # Copy the output back. ret["copyout"] = self.vmms.copyOut(vm, self.job.outputFile) if ret["copyout"] != 0: