[WX-1670] Don't allocate job tokens for hog groups experiencing quota exhaustion #7520
Changes from 15 commits
7f5773f
ce63b6a
95bb235
01b5282
44252c4
5e4f331
59ab583
63ab888
f21b11f
ad935b9
94fcaac
7da14d7
471a798
be981b6
83a6841
256ad35
5df2a18
faaad44
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,14 @@ | |
|
||
## 88 Release Notes | ||
|
||
### New feature: Prevent Job start during Cloud Quota exhaustion | ||
|
||
This optional feature prevents Cromwell from starting new jobs in a billing project that is currently experiencing | ||
cloud quota exhaustion. Jobs will be started once the project's quota becomes available. To enable this feature, | ||
set `quota-exhaustion-job-start-control.enabled` to true. | ||
|
||
Note: Jobs that are being restarted will not be affected by this feature, even if it is enabled. | ||
I suspect that this note will confuse people, since there are so many kinds of job restarts. I think we can probably just take it out, users probably need to understand a lot about Cromwell internals to even have this question. 😂
I see. I added this so that users know that restarts won't affect their jobs not "being restarted" but I didn't know there could be different kind of restarts. I am happy to remove this.
||
|
||
### Java 17 | ||
|
||
As of this version, a distribution of Java 17 is required to run Cromwell. Cromwell is developed, tested, and | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -270,6 +270,15 @@ system { | |
# token-log-interval-seconds = 300 | ||
} | ||
|
||
# If enabled, Cromwell will not allocate new execution tokens to jobs whose hog groups is actively | ||
grammar nit - "hog groups are actively"
||
# experiencing quota exhaustion. | ||
quota-exhaustion-job-start-control { | ||
enabled = false | ||
# threshold (in minutes) after which a group in GROUP_METRICS_ENTRY table is no longer considered to be | ||
# actively experiencing quota exhaustion | ||
threshold-minutes = 15 | ||
} | ||
|
||
workflow-heartbeats { | ||
heartbeat-interval: 2 minutes | ||
ttl: 10 minutes | ||
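The `threshold-minutes` setting above describes a recency window: a group counts as actively quota-exhausted only while its most recent exhaustion record is newer than the threshold. The following is a minimal sketch of that check, not Cromwell's implementation. The object name, the `lastSeen` map (standing in for rows of the GROUP_METRICS_ENTRY table), and the treatment of a record exactly at the cutoff as expired are all assumptions made for illustration.

```scala
import java.time.OffsetDateTime

// Hypothetical helper, not part of this PR.
object QuotaThresholdSketch {

  /** Returns the groups whose last recorded quota exhaustion falls within the threshold window.
    *
    * @param lastSeen         assumed shape: group name -> time quota exhaustion was last observed
    * @param now              the reference time for the check
    * @param thresholdMinutes mirrors `threshold-minutes` from the config block
    */
  def activelyExhausted(lastSeen: Map[String, OffsetDateTime],
                        now: OffsetDateTime,
                        thresholdMinutes: Long
  ): Set[String] = {
    val cutoff = now.minusMinutes(thresholdMinutes)
    // Assumption: a record exactly at the cutoff is treated as expired.
    lastSeen.collect { case (group, seenAt) if seenAt.isAfter(cutoff) => group }.toSet
  }
}
```

With the default of 15 minutes, a group last seen exhausted 5 minutes ago would still be held back, while one last seen 20 minutes ago would be eligible for tokens again.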
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,15 @@ | ||
package cromwell.engine.workflow.tokens | ||
|
||
import java.time.OffsetDateTime | ||
|
||
import akka.actor.{Actor, ActorLogging, ActorRef, Props, Terminated, Timers} | ||
import akka.pattern.ask | ||
import akka.util.Timeout | ||
import cats.data.NonEmptyList | ||
import cromwell.backend.standard.GroupMetricsActor.{ | ||
GetQuotaExhaustedGroups, | ||
GetQuotaExhaustedGroupsFailure, | ||
GetQuotaExhaustedGroupsResponse, | ||
GetQuotaExhaustedGroupsSuccess | ||
} | ||
import cromwell.core.Dispatcher.EngineDispatcher | ||
import cromwell.core.JobToken._ | ||
import cromwell.core.instrumentation.InstrumentationPrefixes.ServicesPrefix | ||
|
@@ -16,20 +22,23 @@ | |
import cromwell.services.instrumentation.{CromwellInstrumentation, CromwellInstrumentationScheduler} | ||
import cromwell.services.loadcontroller.LoadControllerService.ListenToLoadController | ||
import cromwell.util.GracefulShutdownHelper.ShutdownCommand | ||
import io.circe.generic.JsonCodec | ||
import io.circe.Printer | ||
import io.github.andrebeat.pool.Lease | ||
import io.circe.syntax._ | ||
import io.circe.generic.JsonCodec | ||
import io.circe.generic.semiauto._ | ||
import io.circe.syntax._ | ||
import io.github.andrebeat.pool.Lease | ||
|
||
import java.time.OffsetDateTime | ||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.duration._ | ||
import scala.util.{Failure, Success, Try} | ||
|
||
class JobTokenDispenserActor(override val serviceRegistryActor: ActorRef, | ||
override val dispensingRate: DynamicRateLimiter.Rate, | ||
logInterval: Option[FiniteDuration], | ||
dispenserType: String, | ||
tokenAllocatedDescription: String | ||
tokenAllocatedDescription: String, | ||
groupMetricsActor: Option[ActorRef] | ||
) extends Actor | ||
with ActorLogging | ||
with JobInstrumentation | ||
|
@@ -38,6 +47,8 @@ | |
with DynamicRateLimiter | ||
with CromwellInstrumentation { | ||
|
||
implicit val ec: ExecutionContext = context.dispatcher | ||
|
||
// Metrics paths are based on the dispenser type | ||
private val tokenDispenserMetricsBasePath: NonEmptyList[String] = NonEmptyList.of("token_dispenser", dispenserType) | ||
|
||
|
@@ -51,6 +62,8 @@ | |
private val tokensLeasedMetricPath: NonEmptyList[String] = tokenDispenserMetricsActivityRates :+ "tokens_dispensed" | ||
private val tokensReturnedMetricPath: NonEmptyList[String] = tokenDispenserMetricsActivityRates :+ "tokens_returned" | ||
|
||
final private val groupMetricsTimeout = Timeout(60.seconds) | ||
|
||
/** | ||
* Lazily created token queue. We only create a queue for a token type when we need it | ||
*/ | ||
|
@@ -100,7 +113,7 @@ | |
case JobTokenReturn => release(sender()) | ||
case TokensAvailable(n) => | ||
emitHeartbeatMetrics() | ||
dispense(n) | ||
checkAndDispenseTokens(n) | ||
case Terminated(terminee) => onTerminate(terminee) | ||
case LogJobTokenAllocation(nextInterval) => logTokenAllocation(nextInterval) | ||
case FetchLimitedGroups => sender() ! tokenExhaustedGroups | ||
|
@@ -125,11 +138,34 @@ | |
() | ||
} | ||
|
||
private def dispense(n: Int) = if (tokenQueues.nonEmpty) { | ||
private def checkAndDispenseTokens(n: Int): Unit = | ||
if (tokenQueues.nonEmpty) { | ||
// don't fetch cloud quota exhausted groups for token dispenser allocating 'restart' tokens | ||
if (dispenserType == "execution" && groupMetricsActor.nonEmpty) { | ||
groupMetricsActor.get | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We try to avoid unprotected
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I did add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it's true that the current code state is guaranteed to not cause a problem, but the compiler isn't checking that, and it's easy for the association between the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see your point. I will change it not use |
||
.ask(GetQuotaExhaustedGroups)(groupMetricsTimeout) | ||
.mapTo[GetQuotaExhaustedGroupsResponse] onComplete { | ||
case Success(GetQuotaExhaustedGroupsSuccess(quotaExhaustedGroups)) => dispense(n, quotaExhaustedGroups) | ||
Check warning on line 148 in engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala
|
||
case Success(GetQuotaExhaustedGroupsFailure(errorMsg)) => | ||
log.error(s"Failed to fetch quota exhausted groups. Error: $errorMsg") | ||
dispense(n, List.empty) | ||
Check warning on line 151 in engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala
|
||
case Failure(exception) => | ||
log.error(s"Unexpected failure while fetching quota exhausted groups. Error: ${exception.getMessage}") | ||
dispense(n, List.empty) | ||
Check warning on line 154 in engine/src/main/scala/cromwell/engine/workflow/tokens/JobTokenDispenserActor.scala
|
||
I appreciate the fail-safe approach here.
||
} | ||
} else { | ||
dispense(n, List.empty) | ||
} | ||
} | ||
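One way to keep the fail-safe behavior of `checkAndDispenseTokens` without pairing `groupMetricsActor.nonEmpty` with `groupMetricsActor.get` is to pattern match on the `Option`, so the compiler enforces the presence check. The sketch below is illustrative only and is not the change that was merged. It replaces the Akka `ask` with a hypothetical `GroupMetricsSource` trait so that it runs with the standard library alone, and `groupsToSkip` is an invented name.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Hypothetical names throughout; only the "execution" dispenser type comes from the diff.
object DispenseSketch {

  // Stand-in for the group metrics actor: anything that can report quota-exhausted groups.
  trait GroupMetricsSource {
    def quotaExhaustedGroups(): Future[List[String]]
  }

  /** Resolves the groups to skip. Any failure degrades to "skip nothing" so dispensing continues. */
  def groupsToSkip(dispenserType: String, source: Option[GroupMetricsSource])(implicit
    ec: ExecutionContext
  ): Future[List[String]] =
    source match {
      // The match binds the value directly, so no Option.get is needed.
      case Some(metrics) if dispenserType == "execution" =>
        metrics.quotaExhaustedGroups().recover { case NonFatal(_) => List.empty[String] }
      // Other dispenser types, or no metrics source configured: nothing is filtered.
      case _ =>
        Future.successful(List.empty[String])
    }
}
```

In the actor itself the result would then feed `dispense(n, groups)`; how that hand-off is scheduled is outside the scope of this sketch.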
|
||
// Sort by backend name to avoid re-ordering across iterations: | ||
private def dispense(n: Int, quotaExhaustedGroups: List[String]): Unit = { | ||
// Sort by backend name to avoid re-ordering across iterations. The RoundRobinQueueIterator will only fetch job | ||
// requests from a hog group that is not experiencing cloud quota exhaustion. | ||
val iterator = | ||
new RoundRobinQueueIterator(tokenQueues.toList.sortBy(_._1.backend).map(_._2), currentTokenQueuePointer) | ||
new RoundRobinQueueIterator(tokenQueues.toList.sortBy(_._1.backend).map(_._2), | ||
currentTokenQueuePointer, | ||
quotaExhaustedGroups | ||
) | ||
|
||
// In rare cases, an abort might empty an inner queue between "available" and "dequeue", which could cause an | ||
// exception. | ||
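The comment in `dispense` says the iterator only fetches job requests from groups that are not quota-exhausted. The `RoundRobinQueueIterator` source is not shown in this diff, so the following is only a simplified sketch of that idea under stated assumptions: each queue is keyed by a single group name, requests are plain strings, and the real iterator's pointer handling and token-lease checks are left out. `RoundRobinSketch.take` is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical simplification; not the RoundRobinQueueIterator from Cromwell.
object RoundRobinSketch {

  /** Takes up to `n` requests, one per eligible group per pass, skipping quota-exhausted groups. */
  def take(queues: List[(String, mutable.Queue[String])], n: Int, exhausted: Set[String]): List[String] = {
    // Exhausted groups are excluded up front; their queued requests stay where they are.
    val eligible = queues.filterNot { case (group, _) => exhausted.contains(group) }
    val out = mutable.ListBuffer.empty[String]
    var progressed = true
    // Keep cycling until enough requests are taken or a full pass yields nothing.
    while (out.size < n && progressed) {
      progressed = false
      eligible.foreach { case (_, queue) =>
        if (out.size < n && queue.nonEmpty) {
          out += queue.dequeue()
          progressed = true
        }
      }
    }
    out.toList
  }
}
```

Requests belonging to a skipped group are not dropped. They remain queued and become eligible again once the group stops being reported as exhausted.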
|
@@ -245,9 +281,18 @@ | |
rate: DynamicRateLimiter.Rate, | ||
logInterval: Option[FiniteDuration], | ||
dispenserType: String, | ||
tokenAllocatedDescription: String | ||
tokenAllocatedDescription: String, | ||
groupMetricsActor: Option[ActorRef] | ||
): Props = | ||
Props(new JobTokenDispenserActor(serviceRegistryActor, rate, logInterval, dispenserType, tokenAllocatedDescription)) | ||
Props( | ||
new JobTokenDispenserActor(serviceRegistryActor, | ||
rate, | ||
logInterval, | ||
dispenserType, | ||
tokenAllocatedDescription, | ||
groupMetricsActor | ||
) | ||
) | ||
.withDispatcher(EngineDispatcher) | ||
|
||
case class JobTokenRequest(hogGroup: HogGroup, jobTokenType: JobTokenType) | ||
|
Should we say "in a hog group" here?
I can replace `billing project` with `hog group` 👍
I'm of the belief that we should not use "hog group" in any new, external-facing places.
What do you think would be clearer? I don't think "billing project" is either accurate in Terra or useful to other users.
Maybe just `group` might work?
Yeah I usually just use group.