Skip to content
This repository has been archived by the owner on Oct 23, 2024. It is now read-only.

Commit

Permalink
Remove dependency of HealthCheckManager.add to the AppRepository
Browse files Browse the repository at this point in the history
This fixes a race of the HelathCheckManager expecting a given AppDefinition in the
AppRepository which is not always true (yet) when an app is launched.

This PR removes the dependency by passing in the known AppDefinition from addForAll,
not only the id and version.
  • Loading branch information
sttts authored and aquamatthias committed Apr 15, 2016
1 parent b4af519 commit 2c77e97
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ trait HealthCheckManager {
def list(appId: PathId): Set[HealthCheck]

/**
* Adds a health check for the app with the supplied id.
* Adds a health check of the supplied app.
*/
def add(appId: PathId, version: Timestamp, healthCheck: HealthCheck): Unit
def add(appDefinition: AppDefinition, healthCheck: HealthCheck): Unit

/**
* Adds all health checks for the supplied app.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,35 +54,31 @@ class MarathonHealthCheckManager @Inject() (
ahcs(appId)(appVersion)
}

override def add(appId: PathId, appVersion: Timestamp, healthCheck: HealthCheck): Unit =
override def add(app: AppDefinition, healthCheck: HealthCheck): Unit =
appHealthChecks.writeLock { ahcs =>
val healthChecksForApp = listActive(appId, appVersion)
val healthChecksForApp = listActive(app.id, app.version)

if (healthChecksForApp.exists(_.healthCheck == healthCheck))
log.debug(s"Not adding duplicate health check for app [$appId] and version [$appVersion]: [$healthCheck]")
log.debug(s"Not adding duplicate health check for app [$app.id] and version [${app.version}]: [$healthCheck]")

else {
log.info(s"Adding health check for app [$appId] and version [$appVersion]: [$healthCheck]")
Await.result(appRepository.app(appId, appVersion), zkConf.zkTimeoutDuration) match {
case Some(app: AppDefinition) =>
val ref = system.actorOf(
HealthCheckActor.props(app, driverHolder, healthCheck, taskTracker, eventBus))
val newHealthChecksForApp =
healthChecksForApp + ActiveHealthCheck(healthCheck, ref)

val appMap = ahcs(appId) + (appVersion -> newHealthChecksForApp)
ahcs += appId -> appMap

eventBus.publish(AddHealthCheck(appId, appVersion, healthCheck))
case None =>
log.warn(s"Couldn't add health check for app [$appId] and version [$appVersion] - app definition not found")
}
log.info(s"Adding health check for app [${app.id}] and version [${app.version}]: [$healthCheck]")

val ref = system.actorOf(
HealthCheckActor.props(app, driverHolder, healthCheck, taskTracker, eventBus))
val newHealthChecksForApp =
healthChecksForApp + ActiveHealthCheck(healthCheck, ref)

val appMap = ahcs(app.id) + (app.version -> newHealthChecksForApp)
ahcs += app.id -> appMap

eventBus.publish(AddHealthCheck(app.id, app.version, healthCheck))
}
}

override def addAllFor(app: AppDefinition): Unit =
appHealthChecks.writeLock { _ => // atomically add all checks
app.healthChecks.foreach(add(app.id, app.version, _))
app.healthChecks.foreach(add(app, _))
}

override def remove(appId: PathId, appVersion: Timestamp, healthCheck: HealthCheck): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,20 @@ class MarathonHealthCheckManagerTest
}
}

test("Add") {
test("Add for a known app") {
val app: AppDefinition = AppDefinition(id = appId)
appRepository.store(app).futureValue

val healthCheck = HealthCheck()
hcManager.add(appId, app.version, healthCheck)
hcManager.add(app, healthCheck)
assert(hcManager.list(appId).size == 1)
}

test("Add for not-yet-known app") {
val app: AppDefinition = AppDefinition(id = appId)

val healthCheck = HealthCheck()
hcManager.add(app, healthCheck)
assert(hcManager.list(appId).size == 1)
}

Expand All @@ -133,7 +141,7 @@ class MarathonHealthCheckManagerTest
taskCreationHandler.created(TaskStateOp.LaunchEphemeral(marathonTask)).futureValue
stateOpProcessor.process(update).futureValue

hcManager.add(appId, app.version, healthCheck)
hcManager.add(app, healthCheck)

val status1 = hcManager.status(appId, taskId).futureValue
assert(status1 == Seq(Health(taskId)))
Expand Down Expand Up @@ -164,7 +172,7 @@ class MarathonHealthCheckManagerTest
val version = app.version

val healthCheck = HealthCheck(protocol = Protocol.COMMAND, gracePeriod = 0.seconds)
hcManager.add(appId, version, healthCheck)
hcManager.add(app, healthCheck)

val task1 = makeRunningTask(appId, version)
val task2 = makeRunningTask(appId, version)
Expand Down

0 comments on commit 2c77e97

Please sign in to comment.