Skip to content

Commit

Permalink
Merge branch 'master' into feat/excludeExecutionForDisabledPipelines
Browse files Browse the repository at this point in the history
  • Loading branch information
christosarvanitis authored Dec 19, 2024
2 parents a464c99 + 3c65fe0 commit 03ee606
Show file tree
Hide file tree
Showing 14 changed files with 259 additions and 22 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
fiatVersion=1.52.0
fiatVersion=1.53.0
korkVersion=7.247.0
kotlinVersion=1.6.21
org.gradle.parallel=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class EcsServerGroupCreator implements ServerGroupCreator, DeploymentDetailsAwar
def bakeStage = getPreviousStageWithImage(stage, operation.region, cloudProvider)

if (bakeStage) {
operation.dockerImageAddress = bakeStage.context.amiDetails.imageId.value.get(0).toString()
operation.dockerImageAddress = bakeStage.context.amiDetails.collect(it->it.imageId).get(0).toString()
}
}

Expand Down Expand Up @@ -216,7 +216,7 @@ class EcsServerGroupCreator implements ServerGroupCreator, DeploymentDetailsAwar
throw new IllegalStateException("No image stage found in context for $description.imageLabelOrSha.")
}

description.imageId = imageStage.context.amiDetails.imageId.value.get(0).toString()
description.imageId = imageStage.context.amiDetails.collect(it->it.imageId).get(0).toString()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class EcsServerGroupCreatorSpec extends Specification {
def parentStage = stage {}
parentStage.id = parentStageId
parentStage.refId = parentStageId
parentStage.context.amiDetails = [imageId: [value: ["$testReg/$testRepo:$testTag"]]]
parentStage.context.amiDetails = List.of(Map.of("imageId","$testReg/$testRepo:$testTag"))

stage.context.imageDescription = testDescription
stage.parentStageId = parentStageId
Expand Down Expand Up @@ -120,7 +120,7 @@ class EcsServerGroupCreatorSpec extends Specification {
parentStage.id = parentStageId
parentStage.context.region = testRegion
parentStage.context.cloudProviderType = "ecs"
parentStage.context.amiDetails = [imageId: [value: ["$testReg/$testRepo:$testTag"]]]
parentStage.context.amiDetails = List.of(Map.of("imageId","$testReg/$testRepo:$testTag"))

stage.context.region = testRegion
stage.parentStageId = parentStageId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.netflix.spinnaker.orca.qos

import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.patterns.PolledMeter
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.orca.annotations.Sync
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.BUFFERED
Expand All @@ -29,6 +30,7 @@ import net.logstash.logback.argument.StructuredArguments.value
import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component
import java.util.concurrent.atomic.AtomicLong

/**
* Determines if an execution should be buffered.
Expand All @@ -50,6 +52,11 @@ class ExecutionBufferActuator(
private val enqueuedId = registry.createId("qos.executionsEnqueued")
private val elapsedTimeId = registry.createId("qos.actuator.elapsedTime")

// have to use PolledMeter because an ordinary metric is deleted by Garbage Collector
private val bufferingEnabled = PolledMeter.using(registry)
.withId(bufferingId)
.monitorValue(AtomicLong(0))

@Sync
@EventListener(BeforeInitialExecutionPersist::class)
fun beforeInitialPersist(event: BeforeInitialExecutionPersist) {
Expand All @@ -61,7 +68,7 @@ class ExecutionBufferActuator(

val supplierName = bufferStateSupplier.javaClass.simpleName
if (bufferStateSupplier.get() == ACTIVE) {
registry.gauge(bufferingId).set(1.0)
bufferingEnabled.set(1)

val execution = event.execution
withActionDecision(execution) {
Expand All @@ -83,7 +90,7 @@ class ExecutionBufferActuator(
}
}
} else {
registry.gauge(bufferingId).set(0.0)
bufferingEnabled.set(0)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ class PipelineRefTriggerDeserializerSupplier(
isStrategy = get("strategy")?.booleanValue() == true,
parentExecutionId = parentExecutionId,
parentPipelineStageId = get("parentPipelineStageId")?.textValue()
)
).apply {
resolvedExpectedArtifacts = get("resolvedExpectedArtifacts")?.listValue(parser) ?: mutableListOf()
other = get("other")?.mapValue(parser) ?: mutableMapOf()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import com.netflix.spinnaker.orca.api.pipeline.models.StageExecution
import java.sql.ResultSet
import org.jooq.DSLContext
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.name
import org.slf4j.LoggerFactory
import java.nio.charset.StandardCharsets

Expand Down Expand Up @@ -142,11 +141,7 @@ class ExecutionMapper(
fun convertPipelineRefTrigger(execution: PipelineExecution, context: DSLContext) {
val trigger = execution.trigger
if (trigger is PipelineRefTrigger) {
val parentExecution = context
.selectExecution(execution.type, compressionProperties)
.where(field("id").eq(trigger.parentExecutionId))
.fetchExecutions(mapper, 200, compressionProperties, context, pipelineRefEnabled)
.firstOrNull()
val parentExecution = fetchParentExecution(execution.type, trigger, context)

if (parentExecution == null) {
// If someone deletes the parent execution, we'll be unable to load the full, valid child pipeline. Rather than
Expand All @@ -160,4 +155,13 @@ class ExecutionMapper(
execution.trigger = trigger.toPipelineTrigger(parentExecution)
}
}

@VisibleForTesting
fun fetchParentExecution(type: ExecutionType, trigger: PipelineRefTrigger, context: DSLContext): PipelineExecution? {
return context
.selectExecution(type, compressionProperties)
.where(field("id").eq(trigger.parentExecutionId))
.fetchExecutions(mapper, 200, compressionProperties, context, pipelineRefEnabled)
.firstOrNull()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,8 @@ data class PipelineRefTrigger(
isStrategy = isStrategy,
parentExecution = parentExecution,
parentPipelineStageId = parentPipelineStageId
)
).apply {
this.resolvedExpectedArtifacts = this@PipelineRefTrigger.resolvedExpectedArtifacts
this.other = this@PipelineRefTrigger.other
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2024 Harness Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.sql

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.JsonNodeFactory
import com.fasterxml.jackson.databind.node.ObjectNode
import com.netflix.spinnaker.orca.sql.pipeline.persistence.PipelineRefTrigger
import dev.minutest.junit.JUnit5Minutests
import dev.minutest.rootContext
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertEquals

class PipelineRefTriggerDeserializerSupplierTest : JUnit5Minutests {

fun tests() = rootContext {
context("pipelineRef feature is disabled") {
val deserializerSupplier = PipelineRefTriggerDeserializerSupplier(pipelineRefEnabled = false)
val jsonNodeFactory = JsonNodeFactory.instance

test("predicate is true when the trigger is a pipelineRef") {
val node = jsonNodeFactory.objectNode().apply {
put("type", "pipelineRef")
}
assertTrue(deserializerSupplier.predicate(node))
}

test("predicate is false when the trigger is not a pipelineRef") {
val node = jsonNodeFactory.objectNode().apply {
put("type", "manual")
}
assertFalse(deserializerSupplier.predicate(node))
}
}

context("pipelineRef feature is enabled") {
val deserializerSupplier = PipelineRefTriggerDeserializerSupplier(pipelineRefEnabled = true)
val jsonNodeFactory = JsonNodeFactory.instance

test("predicate is true when the trigger is a pipelineRef") {
val node = jsonNodeFactory.objectNode().apply {
put("type", "pipelineRef")
}
assertTrue(deserializerSupplier.predicate(node))
}

test("predicate is true when the trigger has parentExecution") {
val node = jsonNodeFactory.objectNode().apply {
set<ObjectNode>("parentExecution", jsonNodeFactory.objectNode().put("id", "execution-id"))
}
assertTrue(deserializerSupplier.predicate(node))
}

test("predicate is false when the trigger is not a pipelineRef") {
val node = jsonNodeFactory.objectNode().apply {
put("type", "manual")
}
assertFalse(deserializerSupplier.predicate(node))
}

}

context("deserializing pipelineRef") {
val deserializerSupplier = PipelineRefTriggerDeserializerSupplier(pipelineRefEnabled = true)
val jsonNodeFactory = JsonNodeFactory.instance
val jsonParser = ObjectMapper().createParser("")

test("all fields in pipelineRef are added") {
val node = jsonNodeFactory.objectNode().apply {
put("correlationId", "correlation-id")
put("user", "test-user")
set<ObjectNode>("parameters", jsonNodeFactory.objectNode().put("key1", "value1"))
set<ObjectNode>("artifacts", jsonNodeFactory.arrayNode().add(jsonNodeFactory.objectNode().put("type", "artifact-type")))
put("rebake", true)
put("dryRun", false)
put("strategy", true)
put("parentExecutionId", "parent-execution-id")
set<ObjectNode>("resolvedExpectedArtifacts", jsonNodeFactory.arrayNode().add(jsonNodeFactory.objectNode().put("id", "resolved-artifact-id")))
set<ObjectNode>("other", jsonNodeFactory.objectNode().put("extra1", "value1"))
}

val trigger = deserializerSupplier.deserializer(node, jsonParser) as PipelineRefTrigger

assertEquals("correlation-id", trigger.correlationId)
assertEquals("test-user", trigger.user)
assertEquals(mapOf("key1" to "value1"), trigger.parameters)
assertEquals(1, trigger.artifacts.size)
assertTrue(trigger.notifications.isEmpty())
assertTrue(trigger.isRebake)
assertFalse(trigger.isDryRun)
assertTrue(trigger.isStrategy)
assertEquals("parent-execution-id", trigger.parentExecutionId)
assertEquals(1, trigger.resolvedExpectedArtifacts.size)
assertEquals(1, trigger.other.size)
}

test("pipelineTrigger is deserialized into pipelineRef") {
val node = jsonNodeFactory.objectNode().apply {
put("correlationId", "correlation-id")
put("user", "test-user")
set<ObjectNode>("parameters", jsonNodeFactory.objectNode().put("key1", "value1"))
set<ObjectNode>("artifacts", jsonNodeFactory.arrayNode().add(jsonNodeFactory.objectNode().put("type", "artifact-type")))
put("rebake", true)
put("dryRun", false)
put("strategy", true)
set<ObjectNode>("parentExecution", jsonNodeFactory.objectNode().put("id", "parent-execution-id-from-pipeline-trigger"))
set<ObjectNode>("resolvedExpectedArtifacts", jsonNodeFactory.arrayNode().add(jsonNodeFactory.objectNode().put("id", "resolved-artifact-id")))
set<ObjectNode>("other", jsonNodeFactory.objectNode().put("extra1", "value1"))
}

val trigger = deserializerSupplier.deserializer(node, jsonParser) as PipelineRefTrigger

assertEquals("correlation-id", trigger.correlationId)
assertEquals("test-user", trigger.user)
assertEquals(mapOf("key1" to "value1"), trigger.parameters)
assertEquals(1, trigger.artifacts.size)
assertTrue(trigger.notifications.isEmpty())
assertTrue(trigger.isRebake)
assertFalse(trigger.isDryRun)
assertTrue(trigger.isStrategy)
assertEquals("parent-execution-id-from-pipeline-trigger", trigger.parentExecutionId)
assertEquals(1, trigger.resolvedExpectedArtifacts.size)
assertEquals(1, trigger.other.size)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,27 @@ package com.netflix.spinnaker.orca.sql.pipeline.persistence
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.config.CompressionType
import com.netflix.spinnaker.config.ExecutionCompressionProperties
import com.netflix.spinnaker.kork.artifacts.model.Artifact
import com.netflix.spinnaker.kork.artifacts.model.ExpectedArtifact
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.pipeline.model.DefaultTrigger
import com.nhaarman.mockito_kotlin.*
import com.netflix.spinnaker.orca.pipeline.model.PipelineExecutionImpl
import com.netflix.spinnaker.orca.pipeline.model.PipelineTrigger
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.times
import dev.minutest.junit.JUnit5Minutests
import dev.minutest.rootContext
import org.assertj.core.api.Assertions.assertThat
import org.jooq.DSLContext
import org.mockito.Mockito
import strikt.api.expectThat
import strikt.assertions.isA
import strikt.assertions.isEmpty
import strikt.assertions.isEqualTo
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import java.sql.ResultSet
Expand Down Expand Up @@ -69,16 +82,75 @@ class ExecutionMapperTest : JUnit5Minutests {
val compressionProperties = ExecutionCompressionProperties().apply {
enabled = false
}
val mapper = ExecutionMapper(mapper = ObjectMapper(), stageBatchSize = 200, compressionProperties, true)
val mockedExecution = mock<PipelineExecution>()
val database: DSLContext = Mockito.mock(DSLContext::class.java, Mockito.RETURNS_DEEP_STUBS)

test("conversion ignored when trigger is not PipelineRef") {
val mockedExecution = mock<PipelineExecution>()
val mapper = ExecutionMapper(mapper = ObjectMapper(), stageBatchSize = 200, compressionProperties = compressionProperties, true)
val spyMapper = Mockito.spy(mapper)

doReturn(DefaultTrigger(type = "default")).`when`(mockedExecution).trigger
mapper.convertPipelineRefTrigger(mockedExecution, database)
spyMapper.convertPipelineRefTrigger(mockedExecution, database)
verify(mockedExecution, times(1)).trigger
verify(spyMapper, times(0)).fetchParentExecution(any(), any(), any())
}

test("conversion is aborted when trigger is PipelineRef but parentExecution not found") {
val mockedExecution = mock<PipelineExecution>()
val mapper = ExecutionMapper(mapper = ObjectMapper(), stageBatchSize = 200, compressionProperties = compressionProperties, true)
val spyMapper = Mockito.spy(mapper)

doReturn(PipelineRefTrigger(parentExecutionId = "test-parent-id")).`when`(mockedExecution).trigger
doReturn(ExecutionType.PIPELINE).`when`(mockedExecution).type
doReturn(null).`when`(spyMapper).fetchParentExecution(any(), any(), any())
spyMapper.convertPipelineRefTrigger(mockedExecution, database)
verify(mockedExecution, times(1)).trigger
verify(spyMapper, times(1)).fetchParentExecution(any(), any(), any())
}

test("conversion is processed when trigger is PipelineRef") {
val correlationId = "test-correlation"
val parentExecutionId = "test-execution"
val parameters = mutableMapOf<String, Any>("test-parameter" to "test-body")
val artifacts = mutableListOf(Artifact.builder().build())
val resolvedExpectedArtifact = mutableListOf(ExpectedArtifact.builder().boundArtifact(Artifact.builder().build()).build())
val otherTest = mutableMapOf<String, Any>("test-other" to "other-body")

val execution = PipelineExecutionImpl(ExecutionType.PIPELINE, "test-app").apply {
trigger = PipelineRefTrigger(
correlationId = correlationId,
parentExecutionId = parentExecutionId,
parameters = parameters,
artifacts = artifacts
).apply {
resolvedExpectedArtifacts = resolvedExpectedArtifact
other = otherTest
}
}

val mockedParentExecution = mock<PipelineExecution>()
val mapper = ExecutionMapper(mapper = ObjectMapper(), stageBatchSize = 200, compressionProperties = compressionProperties, true)
val spyMapper = Mockito.spy(mapper)

doReturn(mockedParentExecution).`when`(spyMapper).fetchParentExecution(any(), any(), any())

spyMapper.convertPipelineRefTrigger(execution, database)

expectThat(execution.trigger) {
isA<PipelineTrigger>()
get { this.correlationId }.isEqualTo(correlationId)
get { this.parameters }.isEqualTo(parameters)
get { this.artifacts }.isEqualTo(artifacts)
get { this.resolvedExpectedArtifacts }.isEqualTo(resolvedExpectedArtifact)
get { this.other }.isEqualTo(otherTest)
get { this.notifications }.isEmpty()
}

expectThat(execution.trigger as PipelineTrigger)
.get(PipelineTrigger::parentExecution).isEqualTo(mockedParentExecution)

verify(spyMapper, times(1)).fetchParentExecution(any(), any(), any())
}
}
}
}
Loading

0 comments on commit 03ee606

Please sign in to comment.