Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WX-1670] Don't allocate job tokens for hog groups experiencing quota exhaustion #7520

Merged
merged 18 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## 88 Release Notes

### New feature: Prevent Job start during Cloud Quota exhaustion

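This optional feature prevents Cromwell from starting new jobs in a hog group that is currently experiencing
cloud quota exhaustion. Jobs in that group will be started again once the group's quota becomes available, i.e. once
no quota exhaustion has been recorded for the group within the last
`quota-exhaustion-job-start-control.threshold-minutes` minutes (default: 15). To enable this feature,
set `quota-exhaustion-job-start-control.enabled` to true.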

### Java 17

As of this version, a distribution of Java 17 is required to run Cromwell. Cromwell is developed, tested, and
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package cromwell.backend.standard

import akka.actor.{Actor, ActorLogging, Props}
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.dispatch.MessageDispatcher
import common.util.StringUtil.EnhancedToStringable
import cromwell.backend.standard.GroupMetricsActor.RecordGroupQuotaExhaustion
import cromwell.backend.standard.GroupMetricsActor._
import cromwell.core.Dispatcher
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.database.sql.EngineSqlDatabase
import cromwell.database.sql.SqlConverters.OffsetDateTimeToSystemTimestamp
import cromwell.database.sql.tables.GroupMetricsEntry

import java.time.OffsetDateTime
import scala.util.{Failure, Success}

class GroupMetricsActor(engineDbInterface: EngineSqlDatabase) extends Actor with ActorLogging {
class GroupMetricsActor(engineDbInterface: EngineSqlDatabase, quotaExhaustionThresholdInMins: Long)
extends Actor
with ActorLogging {

implicit val ec: MessageDispatcher = context.system.dispatchers.lookup(Dispatcher.EngineDispatcher)

Expand All @@ -21,6 +24,16 @@
val groupMetricsEntry = GroupMetricsEntry(group, OffsetDateTime.now.toSystemTimestamp)
engineDbInterface.recordGroupMetricsEntry(groupMetricsEntry)
()
case GetQuotaExhaustedGroups =>
val respondTo: ActorRef = sender()

Check warning on line 28 in backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala#L27-L28

Added lines #L27 - L28 were not covered by tests

// A group in the GROUP_METRICS_ENTRY table is considered to no longer be experiencing cloud quota exhaustion
// if its 'quota_exhaustion_detected' timestamp has not been updated in the last
// `quotaExhaustionThresholdInMins` minutes.
val currentTimestampMinusDelay = OffsetDateTime.now().minusMinutes(quotaExhaustionThresholdInMins)
engineDbInterface.getQuotaExhaustedGroups(currentTimestampMinusDelay.toSystemTimestamp) onComplete {
case Success(quotaExhaustedGroups) => respondTo ! GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups.toList)
case Failure(exception) => respondTo ! GetQuotaExhaustedGroupsFailure(exception.getMessage)

Check warning on line 35 in backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala

View check run for this annotation

Codecov / codecov/patch

backend/src/main/scala/cromwell/backend/standard/GroupMetricsActor.scala#L32-L35

Added lines #L32 - L35 were not covered by tests
}
case other =>
log.error(
s"Programmer Error: Unexpected message ${other.toPrettyElidedString(1000)} received by ${this.self.path.name}."
Expand All @@ -30,10 +43,16 @@

object GroupMetricsActor {

// Requests
sealed trait GroupMetricsActorMessage

// Asks the actor to record a quota exhaustion entry for `group`, timestamped at the time the message is handled.
case class RecordGroupQuotaExhaustion(group: String) extends GroupMetricsActorMessage
// Asks the actor for the groups currently experiencing quota exhaustion; the sender is answered with a
// GetQuotaExhaustedGroupsResponse.
case object GetQuotaExhaustedGroups extends GroupMetricsActorMessage

// Responses
sealed trait GetQuotaExhaustedGroupsResponse
// Sent when the database lookup succeeded; carries the groups it returned.
case class GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups: List[String]) extends GetQuotaExhaustedGroupsResponse
// Sent when the database lookup failed; `errorMsg` is the message of the underlying exception.
case class GetQuotaExhaustedGroupsFailure(errorMsg: String) extends GetQuotaExhaustedGroupsResponse

def props(engineDbInterface: EngineSqlDatabase): Props =
Props(new GroupMetricsActor(engineDbInterface)).withDispatcher(EngineDispatcher)
def props(engineDbInterface: EngineSqlDatabase, quotaExhaustionThresholdInMins: Long): Props =
Props(new GroupMetricsActor(engineDbInterface, quotaExhaustionThresholdInMins)).withDispatcher(EngineDispatcher)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ package cromwell.backend.standard
import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestProbe}
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.backend.standard.GroupMetricsActor.RecordGroupQuotaExhaustion
import cromwell.backend.standard.GroupMetricsActor.{
GetQuotaExhaustedGroups,
GetQuotaExhaustedGroupsSuccess,
RecordGroupQuotaExhaustion
}
import cromwell.database.slick.EngineSlickDatabase
import cromwell.database.sql.tables.GroupMetricsEntry
import cromwell.services.EngineServicesStore
Expand All @@ -12,6 +16,8 @@ import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.sql.Timestamp
import scala.concurrent.duration.DurationInt
import scala.concurrent.{ExecutionContext, Future}

class GroupMetricsActorSpec extends AnyFlatSpec with Matchers {
Expand All @@ -30,18 +36,35 @@ class GroupMetricsActorSpec extends AnyFlatSpec with Matchers {
recordMethodCallCount = recordMethodCallCount + 1
Future.successful(())
}

// Test stub: ignores the threshold and always reports the single test hog group as quota-exhausted.
override def getQuotaExhaustedGroups(
  thresholdTimestamp: Timestamp
)(implicit ec: ExecutionContext): Future[Seq[String]] = {
  val stubbedGroups: Seq[String] = List(testHogGroup)
  Future.successful(stubbedGroups)
}
}.initialized(EngineServicesStore.EngineLiquibaseSettings)

behavior of "GroupMetricsActor"

it should "receive new quota exhaustion message and call database function" in {
val db = databaseInterface()
val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db))
val mockGroupMetricsActor = TestActorRef(GroupMetricsActor.props(db, 15))

mockGroupMetricsActor.tell(RecordGroupQuotaExhaustion(testHogGroup), TestProbe().ref)

eventually {
recordMethodCallCount shouldBe 1
}
}

it should "respond with groups in quota exhaustion" in {
  val requester = TestProbe()
  val groupMetricsActor = TestActorRef(GroupMetricsActor.props(databaseInterface(), 15))

  // Send the request on behalf of the probe so that the actor's reply is delivered to it.
  requester.send(groupMetricsActor, GetQuotaExhaustedGroups)

  // The stubbed database always reports the test hog group.
  requester.expectMsgPF(5.seconds) { case GetQuotaExhaustedGroupsSuccess(reportedGroups) =>
    reportedGroups shouldBe List(testHogGroup)
  }
}
}
9 changes: 9 additions & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ system {
# token-log-interval-seconds = 300
}

# If enabled, Cromwell will not allocate new execution tokens to jobs whose hog groups are actively
# experiencing quota exhaustion.
quota-exhaustion-job-start-control {
enabled = false
# Threshold (in minutes) after which a group in the GROUP_METRICS_ENTRY table is no longer considered to be
# actively experiencing quota exhaustion.
threshold-minutes = 15
}

workflow-heartbeats {
heartbeat-interval: 2 minutes
ttl: 10 minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cromwell.database.slick
import cromwell.database.sql.GroupMetricsSqlDatabase
import cromwell.database.sql.tables.GroupMetricsEntry

import java.sql.Timestamp
import scala.concurrent.{ExecutionContext, Future}

trait GroupMetricsSlickDatabase extends GroupMetricsSqlDatabase {
Expand All @@ -29,4 +30,11 @@ trait GroupMetricsSlickDatabase extends GroupMetricsSqlDatabase {
val action = dataAccess.countGroupMetricsEntriesForGroupId(groupId).result
runTransaction(action)
}

/**
 * Runs the `groupsExperiencingQuotaExhaustion` query for the given threshold inside a transaction
 * and returns the group IDs it yields.
 */
override def getQuotaExhaustedGroups(thresholdTimestamp: Timestamp)(implicit
  ec: ExecutionContext
): Future[Seq[String]] =
  runTransaction(dataAccess.groupsExperiencingQuotaExhaustion(thresholdTimestamp).result)
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,11 @@ trait GroupMetricsEntryComponent {
} yield groupMetricsEntry
}.size
)

// Compiled query selecting the group IDs of all entries whose `quotaExhaustionDetected` timestamp is
// strictly later than the supplied threshold.
val groupsExperiencingQuotaExhaustion = Compiled { (thresholdTimestamp: Rep[Timestamp]) =>
  groupMetricsEntries
    .filter(_.quotaExhaustionDetected > thresholdTimestamp)
    .map(_.groupId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cromwell.database.sql

import cromwell.database.sql.tables.GroupMetricsEntry

import java.sql.Timestamp
import scala.concurrent.{ExecutionContext, Future}

trait GroupMetricsSqlDatabase {
Expand All @@ -18,4 +19,9 @@ trait GroupMetricsSqlDatabase {
* Returns number of entries associated with given group
*/
def countGroupMetricsEntries(groupId: String)(implicit ec: ExecutionContext): Future[Int]

/**
 * Returns list of groups **currently** experiencing quota exhaustion
 *
 * @param thresholdTimestamp cutoff for "currently": groups whose quota exhaustion was detected after this
 *                           timestamp are returned
 * @param ec execution context used to run the lookup
 * @return a Future holding the IDs of the matching groups
 */
def getQuotaExhaustedGroups(thresholdTimestamp: Timestamp)(implicit ec: ExecutionContext): Future[Seq[String]]
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package cromwell.engine.workflow.tokens

import java.time.OffsetDateTime

import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated, Timers}
import akka.pattern.ask
import akka.util.Timeout
import cats.data.NonEmptyList
import cromwell.backend.standard.GroupMetricsActor.{
GetQuotaExhaustedGroups,
GetQuotaExhaustedGroupsFailure,
GetQuotaExhaustedGroupsResponse,
GetQuotaExhaustedGroupsSuccess
}
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.core.JobToken._
import cromwell.core.instrumentation.InstrumentationPrefixes.ServicesPrefix
Expand All @@ -16,20 +22,23 @@
import cromwell.services.instrumentation.{CromwellInstrumentation, CromwellInstrumentationScheduler}
import cromwell.services.loadcontroller.LoadControllerService.ListenToLoadController
import cromwell.util.GracefulShutdownHelper.ShutdownCommand
import io.circe.generic.JsonCodec
import io.circe.Printer
import io.github.andrebeat.pool.Lease
import io.circe.syntax._
import io.circe.generic.JsonCodec
import io.circe.generic.semiauto._
import io.circe.syntax._
import io.github.andrebeat.pool.Lease

import java.time.OffsetDateTime
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

class JobTokenDispenserActor(override val serviceRegistryActor: ActorRef,
override val dispensingRate: DynamicRateLimiter.Rate,
logInterval: Option[FiniteDuration],
dispenserType: String,
tokenAllocatedDescription: String
tokenAllocatedDescription: String,
groupMetricsActor: Option[ActorRef]
) extends Actor
with ActorLogging
with JobInstrumentation
Expand All @@ -38,6 +47,8 @@
with DynamicRateLimiter
with CromwellInstrumentation {

implicit val ec: ExecutionContext = context.dispatcher

// Metrics paths are based on the dispenser type
private val tokenDispenserMetricsBasePath: NonEmptyList[String] = NonEmptyList.of("token_dispenser", dispenserType)

Expand All @@ -51,6 +62,8 @@
private val tokensLeasedMetricPath: NonEmptyList[String] = tokenDispenserMetricsActivityRates :+ "tokens_dispensed"
private val tokensReturnedMetricPath: NonEmptyList[String] = tokenDispenserMetricsActivityRates :+ "tokens_returned"

final private val groupMetricsTimeout = Timeout(60.seconds)

/**
* Lazily created token queue. We only create a queue for a token type when we need it
*/
Expand Down Expand Up @@ -100,7 +113,7 @@
case JobTokenReturn => release(sender())
case TokensAvailable(n) =>
emitHeartbeatMetrics()
dispense(n)
checkAndDispenseTokens(n)
case Terminated(terminee) => onTerminate(terminee)
case LogJobTokenAllocation(nextInterval) => logTokenAllocation(nextInterval)
case FetchLimitedGroups => sender() ! tokenExhaustedGroups
Expand All @@ -125,11 +138,34 @@
()
}

private def dispense(n: Int) = if (tokenQueues.nonEmpty) {
private def checkAndDispenseTokens(n: Int): Unit =
if (tokenQueues.nonEmpty) {
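// Only the "execution" token dispenser with a GroupMetricsActor asks for cloud quota exhausted groups; any other
// dispenser (e.g. the one allocating 'restart' tokens) dispenses without excluding any group.
// NOTE(review): the onComplete callback below runs on the ExecutionContext rather than as an actor message, so
// dispense(...) appears to access actor state (tokenQueues, currentTokenQueuePointer) outside the actor's
// message processing. Consider piping the response to self and dispensing from receive instead. TODO confirm.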
(dispenserType, groupMetricsActor) match {
case ("execution", Some(gmActor)) =>
gmActor
.ask(GetQuotaExhaustedGroups)(groupMetricsTimeout)
.mapTo[GetQuotaExhaustedGroupsResponse] onComplete {
case Success(GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups)) => dispense(n, quotaExhaustedGroups)

Check warning on line 149 in engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala#L146-L149

Added lines #L146 - L149 were not covered by tests
case Success(GetQuotaExhaustedGroupsFailure(errorMsg)) =>
log.error(s"Failed to fetch quota exhausted groups. Error: $errorMsg")
dispense(n, List.empty)

Check warning on line 152 in engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala#L151-L152

Added lines #L151 - L152 were not covered by tests
case Failure(exception) =>
log.error(s"Unexpected failure while fetching quota exhausted groups. Error: ${exception.getMessage}")
dispense(n, List.empty)

Check warning on line 155 in engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala

View check run for this annotation

Codecov / codecov/patch

engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala#L154-L155

Added lines #L154 - L155 were not covered by tests
}
case _ => dispense(n, List.empty)
}
}

// Sort by backend name to avoid re-ordering across iterations:
private def dispense(n: Int, quotaExhaustedGroups: List[String]): Unit = {
// Sort by backend name to avoid re-ordering across iterations. The RoundRobinQueueIterator will only fetch job
// requests from a hog group that is not experiencing cloud quota exhaustion.
val iterator =
new RoundRobinQueueIterator(tokenQueues.toList.sortBy(_._1.backend).map(_._2), currentTokenQueuePointer)
new RoundRobinQueueIterator(tokenQueues.toList.sortBy(_._1.backend).map(_._2),
currentTokenQueuePointer,
quotaExhaustedGroups
)

// In rare cases, an abort might empty an inner queue between "available" and "dequeue", which could cause an
// exception.
Expand Down Expand Up @@ -245,9 +281,18 @@
rate: DynamicRateLimiter.Rate,
logInterval: Option[FiniteDuration],
dispenserType: String,
tokenAllocatedDescription: String
tokenAllocatedDescription: String,
groupMetricsActor: Option[ActorRef]
): Props =
Props(new JobTokenDispenserActor(serviceRegistryActor, rate, logInterval, dispenserType, tokenAllocatedDescription))
Props(
new JobTokenDispenserActor(serviceRegistryActor,
rate,
logInterval,
dispenserType,
tokenAllocatedDescription,
groupMetricsActor
)
)
.withDispatcher(EngineDispatcher)

case class JobTokenRequest(hogGroup: HogGroup, jobTokenType: JobTokenType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import cromwell.engine.workflow.tokens.TokenQueue.{DequeueResult, LeasedActor}
* It will keep rotating the list until it finds a queue with an element that can be dequeued.
* If no queue can be dequeued, the iterator is empty.
*/
final class RoundRobinQueueIterator(initialTokenQueue: List[TokenQueue], initialPointer: Int)
extends Iterator[LeasedActor] {
final class RoundRobinQueueIterator(initialTokenQueue: List[TokenQueue],
initialPointer: Int,
excludedGroups: List[String]
) extends Iterator[LeasedActor] {
// Assumes the number of queues won't change during iteration (it really shouldn't !)
private val numberOfQueues = initialTokenQueue.size
// Indicate the index of next queue to try to dequeue from.
Expand All @@ -29,7 +31,9 @@ final class RoundRobinQueueIterator(initialTokenQueue: List[TokenQueue], initial
*/
def updatedPointer = pointer

def hasNext = tokenQueues.exists(_.available)
// check if there is a request whose hog group has tokens available and is not in excluded groups list
def hasNext = tokenQueues.exists(_.available(excludedGroups))

def next() = findFirst.getOrElse(unexpectedlyEmpty)

def unexpectedlyEmpty: LeasedActor =
Expand All @@ -44,7 +48,8 @@ final class RoundRobinQueueIterator(initialTokenQueue: List[TokenQueue], initial
// For instance, if we have 5 queues and pointer is 2, we want to try indices (2, 3, 4, 0, 1)

val indexStream = ((pointer until numberOfQueues) ++ (0 until pointer)).to(LazyList)
val dequeuedTokenStream = indexStream.map(index => tokenQueues(index).dequeue -> index)
val dequeuedTokenStream: Seq[(DequeueResult, Int)] =
indexStream.map(index => tokenQueues(index).dequeue(excludedGroups) -> index)

val firstLeasedActor = dequeuedTokenStream.collectFirst {
case (DequeueResult(Some(dequeuedActor), newTokenQueue), index) =>
Expand Down
Loading
Loading