-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WX-1792] Helper to Actor #7544
Merged
Merged
Changes from 15 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
db89506
initial refactor
THWiseman 68e8930
remove startandendtimes
THWiseman 05f2197
rename helper
THWiseman 04c9fbf
fix up some tests
THWiseman 16df9c3
fix up tests
THWiseman 60e4747
comments & format
THWiseman eb60ff8
improve test
THWiseman 77edf58
scalafmt
THWiseman 200c472
Merge branch 'develop' into WX-1792-helper-to-actor
THWiseman 219aacb
build issues
THWiseman fc45b0a
default to proper kv service
THWiseman 9d5dd8b
scalafmt
THWiseman b2b08c0
PR feedback
THWiseman 3b39573
bug + scalafmt
THWiseman 7e87be3
delete unused function
THWiseman d9aa415
remove println
THWiseman 1e445b4
fix batch tests
THWiseman 586f154
status equality
THWiseman a46c85b
delete broken test (i broke it and its OK)
THWiseman File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 0 additions & 34 deletions
34
backend/src/main/scala/cromwell/backend/standard/costestimation/CostPollingHelper.scala
This file was deleted.
Oops, something went wrong.
138 changes: 138 additions & 0 deletions
138
backend/src/main/scala/cromwell/backend/standard/pollmonitoring/PollResultMonitorActor.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
package cromwell.backend.standard.pollmonitoring | ||
import akka.actor.{Actor, ActorRef} | ||
import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform} | ||
import cromwell.backend.validation.{ | ||
CpuValidation, | ||
DockerValidation, | ||
MemoryValidation, | ||
RuntimeAttributesValidation, | ||
ValidatedRuntimeAttributes | ||
} | ||
import cromwell.core.logging.JobLogger | ||
import cromwell.services.metadata.CallMetadataKeys | ||
import cromwell.services.metrics.bard.BardEventing.BardEventRequest | ||
import cromwell.services.metrics.bard.model.TaskSummaryEvent | ||
import wdl4s.parser.MemoryUnit | ||
|
||
import java.time.OffsetDateTime | ||
import java.time.temporal.ChronoUnit | ||
/** Common parent of the messages handled by a poll result monitor actor. */
trait PollResultMessage

/** Carries a poll result that the monitor should process. */
case class ProcessThisPollResult[PollResultType](pollResult: PollResultType)
    extends PollResultMessage

/** Carries the poll result observed when the asynchronous job finished. */
case class AsyncJobHasFinished[PollResultType](pollResult: PollResultType)
    extends PollResultMessage
|
||
/**
  * Everything a poll result monitor needs in order to report on a single job.
  *
  * @param serviceRegistry            actor that receives metadata and Bard event messages
  * @param workflowDescriptor         descriptor of the workflow the job belongs to
  * @param jobDescriptor              descriptor of the job being monitored
  * @param validatedRuntimeAttributes runtime attributes of the job (docker image, cpu, memory are read from it)
  * @param platform                   platform the job runs on, if known
  * @param logger                     optional job logger used for error reporting
  */
case class PollMonitorParameters(
  serviceRegistry: ActorRef,
  workflowDescriptor: BackendWorkflowDescriptor,
  jobDescriptor: BackendJobDescriptor,
  validatedRuntimeAttributes: ValidatedRuntimeAttributes,
  platform: Option[Platform],
  logger: Option[JobLogger]
)
|
||
/**
  * Processes poll results from backends and sends messages to other actors based on their contents.
  * Primarily concerned with reporting start times, end times, and cost data to both the bard and
  * cromwell metadata services.
  *
  * @tparam PollResultType backend-specific poll result from which timing information is extracted
  */
trait PollResultMonitorActor[PollResultType] extends Actor {

  /** Job context and collaborating actors used when emitting metadata and Bard events. */
  def params: PollMonitorParameters

  // Time that Cromwell (but not necessarily the cloud) started working on this job.
  def extractEarliestEventTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

  // Time that the user VM started spending money.
  def extractStartTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

  // Time that the user VM stopped spending money.
  def extractEndTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime]

  /**
    * Emits metadata that is associated with a specific call attempt.
    *
    * @param metadataKeyValues metadata keys and values to record against this job's key
    */
  def tellMetadata(metadataKeyValues: Map[String, Any]): Unit = {
    import cromwell.services.metadata.MetadataService.implicits.MetadataAutoPutter
    params.serviceRegistry.putMetadata(params.jobDescriptor.workflowDescriptor.id,
                                       Option(params.jobDescriptor.key),
                                       metadataKeyValues
    )
  }

  /**
    * Reports metrics to bard; called when a specific call attempt terminates.
    *
    * @param terminalStateName name of the terminal state the job ended in
    * @param jobStart          time Cromwell started working on the job
    * @param vmStartTime       time the VM started, if one was ever observed
    * @param vmEndTime         time the VM stopped
    */
  def tellBard(terminalStateName: String,
               jobStart: OffsetDateTime,
               vmStartTime: Option[OffsetDateTime],
               vmEndTime: OffsetDateTime
  ): Unit = {
    val runtimeAttributes = params.validatedRuntimeAttributes
    val workflowDescriptor = params.workflowDescriptor
    val jobKey = params.jobDescriptor.key

    val dockerImage = RuntimeAttributesValidation.extractOption(DockerValidation.instance, runtimeAttributes)
    val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, runtimeAttributes).value
    // Memory is always reported in bytes, whatever unit the runtime attribute was declared in.
    val memoryBytes = RuntimeAttributesValidation
      .extract(MemoryValidation.instance(), runtimeAttributes)
      .to(MemoryUnit.Bytes)
      .amount

    params.serviceRegistry ! BardEventRequest(
      TaskSummaryEvent(
        workflowDescriptor.id.id,
        workflowDescriptor.possibleParentWorkflowId.map(_.id),
        workflowDescriptor.rootWorkflowId.id,
        jobKey.tag,
        jobKey.call.fullyQualifiedName,
        jobKey.index,
        jobKey.attempt,
        terminalStateName,
        params.platform.map(_.runtimeKey),
        dockerImage,
        cpus,
        memoryBytes,
        jobStart.toString,
        vmStartTime.map(_.toString),
        vmEndTime.toString,
        // Durations are reported in whole seconds.
        jobStart.until(vmEndTime, ChronoUnit.SECONDS),
        vmStartTime.map(_.until(vmEndTime, ChronoUnit.SECONDS))
      )
    )
  }

  // Actor-local state, updated as poll results arrive.
  private var jobStartTime: Option[OffsetDateTime] = Option.empty
  private var vmStartTime: Option[OffsetDateTime] = Option.empty
  private var vmEndTime: Option[OffsetDateTime] = Option.empty

  /**
    * Updates the recorded job/VM timings from a poll result and emits metadata for any VM start
    * or end time that is new.
    *
    * @param pollStatus the latest poll result reported by the backend
    */
  def processPollResult(pollStatus: PollResultType): Unit = {
    // Make sure jobStartTime remains the earliest event time ever seen. The first observed time is
    // always accepted: comparing it against the local clock would drop it whenever the event's
    // timestamp is ahead of this machine, and then nothing would ever be reported to bard.
    extractEarliestEventTimeFromRunState(pollStatus).foreach { earliestTime =>
      if (jobStartTime.forall(known => earliestTime.isBefore(known))) {
        jobStartTime = Option(earliestTime)
      }
    }
    // If vm start time is reported, record it to metadata and stop trying
    if (vmStartTime.isEmpty) {
      extractStartTimeFromRunState(pollStatus).foreach { start =>
        vmStartTime = Option(start)
        tellMetadata(Map(CallMetadataKeys.VmStartTime -> start))
      }
    }
    // If vm end time is reported, (or for some weird reason we see an end time after our recorded one),
    // record it to metadata.
    extractEndTimeFromRunState(pollStatus).foreach { end =>
      if (vmEndTime.forall(recorded => end.isAfter(recorded))) {
        vmEndTime = Option(end)
        tellMetadata(Map(CallMetadataKeys.VmEndTime -> end))
      }
    }
  }

  /**
    * When a job finishes, the bard actor needs to know about the timing in order to record metrics.
    * Cost related metadata should already have been handled in processPollResult.
    *
    * Nothing is sent if no job start time was ever recorded. If no VM end time was recorded, the
    * current time is used in its place.
    *
    * @param terminalStateName name of the terminal state the job ended in
    */
  def handleAsyncJobFinish(terminalStateName: String): Unit =
    jobStartTime.foreach { jobStart =>
      tellBard(
        terminalStateName = terminalStateName,
        jobStart = jobStart,
        vmStartTime = vmStartTime,
        vmEndTime = vmEndTime.getOrElse(OffsetDateTime.now())
      )
    }
}
77 changes: 77 additions & 0 deletions
77
...tch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
package cromwell.backend.google.batch.actors | ||
|
||
import akka.actor.{ActorRef, Props} | ||
import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform} | ||
import cromwell.backend.google.batch.models.RunStatus | ||
import cromwell.backend.standard.pollmonitoring.{ | ||
AsyncJobHasFinished, | ||
PollMonitorParameters, | ||
PollResultMessage, | ||
PollResultMonitorActor, | ||
ProcessThisPollResult | ||
} | ||
import cromwell.backend.validation.ValidatedRuntimeAttributes | ||
import cromwell.core.logging.JobLogger | ||
import cromwell.services.metadata.CallMetadataKeys | ||
|
||
import java.time.OffsetDateTime | ||
|
||
object BatchPollResultMonitorActor {

  /**
    * Creates the Props for a [[BatchPollResultMonitorActor]], bundling the arguments into a
    * PollMonitorParameters. The logger is wrapped with `Option(...)`.
    */
  def props(serviceRegistry: ActorRef,
            workflowDescriptor: BackendWorkflowDescriptor,
            jobDescriptor: BackendJobDescriptor,
            runtimeAttributes: ValidatedRuntimeAttributes,
            platform: Option[Platform],
            logger: JobLogger
  ): Props = Props {
    val monitorParameters = PollMonitorParameters(
      serviceRegistry = serviceRegistry,
      workflowDescriptor = workflowDescriptor,
      jobDescriptor = jobDescriptor,
      validatedRuntimeAttributes = runtimeAttributes,
      platform = platform,
      logger = Option(logger)
    )
    new BatchPollResultMonitorActor(monitorParameters)
  }
}
|
||
/**
  * Poll result monitor for the Batch backend. Timing information is read from the event list of
  * each [[RunStatus]] poll result.
  *
  * @param pollMonitorParameters job context and collaborating actors, exposed through `params`
  */
class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)
    extends PollResultMonitorActor[RunStatus] {

  override def params: PollMonitorParameters = pollMonitorParameters

  // Earliest timestamp among all events of the poll result, if there are any events.
  override def extractEarliestEventTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
    pollStatus.eventList.minByOption(_.offsetDateTime).map(_.offsetDateTime)

  // Timestamp of the first event named after the VM start time metadata key.
  override def extractStartTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
    pollStatus.eventList.collectFirst {
      case event if event.name == CallMetadataKeys.VmStartTime => event.offsetDateTime
    }

  // Timestamp of the first event named after the VM end time metadata key.
  override def extractEndTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
    pollStatus.eventList.collectFirst {
      case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
    }

  // The cases are deliberately flat: PollResultMessage is not sealed, so a nested match over it
  // would throw a MatchError for any other subtype instead of reaching the fallback below.
  override def receive: Receive = {
    case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
    case ProcessThisPollResult(result) => logUnexpectedPollResult(result)
    case AsyncJobHasFinished(pollResult: RunStatus) =>
      // The concrete RunStatus class name doubles as the terminal state name.
      handleAsyncJobFinish(pollResult.getClass.getSimpleName)
    case AsyncJobHasFinished(result) => logUnexpectedPollResult(result)
    case other =>
      logError(
        s"Programmer error: Poll result monitor received message of type other than PollResultMessage: $other"
      )
  }

  // Reports a poll result whose payload is not a RunStatus.
  private def logUnexpectedPollResult(result: Any): Unit =
    logError(
      s"Programmer error: Received Poll Result of unknown type. Expected ${classOf[RunStatus].getSimpleName} but got ${result.getClass.getSimpleName}."
    )

  // Errors go to the job logger when one was supplied; without a logger they are dropped.
  private def logError(message: String): Unit =
    params.logger.foreach(logger => logger.error(message))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved (almost) verbatim from
StandardAsyncExecutionActor