diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala index ec126eee1da..8f8312023f1 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardAsyncExecutionActor.scala @@ -23,7 +23,7 @@ import cromwell.backend._ import cromwell.backend.async.AsyncBackendJobExecutionActor._ import cromwell.backend.async._ import cromwell.backend.standard.StandardAdHocValue._ -import cromwell.backend.standard.costestimation.CostPollingHelper +import cromwell.backend.standard.pollmonitoring.{AsyncJobHasFinished, ProcessThisPollResult} import cromwell.backend.standard.retry.memory.MemoryRetryResult import cromwell.backend.validation._ import cromwell.core._ @@ -32,15 +32,12 @@ import cromwell.core.path.Path import cromwell.services.keyvalue.KeyValueServiceActor._ import cromwell.services.keyvalue.KvClient import cromwell.services.metadata.CallMetadataKeys -import cromwell.services.metrics.bard.BardEventing.BardEventRequest -import cromwell.services.metrics.bard.model.TaskSummaryEvent import eu.timepit.refined.refineV import mouse.all._ import net.ceedubs.ficus.Ficus._ import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.exception.ExceptionUtils import shapeless.Coproduct -import wdl4s.parser.MemoryUnit import wom.callable.{AdHocValue, CommandTaskDefinition, ContainerizedInputExpression} import wom.expression.WomExpression import wom.graph.LocalName @@ -48,8 +45,6 @@ import wom.values._ import wom.{CommandSetupSideEffectFile, InstantiatedCommand, WomFileMapper} import java.io.IOException -import java.time.OffsetDateTime -import java.time.temporal.ChronoUnit import scala.concurrent._ import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} @@ -77,8 +72,6 @@ case class DefaultStandardAsyncExecutionActorParams( // Override to `false` when we need the script to set an environment variable in the parent shell. case class ScriptPreambleData(bashString: String, executeInSubshell: Boolean = true) -case class StartAndEndTimes(jobStart: OffsetDateTime, cpuStart: Option[OffsetDateTime], jobEnd: OffsetDateTime) - /** * An extension of the generic AsyncBackendJobExecutionActor providing a standard abstract implementation of an * asynchronous polling backend. @@ -904,14 +897,6 @@ trait StandardAsyncExecutionActor */ def getTerminalEvents(runStatus: StandardAsyncRunState): Seq[ExecutionEvent] = Seq.empty - /** - * Get the min and max event times from a terminal run status - * - * @param runStatus The terminal run status, as defined by isTerminal. - * @return The min and max event times, if events exist. - */ - def getStartAndEndTimes(runStatus: StandardAsyncRunState): Option[StartAndEndTimes] = None - /** * Returns true if the status represents a completion. * @@ -945,9 +930,8 @@ trait StandardAsyncExecutionActor * @param handle The handle of the running job. * @return A set of actions when the job is complete */ - def onTaskComplete(runStatus: StandardAsyncRunState, handle: StandardAsyncPendingExecutionHandle): Unit = tellBard( - runStatus - ) + def onTaskComplete(runStatus: StandardAsyncRunState, handle: StandardAsyncPendingExecutionHandle): Unit = + pollingResultMonitorActor.foreach(helper => helper.tell(AsyncJobHasFinished(runStatus), self)) /** * Attempts to abort a job when an abort signal is retrieved. 
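For orientation, the contract between StandardAsyncExecutionActor and the optional monitor introduced by this change is two fire-and-forget messages: a ProcessThisPollResult for every poll response (next hunk) and a single AsyncJobHasFinished once the status is terminal (onTaskComplete above). A minimal sketch of that flow, with hypothetical placeholder values; only the message, actor, and method names come from this diff:

    // Sketch: what a PollResultMonitorActor receives over one job's lifetime.
    // `monitor` stands in for pollingResultMonitorActor.get; `running` and `done`
    // are hypothetical poll results of the backend's run-state type S.
    // Assumes imports from akka.actor and cromwell.backend.standard.pollmonitoring.
    def illustratePollFlow[S](monitor: akka.actor.ActorRef, self: akka.actor.ActorRef, running: S, done: S): Unit = {
      monitor.tell(ProcessThisPollResult(running), self) // sent from handlePollSuccess on every poll
      monitor.tell(ProcessThisPollResult(done), self) // a terminal poll flows through the same path first
      monitor.tell(AsyncJobHasFinished(done), self) // sent once from onTaskComplete; triggers the Bard task summary
    }
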
@@ -1341,7 +1325,7 @@ trait StandardAsyncExecutionActor
       checkAndRecordQuotaExhaustion(state)

-      // present the poll result to the cost helper. It will keep track of vm start/stop times and may emit some metadata.
-      costHelper.foreach(helper => helper.processPollResult(state))
+      // present the poll result to the poll result monitor actor. It will keep track of vm start/stop times and may emit some metadata.
+      pollingResultMonitorActor.foreach(helper => helper.tell(ProcessThisPollResult[StandardAsyncRunState](state), self))

       state match {
         case _ if isTerminal(state) =>
@@ -1358,7 +1342,7 @@ trait StandardAsyncExecutionActor

-  // If present, polling results will be presented to this helper.
-  // Subclasses can use this to emit proper metadata based on polling responses.
-  protected val costHelper: Option[CostPollingHelper[StandardAsyncRunState]] = Option.empty
+  // If present, polling results will be forwarded to this monitoring actor.
+  // Subclasses can use it to emit proper metadata based on polling responses.
+  protected val pollingResultMonitorActor: Option[ActorRef] = Option.empty

   /**
    * Process a poll failure.
@@ -1545,40 +1529,6 @@ trait StandardAsyncExecutionActor
     serviceRegistryActor.putMetadata(jobDescriptor.workflowDescriptor.id, Option(jobDescriptor.key), metadataKeyValues)
   }

-  def tellBard(state: StandardAsyncRunState): Unit =
-    getStartAndEndTimes(state) match {
-      case Some(startAndEndTimes: StartAndEndTimes) =>
-        val dockerImage =
-          RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes)
-        val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value
-        val memory = RuntimeAttributesValidation
-          .extract(MemoryValidation.instance(), validatedRuntimeAttributes)
-          .to(MemoryUnit.Bytes)
-          .amount
-        serviceRegistryActor ! BardEventRequest(
-          TaskSummaryEvent(
-            workflowDescriptor.id.id,
-            workflowDescriptor.possibleParentWorkflowId.map(_.id),
-            workflowDescriptor.rootWorkflowId.id,
-            jobDescriptor.key.tag,
-            jobDescriptor.key.call.fullyQualifiedName,
-            jobDescriptor.key.index,
-            jobDescriptor.key.attempt,
-            state.getClass.getSimpleName,
-            platform.map(_.runtimeKey),
-            dockerImage,
-            cpus,
-            memory,
-            startAndEndTimes.jobStart.toString,
-            startAndEndTimes.cpuStart.map(_.toString),
-            startAndEndTimes.jobEnd.toString,
-            startAndEndTimes.jobStart.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS),
-            startAndEndTimes.cpuStart.map(_.until(startAndEndTimes.jobEnd, ChronoUnit.SECONDS))
-          )
-        )
-      case _ => ()
-    }
-
   implicit override protected lazy val ec: ExecutionContextExecutor = context.dispatcher
 }
diff --git a/backend/src/main/scala/cromwell/backend/standard/costestimation/CostPollingHelper.scala b/backend/src/main/scala/cromwell/backend/standard/costestimation/CostPollingHelper.scala
deleted file mode 100644
index 451b8b60194..00000000000
--- a/backend/src/main/scala/cromwell/backend/standard/costestimation/CostPollingHelper.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-package cromwell.backend.standard.costestimation
-import cromwell.services.metadata.CallMetadataKeys
-import java.time.OffsetDateTime
-
-// Emits metadata based on backend poll results. Limits the amount of metadata emitted.
-trait CostPollingHelper[A] {
-
-  // Should be overridden to return the time that the user VM starts spending money.
-  // None if it can't be ascertained from the provided status.
-  def extractStartTimeFromRunState(pollStatus: A): Option[OffsetDateTime]
-
-  // Should be overridden to return the time that the user VM stops spending money.
-  // None if it can't be ascertained from the provided status.
-  def extractEndTimeFromRunState(pollStatus: A): Option[OffsetDateTime]
-  // Function to emit metadata that is associated with a specific call attempt.
- def tellMetadata(metadata: Map[String, Any]): Unit - - var vmStartTime: Option[OffsetDateTime] = Option.empty - var vmEndTime: Option[OffsetDateTime] = Option.empty - def processPollResult(pollStatus: A): Unit = { - if (vmStartTime.isEmpty) { - extractStartTimeFromRunState(pollStatus).foreach { start => - vmStartTime = Some(start) - tellMetadata(Map(CallMetadataKeys.VmStartTime -> start)) - } - } - extractEndTimeFromRunState(pollStatus).foreach { end => - if (vmEndTime.isEmpty || end.isAfter(vmEndTime.get)) { - vmEndTime = Some(end) - tellMetadata(Map(CallMetadataKeys.VmEndTime -> end)) - } - } - } -} diff --git a/backend/src/main/scala/cromwell/backend/standard/pollmonitoring/PollResultMonitorActor.scala b/backend/src/main/scala/cromwell/backend/standard/pollmonitoring/PollResultMonitorActor.scala new file mode 100644 index 00000000000..d5b8110d5dc --- /dev/null +++ b/backend/src/main/scala/cromwell/backend/standard/pollmonitoring/PollResultMonitorActor.scala @@ -0,0 +1,138 @@ +package cromwell.backend.standard.pollmonitoring +import akka.actor.{Actor, ActorRef} +import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform} +import cromwell.backend.validation.{ + CpuValidation, + DockerValidation, + MemoryValidation, + RuntimeAttributesValidation, + ValidatedRuntimeAttributes +} +import cromwell.core.logging.JobLogger +import cromwell.services.metadata.CallMetadataKeys +import cromwell.services.metrics.bard.BardEventing.BardEventRequest +import cromwell.services.metrics.bard.model.TaskSummaryEvent +import wdl4s.parser.MemoryUnit + +import java.time.OffsetDateTime +import java.time.temporal.ChronoUnit +trait PollResultMessage +case class ProcessThisPollResult[PollResultType](pollResult: PollResultType) extends PollResultMessage +case class AsyncJobHasFinished[PollResultType](pollResult: PollResultType) extends PollResultMessage + +case class PollMonitorParameters( + serviceRegistry: ActorRef, + workflowDescriptor: BackendWorkflowDescriptor, + jobDescriptor: BackendJobDescriptor, + validatedRuntimeAttributes: ValidatedRuntimeAttributes, + platform: Option[Platform], + logger: Option[JobLogger] +) + +/** + * Processes poll results from backends and sends messages to other actors based on their contents. + * Primarily concerned with reporting start times, end times, and cost data to both the bard and cromwell metadata services. + */ +trait PollResultMonitorActor[PollResultType] extends Actor { + def params: PollMonitorParameters + + // Time that Cromwell (but not necessarily the cloud) started working on this job. + def extractEarliestEventTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime] + + // Time that the user VM started spending money. + def extractStartTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime] + + // Time that the user VM stopped spending money. + def extractEndTimeFromRunState(pollStatus: PollResultType): Option[OffsetDateTime] + + // Function to emit metadata that is associated with a specific call attempt. + def tellMetadata(metadataKeyValues: Map[String, Any]): Unit = { + import cromwell.services.metadata.MetadataService.implicits.MetadataAutoPutter + params.serviceRegistry.putMetadata(params.jobDescriptor.workflowDescriptor.id, + Option(params.jobDescriptor.key), + metadataKeyValues + ) + } + + // Function that reports metrics to bard, called when a specific call attempt terminates. 
+  def tellBard(terminalStateName: String,
+               jobStart: OffsetDateTime,
+               vmStartTime: Option[OffsetDateTime],
+               vmEndTime: OffsetDateTime
+  ): Unit = {
+    val validatedRuntimeAttributes = params.validatedRuntimeAttributes
+    val serviceRegistryActor = params.serviceRegistry
+    val workflowDescriptor = params.workflowDescriptor
+    val jobDescriptor = params.jobDescriptor
+    val platform = params.platform.map(_.runtimeKey)
+    val dockerImage =
+      RuntimeAttributesValidation.extractOption(DockerValidation.instance, validatedRuntimeAttributes)
+    val cpus = RuntimeAttributesValidation.extract(CpuValidation.instance, validatedRuntimeAttributes).value
+    val memory = RuntimeAttributesValidation
+      .extract(MemoryValidation.instance(), validatedRuntimeAttributes)
+      .to(MemoryUnit.Bytes)
+      .amount
+    serviceRegistryActor ! BardEventRequest(
+      TaskSummaryEvent(
+        workflowDescriptor.id.id,
+        workflowDescriptor.possibleParentWorkflowId.map(_.id),
+        workflowDescriptor.rootWorkflowId.id,
+        jobDescriptor.key.tag,
+        jobDescriptor.key.call.fullyQualifiedName,
+        jobDescriptor.key.index,
+        jobDescriptor.key.attempt,
+        terminalStateName,
+        platform,
+        dockerImage,
+        cpus,
+        memory,
+        jobStart.toString,
+        vmStartTime.map(startTime => startTime.toString),
+        vmEndTime.toString,
+        jobStart.until(vmEndTime, ChronoUnit.SECONDS),
+        vmStartTime.map(start => start.until(vmEndTime, ChronoUnit.SECONDS))
+      )
+    )
+  }
+
+  private var jobStartTime: Option[OffsetDateTime] =
+    Option.empty
+  private var vmStartTime: Option[OffsetDateTime] = Option.empty
+  private var vmEndTime: Option[OffsetDateTime] = Option.empty
+
+  def processPollResult(pollStatus: PollResultType): Unit = {
+    // Make sure jobStartTime remains the earliest event time ever seen
+    extractEarliestEventTimeFromRunState(pollStatus).foreach { earliestTime =>
+      if (earliestTime.isBefore(jobStartTime.getOrElse(OffsetDateTime.now()))) {
+        jobStartTime = Option(earliestTime)
+      }
+    }
+    // If a vm start time is reported, record it to metadata and stop checking on subsequent polls
+    if (vmStartTime.isEmpty) {
+      extractStartTimeFromRunState(pollStatus).foreach { start =>
+        vmStartTime = Option(start)
+        tellMetadata(Map(CallMetadataKeys.VmStartTime -> start))
+      }
+    }
+    // If a vm end time is reported (or, unexpectedly, we see an end time later than the one recorded),
+    // record it to metadata.
+    extractEndTimeFromRunState(pollStatus).foreach { end =>
+      if (vmEndTime.isEmpty || end.isAfter(vmEndTime.get)) {
+        vmEndTime = Option(end)
+        tellMetadata(Map(CallMetadataKeys.VmEndTime -> end))
+      }
+    }
+  }
+
+  // When a job finishes, the bard actor needs to know about the timing in order to record metrics.
+  // Cost related metadata should already have been handled in processPollResult.
+  def handleAsyncJobFinish(terminalStateName: String): Unit =
+    jobStartTime.foreach(jobStart =>
+      tellBard(
+        terminalStateName = terminalStateName,
+        jobStart = jobStart,
+        vmStartTime = vmStartTime,
+        vmEndTime = vmEndTime.getOrElse(OffsetDateTime.now())
+      )
+    )
+}
diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
new file mode 100644
index 00000000000..8b05bf4057b
--- /dev/null
+++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchPollResultMonitorActor.scala
@@ -0,0 +1,81 @@
+package cromwell.backend.google.batch.actors
+
+import akka.actor.{ActorRef, Props}
+import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform}
+import cromwell.backend.google.batch.models.RunStatus
+import cromwell.backend.standard.pollmonitoring.{
+  AsyncJobHasFinished,
+  PollMonitorParameters,
+  PollResultMessage,
+  PollResultMonitorActor,
+  ProcessThisPollResult
+}
+import cromwell.backend.validation.ValidatedRuntimeAttributes
+import cromwell.core.logging.JobLogger
+import cromwell.services.metadata.CallMetadataKeys
+
+import java.time.OffsetDateTime
+
+object BatchPollResultMonitorActor {
+  def props(serviceRegistry: ActorRef,
+            workflowDescriptor: BackendWorkflowDescriptor,
+            jobDescriptor: BackendJobDescriptor,
+            runtimeAttributes: ValidatedRuntimeAttributes,
+            platform: Option[Platform],
+            logger: JobLogger
+  ): Props = Props(
+    new BatchPollResultMonitorActor(
+      PollMonitorParameters(serviceRegistry,
+                            workflowDescriptor,
+                            jobDescriptor,
+                            runtimeAttributes,
+                            platform,
+                            Option(logger)
+      )
+    )
+  )
+}
+
+class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)
+    extends PollResultMonitorActor[RunStatus] {
+
+  override def extractEarliestEventTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
+    pollStatus.eventList.minByOption(_.offsetDateTime).map(e => e.offsetDateTime)
+  override def extractStartTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
+    pollStatus.eventList.collectFirst {
+      case event if event.name == CallMetadataKeys.VmStartTime => event.offsetDateTime
+    }
+
+  override def extractEndTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
+    pollStatus.eventList.collectFirst {
+      case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
+    }
+
+  override def receive: Receive = {
+    case message: PollResultMessage =>
+      message match {
+        case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
+        case ProcessThisPollResult(result) =>
+          params.logger.foreach(logger =>
+            logger.error(
+              s"Programmer error: Received poll result of unknown type. Expected RunStatus but got ${result.getClass.getSimpleName}."
+            )
+          )
+        case AsyncJobHasFinished(pollResult: RunStatus) => handleAsyncJobFinish(pollResult.getClass.getSimpleName)
+        case AsyncJobHasFinished(result) =>
+          params.logger.foreach(logger =>
+            logger.error(
+              s"Programmer error: Received poll result of unknown type. Expected RunStatus but got ${result.getClass.getSimpleName}."
+            )
+          )
+      }
+    case _ =>
+      params.logger.foreach(logger =>
+        logger.error(
+          "Programmer error: PollResultMonitorActor received a message that is not a PollResultMessage"
+        )
+      )
+  }
+
+  override def params: PollMonitorParameters = pollMonitorParameters
+}
diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala
index febaf196bec..433acef7731 100644
--- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala
+++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala
@@ -1074,7 +1074,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
     Future.fromTry {
       Try {
         runStatus match {
-          case RunStatus.Aborted => AbortedExecutionHandle
+          case RunStatus.Aborted(_) => AbortedExecutionHandle
           case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus)
           case unknown =>
             throw new RuntimeException(
diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala
index 1ec59cc1dc3..3677f49782c 100644
--- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala
+++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/request/BatchRequestExecutor.scala
@@ -120,11 +120,11 @@ object BatchRequestExecutor {
     } catch {
       // A job can't be cancelled but deleted, which is why we consider 404 status as the job being cancelled successfully
       case apiException: ApiException if apiException.getStatusCode.getCode == StatusCode.Code.NOT_FOUND =>
-        BatchApiResponse.StatusQueried(RunStatus.Aborted)
+        BatchApiResponse.StatusQueried(RunStatus.Aborted(Seq.empty))

       // We don't need to detect preemptible VMs because that's handled automatically by GCP
       case apiException: ApiException if apiException.getStatusCode.getCode == StatusCode.Code.RESOURCE_EXHAUSTED =>
-        BatchApiResponse.StatusQueried(RunStatus.AwaitingCloudQuota)
+        BatchApiResponse.StatusQueried(RunStatus.AwaitingCloudQuota(Seq.empty))
     }

   private[request] def interpretOperationStatus(job: Job): RunStatus = {
@@ -140,11 +140,11 @@ object BatchRequestExecutor {
       if (job.getStatus.getState == JobStatus.State.SUCCEEDED) {
         RunStatus.Success(events)
       } else if (job.getStatus.getState == JobStatus.State.RUNNING) {
-        RunStatus.Running
+        RunStatus.Running(events)
       } else if (job.getStatus.getState == JobStatus.State.FAILED) {
         RunStatus.Failed(exitCode, events)
       } else {
-        RunStatus.Initializing
+        RunStatus.Initializing(events)
       }
     }
diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala
index c7c78fa7074..b231e6e969a 100644
--- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala
+++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/RunStatus.scala
@@ -2,19 +2,21 @@ package cromwell.backend.google.batch.models

 import cromwell.core.ExecutionEvent

-sealed trait RunStatus
+sealed trait
RunStatus { + def eventList: Seq[ExecutionEvent] + def toString: String +} object RunStatus { - case object Initializing extends RunStatus + case class Initializing(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Initializing" } + case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent]) extends RunStatus { + override def toString = "AwaitingCloudQuota" + } - case object AwaitingCloudQuota extends RunStatus + case class Running(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Running" } - case object Running extends RunStatus - - sealed trait TerminalRunStatus extends RunStatus { - def eventList: Seq[ExecutionEvent] - } + sealed trait TerminalRunStatus extends RunStatus case class Success(eventList: Seq[ExecutionEvent]) extends TerminalRunStatus { override def toString = "Success" @@ -56,13 +58,11 @@ object RunStatus { } } - final case object Aborted extends UnsuccessfulRunStatus { + final case class Aborted(eventList: Seq[ExecutionEvent]) extends UnsuccessfulRunStatus { override def toString = "Aborted" override val exitCode: Option[GcpBatchExitCode] = None - override def eventList: Seq[ExecutionEvent] = List.empty - override val prettyPrintedError: String = "The job was aborted" } } diff --git a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala index ed2730db40b..a162165f765 100644 --- a/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/google/batch/src/test/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActorSpec.scala @@ -3,7 +3,7 @@ package actors import _root_.wdl.draft2.model._ import akka.actor.{ActorRef, Props} -import akka.testkit.{ImplicitSender, TestActorRef, TestDuration} +import akka.testkit.{ImplicitSender, TestActorRef, TestDuration, TestProbe} import cats.data.NonEmptyList import cloud.nio.impl.drs.DrsCloudNioFileProvider.DrsReadInterpreter import cloud.nio.impl.drs.{DrsCloudNioFileSystemProvider, GoogleOauthDrsCredentials} @@ -14,6 +14,7 @@ import common.collections.EnhancedCollections._ import common.mock.MockSugar import cromwell.backend.BackendJobExecutionActor.BackendJobExecutionResponse import cromwell.backend._ +import cromwell.backend.google.batch.actors.GcpBatchAsyncBackendJobExecutionActor.GcpBatchPendingExecutionHandle import cromwell.backend.google.batch.api.GcpBatchRequestFactory import cromwell.backend.google.batch.io.{DiskType, GcpBatchWorkingDisk} import cromwell.backend.google.batch.models._ @@ -23,6 +24,7 @@ import cromwell.backend.io.JobPathsSpecHelper._ import cromwell.backend.standard.{ DefaultStandardAsyncExecutionActorParams, StandardAsyncExecutionActorParams, + StandardAsyncJob, StandardExpressionFunctionsParams } import cromwell.core._ @@ -33,6 +35,10 @@ import cromwell.core.path.{DefaultPathBuilder, PathBuilder} import cromwell.filesystems.drs.DrsPathBuilder import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder, MockGcsPathBuilder} import cromwell.services.keyvalue.InMemoryKvServiceActor +import cromwell.services.metadata.CallMetadataKeys +import cromwell.services.metadata.MetadataService.PutMetadataAction +import cromwell.services.metrics.bard.BardEventing.BardEventRequest +import cromwell.services.metrics.bard.model.TaskSummaryEvent import 
cromwell.util.JsonFormatting.WomValueJsonFormatter._ import cromwell.util.SampleWdl import org.mockito.Mockito._ @@ -189,6 +195,18 @@ class GcpBatchAsyncBackendJobExecutionActorSpec } override lazy val backendEngineFunctions: BatchExpressionFunctions = functions + + override val pollingResultMonitorActor: Option[ActorRef] = Some( + context.actorOf( + BatchPollResultMonitorActor.props(serviceRegistryActor, + workflowDescriptor, + jobDescriptor, + validatedRuntimeAttributes, + platform, + jobLogger + ) + ) + ) } private val runtimeAttributesBuilder = GcpBatchRuntimeAttributes.runtimeAttributesBuilder(gcpBatchConfiguration) @@ -1187,36 +1205,215 @@ class GcpBatchAsyncBackendJobExecutionActorSpec "stdout" -> s"$batchGcsRoot/wf_hello/$workflowId/call-goodbye/stdout" ) ) + } + + private def buildJobDescriptor(): BackendJobDescriptor = { + val attempt = 1 + val preemptible = 1 + val wdlNamespace = WdlNamespaceWithWorkflow + .load(YoSup.replace("[PREEMPTIBLE]", s"preemptible: $preemptible"), Seq.empty[Draft2ImportResolver]) + .get + val womDefinition = wdlNamespace.workflow + .toWomWorkflowDefinition(isASubworkflow = false) + .getOrElse(fail("failed to get WomDefinition from WdlWorkflow")) + + wdlNamespace.toWomExecutable(Option(Inputs.toJson.compactPrint), NoIoFunctionSet, strictValidation = true) match { + case Right(womExecutable) => + val inputs = for { + combined <- womExecutable.resolvedExecutableInputs + (port, resolvedInput) = combined + value <- resolvedInput.select[WomValue] + } yield port -> value + val workflowDescriptor = BackendWorkflowDescriptor( + WorkflowId.randomId(), + womDefinition, + inputs, + NoOptions, + Labels.empty, + HogGroup("foo"), + List.empty, + None + ) + val job = workflowDescriptor.callable.taskCallNodes.head + val key = BackendJobDescriptorKey(job, None, attempt) + val runtimeAttributes = makeRuntimeAttributes(job) + + BackendJobDescriptor(workflowDescriptor, + key, + runtimeAttributes, + fqnWdlMapToDeclarationMap(Inputs), + NoDocker, + None, + Map() + ) + case Left(badtimes) => fail(badtimes.toList.mkString(", ")) + } } - private def setupBackend: TestableGcpBatchJobExecutionActor = { - val womFile = WomSingleFile("gs://blah/b/c.txt") - val workflowInputs = Map("file_passing.f" -> womFile) - val callInputs = Map( - "in" -> womFile, // how does one programmatically map the wf inputs to the call inputs? - "out_name" -> WomString("out") // is it expected that this isn't using the default? 
+ def buildTestActorRef( + jobDescriptor: BackendJobDescriptor, + serviceRegistryActor: Option[TestProbe] = None + ): TestActorRef[TestableGcpBatchJobExecutionActor] = { + val props = Props( + new TestableGcpBatchJobExecutionActor( + jobDescriptor, + Promise(), + gcpBatchConfiguration, + TestableGcpBatchExpressionFunctions, + emptyActor, + failIoActor, + serviceRegistryActor.map(actor => actor.ref).getOrElse(kvService) + ) ) - makeBatchActorRef(SampleWdl.FilePassingWorkflow, workflowInputs, "a", callInputs).underlyingActor + TestActorRef(props, s"TestableJesJobExecutionActor-${jobDescriptor.workflowDescriptor.id}") } - it should "not try to extract start, cpu start, and end times from terminal run statuses" in { - val jesBackend = setupBackend - - val start = ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(1, ChronoUnit.HOURS), None) - val middle = ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(30, ChronoUnit.MINUTES), None) - val end = ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(1, ChronoUnit.MINUTES), None) - val successStatus = RunStatus.Success(Seq(middle, end, start)) + it should "emit expected timing metadata as task executes" in { + val expectedJobStart = OffsetDateTime.now().minus(3, ChronoUnit.HOURS) + val expectedVmStart = OffsetDateTime.now().minus(2, ChronoUnit.HOURS) + val expectedVmEnd = OffsetDateTime.now().minus(1, ChronoUnit.HOURS) + + val pollResult0 = RunStatus.Initializing(Seq.empty) + val pollResult1 = RunStatus.Running(Seq(ExecutionEvent("fakeEvent", expectedJobStart))) + val pollResult2 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmStartTime, expectedVmStart))) + val pollResult3 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmEndTime, expectedVmEnd))) + val terminalPollResult = + RunStatus.Success(Seq(ExecutionEvent("fakeEvent", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)))) + + val serviceRegistryProbe = TestProbe() + + val jobDescriptor = buildJobDescriptor() + val job = StandardAsyncJob(UUID.randomUUID().toString) + val run = Run(job) + val handle = new GcpBatchPendingExecutionHandle(jobDescriptor, run.job, Option(run), None) + val testActorRef = buildTestActorRef(jobDescriptor, Option(serviceRegistryProbe)) + + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult0) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case _: PutMetadataAction => true + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult1) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case _: PutMetadataAction => true + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult2) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case action: PutMetadataAction => + action.events.exists(event => event.key.key.equals(CallMetadataKeys.VmStartTime)) + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult3) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case action: PutMetadataAction => action.events.exists(event => event.key.key.equals(CallMetadataKeys.VmEndTime)) + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, terminalPollResult) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case _: PutMetadataAction => true + case _ => false + } + } - jesBackend.getStartAndEndTimes(successStatus) shouldBe None + it should "send bard 
metrics message on task success" in { + val expectedJobStart = OffsetDateTime.now().minus(3, ChronoUnit.HOURS) + val expectedVmStart = OffsetDateTime.now().minus(2, ChronoUnit.HOURS) + val expectedVmEnd = OffsetDateTime.now().minus(1, ChronoUnit.HOURS) + + val pollResult0 = RunStatus.Initializing(Seq.empty) + val pollResult1 = RunStatus.Running(Seq(ExecutionEvent("fakeEvent", expectedJobStart))) + val pollResult2 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmStartTime, expectedVmStart))) + val pollResult3 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmEndTime, expectedVmEnd))) + val terminalPollResult = + RunStatus.Success(Seq(ExecutionEvent("fakeEvent", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS)))) + + val serviceRegistryProbe = TestProbe() + + val jobDescriptor = buildJobDescriptor() + val job = StandardAsyncJob(jobDescriptor.workflowDescriptor.id.id.toString) + val run = Run(job) + val handle = new GcpBatchPendingExecutionHandle(jobDescriptor, run.job, Option(run), None) + val testActorRef = buildTestActorRef(jobDescriptor, Option(serviceRegistryProbe)) + + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult0) + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult1) + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult2) + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult3) + testActorRef.underlyingActor.handlePollSuccess(handle, terminalPollResult) + + val bardMessage = serviceRegistryProbe.fishForMessage(5.seconds) { + case _: BardEventRequest => true + case _ => false + } + val taskSummary = bardMessage.asInstanceOf[BardEventRequest].event.asInstanceOf[TaskSummaryEvent] + taskSummary.workflowId should be(jobDescriptor.workflowDescriptor.id.id) + taskSummary.parentWorkflowId should be(None) + taskSummary.rootWorkflowId should be(jobDescriptor.workflowDescriptor.id.id) + taskSummary.jobTag should be(jobDescriptor.key.tag) + taskSummary.jobFullyQualifiedName should be(jobDescriptor.key.call.fullyQualifiedName) + taskSummary.jobIndex should be(None) + taskSummary.jobAttempt should be(jobDescriptor.key.attempt) + taskSummary.terminalState shouldBe a[String] + taskSummary.platform should be(Some("gcp")) + taskSummary.dockerImage should be(Some("ubuntu:latest")) + taskSummary.cpuCount should be(1) + taskSummary.memoryBytes should be(2.147483648e9) + taskSummary.startTime should not be empty + taskSummary.cpuStartTime should be(Option(expectedVmStart.toString)) + taskSummary.endTime should not be empty + taskSummary.jobSeconds should be(7200) + taskSummary.cpuSeconds should be(Option(3600)) } - it should "return None trying to get start and end times from a status containing no events" in { - val jesBackend = setupBackend + it should "send bard metrics message on task failure" in { + val expectedJobStart = OffsetDateTime.now().minus(3, ChronoUnit.HOURS) + val expectedVmStart = OffsetDateTime.now().minus(2, ChronoUnit.HOURS) - val successStatus = RunStatus.Success(Seq()) + val pollResult0 = RunStatus.Initializing(Seq.empty) + val pollResult1 = RunStatus.Running(Seq(ExecutionEvent("fakeEvent", expectedJobStart))) + val pollResult2 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmStartTime, expectedVmStart))) + val abortStatus = RunStatus.Aborted( + Seq(ExecutionEvent("got aborted", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))) + ) + + val serviceRegistryProbe = TestProbe() + + val jobDescriptor = buildJobDescriptor() + val job = 
StandardAsyncJob(jobDescriptor.workflowDescriptor.id.id.toString) + val run = Run(job) + val handle = new GcpBatchPendingExecutionHandle(jobDescriptor, run.job, Option(run), None) + val testActorRef = buildTestActorRef(jobDescriptor, Option(serviceRegistryProbe)) + + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult0) + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult1) + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult2) + testActorRef.underlyingActor.handlePollSuccess(handle, abortStatus) + + val bardMessage = serviceRegistryProbe.fishForMessage(5.seconds) { + case _: BardEventRequest => true + case _ => false + } - jesBackend.getStartAndEndTimes(successStatus) shouldBe None + val taskSummary = bardMessage.asInstanceOf[BardEventRequest].event.asInstanceOf[TaskSummaryEvent] + taskSummary.workflowId should be(jobDescriptor.workflowDescriptor.id.id) + taskSummary.parentWorkflowId should be(None) + taskSummary.rootWorkflowId should be(jobDescriptor.workflowDescriptor.id.id) + taskSummary.jobTag should be(jobDescriptor.key.tag) + taskSummary.jobFullyQualifiedName should be(jobDescriptor.key.call.fullyQualifiedName) + taskSummary.jobIndex should be(None) + taskSummary.jobAttempt should be(jobDescriptor.key.attempt) + taskSummary.terminalState shouldBe a[String] + taskSummary.platform should be(Some("gcp")) + taskSummary.dockerImage should be(Some("ubuntu:latest")) + taskSummary.cpuCount should be(1) + taskSummary.memoryBytes should be(2.147483648e9) + taskSummary.startTime should not be empty + taskSummary.cpuStartTime should be(Option(expectedVmStart.toString)) + taskSummary.endTime should not be empty + taskSummary.jobSeconds should be > 0.toLong + taskSummary.cpuSeconds.get should be > 0.toLong } private def makeRuntimeAttributes(job: CommandCallNode) = { diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiCostPollingHelper.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiCostPollingHelper.scala deleted file mode 100644 index 8a6cabdcccd..00000000000 --- a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiCostPollingHelper.scala +++ /dev/null @@ -1,22 +0,0 @@ -package cromwell.backend.google.pipelines.common - -import cromwell.backend.google.pipelines.common.api.RunStatus -import cromwell.backend.standard.costestimation.CostPollingHelper -import cromwell.services.metadata.CallMetadataKeys - -import java.time.OffsetDateTime - -class PapiCostPollingHelper(tellMetadataFn: Map[String, Any] => Unit) extends CostPollingHelper[RunStatus] { - - override def extractStartTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] = - pollStatus.eventList.collectFirst { - case event if event.name == CallMetadataKeys.VmStartTime => event.offsetDateTime - } - - override def extractEndTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] = - pollStatus.eventList.collectFirst { - case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime - } - - override def tellMetadata(metadata: Map[String, Any]): Unit = tellMetadataFn(metadata) -} diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiPollResultMonitorActor.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiPollResultMonitorActor.scala new file mode 100644 index 
00000000000..597c0ed8d35
--- /dev/null
+++ b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PapiPollResultMonitorActor.scala
@@ -0,0 +1,79 @@
+package cromwell.backend.google.pipelines.common
+
+import akka.actor.{ActorRef, Props}
+import cromwell.backend.{BackendJobDescriptor, BackendWorkflowDescriptor, Platform}
+import cromwell.backend.google.pipelines.common.api.RunStatus
+import cromwell.backend.standard.pollmonitoring.{
+  AsyncJobHasFinished,
+  PollMonitorParameters,
+  PollResultMessage,
+  PollResultMonitorActor,
+  ProcessThisPollResult
+}
+import cromwell.backend.validation.ValidatedRuntimeAttributes
+import cromwell.core.logging.JobLogger
+import cromwell.services.metadata.CallMetadataKeys
+
+import java.time.OffsetDateTime
+
+object PapiPollResultMonitorActor {
+  def props(serviceRegistry: ActorRef,
+            workflowDescriptor: BackendWorkflowDescriptor,
+            jobDescriptor: BackendJobDescriptor,
+            runtimeAttributes: ValidatedRuntimeAttributes,
+            platform: Option[Platform],
+            logger: JobLogger
+  ): Props = Props(
+    new PapiPollResultMonitorActor(
+      PollMonitorParameters(serviceRegistry,
+                            workflowDescriptor,
+                            jobDescriptor,
+                            runtimeAttributes,
+                            platform,
+                            Option(logger)
+      )
+    )
+  )
+}
+
+class PapiPollResultMonitorActor(parameters: PollMonitorParameters) extends PollResultMonitorActor[RunStatus] {
+
+  override def extractEarliestEventTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
+    pollStatus.eventList.minByOption(_.offsetDateTime).map(e => e.offsetDateTime)
+  override def extractStartTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
+    pollStatus.eventList.collectFirst {
+      case event if event.name == CallMetadataKeys.VmStartTime => event.offsetDateTime
+    }
+
+  override def extractEndTimeFromRunState(pollStatus: RunStatus): Option[OffsetDateTime] =
+    pollStatus.eventList.collectFirst {
+      case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
+    }
+
+  override def receive: Receive = {
+    case message: PollResultMessage =>
+      message match {
+        case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
+        case ProcessThisPollResult(result) =>
+          params.logger.foreach(logger =>
+            logger.error(
+              s"Programmer error: Received poll result of unknown type. Expected RunStatus but got ${result.getClass.getSimpleName}."
+            )
+          )
+        case AsyncJobHasFinished(pollResult: RunStatus) => handleAsyncJobFinish(pollResult.getClass.getSimpleName)
+        case AsyncJobHasFinished(result) =>
+          params.logger.foreach(logger =>
+            logger.error(
+              s"Programmer error: Received poll result of unknown type. Expected RunStatus but got ${result.getClass.getSimpleName}."
+            )
+          )
+      }
+    case _ =>
+      params.logger.foreach(logger =>
+        logger.error(
+          "Programmer error: PollResultMonitorActor received a message that is not a PollResultMessage"
+        )
+      )
+  }
+  override def params: PollMonitorParameters = parameters
+}
diff --git a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala
index a1f74cf9d4d..402427dc81b 100644
--- a/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala
+++ b/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActor.scala
@@ -37,7 +37,6 @@ import cromwell.backend.google.pipelines.common.monitoring.{CheckpointingConfigu
 import cromwell.backend.io.DirectoryFunctions
 import cromwell.backend.standard.GroupMetricsActor.RecordGroupQuotaExhaustion
 import cromwell.backend.standard._
-import cromwell.backend.standard.costestimation.CostPollingHelper
 import cromwell.core._
 import cromwell.core.io.IoCommandBuilder
@@ -62,7 +61,6 @@ import wom.types.{WomArrayType, WomSingleFileType}
 import wom.values._

 import java.net.SocketTimeoutException
-import java.time.OffsetDateTime
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
@@ -134,7 +132,7 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta

   override type StandardAsyncRunState = RunStatus

-  override val costHelper: Option[CostPollingHelper[RunStatus]] = Option(new PapiCostPollingHelper(tellMetadata))
+  override val pollingResultMonitorActor: Option[ActorRef] = Option.empty

   def statusEquivalentTo(thiz: StandardAsyncRunState)(that: StandardAsyncRunState): Boolean =
     thiz.toString == that.toString
@@ -833,29 +831,6 @@ class PipelinesApiAsyncBackendJobExecutionActor(override val standardParams: Sta
       throw new RuntimeException(s"handleExecutionSuccess not called with RunStatus.Success. Instead got $unknown")
   }

-  override def getStartAndEndTimes(runStatus: StandardAsyncRunState): Option[StartAndEndTimes] = {
-    // Intuition:
-    // "job start" is the earliest event time across all events.
-    // "cpuStart" is obtained from the cost helper. It should be the event time where the user VM started spending money.
-    // "jobEnd" is obtained from the cost helper, falling back to the last event time, falling back to now.
-    // We allow fallbacks for end times to account for in progress runs, but generally the end time is only known once we've reached a terminal status.
- val jobStart: Option[OffsetDateTime] = runStatus.eventList.minByOption(_.offsetDateTime).map(e => e.offsetDateTime) - val maxEventTime: Option[OffsetDateTime] = - runStatus.eventList.maxByOption(_.offsetDateTime).map(e => e.offsetDateTime) - - costHelper match { - case Some(helper) => - val cpuStart: Option[OffsetDateTime] = helper.vmStartTime - val jobEnd: OffsetDateTime = helper.vmEndTime.orElse(maxEventTime).getOrElse(OffsetDateTime.now()) - jobStart.flatMap(start => Option(StartAndEndTimes(start, cpuStart, jobEnd))) - case None => - jobLogger.error( - "Programmer error: expected costHelper object to be present in PipelinesApiBackendJobExecutionActor" - ) - None - } - } - override def retryEvaluateOutputs(exception: Exception): Boolean = exception match { case aggregated: CromwellAggregatedException => diff --git a/supportedBackends/google/pipelines/common/src/test/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/google/pipelines/common/src/test/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActorSpec.scala index 0be56081e07..8ad083a3297 100644 --- a/supportedBackends/google/pipelines/common/src/test/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/google/pipelines/common/src/test/scala/cromwell/backend/google/pipelines/common/PipelinesApiAsyncBackendJobExecutionActorSpec.scala @@ -26,17 +26,15 @@ import cromwell.backend.async.{ import cromwell.backend.google.pipelines.common.PipelinesApiAsyncBackendJobExecutionActor.JesPendingExecutionHandle import cromwell.backend.google.pipelines.common.api.{PipelinesApiRequestFactory, RunStatus} import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestManager.PAPIStatusPollRequest -import cromwell.backend.google.pipelines.common.api.RunStatus.{Initializing, Running, UnsuccessfulRunStatus} +import cromwell.backend.google.pipelines.common.api.RunStatus.UnsuccessfulRunStatus import cromwell.backend.google.pipelines.common.io.{DiskType, PipelinesApiWorkingDisk} import cromwell.backend.io.JobPathsSpecHelper._ import cromwell.backend.standard.GroupMetricsActor.RecordGroupQuotaExhaustion -import cromwell.backend.standard.costestimation.CostPollingHelper import cromwell.backend.standard.{ DefaultStandardAsyncExecutionActorParams, StandardAsyncExecutionActorParams, StandardAsyncJob, - StandardExpressionFunctionsParams, - StartAndEndTimes + StandardExpressionFunctionsParams } import cromwell.core._ import cromwell.core.callcaching.NoDocker @@ -49,6 +47,7 @@ import cromwell.services.instrumentation.InstrumentationService.InstrumentationS import cromwell.services.keyvalue.InMemoryKvServiceActor import cromwell.services.keyvalue.KeyValueServiceActor.{KvGet, KvJobKey, KvPair, ScopedKey} import cromwell.services.metadata.CallMetadataKeys +import cromwell.services.metadata.MetadataService.PutMetadataAction import cromwell.services.metrics.bard.BardEventing.BardEventRequest import cromwell.services.metrics.bard.model.TaskSummaryEvent import cromwell.util.JsonFormatting.WomValueJsonFormatter._ @@ -203,7 +202,18 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec override def tag: String = s"$name [UUID(${workflowId.shortString})$jobTag]" override val slf4jLoggers: Set[Logger] = Set.empty } - override val costHelper: Option[CostPollingHelper[RunStatus]] = Some(new PapiCostPollingHelper(tellMetadata)) + override val pollingResultMonitorActor: Option[ActorRef] = Some( + 
context.actorOf( + PapiPollResultMonitorActor.props(serviceRegistryActor, + workflowDescriptor, + jobDescriptor, + validatedRuntimeAttributes, + platform, + jobLogger + ) + ) + ) + override lazy val backendEngineFunctions: PipelinesApiExpressionFunctions = functions } @@ -372,9 +382,10 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec Await.result(promise.future, Timeout) } - def buildPreemptibleTestActorRef(attempt: Int, - preemptible: Int, - failedRetriesCountOpt: Option[Int] = None + private def buildPreemptibleTestActorRef(attempt: Int, + preemptible: Int, + failedRetriesCountOpt: Option[Int] = None, + serviceRegistryActor: Option[TestProbe] = None ): TestActorRef[TestablePipelinesApiJobExecutionActor] = { // For this test we say that all previous attempts were preempted: val jobDescriptor = buildPreemptibleJobDescriptor(preemptible, @@ -383,12 +394,14 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec failedRetriesCountOpt = failedRetriesCountOpt ) val props = Props( - new TestablePipelinesApiJobExecutionActor(jobDescriptor, - Promise(), - papiConfiguration, - TestableJesExpressionFunctions, - emptyActor, - failIoActor + new TestablePipelinesApiJobExecutionActor( + jobDescriptor, + Promise(), + papiConfiguration, + TestableJesExpressionFunctions, + emptyActor, + failIoActor, + serviceRegistryActor.map(actor => actor.ref).getOrElse(kvService) ) ) TestActorRef(props, s"TestableJesJobExecutionActor-${jobDescriptor.workflowDescriptor.id}") @@ -1769,56 +1782,56 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec makeJesActorRef(SampleWdl.ArrayIO, Map.empty, "serialize", inputs, functions).underlyingActor } - it should "extract start, cpu, and end times from terminal run statuses" in { - val jesBackend = setupBackend - - val earliestEvent = - ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(1, ChronoUnit.HOURS), None) - val cpuStartEvent = - ExecutionEvent(CallMetadataKeys.VmStartTime, OffsetDateTime.now().minus(30, ChronoUnit.MINUTES), None) - val cpuEndEvent = - ExecutionEvent(CallMetadataKeys.VmEndTime, OffsetDateTime.now().minus(1, ChronoUnit.MINUTES), None) - val initialPollResult = Initializing(Seq(earliestEvent)) - val cpuStartPollResult = Running(Seq(earliestEvent, cpuStartEvent)) - val cpuEndPollResult = RunStatus.Success(Seq(earliestEvent, cpuStartEvent, cpuEndEvent), None, None, None) - - jesBackend.costHelper.get.processPollResult(initialPollResult) - jesBackend.costHelper.get.processPollResult(cpuStartPollResult) - jesBackend.costHelper.get.processPollResult(cpuEndPollResult) - - jesBackend.getStartAndEndTimes(cpuEndPollResult) shouldBe Some( - StartAndEndTimes(earliestEvent.offsetDateTime, Some(cpuStartEvent.offsetDateTime), cpuEndEvent.offsetDateTime) - ) - } - - it should "not return a cpu start time if one was never recorded" in { - val jesBackend = setupBackend - - val earliestEvent = - ExecutionEvent(UUID.randomUUID().toString, OffsetDateTime.now().minus(1, ChronoUnit.HOURS), None) + it should "emit expected timing metadata as task executes" in { + val expectedJobStart = OffsetDateTime.now().minus(3, ChronoUnit.HOURS) + val expectedVmStart = OffsetDateTime.now().minus(2, ChronoUnit.HOURS) + val expectedVmEnd = OffsetDateTime.now().minus(1, ChronoUnit.HOURS) - // Note how there is no VmStartTime execution event. 
- val cpuEndEvent = - ExecutionEvent(CallMetadataKeys.VmEndTime, OffsetDateTime.now().minus(1, ChronoUnit.MINUTES), None) - val initialPollResult = Initializing(Seq(earliestEvent)) - val cpuStartPollResult = Running(Seq(earliestEvent)) - val cpuEndPollResult = RunStatus.Success(Seq(earliestEvent, cpuEndEvent), None, None, None) - - jesBackend.costHelper.get.processPollResult(initialPollResult) - jesBackend.costHelper.get.processPollResult(cpuStartPollResult) - jesBackend.costHelper.get.processPollResult(cpuEndPollResult) - - jesBackend.getStartAndEndTimes(cpuEndPollResult) shouldBe Some( - StartAndEndTimes(earliestEvent.offsetDateTime, None, cpuEndEvent.offsetDateTime) + val pollResult0 = RunStatus.Initializing(Seq.empty) + val pollResult1 = RunStatus.Running(Seq(ExecutionEvent("fakeEvent", expectedJobStart))) + val pollResult2 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmStartTime, expectedVmStart))) + val pollResult3 = RunStatus.Running(Seq(ExecutionEvent(CallMetadataKeys.VmEndTime, expectedVmEnd))) + val terminalPollResult = RunStatus.Success( + Seq(ExecutionEvent("fakeEvent", OffsetDateTime.now().truncatedTo(ChronoUnit.MILLIS))), + Option("fakeMachine"), + Option("fakeZone"), + Option("fakeInstance") ) - } - it should "return None trying to get start and end times from a status containing no events" in { - val jesBackend = setupBackend + val serviceRegistryProbe = TestProbe() - val successStatus = RunStatus.Success(Seq(), None, None, None) + val jobDescriptor = buildPreemptibleJobDescriptor(0, 0, 0) + val job = StandardAsyncJob(UUID.randomUUID().toString) + val run = Run(job) + val handle = new JesPendingExecutionHandle(jobDescriptor, run.job, Option(run), None) + val testActorRef = buildPreemptibleTestActorRef(1, 1, None, Option(serviceRegistryProbe)) - jesBackend.getStartAndEndTimes(successStatus) shouldBe None + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult0) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case _: PutMetadataAction => true + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult1) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case _: PutMetadataAction => true + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult2) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case action: PutMetadataAction => + action.events.exists(event => event.key.key.equals(CallMetadataKeys.VmStartTime)) + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, pollResult3) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case action: PutMetadataAction => action.events.exists(event => event.key.key.equals(CallMetadataKeys.VmEndTime)) + case _ => false + } + testActorRef.underlyingActor.handlePollSuccess(handle, terminalPollResult) + serviceRegistryProbe.fishForMessage(max = 5.seconds.dilated, hint = "") { + case _: PutMetadataAction => true + case _ => false + } } it should "send bard metrics message on task success" in { diff --git a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala index d9ccb6c6c7b..bcd146508f1 100644 --- 
a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala @@ -1,5 +1,6 @@ package cromwell.backend.google.pipelines.v2beta +import akka.actor.ActorRef import cats.data.NonEmptyList import cats.implicits._ import com.google.cloud.storage.contrib.nio.CloudStorageOptions @@ -8,11 +9,10 @@ import cromwell.backend.BackendJobDescriptor import cromwell.backend.google.pipelines.common.PipelinesApiConfigurationAttributes.GcsTransferConfiguration import cromwell.backend.google.pipelines.common._ import cromwell.backend.google.pipelines.common.api.PipelinesApiRequestFactory.CreatePipelineParameters -import cromwell.backend.google.pipelines.common.api.RunStatus import cromwell.backend.google.pipelines.common.io.PipelinesApiWorkingDisk import cromwell.backend.google.pipelines.v2beta.PipelinesApiAsyncBackendJobExecutionActor._ import cromwell.backend.standard.StandardAsyncExecutionActorParams -import cromwell.backend.standard.costestimation.CostPollingHelper +import cromwell.backend.google.pipelines.common.PapiPollResultMonitorActor import cromwell.core.path.{DefaultPathBuilder, Path} import cromwell.filesystems.drs.DrsPath import cromwell.filesystems.gcs.GcsPathBuilder.ValidFullGcsPath @@ -23,6 +23,7 @@ import org.apache.commons.io.output.ByteArrayOutputStream import wom.core.FullyQualifiedName import wom.expression.FileEvaluation import wom.values.{GlobFunctions, WomFile, WomGlobFile, WomSingleFile, WomUnlistedDirectory} + import java.nio.charset.Charset import java.io.{FileNotFoundException, OutputStreamWriter} import scala.concurrent.Future @@ -68,7 +69,17 @@ class PipelinesApiAsyncBackendJobExecutionActor(standardParams: StandardAsyncExe } } - override val costHelper: Option[CostPollingHelper[RunStatus]] = Some(new PapiCostPollingHelper(tellMetadata)) + override val pollingResultMonitorActor: Option[ActorRef] = Option( + context.actorOf( + PapiPollResultMonitorActor.props(serviceRegistryActor, + workflowDescriptor, + jobDescriptor, + validatedRuntimeAttributes, + platform, + jobLogger + ) + ) + ) private lazy val gcsTransferLibrary = Source.fromInputStream(Thread.currentThread.getContextClassLoader.getResourceAsStream("gcs_transfer.sh")).mkString diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala index f8149035304..df77ffd8903 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala @@ -9,7 +9,6 @@ import akka.http.scaladsl.model._ import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller} import akka.stream.ActorMaterializer import akka.util.ByteString -import cats.data.NonEmptyList import cats.implicits._ import common.collections.EnhancedCollections._ import common.exception.AggregatedMessageException @@ -28,8 +27,7 @@ import cromwell.backend.standard.{ ScriptPreambleData, StandardAsyncExecutionActor, StandardAsyncExecutionActorParams, - StandardAsyncJob, - StartAndEndTimes + StandardAsyncJob } import cromwell.core.logging.JobLogger import cromwell.core.path.{DefaultPathBuilder, Path} @@ -39,14 +37,13 @@ import 
cromwell.filesystems.blob.{BlobPath, WSMBlobSasTokenGenerator} import cromwell.filesystems.drs.{DrsPath, DrsResolver} import cromwell.filesystems.http.HttpPath import cromwell.services.instrumentation.CromwellInstrumentation -import cromwell.services.instrumentation.CromwellInstrumentation.InstrumentationPath import cromwell.services.metadata.CallMetadataKeys import net.ceedubs.ficus.Ficus._ import wom.values.WomFile import java.io.FileNotFoundException import java.nio.file.FileAlreadyExistsException -import java.time.{Duration, OffsetDateTime} +import java.time.Duration import java.time.temporal.ChronoUnit import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} @@ -156,9 +153,10 @@ object TesAsyncBackendJobExecutionActor { * This assumes that some of the task inputs are blob files, all blob files are in the same container, and we can get a sas * token for this container from WSM. * The task VM will use the user assigned managed identity that it is running as in order to authenticate. - * @param taskInputs The inputs to this particular TesTask. If any are blob files, the first will be used to - * determine the storage container to retrieve the sas token for. - * @param pathGetter A function to convert string filepath into a cromwell Path object. + * + * @param taskInputs The inputs to this particular TesTask. If any are blob files, the first will be used to + * determine the storage container to retrieve the sas token for. + * @param pathGetter A function to convert string filepath into a cromwell Path object. * @param blobConverter A function to convert a Path into a Blob path, if possible. Provided for testing purposes. * @return A URL endpoint that, when called with proper authentication, will return a sas token. * Returns 'None' if one should not be used for this task. 
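The following hunks strip TES's bespoke Bard plumbing (the tellBardFn parameter and getStartAndEndTimes) without wiring in a replacement monitor, so TES keeps the trait default of pollingResultMonitorActor = None and no longer sends task summaries through this path. If TES later opts in, an analogue of the Batch/PAPI monitors could look like the following hypothetical sketch; the TesRunStatus.costData and TesVmCostData string-timestamp shapes are taken from this diff, but the class itself is not part of the change:

    // Hypothetical sketch only; not added by this diff.
    import java.time.OffsetDateTime
    import scala.util.Try
    import cromwell.backend.standard.pollmonitoring.{AsyncJobHasFinished, PollMonitorParameters, PollResultMonitorActor, ProcessThisPollResult}

    class TesPollResultMonitorActor(parameters: PollMonitorParameters) extends PollResultMonitorActor[TesRunStatus] {

      // TES cost data carries ISO-8601 strings, so parse defensively.
      private def parse(time: Option[String]): Option[OffsetDateTime] =
        time.flatMap(t => Try(OffsetDateTime.parse(t)).toOption)

      // TES reports a single VM start timestamp, so it doubles as the earliest known event time.
      override def extractEarliestEventTimeFromRunState(pollStatus: TesRunStatus): Option[OffsetDateTime] =
        extractStartTimeFromRunState(pollStatus)
      override def extractStartTimeFromRunState(pollStatus: TesRunStatus): Option[OffsetDateTime] =
        pollStatus.costData.flatMap(data => parse(data.startTime))
      override def extractEndTimeFromRunState(pollStatus: TesRunStatus): Option[OffsetDateTime] =
        pollStatus.costData.flatMap(data => parse(data.endTime))

      override def receive: Receive = {
        case ProcessThisPollResult(pollResult: TesRunStatus) => processPollResult(pollResult)
        case AsyncJobHasFinished(pollResult: TesRunStatus) => handleAsyncJobFinish(pollResult.getClass.getSimpleName)
      }
      override def params: PollMonitorParameters = parameters
    }
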
@@ -322,7 +320,6 @@ object TesAsyncBackendJobExecutionActor {
                      handle: StandardAsyncPendingExecutionHandle,
                      getTaskLogsFn: StandardAsyncPendingExecutionHandle => Future[Option[TaskLog]],
                      tellMetadataFn: Map[String, Any] => Unit,
-                     tellBardFn: StandardAsyncRunState => Unit,
                      logger: LoggingAdapter
   )(implicit ec: ExecutionContext): Unit = {
     val logs = getTaskLogsFn(handle)
@@ -346,35 +343,9 @@ object TesAsyncBackendJobExecutionActor {
     taskEndTime.onComplete {
       case Success(result) =>
        result.foreach(r => tellMetadataFn(Map(CallMetadataKeys.VmEndTime -> r)))
-        val newCostData = runStatus.costData.map(_.copy(endTime = result))
-        runStatus match {
-          case _: Complete => tellBardFn(Complete(newCostData))
-          case _: Cancelled => tellBardFn(Cancelled(newCostData))
-          case failed: Failed => tellBardFn(Failed(sysLogs = failed.sysLogs, costData = newCostData))
-          case error: Error => tellBardFn(Error(sysLogs = error.sysLogs, costData = newCostData))
-          case _ => ()
-        }
       case Failure(e) => logger.error(e.getMessage)
     }
   }
-
-  def getStartAndEndTimes(runStatus: StandardAsyncRunState,
-                          logger: LoggingAdapter,
-                          incrementFn: (InstrumentationPath, Option[String]) => Unit
-  ): Option[StartAndEndTimes] = runStatus.costData match {
-    case Some(TesVmCostData(Some(startTime), Some(endTime), _)) =>
-      Try {
-        (OffsetDateTime.parse(startTime), OffsetDateTime.parse(endTime))
-      } match {
-        case Success((parsedStartTime, parsedEndTime)) =>
-          Some(StartAndEndTimes(parsedStartTime, Option(parsedStartTime), parsedEndTime))
-        case Failure(e: Throwable) =>
-          incrementFn(NonEmptyList.of("parse_tes_timestamp", "failure"), Some("bard"))
-          logger.error(s"Parsing TES task start and end time failed: $e")
-          None
-      }
-    case _ => None
-  }
 }
 
 class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyncExecutionActorParams)
@@ -565,7 +536,6 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
         handle,
         getTaskLogs,
         tellMetadata,
-        tellBard,
         log
       )
       ()
@@ -587,7 +557,6 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
       tellMetadata,
       getErrorLogs
     )
-
   }
 
   private def getTesStatus(state: Option[String], withCostData: Option[TesVmCostData], jobId: String): TesRunStatus =
@@ -707,7 +676,5 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
     }
   } yield data
 
-  override def getStartAndEndTimes(runStatus: StandardAsyncRunState): Option[StartAndEndTimes] =
-    TesAsyncBackendJobExecutionActor.getStartAndEndTimes(runStatus, log, increment)
   override def platform: Option[Platform] = tesConfiguration.platform
 }
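The deletions above remove the TES-specific timestamp bookkeeping: the Bard notification in onTaskComplete's completion handler and the getStartAndEndTimes override. The parsing those lines performed is plain ISO-8601 handling, presumably now owned by the monitoring actor; a minimal standalone sketch of it, useful as a reference while reviewing where the logic moves:

import java.time.OffsetDateTime
import scala.util.Try

// Standalone sketch (assumed name) of the parsing the deleted
// getStartAndEndTimes helper performed on TesVmCostData's ISO-8601 strings.
// Failures simply yield None here, whereas the old code also logged and
// bumped a "parse_tes_timestamp" instrumentation counter.
def parseVmTimesSketch(startTime: Option[String], endTime: Option[String]): Option[(OffsetDateTime, OffsetDateTime)] =
  for {
    start <- startTime
    end <- endTime
    parsed <- Try((OffsetDateTime.parse(start), OffsetDateTime.parse(end))).toOption
  } yield parsed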
diff --git a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActorSpec.scala
index dca8268c1bc..83ae233af28 100644
--- a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActorSpec.scala
+++ b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActorSpec.scala
@@ -1,24 +1,21 @@
 package cromwell.backend.impl.tes
 
-import akka.event.LoggingAdapter
 import common.mock.MockSugar
 import cromwell.backend.async.PendingExecutionHandle
-import cromwell.backend.standard.{StandardAsyncJob, StartAndEndTimes}
+import cromwell.backend.standard.StandardAsyncJob
 import cromwell.backend.{BackendJobDescriptorKey, BackendSpec}
 import cromwell.core.TestKitSuite
 import cromwell.core.logging.JobLogger
 import cromwell.core.path.{DefaultPathBuilder, NioPath}
 import cromwell.filesystems.blob.{BlobFileSystemManager, BlobPath, WSMBlobSasTokenGenerator}
 import cromwell.filesystems.http.HttpPathBuilder
-import cromwell.services.instrumentation.CromwellInstrumentation.InstrumentationPath
 import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.verify
 import org.scalatest.flatspec.AnyFlatSpecLike
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.prop.TableDrivenPropertyChecks
 import wom.graph.CommandCallNode
 
-import java.time.{Duration, OffsetDateTime}
+import java.time.Duration
 import java.time.temporal.ChronoUnit
 import java.util.UUID
 import scala.concurrent.{ExecutionContext, Future}
@@ -437,71 +434,4 @@ class TesAsyncBackendJobExecutionActorSpec
       actual shouldBe s"${jobPaths.callInputsDockerRoot}/$localPathInInputDir"
     }
   }
-
-  it should "return task start and end time" in {
-    val status = Complete(
-      Some(
-        TesVmCostData(Some("2024-04-04T20:20:32.240066+00:00"), Some("2024-04-04T20:22:32.077818+00:00"), Some("0.203"))
-      )
-    )
-
-    TesAsyncBackendJobExecutionActor.getStartAndEndTimes(status,
-                                                         mock[LoggingAdapter],
-                                                         mock[(InstrumentationPath, Option[String]) => Unit]
-    ) shouldBe
-      Some(
-        StartAndEndTimes(
-          OffsetDateTime.parse("2024-04-04T20:20:32.240066+00:00"),
-          Option(OffsetDateTime.parse("2024-04-04T20:20:32.240066+00:00")),
-          OffsetDateTime.parse("2024-04-04T20:22:32.077818+00:00")
-        )
-      )
-  }
-
-  it should "return None when task start or end time are improperly formatted" in {
-    TesAsyncBackendJobExecutionActor.getStartAndEndTimes(
-      Complete(Option(TesVmCostData(Option("badlyFormattedTime"), Option("badlyFormattedTime"), None))),
-      mock[LoggingAdapter],
-      mock[(InstrumentationPath, Option[String]) => Unit]
-    ) shouldBe None
-
-  }
-
-  it should "call tellBard with Complete status containing task end time" in {
-    val runId = StandardAsyncJob(UUID.randomUUID().toString)
-    val handle = new StandardAsyncPendingExecutionHandle(null, runId, None, None)
-    val getTaskLogsFn = (_: StandardAsyncPendingExecutionHandle) =>
-      Future.successful(
-        Some(
-          TaskLog(Some("2024-04-04T20:20:32.240066+00:00"),
-                  Some("2024-04-04T20:22:32.077818+00:00"),
-                  None,
-                  None,
-                  None,
-                  None
-          )
-        )
-      )
-    val tellMetadataFn = mock[Map[String, Any] => Unit]
-    val tellBardFn = mock[TesRunStatus => Unit]
-    val mockLogger = mock[LoggingAdapter]
-
-    val tesRunStatus = Complete(Some(TesVmCostData(Some("2024-04-04T20:20:32.240066+00:00"), None, Some("0.203"))))
-    val expectedNewCostData = Some(
-      TesVmCostData(Some("2024-04-04T20:20:32.240066+00:00"), Some("2024-04-04T20:22:32.077818+00:00"), Some("0.203"))
-    )
-
-    TesAsyncBackendJobExecutionActor.onTaskComplete(tesRunStatus,
-                                                    handle,
-                                                    getTaskLogsFn,
-                                                    tellMetadataFn,
-                                                    tellBardFn,
-                                                    mockLogger
-    )
-
-    // Wait for any futures to complete, I tried using whenReady and it didn't work.
-    Thread.sleep(500)
-
-    verify(tellBardFn).apply(Complete(expectedNewCostData))
-  }
 }
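The deleted specs exercised the removed helpers directly. Under the actor-based design, an equivalent check would point the backend's pollingResultMonitorActor at a TestProbe and assert on the message it receives when a task finishes. A hedged outline follows; the test name is invented, actor construction is elided, and the completion message is whatever the poll-monitoring protocol defines, none of which is taken from an actual replacement spec.

import akka.testkit.TestProbe

// Outline only: how the deleted tellBard assertion could be recast.
it should "notify the polling result monitor when a task completes" in {
  val probe = TestProbe()
  // ... build a TesAsyncBackendJobExecutionActor whose pollingResultMonitorActor is Option(probe.ref) ...
  // actorUnderTest.onTaskComplete(completeStatus, handle)
  // probe.expectMsgPF() { case completionMessage => /* assert it carries completeStatus */ }
}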