Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1792] Helper to Actor #7544

Merged
merged 19 commits into from
Sep 18, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cromwell.backend._
import cromwell.backend.async.AsyncBackendJobExecutionActor._
import cromwell.backend.async._
import cromwell.backend.standard.StandardAdHocValue._
import cromwell.backend.standard.costestimation.CostPollingHelper
import cromwell.backend.standard.pollmonitoring.{AsyncJobHasFinished, ProcessThisPollResult}
import cromwell.backend.standard.retry.memory.MemoryRetryResult
import cromwell.backend.validation._
import cromwell.core._
Expand All @@ -32,24 +32,19 @@ import cromwell.core.path.Path
import cromwell.services.keyvalue.KeyValueServiceActor._
import cromwell.services.keyvalue.KvClient
import cromwell.services.metadata.CallMetadataKeys
import cromwell.services.metrics.bard.BardEventing.BardEventRequest
import cromwell.services.metrics.bard.model.TaskSummaryEvent
import eu.timepit.refined.refineV
import mouse.all._
import net.ceedubs.ficus.Ficus._
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
import shapeless.Coproduct
import wdl4s.parser.MemoryUnit
import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpression}
import wom.expression.WomExpression
import wom.graph.LocalName
import wom.values._
import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper}

import java.io.IOException
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -77,8 +72,6 @@ case class DefaultStandardAsyncExecutionActorParams(
// Override to `false` when we need the script to set an environment variable in the parent shell.
case class ScriptPreambleData(bashString: String, executeInSubshell: Boolean = true)

case class StartAndEndTimes(jobStart: OffsetDateTime, cpuStart: Option[OffsetDateTime], jobEnd: OffsetDateTime)

/**
* An extension of the generic AsyncBackendJobExecutionActor providing a standard abstract implementation of an
* asynchronous polling backend.
Expand Down Expand Up @@ -904,14 +897,6 @@ trait StandardAsyncExecutionActor
*/
def getTerminalEvents(runStatus: StandardAsyncRunState): Seq[ExecutionEvent] = Seq.empty

/**
* Get the min and max event times from a terminal run status
*
* @param runStatus The terminal run status, as defined by isTerminal.
* @return The min and max event times, if events exist.
*/
def getStartAndEndTimes(runStatus: StandardAsyncRunState): Option[StartAndEndTimes] = None

/**
* Returns true if the status represents a completion.
*
Expand Down Expand Up @@ -945,9 +930,8 @@ trait StandardAsyncExecutionActor
* @param handle The handle of the running job.
* @return A set of actions when the job is complete
*/
def onTaskComplete(runStatus: StandardAsyncRunState, handle: StandardAsyncPendingExecutionHandle): Unit = tellBard(
runStatus
)
def onTaskComplete(runStatus: StandardAsyncRunState, handle: StandardAsyncPendingExecutionHandle): Unit =
  // When a poll result monitor is configured, notify it (with this actor as the sender) that the job finished.
  pollingResultMonitorActor.foreach(monitor => monitor ! AsyncJobHasFinished(runStatus))

/**
* Attempts to abort a job when an abort signal is retrieved.
Expand Down Expand Up @@ -1341,7 +1325,7 @@ trait StandardAsyncExecutionActor
checkAndRecordQuotaExhaustion(state)

// Present the poll result to the poll result monitor actor, if one is configured. It keeps track of VM start/stop times and may emit some metadata.
costHelper.foreach(helper => helper.processPollResult(state))
pollingResultMonitorActor.foreach(helper => helper.tell(ProcessThisPollResult[StandardAsyncRunState](state), self))

state match {
case _ if isTerminal(state) =>
Expand All @@ -1358,7 +1342,7 @@ trait StandardAsyncExecutionActor

// If present, polling results will be sent to this actor.
// Subclasses can use this to emit proper metadata based on polling responses.
protected val costHelper: Option[CostPollingHelper[StandardAsyncRunState]] = Option.empty
protected val pollingResultMonitorActor: Option[ActorRef] = Option.empty

/**
* Process a poll failure.
Expand Down Expand Up @@ -1545,40 +1529,6 @@ trait StandardAsyncExecutionActor
serviceRegistryActor.putMetadata(jobDescriptor.workflowDescriptor.id, Option(jobDescriptor.key), metadataKeyValues)
}

def tellBard(state: StandardAsyncRunState): Unit =
getStartAndEndTimes(state) match {
case Some(startAndEndTimes: StartAndEndTimes) =>
val dockerImage =
RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes)
val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value
val memory = RuntimeAttributesValidation
.extract(MemoryValidation.instance(), validatedRuntimeAttributes)
.to(MemoryUnit.Bytes)
.amount
serviceRegistryActor ! BardEventRequest(
TaskSummaryEvent(
workflowDescriptor.id.id,
workflowDescriptor.possibleParentWorkflowId.map(_.id),
workflowDescriptor.rootWorkflowId.id,
jobDescriptor.key.tag,
jobDescriptor.key.call.fullyQualifiedName,
jobDescriptor.key.index,
jobDescriptor.key.attempt,
state.getClass.getSimpleName,
platform.map(_.runtimeKey),
dockerImage,
cpus,
memory,
startAndEndTimes.jobStart.toString,
startAndEndTimes.cpuStart.map(_.toString),
startAndEndTimes.jobEnd.toString,
startAndEndTimes.jobStart.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS),
startAndEndTimes.cpuStart.map(_.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS))
)
)
case _ => ()
}

// Implicit execution context for this actor, backed by the actor context's dispatcher.
implicit override protected lazy val ec: ExecutionContextExecutor = context.dispatcher
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package cromwell.backend.standard.pollmonitoring
import akka.actor.{Actor, ActorRef}
import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform}
import cromwell.backend.validation.{
CpuValidation,
DockerValidation,
MemoryValidation,
RuntimeAttributesValidation,
ValidatedRuntimeAttributes
}
import cromwell.core.logging.JobLogger
import cromwell.services.metadata.CallMetadataKeys
import cromwell.services.metrics.bard.BardEventing.BardEventRequest
import cromwell.services.metrics.bard.model.TaskSummaryEvent
import wdl4s.parser.MemoryUnit

import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
/** Common supertype of the messages that carry a poll result to a poll result monitor actor. */
trait PollResultMessage

/** Carries a single poll result for the monitor to process. */
case class ProcessThisPollResult[PollResultType](pollResult: PollResultType) extends PollResultMessage

/** Signals that the async job has finished; carries the poll result that was current at that point. */
case class AsyncJobHasFinished[PollResultType](pollResult: PollResultType) extends PollResultMessage

/**
  * Everything a poll result monitor needs in order to report on one job attempt.
  *
  * @param serviceRegistry            actor that receives the metadata puts and the Bard event requests.
  * @param workflowDescriptor         source of the workflow, parent workflow and root workflow ids reported to Bard.
  * @param jobDescriptor              source of the job key used for metadata and for the Bard task summary.
  * @param validatedRuntimeAttributes runtime attributes from which docker image, cpu and memory are extracted.
  * @param platform                   optional platform whose `runtimeKey` is reported to Bard.
  * @param logger                     optional job logger; monitors skip logging when it is absent.
  */
case class PollMonitorParameters(
  serviceRegistry: ActorRef,
  workflowDescriptor: BackendWorkflowDescriptor,
  jobDescriptor: BackendJobDescriptor,
  validatedRuntimeAttributes: ValidatedRuntimeAttributes,
  platform: Option[Platform],
  logger: Option[JobLogger]
)

/**
  * Processes poll results from backends and sends messages to other actors based on their contents.
  * Primarily concerned with reporting start times, end times, and cost data to both the bard and cromwell
  * metadata services.
  *
  * Implementations provide `params` plus the three `extract*` functions that pull timestamps out of a
  * backend-specific poll result. This trait remembers the timestamps seen so far, records VM start/end
  * times as call metadata, and sends a task summary to Bard when the job finishes.
  *
  * @tparam PollResultType the backend-specific type of a poll result.
  */
trait PollResultMonitorActor[PollResultType] extends Actor {
  def params: PollMonitorParameters

  // Time that Cromwell (but not necessarily the cloud) started working on this job.
  def extractEarliestEventTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

  // Time that the user VM started spending money.
  def extractStartTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

  // Time that the user VM stopped spending money.
  def extractEndTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

  // Function to emit metadata that is associated with a specific call attempt.
  def tellMetadata(metadataKeyValues: Map[String, Any]): Unit = {
    import cromwell.services.metadata.MetadataService.implicits.MetadataAutoPutter
    params.serviceRegistry.putMetadata(params.jobDescriptor.workflowDescriptor.id,
                                       Option(params.jobDescriptor.key),
                                       metadataKeyValues
    )
  }

  // Function that reports metrics to bard, called when a specific call attempt terminates.
  // Moved (almost) verbatim from StandardAsyncExecutionActor.
  def tellBard(terminalStateName: String,
               jobStart: OffsetDateTime,
               vmStartTime: Option[OffsetDateTime],
               vmEndTime: OffsetDateTime
  ): Unit = {
    val validatedRuntimeAttributes = params.validatedRuntimeAttributes
    val serviceRegistryActor = params.serviceRegistry
    val workflowDescriptor = params.workflowDescriptor
    val jobDescriptor = params.jobDescriptor
    val platform = params.platform.map(_.runtimeKey)
    val dockerImage =
      RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes)
    val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value
    // Memory is reported as an amount of bytes.
    val memory = RuntimeAttributesValidation
      .extract(MemoryValidation.instance(), validatedRuntimeAttributes)
      .to(MemoryUnit.Bytes)
      .amount
    serviceRegistryActor ! BardEventRequest(
      TaskSummaryEvent(
        workflowDescriptor.id.id,
        workflowDescriptor.possibleParentWorkflowId.map(_.id),
        workflowDescriptor.rootWorkflowId.id,
        jobDescriptor.key.tag,
        jobDescriptor.key.call.fullyQualifiedName,
        jobDescriptor.key.index,
        jobDescriptor.key.attempt,
        terminalStateName,
        platform,
        dockerImage,
        cpus,
        memory,
        jobStart.toString,
        vmStartTime.map(startTime => startTime.toString),
        vmEndTime.toString,
        // Whole seconds from job start to VM end.
        jobStart.until(vmEndTime, ChronoUnit.SECONDS),
        // Whole seconds from VM start to VM end, when a VM start time is known.
        vmStartTime.map(start => start.until(vmEndTime, ChronoUnit.SECONDS))
      )
    )
  }

  // Timestamps remembered across poll results. Only touched from this actor's own methods.
  private var jobStartTime: Option[OffsetDateTime] = Option.empty
  private var vmStartTime: Option[OffsetDateTime] = Option.empty
  private var vmEndTime: Option[OffsetDateTime] = Option.empty

  /**
    * Updates the remembered timestamps from one poll result and emits VM start/end metadata when they change.
    *
    * @param pollStatus the poll result to inspect.
    */
  def processPollResult(pollStatus: PollResultType): Unit = {
    // Make sure jobStartTime remains the earliest event time ever seen. The first observed time is always
    // recorded: comparing it to the local clock would discard it whenever it is not strictly before "now"
    // (e.g. under clock skew), and handleAsyncJobFinish reports nothing to Bard without a job start time.
    extractEarliestEventTimeFromRunState(pollStatus).foreach { earliestTime =>
      if (jobStartTime.forall(known => earliestTime.isBefore(known))) {
        jobStartTime = Option(earliestTime)
      }
    }
    // If vm start time is reported, record it to metadata and stop trying
    if (vmStartTime.isEmpty) {
      extractStartTimeFromRunState(pollStatus).foreach { start =>
        vmStartTime = Option(start)
        tellMetadata(Map(CallMetadataKeys.VmStartTime -> start))
      }
    }
    // If vm end time is reported, (or for some weird reason we see an end time after our recorded one),
    // record it to metadata.
    extractEndTimeFromRunState(pollStatus).foreach { end =>
      if (vmEndTime.forall(known => end.isAfter(known))) {
        vmEndTime = Option(end)
        tellMetadata(Map(CallMetadataKeys.VmEndTime -> end))
      }
    }
  }

  // When a job finishes, the bard actor needs to know about the timing in order to record metrics.
  // Cost related metadata should already have been handled in processPollResult.
  // Nothing is sent when no job start time was ever observed; a missing VM end time falls back to "now".
  def handleAsyncJobFinish(terminalStateName: String): Unit =
    jobStartTime.foreach(jobStart =>
      tellBard(
        terminalStateName = terminalStateName,
        jobStart = jobStart,
        vmStartTime = vmStartTime,
        vmEndTime = vmEndTime.getOrElse(OffsetDateTime.now())
      )
    )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package cromwell.backend.google.batch.actors

import akka.actor.{ActorRef, Props}
import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform}
import cromwell.backend.google.batch.models.RunStatus
import cromwell.backend.standard.pollmonitoring.{
AsyncJobHasFinished,
PollMonitorParameters,
PollResultMessage,
PollResultMonitorActor,
ProcessThisPollResult
}
import cromwell.backend.validation.ValidatedRuntimeAttributes
import cromwell.core.logging.JobLogger
import cromwell.services.metadata.CallMetadataKeys

import java.time.OffsetDateTime

object BatchPollResultMonitorActor {

  /**
    * Builds the Props for a BatchPollResultMonitorActor.
    *
    * The arguments are bundled into a PollMonitorParameters; the logger is wrapped with `Option(...)`,
    * so a null logger becomes `None`.
    */
  def props(serviceRegistry: ActorRef,
            workflowDescriptor: BackendWorkflowDescriptor,
            jobDescriptor: BackendJobDescriptor,
            runtimeAttributes: ValidatedRuntimeAttributes,
            platform: Option[Platform],
            logger: JobLogger
  ): Props =
    Props(
      new BatchPollResultMonitorActor(
        PollMonitorParameters(
          serviceRegistry = serviceRegistry,
          workflowDescriptor = workflowDescriptor,
          jobDescriptor = jobDescriptor,
          validatedRuntimeAttributes = runtimeAttributes,
          platform = platform,
          logger = Option(logger)
        )
      )
    )
}

/**
  * Poll result monitor for poll results of type RunStatus.
  *
  * Timestamps are read from the events attached to each RunStatus. Incoming ProcessThisPollResult and
  * AsyncJobHasFinished messages are dispatched to the shared PollResultMonitorActor logic; anything else
  * is logged as a programmer error.
  */
class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)
    extends PollResultMonitorActor[RunStatus] {

  // Earliest timestamp among all events of the poll result, if it has any events.
  override def extractEarliestEventTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
    pollStatus.eventList.minByOption(_.offsetDateTime).map(e => e.offsetDateTime)

  // Timestamp of the first event named CallMetadataKeys.VmStartTime, if there is one.
  override def extractStartTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
    pollStatus.eventList.collectFirst {
      case event if event.name == CallMetadataKeys.VmStartTime => event.offsetDateTime
    }

  // Timestamp of the first event named CallMetadataKeys.VmEndTime, if there is one.
  override def extractEndTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
    pollStatus.eventList.collectFirst {
      case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
    }

  override def receive: Receive = {
    case message: PollResultMessage =>
      message match {
        case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
        case ProcessThisPollResult(result) => logUnexpectedPollResult(result)
        case AsyncJobHasFinished(pollResult: RunStatus) => handleAsyncJobFinish(pollResult.getClass.getSimpleName)
        case AsyncJobHasFinished(result) => logUnexpectedPollResult(result)
        // PollResultMessage is not sealed: log other subtypes instead of failing with a MatchError.
        case other =>
          logProgrammerError(
            s"Programmer error: Received unsupported PollResultMessage of type ${describeType(other)}."
          )
      }
    case _ =>
      logProgrammerError(
        "Programmer error: PollResultMonitorActor received message of type other than PollResultMessage"
      )
  }

  override def params: PollMonitorParameters = pollMonitorParameters

  // Logs the message at error level when a logger was supplied; otherwise does nothing.
  private def logProgrammerError(message: String): Unit =
    params.logger.foreach(logger => logger.error(message))

  // Reports a message payload that is not a RunStatus. The expected type is named via classOf, because
  // `RunStatus.getClass` would describe the companion object rather than the RunStatus type itself.
  private def logUnexpectedPollResult(result: Any): Unit =
    logProgrammerError(
      s"Programmer error: Received Poll Result of unknown type. Expected ${classOf[RunStatus].getSimpleName} but got ${describeType(result)}."
    )

  // Simple class name of a value, tolerating null so that logging itself cannot throw.
  private def describeType(value: Any): String =
    Option(value).fold("null")(_.getClass.getSimpleName)
}
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
Future.fromTry {
Try {
runStatus match {
case RunStatus.Aborted => AbortedExecutionHandle
case RunStatus.Aborted(_) => AbortedExecutionHandle
case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus)
case unknown =>
throw new RuntimeException(
Expand Down
Loading
Loading