Skip to content

Commit

Permalink
Add SetupAdapter plugin type
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-kraemer committed Sep 27, 2024
1 parent 4509f48 commit 8ab1b04
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 19 deletions.
85 changes: 68 additions & 17 deletions src/main/kotlin/cloud/CloudManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import agent.AgentRegistryFactory
import cloud.template.AttachedVolume
import cloud.template.ProvisioningTemplateExtension
import com.fasterxml.jackson.module.kotlin.convertValue
import db.PluginRegistry
import db.PluginRegistryFactory
import db.SetupRegistryFactory
import db.VMRegistry
import db.VMRegistryFactory
Expand All @@ -35,6 +37,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import model.cloud.PoolAgentParams
import model.cloud.VM
import model.plugins.call
import model.retry.RetryPolicy
import model.setup.Setup
import model.setup.Volume
Expand Down Expand Up @@ -97,6 +100,22 @@ class CloudManager : CoroutineVerticle() {
private const val CLUSTER_MAP_CIRCUIT_BREAKERS = "CloudManager.Map.CircuitBreakers"
}

/**
* Contains information about a VM to create
*/
private data class VMToCreate(
/**
* The VM to create (with the actual setup from which it should be created)
*/
val vm: VM,

/**
* The original setup. May differ from the VM's setup if it has been
* modified by setup adapter plugins.
*/
val originalSetup: Setup
)

/**
* The client to connect to the Cloud
*/
Expand Down Expand Up @@ -156,6 +175,11 @@ class CloudManager : CoroutineVerticle() {
*/
private lateinit var setupCircuitBreakers: VMCircuitBreakerMap

/**
* The plugin registry
*/
private val pluginRegistry: PluginRegistry = PluginRegistryFactory.create()

/**
* A cluster-wide semaphore to prevent [sync] from being called multiple
* times in parallel
Expand Down Expand Up @@ -475,7 +499,7 @@ class CloudManager : CoroutineVerticle() {
if (!cleanupOnly) {
// ensure there's a minimum number of VMs
launch {
createRemoteAgent { setupSelector.selectMinimum(setups) }
createRemoteAgent(emptyList()) { setupSelector.selectMinimum(setups) }
}
}
} finally {
Expand Down Expand Up @@ -561,21 +585,44 @@ class CloudManager : CoroutineVerticle() {
break
}

val result = createRemoteAgent { setupSelector.select(remaining, requiredCapabilities, possibleSetups) }
val result = createRemoteAgent(requiredCapabilities) {
setupSelector.select(remaining, requiredCapabilities, possibleSetups)
}
remaining = result.count { !it.second }.toLong()
}
}

private suspend fun createRemoteAgent(selector: suspend () -> List<Setup>): List<Pair<VM, Boolean>> {
/**
* Applies all setup adapter plugins to the given setup and returns the new
* instance. If there are no plugins or if they did not make any modifications,
* the method returns the original setup.
*/
private suspend fun applyPlugins(setup: Setup,
requiredCapabilities: Collection<String>): Setup {
val adapters = pluginRegistry.getSetupAdapters()
var result = setup
for (adapter in adapters) {
result = adapter.call(result, requiredCapabilities, vertx)
}
return result
}

private suspend fun createRemoteAgent(requiredCapabilities: Collection<String>,
selector: suspend () -> List<Setup>): List<Pair<VM, Boolean>> {
// atomically create VM entries in the registry
val sharedData = vertx.sharedData()
val lock = sharedData.getLock(LOCK_VMS).coAwait()
val vmsToCreate = try {
val setupsToCreate = selector()
setupsToCreate.map { setup ->
VM(setup = setup).also {
vmRegistry.addVM(it)
} to setup
// call setup adapters and modify setup if necessary
val modifiedSetup = applyPlugins(setup, requiredCapabilities)

// add VM to registry
val vm = VM(setup = modifiedSetup)
vmRegistry.addVM(vm)

VMToCreate(vm = vm, originalSetup = setup)
}
} finally {
lock.release()
Expand All @@ -589,30 +636,30 @@ class CloudManager : CoroutineVerticle() {
* Return a list that contains pairs of a VM and a boolean telling if the
* VM was created successfully or not.
*/
private suspend fun createRemoteAgents(vmsToCreate: List<Pair<VM, Setup>>): List<Pair<VM, Boolean>> {
private suspend fun createRemoteAgents(vmsToCreate: List<VMToCreate>): List<Pair<VM, Boolean>> {
val sharedData = vertx.sharedData()
val deferreds = vmsToCreate.map { (vm, setup) ->
val deferreds = vmsToCreate.map { (vm, originalSetup) ->
// create multiple VMs in parallel
async {
// hold a lock as long as we are creating this VM
val creatingLock = sharedData.getLock(VM_CREATION_LOCK_PREFIX + vm.id).coAwait()
try {
log.info("Creating virtual machine ${vm.id} with setup `${setup.id}' ...")
log.info("Creating virtual machine ${vm.id} with setup `${vm.setup.id}' ...")

val delay = setupCircuitBreakers.computeIfAbsent(setup).currentDelay
val delay = setupCircuitBreakers.computeIfAbsent(originalSetup).currentDelay
if (delay > 0) {
log.info("Backing off for $delay milliseconds due to too many failed attempts.")
delay(delay)
}

try {
// create VM
val externalId = createVM(vm.id, setup)
val externalId = createVM(vm.id, vm.setup)
vmRegistry.setVMExternalID(vm.id, externalId)
vmRegistry.setVMCreationTime(vm.id, Instant.now())

// create other volumes in background
val volumeDeferreds = createVolumesAsync(externalId, setup)
val volumeDeferreds = createVolumesAsync(externalId, vm.setup)

try {
cloudClient.waitForVM(externalId, timeoutCreateVM)
Expand All @@ -628,7 +675,7 @@ class CloudManager : CoroutineVerticle() {
vmRegistry.setVMStatus(vm.id, VM.Status.CREATING, VM.Status.PROVISIONING)

val attachedVolumes = volumes.map { AttachedVolume(it.first, it.second) }
provisionVM(ipAddress, vm.id, externalId, setup, attachedVolumes)
provisionVM(ipAddress, vm.id, externalId, vm.setup, attachedVolumes)
} catch (e: Throwable) {
vmRegistry.forceSetVMStatus(vm.id, VM.Status.DESTROYING)
cloudClient.destroyVM(externalId, timeoutDestroyVM)
Expand All @@ -646,12 +693,16 @@ class CloudManager : CoroutineVerticle() {

vmRegistry.setVMStatus(vm.id, VM.Status.PROVISIONING, VM.Status.RUNNING)
vmRegistry.setVMAgentJoinTime(vm.id, Instant.now())
setupCircuitBreakers.afterAttemptPerformed(setup.id, true)
// always call setupCircuitBreakers with original setup ID! (see
// computeIfAbsent() call above)
setupCircuitBreakers.afterAttemptPerformed(originalSetup.id, true)
} catch (t: Throwable) {
vmRegistry.forceSetVMStatus(vm.id, VM.Status.ERROR)
vmRegistry.setVMReason(vm.id, t.message ?: "Unknown error")
vmRegistry.setVMDestructionTime(vm.id, Instant.now())
setupCircuitBreakers.afterAttemptPerformed(setup.id, false)
// always call setupCircuitBreakers with original setup ID! (see
// computeIfAbsent() call above)
setupCircuitBreakers.afterAttemptPerformed(originalSetup.id, false)
throw t
}
} finally {
Expand All @@ -661,7 +712,7 @@ class CloudManager : CoroutineVerticle() {
}

return deferreds.mapIndexed { i, d ->
vmsToCreate[i].first to try {
vmsToCreate[i].vm to try {
d.await()
true
} catch (t: Throwable) {
Expand Down Expand Up @@ -699,7 +750,7 @@ class CloudManager : CoroutineVerticle() {
* objects that can be used to wait for the completion of the asynchronous
* operation and to obtain the IDs of the created volumes.
*/
private suspend fun createVolumesAsync(externalId: String,
private fun createVolumesAsync(externalId: String,
setup: Setup): List<Deferred<Pair<String, Volume>>> {
val metadata = mapOf(CREATED_BY to createdByTag, SETUP_ID to setup.id,
VM_EXTERNAL_ID to externalId)
Expand Down
8 changes: 8 additions & 0 deletions src/main/kotlin/db/PluginRegistry.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import model.plugins.ProcessChainAdapterPlugin
import model.plugins.ProcessChainConsistencyCheckerPlugin
import model.plugins.ProgressEstimatorPlugin
import model.plugins.RuntimePlugin
import model.plugins.SetupAdapterPlugin

/**
* Provides access to compiled plugins
Expand All @@ -28,6 +29,8 @@ class PluginRegistry(private val compiledPlugins: List<Plugin>) {
.toMap()
private val runtimes = compiledPlugins.filterIsInstance<RuntimePlugin>()
.associateBy { it.supportedRuntime }
private val setupAdapters = compiledPlugins.filterIsInstance<SetupAdapterPlugin>()
.toResolved()

/**
* Get a list of all plugins
Expand Down Expand Up @@ -63,4 +66,9 @@ class PluginRegistry(private val compiledPlugins: List<Plugin>) {
* Get all process chain consistency checkers
*/
fun getProcessChainConsistencyCheckers() = processChainConsistencyCheckers

/**
* Get all setup adapters
*/
fun getSetupAdapters() = setupAdapters
}
5 changes: 5 additions & 0 deletions src/main/kotlin/db/PluginRegistryFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,17 @@ import model.plugins.ProcessChainAdapterPlugin
import model.plugins.ProcessChainConsistencyCheckerPlugin
import model.plugins.ProgressEstimatorPlugin
import model.plugins.RuntimePlugin
import model.plugins.SetupAdapterPlugin
import model.plugins.initializerPluginTemplate
import model.plugins.outputAdapterPluginTemplate
import model.plugins.processChainAdapterPluginTemplate
import model.plugins.processChainConsistencyCheckerPluginTemplate
import model.plugins.progressEstimatorPluginTemplate
import model.plugins.runtimePluginTemplate
import model.plugins.setupAdapterPluginTemplate
import model.plugins.wrapPluginFunction
import model.processchain.ProcessChain
import model.setup.Setup
import org.slf4j.LoggerFactory
import java.io.File
import java.net.URLClassLoader
Expand Down Expand Up @@ -243,6 +246,8 @@ object PluginRegistryFactory {
f as KFunction<Double?>, ::progressEstimatorPluginTemplate.parameters))
is RuntimePlugin -> plugin.copy(compiledFunction = wrapPluginFunction(
f as KFunction<Unit>, ::runtimePluginTemplate.parameters))
is SetupAdapterPlugin -> plugin.copy(compiledFunction = wrapPluginFunction(
f as KFunction<Setup>, ::setupAdapterPluginTemplate.parameters))
else -> throw RuntimeException("Unknown plugin type: ${plugin::class.java}")
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/model/plugins/Plugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import kotlin.reflect.jvm.javaType
JsonSubTypes.Type(value = ProcessChainAdapterPlugin::class, name = "processChainAdapter"),
JsonSubTypes.Type(value = ProcessChainConsistencyCheckerPlugin::class, name = "processChainConsistencyChecker"),
JsonSubTypes.Type(value = ProgressEstimatorPlugin::class, name = "progressEstimator"),
JsonSubTypes.Type(value = RuntimePlugin::class, name = "runtime")
JsonSubTypes.Type(value = RuntimePlugin::class, name = "runtime"),
JsonSubTypes.Type(value = SetupAdapterPlugin::class, name = "setupAdapter")
)
interface Plugin {
/**
Expand Down
1 change: 0 additions & 1 deletion src/main/kotlin/model/plugins/ProcessChainAdapterPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package model.plugins

import com.fasterxml.jackson.annotation.JsonIgnore
import io.vertx.core.Vertx
import model.processchain.Argument
import model.processchain.ProcessChain
import model.workflow.Workflow
import kotlin.reflect.KFunction
Expand Down
50 changes: 50 additions & 0 deletions src/main/kotlin/model/plugins/SetupAdapterPlugin.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package model.plugins

import com.fasterxml.jackson.annotation.JsonIgnore
import io.vertx.core.Vertx
import model.setup.Setup
import kotlin.reflect.KFunction
import kotlin.reflect.full.callSuspend

/**
* A setup adapter plugin is a function that can modify a [model.setup.Setup]
* before one or more VMs are created from it. The function has the following
* signature:
*
* suspend fun mySetupAdapter(setup: model.setup.Setup,
* requiredCapabilities: Collection<String>,
* vertx: io.vertx.core.Vertx): model.setup.Setup,
*
* The function will be called with a setup to modify and a collection of
* required capabilities the modified setup should meet. The function should
* return a new setup instance or the original one if no modifications were
* necessary.
*/
data class SetupAdapterPlugin(
override val name: String,
override val scriptFile: String,
override val version: String? = null,
override val dependsOn: List<String> = emptyList(),

/**
* The compiled plugin
*/
@JsonIgnore
override val compiledFunction: KFunction<Setup> = throwPluginNeedsCompile()
) : DependentPlugin

@Suppress("UNUSED_PARAMETER")
internal fun setupAdapterPluginTemplate(setup: Setup,
requiredCapabilities: Collection<String>, vertx: Vertx): Setup {
throw NotImplementedError("This is just a template specifying the " +
"function signature of a setup adapter plugin")
}

suspend fun SetupAdapterPlugin.call(setup: Setup,
requiredCapabilities: Collection<String>, vertx: Vertx): Setup {
return if (this.compiledFunction.isSuspend) {
this.compiledFunction.callSuspend(setup, requiredCapabilities, vertx)
} else {
this.compiledFunction.call(setup, requiredCapabilities, vertx)
}
}
Loading

0 comments on commit 8ab1b04

Please sign in to comment.