Skip to content

Commit 154884a

Browse files
srekapalliChris Smalley
andauthored
feat(taskimplresolver): Task Implementation Resolver (spinnaker#4098)
Co-authored-by: Chris Smalley <[email protected]>
1 parent 1d92bc7 commit 154884a

File tree

15 files changed

+226
-39
lines changed

15 files changed

+226
-39
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package com.netflix.spinnaker.orca
2+
3+
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
4+
import com.netflix.spinnaker.orca.api.pipeline.Task
5+
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.TaskDefinition
6+
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.DefinedTask
7+
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
8+
import com.netflix.spinnaker.orca.clouddriver.utils.CloudProviderAware
9+
import com.netflix.spinnaker.orca.config.TaskOverrideConfigurationProperties
10+
import com.netflix.spinnaker.orca.config.TaskOverrideConfigurationProperties.TaskOverrideDefinition
11+
import org.slf4j.LoggerFactory
12+
13+
/**
14+
* This resolver will modify the task definition if the original definition
15+
* matches the criteria for task replacement and the configuration for such a replacement is enabled.
16+
*/
17+
class DynamicTaskImplementationResolver(
18+
val dynamicConfigService: DynamicConfigService,
19+
val taskOverrideConfigurationProperties: TaskOverrideConfigurationProperties
20+
): TaskImplementationResolver, CloudProviderAware {
21+
22+
private val log = LoggerFactory.getLogger(javaClass)
23+
24+
override fun resolve(stage: StageExecution, taskNode: DefinedTask): DefinedTask {
25+
val taskOverrideDefinition = taskOverrideConfigurationProperties
26+
.overrideDefinitions
27+
.firstOrNull {
28+
it.stageName.equals(stage.type, true) &&
29+
it.originalTaskImplementingClassName.equals(taskNode.implementingClassName, true)
30+
} ?: return taskNode
31+
32+
if (isOverrideEnabled(getConfigAttributeName(stage, taskOverrideDefinition))) {
33+
val clazz: Class<*> = Class.forName(taskOverrideDefinition.newTaskImplementingClassName)
34+
if (clazz.interfaces.contains(Task::class.java)) {
35+
val newTask: DefinedTask = TaskDefinition(
36+
taskNode.name, // task name remains the same.
37+
clazz as Class<Task>
38+
)
39+
log.info("Task '{}' is overridden with new task impl class '{}'",
40+
taskNode.name,
41+
taskOverrideDefinition.newTaskImplementingClassName
42+
)
43+
return newTask
44+
}
45+
log.warn("Task '{}' is overridden but the new task impl class '{}' is not of type task",
46+
taskNode.name,
47+
taskOverrideDefinition.newTaskImplementingClassName)
48+
}
49+
log.info("No task override set {}", taskNode.name)
50+
return taskNode
51+
52+
}
53+
54+
private fun getConfigAttributeName(stage: StageExecution,
55+
taskOverrideDefinition: TaskOverrideDefinition
56+
): String {
57+
val configAttributeParts: MutableList<String> = mutableListOf()
58+
taskOverrideDefinition.overrideCriteriaAttributes?.forEach {
59+
when (it) {
60+
APPLICATION_ATTR_NAME -> configAttributeParts.add(stage.execution.application)
61+
CLOUDPROVIDER_ATTR_NAME ->
62+
configAttributeParts.add(getCloudProvider(stage) ?: CloudProviderAware.DEFAULT_CLOUD_PROVIDER)
63+
else -> {
64+
if (stage.context[it] != null) {
65+
configAttributeParts.add(stage.context[it].toString())
66+
}
67+
}
68+
}
69+
}
70+
71+
configAttributeParts.add(stage.type.toLowerCase())
72+
73+
return configAttributeParts.joinToString(".", ATTRIBUTE_PREFIX)
74+
}
75+
76+
private fun isOverrideEnabled(configAttrName: String): Boolean {
77+
return dynamicConfigService.isEnabled(configAttrName, false)
78+
}
79+
80+
private companion object {
81+
const val ATTRIBUTE_PREFIX = "task-override."
82+
const val APPLICATION_ATTR_NAME = "application"
83+
const val CLOUDPROVIDER_ATTR_NAME = "cloudprovider"
84+
}
85+
86+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.netflix.spinnaker.orca
2+
3+
/**
4+
* Default implementation just uses the task definition as is and will not to do any changes to the
5+
* task definition itself.
6+
*/
7+
class NoOpTaskImplementationResolver: TaskImplementationResolver
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.netflix.spinnaker.orca;
2+
3+
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.DefinedTask;
4+
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution;
5+
6+
/** Resolves the task implementation for a given task node. */
7+
public interface TaskImplementationResolver {
8+
9+
default DefinedTask resolve(StageExecution stage, DefinedTask taskNode) {
10+
return taskNode;
11+
}
12+
}

orca-core/src/main/java/com/netflix/spinnaker/orca/config/OrcaConfiguration.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
2727
import com.netflix.spinnaker.orca.DefaultStageResolver;
2828
import com.netflix.spinnaker.orca.DynamicStageResolver;
29+
import com.netflix.spinnaker.orca.DynamicTaskImplementationResolver;
30+
import com.netflix.spinnaker.orca.NoOpTaskImplementationResolver;
2931
import com.netflix.spinnaker.orca.StageResolver;
32+
import com.netflix.spinnaker.orca.TaskImplementationResolver;
3033
import com.netflix.spinnaker.orca.TaskResolver;
3134
import com.netflix.spinnaker.orca.api.pipeline.Task;
3235
import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder;
@@ -84,7 +87,7 @@
8487
PreprocessorConfiguration.class,
8588
PluginsAutoConfiguration.class,
8689
})
87-
@EnableConfigurationProperties
90+
@EnableConfigurationProperties(TaskOverrideConfigurationProperties.class)
8891
public class OrcaConfiguration {
8992
@Bean
9093
public Clock clock() {
@@ -225,6 +228,21 @@ public DefaultStageResolver defaultStageResolver(
225228
return new DefaultStageResolver(stageDefinitionBuilders);
226229
}
227230

231+
@Bean
232+
@ConditionalOnProperty("task-overrides.enabled")
233+
public DynamicTaskImplementationResolver dynamicTaskImplementationResolver(
234+
DynamicConfigService dynamicConfigService,
235+
TaskOverrideConfigurationProperties taskOverrideConfigurationProperties) {
236+
return new DynamicTaskImplementationResolver(
237+
dynamicConfigService, taskOverrideConfigurationProperties);
238+
}
239+
240+
@Bean
241+
@ConditionalOnMissingBean(TaskImplementationResolver.class)
242+
public TaskImplementationResolver defaultTaskImplementationResolver() {
243+
return new NoOpTaskImplementationResolver();
244+
}
245+
228246
@Bean(name = EVENT_LISTENER_FACTORY_BEAN_NAME)
229247
public EventListenerFactory eventListenerFactory() {
230248
return new InspectableEventListenerFactory();
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.netflix.spinnaker.orca.config;
2+
3+
import org.springframework.boot.context.properties.ConfigurationProperties
4+
import org.springframework.boot.context.properties.ConstructorBinding
5+
6+
/**
7+
* Task override configuration to use while planning stages.
8+
*/
9+
@ConfigurationProperties("task-overrides")
10+
@ConstructorBinding
11+
public class TaskOverrideConfigurationProperties(
12+
/**
13+
* list of task overrides.
14+
*/
15+
public var overrideDefinitions: List<TaskOverrideDefinition> = listOf()
16+
) {
17+
18+
@ConstructorBinding
19+
public class TaskOverrideDefinition(
20+
/**
21+
* Candidate stage in which we are looking to replace task definition
22+
*/
23+
public var stageName: String,
24+
25+
/**
26+
* Attribute values to consider in the stage context for overriding the task.
27+
* For eg: this task override is only applicable for a particular cloud provider.
28+
*/
29+
public var overrideCriteriaAttributes: List<String>,
30+
31+
/**
32+
* Original task implementation class name as given while building the stage task graph
33+
*/
34+
public var originalTaskImplementingClassName: String,
35+
36+
/**
37+
* Implementation class name to use while resolving the task via Task Resolver.
38+
*/
39+
public var newTaskImplementingClassName: String
40+
)
41+
}

orca-dry-run/src/test/kotlin/com/netflix/spinnaker/orca/dryrun/DryRunStageTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.netflix.spinnaker.orca.dryrun
1818

19+
import com.netflix.spinnaker.orca.NoOpTaskImplementationResolver
1920
import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder
2021
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
2122
import com.netflix.spinnaker.orca.api.test.pipeline
@@ -38,7 +39,7 @@ object DryRunStageTest : Spek({
3839

3940
fun StageDefinitionBuilder.plan(stage: StageExecution) {
4041
stage.type = type
41-
buildTasks(stage)
42+
buildTasks(stage, NoOpTaskImplementationResolver())
4243
buildBeforeStages(stage)
4344
}
4445

orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/StageDefinitionBuilders.kt

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.netflix.spinnaker.orca.q
1818

19+
import com.netflix.spinnaker.orca.TaskImplementationResolver
1920
import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner
2021
import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner.STAGE_BEFORE
2122
import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder
@@ -24,6 +25,7 @@ import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.DefinedTask
2425
import com.netflix.spinnaker.orca.api.pipeline.graph.TaskNode.TaskGraph
2526
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
2627
import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
28+
import com.netflix.spinnaker.orca.api.pipeline.models.TaskExecution
2729
import com.netflix.spinnaker.orca.pipeline.RestrictExecutionDuringTimeWindow
2830
import com.netflix.spinnaker.orca.pipeline.StageExecutionFactory
2931
import com.netflix.spinnaker.orca.pipeline.graph.StageGraphBuilderImpl
@@ -32,10 +34,12 @@ import com.netflix.spinnaker.orca.pipeline.model.TaskExecutionImpl
3234
/**
3335
* Build and append the tasks for [stage].
3436
*/
35-
fun StageDefinitionBuilder.buildTasks(stage: StageExecution) {
37+
fun StageDefinitionBuilder.buildTasks(
38+
stage: StageExecution,
39+
taskImplementationResolver: TaskImplementationResolver) {
3640
buildTaskGraph(stage)
3741
.listIterator()
38-
.forEachWithMetadata { processTaskNode(stage, it) }
42+
.forEachWithMetadata { processTaskNode(stage, it, taskImplementationResolver) }
3943
}
4044

4145
fun StageDefinitionBuilder.addContextFlags(stage: StageExecution) {
@@ -48,15 +52,13 @@ fun StageDefinitionBuilder.addContextFlags(stage: StageExecution) {
4852
private fun processTaskNode(
4953
stage: StageExecution,
5054
element: IteratorElement<TaskNode>,
55+
resolver: TaskImplementationResolver,
5156
isSubGraph: Boolean = false
5257
) {
5358
element.apply {
5459
when (value) {
5560
is DefinedTask -> {
56-
val task = TaskExecutionImpl()
57-
task.id = (stage.tasks.size + 1).toString()
58-
task.name = value.name
59-
task.implementingClass = value.implementingClassName
61+
val task = buildTaskExecution(stage, resolver.resolve(stage, value))
6062
if (isSubGraph) {
6163
task.isLoopStart = isFirst
6264
task.isLoopEnd = isLast
@@ -70,13 +72,23 @@ private fun processTaskNode(
7072
value
7173
.listIterator()
7274
.forEachWithMetadata {
73-
processTaskNode(stage, it, isSubGraph = true)
75+
processTaskNode(stage, it, resolver, isSubGraph = true)
7476
}
7577
}
7678
}
7779
}
7880
}
7981

82+
private fun buildTaskExecution(stage: StageExecution, taskNode: DefinedTask): TaskExecution {
83+
val taskExecution: TaskExecution = TaskExecutionImpl()
84+
taskExecution.let {
85+
it.id = (stage.tasks.size + 1).toString()
86+
it.name = taskNode.name
87+
it.implementingClass = taskNode.implementingClassName
88+
}
89+
return taskExecution
90+
}
91+
8092
/**
8193
* Build the synthetic stages for [stage] and inject them into the execution.
8294
*/

orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.netflix.spinnaker.orca.q.handler
1818

1919
import com.fasterxml.jackson.databind.ObjectMapper
2020
import com.netflix.spectator.api.Registry
21+
import com.netflix.spinnaker.orca.TaskImplementationResolver
2122
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.NOT_STARTED
2223
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.RUNNING
2324
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
@@ -68,7 +69,8 @@ class StartStageHandler(
6869
@Qualifier("mapper") private val objectMapper: ObjectMapper,
6970
private val clock: Clock,
7071
private val registry: Registry,
71-
@Value("\${queue.retry.delay.ms:15000}") retryDelayMs: Long
72+
@Value("\${queue.retry.delay.ms:15000}") retryDelayMs: Long,
73+
private val taskImplementationResolver: TaskImplementationResolver
7274
) : OrcaMessageHandler<StartStage>, StageBuilderAware, ExpressionAware, AuthenticationAware {
7375

7476
private val retryDelay = Duration.ofMillis(retryDelayMs)
@@ -168,7 +170,7 @@ class StartStageHandler(
168170
// if we have a top level stage, ensure that context expressions are processed
169171
val mergedStage = if (this.parentStageId == null) this.withMergedContext() else this
170172
builder.addContextFlags(mergedStage)
171-
builder.buildTasks(mergedStage)
173+
builder.buildTasks(mergedStage, taskImplementationResolver)
172174
builder.buildBeforeStages(mergedStage) {
173175
repository.addStage(it.withMergedContext())
174176
}

orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandlerTest.kt

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package com.netflix.spinnaker.orca.q.handler
1818

1919
import com.netflix.spectator.api.NoopRegistry
2020
import com.netflix.spinnaker.orca.DefaultStageResolver
21+
import com.netflix.spinnaker.orca.NoOpTaskImplementationResolver
2122
import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner.STAGE_AFTER
2223
import com.netflix.spinnaker.orca.api.pipeline.SyntheticStageOwner.STAGE_BEFORE
2324
import com.netflix.spinnaker.orca.api.pipeline.graph.StageDefinitionBuilder
@@ -733,7 +734,7 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
733734
refId = "1"
734735
type = stageWithSyntheticBefore.type
735736
stageWithSyntheticBefore.buildBeforeStages(this)
736-
stageWithSyntheticBefore.buildTasks(this)
737+
stageWithSyntheticBefore.buildTasks(this, NoOpTaskImplementationResolver())
737738
}
738739
}
739740

@@ -998,7 +999,7 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
998999
refId = "1"
9991000
type = stageWithSyntheticBefore.type
10001001
stageWithSyntheticBefore.buildBeforeStages(this)
1001-
stageWithSyntheticBefore.buildTasks(this)
1002+
stageWithSyntheticBefore.buildTasks(this, NoOpTaskImplementationResolver())
10021003
}
10031004
}
10041005

@@ -1067,7 +1068,7 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
10671068
refId = "1"
10681069
type = stageWithSyntheticAfter.type
10691070
stageWithSyntheticAfter.buildBeforeStages(this)
1070-
stageWithSyntheticAfter.buildTasks(this)
1071+
stageWithSyntheticAfter.buildTasks(this, NoOpTaskImplementationResolver())
10711072
stageWithSyntheticAfter.buildAfterStages(this)
10721073
}
10731074
}
@@ -1288,12 +1289,12 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
12881289
name = "parallel"
12891290
type = stageWithParallelBranches.type
12901291
stageWithParallelBranches.buildBeforeStages(this)
1291-
stageWithParallelBranches.buildTasks(this)
1292+
stageWithParallelBranches.buildTasks(this, NoOpTaskImplementationResolver())
12921293
}
12931294
}
12941295

12951296
val message = pipeline.stageByRef("1<1").let { completedSynthetic ->
1296-
singleTaskStage.buildTasks(completedSynthetic)
1297+
singleTaskStage.buildTasks(completedSynthetic, NoOpTaskImplementationResolver())
12971298
completedSynthetic.tasks.forEach { it.status = SUCCEEDED }
12981299
CompleteStage(completedSynthetic)
12991300
}
@@ -1322,12 +1323,12 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
13221323
name = "parallel"
13231324
type = stageWithParallelBranches.type
13241325
stageWithParallelBranches.buildBeforeStages(this)
1325-
stageWithParallelBranches.buildTasks(this)
1326+
stageWithParallelBranches.buildTasks(this, NoOpTaskImplementationResolver())
13261327
}
13271328
}
13281329

13291330
pipeline.stages.filter { it.parentStageId != null }.forEach {
1330-
singleTaskStage.buildTasks(it)
1331+
singleTaskStage.buildTasks(it, NoOpTaskImplementationResolver())
13311332
it.tasks.forEach { it.status = SUCCEEDED }
13321333
}
13331334
val message = CompleteStage(pipeline.stageByRef("1<1"))

0 commit comments

Comments
 (0)