Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1792] Helper to Actor #7544

Merged
merged 19 commits into from
Sep 18, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import cromwell.backend._
import cromwell.backend.async.AsyncBackendJobExecutionActor._
import cromwell.backend.async._
import cromwell.backend.standard.StandardAdHocValue._
import cromwell.backend.standard.costestimation.CostPollingHelper
import cromwell.backend.standard.pollmonitoring.{AsyncJobHasFinished, ProcessThisPollResult}
import cromwell.backend.standard.retry.memory.MemoryRetryResult
import cromwell.backend.validation._
import cromwell.core._
Expand Down Expand Up @@ -77,8 +77,6 @@ case class DefaultStandardAsyncExecutionActorParams(
// Override to `false` when we need the script to set an environment variable in the parent shell.
case class ScriptPreambleData(bashString: String, executeInSubshell: Boolean = true)

case class StartAndEndTimes(jobStart: OffsetDateTime, cpuStart: Option[OffsetDateTime], jobEnd: OffsetDateTime)

/**
* An extension of the generic AsyncBackendJobExecutionActor providing a standard abstract implementation of an
* asynchronous polling backend.
Expand Down Expand Up @@ -904,14 +902,6 @@ trait StandardAsyncExecutionActor
*/
def getTerminalEvents(runStatus: StandardAsyncRunState): Seq[ExecutionEvent] = Seq.empty

/**
* Get the min and max event times from a terminal run status
*
* @param runStatus The terminal run status, as defined by isTerminal.
* @return The min and max event times, if events exist.
*/
def getStartAndEndTimes(runStatus: StandardAsyncRunState): Option[StartAndEndTimes] = None

/**
* Returns true if the status represents a completion.
*
Expand Down Expand Up @@ -945,9 +935,10 @@ trait StandardAsyncExecutionActor
* @param handle The handle of the running job.
* @return A set of actions when the job is complete
*/
def onTaskComplete(runStatus: StandardAsyncRunState, handle: StandardAsyncPendingExecutionHandle): Unit = tellBard(
runStatus
)
def onTaskComplete(runStatus: StandardAsyncRunState, handle: StandardAsyncPendingExecutionHandle): Unit =
pollingResultMonitorActor.foreach(helper =>
helper.tell(AsyncJobHasFinished(runStatus.getClass.getSimpleName), self)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we pass runStatus here rather than stringifying it?

)

/**
* Attempts to abort a job when an abort signal is retrieved.
Expand Down Expand Up @@ -1341,7 +1332,7 @@ trait StandardAsyncExecutionActor
checkAndRecordQuotaExhaustion(state)

// present the poll result to the cost helper. It will keep track of vm start/stop times and may emit some metadata.
costHelper.foreach(helper => helper.processPollResult(state))
pollingResultMonitorActor.foreach(helper => helper.tell(ProcessThisPollResult[StandardAsyncRunState](state), self))

state match {
case _ if isTerminal(state) =>
Expand All @@ -1358,7 +1349,7 @@ trait StandardAsyncExecutionActor

// If present, polling results will be presented to this helper.
// Subclasses can use this to emit proper metadata based on polling responses.
protected val costHelper: Option[CostPollingHelper[StandardAsyncRunState]] = Option.empty
protected val pollingResultMonitorActor: Option[ActorRef] = Option.empty

/**
* Process a poll failure.
Expand Down Expand Up @@ -1545,39 +1536,46 @@ trait StandardAsyncExecutionActor
serviceRegistryActor.putMetadata(jobDescriptor.workflowDescriptor.id, Option(jobDescriptor.key), metadataKeyValues)
}

def tellBard(state: StandardAsyncRunState): Unit =
getStartAndEndTimes(state) match {
case Some(startAndEndTimes: StartAndEndTimes) =>
val dockerImage =
RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes)
val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value
val memory = RuntimeAttributesValidation
.extract(MemoryValidation.instance(), validatedRuntimeAttributes)
.to(MemoryUnit.Bytes)
.amount
serviceRegistryActor ! BardEventRequest(
TaskSummaryEvent(
workflowDescriptor.id.id,
workflowDescriptor.possibleParentWorkflowId.map(_.id),
workflowDescriptor.rootWorkflowId.id,
jobDescriptor.key.tag,
jobDescriptor.key.call.fullyQualifiedName,
jobDescriptor.key.index,
jobDescriptor.key.attempt,
state.getClass.getSimpleName,
platform.map(_.runtimeKey),
dockerImage,
cpus,
memory,
startAndEndTimes.jobStart.toString,
startAndEndTimes.cpuStart.map(_.toString),
startAndEndTimes.jobEnd.toString,
startAndEndTimes.jobStart.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS),
startAndEndTimes.cpuStart.map(_.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS))
)
)
case _ => ()
}
/**
* Reports metrics to bard.
* @param jobStart: Time that Cromwell started processing this job. Should be the earliest recorded time that this job has done...anything.
* @param vmStartTime: Time that the backend started spending money.
* @param vmEndTime: Time that the backend stopped spending money.
*/
def tellBard(terminalStateName: String,
jobStart: OffsetDateTime,
vmStartTime: Option[OffsetDateTime],
vmEndTime: OffsetDateTime
): Unit = {
val dockerImage =
RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes)
val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value
val memory = RuntimeAttributesValidation
.extract(MemoryValidation.instance(), validatedRuntimeAttributes)
.to(MemoryUnit.Bytes)
.amount
serviceRegistryActor ! BardEventRequest(
TaskSummaryEvent(
workflowDescriptor.id.id,
workflowDescriptor.possibleParentWorkflowId.map(_.id),
workflowDescriptor.rootWorkflowId.id,
jobDescriptor.key.tag,
jobDescriptor.key.call.fullyQualifiedName,
jobDescriptor.key.index,
jobDescriptor.key.attempt,
terminalStateName,
platform.map(_.runtimeKey),
dockerImage,
cpus,
memory,
jobStart.toString,
vmStartTime.map(startTime => startTime.toString),
vmEndTime.toString,
jobStart.until(vmEndTime, ChronoUnit.SECONDS),
vmStartTime.map(start => start.until(vmEndTime, ChronoUnit.SECONDS))
)
)
}

implicit override protected lazy val ec: ExecutionContextExecutor = context.dispatcher
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package cromwell.backend.standard.pollmonitoring
import akka.actor.Actor
import cromwell.services.metadata.CallMetadataKeys
import java.time.OffsetDateTime
trait PollResultMessage
case class ProcessThisPollResult[PollResultType](pollResultType: PollResultType) extends PollResultMessage
case class AsyncJobHasFinished(terminalStateName: String) extends PollResultMessage

/**
* Processes poll results from backends and sends messages to other actors based on their contents.
* Primarily concerned with reporting start times, end times, and cost data to both the bard and cromwell metadata services.
*/
trait PollResultMonitorActor[PollResultType] extends Actor {
// Time that Cromwell (but not necessarily the cloud) started working on this job.
def extractEarliestEventTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

// Time that the user VM started spending money.
def extractStartTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

// Time that the user VM stopped spending money.
def extractEndTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

// Function to emit metadata that is associated with a specific call attempt.
def tellMetadata(metadata: Map[String, Any]): Unit

// Function that reports metrics to bard, called when a specific call attempt terminates.
def tellBard(terminalStateName: String,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved (almost) verbatim from StandardAsyncExecutionActor

jobStart: OffsetDateTime,
vmStartTime: Option[OffsetDateTime],
vmEndTime: OffsetDateTime
): Unit

private var jobStartTime: Option[OffsetDateTime] =
Option.empty
private var vmStartTime: Option[OffsetDateTime] = Option.empty
private var vmEndTime: Option[OffsetDateTime] = Option.empty

def processPollResult(pollStatus: PollResultType): Unit = {
// Make sure jobStartTime remains the earliest event time ever seen
extractEarliestEventTimeFromRunState(pollStatus).foreach { earliestTime =>
if (earliestTime.isBefore(jobStartTime.getOrElse(OffsetDateTime.now()))) {
jobStartTime = Option(earliestTime)
}
}
// If vm start time is reported, record it to metadata and stop trying
if (vmStartTime.isEmpty) {
extractStartTimeFromRunState(pollStatus).foreach { start =>
vmStartTime = Option(start)
tellMetadata(Map(CallMetadataKeys.VmStartTime -> start))
}
}
// If vm end time is reported, (or for some weird reason we see an end time after our recorded one),
// record it to metadata.
extractEndTimeFromRunState(pollStatus).foreach { end =>
if (vmEndTime.isEmpty || end.isAfter(vmEndTime.get)) {
vmEndTime = Option(end)
tellMetadata(Map(CallMetadataKeys.VmEndTime -> end))
}
}
}

// When a job finishes, the bard actor needs to know about the timing in order to record metrics.
// Cost related metadata should already have been handled in processPollResult.
def handleAsyncJobFinish(terminalStateName: String): Unit =
jobStartTime.foreach(jobStart =>
tellBard(terminalStateName, jobStart, vmStartTime, vmEndTime.getOrElse(OffsetDateTime.now()))
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ import wom.types.{WomArrayType, WomMapType, WomSingleFileType, WomStringType}
import wom.values._

import java.nio.file.Paths
import java.time.OffsetDateTime
import java.time.temporal.ChronoUnit
import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -1190,35 +1188,6 @@ class GcpBatchAsyncBackendJobExecutionActorSpec

}

private def setupBackend: TestableGcpBatchJobExecutionActor = {
val womFile = WomSingleFile("gs://blah/b/c.txt")
val workflowInputs = Map("file_passing.f" -> womFile)
val callInputs = Map(
"in" -> womFile, // how does one programmatically map the wf inputs to the call inputs?
"out_name" -> WomString("out") // is it expected that this isn't using the default?
)
makeBatchActorRef(SampleWdl.FilePassingWorkflow, workflowInputs, "a", callInputs).underlyingActor
}

it should "not try to extract start, cpu start, and end times from terminal run statuses" in {
val jesBackend = setupBackend

val start = ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(1, ChronoUnit.HOURS), None)
val middle = ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(30, ChronoUnit.MINUTES), None)
val end = ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(1, ChronoUnit.MINUTES), None)
val successStatus = RunStatus.Success(Seq(middle, end, start))

jesBackend.getStartAndEndTimes(successStatus) shouldBe None
}

it should "return None trying to get start and end times from a status containing no events" in {
val jesBackend = setupBackend

val successStatus = RunStatus.Success(Seq())

jesBackend.getStartAndEndTimes(successStatus) shouldBe None
}

private def makeRuntimeAttributes(job: CommandCallNode) = {
val evaluatedAttributes =
RuntimeAttributeDefinition.evaluateRuntimeAttributes(job.callable.runtimeAttributes, null, Map.empty)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cromwell.backend.google.pipelines.common

import akka.actor.Props
import cromwell.backend.google.pipelines.common.api.RunStatus
import cromwell.backend.standard.pollmonitoring.{
AsyncJobHasFinished,
PollResultMessage,
PollResultMonitorActor,
ProcessThisPollResult
}
import cromwell.services.metadata.CallMetadataKeys

import java.time.OffsetDateTime

object PapiPollResultMonitorActor {
def props(tellMetadataFn: Map[String, Any] => Unit,
tellBardFn: (String, OffsetDateTime, Option[OffsetDateTime], OffsetDateTime) => Unit
): Props = Props(new PapiPollResultMonitorActor(tellMetadataFn, tellBardFn))
}

class PapiPollResultMonitorActor(tellMetadataFn: Map[String, Any] => Unit,
tellBardFn: (String, OffsetDateTime, Option[OffsetDateTime], OffsetDateTime) => Unit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth bringing back the StartAndEndTimes class, a series of OffsetDateTime can be easy to mess up.

) extends PollResultMonitorActor[RunStatus] {

override def extractEarliestEventTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
pollStatus.eventList.minByOption(_.offsetDateTime).map(e => e.offsetDateTime)
override def extractStartTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
pollStatus.eventList.collectFirst {
case event if event.name == CallMetadataKeys.VmStartTime => event.offsetDateTime
}

override def extractEndTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
pollStatus.eventList.collectFirst {
case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
}

override def tellMetadata(metadata: Map[String, Any]): Unit = tellMetadataFn(metadata)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we give this actor a handle to the metadata service registry rather than having all the backends pass in (I assume identical?) implementations?

Similar question for Bard - can we put the actual message-passing to the Bard service in the parent PollResultMonitorActor and figure out the minimum logic that needs to be per-backend?

Copy link
Contributor Author

@THWiseman THWiseman Sep 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is certainly possible, but I'm not sure if that's a big improvement since we would also need to pass in a lot of information about the particular task (runtime attributes, job descriptor, etc...) in order to have all the information necessary to send the message in the right way. All that other stuff doesn't seem relevant to tracking start and end times, yet we still want to send messages that include more context about their task. The passing of the function object is more about capturing necessary context from the parent class than it is about sharing implementation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I see what you mean, for Bard. Maybe we could have a method that takes the current inputs and creates and returns a TaskSummaryEvent, which the PollMonitorActor knows what to do with?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated branch to do something along these lines. We no longer pass in callbacks from individual backends, but instead pass in some extra data so that the poll monitors can call their own implementations of tellMetadata and tellBard instead. This allowed us to remove tellBard from elsewhere in the codebase.

override def tellBard(terminalStateName: String,
jobStart: OffsetDateTime,
vmStartTime: Option[OffsetDateTime],
vmEndTime: OffsetDateTime
): Unit =
tellBardFn(terminalStateName, jobStart, vmStartTime, vmEndTime)
override def receive: Receive = {
case message: PollResultMessage =>
message match {
case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
case ProcessThisPollResult(_) => println("Programmer error: Received Poll Result of unknown type.")

Check warning on line 48 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiPollResultMonitorActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiPollResultMonitorActor.scala#L48

Added line #L48 was not covered by tests
case AsyncJobHasFinished(terminalStateName) => handleAsyncJobFinish(terminalStateName)
}
case _ =>
println("Programmer error: Cost Helper received message of type other than CostPollingMessage")

Check warning on line 52 in supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiPollResultMonitorActor.scala

View check run for this annotation

Codecov / codecov/patch

supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiPollResultMonitorActor.scala#L52

Added line #L52 was not covered by tests
}
}
Loading
Loading