From a076c32cf06e5893fa3d9dad80ccaf402d0759dc Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Thu, 25 Feb 2021 12:04:29 -0300 Subject: [PATCH 01/24] add awsBatchRetryAttempts as a runtime attribute --- .../backend/impl/aws/AwsBatchJob.scala | 9 +++-- .../impl/aws/AwsBatchJobDefinition.scala | 39 ++++++++++++------- .../impl/aws/AwsBatchRuntimeAttributes.scala | 34 +++++++++++++++- 3 files changed, 63 insertions(+), 19 deletions(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 2378c48c8ef..283e32c3b2b 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -388,16 +388,19 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL // See: // // http://aws-java-sdk-javadoc.s3-website-us-west-2.amazonaws.com/latest/software/amazon/awssdk/services/batch/model/RegisterJobDefinitionRequest.Builder.html - val definitionRequest = RegisterJobDefinitionRequest.builder + var definitionRequest = RegisterJobDefinitionRequest.builder .containerProperties(jobDefinition.containerProperties) .jobDefinitionName(jobDefinitionName) // See https://stackoverflow.com/questions/24349517/scala-method-named-type .`type`(JobDefinitionType.CONTAINER) - .build + + if (jobDefinitionContext.runtimeAttributes.awsBatchRetryAttempts != 0){ + definitionRequest = definitionRequest.retryStrategy(jobDefinition.retryStrategy) + } Log.debug(s"Submitting definition request: $definitionRequest") - val response: RegisterJobDefinitionResponse = batchClient.registerJobDefinition(definitionRequest) + val response: RegisterJobDefinitionResponse = batchClient.registerJobDefinition(definitionRequest.build) Log.info(s"Definition created: $response") response.jobDefinitionArn() } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index 137cce9a4ef..87e646dad10 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -35,7 +35,7 @@ import scala.language.postfixOps import scala.collection.mutable.ListBuffer import cromwell.backend.BackendJobDescriptor import cromwell.backend.io.JobPaths -import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, Volume} +import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, Volume, RetryStrategy} import cromwell.backend.impl.aws.io.AwsBatchVolume import scala.collection.JavaConverters._ @@ -63,11 +63,13 @@ import wdl4s.parser.MemoryUnit sealed trait AwsBatchJobDefinition { def containerProperties: ContainerProperties def name: String + def retryStrategy: RetryStrategy override def toString: String = { new ToStringBuilder(this, ToStringStyle.JSON_STYLE) .append("name", name) .append("containerProperties", containerProperties) + .append("retryStrategy", retryStrategy) .build } } @@ -131,15 +133,7 @@ trait AwsBatchJobDefinitionBuilder { } def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair]): String = { - val str = s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}" - - val sha1 = MessageDigest.getInstance("SHA-1") - .digest( str.getBytes("UTF-8") ) - .map("%02x".format(_)).mkString - - val prefix = s"cromwell_$imageName".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128 - - sanitize(prefix + sha1) + s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}" } @@ -150,7 +144,7 @@ trait AwsBatchJobDefinitionBuilder { val packedCommand = packCommand("/bin/bash", "-c", cmdName) val volumes = buildVolumes( context.runtimeAttributes.disks ) val mountPoints = buildMountPoints( context.runtimeAttributes.disks) - val jobDefinitionName = buildName( + val containerPropsName = buildName( context.runtimeAttributes.dockerImage, packedCommand.mkString(","), volumes, @@ -166,7 +160,7 @@ trait AwsBatchJobDefinitionBuilder { .mountPoints( mountPoints.asJava) .environment(environment.asJava), - jobDefinitionName) + containerPropsName) } private def packCommand(shell: String, options: String, mainCommand: String): Seq[String] = { @@ -191,13 +185,28 @@ object StandardAwsBatchJobDefinitionBuilder extends AwsBatchJobDefinitionBuilder def build(context: AwsBatchJobDefinitionContext): AwsBatchJobDefinition = { //instantiate a builder with the name of the docker image val builderInst = builder(context.runtimeAttributes.dockerImage) - val (b, name) = buildResources(builderInst, context) + val (container, containerPropsName) = buildResources(builderInst, context) + val retry = RetryStrategy.builder().attempts(context.runtimeAttributes.awsBatchRetryAttempts).build - new StandardAwsBatchJobDefinitionBuilder(b.build, name) + val name = buildName(context.runtimeAttributes.dockerImage, containerPropsName,context.runtimeAttributes.awsBatchRetryAttempts) + + new StandardAwsBatchJobDefinitionBuilder(container.build, name, retry) } + + def buildName(imageName: String, containerPropsName: String, retryAttemps: Int): String = { + val str = s"$imageName:$containerPropsName:${retryAttemps.toString}" + + val sha1 = MessageDigest.getInstance("SHA-1") + .digest( str.getBytes("UTF-8") ) + .map("%02x".format(_)).mkString + + val prefix = s"cromwell_$imageName".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128 + + sanitize(prefix + sha1) + } } -case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, name: String) extends AwsBatchJobDefinition +case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, name: String, retryStrategy: RetryStrategy) extends AwsBatchJobDefinition object AwsBatchJobDefinitionContext diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index c6fc2a5f51f..45379071bd0 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -71,6 +71,7 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, continueOnReturnCode: ContinueOnReturnCode, noAddress: Boolean, scriptS3BucketName: String, + awsBatchRetryAttempts: Int, fileSystem:String= "s3") object AwsBatchRuntimeAttributes { @@ -79,6 +80,8 @@ object AwsBatchRuntimeAttributes { val scriptS3BucketKey = "scriptBucketName" + val awsBatchRetryAttemptsKey = "awsBatchRetryAttempts" + val ZonesKey = "zones" private val ZonesDefaultValue = WomString("us-east-1a") @@ -134,6 +137,11 @@ object AwsBatchRuntimeAttributes { QueueArnValidation.withDefault(QueueArnValidation.configDefaultWomValue(runtimeConfig) getOrElse (throw new RuntimeException("queueArn is required"))) + private def awsBatchRetryAttemptsValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] = { + AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey).withDefault(AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey) + .configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) + } + def runtimeAttributesBuilder(configuration: AwsBatchConfiguration): StandardValidatedRuntimeAttributesBuilder = { val runtimeConfig = configuration.runtimeConfig def validationsS3backend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( @@ -146,7 +154,8 @@ object AwsBatchRuntimeAttributes { noAddressValidation(runtimeConfig), dockerValidation, queueArnValidation(runtimeConfig), - scriptS3BucketNameValidation(runtimeConfig) + scriptS3BucketNameValidation(runtimeConfig), + awsBatchRetryAttemptsValidation(runtimeConfig), ) def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( cpuValidation(runtimeConfig), @@ -181,6 +190,7 @@ object AwsBatchRuntimeAttributes { case AWSBatchStorageSystems.s3 => RuntimeAttributesValidation.extract(scriptS3BucketNameValidation(runtimeAttrsConfig) , validatedRuntimeAttributes) case _ => "" } + val awsBatchRetryAttempts: Int = RuntimeAttributesValidation.extract(awsBatchRetryAttemptsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) new AwsBatchRuntimeAttributes( @@ -194,6 +204,7 @@ object AwsBatchRuntimeAttributes { continueOnReturnCode, noAddress, scriptS3BucketName, + awsBatchRetryAttempts, fileSystem ) } @@ -372,3 +383,24 @@ object DisksValidation extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be a comma separated String or Array[String]" } + +object AwsBatchRetryAttemptsValidation { + def apply(key: String): AwsBatchRetryAttemptsValidation = new AwsBatchRetryAttemptsValidation(key) +} + +class AwsBatchRetryAttemptsValidation(key: String) extends IntRuntimeAttributesValidation(key) { + override protected def validateValue: PartialFunction[WomValue, ErrorOr[Int]] = { + case womValue if WomIntegerType.coerceRawValue(womValue).isSuccess => + WomIntegerType.coerceRawValue(womValue).get match { + case WomInteger(value) => + if (value.toInt < 0) + s"Expecting $key runtime attribute value greater than or equal to 0".invalidNel + else if (value.toInt > 10) + s"Expecting $key runtime attribute value lower than or equal to 10".invalidNel + else + value.toInt.validNel + } + } + + override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be an Integer" +} From de905d8bf76f89943a2856756488a709c901e8fa Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Fri, 26 Feb 2021 10:55:12 -0300 Subject: [PATCH 02/24] exit reconfigure-script with same exit code as rc file --- .../src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 2378c48c8ef..13f020ab61c 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -212,6 +212,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |echo '*** DELOCALIZING OUTPUTS ***' |$outputCopyCommand |echo '*** COMPLETED DELOCALIZATION ***' + |echo '*** EXITING WITH RC CODE ***' + |exit $$(head -n 1 $workDir/${jobPaths.returnCodeFilename}) |} |""".stripMargin } From 0beb8f42b146ed7893c8526b6f4c0c1782dda019 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Mon, 1 Mar 2021 14:52:45 -0300 Subject: [PATCH 03/24] re-structure code --- .../impl/aws/AwsBatchJobDefinition.scala | 77 ++++++++++--------- .../impl/aws/AwsBatchRuntimeAttributes.scala | 2 +- .../aws/AwsBatchRuntimeAttributesSpec.scala | 18 ++++- 3 files changed, 57 insertions(+), 40 deletions(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index 87e646dad10..e3ac3109f64 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -35,7 +35,7 @@ import scala.language.postfixOps import scala.collection.mutable.ListBuffer import cromwell.backend.BackendJobDescriptor import cromwell.backend.io.JobPaths -import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, Volume, RetryStrategy} +import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, RetryStrategy, Volume} import cromwell.backend.impl.aws.io.AwsBatchVolume import scala.collection.JavaConverters._ @@ -62,8 +62,8 @@ import wdl4s.parser.MemoryUnit */ sealed trait AwsBatchJobDefinition { def containerProperties: ContainerProperties - def name: String def retryStrategy: RetryStrategy + def name: String override def toString: String = { new ToStringBuilder(this, ToStringStyle.JSON_STYLE) @@ -80,23 +80,13 @@ trait AwsBatchJobDefinitionBuilder { /** Gets a builder, seeded with appropriate portions of the container properties * - * @param dockerImage docker image with which to run - * @return ContainerProperties builder ready for modification + * @param context AwsBatchJobDefinitionContext with all the runtime attributes + * @return ContainerProperties builder ready for modification and name * */ - def builder(dockerImage: String): ContainerProperties.Builder = - ContainerProperties.builder().image(dockerImage) - - - def buildResources(builder: ContainerProperties.Builder, - context: AwsBatchJobDefinitionContext): (ContainerProperties.Builder, String) = { - // The initial buffer should only contain one item - the hostpath of the - // local disk mount point, which will be needed by the docker container - // that copies data around - - val environment = List.empty[KeyValuePair] - - + def containerPropertiesBuilder(context: AwsBatchJobDefinitionContext): (ContainerProperties.Builder, String) = { + + def buildVolumes(disks: Seq[AwsBatchVolume]): List[Volume] = { //all the configured disks plus the fetch and run volume and the aws-cli volume @@ -113,6 +103,7 @@ trait AwsBatchJobDefinitionBuilder { ) } + def buildMountPoints(disks: Seq[AwsBatchVolume]): List[MountPoint] = { //all the configured disks plus the fetch and run mount point and the AWS cli mount point @@ -132,14 +123,16 @@ trait AwsBatchJobDefinitionBuilder { ) } + def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair]): String = { s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}" } + - + val environment = List.empty[KeyValuePair] val cmdName = context.runtimeAttributes.fileSystem match { - case AWSBatchStorageSystems.s3 => "/var/scratch/fetch_and_run.sh" - case _ => context.commandText + case AWSBatchStorageSystems.s3 => "/var/scratch/fetch_and_run.sh" + case _ => context.commandText } val packedCommand = packCommand("/bin/bash", "-c", cmdName) val volumes = buildVolumes( context.runtimeAttributes.disks ) @@ -152,17 +145,26 @@ trait AwsBatchJobDefinitionBuilder { environment ) - (builder - .command(packedCommand.asJava) - .memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt) - .vcpus(context.runtimeAttributes.cpu##) - .volumes( volumes.asJava) - .mountPoints( mountPoints.asJava) - .environment(environment.asJava), + (ContainerProperties.builder() + .image(context.runtimeAttributes.dockerImage) + .command(packedCommand.asJava) + .memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt) + .vcpus(context.runtimeAttributes.cpu##) + .volumes( volumes.asJava) + .mountPoints( mountPoints.asJava) + .environment(environment.asJava), + containerPropsName) + } - containerPropsName) + def retryStrategyBuilder(context: AwsBatchJobDefinitionContext): (RetryStrategy.Builder, String) = { + // In the future we can add here the 'evaluateOnExit' statement + + (RetryStrategy.builder() + .attempts(context.runtimeAttributes.awsBatchRetryAttempts), + context.runtimeAttributes.awsBatchRetryAttempts.toString) } + private def packCommand(shell: String, options: String, mainCommand: String): Seq[String] = { val rc = new ListBuffer[String]() val lim = 1024 @@ -183,30 +185,29 @@ trait AwsBatchJobDefinitionBuilder { object StandardAwsBatchJobDefinitionBuilder extends AwsBatchJobDefinitionBuilder { def build(context: AwsBatchJobDefinitionContext): AwsBatchJobDefinition = { - //instantiate a builder with the name of the docker image - val builderInst = builder(context.runtimeAttributes.dockerImage) - val (container, containerPropsName) = buildResources(builderInst, context) - val retry = RetryStrategy.builder().attempts(context.runtimeAttributes.awsBatchRetryAttempts).build + + val (containerPropsInst, containerPropsName) = containerPropertiesBuilder(context) + val (retryStrategyInst, retryStrategyName) = retryStrategyBuilder(context) - val name = buildName(context.runtimeAttributes.dockerImage, containerPropsName,context.runtimeAttributes.awsBatchRetryAttempts) + val name = buildName(context.runtimeAttributes.dockerImage, containerPropsName, retryStrategyName) - new StandardAwsBatchJobDefinitionBuilder(container.build, name, retry) + new StandardAwsBatchJobDefinitionBuilder(containerPropsInst.build, retryStrategyInst.build, name) } - def buildName(imageName: String, containerPropsName: String, retryAttemps: Int): String = { - val str = s"$imageName:$containerPropsName:${retryAttemps.toString}" + def buildName(imageName: String, containerPropsName: String, retryStrategyName: String): String = { + val str = s"$imageName:$containerPropsName:$retryStrategyName" val sha1 = MessageDigest.getInstance("SHA-1") .digest( str.getBytes("UTF-8") ) .map("%02x".format(_)).mkString - val prefix = s"cromwell_$imageName".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128 + val prefix = s"cromwell_${imageName}_".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128 sanitize(prefix + sha1) } } -case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, name: String, retryStrategy: RetryStrategy) extends AwsBatchJobDefinition +case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, retryStrategy: RetryStrategy, name: String) extends AwsBatchJobDefinition object AwsBatchJobDefinitionContext diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index 45379071bd0..40f424a3f79 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -72,7 +72,7 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, noAddress: Boolean, scriptS3BucketName: String, awsBatchRetryAttempts: Int, - fileSystem:String= "s3") + fileSystem: String= "s3") object AwsBatchRuntimeAttributes { diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index 09e1ee94351..fa944c6d7e2 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -65,7 +65,7 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout false, ContinueOnReturnCodeSet(Set(0)), false, - "my-stuff") + "my-stuff", 0) val expectedDefaultsLocalFS = new AwsBatchRuntimeAttributes(refineMV[Positive](1), Vector("us-east-1a", "us-east-1b"), @@ -339,6 +339,22 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout val expectedRuntimeAttributes = expectedDefaults.copy(cpu = refineMV[Positive](4)) assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes, workflowOptions) } + + "validate a valid awsBatchRetryAttempts entry" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "scriptBucketName" -> WomString("my-stuff"), "awsBatchRetryAttempts" -> WomInteger(9)) + val expectedRuntimeAttributes = expectedDefaults.copy(awsBatchRetryAttempts = 9) + assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes) + } + + "fail to validate an invalid awsBatchRetryAttempts entry" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "scriptBucketName" -> WomString("my-stuff"), "awsBatchRetryAttempts" -> WomInteger(-1)) + assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value greater than or equal to 0") + } + + "fail to validate an invalid awsBatchRetryAttempts entry" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "scriptBucketName" -> WomString("my-stuff"), "awsBatchRetryAttempts" -> WomInteger(12)) + assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") + } } private def assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes: Map[String, WomValue], From 25abe25f963fb7f1c92be5cff700eba34d3d0e26 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 2 Mar 2021 11:38:52 -0300 Subject: [PATCH 04/24] fix runtimeAttribute tests --- .../impl/aws/AwsBatchRuntimeAttributesSpec.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index fa944c6d7e2..744b842111c 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -341,18 +341,23 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout } "validate a valid awsBatchRetryAttempts entry" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "scriptBucketName" -> WomString("my-stuff"), "awsBatchRetryAttempts" -> WomInteger(9)) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(9)) val expectedRuntimeAttributes = expectedDefaults.copy(awsBatchRetryAttempts = 9) assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes) } "fail to validate an invalid awsBatchRetryAttempts entry" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "scriptBucketName" -> WomString("my-stuff"), "awsBatchRetryAttempts" -> WomInteger(-1)) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(-1)) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value greater than or equal to 0") } "fail to validate an invalid awsBatchRetryAttempts entry" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "scriptBucketName" -> WomString("my-stuff"), "awsBatchRetryAttempts" -> WomInteger(12)) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(12)) + assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") + } + + "validate zero as awsBatchRetryAttempts entry" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(0)) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") } } From 6470542ac56f362090506186282ba990248cf5cd Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Fri, 26 Feb 2021 10:55:12 -0300 Subject: [PATCH 05/24] exit reconfigure-script with same exit code as rc file --- .../src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 2378c48c8ef..13f020ab61c 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -212,6 +212,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |echo '*** DELOCALIZING OUTPUTS ***' |$outputCopyCommand |echo '*** COMPLETED DELOCALIZATION ***' + |echo '*** EXITING WITH RC CODE ***' + |exit $$(head -n 1 $workDir/${jobPaths.returnCodeFilename}) |} |""".stripMargin } From 7d1c547ff5133a1fc8ca2e6699c896963caad853 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Fri, 26 Feb 2021 10:55:12 -0300 Subject: [PATCH 06/24] exit reconfigure-script with same exit code as rc file --- .../src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala index 2378c48c8ef..13f020ab61c 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJob.scala @@ -212,6 +212,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL |echo '*** DELOCALIZING OUTPUTS ***' |$outputCopyCommand |echo '*** COMPLETED DELOCALIZATION ***' + |echo '*** EXITING WITH RC CODE ***' + |exit $$(head -n 1 $workDir/${jobPaths.returnCodeFilename}) |} |""".stripMargin } From c721e52610cc42d5eaa2e23c25278f17084912da Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 2 Mar 2021 15:55:50 -0300 Subject: [PATCH 07/24] add awsBatchRetryAttempts to AwsBatchJobSpec test --- .../test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala index 5037fc21051..9117ba02f6e 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala @@ -113,6 +113,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi continueOnReturnCode = ContinueOnReturnCodeFlag(false), noAddress = false, scriptS3BucketName = "script-bucket", + awsBatchRetryAttempts = 1, fileSystem = "s3") private def generateBasicJob: AwsBatchJob = { From 1009398d87776fcc2589ff2c3a08237881389007 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 2 Mar 2021 17:01:34 -0300 Subject: [PATCH 08/24] add test to validate that is an integer --- .../backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala | 5 +++++ .../scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala | 1 + 2 files changed, 6 insertions(+) diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index 744b842111c..822d21e35f2 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -356,6 +356,11 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") } + "fail to validate an invalid awsBatchRetryAttempts entry" in { + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomString("test")) + assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute to be an Integer") + } + "validate zero as awsBatchRetryAttempts entry" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(0)) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala index 38545c7e472..48d4a07c611 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala @@ -61,6 +61,7 @@ object AwsBatchTestConfig { | zones:["us-east-1a", "us-east-1b"] | queueArn: "arn:aws:batch:us-east-1:111222333444:job-queue/job-queue" | scriptBucketName: "my-bucket" + | awsBatchRetryAttempts: 1 |} | |""".stripMargin From eb0c84518ba0d83d2d0d1c3de9fed27dd4aa1534 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 2 Mar 2021 17:50:07 -0300 Subject: [PATCH 09/24] fix comment --- .../scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index e3ac3109f64..dac9a767054 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -157,7 +157,7 @@ trait AwsBatchJobDefinitionBuilder { } def retryStrategyBuilder(context: AwsBatchJobDefinitionContext): (RetryStrategy.Builder, String) = { - // In the future we can add here the 'evaluateOnExit' statement + // We can add here the 'evaluateOnExit' statement (RetryStrategy.builder() .attempts(context.runtimeAttributes.awsBatchRetryAttempts), From 5d6a57a38d98bcd233f40354acb4307c3c0cb371 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Wed, 17 Mar 2021 10:59:37 -0300 Subject: [PATCH 10/24] add debug message to try to find the error in the reconfigure script --- .../impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala index 7a36946e78d..556c81a1d5d 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala @@ -205,6 +205,10 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar jobDescriptor: BackendJobDescriptor): Iterable[AwsBatchInput] = { (remotePathArray zip localPathArray zipWithIndex) flatMap { case ((remotePath, localPath), index) => + val localPathString = localPath.valueString + if (localPathString.startsWith("s3:/")) { + Log.error(s"!!!Debug error: ${remotePath.valueString} -> ${localPathString}") + } Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPath.valueString), workingDisk)) } } From 9a622a333e61b32142444183a3a5de285b766cb9 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Wed, 17 Mar 2021 16:17:01 -0300 Subject: [PATCH 11/24] add documentation for awsBatchRetryAttempts --- docs/RuntimeAttributes.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/docs/RuntimeAttributes.md b/docs/RuntimeAttributes.md index 782a80c96fe..f2419cbe621 100644 --- a/docs/RuntimeAttributes.md +++ b/docs/RuntimeAttributes.md @@ -56,6 +56,8 @@ There are a number of additional runtime attributes that apply to the Google Clo - [useDockerImageCache](#usedockerimagecache) +### AWS Specific Attributes +- [awsBatchRetryAttempts](#awsBatchRetryAttempts) ## Expression support @@ -373,6 +375,18 @@ runtime { } ``` +### `awsBatchRetryAttempts` + +*Default: _0_* + +This runtime attribute adds support to [*AWS Batch Automated Job Retries*](https://docs.aws.amazon.com/batch/latest/userguide/job_retries.html) which makes it possible to tackle transient job failures. For example, if a task fails due to a timeout from accessing an external service, then this option helps re-run the failed the task without having to re-run the entire workflow. It takes an Int, between 1 and 10, as a value that indicates the maximum number of times AWS Batch should retry a failed task. If the value 0 is passed, the [*Retry Strategy*](https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#retryStrategy) will not be added to the job definiton and the task will run just once. + +``` +runtime { + awsBatchRetryAttempts: 3 +} +``` + #### How to Setup Configure your Google network to use "Private Google Access". This will allow your VMs to access Google Services including Google Container Registry, as well as Dockerhub images. From fedda022d4e7b2e2493231b4f1f52d9384531a74 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 6 Apr 2021 14:09:42 -0300 Subject: [PATCH 12/24] dump --- .../metadata/impl/MetadataServiceActor.scala | 2 +- ...wsBatchAsyncBackendJobExecutionActor.scala | 21 ++++++++++++------- .../aws/AwsBatchRuntimeAttributesSpec.scala | 2 +- .../values/EngineFunctionEvaluators.scala | 2 +- 4 files changed, 17 insertions(+), 10 deletions(-) diff --git a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala index 44b212debf1..4b5b2b5b11d 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala @@ -50,7 +50,7 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser private val metadataReadTimeout: Duration = serviceConfig.getOrElse[Duration]("metadata-read-query-timeout", Duration.Inf) private val metadataReadRowNumberSafetyThreshold: Int = - serviceConfig.getOrElse[Int]("metadata-read-row-number-safety-threshold", 1000000) + serviceConfig.getOrElse[Int]("metadata-read-row-number-safety-threshold", 3000000) def readMetadataWorkerActorProps(): Props = ReadDatabaseMetadataWorkerActor diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala index 556c81a1d5d..fb929b59016 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala @@ -202,14 +202,21 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar private def inputsFromWomFiles(namePrefix: String, remotePathArray: Seq[WomFile], localPathArray: Seq[WomFile], - jobDescriptor: BackendJobDescriptor): Iterable[AwsBatchInput] = { + jobDescriptor: BackendJobDescriptor, + flag: Boolean): Iterable[AwsBatchInput] = { + (remotePathArray zip localPathArray zipWithIndex) flatMap { case ((remotePath, localPath), index) => - val localPathString = localPath.valueString - if (localPathString.startsWith("s3:/")) { - Log.error(s"!!!Debug error: ${remotePath.valueString} -> ${localPathString}") + var localPathString = localPath.valueString + // Log.warn(s"!!!RAW: ${flag}: ${remotePath.valueString} -> ${localPathString}") + if (localPathString.startsWith("s3://")){ + localPathString = localPathString.replace("s3://", "") + Log.error(s"!!!Debug error 1: ${remotePath.valueString} -> ${localPath.valueString} ->${localPathString}") + }else if (localPathString.startsWith("s3:/")) { + localPathString = localPathString.replace("s3:/", "") + Log.error(s"!!!Debug error 2: ${remotePath.valueString} -> ${localPath.valueString} -> ${localPathString}") } - Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPath.valueString), workingDisk)) + Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk)) } } @@ -241,7 +248,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar val writeFunctionFiles = instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f) } toMap val writeFunctionInputs = writeFunctionFiles flatMap { - case (name, files) => inputsFromWomFiles(name, files.map(_.file), files.map(localizationPath), jobDescriptor) + case (name, files) => inputsFromWomFiles(name, files.map(_.file), files.map(localizationPath), jobDescriptor, false) } // Collect all WomFiles from inputs to the call. @@ -261,7 +268,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar } val callInputInputs = callInputFiles flatMap { - case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor) + case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor, true) } val scriptInput: AwsBatchInput = AwsBatchFileInput( diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index 822d21e35f2..4aebb29cae5 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -76,7 +76,7 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout ContinueOnReturnCodeSet(Set(0)), false, "", - "local") + 1) "AwsBatchRuntimeAttributes" should { diff --git a/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala b/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala index 0a2bf3b998a..21f6f7516a5 100644 --- a/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala +++ b/wdl/transforms/new-base/src/main/scala/wdl/transforms/base/linking/expression/values/EngineFunctionEvaluators.scala @@ -51,7 +51,7 @@ object EngineFunctionEvaluators { EvaluatedValue(WomSingleFile(ioFunctionSet.pathFunctions.stderr), Seq.empty).validNel } - private val ReadWaitTimeout = 60.seconds + private val ReadWaitTimeout = 300.seconds private def readFile(fileToRead: WomSingleFile, ioFunctionSet: IoFunctionSet, sizeLimit: Int) = { Try(Await.result(ioFunctionSet.readFile(fileToRead.value, Option(sizeLimit), failOnOverflow = true), ReadWaitTimeout)) } From 97350ca16bec391d198be075940cd353e61b7592 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 13 Apr 2021 18:46:59 +0100 Subject: [PATCH 13/24] create ulimits runtime attribute --- .../backend/impl/aws/AwsBatchAttributes.scala | 8 +- .../impl/aws/AwsBatchJobDefinition.scala | 24 +- .../impl/aws/AwsBatchRuntimeAttributes.scala | 512 +++++++++++++----- 3 files changed, 400 insertions(+), 144 deletions(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala index 26f69c4e79a..be9bbdfad8b 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala @@ -71,7 +71,13 @@ object AwsBatchAttributes { "filesystems.local.auth", "filesystems.s3.auth", "filesystems.s3.caching.duplication-strategy", - "filesystems.local.caching.duplication-strategy" + "filesystems.local.caching.duplication-strategy", + "auth", + "numCreateDefinitionAttempts", + "filesystems.s3.duplication-strategy", + "numSubmitAttempts", + "default-runtime-attributes.scriptBucketName", + "awsBatchRetryAttempts" ) private val deprecatedAwsBatchKeys: Map[String, String] = Map( diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index dac9a767054..2f54355a38b 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -35,7 +35,7 @@ import scala.language.postfixOps import scala.collection.mutable.ListBuffer import cromwell.backend.BackendJobDescriptor import cromwell.backend.io.JobPaths -import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, RetryStrategy, Volume} +import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, RetryStrategy, Volume, Ulimit} import cromwell.backend.impl.aws.io.AwsBatchVolume import scala.collection.JavaConverters._ @@ -123,9 +123,21 @@ trait AwsBatchJobDefinitionBuilder { ) } + + def buildUlimits(ulimits: Seq[Map[String, String]]): List[Ulimit] = { + + ulimits.filter(_.nonEmpty).map(u => + Ulimit.builder() + .name(u("name")) + .softLimit(u("softLimit").toInt) + .hardLimit(u("hardLimit").toInt) + .build() + ).toList + } - def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair]): String = { - s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}" + + def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit]): String = { + s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}" } @@ -137,12 +149,14 @@ trait AwsBatchJobDefinitionBuilder { val packedCommand = packCommand("/bin/bash", "-c", cmdName) val volumes = buildVolumes( context.runtimeAttributes.disks ) val mountPoints = buildMountPoints( context.runtimeAttributes.disks) + val ulimits = buildUlimits( context.runtimeAttributes.ulimits) val containerPropsName = buildName( context.runtimeAttributes.dockerImage, packedCommand.mkString(","), volumes, mountPoints, - environment + environment, + ulimits ) (ContainerProperties.builder() @@ -152,7 +166,7 @@ trait AwsBatchJobDefinitionBuilder { .vcpus(context.runtimeAttributes.cpu##) .volumes( volumes.asJava) .mountPoints( mountPoints.asJava) - .environment(environment.asJava), + .environment(environment.asJava).ulimits(ulimits.asJava), containerPropsName) } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index 40f424a3f79..234b254dc71 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -47,32 +47,34 @@ import wom.values._ import scala.util.matching.Regex -/** - * Attributes that are provided to the job at runtime - * @param cpu number of vCPU - * @param zones the aws availability zones to run in - * @param memory memory to allocate - * @param disks a sequence of disk volumes - * @param dockerImage the name of the docker image that the job will run in - * @param queueArn the arn of the AWS Batch queue that the job will be submitted to - * @param failOnStderr should the job fail if something is logged to `stderr` - * @param continueOnReturnCode decides if a job continues on receiving a specific return code - * @param noAddress is there no address - * @param scriptS3BucketName the s3 bucket where the execution command or script will be written and, from there, fetched into the container and executed - * @param fileSystem the filesystem type, default is "s3" - */ -case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, - zones: Vector[String], - memory: MemorySize, - disks: Seq[AwsBatchVolume], - dockerImage: String, - queueArn: String, - failOnStderr: Boolean, - continueOnReturnCode: ContinueOnReturnCode, - noAddress: Boolean, - scriptS3BucketName: String, - awsBatchRetryAttempts: Int, - fileSystem: String= "s3") +/** Attributes that are provided to the job at runtime + * @param cpu number of vCPU + * @param zones the aws availability zones to run in + * @param memory memory to allocate + * @param disks a sequence of disk volumes + * @param dockerImage the name of the docker image that the job will run in + * @param queueArn the arn of the AWS Batch queue that the job will be submitted to + * @param failOnStderr should the job fail if something is logged to `stderr` + * @param continueOnReturnCode decides if a job continues on receiving a specific return code + * @param noAddress is there no address + * @param scriptS3BucketName the s3 bucket where the execution command or script will be written and, from there, fetched into the container and executed + * @param fileSystem the filesystem type, default is "s3" + */ +case class AwsBatchRuntimeAttributes( + cpu: Int Refined Positive, + zones: Vector[String], + memory: MemorySize, + disks: Seq[AwsBatchVolume], + dockerImage: String, + queueArn: String, + failOnStderr: Boolean, + continueOnReturnCode: ContinueOnReturnCode, + noAddress: Boolean, + scriptS3BucketName: String, + awsBatchRetryAttempts: Int, + ulimits: Vector[Map[String, String]], + fileSystem: String = "s3" +) object AwsBatchRuntimeAttributes { @@ -85,8 +87,19 @@ object AwsBatchRuntimeAttributes { val ZonesKey = "zones" private val ZonesDefaultValue = WomString("us-east-1a") + val UlimitsKey = "ulimits" + // private val UlimitsDefaultValue = WomString("us-east-1a") + // private val UlimitsDefaultValue2 = WomArray( + // Vector(WomMap(Map.empty[WomValue, WomValue])) + // ) + + private val UlimitsDefaultValue = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) + // private val UlimitsDefaultValue2 = WomArray(WomArrayType(WomStringType), Vector(WomString(""), WomString(""))) + + val NoAddressKey = "noAddress" - private val noAddressValidationInstance = new BooleanRuntimeAttributesValidation(NoAddressKey) + private val noAddressValidationInstance = + new BooleanRuntimeAttributesValidation(NoAddressKey) private val NoAddressDefaultValue = WomBoolean(false) // TODO: Determine good volume format @@ -95,103 +108,227 @@ object AwsBatchRuntimeAttributes { private val MemoryDefaultValue = "2 GB" - private def cpuValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instance - .withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin) - - private def cpuMinValidation(runtimeConfig: Option[Config]):RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instanceMin - .withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin) - - private def failOnStderrValidation(runtimeConfig: Option[Config]) = FailOnStderrValidation.default(runtimeConfig) - - private def continueOnReturnCodeValidation(runtimeConfig: Option[Config]) = ContinueOnReturnCodeValidation.default(runtimeConfig) + private def cpuValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instance + .withDefault( + CpuValidation.configDefaultWomValue( + runtimeConfig + ) getOrElse CpuValidation.defaultMin + ) - private def disksValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Seq[AwsBatchVolume]] = DisksValidation - .withDefault(DisksValidation.configDefaultWomValue(runtimeConfig) getOrElse DisksDefaultValue) + private def cpuMinValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[Int Refined Positive] = + CpuValidation.instanceMin + .withDefault( + CpuValidation.configDefaultWomValue( + runtimeConfig + ) getOrElse CpuValidation.defaultMin + ) + + private def failOnStderrValidation(runtimeConfig: Option[Config]) = + FailOnStderrValidation.default(runtimeConfig) + + private def continueOnReturnCodeValidation(runtimeConfig: Option[Config]) = + ContinueOnReturnCodeValidation.default(runtimeConfig) + + private def disksValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[Seq[AwsBatchVolume]] = DisksValidation + .withDefault( + DisksValidation.configDefaultWomValue( + runtimeConfig + ) getOrElse DisksDefaultValue + ) - private def zonesValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Vector[String]] = ZonesValidation - .withDefault(ZonesValidation.configDefaultWomValue(runtimeConfig) getOrElse ZonesDefaultValue) + private def zonesValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[Vector[String]] = ZonesValidation + .withDefault( + ZonesValidation.configDefaultWomValue( + runtimeConfig + ) getOrElse ZonesDefaultValue + ) - private def memoryValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = { + private def ulimitsValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[Vector[Map[String, String]]] ={ + UlimitsValidation + .withDefault( + UlimitsValidation.configDefaultWomValue( + runtimeConfig + ) getOrElse UlimitsDefaultValue + )} + + // private def ulimitsValidation( + // runtimeConfig: Option[Config] + // ): RuntimeAttributesValidation[Vector[Map[String, String]]] ={ + // println("!!! Henrique " + UlimitsValidation.configDefaultWomValue(runtimeConfig).getOrElse(UlimitsDefaultValue)) + // UlimitsValidation + // .withDefault(UlimitsDefaultValue)} + + private def memoryValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[MemorySize] = { MemoryValidation.withDefaultMemory( RuntimeAttributesKeys.MemoryKey, - MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, runtimeConfig) getOrElse MemoryDefaultValue) + MemoryValidation.configDefaultString( + RuntimeAttributesKeys.MemoryKey, + runtimeConfig + ) getOrElse MemoryDefaultValue + ) } - private def memoryMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = { + private def memoryMinValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[MemorySize] = { MemoryValidation.withDefaultMemory( RuntimeAttributesKeys.MemoryMinKey, - MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryMinKey, runtimeConfig) getOrElse MemoryDefaultValue) + MemoryValidation.configDefaultString( + RuntimeAttributesKeys.MemoryMinKey, + runtimeConfig + ) getOrElse MemoryDefaultValue + ) } - private def noAddressValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Boolean] = noAddressValidationInstance - .withDefault(noAddressValidationInstance.configDefaultWomValue(runtimeConfig) getOrElse NoAddressDefaultValue) + private def noAddressValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[Boolean] = noAddressValidationInstance + .withDefault( + noAddressValidationInstance.configDefaultWomValue( + runtimeConfig + ) getOrElse NoAddressDefaultValue + ) - private def scriptS3BucketNameValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[String] = { - ScriptS3BucketNameValidation(scriptS3BucketKey).withDefault(ScriptS3BucketNameValidation(scriptS3BucketKey) - .configDefaultWomValue(runtimeConfig).getOrElse( throw new RuntimeException( "scriptBucketName is required" ))) + private def scriptS3BucketNameValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[String] = { + ScriptS3BucketNameValidation(scriptS3BucketKey).withDefault( + ScriptS3BucketNameValidation(scriptS3BucketKey) + .configDefaultWomValue(runtimeConfig) + .getOrElse(throw new RuntimeException("scriptBucketName is required")) + ) } - private val dockerValidation: RuntimeAttributesValidation[String] = DockerValidation.instance + private val dockerValidation: RuntimeAttributesValidation[String] = + DockerValidation.instance - private def queueArnValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[String] = - QueueArnValidation.withDefault(QueueArnValidation.configDefaultWomValue(runtimeConfig) getOrElse - (throw new RuntimeException("queueArn is required"))) + private def queueArnValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[String] = + QueueArnValidation.withDefault( + QueueArnValidation.configDefaultWomValue(runtimeConfig) getOrElse + (throw new RuntimeException("queueArn is required")) + ) - private def awsBatchRetryAttemptsValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] = { - AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey).withDefault(AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey) - .configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) + private def awsBatchRetryAttemptsValidation( + runtimeConfig: Option[Config] + ): RuntimeAttributesValidation[Int] = { + AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey).withDefault( + AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey) + .configDefaultWomValue(runtimeConfig) + .getOrElse(WomInteger(0)) + ) } - def runtimeAttributesBuilder(configuration: AwsBatchConfiguration): StandardValidatedRuntimeAttributesBuilder = { + def runtimeAttributesBuilder( + configuration: AwsBatchConfiguration + ): StandardValidatedRuntimeAttributesBuilder = { val runtimeConfig = configuration.runtimeConfig - def validationsS3backend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( - cpuValidation(runtimeConfig), - cpuMinValidation(runtimeConfig), - disksValidation(runtimeConfig), - zonesValidation(runtimeConfig), - memoryValidation(runtimeConfig), - memoryMinValidation(runtimeConfig), - noAddressValidation(runtimeConfig), - dockerValidation, - queueArnValidation(runtimeConfig), - scriptS3BucketNameValidation(runtimeConfig), - awsBatchRetryAttemptsValidation(runtimeConfig), - ) - def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( - cpuValidation(runtimeConfig), - cpuMinValidation(runtimeConfig), - disksValidation(runtimeConfig), - zonesValidation(runtimeConfig), - memoryValidation(runtimeConfig), - memoryMinValidation(runtimeConfig), - noAddressValidation(runtimeConfig), - dockerValidation, - queueArnValidation(runtimeConfig) - ) - - configuration.fileSystem match { - case AWSBatchStorageSystems.s3 => validationsS3backend - - case _ => validationsLocalBackend + def validationsS3backend = StandardValidatedRuntimeAttributesBuilder + .default(runtimeConfig) + .withValidation( + cpuValidation(runtimeConfig), + cpuMinValidation(runtimeConfig), + disksValidation(runtimeConfig), + zonesValidation(runtimeConfig), + memoryValidation(runtimeConfig), + memoryMinValidation(runtimeConfig), + noAddressValidation(runtimeConfig), + dockerValidation, + queueArnValidation(runtimeConfig), + scriptS3BucketNameValidation(runtimeConfig), + awsBatchRetryAttemptsValidation(runtimeConfig), + ulimitsValidation(runtimeConfig) + ) + def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder + .default(runtimeConfig) + .withValidation( + cpuValidation(runtimeConfig), + cpuMinValidation(runtimeConfig), + disksValidation(runtimeConfig), + zonesValidation(runtimeConfig), + memoryValidation(runtimeConfig), + memoryMinValidation(runtimeConfig), + noAddressValidation(runtimeConfig), + dockerValidation, + queueArnValidation(runtimeConfig) + ) + + configuration.fileSystem match { + case AWSBatchStorageSystems.s3 => validationsS3backend + + case _ => validationsLocalBackend } } - def apply(validatedRuntimeAttributes: ValidatedRuntimeAttributes, runtimeAttrsConfig: Option[Config], fileSystem:String): AwsBatchRuntimeAttributes = { - val cpu: Int Refined Positive = RuntimeAttributesValidation.extract(cpuValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - val zones: Vector[String] = RuntimeAttributesValidation.extract(ZonesValidation, validatedRuntimeAttributes) - val memory: MemorySize = RuntimeAttributesValidation.extract(memoryValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - val disks: Seq[AwsBatchVolume] = RuntimeAttributesValidation.extract(disksValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - val docker: String = RuntimeAttributesValidation.extract(dockerValidation, validatedRuntimeAttributes) - val queueArn: String = RuntimeAttributesValidation.extract(queueArnValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - val failOnStderr: Boolean = RuntimeAttributesValidation.extract(failOnStderrValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - val continueOnReturnCode: ContinueOnReturnCode = RuntimeAttributesValidation.extract(continueOnReturnCodeValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - val noAddress: Boolean = RuntimeAttributesValidation.extract(noAddressValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - val scriptS3BucketName = fileSystem match { - case AWSBatchStorageSystems.s3 => RuntimeAttributesValidation.extract(scriptS3BucketNameValidation(runtimeAttrsConfig) , validatedRuntimeAttributes) - case _ => "" - } - val awsBatchRetryAttempts: Int = RuntimeAttributesValidation.extract(awsBatchRetryAttemptsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) - + def apply( + validatedRuntimeAttributes: ValidatedRuntimeAttributes, + runtimeAttrsConfig: Option[Config], + fileSystem: String + ): AwsBatchRuntimeAttributes = { + val cpu: Int Refined Positive = RuntimeAttributesValidation.extract( + cpuValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val zones: Vector[String] = RuntimeAttributesValidation.extract( + ZonesValidation, + validatedRuntimeAttributes + ) + val memory: MemorySize = RuntimeAttributesValidation.extract( + memoryValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val disks: Seq[AwsBatchVolume] = RuntimeAttributesValidation.extract( + disksValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val docker: String = RuntimeAttributesValidation.extract( + dockerValidation, + validatedRuntimeAttributes + ) + val queueArn: String = RuntimeAttributesValidation.extract( + queueArnValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val failOnStderr: Boolean = RuntimeAttributesValidation.extract( + failOnStderrValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val continueOnReturnCode: ContinueOnReturnCode = + RuntimeAttributesValidation.extract( + continueOnReturnCodeValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val noAddress: Boolean = RuntimeAttributesValidation.extract( + noAddressValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val scriptS3BucketName = fileSystem match { + case AWSBatchStorageSystems.s3 => + RuntimeAttributesValidation.extract( + scriptS3BucketNameValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + case _ => "" + } + val awsBatchRetryAttempts: Int = RuntimeAttributesValidation.extract( + awsBatchRetryAttemptsValidation(runtimeAttrsConfig), + validatedRuntimeAttributes + ) + val ulimits: Vector[Map[String, String]] = RuntimeAttributesValidation + .extract(ulimitsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) new AwsBatchRuntimeAttributes( cpu, @@ -205,40 +342,47 @@ object AwsBatchRuntimeAttributes { noAddress, scriptS3BucketName, awsBatchRetryAttempts, + ulimits, fileSystem ) } } object ScriptS3BucketNameValidation { - def apply(key: String): ScriptS3BucketNameValidation = new ScriptS3BucketNameValidation(key) + def apply(key: String): ScriptS3BucketNameValidation = + new ScriptS3BucketNameValidation(key) } -class ScriptS3BucketNameValidation( key: String ) extends StringRuntimeAttributesValidation(key) { +class ScriptS3BucketNameValidation(key: String) + extends StringRuntimeAttributesValidation(key) { //a reasonable but not perfect regex for a bucket. see https://stackoverflow.com/a/50484916/3573553 - protected val s3BucketNameRegex: Regex = "(?=^.{3,63}$)(?!^(\\d+\\.)+\\d+$)(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)" - .r - + protected val s3BucketNameRegex: Regex = + "(?=^.{3,63}$)(?!^(\\d+\\.)+\\d+$)(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)".r - override protected def validateValue: PartialFunction[WomValue, ErrorOr[String]] = { - case WomString(s) => validateBucketName(s) + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[String]] = { case WomString(s) => + validateBucketName(s) } - private def validateBucketName(possibleBucketName: String): ErrorOr[String] = { + private def validateBucketName( + possibleBucketName: String + ): ErrorOr[String] = { possibleBucketName match { - case s3BucketNameRegex(_@_*) => possibleBucketName.validNel - case _ => "The Script Bucket name has an invalid s3 bucket format".invalidNel + case s3BucketNameRegex(_ @_*) => possibleBucketName.validNel + case _ => + "The Script Bucket name has an invalid s3 bucket format".invalidNel } } } -object QueueArnValidation extends ArnValidation(AwsBatchRuntimeAttributes.QueueArnKey) { +object QueueArnValidation + extends ArnValidation(AwsBatchRuntimeAttributes.QueueArnKey) { // queue arn format can be found here // https://docs.aws.amazon.com/en_us/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-batch // arn:aws:batch:region:account-id:job-queue/queue-name override protected val arnRegex: Regex = - s""" + s""" (?x) # Turn on comments and whitespace insensitivity (arn) # Every AWS ARN starts with "arn" : @@ -269,15 +413,17 @@ object ArnValidation { def apply(key: String): ArnValidation = new ArnValidation(key) } -class ArnValidation(override val key: String) extends StringRuntimeAttributesValidation(key) { - override protected def validateValue: PartialFunction[WomValue, ErrorOr[String]] = { - case WomString(s) => validateArn(s) +class ArnValidation(override val key: String) + extends StringRuntimeAttributesValidation(key) { + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[String]] = { case WomString(s) => + validateArn(s) } private def validateArn(possibleArn: String): ErrorOr[String] = { possibleArn match { - case arnRegex(_@_*) => possibleArn.validNel - case _ => "ARN has invalid format".invalidNel + case arnRegex(_ @_*) => possibleArn.validNel + case _ => "ARN has invalid format".invalidNel } } @@ -285,7 +431,7 @@ class ArnValidation(override val key: String) extends StringRuntimeAttributesVal // https://docs.aws.amazon.com/en_us/general/latest/gr/aws-arns-and-namespaces.html // This is quite vague regex, but it allows to support a lot of ARN formats protected val arnRegex: Regex = - s""" + s""" (?x) # Turn on comments and whitespace insensitivity (arn) # Every ARN starts with "arn" : @@ -328,9 +474,11 @@ class ArnValidation(override val key: String) extends StringRuntimeAttributesVal object ZonesValidation extends RuntimeAttributesValidation[Vector[String]] { override def key: String = AwsBatchRuntimeAttributes.ZonesKey - override def coercion: Traversable[WomType] = Set(WomStringType, WomArrayType(WomStringType)) + override def coercion: Traversable[WomType] = + Set(WomStringType, WomArrayType(WomStringType)) - override protected def validateValue: PartialFunction[WomValue, ErrorOr[Vector[String]]] = { + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[Vector[String]]] = { case WomString(s) => s.split("\\s+").toVector.validNel case WomArray(womType, value) if womType.memberType == WomStringType => value.map(_.valueString).toVector.validNel @@ -340,18 +488,92 @@ object ZonesValidation extends RuntimeAttributesValidation[Vector[String]] { s"Expecting $key runtime attribute to be either a whitespace separated String or an Array[String]" } -object DisksValidation extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] { +object UlimitsValidation + extends RuntimeAttributesValidation[Vector[Map[String, String]]] { + override def key: String = AwsBatchRuntimeAttributes.UlimitsKey + + override def coercion: Traversable[WomType] = + Set(WomStringType, WomArrayType(WomMapType(WomStringType, WomStringType))) + + var accepted_keys = Set("name", "softLimit", "hardLimit") + + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[Vector[Map[String, String]]]] = { + case WomArray(womType, value) + if womType.memberType == WomMapType(WomStringType, WomStringType) => + check_maps(value.toVector) + case WomMap(_, _) => "!!! ERROR1".invalidNel + + } + + private def check_maps( + maps: Vector[WomValue] + ): ErrorOr[Vector[Map[String, String]]] = { + val entryNels: Vector[ErrorOr[Map[String, String]]] = maps.map { + case WomMap(_, value) => check_keys(value) + case _ => "!!! ERROR2".invalidNel + } + val sequenced: ErrorOr[Vector[Map[String, String]]] = sequenceNels( + entryNels + ) + sequenced + } + + private def check_keys( + dict: Map[WomValue, WomValue] + ): ErrorOr[Map[String, String]] = { + val map_keys = dict.keySet.map(_.valueString).toSet + val unrecognizedKeys = + accepted_keys.diff(map_keys) union map_keys.diff(accepted_keys) + + if (!dict.nonEmpty){ + Map.empty[String, String].validNel + }else if (unrecognizedKeys.nonEmpty) { + s"Invalid keys in $key runtime attribute. Refer to 'ulimits' section on https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#containerProperties".invalidNel + } else { + dict + .collect { case (WomString(k), WomString(v)) => + (k, v) + // case _ => "!!! ERROR3".invalidNel + } + .toMap + .validNel + } + } + + private def sequenceNels( + nels: Vector[ErrorOr[Map[String, String]]] + ): ErrorOr[Vector[Map[String, String]]] = { + val emptyNel: ErrorOr[Vector[Map[String, String]]] = + Vector.empty[Map[String, String]].validNel + val seqNel: ErrorOr[Vector[Map[String, String]]] = + nels.foldLeft(emptyNel) { (acc, v) => + (acc, v) mapN { (a, v) => a :+ v } + } + seqNel + } + + override protected def missingValueMessage: String = + s"Expecting $key runtime attribute to be an Array[Map[String, String]]" +} + +object DisksValidation + extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] { override def key: String = AwsBatchRuntimeAttributes.DisksKey - override def coercion: Traversable[WomType] = Set(WomStringType, WomArrayType(WomStringType)) + override def coercion: Traversable[WomType] = + Set(WomStringType, WomArrayType(WomStringType)) - override protected def validateValue: PartialFunction[WomValue, ErrorOr[Seq[AwsBatchVolume]]] = { + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[Seq[AwsBatchVolume]]] = { case WomString(value) => validateLocalDisks(value.split(",\\s*").toSeq) case WomArray(womType, values) if womType.memberType == WomStringType => validateLocalDisks(values.map(_.valueString)) } - private def validateLocalDisks(disks: Seq[String]): ErrorOr[Seq[AwsBatchVolume]] = { + private def validateLocalDisks( + disks: Seq[String] + ): ErrorOr[Seq[AwsBatchVolume]] = { val diskNels: Seq[ErrorOr[AwsBatchVolume]] = disks map validateLocalDisk val sequenced: ErrorOr[Seq[AwsBatchVolume]] = sequenceNels(diskNels) val defaulted: ErrorOr[Seq[AwsBatchVolume]] = addDefault(sequenced) @@ -361,21 +583,31 @@ object DisksValidation extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] private def validateLocalDisk(disk: String): ErrorOr[AwsBatchVolume] = { AwsBatchVolume.parse(disk) match { case scala.util.Success(attachedDisk) => attachedDisk.validNel - case scala.util.Failure(ex) => ex.getMessage.invalidNel + case scala.util.Failure(ex) => ex.getMessage.invalidNel } } - private def sequenceNels(nels: Seq[ErrorOr[AwsBatchVolume]]): ErrorOr[Seq[AwsBatchVolume]] = { - val emptyDiskNel: ErrorOr[Vector[AwsBatchVolume]] = Vector.empty[AwsBatchVolume].validNel - val disksNel: ErrorOr[Vector[AwsBatchVolume]] = nels.foldLeft(emptyDiskNel) { - (acc, v) => (acc, v) mapN { (a, v) => a :+ v } - } + private def sequenceNels( + nels: Seq[ErrorOr[AwsBatchVolume]] + ): ErrorOr[Seq[AwsBatchVolume]] = { + val emptyDiskNel: ErrorOr[Vector[AwsBatchVolume]] = + Vector.empty[AwsBatchVolume].validNel + val disksNel: ErrorOr[Vector[AwsBatchVolume]] = + nels.foldLeft(emptyDiskNel) { (acc, v) => + (acc, v) mapN { (a, v) => a :+ v } + } disksNel } - private def addDefault(disksNel: ErrorOr[Seq[AwsBatchVolume]]): ErrorOr[Seq[AwsBatchVolume]] = { + private def addDefault( + disksNel: ErrorOr[Seq[AwsBatchVolume]] + ): ErrorOr[Seq[AwsBatchVolume]] = { disksNel map { - case disks if disks.exists(_.name == AwsBatchWorkingDisk.Name) || disks.exists(_.fsType == "efs") => disks + case disks + if disks.exists(_.name == AwsBatchWorkingDisk.Name) || disks.exists( + _.fsType == "efs" + ) => + disks case disks => disks :+ AwsBatchWorkingDisk.Default } } @@ -385,11 +617,14 @@ object DisksValidation extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] } object AwsBatchRetryAttemptsValidation { - def apply(key: String): AwsBatchRetryAttemptsValidation = new AwsBatchRetryAttemptsValidation(key) + def apply(key: String): AwsBatchRetryAttemptsValidation = + new AwsBatchRetryAttemptsValidation(key) } -class AwsBatchRetryAttemptsValidation(key: String) extends IntRuntimeAttributesValidation(key) { - override protected def validateValue: PartialFunction[WomValue, ErrorOr[Int]] = { +class AwsBatchRetryAttemptsValidation(key: String) + extends IntRuntimeAttributesValidation(key) { + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[Int]] = { case womValue if WomIntegerType.coerceRawValue(womValue).isSuccess => WomIntegerType.coerceRawValue(womValue).get match { case WomInteger(value) => @@ -402,5 +637,6 @@ class AwsBatchRetryAttemptsValidation(key: String) extends IntRuntimeAttributesV } } - override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be an Integer" + override protected def missingValueMessage: String = + s"Expecting $key runtime attribute to be an Integer" } From 3d43af4f1a1d914c171afa5e38934af266ae49fc Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 13 Apr 2021 19:47:15 +0100 Subject: [PATCH 14/24] add 'ulimits' runtime attribute --- .../impl/aws/AwsBatchJobDefinition.scala | 27 ++++-- .../impl/aws/AwsBatchRuntimeAttributes.scala | 82 +++++++++++++++++++ 2 files changed, 103 insertions(+), 6 deletions(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index dac9a767054..aa06da46b1f 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -35,7 +35,7 @@ import scala.language.postfixOps import scala.collection.mutable.ListBuffer import cromwell.backend.BackendJobDescriptor import cromwell.backend.io.JobPaths -import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, RetryStrategy, Volume} +import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, RetryStrategy, Volume, Ulimit} import cromwell.backend.impl.aws.io.AwsBatchVolume import scala.collection.JavaConverters._ @@ -124,8 +124,20 @@ trait AwsBatchJobDefinitionBuilder { } + def buildUlimits(ulimits: Seq[Map[String, String]]): List[Ulimit] = { + + ulimits.filter(_.nonEmpty).map(u => + Ulimit.builder() + .name(u("name")) + .softLimit(u("softLimit").toInt) + .hardLimit(u("hardLimit").toInt) + .build() + ).toList + } + + def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair]): String = { - s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}" + s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}" } @@ -137,12 +149,14 @@ trait AwsBatchJobDefinitionBuilder { val packedCommand = packCommand("/bin/bash", "-c", cmdName) val volumes = buildVolumes( context.runtimeAttributes.disks ) val mountPoints = buildMountPoints( context.runtimeAttributes.disks) + val ulimits = buildUlimits( context.runtimeAttributes.ulimits) val containerPropsName = buildName( context.runtimeAttributes.dockerImage, packedCommand.mkString(","), volumes, mountPoints, - environment + environment, + ulimits ) (ContainerProperties.builder() @@ -150,9 +164,10 @@ trait AwsBatchJobDefinitionBuilder { .command(packedCommand.asJava) .memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt) .vcpus(context.runtimeAttributes.cpu##) - .volumes( volumes.asJava) - .mountPoints( mountPoints.asJava) - .environment(environment.asJava), + .volumes(volumes.asJava) + .mountPoints(mountPoints.asJava) + .environment(environment.asJava) + .ulimits(ulimits.asJava), containerPropsName) } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index 40f424a3f79..8296eefd42a 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -60,6 +60,8 @@ import scala.util.matching.Regex * @param noAddress is there no address * @param scriptS3BucketName the s3 bucket where the execution command or script will be written and, from there, fetched into the container and executed * @param fileSystem the filesystem type, default is "s3" + * @param awsBatchRetryAttempts number of attempts that AWS Batch will retry the task if it fails + * @param ulimits ulimit values to be passed to the container */ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, zones: Vector[String], @@ -72,6 +74,7 @@ case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, noAddress: Boolean, scriptS3BucketName: String, awsBatchRetryAttempts: Int, + ulimits: Vector[Map[String, String]], fileSystem: String= "s3") object AwsBatchRuntimeAttributes { @@ -95,6 +98,9 @@ object AwsBatchRuntimeAttributes { private val MemoryDefaultValue = "2 GB" + val UlimitsKey = "ulimits" + private val UlimitsDefaultValue = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) + private def cpuValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instance .withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin) @@ -142,6 +148,9 @@ object AwsBatchRuntimeAttributes { .configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) } + private def ulimitsValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Vector[Map[String, String]]] = + UlimitsValidation.withDefault(UlimitsValidation.configDefaultWomValue(runtimeConfig) getOrElse UlimitsDefaultValue) + def runtimeAttributesBuilder(configuration: AwsBatchConfiguration): StandardValidatedRuntimeAttributesBuilder = { val runtimeConfig = configuration.runtimeConfig def validationsS3backend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( @@ -156,6 +165,7 @@ object AwsBatchRuntimeAttributes { queueArnValidation(runtimeConfig), scriptS3BucketNameValidation(runtimeConfig), awsBatchRetryAttemptsValidation(runtimeConfig), + ulimitsValidation(runtimeConfig), ) def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( cpuValidation(runtimeConfig), @@ -191,6 +201,7 @@ object AwsBatchRuntimeAttributes { case _ => "" } val awsBatchRetryAttempts: Int = RuntimeAttributesValidation.extract(awsBatchRetryAttemptsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val ulimits: Vector[Map[String, String]] = RuntimeAttributesValidation.extract(ulimitsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) new AwsBatchRuntimeAttributes( @@ -205,6 +216,7 @@ object AwsBatchRuntimeAttributes { noAddress, scriptS3BucketName, awsBatchRetryAttempts, + ulimits, fileSystem ) } @@ -404,3 +416,73 @@ class AwsBatchRetryAttemptsValidation(key: String) extends IntRuntimeAttributesV override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be an Integer" } + + +object UlimitsValidation + extends RuntimeAttributesValidation[Vector[Map[String, String]]] { + override def key: String = AwsBatchRuntimeAttributes.UlimitsKey + + override def coercion: Traversable[WomType] = + Set(WomStringType, WomArrayType(WomMapType(WomStringType, WomStringType))) + + var accepted_keys = Set("name", "softLimit", "hardLimit") + + override protected def validateValue + : PartialFunction[WomValue, ErrorOr[Vector[Map[String, String]]]] = { + case WomArray(womType, value) + if womType.memberType == WomMapType(WomStringType, WomStringType) => + check_maps(value.toVector) + case WomMap(_, _) => "!!! ERROR1".invalidNel + + } + + private def check_maps( + maps: Vector[WomValue] + ): ErrorOr[Vector[Map[String, String]]] = { + val entryNels: Vector[ErrorOr[Map[String, String]]] = maps.map { + case WomMap(_, value) => check_keys(value) + case _ => "!!! ERROR2".invalidNel + } + val sequenced: ErrorOr[Vector[Map[String, String]]] = sequenceNels( + entryNels + ) + sequenced + } + + private def check_keys( + dict: Map[WomValue, WomValue] + ): ErrorOr[Map[String, String]] = { + val map_keys = dict.keySet.map(_.valueString).toSet + val unrecognizedKeys = + accepted_keys.diff(map_keys) union map_keys.diff(accepted_keys) + + if (!dict.nonEmpty){ + Map.empty[String, String].validNel + }else if (unrecognizedKeys.nonEmpty) { + s"Invalid keys in $key runtime attribute. Refer to 'ulimits' section on https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#containerProperties".invalidNel + } else { + dict + .collect { case (WomString(k), WomString(v)) => + (k, v) + // case _ => "!!! ERROR3".invalidNel + } + .toMap + .validNel + } + } + + private def sequenceNels( + nels: Vector[ErrorOr[Map[String, String]]] + ): ErrorOr[Vector[Map[String, String]]] = { + val emptyNel: ErrorOr[Vector[Map[String, String]]] = + Vector.empty[Map[String, String]].validNel + val seqNel: ErrorOr[Vector[Map[String, String]]] = + nels.foldLeft(emptyNel) { (acc, v) => + (acc, v) mapN { (a, v) => a :+ v } + } + seqNel + } + + override protected def missingValueMessage: String = + s"Expecting $key runtime attribute to be an Array[Map[String, String]]" +} \ No newline at end of file From fada2c5b5126a95145e4f82d7bdd887e14a97688 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 13 Apr 2021 19:49:19 +0100 Subject: [PATCH 15/24] add runtime attributes to the list --- .../cromwell/backend/impl/aws/AwsBatchAttributes.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala index 26f69c4e79a..dffcb983235 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAttributes.scala @@ -71,7 +71,14 @@ object AwsBatchAttributes { "filesystems.local.auth", "filesystems.s3.auth", "filesystems.s3.caching.duplication-strategy", - "filesystems.local.caching.duplication-strategy" + "filesystems.local.caching.duplication-strategy", + "auth", + "numCreateDefinitionAttempts", + "filesystems.s3.duplication-strategy", + "numSubmitAttempts", + "default-runtime-attributes.scriptBucketName", + "awsBatchRetryAttempts", + "ulimits" ) private val deprecatedAwsBatchKeys: Map[String, String] = Map( From f16639264a54f96e238f700c420516465ddac2b3 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 13 Apr 2021 23:14:17 +0100 Subject: [PATCH 16/24] missing argument on building name --- .../scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index aa06da46b1f..876a23c32c0 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -136,7 +136,7 @@ trait AwsBatchJobDefinitionBuilder { } - def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair]): String = { + def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit]): String = { s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}" } From a1f7d682faf30b04634385fb094e2d8479dd8793 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Tue, 13 Apr 2021 23:48:21 +0100 Subject: [PATCH 17/24] unify --- .../impl/aws/AwsBatchJobDefinition.scala | 21 +- .../impl/aws/AwsBatchRuntimeAttributes.scala | 554 +++++++----------- 2 files changed, 211 insertions(+), 364 deletions(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala index 2f54355a38b..876a23c32c0 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchJobDefinition.scala @@ -123,19 +123,19 @@ trait AwsBatchJobDefinitionBuilder { ) } - + def buildUlimits(ulimits: Seq[Map[String, String]]): List[Ulimit] = { ulimits.filter(_.nonEmpty).map(u => - Ulimit.builder() - .name(u("name")) - .softLimit(u("softLimit").toInt) - .hardLimit(u("hardLimit").toInt) - .build() + Ulimit.builder() + .name(u("name")) + .softLimit(u("softLimit").toInt) + .hardLimit(u("hardLimit").toInt) + .build() ).toList } - + def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit]): String = { s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}" } @@ -164,9 +164,10 @@ trait AwsBatchJobDefinitionBuilder { .command(packedCommand.asJava) .memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt) .vcpus(context.runtimeAttributes.cpu##) - .volumes( volumes.asJava) - .mountPoints( mountPoints.asJava) - .environment(environment.asJava).ulimits(ulimits.asJava), + .volumes(volumes.asJava) + .mountPoints(mountPoints.asJava) + .environment(environment.asJava) + .ulimits(ulimits.asJava), containerPropsName) } diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala index 234b254dc71..8296eefd42a 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributes.scala @@ -47,34 +47,35 @@ import wom.values._ import scala.util.matching.Regex -/** Attributes that are provided to the job at runtime - * @param cpu number of vCPU - * @param zones the aws availability zones to run in - * @param memory memory to allocate - * @param disks a sequence of disk volumes - * @param dockerImage the name of the docker image that the job will run in - * @param queueArn the arn of the AWS Batch queue that the job will be submitted to - * @param failOnStderr should the job fail if something is logged to `stderr` - * @param continueOnReturnCode decides if a job continues on receiving a specific return code - * @param noAddress is there no address - * @param scriptS3BucketName the s3 bucket where the execution command or script will be written and, from there, fetched into the container and executed - * @param fileSystem the filesystem type, default is "s3" - */ -case class AwsBatchRuntimeAttributes( - cpu: Int Refined Positive, - zones: Vector[String], - memory: MemorySize, - disks: Seq[AwsBatchVolume], - dockerImage: String, - queueArn: String, - failOnStderr: Boolean, - continueOnReturnCode: ContinueOnReturnCode, - noAddress: Boolean, - scriptS3BucketName: String, - awsBatchRetryAttempts: Int, - ulimits: Vector[Map[String, String]], - fileSystem: String = "s3" -) +/** + * Attributes that are provided to the job at runtime + * @param cpu number of vCPU + * @param zones the aws availability zones to run in + * @param memory memory to allocate + * @param disks a sequence of disk volumes + * @param dockerImage the name of the docker image that the job will run in + * @param queueArn the arn of the AWS Batch queue that the job will be submitted to + * @param failOnStderr should the job fail if something is logged to `stderr` + * @param continueOnReturnCode decides if a job continues on receiving a specific return code + * @param noAddress is there no address + * @param scriptS3BucketName the s3 bucket where the execution command or script will be written and, from there, fetched into the container and executed + * @param fileSystem the filesystem type, default is "s3" + * @param awsBatchRetryAttempts number of attempts that AWS Batch will retry the task if it fails + * @param ulimits ulimit values to be passed to the container + */ +case class AwsBatchRuntimeAttributes(cpu: Int Refined Positive, + zones: Vector[String], + memory: MemorySize, + disks: Seq[AwsBatchVolume], + dockerImage: String, + queueArn: String, + failOnStderr: Boolean, + continueOnReturnCode: ContinueOnReturnCode, + noAddress: Boolean, + scriptS3BucketName: String, + awsBatchRetryAttempts: Int, + ulimits: Vector[Map[String, String]], + fileSystem: String= "s3") object AwsBatchRuntimeAttributes { @@ -87,19 +88,8 @@ object AwsBatchRuntimeAttributes { val ZonesKey = "zones" private val ZonesDefaultValue = WomString("us-east-1a") - val UlimitsKey = "ulimits" - // private val UlimitsDefaultValue = WomString("us-east-1a") - // private val UlimitsDefaultValue2 = WomArray( - // Vector(WomMap(Map.empty[WomValue, WomValue])) - // ) - - private val UlimitsDefaultValue = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) - // private val UlimitsDefaultValue2 = WomArray(WomArrayType(WomStringType), Vector(WomString(""), WomString(""))) - - val NoAddressKey = "noAddress" - private val noAddressValidationInstance = - new BooleanRuntimeAttributesValidation(NoAddressKey) + private val noAddressValidationInstance = new BooleanRuntimeAttributesValidation(NoAddressKey) private val NoAddressDefaultValue = WomBoolean(false) // TODO: Determine good volume format @@ -108,227 +98,111 @@ object AwsBatchRuntimeAttributes { private val MemoryDefaultValue = "2 GB" - private def cpuValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instance - .withDefault( - CpuValidation.configDefaultWomValue( - runtimeConfig - ) getOrElse CpuValidation.defaultMin - ) + val UlimitsKey = "ulimits" + private val UlimitsDefaultValue = WomArray(WomArrayType(WomMapType(WomStringType,WomStringType)), Vector(WomMap(Map.empty[WomValue, WomValue]))) - private def cpuMinValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[Int Refined Positive] = - CpuValidation.instanceMin - .withDefault( - CpuValidation.configDefaultWomValue( - runtimeConfig - ) getOrElse CpuValidation.defaultMin - ) - - private def failOnStderrValidation(runtimeConfig: Option[Config]) = - FailOnStderrValidation.default(runtimeConfig) - - private def continueOnReturnCodeValidation(runtimeConfig: Option[Config]) = - ContinueOnReturnCodeValidation.default(runtimeConfig) - - private def disksValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[Seq[AwsBatchVolume]] = DisksValidation - .withDefault( - DisksValidation.configDefaultWomValue( - runtimeConfig - ) getOrElse DisksDefaultValue - ) + private def cpuValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instance + .withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin) - private def zonesValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[Vector[String]] = ZonesValidation - .withDefault( - ZonesValidation.configDefaultWomValue( - runtimeConfig - ) getOrElse ZonesDefaultValue - ) + private def cpuMinValidation(runtimeConfig: Option[Config]):RuntimeAttributesValidation[Int Refined Positive] = CpuValidation.instanceMin + .withDefault(CpuValidation.configDefaultWomValue(runtimeConfig) getOrElse CpuValidation.defaultMin) + + private def failOnStderrValidation(runtimeConfig: Option[Config]) = FailOnStderrValidation.default(runtimeConfig) + + private def continueOnReturnCodeValidation(runtimeConfig: Option[Config]) = ContinueOnReturnCodeValidation.default(runtimeConfig) + + private def disksValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Seq[AwsBatchVolume]] = DisksValidation + .withDefault(DisksValidation.configDefaultWomValue(runtimeConfig) getOrElse DisksDefaultValue) + + private def zonesValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Vector[String]] = ZonesValidation + .withDefault(ZonesValidation.configDefaultWomValue(runtimeConfig) getOrElse ZonesDefaultValue) - private def ulimitsValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[Vector[Map[String, String]]] ={ - UlimitsValidation - .withDefault( - UlimitsValidation.configDefaultWomValue( - runtimeConfig - ) getOrElse UlimitsDefaultValue - )} - - // private def ulimitsValidation( - // runtimeConfig: Option[Config] - // ): RuntimeAttributesValidation[Vector[Map[String, String]]] ={ - // println("!!! Henrique " + UlimitsValidation.configDefaultWomValue(runtimeConfig).getOrElse(UlimitsDefaultValue)) - // UlimitsValidation - // .withDefault(UlimitsDefaultValue)} - - private def memoryValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[MemorySize] = { + private def memoryValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = { MemoryValidation.withDefaultMemory( RuntimeAttributesKeys.MemoryKey, - MemoryValidation.configDefaultString( - RuntimeAttributesKeys.MemoryKey, - runtimeConfig - ) getOrElse MemoryDefaultValue - ) + MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, runtimeConfig) getOrElse MemoryDefaultValue) } - private def memoryMinValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[MemorySize] = { + private def memoryMinValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = { MemoryValidation.withDefaultMemory( RuntimeAttributesKeys.MemoryMinKey, - MemoryValidation.configDefaultString( - RuntimeAttributesKeys.MemoryMinKey, - runtimeConfig - ) getOrElse MemoryDefaultValue - ) + MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryMinKey, runtimeConfig) getOrElse MemoryDefaultValue) } - private def noAddressValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[Boolean] = noAddressValidationInstance - .withDefault( - noAddressValidationInstance.configDefaultWomValue( - runtimeConfig - ) getOrElse NoAddressDefaultValue - ) + private def noAddressValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Boolean] = noAddressValidationInstance + .withDefault(noAddressValidationInstance.configDefaultWomValue(runtimeConfig) getOrElse NoAddressDefaultValue) - private def scriptS3BucketNameValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[String] = { - ScriptS3BucketNameValidation(scriptS3BucketKey).withDefault( - ScriptS3BucketNameValidation(scriptS3BucketKey) - .configDefaultWomValue(runtimeConfig) - .getOrElse(throw new RuntimeException("scriptBucketName is required")) - ) + private def scriptS3BucketNameValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[String] = { + ScriptS3BucketNameValidation(scriptS3BucketKey).withDefault(ScriptS3BucketNameValidation(scriptS3BucketKey) + .configDefaultWomValue(runtimeConfig).getOrElse( throw new RuntimeException( "scriptBucketName is required" ))) } - private val dockerValidation: RuntimeAttributesValidation[String] = - DockerValidation.instance + private val dockerValidation: RuntimeAttributesValidation[String] = DockerValidation.instance - private def queueArnValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[String] = - QueueArnValidation.withDefault( - QueueArnValidation.configDefaultWomValue(runtimeConfig) getOrElse - (throw new RuntimeException("queueArn is required")) - ) + private def queueArnValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[String] = + QueueArnValidation.withDefault(QueueArnValidation.configDefaultWomValue(runtimeConfig) getOrElse + (throw new RuntimeException("queueArn is required"))) - private def awsBatchRetryAttemptsValidation( - runtimeConfig: Option[Config] - ): RuntimeAttributesValidation[Int] = { - AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey).withDefault( - AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey) - .configDefaultWomValue(runtimeConfig) - .getOrElse(WomInteger(0)) - ) + private def awsBatchRetryAttemptsValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] = { + AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey).withDefault(AwsBatchRetryAttemptsValidation(awsBatchRetryAttemptsKey) + .configDefaultWomValue(runtimeConfig).getOrElse(WomInteger(0))) } - def runtimeAttributesBuilder( - configuration: AwsBatchConfiguration - ): StandardValidatedRuntimeAttributesBuilder = { - val runtimeConfig = configuration.runtimeConfig - def validationsS3backend = StandardValidatedRuntimeAttributesBuilder - .default(runtimeConfig) - .withValidation( - cpuValidation(runtimeConfig), - cpuMinValidation(runtimeConfig), - disksValidation(runtimeConfig), - zonesValidation(runtimeConfig), - memoryValidation(runtimeConfig), - memoryMinValidation(runtimeConfig), - noAddressValidation(runtimeConfig), - dockerValidation, - queueArnValidation(runtimeConfig), - scriptS3BucketNameValidation(runtimeConfig), - awsBatchRetryAttemptsValidation(runtimeConfig), - ulimitsValidation(runtimeConfig) - ) - def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder - .default(runtimeConfig) - .withValidation( - cpuValidation(runtimeConfig), - cpuMinValidation(runtimeConfig), - disksValidation(runtimeConfig), - zonesValidation(runtimeConfig), - memoryValidation(runtimeConfig), - memoryMinValidation(runtimeConfig), - noAddressValidation(runtimeConfig), - dockerValidation, - queueArnValidation(runtimeConfig) - ) - - configuration.fileSystem match { - case AWSBatchStorageSystems.s3 => validationsS3backend - - case _ => validationsLocalBackend - } - } + private def ulimitsValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Vector[Map[String, String]]] = + UlimitsValidation.withDefault(UlimitsValidation.configDefaultWomValue(runtimeConfig) getOrElse UlimitsDefaultValue) - def apply( - validatedRuntimeAttributes: ValidatedRuntimeAttributes, - runtimeAttrsConfig: Option[Config], - fileSystem: String - ): AwsBatchRuntimeAttributes = { - val cpu: Int Refined Positive = RuntimeAttributesValidation.extract( - cpuValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val zones: Vector[String] = RuntimeAttributesValidation.extract( - ZonesValidation, - validatedRuntimeAttributes - ) - val memory: MemorySize = RuntimeAttributesValidation.extract( - memoryValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val disks: Seq[AwsBatchVolume] = RuntimeAttributesValidation.extract( - disksValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val docker: String = RuntimeAttributesValidation.extract( + def runtimeAttributesBuilder(configuration: AwsBatchConfiguration): StandardValidatedRuntimeAttributesBuilder = { + val runtimeConfig = configuration.runtimeConfig + def validationsS3backend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( + cpuValidation(runtimeConfig), + cpuMinValidation(runtimeConfig), + disksValidation(runtimeConfig), + zonesValidation(runtimeConfig), + memoryValidation(runtimeConfig), + memoryMinValidation(runtimeConfig), + noAddressValidation(runtimeConfig), + dockerValidation, + queueArnValidation(runtimeConfig), + scriptS3BucketNameValidation(runtimeConfig), + awsBatchRetryAttemptsValidation(runtimeConfig), + ulimitsValidation(runtimeConfig), + ) + def validationsLocalBackend = StandardValidatedRuntimeAttributesBuilder.default(runtimeConfig).withValidation( + cpuValidation(runtimeConfig), + cpuMinValidation(runtimeConfig), + disksValidation(runtimeConfig), + zonesValidation(runtimeConfig), + memoryValidation(runtimeConfig), + memoryMinValidation(runtimeConfig), + noAddressValidation(runtimeConfig), dockerValidation, - validatedRuntimeAttributes + queueArnValidation(runtimeConfig) ) - val queueArn: String = RuntimeAttributesValidation.extract( - queueArnValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val failOnStderr: Boolean = RuntimeAttributesValidation.extract( - failOnStderrValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val continueOnReturnCode: ContinueOnReturnCode = - RuntimeAttributesValidation.extract( - continueOnReturnCodeValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val noAddress: Boolean = RuntimeAttributesValidation.extract( - noAddressValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val scriptS3BucketName = fileSystem match { - case AWSBatchStorageSystems.s3 => - RuntimeAttributesValidation.extract( - scriptS3BucketNameValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - case _ => "" + + configuration.fileSystem match { + case AWSBatchStorageSystems.s3 => validationsS3backend + + case _ => validationsLocalBackend } - val awsBatchRetryAttempts: Int = RuntimeAttributesValidation.extract( - awsBatchRetryAttemptsValidation(runtimeAttrsConfig), - validatedRuntimeAttributes - ) - val ulimits: Vector[Map[String, String]] = RuntimeAttributesValidation - .extract(ulimitsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + } + + def apply(validatedRuntimeAttributes: ValidatedRuntimeAttributes, runtimeAttrsConfig: Option[Config], fileSystem:String): AwsBatchRuntimeAttributes = { + val cpu: Int Refined Positive = RuntimeAttributesValidation.extract(cpuValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val zones: Vector[String] = RuntimeAttributesValidation.extract(ZonesValidation, validatedRuntimeAttributes) + val memory: MemorySize = RuntimeAttributesValidation.extract(memoryValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val disks: Seq[AwsBatchVolume] = RuntimeAttributesValidation.extract(disksValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val docker: String = RuntimeAttributesValidation.extract(dockerValidation, validatedRuntimeAttributes) + val queueArn: String = RuntimeAttributesValidation.extract(queueArnValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val failOnStderr: Boolean = RuntimeAttributesValidation.extract(failOnStderrValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val continueOnReturnCode: ContinueOnReturnCode = RuntimeAttributesValidation.extract(continueOnReturnCodeValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val noAddress: Boolean = RuntimeAttributesValidation.extract(noAddressValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val scriptS3BucketName = fileSystem match { + case AWSBatchStorageSystems.s3 => RuntimeAttributesValidation.extract(scriptS3BucketNameValidation(runtimeAttrsConfig) , validatedRuntimeAttributes) + case _ => "" + } + val awsBatchRetryAttempts: Int = RuntimeAttributesValidation.extract(awsBatchRetryAttemptsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + val ulimits: Vector[Map[String, String]] = RuntimeAttributesValidation.extract(ulimitsValidation(runtimeAttrsConfig), validatedRuntimeAttributes) + new AwsBatchRuntimeAttributes( cpu, @@ -349,40 +223,34 @@ object AwsBatchRuntimeAttributes { } object ScriptS3BucketNameValidation { - def apply(key: String): ScriptS3BucketNameValidation = - new ScriptS3BucketNameValidation(key) + def apply(key: String): ScriptS3BucketNameValidation = new ScriptS3BucketNameValidation(key) } -class ScriptS3BucketNameValidation(key: String) - extends StringRuntimeAttributesValidation(key) { +class ScriptS3BucketNameValidation( key: String ) extends StringRuntimeAttributesValidation(key) { //a reasonable but not perfect regex for a bucket. see https://stackoverflow.com/a/50484916/3573553 - protected val s3BucketNameRegex: Regex = - "(?=^.{3,63}$)(?!^(\\d+\\.)+\\d+$)(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)".r + protected val s3BucketNameRegex: Regex = "(?=^.{3,63}$)(?!^(\\d+\\.)+\\d+$)(^(([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])\\.)*([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])$)" + .r - override protected def validateValue - : PartialFunction[WomValue, ErrorOr[String]] = { case WomString(s) => - validateBucketName(s) + + override protected def validateValue: PartialFunction[WomValue, ErrorOr[String]] = { + case WomString(s) => validateBucketName(s) } - private def validateBucketName( - possibleBucketName: String - ): ErrorOr[String] = { + private def validateBucketName(possibleBucketName: String): ErrorOr[String] = { possibleBucketName match { - case s3BucketNameRegex(_ @_*) => possibleBucketName.validNel - case _ => - "The Script Bucket name has an invalid s3 bucket format".invalidNel + case s3BucketNameRegex(_@_*) => possibleBucketName.validNel + case _ => "The Script Bucket name has an invalid s3 bucket format".invalidNel } } } -object QueueArnValidation - extends ArnValidation(AwsBatchRuntimeAttributes.QueueArnKey) { +object QueueArnValidation extends ArnValidation(AwsBatchRuntimeAttributes.QueueArnKey) { // queue arn format can be found here // https://docs.aws.amazon.com/en_us/general/latest/gr/aws-arns-and-namespaces.html#arn-syntax-batch // arn:aws:batch:region:account-id:job-queue/queue-name override protected val arnRegex: Regex = - s""" + s""" (?x) # Turn on comments and whitespace insensitivity (arn) # Every AWS ARN starts with "arn" : @@ -413,17 +281,15 @@ object ArnValidation { def apply(key: String): ArnValidation = new ArnValidation(key) } -class ArnValidation(override val key: String) - extends StringRuntimeAttributesValidation(key) { - override protected def validateValue - : PartialFunction[WomValue, ErrorOr[String]] = { case WomString(s) => - validateArn(s) +class ArnValidation(override val key: String) extends StringRuntimeAttributesValidation(key) { + override protected def validateValue: PartialFunction[WomValue, ErrorOr[String]] = { + case WomString(s) => validateArn(s) } private def validateArn(possibleArn: String): ErrorOr[String] = { possibleArn match { - case arnRegex(_ @_*) => possibleArn.validNel - case _ => "ARN has invalid format".invalidNel + case arnRegex(_@_*) => possibleArn.validNel + case _ => "ARN has invalid format".invalidNel } } @@ -431,7 +297,7 @@ class ArnValidation(override val key: String) // https://docs.aws.amazon.com/en_us/general/latest/gr/aws-arns-and-namespaces.html // This is quite vague regex, but it allows to support a lot of ARN formats protected val arnRegex: Regex = - s""" + s""" (?x) # Turn on comments and whitespace insensitivity (arn) # Every ARN starts with "arn" : @@ -474,11 +340,9 @@ class ArnValidation(override val key: String) object ZonesValidation extends RuntimeAttributesValidation[Vector[String]] { override def key: String = AwsBatchRuntimeAttributes.ZonesKey - override def coercion: Traversable[WomType] = - Set(WomStringType, WomArrayType(WomStringType)) + override def coercion: Traversable[WomType] = Set(WomStringType, WomArrayType(WomStringType)) - override protected def validateValue - : PartialFunction[WomValue, ErrorOr[Vector[String]]] = { + override protected def validateValue: PartialFunction[WomValue, ErrorOr[Vector[String]]] = { case WomString(s) => s.split("\\s+").toVector.validNel case WomArray(womType, value) if womType.memberType == WomStringType => value.map(_.valueString).toVector.validNel @@ -488,6 +352,72 @@ object ZonesValidation extends RuntimeAttributesValidation[Vector[String]] { s"Expecting $key runtime attribute to be either a whitespace separated String or an Array[String]" } +object DisksValidation extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] { + override def key: String = AwsBatchRuntimeAttributes.DisksKey + + override def coercion: Traversable[WomType] = Set(WomStringType, WomArrayType(WomStringType)) + + override protected def validateValue: PartialFunction[WomValue, ErrorOr[Seq[AwsBatchVolume]]] = { + case WomString(value) => validateLocalDisks(value.split(",\\s*").toSeq) + case WomArray(womType, values) if womType.memberType == WomStringType => + validateLocalDisks(values.map(_.valueString)) + } + + private def validateLocalDisks(disks: Seq[String]): ErrorOr[Seq[AwsBatchVolume]] = { + val diskNels: Seq[ErrorOr[AwsBatchVolume]] = disks map validateLocalDisk + val sequenced: ErrorOr[Seq[AwsBatchVolume]] = sequenceNels(diskNels) + val defaulted: ErrorOr[Seq[AwsBatchVolume]] = addDefault(sequenced) + defaulted + } + + private def validateLocalDisk(disk: String): ErrorOr[AwsBatchVolume] = { + AwsBatchVolume.parse(disk) match { + case scala.util.Success(attachedDisk) => attachedDisk.validNel + case scala.util.Failure(ex) => ex.getMessage.invalidNel + } + } + + private def sequenceNels(nels: Seq[ErrorOr[AwsBatchVolume]]): ErrorOr[Seq[AwsBatchVolume]] = { + val emptyDiskNel: ErrorOr[Vector[AwsBatchVolume]] = Vector.empty[AwsBatchVolume].validNel + val disksNel: ErrorOr[Vector[AwsBatchVolume]] = nels.foldLeft(emptyDiskNel) { + (acc, v) => (acc, v) mapN { (a, v) => a :+ v } + } + disksNel + } + + private def addDefault(disksNel: ErrorOr[Seq[AwsBatchVolume]]): ErrorOr[Seq[AwsBatchVolume]] = { + disksNel map { + case disks if disks.exists(_.name == AwsBatchWorkingDisk.Name) || disks.exists(_.fsType == "efs") => disks + case disks => disks :+ AwsBatchWorkingDisk.Default + } + } + + override protected def missingValueMessage: String = + s"Expecting $key runtime attribute to be a comma separated String or Array[String]" +} + +object AwsBatchRetryAttemptsValidation { + def apply(key: String): AwsBatchRetryAttemptsValidation = new AwsBatchRetryAttemptsValidation(key) +} + +class AwsBatchRetryAttemptsValidation(key: String) extends IntRuntimeAttributesValidation(key) { + override protected def validateValue: PartialFunction[WomValue, ErrorOr[Int]] = { + case womValue if WomIntegerType.coerceRawValue(womValue).isSuccess => + WomIntegerType.coerceRawValue(womValue).get match { + case WomInteger(value) => + if (value.toInt < 0) + s"Expecting $key runtime attribute value greater than or equal to 0".invalidNel + else if (value.toInt > 10) + s"Expecting $key runtime attribute value lower than or equal to 10".invalidNel + else + value.toInt.validNel + } + } + + override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be an Integer" +} + + object UlimitsValidation extends RuntimeAttributesValidation[Vector[Map[String, String]]] { override def key: String = AwsBatchRuntimeAttributes.UlimitsKey @@ -555,88 +485,4 @@ object UlimitsValidation override protected def missingValueMessage: String = s"Expecting $key runtime attribute to be an Array[Map[String, String]]" -} - -object DisksValidation - extends RuntimeAttributesValidation[Seq[AwsBatchVolume]] { - override def key: String = AwsBatchRuntimeAttributes.DisksKey - - override def coercion: Traversable[WomType] = - Set(WomStringType, WomArrayType(WomStringType)) - - override protected def validateValue - : PartialFunction[WomValue, ErrorOr[Seq[AwsBatchVolume]]] = { - case WomString(value) => validateLocalDisks(value.split(",\\s*").toSeq) - case WomArray(womType, values) if womType.memberType == WomStringType => - validateLocalDisks(values.map(_.valueString)) - } - - private def validateLocalDisks( - disks: Seq[String] - ): ErrorOr[Seq[AwsBatchVolume]] = { - val diskNels: Seq[ErrorOr[AwsBatchVolume]] = disks map validateLocalDisk - val sequenced: ErrorOr[Seq[AwsBatchVolume]] = sequenceNels(diskNels) - val defaulted: ErrorOr[Seq[AwsBatchVolume]] = addDefault(sequenced) - defaulted - } - - private def validateLocalDisk(disk: String): ErrorOr[AwsBatchVolume] = { - AwsBatchVolume.parse(disk) match { - case scala.util.Success(attachedDisk) => attachedDisk.validNel - case scala.util.Failure(ex) => ex.getMessage.invalidNel - } - } - - private def sequenceNels( - nels: Seq[ErrorOr[AwsBatchVolume]] - ): ErrorOr[Seq[AwsBatchVolume]] = { - val emptyDiskNel: ErrorOr[Vector[AwsBatchVolume]] = - Vector.empty[AwsBatchVolume].validNel - val disksNel: ErrorOr[Vector[AwsBatchVolume]] = - nels.foldLeft(emptyDiskNel) { (acc, v) => - (acc, v) mapN { (a, v) => a :+ v } - } - disksNel - } - - private def addDefault( - disksNel: ErrorOr[Seq[AwsBatchVolume]] - ): ErrorOr[Seq[AwsBatchVolume]] = { - disksNel map { - case disks - if disks.exists(_.name == AwsBatchWorkingDisk.Name) || disks.exists( - _.fsType == "efs" - ) => - disks - case disks => disks :+ AwsBatchWorkingDisk.Default - } - } - - override protected def missingValueMessage: String = - s"Expecting $key runtime attribute to be a comma separated String or Array[String]" -} - -object AwsBatchRetryAttemptsValidation { - def apply(key: String): AwsBatchRetryAttemptsValidation = - new AwsBatchRetryAttemptsValidation(key) -} - -class AwsBatchRetryAttemptsValidation(key: String) - extends IntRuntimeAttributesValidation(key) { - override protected def validateValue - : PartialFunction[WomValue, ErrorOr[Int]] = { - case womValue if WomIntegerType.coerceRawValue(womValue).isSuccess => - WomIntegerType.coerceRawValue(womValue).get match { - case WomInteger(value) => - if (value.toInt < 0) - s"Expecting $key runtime attribute value greater than or equal to 0".invalidNel - else if (value.toInt > 10) - s"Expecting $key runtime attribute value lower than or equal to 10".invalidNel - else - value.toInt.validNel - } - } - - override protected def missingValueMessage: String = - s"Expecting $key runtime attribute to be an Integer" -} +} \ No newline at end of file From 9e2e06a800dcbcb2124d6e3cf594fc8a4ac7043b Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Wed, 14 Apr 2021 11:08:19 +0100 Subject: [PATCH 18/24] add ulimits to docs --- docs/RuntimeAttributes.md | 41 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/docs/RuntimeAttributes.md b/docs/RuntimeAttributes.md index f2419cbe621..e640e432030 100644 --- a/docs/RuntimeAttributes.md +++ b/docs/RuntimeAttributes.md @@ -58,6 +58,7 @@ There are a number of additional runtime attributes that apply to the Google Clo ### AWS Specific Attributes - [awsBatchRetryAttempts](#awsBatchRetryAttempts) +- [ulimits](#ulimits) ## Expression support @@ -325,8 +326,6 @@ runtime { ``` - - ### `bootDiskSizeGb` In addition to working disks, Google Cloud allows specification of a boot disk size. This is the disk where the docker image itself is booted (**not the working directory of your task on the VM**). @@ -375,6 +374,7 @@ runtime { } ``` + ### `awsBatchRetryAttempts` *Default: _0_* @@ -383,10 +383,45 @@ This runtime attribute adds support to [*AWS Batch Automated Job Retries*](https ``` runtime { - awsBatchRetryAttempts: 3 + awsBatchRetryAttempts: integer } ``` + +### `ulimits` + +*Default: _empty_* + +A list of [`ulimits`](https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#containerProperties) values to set in the container. This parameter maps to `Ulimits` in the [Create a container](https://docs.docker.com/engine/api/v1.38/) section of the [Docker Remote API](https://docs.docker.com/engine/api/v1.38/) and the `--ulimit` option to [docker run](https://docs.docker.com/engine/reference/commandline/run/). + +``` +"ulimits": [ + { + "name": string, + "softLimit": integer, + "hardLimit": integer + } + ... +] +``` +Parameter description: + +- `name` + - The `type` of the `ulimit`. + - Type: String + - Required: Yes, when `ulimits` is used. + +- `softLimit` + - The soft limit for the `ulimit` type. + - Type: Integer + - Required: Yes, when `ulimits` is used. + +- `hardLimit` + - The hard limit for the `ulimit` type. + - Type: Integer + - Required: Yes, when `ulimits` is used. + + #### How to Setup Configure your Google network to use "Private Google Access". This will allow your VMs to access Google Services including Google Container Registry, as well as Dockerhub images. From cfaec82fb6f697cb1744fb9dba033c4fd178676e Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Wed, 14 Apr 2021 13:26:33 +0100 Subject: [PATCH 19/24] add ulimits to test; missing proper tests --- .../scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala | 1 + .../backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala index 9117ba02f6e..4a7ea041b10 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchJobSpec.scala @@ -114,6 +114,7 @@ class AwsBatchJobSpec extends TestKitSuite with AnyFlatSpecLike with Matchers wi noAddress = false, scriptS3BucketName = "script-bucket", awsBatchRetryAttempts = 1, + ulimits = Vector(Map.empty[String, String]), fileSystem = "s3") private def generateBasicJob: AwsBatchJob = { diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index 822d21e35f2..ee36f0e3f14 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -65,7 +65,9 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout false, ContinueOnReturnCodeSet(Set(0)), false, - "my-stuff", 0) + "my-stuff", + 0, + Vector(Map.empty[String, String])) val expectedDefaultsLocalFS = new AwsBatchRuntimeAttributes(refineMV[Positive](1), Vector("us-east-1a", "us-east-1b"), @@ -76,6 +78,8 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout ContinueOnReturnCodeSet(Set(0)), false, "", + 0, + Vector(Map.empty[String, String]), "local") "AwsBatchRuntimeAttributes" should { From 674fa3a57bfddd6f752c4ef30069f3f765c7fc91 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Wed, 14 Apr 2021 17:39:08 +0100 Subject: [PATCH 20/24] remove duplicated tests --- .../backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index ee36f0e3f14..751b3bdc384 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -350,17 +350,17 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes) } - "fail to validate an invalid awsBatchRetryAttempts entry" in { + "fail to validate with -1 as awsBatchRetryAttempts" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(-1)) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value greater than or equal to 0") } - "fail to validate an invalid awsBatchRetryAttempts entry" in { + "fail to validate with 12 as awsBatchRetryAttempts" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(12)) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") } - "fail to validate an invalid awsBatchRetryAttempts entry" in { + "fail to validate with a string as awsBatchRetryAttempts" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomString("test")) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute to be an Integer") } From 832c65281f1c5c61b03ab3700a5528398fcce6ee Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Thu, 15 Apr 2021 12:06:17 +0100 Subject: [PATCH 21/24] fix tests --- .../backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index 751b3bdc384..c5aa51536be 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -66,7 +66,7 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout ContinueOnReturnCodeSet(Set(0)), false, "my-stuff", - 0, + 1, Vector(Map.empty[String, String])) val expectedDefaultsLocalFS = new AwsBatchRuntimeAttributes(refineMV[Positive](1), Vector("us-east-1a", "us-east-1b"), @@ -78,7 +78,7 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout ContinueOnReturnCodeSet(Set(0)), false, "", - 0, + 1, Vector(Map.empty[String, String]), "local") From 87359fe70f7a9fd9b8983a013bae1c20883c3fd7 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Thu, 15 Apr 2021 16:59:56 +0100 Subject: [PATCH 22/24] fix tests --- .../impl/aws/AwsBatchRuntimeAttributesSpec.scala | 10 +++++----- .../cromwell/backend/impl/aws/AwsBatchTestConfig.scala | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index c5aa51536be..5555ceef527 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -345,28 +345,28 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout } "validate a valid awsBatchRetryAttempts entry" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(9)) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(9), "scriptBucketName" -> WomString("my-stuff")) val expectedRuntimeAttributes = expectedDefaults.copy(awsBatchRetryAttempts = 9) assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes) } "fail to validate with -1 as awsBatchRetryAttempts" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(-1)) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(-1), "scriptBucketName" -> WomString("my-stuff")) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value greater than or equal to 0") } "fail to validate with 12 as awsBatchRetryAttempts" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(12)) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(12), "scriptBucketName" -> WomString("my-stuff")) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") } "fail to validate with a string as awsBatchRetryAttempts" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomString("test")) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomString("test"), "scriptBucketName" -> WomString("my-stuff")) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute to be an Integer") } "validate zero as awsBatchRetryAttempts entry" in { - val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(0)) + val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(0), "scriptBucketName" -> WomString("my-stuff")) assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") } } diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala index 48d4a07c611..682714b225c 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchTestConfig.scala @@ -141,6 +141,7 @@ object AwsBatchTestConfigForLocalFS { | zones:["us-east-1a", "us-east-1b"] | queueArn: "arn:aws:batch:us-east-1:111222333444:job-queue/job-queue" | scriptBucketName: "" + | awsBatchRetryAttempts: 1 |} | |""".stripMargin From 6642c1eb45ce13b0b3f916b20889dd3ec870374d Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Thu, 15 Apr 2021 19:18:53 +0100 Subject: [PATCH 23/24] fix tests --- backend/src/main/scala/cromwell/backend/backend.scala | 2 ++ .../backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/backend/src/main/scala/cromwell/backend/backend.scala b/backend/src/main/scala/cromwell/backend/backend.scala index 601207362ef..30317d0684c 100644 --- a/backend/src/main/scala/cromwell/backend/backend.scala +++ b/backend/src/main/scala/cromwell/backend/backend.scala @@ -141,6 +141,8 @@ object CommonBackendConfigurationAttributes { "default-runtime-attributes.noAddress", "default-runtime-attributes.docker", "default-runtime-attributes.queueArn", + "default-runtime-attributes.awsBatchRetryAttempts", + "default-runtime-attributes.ulimits", "default-runtime-attributes.failOnStderr", "slow-job-warning-time", "dockerhub", diff --git a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala index 5555ceef527..f8c009f95ed 100644 --- a/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala +++ b/supportedBackends/aws/src/test/scala/cromwell/backend/impl/aws/AwsBatchRuntimeAttributesSpec.scala @@ -367,7 +367,8 @@ class AwsBatchRuntimeAttributesSpec extends AnyWordSpecLike with CromwellTimeout "validate zero as awsBatchRetryAttempts entry" in { val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"), "awsBatchRetryAttempts" -> WomInteger(0), "scriptBucketName" -> WomString("my-stuff")) - assertAwsBatchRuntimeAttributesFailedCreation(runtimeAttributes, "Expecting awsBatchRetryAttempts runtime attribute value lower than or equal to 10") + val expectedRuntimeAttributes = expectedDefaults.copy(awsBatchRetryAttempts = 0) + assertAwsBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes, expectedRuntimeAttributes) } } From 7b03c40b1f21f89e65466b9a5ad8635f5ddaec73 Mon Sep 17 00:00:00 2001 From: Henrique Silva Date: Wed, 4 Aug 2021 17:12:43 +0100 Subject: [PATCH 24/24] remove debug messages --- .../impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala index fb929b59016..4ceb0c8295e 100755 --- a/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/aws/src/main/scala/cromwell/backend/impl/aws/AwsBatchAsyncBackendJobExecutionActor.scala @@ -208,13 +208,10 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar (remotePathArray zip localPathArray zipWithIndex) flatMap { case ((remotePath, localPath), index) => var localPathString = localPath.valueString - // Log.warn(s"!!!RAW: ${flag}: ${remotePath.valueString} -> ${localPathString}") if (localPathString.startsWith("s3://")){ localPathString = localPathString.replace("s3://", "") - Log.error(s"!!!Debug error 1: ${remotePath.valueString} -> ${localPath.valueString} ->${localPathString}") }else if (localPathString.startsWith("s3:/")) { localPathString = localPathString.replace("s3:/", "") - Log.error(s"!!!Debug error 2: ${remotePath.valueString} -> ${localPath.valueString} -> ${localPathString}") } Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk)) }