Skip to content

Commit ae4e6aa

Browse files
javiergaitanmcovarr
andauthoredOct 10, 2024
feat: [GCP Batch] Support passing standard machine types to the Google backend (#1)
* WX-1810 WX-1830 n1/n2/n2d machine types, cpuPlatform on GCPBATCH (broadinstitute#7518) * feat: [GCP Batch] Support passing standard machine types to the Google backend --------- Co-authored-by: Miguel Covarrubias <[email protected]>

File tree

12 files changed

+137
-61
lines changed

12 files changed

+137
-61
lines changed
 

‎CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional
2222
### GCP Batch
2323

2424
- The `genomics` configuration entry was renamed to `batch`, see [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/backends/GCPBatch/) for more information.
25+
- Fixes a bug with not being able to recover jobs on Cromwell restart.
26+
- Fixes machine type selection to match the Google Cloud Life Sciences backend, including default n1 non shared-core machine types and correct handling of `cpuPlatform` to select n2 or n2d machine types as appropriate.
2527
- Fixes the preemption error handling, now, the correct error message is printed, this also handles the other potential exit codes.
2628
- Fixes pulling Docker image metadata from private GCR repositories.
2729
- Fixed `google_project` and `google_compute_service_account` workflow options not taking effect when using GCP Batch backend

‎build.sbt

+1
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ lazy val googlePipelinesV2Beta = (project in backendRoot / "google" / "pipelines
237237

238238
lazy val googleBatch = (project in backendRoot / "google" / "batch")
239239
.withLibrarySettings("cromwell-google-batch-backend")
240+
.dependsOn(core)
240241
.dependsOn(backend)
241242
.dependsOn(gcsFileSystem)
242243
.dependsOn(drsFileSystem)

‎centaur/src/main/resources/standardTestCases/papi_cpu_platform.test

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
name: papi_cpu_platform
22
testFormat: workflowsuccess
3-
backends: [Papiv2]
3+
backendsMode: any
4+
backends: [Papiv2, GCPBATCH]
45

56
files {
67
workflow: papi_cpu_platform/papi_cpu_platform.wdl

‎supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiRunCreationClient.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ trait BatchApiRunCreationClient { this: Actor with ActorLogging with BatchInstru
5353
backendSingletonActor ! BatchApiRequestManager.BatchRunCreationRequest(
5454
request.workflowId,
5555
self,
56-
requestFactory.submitRequest(request)
56+
requestFactory.submitRequest(request, jobLogger)
5757
)
5858
val newPromise = Promise[StandardAsyncJob]()
5959
runCreationClientPromise = Option(newPromise)

‎supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
66
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.VirtualPrivateCloudConfiguration
77
import cromwell.backend.google.batch.models._
88
import cromwell.backend.google.batch.monitoring.{CheckpointingConfiguration, MonitoringImage}
9+
import cromwell.core.logging.JobLogger
910
import cromwell.core.path.Path
1011
import wom.runtime.WomOutputRuntimeExtractor
1112

1213
import scala.concurrent.duration.FiniteDuration
1314

1415
trait GcpBatchRequestFactory {
15-
def submitRequest(data: GcpBatchRequest): CreateJobRequest
16+
def submitRequest(data: GcpBatchRequest, jobLogger: JobLogger): CreateJobRequest
1617

1718
def queryRequest(jobName: JobName): GetJobRequest
1819

‎supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala

+15-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
2222
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration
2323
import cromwell.backend.google.batch.models.{GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
2424
import cromwell.backend.google.batch.runnable._
25-
import cromwell.backend.google.batch.util.BatchUtilityConversions
25+
import cromwell.backend.google.batch.util.{BatchUtilityConversions, GcpBatchMachineConstraints}
26+
import cromwell.core.logging.JobLogger
2627

2728
import scala.jdk.CollectionConverters._
2829

@@ -74,14 +75,16 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
7475
private def createInstancePolicy(cpuPlatform: String,
7576
spotModel: ProvisioningModel,
7677
accelerators: Option[Accelerator.Builder],
77-
attachedDisks: List[AttachedDisk]
78+
attachedDisks: List[AttachedDisk],
79+
machineType: String
7880
): InstancePolicy.Builder = {
7981

8082
// set GPU count to 0 if not included in workflow
8183
val gpuAccelerators = accelerators.getOrElse(Accelerator.newBuilder.setCount(0).setType("")) // TODO: Driver version
8284

8385
val instancePolicy = InstancePolicy.newBuilder
8486
.setProvisioningModel(spotModel)
87+
.setMachineType(machineType)
8588
.addAllDisks(attachedDisks.asJava)
8689
.setMinCpuPlatform(cpuPlatform)
8790
.buildPartial()
@@ -154,7 +157,7 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
154157
}
155158
}
156159

157-
override def submitRequest(data: GcpBatchRequest): CreateJobRequest = {
160+
override def submitRequest(data: GcpBatchRequest, jobLogger: JobLogger): CreateJobRequest = {
158161

159162
val runtimeAttributes = data.gcpBatchParameters.runtimeAttributes
160163
val createParameters = data.createParameters
@@ -224,7 +227,15 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
224227
val computeResource = createComputeResource(cpuCores, memory, gcpBootDiskSizeMb)
225228
val taskSpec = createTaskSpec(sortedRunnables, computeResource, retryCount, durationInSeconds, allVolumes)
226229
val taskGroup: TaskGroup = createTaskGroup(taskCount, taskSpec)
227-
val instancePolicy = createInstancePolicy(cpuPlatform, spotModel, accelerators, allDisks)
230+
val machineType = GcpBatchMachineConstraints.machineType(runtimeAttributes.memory,
231+
runtimeAttributes.cpu,
232+
cpuPlatformOption = runtimeAttributes.cpuPlatform,
233+
standardMachineTypeOption = runtimeAttributes.standardMachineType,
234+
googleLegacyMachineSelection = false,
235+
jobLogger = jobLogger
236+
)
237+
val instancePolicy =
238+
createInstancePolicy(cpuPlatform = cpuPlatform, spotModel, accelerators, allDisks, machineType = machineType)
228239
val locationPolicy = LocationPolicy.newBuilder.addAllowedLocations(zones).build
229240
val allocationPolicy =
230241
createAllocationPolicy(data, locationPolicy, instancePolicy.build, networkPolicy, gcpSa, accelerators)

‎supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchCustomMachineType.scala

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import wom.format.MemorySize
1111

1212
import scala.math.{log, pow}
1313

14+
case class StandardMachineType(machineType: String) {}
15+
1416
/**
1517
* Adjusts memory and cpu for custom machine types.
1618
*

‎supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributes.scala

+15-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ final case class GcpBatchRuntimeAttributes(cpu: Int Refined Positive,
4949
continueOnReturnCode: ContinueOnReturnCode,
5050
noAddress: Boolean,
5151
useDockerImageCache: Option[Boolean],
52-
checkpointFilename: Option[String]
52+
checkpointFilename: Option[String],
53+
standardMachineType: Option[String]
5354
)
5455

5556
object GcpBatchRuntimeAttributes {
@@ -77,13 +78,16 @@ object GcpBatchRuntimeAttributes {
7778
private val cpuPlatformValidationInstance = new StringRuntimeAttributesValidation(CpuPlatformKey).optional
7879
// via `gcloud compute zones describe us-central1-a`
7980
val CpuPlatformIntelCascadeLakeValue = "Intel Cascade Lake"
81+
val CpuPlatformIntelIceLakeValue = "Intel Ice Lake"
8082
val CpuPlatformAMDRomeValue = "AMD Rome"
8183

8284
val UseDockerImageCacheKey = "useDockerImageCache"
8385
private val useDockerImageCacheValidationInstance = new BooleanRuntimeAttributesValidation(
8486
UseDockerImageCacheKey
8587
).optional
8688

89+
val StandardMachineTypeKey = "standardMachineType"
90+
8791
val CheckpointFileKey = "checkpointFile"
8892
private val checkpointFileValidationInstance = new StringRuntimeAttributesValidation(CheckpointFileKey).optional
8993

@@ -97,6 +101,8 @@ object GcpBatchRuntimeAttributes {
97101
)
98102
private def cpuPlatformValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[String] =
99103
cpuPlatformValidationInstance
104+
private def standardMachineTypeValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[String] =
105+
new StringRuntimeAttributesValidation(StandardMachineTypeKey).optional
100106
private def gpuTypeValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[GpuType] =
101107
GpuTypeValidation.optional
102108

@@ -170,7 +176,8 @@ object GcpBatchRuntimeAttributes {
170176
bootDiskSizeValidation(runtimeConfig),
171177
useDockerImageCacheValidation(runtimeConfig),
172178
checkpointFileValidationInstance,
173-
dockerValidation
179+
dockerValidation,
180+
standardMachineTypeValidation(runtimeConfig)
174181
)
175182
}
176183

@@ -227,6 +234,10 @@ object GcpBatchRuntimeAttributes {
227234
useDockerImageCacheValidation(runtimeAttrsConfig).key,
228235
validatedRuntimeAttributes
229236
)
237+
val standardMachineType: Option[String] = RuntimeAttributesValidation.extractOption(
238+
standardMachineTypeValidation(runtimeAttrsConfig).key,
239+
validatedRuntimeAttributes
240+
)
230241

231242
new GcpBatchRuntimeAttributes(
232243
cpu = cpu,
@@ -242,7 +253,8 @@ object GcpBatchRuntimeAttributes {
242253
continueOnReturnCode = continueOnReturnCode,
243254
noAddress = noAddress,
244255
useDockerImageCache = useDockerImageCache,
245-
checkpointFilename = checkpointFileName
256+
checkpointFilename = checkpointFileName,
257+
standardMachineType = standardMachineType
246258
)
247259
}
248260

‎supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/util/GcpBatchMachineConstraints.scala

+10-5
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,34 @@ import cromwell.backend.google.batch.models.{
44
GcpBatchRuntimeAttributes,
55
N1CustomMachineType,
66
N2CustomMachineType,
7-
N2DCustomMachineType
7+
N2DCustomMachineType,
8+
StandardMachineType
89
}
10+
import cromwell.core.logging.JobLogger
911
import eu.timepit.refined.api.Refined
1012
import eu.timepit.refined.numeric.Positive
11-
import org.slf4j.Logger
1213
import wdl4s.parser.MemoryUnit
1314
import wom.format.MemorySize
1415

1516
object GcpBatchMachineConstraints {
1617
def machineType(memory: MemorySize,
1718
cpu: Int Refined Positive,
1819
cpuPlatformOption: Option[String],
20+
standardMachineTypeOption: Option[String],
1921
googleLegacyMachineSelection: Boolean,
20-
jobLogger: Logger
22+
jobLogger: JobLogger
2123
): String =
22-
if (googleLegacyMachineSelection) {
24+
if (standardMachineTypeOption.exists(_.trim.nonEmpty)) {
25+
StandardMachineType(standardMachineTypeOption.get).machineType
26+
} else if (googleLegacyMachineSelection) {
2327
s"predefined-$cpu-${memory.to(MemoryUnit.MB).amount.intValue()}"
2428
} else {
25-
// If someone requests Intel Cascade Lake as their CPU platform then switch the machine type to n2.
29+
// If someone requests Intel Cascade Lake or Intel Ice Lake as their CPU platform then switch the machine type to n2.
2630
// Similarly, CPU platform of AMD Rome corresponds to the machine type n2d.
2731
val customMachineType =
2832
cpuPlatformOption match {
2933
case Some(GcpBatchRuntimeAttributes.CpuPlatformIntelCascadeLakeValue) => N2CustomMachineType
34+
case Some(GcpBatchRuntimeAttributes.CpuPlatformIntelIceLakeValue) => N2CustomMachineType
3035
case Some(GcpBatchRuntimeAttributes.CpuPlatformAMDRomeValue) => N2DCustomMachineType
3136
case _ => N1CustomMachineType
3237
}

‎supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class GcpBatchAsyncBackendJobExecutionActorSpec
131131
val runtimeAttributesBuilder = GcpBatchRuntimeAttributes.runtimeAttributesBuilder(configuration)
132132

133133
val requestFactory: GcpBatchRequestFactory = new GcpBatchRequestFactory {
134-
override def submitRequest(data: GcpBatchRequest): CreateJobRequest = null
134+
override def submitRequest(data: GcpBatchRequest, jobLogger: JobLogger): CreateJobRequest = null
135135

136136
override def queryRequest(jobName: JobName): GetJobRequest = null
137137

‎supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/models/GcpBatchRuntimeAttributesSpec.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ trait GcpBatchRuntimeAttributesSpecsMixin {
286286
continueOnReturnCode = ContinueOnReturnCodeSet(Set(0)),
287287
noAddress = false,
288288
useDockerImageCache = None,
289-
checkpointFilename = None
289+
checkpointFilename = None,
290+
standardMachineType = None
290291
)
291292

292293
def assertBatchRuntimeAttributesSuccessfulCreation(runtimeAttributes: Map[String, WomValue],

0 commit comments

Comments
 (0)