-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WX-1792] Helper to Actor #7544
Changes from all commits
db89506
68e8930
05f2197
04c9fbf
16df9c3
60e4747
eb60ff8
77edf58
200c472
219aacb
fc45b0a
9d5dd8b
b2b08c0
3b39573
7e87be3
d9aa415
1e445b4
586f154
a46c85b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package cromwell.backend.standard.pollmonitoring | ||
import akka.actor.{Actor, ActorRef} | ||
import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform} | ||
import cromwell.backend.validation.{ | ||
CpuValidation, | ||
DockerValidation, | ||
MemoryValidation, | ||
RuntimeAttributesValidation, | ||
ValidatedRuntimeAttributes | ||
} | ||
import cromwell.core.logging.JobLogger | ||
import cromwell.services.metadata.CallMetadataKeys | ||
import cromwell.services.metrics.bard.BardEventing.BardEventRequest | ||
import cromwell.services.metrics.bard.model.TaskSummaryEvent | ||
import wdl4s.parser.MemoryUnit | ||
|
||
import java.time.OffsetDateTime | ||
import java.time.temporal.ChronoUnit | ||
trait PollResultMessage | ||
case class ProcessThisPollResult[PollResultType](pollResult: PollResultType) extends PollResultMessage | ||
case class AsyncJobHasFinished[PollResultType](pollResult: PollResultType) extends PollResultMessage | ||
|
||
case class PollMonitorParameters( | ||
serviceRegistry: ActorRef, | ||
workflowDescriptor: BackendWorkflowDescriptor, | ||
jobDescriptor: BackendJobDescriptor, | ||
validatedRuntimeAttributes: ValidatedRuntimeAttributes, | ||
platform: Option[Platform], | ||
logger: Option[JobLogger] | ||
) | ||
|
||
/** | ||
* Processes poll results from backends and sends messages to other actors based on their contents. | ||
* Primarily concerned with reporting start times, end times, and cost data to both the bard and cromwell metadata services. | ||
*/ | ||
trait PollResultMonitorActor[PollResultType] extends Actor { | ||
def params: PollMonitorParameters | ||
|
||
// Time that Cromwell (but not necessarily the cloud) started working on this job. | ||
def extractEarliestEventTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime] | ||
|
||
// Time that the user VM started spending money. | ||
def extractStartTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime] | ||
|
||
// Time that the user VM stopped spending money. | ||
def extractEndTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime] | ||
|
||
// Function to emit metadata that is associated with a specific call attempt. | ||
def tellMetadata(metadataKeyValues: Map[String, Any]): Unit = { | ||
import cromwell.services.metadata.MetadataService.implicits.MetadataAutoPutter | ||
params.serviceRegistry.putMetadata(params.jobDescriptor.workflowDescriptor.id, | ||
Option(params.jobDescriptor.key), | ||
metadataKeyValues | ||
) | ||
} | ||
|
||
// Function that reports metrics to bard, called when a specific call attempt terminates. | ||
def tellBard(terminalStateName: String, | ||
jobStart: OffsetDateTime, | ||
vmStartTime: Option[OffsetDateTime], | ||
vmEndTime: OffsetDateTime | ||
): Unit = { | ||
val validatedRuntimeAttributes = params.validatedRuntimeAttributes | ||
val serviceRegistryActor = params.serviceRegistry | ||
val workflowDescriptor = params.workflowDescriptor | ||
val jobDescriptor = params.jobDescriptor | ||
val platform = params.platform.map(_.runtimeKey) | ||
val dockerImage = | ||
RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes) | ||
val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value | ||
val memory = RuntimeAttributesValidation | ||
.extract(MemoryValidation.instance(), validatedRuntimeAttributes) | ||
.to(MemoryUnit.Bytes) | ||
.amount | ||
serviceRegistryActor ! BardEventRequest( | ||
TaskSummaryEvent( | ||
workflowDescriptor.id.id, | ||
workflowDescriptor.possibleParentWorkflowId.map(_.id), | ||
workflowDescriptor.rootWorkflowId.id, | ||
jobDescriptor.key.tag, | ||
jobDescriptor.key.call.fullyQualifiedName, | ||
jobDescriptor.key.index, | ||
jobDescriptor.key.attempt, | ||
terminalStateName, | ||
platform, | ||
dockerImage, | ||
cpus, | ||
memory, | ||
jobStart.toString, | ||
vmStartTime.map(startTime => startTime.toString), | ||
vmEndTime.toString, | ||
jobStart.until(vmEndTime, ChronoUnit.SECONDS), | ||
vmStartTime.map(start => start.until(vmEndTime, ChronoUnit.SECONDS)) | ||
) | ||
) | ||
} | ||
|
||
private var jobStartTime: Option[OffsetDateTime] = | ||
Option.empty | ||
private var vmStartTime: Option[OffsetDateTime] = Option.empty | ||
private var vmEndTime: Option[OffsetDateTime] = Option.empty | ||
|
||
def processPollResult(pollStatus: PollResultType): Unit = { | ||
// Make sure jobStartTime remains the earliest event time ever seen | ||
extractEarliestEventTimeFromRunState(pollStatus).foreach { earliestTime => | ||
if (earliestTime.isBefore(jobStartTime.getOrElse(OffsetDateTime.now()))) { | ||
jobStartTime = Option(earliestTime) | ||
} | ||
} | ||
// If vm start time is reported, record it to metadata and stop trying | ||
if (vmStartTime.isEmpty) { | ||
extractStartTimeFromRunState(pollStatus).foreach { start => | ||
vmStartTime = Option(start) | ||
tellMetadata(Map(CallMetadataKeys.VmStartTime -> start)) | ||
} | ||
} | ||
// If vm end time is reported, (or for some weird reason we see an end time after our recorded one), | ||
// record it to metadata. | ||
extractEndTimeFromRunState(pollStatus).foreach { end => | ||
if (vmEndTime.isEmpty || end.isAfter(vmEndTime.get)) { | ||
vmEndTime = Option(end) | ||
tellMetadata(Map(CallMetadataKeys.VmEndTime -> end)) | ||
} | ||
} | ||
} | ||
|
||
// When a job finishes, the bard actor needs to know about the timing in order to record metrics. | ||
// Cost related metadata should already have been handled in processPollResult. | ||
def handleAsyncJobFinish(terminalStateName: String): Unit = | ||
jobStartTime.foreach(jobStart => | ||
tellBard( | ||
terminalStateName = terminalStateName, | ||
jobStart = jobStart, | ||
vmStartTime = vmStartTime, | ||
vmEndTime = vmEndTime.getOrElse(OffsetDateTime.now()) | ||
) | ||
) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package cromwell.backend.google.batch.actors | ||
|
||
import akka.actor.{ActorRef, Props} | ||
import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform} | ||
import cromwell.backend.google.batch.models.RunStatus | ||
import cromwell.backend.standard.pollmonitoring.{ | ||
AsyncJobHasFinished, | ||
PollMonitorParameters, | ||
PollResultMessage, | ||
PollResultMonitorActor, | ||
ProcessThisPollResult | ||
} | ||
import cromwell.backend.validation.ValidatedRuntimeAttributes | ||
import cromwell.core.logging.JobLogger | ||
import cromwell.services.metadata.CallMetadataKeys | ||
|
||
import java.time.OffsetDateTime | ||
|
||
object BatchPollResultMonitorActor { | ||
def props(serviceRegistry: ActorRef, | ||
workflowDescriptor: BackendWorkflowDescriptor, | ||
jobDescriptor: BackendJobDescriptor, | ||
runtimeAttributes: ValidatedRuntimeAttributes, | ||
platform: Option[Platform], | ||
logger: JobLogger | ||
): Props = Props( | ||
new BatchPollResultMonitorActor( | ||
PollMonitorParameters(serviceRegistry, | ||
Check warning on line 28 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
workflowDescriptor, | ||
jobDescriptor, | ||
runtimeAttributes, | ||
platform, | ||
Option(logger) | ||
Check warning on line 33 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
) | ||
) | ||
) | ||
} | ||
|
||
class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters) | ||
extends PollResultMonitorActor[RunStatus] { | ||
|
||
override def extractEarliestEventTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] = | ||
pollStatus.eventList.minByOption(_.offsetDateTime).map(e => e.offsetDateTime) | ||
Check warning on line 43 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
override def extractStartTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] = | ||
pollStatus.eventList.collectFirst { | ||
case event if event.name == CallMetadataKeys.VmStartTime => event.offsetDateTime | ||
Check warning on line 46 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
} | ||
|
||
override def extractEndTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] = | ||
pollStatus.eventList.collectFirst { | ||
case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime | ||
Check warning on line 51 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
} | ||
|
||
override def receive: Receive = { | ||
Check warning on line 54 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
case message: PollResultMessage => | ||
message match { | ||
case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult) | ||
Check warning on line 57 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
case ProcessThisPollResult(result) => | ||
params.logger.foreach(logger => | ||
logger.error( | ||
Check warning on line 60 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
s"Programmer error: Received Poll Result of unknown type. Expected ${RunStatus.getClass.getSimpleName} but got ${result.getClass.getSimpleName}." | ||
) | ||
) | ||
case AsyncJobHasFinished(pollResult: RunStatus) => handleAsyncJobFinish(pollResult.getClass.getSimpleName) | ||
Check warning on line 64 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
case AsyncJobHasFinished(result) => | ||
params.logger.foreach(logger => | ||
logger.error( | ||
Check warning on line 67 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
s"Programmer error: Received Poll Result of unknown type. Expected ${AsyncJobHasFinished.getClass.getSimpleName} but got ${result.getClass.getSimpleName}." | ||
) | ||
) | ||
} | ||
case _ => | ||
params.logger.foreach(logger => | ||
logger.error( | ||
Check warning on line 74 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
s"Programmer error: Cost Helper received message of type other than CostPollingMessage" | ||
) | ||
) | ||
} | ||
|
||
override def params: PollMonitorParameters = pollMonitorParameters | ||
Check warning on line 80 in supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved (almost) verbatim from
StandardAsyncExecutionActor