Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AN-146] Emit VM cost for GCP Batch #7582

Merged
merged 14 commits into from
Dec 9, 2024
4 changes: 3 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ object Dependencies {
private val metrics3StatsdV = "4.2.0"
private val mockFtpServerV = "3.0.0"
private val mockitoV = "3.12.4"
private val mockitoInlineV = "2.8.9"
private val mockserverNettyV = "5.14.0"
private val mouseV = "1.0.11"

Expand Down Expand Up @@ -627,7 +628,8 @@ object Dependencies {
"org.scalatest" %% "scalatest" % scalatestV,
// Use mockito Java DSL directly instead of the numerous and often hard to keep updated Scala DSLs.
// See also scaladoc in common.mock.MockSugar and that trait's various usages.
"org.mockito" % "mockito-core" % mockitoV
"org.mockito" % "mockito-core" % mockitoV,
"org.mockito" % "mockito-inline" % mockitoInlineV
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

necessary in order to mock a final method in the gcp BatchServiceClient

) ++ slf4jBindingDependencies // During testing, add an slf4j binding for _all_ libraries.

val kindProjectorPlugin = "org.typelevel" % "kind-projector" % kindProjectorV cross CrossVersion.full
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
}

override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] =
pollStatus.instantiatedVmInfo

Check warning on line 51 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala#L51

Added line #L51 was not covered by tests

override def handleVmCostLookup(vmInfo: InstantiatedVmInfo) = {
val request = GcpCostLookupRequest(vmInfo, self)
params.serviceRegistry ! request
Expand All @@ -69,6 +72,7 @@
}

override def receive: Receive = {
case costResponse: GcpCostLookupResponse => handleCostResponse(costResponse)

Check warning on line 75 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala#L75

Added line #L75 was not covered by tests
case message: PollResultMessage =>
message match {
case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
Expand All @@ -93,5 +97,4 @@

override def params: PollMonitorParameters = pollMonitorParameters

override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] = Option.empty // TODO
}
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,18 @@
} yield status
}

override val pollingResultMonitorActor: Option[ActorRef] = Option(
context.actorOf(
BatchPollResultMonitorActor.props(serviceRegistryActor,

Check warning on line 1030 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala#L1028-L1030

Added lines #L1028 - L1030 were not covered by tests
workflowDescriptor,
jobDescriptor,
validatedRuntimeAttributes,
platform,

Check warning on line 1034 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala#L1034

Added line #L1034 was not covered by tests
jobLogger
)
)
)

override def isTerminal(runStatus: RunStatus): Boolean =
runStatus match {
case _: RunStatus.TerminalRunStatus => true
Expand Down Expand Up @@ -1070,7 +1082,7 @@
Future.fromTry {
Try {
runStatus match {
case RunStatus.Aborted(_) => AbortedExecutionHandle
case RunStatus.Aborted(_, _) => AbortedExecutionHandle

Check warning on line 1085 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala#L1085

Added line #L1085 was not covered by tests
case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus)
case unknown =>
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.backend.google.batch.api.request

import com.google.api.gax.rpc.{ApiException, StatusCode}
import com.google.cloud.batch.v1.AllocationPolicy.ProvisioningModel
import com.google.cloud.batch.v1._
import com.typesafe.scalalogging.LazyLogging
import cromwell.backend.google.batch.actors.BatchApiAbortClient.{
Expand All @@ -11,6 +12,8 @@
import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiResponse}
import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus}
import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo
import cromwell.services.metadata.CallMetadataKeys

import scala.annotation.unused
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -136,14 +139,32 @@
)
lazy val exitCode = findBatchExitCode(events)

// Get vm info for this job
val allocationPolicy = job.getAllocationPolicy

Check warning on line 143 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L143

Added line #L143 was not covered by tests

// Get instances that can be created with this AllocationPolicy, only instances[0] is supported
val instancePolicy = allocationPolicy.getInstances(0).getPolicy
val machineType = instancePolicy.getMachineType
val preemtible = instancePolicy.getProvisioningModelValue == ProvisioningModel.PREEMPTIBLE.getNumber

Check warning on line 148 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L146-L148

Added lines #L146 - L148 were not covered by tests

// location list = [regions/us-central1, zones/us-central1-b], region is the first element
val location = allocationPolicy.getLocation.getAllowedLocationsList.get(0)

Check warning on line 151 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L151

Added line #L151 was not covered by tests
val region =
if (location.isEmpty)
"us-central1"

Check warning on line 154 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L153-L154

Added lines #L153 - L154 were not covered by tests
else
location.split("/").last

Check warning on line 156 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L156

Added line #L156 was not covered by tests

val instantiatedVmInfo = Some(InstantiatedVmInfo(region, machineType, preemtible))

Check warning on line 158 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L158

Added line #L158 was not covered by tests

if (job.getStatus.getState == JobStatus.State.SUCCEEDED) {
RunStatus.Success(events)
RunStatus.Success(events, instantiatedVmInfo)

Check warning on line 161 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L161

Added line #L161 was not covered by tests
} else if (job.getStatus.getState == JobStatus.State.RUNNING) {
RunStatus.Running(events)
RunStatus.Running(events, instantiatedVmInfo)

Check warning on line 163 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L163

Added line #L163 was not covered by tests
} else if (job.getStatus.getState == JobStatus.State.FAILED) {
RunStatus.Failed(exitCode, events)
RunStatus.Failed(exitCode, events, instantiatedVmInfo)

Check warning on line 165 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L165

Added line #L165 was not covered by tests
} else {
RunStatus.Initializing(events)
RunStatus.Initializing(events, instantiatedVmInfo)

Check warning on line 167 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L167

Added line #L167 was not covered by tests
}
}

Expand All @@ -152,12 +173,20 @@
GcpBatchExitCode.fromEventMessage(e.name.toLowerCase)
}.headOption

private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] =
private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = {
val startedRegex = ".*SCHEDULED to RUNNING.*".r
val endedRegex = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED

Check warning on line 178 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L177-L178

Added lines #L177 - L178 were not covered by tests
events.map { e =>
val time = java.time.Instant
.ofEpochSecond(e.getEventTime.getSeconds, e.getEventTime.getNanos.toLong)
.atOffset(java.time.ZoneOffset.UTC)
ExecutionEvent(name = e.getDescription, offsetDateTime = time)
val eventType = e.getDescription match {
case startedRegex() => CallMetadataKeys.VmStartTime
case endedRegex() => CallMetadataKeys.VmEndTime
case _ => e.getType

Check warning on line 186 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L183-L186

Added lines #L183 - L186 were not covered by tests
}
ExecutionEvent(name = eventType, offsetDateTime = time)

Check warning on line 188 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala#L188

Added line #L188 was not covered by tests
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
package cromwell.backend.google.batch.models

import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo

sealed trait RunStatus {
def eventList: Seq[ExecutionEvent]
def toString: String

val instantiatedVmInfo: Option[InstantiatedVmInfo]
}

object RunStatus {

case class Initializing(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Initializing" }
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent]) extends RunStatus {
case class Initializing(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends RunStatus { override def toString = "Initializing" }

Check warning on line 16 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala#L16

Added line #L16 was not covered by tests
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends RunStatus {
override def toString = "AwaitingCloudQuota"
}

case class Running(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Running" }
case class Running(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends RunStatus { override def toString = "Running" }

Check warning on line 24 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala#L24

Added line #L24 was not covered by tests

sealed trait TerminalRunStatus extends RunStatus

case class Success(eventList: Seq[ExecutionEvent]) extends TerminalRunStatus {
case class Success(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty)
extends TerminalRunStatus {
override def toString = "Success"
}

Expand All @@ -29,7 +37,8 @@

final case class Failed(
exitCode: Option[GcpBatchExitCode],
eventList: Seq[ExecutionEvent]
eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends UnsuccessfulRunStatus {
override def toString = "Failed"

Expand Down Expand Up @@ -58,7 +67,9 @@
}
}

final case class Aborted(eventList: Seq[ExecutionEvent]) extends UnsuccessfulRunStatus {
final case class Aborted(eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends UnsuccessfulRunStatus {
override def toString = "Aborted"

override val exitCode: Option[GcpBatchExitCode] = None
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package cromwell.backend.google.batch.actors

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.testkit.{TestKit, TestProbe}
import cats.data.Validated.Valid
import common.mock.MockSugar
import cromwell.backend.google.batch.models.GcpBatchRuntimeAttributes
import cromwell.backend.{BackendJobDescriptor, BackendJobDescriptorKey, RuntimeAttributeDefinition}
import cromwell.core.callcaching.NoDocker
import cromwell.core.{ExecutionEvent, WorkflowOptions}
import cromwell.core.logging.JobLogger
import cromwell.services.cost.{GcpCostLookupRequest, GcpCostLookupResponse, InstantiatedVmInfo}
import cromwell.services.keyvalue.InMemoryKvServiceActor
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
import cromwell.backend.google.batch.models.GcpBatchTestConfig._
import wom.graph.CommandCallNode
import cromwell.backend._
import cromwell.backend.google.batch.models._
import cromwell.backend.io.TestWorkflows
import cromwell.backend.standard.pollmonitoring.ProcessThisPollResult
import cromwell.services.metadata.CallMetadataKeys
import cromwell.services.metadata.MetadataService.PutMetadataAction
import org.slf4j.helpers.NOPLogger
import wom.values.WomString

import java.time.{Instant, OffsetDateTime}
import java.time.temporal.ChronoUnit
import scala.concurrent.duration.DurationInt

class BatchPollResultMonitorActorSpec
extends TestKit(ActorSystem("BatchPollResultMonitorActorSpec"))
with AnyFlatSpecLike
with BackendSpec
with Matchers
with MockSugar {

var kvService: ActorRef = system.actorOf(Props(new InMemoryKvServiceActor), "kvService")
val runtimeAttributesBuilder = GcpBatchRuntimeAttributes.runtimeAttributesBuilder(gcpBatchConfiguration)
val jobLogger = mock[JobLogger]
val serviceRegistry = TestProbe()

val workflowDescriptor = buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld)
val call: CommandCallNode = workflowDescriptor.callable.taskCallNodes.head
val jobKey = BackendJobDescriptorKey(call, None, 1)

val jobDescriptor = BackendJobDescriptor(workflowDescriptor,
jobKey,
runtimeAttributes = Map.empty,
evaluatedTaskInputs = Map.empty,
NoDocker,
None,
Map.empty
)

val runtimeAttributes = Map("docker" -> WomString("ubuntu:latest"))

val staticRuntimeAttributeDefinitions: Set[RuntimeAttributeDefinition] =
GcpBatchRuntimeAttributes.runtimeAttributesBuilder(GcpBatchTestConfig.gcpBatchConfiguration).definitions.toSet

val defaultedAttributes =
RuntimeAttributeDefinition.addDefaultsToAttributes(staticRuntimeAttributeDefinitions,
WorkflowOptions.fromMap(Map.empty).get
)(
runtimeAttributes
)
val validatedRuntimeAttributes = runtimeAttributesBuilder.build(defaultedAttributes, NOPLogger.NOP_LOGGER)

val actor = system.actorOf(
BatchPollResultMonitorActor.props(serviceRegistry.ref,
workflowDescriptor,
jobDescriptor,
validatedRuntimeAttributes,
Some(Gcp),
jobLogger
)
)
val vmInfo = InstantiatedVmInfo("europe-west9", "custom-16-32768", false)

behavior of "BatchPollResultMonitorActor"

it should "send a cost lookup request with the correct vm info after receiving a success pollResult" in {

val terminalPollResult =
RunStatus.Success(Seq(ExecutionEvent("fakeEvent", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))),
Some(vmInfo)
)
val message = ProcessThisPollResult(terminalPollResult)

actor ! message

serviceRegistry.expectMsgPF(1.seconds) { case m: GcpCostLookupRequest =>
m.vmInfo shouldBe vmInfo
}
}

it should "emit the correct cost metadata after receiving a costLookupResponse" in {

val costLookupResponse = GcpCostLookupResponse(vmInfo, Valid(BigDecimal(0.1)))

actor ! costLookupResponse

serviceRegistry.expectMsgPF(1.seconds) { case m: PutMetadataAction =>
val event = m.events.head
m.events.size shouldBe 1
event.key.key shouldBe CallMetadataKeys.VmCostPerHour
event.value.get.value shouldBe "0.1"
}
}

it should "emit the correct start time after receiving a running pollResult" in {

val vmStartTime = OffsetDateTime.now().minus(2, ChronoUnit.HOURS)
val pollResult = RunStatus.Running(
Seq(ExecutionEvent(CallMetadataKeys.VmStartTime, vmStartTime)),
Some(vmInfo)
)
val message = ProcessThisPollResult(pollResult)

actor ! message

serviceRegistry.expectMsgPF(1.seconds) { case m: PutMetadataAction =>
val event = m.events.head
m.events.size shouldBe 1
event.key.key shouldBe CallMetadataKeys.VmStartTime
assert(
Instant
.parse(event.value.get.value)
.equals(vmStartTime.toInstant.truncatedTo(ChronoUnit.MILLIS))
)
}
}

it should "emit the correct end time after receiving a running pollResult" in {

val vmEndTime = OffsetDateTime.now().minus(2, ChronoUnit.HOURS)
val pollResult = RunStatus.Running(
Seq(ExecutionEvent(CallMetadataKeys.VmEndTime, vmEndTime)),
Some(vmInfo)
)
val message = ProcessThisPollResult(pollResult)

actor ! message

serviceRegistry.expectMsgPF(1.seconds) { case m: PutMetadataAction =>
val event = m.events.head
m.events.size shouldBe 1
event.key.key shouldBe CallMetadataKeys.VmEndTime
assert(
Instant
.parse(event.value.get.value)
.equals(vmEndTime.toInstant.truncatedTo(ChronoUnit.MILLIS))
)
}
}
}
Loading
Loading