[AWS] Add runtime attributes #6204

Open: wants to merge 30 commits into base: develop

Commits (30)
a076c32
add awsBatchRetryAttempts as a runtime attribute
henriqueribeiro Feb 25, 2021
de905d8
exit reconfigure-script with same exit code as rc file
henriqueribeiro Feb 26, 2021
0beb8f4
re-structure code
henriqueribeiro Mar 1, 2021
25abe25
fix runtimeAttribute tests
henriqueribeiro Mar 2, 2021
6470542
exit reconfigure-script with same exit code as rc file
henriqueribeiro Feb 26, 2021
94037dd
Merge branch 'aws/exit_rc_file' of github.com:henriqueribeiro/cromwel…
henriqueribeiro Mar 2, 2021
7d1c547
exit reconfigure-script with same exit code as rc file
henriqueribeiro Feb 26, 2021
62afbf6
Merge branch 'aws/exit_rc_file' of github.com:henriqueribeiro/cromwel…
henriqueribeiro Mar 2, 2021
c721e52
add awsBatchRetryAttempts to AwsBatchJobSpec test
henriqueribeiro Mar 2, 2021
1009398
add test to validate that is an integer
henriqueribeiro Mar 2, 2021
eb0c845
fix comment
henriqueribeiro Mar 2, 2021
5d63a1c
Merge pull request #1 from henriqueribeiro/aws/batch_retries
henriqueribeiro Mar 10, 2021
a96a291
Merge pull request #2 from henriqueribeiro/aws/exit_rc_file
henriqueribeiro Mar 10, 2021
5d6a57a
add debug message to try to find the error in the reconfigure script
henriqueribeiro Mar 17, 2021
9a622a3
add documentation for awsBatchRetryAttempts
henriqueribeiro Mar 17, 2021
fedda02
dump
henriqueribeiro Apr 6, 2021
97350ca
create ulimits runtime attribute
henriqueribeiro Apr 13, 2021
4b3e324
Merge pull request #3 from henriqueribeiro/develop
henriqueribeiro Apr 13, 2021
3d43af4
add 'ulimits' runtime attribute
henriqueribeiro Apr 13, 2021
fada2c5
add runtime attributes to the list
henriqueribeiro Apr 13, 2021
f166392
missing argument on building name
henriqueribeiro Apr 13, 2021
a1f7d68
unify
henriqueribeiro Apr 13, 2021
9e2e06a
add ulimits to docs
henriqueribeiro Apr 14, 2021
cfaec82
add ulimits to test; missing proper tests
henriqueribeiro Apr 14, 2021
674fa3a
remove duplicated tests
henriqueribeiro Apr 14, 2021
832c652
fix tests
henriqueribeiro Apr 15, 2021
87359fe
fix tests
henriqueribeiro Apr 15, 2021
6642c1e
fix tests
henriqueribeiro Apr 15, 2021
d28055c
Merge branch 'prod' into aws/batch_retries
henriqueribeiro Apr 16, 2021
7b03c40
remove debug messages
henriqueribeiro Aug 4, 2021
2 changes: 2 additions & 0 deletions backend/src/main/scala/cromwell/backend/backend.scala
@@ -141,6 +141,8 @@ object CommonBackendConfigurationAttributes {
"default-runtime-attributes.noAddress",
"default-runtime-attributes.docker",
"default-runtime-attributes.queueArn",
"default-runtime-attributes.awsBatchRetryAttempts",
"default-runtime-attributes.ulimits",
"default-runtime-attributes.failOnStderr",
"slow-job-warning-time",
"dockerhub",
53 changes: 51 additions & 2 deletions docs/RuntimeAttributes.md
@@ -56,6 +56,9 @@ There are a number of additional runtime attributes that apply to the Google Clo
- [useDockerImageCache](#usedockerimagecache)


### AWS Specific Attributes
- [awsBatchRetryAttempts](#awsbatchretryattempts)
- [ulimits](#ulimits)

## Expression support

@@ -323,8 +326,6 @@ runtime {
```




### `bootDiskSizeGb`

In addition to working disks, Google Cloud allows specification of a boot disk size. This is the disk where the docker image itself is booted (**not the working directory of your task on the VM**).
@@ -373,6 +374,54 @@ runtime {
}
```


### `awsBatchRetryAttempts`

*Default: _0_*

This runtime attribute adds support for [*AWS Batch Automated Job Retries*](https://docs.aws.amazon.com/batch/latest/userguide/job_retries.html), which makes it possible to recover from transient job failures. For example, if a task fails because of a timeout while accessing an external service, this option lets AWS Batch re-run the failed task without re-running the entire workflow. It takes an integer between 1 and 10 as the maximum number of times AWS Batch should retry a failed task. If the value is 0, no [*Retry Strategy*](https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#retryStrategy) is added to the job definition and the task runs only once.

```
runtime {
awsBatchRetryAttempts: integer
}
```
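For example, to let AWS Batch retry transient failures up to three times (the value `3` below is illustrative, not a default):

```
runtime {
  awsBatchRetryAttempts: 3
}
```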


### `ulimits`

*Default: _empty_*

A list of [`ulimits`](https://docs.aws.amazon.com/batch/latest/userguide/job_definition_parameters.html#containerProperties) values to set in the container. This parameter maps to `Ulimits` in the [Create a container](https://docs.docker.com/engine/api/v1.38/) section of the [Docker Remote API](https://docs.docker.com/engine/api/v1.38/) and the `--ulimit` option to [docker run](https://docs.docker.com/engine/reference/commandline/run/).

```
"ulimits": [
{
"name": string,
"softLimit": integer,
"hardLimit": integer
}
...
]
```
Parameter description:

- `name`
- The `type` of the `ulimit`.
- Type: String
- Required: Yes, when `ulimits` is used.

- `softLimit`
- The soft limit for the `ulimit` type.
- Type: Integer
- Required: Yes, when `ulimits` is used.

- `hardLimit`
- The hard limit for the `ulimit` type.
- Type: Integer
- Required: Yes, when `ulimits` is used.
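Putting the parameters together, a hypothetical entry raising the open-file limit (the `nofile` limit name and the values here are illustrative) would look like:

```
"ulimits": [
  {
    "name": "nofile",
    "softLimit": 1024,
    "hardLimit": 2048
  }
]
```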


#### How to Setup

Configure your Google network to use "Private Google Access". This will allow your VMs to access Google Services including Google Container Registry, as well as Dockerhub images.
@@ -52,7 +52,7 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser
private val metadataReadTimeout: Duration =
serviceConfig.getOrElse[Duration]("metadata-read-query-timeout", Duration.Inf)
private val metadataReadRowNumberSafetyThreshold: Int =
serviceConfig.getOrElse[Int]("metadata-read-row-number-safety-threshold", 1000000)
serviceConfig.getOrElse[Int]("metadata-read-row-number-safety-threshold", 3000000)

def readMetadataWorkerActorProps(): Props =
ReadDatabaseMetadataWorkerActor
@@ -202,10 +202,18 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
private def inputsFromWomFiles(namePrefix: String,
remotePathArray: Seq[WomFile],
localPathArray: Seq[WomFile],
jobDescriptor: BackendJobDescriptor): Iterable[AwsBatchInput] = {
jobDescriptor: BackendJobDescriptor,
flag: Boolean): Iterable[AwsBatchInput] = {

(remotePathArray zip localPathArray zipWithIndex) flatMap {
case ((remotePath, localPath), index) =>
Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPath.valueString), workingDisk))
var localPathString = localPath.valueString
if (localPathString.startsWith("s3://")) {
localPathString = localPathString.stripPrefix("s3://")
} else if (localPathString.startsWith("s3:/")) {
localPathString = localPathString.stripPrefix("s3:/")
}
Seq(AwsBatchFileInput(s"$namePrefix-$index", remotePath.valueString, DefaultPathBuilder.get(localPathString), workingDisk))
}
}

@@ -237,7 +245,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
val writeFunctionFiles = instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f) } toMap

val writeFunctionInputs = writeFunctionFiles flatMap {
case (name, files) => inputsFromWomFiles(name, files.map(_.file), files.map(localizationPath), jobDescriptor)
case (name, files) => inputsFromWomFiles(name, files.map(_.file), files.map(localizationPath), jobDescriptor, false)
}

// Collect all WomFiles from inputs to the call.
@@ -257,7 +265,7 @@ class AwsBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
}

val callInputInputs = callInputFiles flatMap {
case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor)
case (name, files) => inputsFromWomFiles(name, files, files.map(relativeLocalizationPath), jobDescriptor, true)
}

val scriptInput: AwsBatchInput = AwsBatchFileInput(
@@ -71,7 +71,14 @@ object AwsBatchAttributes {
"filesystems.local.auth",
"filesystems.s3.auth",
"filesystems.s3.caching.duplication-strategy",
"filesystems.local.caching.duplication-strategy"
"filesystems.local.caching.duplication-strategy",
"auth",
"numCreateDefinitionAttempts",
"filesystems.s3.duplication-strategy",
"numSubmitAttempts",
"default-runtime-attributes.scriptBucketName",
"awsBatchRetryAttempts",
"ulimits"
)

private val deprecatedAwsBatchKeys: Map[String, String] = Map(
@@ -212,6 +212,8 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
|echo '*** DELOCALIZING OUTPUTS ***'
|$outputCopyCommand
|echo '*** COMPLETED DELOCALIZATION ***'
|echo '*** EXITING WITH RC CODE ***'
|exit $$(head -n 1 $workDir/${jobPaths.returnCodeFilename})
|}
|""".stripMargin
}
@@ -388,16 +390,19 @@ final case class AwsBatchJob(jobDescriptor: BackendJobDescriptor, // WDL/CWL
// See:
//
// http://aws-java-sdk-javadoc.s3-website-us-west-2.amazonaws.com/latest/software/amazon/awssdk/services/batch/model/RegisterJobDefinitionRequest.Builder.html
val definitionRequest = RegisterJobDefinitionRequest.builder
var definitionRequest = RegisterJobDefinitionRequest.builder
.containerProperties(jobDefinition.containerProperties)
.jobDefinitionName(jobDefinitionName)
// See https://stackoverflow.com/questions/24349517/scala-method-named-type
.`type`(JobDefinitionType.CONTAINER)
.build

if (jobDefinitionContext.runtimeAttributes.awsBatchRetryAttempts != 0){
definitionRequest = definitionRequest.retryStrategy(jobDefinition.retryStrategy)
}

Log.debug(s"Submitting definition request: $definitionRequest")

val response: RegisterJobDefinitionResponse = batchClient.registerJobDefinition(definitionRequest)
val response: RegisterJobDefinitionResponse = batchClient.registerJobDefinition(definitionRequest.build)
Log.info(s"Definition created: $response")
response.jobDefinitionArn()
}
@@ -35,7 +35,7 @@ import scala.language.postfixOps
import scala.collection.mutable.ListBuffer
import cromwell.backend.BackendJobDescriptor
import cromwell.backend.io.JobPaths
import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, Volume}
import software.amazon.awssdk.services.batch.model.{ContainerProperties, Host, KeyValuePair, MountPoint, RetryStrategy, Volume, Ulimit}
import cromwell.backend.impl.aws.io.AwsBatchVolume

import scala.collection.JavaConverters._
@@ -62,12 +62,14 @@ import wdl4s.parser.MemoryUnit
*/
sealed trait AwsBatchJobDefinition {
def containerProperties: ContainerProperties
def retryStrategy: RetryStrategy
def name: String

override def toString: String = {
new ToStringBuilder(this, ToStringStyle.JSON_STYLE)
.append("name", name)
.append("containerProperties", containerProperties)
.append("retryStrategy", retryStrategy)
.build
}
}
@@ -78,23 +80,13 @@ trait AwsBatchJobDefinitionBuilder {

/** Gets a builder, seeded with appropriate portions of the container properties
*
* @param dockerImage docker image with which to run
* @return ContainerProperties builder ready for modification
* @param context AwsBatchJobDefinitionContext with all the runtime attributes
* @return ContainerProperties builder ready for modification and name
*
*/
def builder(dockerImage: String): ContainerProperties.Builder =
ContainerProperties.builder().image(dockerImage)


def buildResources(builder: ContainerProperties.Builder,
context: AwsBatchJobDefinitionContext): (ContainerProperties.Builder, String) = {
// The initial buffer should only contain one item - the hostpath of the
// local disk mount point, which will be needed by the docker container
// that copies data around

val environment = List.empty[KeyValuePair]


def containerPropertiesBuilder(context: AwsBatchJobDefinitionContext): (ContainerProperties.Builder, String) = {


def buildVolumes(disks: Seq[AwsBatchVolume]): List[Volume] = {

//all the configured disks plus the fetch and run volume and the aws-cli volume
@@ -111,6 +103,7 @@ trait AwsBatchJobDefinitionBuilder {
)
}


def buildMountPoints(disks: Seq[AwsBatchVolume]): List[MountPoint] = {

//all the configured disks plus the fetch and run mount point and the AWS cli mount point
@@ -130,45 +123,63 @@ trait AwsBatchJobDefinitionBuilder {
)
}

def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair]): String = {
val str = s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}"

val sha1 = MessageDigest.getInstance("SHA-1")
.digest( str.getBytes("UTF-8") )
.map("%02x".format(_)).mkString

val prefix = s"cromwell_$imageName".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128
def buildUlimits(ulimits: Seq[Map[String, String]]): List[Ulimit] = {

sanitize(prefix + sha1)
ulimits.filter(_.nonEmpty).map(u =>
Ulimit.builder()
.name(u("name"))
.softLimit(u("softLimit").toInt)
.hardLimit(u("hardLimit").toInt)
.build()
).toList
}
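The conversion performed by `buildUlimits` can be sketched as a hypothetical Python rendering, assuming (as the Scala above does) that each entry arrives as a string-to-string map and that an empty map means the attribute was left unset:

```python
def build_ulimits(ulimits):
    """Convert runtime-attribute maps into AWS Batch Ulimit-shaped dicts.

    Empty maps are dropped; soft and hard limits arrive as strings and
    are coerced to integers, mirroring the Scala helper above.
    """
    return [
        {
            "name": u["name"],
            "softLimit": int(u["softLimit"]),
            "hardLimit": int(u["hardLimit"]),
        }
        for u in ulimits
        if u  # skip empty maps (the default when `ulimits` is not given)
    ]
```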


def buildName(imageName: String, packedCommand: String, volumes: List[Volume], mountPoints: List[MountPoint], env: Seq[KeyValuePair], ulimits: List[Ulimit]): String = {
s"$imageName:$packedCommand:${volumes.map(_.toString).mkString(",")}:${mountPoints.map(_.toString).mkString(",")}:${env.map(_.toString).mkString(",")}:${ulimits.map(_.toString).mkString(",")}"
}


val environment = List.empty[KeyValuePair]
val cmdName = context.runtimeAttributes.fileSystem match {
case AWSBatchStorageSystems.s3 => "/var/scratch/fetch_and_run.sh"
case _ => context.commandText
case AWSBatchStorageSystems.s3 => "/var/scratch/fetch_and_run.sh"
case _ => context.commandText
}
val packedCommand = packCommand("/bin/bash", "-c", cmdName)
val volumes = buildVolumes( context.runtimeAttributes.disks )
val mountPoints = buildMountPoints( context.runtimeAttributes.disks)
val jobDefinitionName = buildName(
val ulimits = buildUlimits( context.runtimeAttributes.ulimits)
val containerPropsName = buildName(
context.runtimeAttributes.dockerImage,
packedCommand.mkString(","),
volumes,
mountPoints,
environment
environment,
ulimits
)

(builder
.command(packedCommand.asJava)
.memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt)
.vcpus(context.runtimeAttributes.cpu##)
.volumes( volumes.asJava)
.mountPoints( mountPoints.asJava)
.environment(environment.asJava),
(ContainerProperties.builder()
.image(context.runtimeAttributes.dockerImage)
.command(packedCommand.asJava)
.memory(context.runtimeAttributes.memory.to(MemoryUnit.MB).amount.toInt)
.vcpus(context.runtimeAttributes.cpu##)
.volumes(volumes.asJava)
.mountPoints(mountPoints.asJava)
.environment(environment.asJava)
.ulimits(ulimits.asJava),
containerPropsName)
}

jobDefinitionName)
def retryStrategyBuilder(context: AwsBatchJobDefinitionContext): (RetryStrategy.Builder, String) = {
// We can add here the 'evaluateOnExit' statement

(RetryStrategy.builder()
.attempts(context.runtimeAttributes.awsBatchRetryAttempts),
context.runtimeAttributes.awsBatchRetryAttempts.toString)
}


private def packCommand(shell: String, options: String, mainCommand: String): Seq[String] = {
val rc = new ListBuffer[String]()
val lim = 1024
@@ -189,15 +200,29 @@

object StandardAwsBatchJobDefinitionBuilder extends AwsBatchJobDefinitionBuilder {
def build(context: AwsBatchJobDefinitionContext): AwsBatchJobDefinition = {
//instantiate a builder with the name of the docker image
val builderInst = builder(context.runtimeAttributes.dockerImage)
val (b, name) = buildResources(builderInst, context)

val (containerPropsInst, containerPropsName) = containerPropertiesBuilder(context)
val (retryStrategyInst, retryStrategyName) = retryStrategyBuilder(context)

new StandardAwsBatchJobDefinitionBuilder(b.build, name)
val name = buildName(context.runtimeAttributes.dockerImage, containerPropsName, retryStrategyName)

new StandardAwsBatchJobDefinitionBuilder(containerPropsInst.build, retryStrategyInst.build, name)
}

def buildName(imageName: String, containerPropsName: String, retryStrategyName: String): String = {
val str = s"$imageName:$containerPropsName:$retryStrategyName"

val sha1 = MessageDigest.getInstance("SHA-1")
.digest( str.getBytes("UTF-8") )
.map("%02x".format(_)).mkString

val prefix = s"cromwell_${imageName}_".slice(0,88) // will be joined to a 40 character SHA1 for total length of 128

sanitize(prefix + sha1)
}
}
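The naming scheme used by `buildName` above can be sketched in hypothetical Python: fingerprint everything that affects the definition, hash it with SHA-1, and keep the result within AWS Batch's 128-character job-definition-name limit. The underscore-substitution `sanitize` rule below is an assumption; the real helper is defined elsewhere in this trait.

```python
import hashlib
import re


def build_job_definition_name(image_name, container_props_name, retry_strategy_name):
    # Fingerprint everything that affects the job definition so identical
    # configurations map to the same (reusable) definition name.
    fingerprint = f"{image_name}:{container_props_name}:{retry_strategy_name}"
    sha1 = hashlib.sha1(fingerprint.encode("utf-8")).hexdigest()  # 40 hex chars
    # An 88-char prefix plus the 40-char digest stays within the 128-char limit.
    prefix = f"cromwell_{image_name}_"[:88]
    # Assumed sanitize rule: keep letters, digits, '-' and '_'; replace the rest.
    return re.sub(r"[^a-zA-Z0-9_-]", "_", prefix + sha1)
```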

case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, name: String) extends AwsBatchJobDefinition
case class StandardAwsBatchJobDefinitionBuilder private(containerProperties: ContainerProperties, retryStrategy: RetryStrategy, name: String) extends AwsBatchJobDefinition

object AwsBatchJobDefinitionContext
