Skip to content

Commit

Permalink
[ESWE-1181] Message Listener for Integration services (#12)
Browse files Browse the repository at this point in the history
- add message listener to integration queue
- fix broken tests due to enabling SQS `/queue-admin/*` endpoints
  • Loading branch information
rickchoijd authored Jan 21, 2025
1 parent 5c5986d commit 6701174
Show file tree
Hide file tree
Showing 19 changed files with 399 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,6 @@ sonar-project.properties

#Helm
**/Chart.lock

# LocalStack
/volume/
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ dependencies {
implementation("uk.gov.justice.service.hmpps:hmpps-sqs-spring-boot-starter:5.2.2")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springdoc:springdoc-openapi-starter-webmvc-ui:2.7.0")

testImplementation("org.jetbrains.kotlin:kotlin-test-junit5")
}

kotlin {
Expand All @@ -38,6 +40,8 @@ testing {
exclude(group = "io.swagger.core.v3")
exclude(group = "io.swagger.parser.v3", module = "swagger-parser-safe-url-resolver")
}
implementation("org.springframework.boot:spring-boot-testcontainers")
implementation("org.testcontainers:localstack")
}

targets {
Expand Down
1 change: 1 addition & 0 deletions helm_deploy/hmpps-jobs-board-integration-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ generic-service:
JAVA_OPTS: "-Xmx512m"
SERVER_PORT: "8080"
APPLICATIONINSIGHTS_CONFIGURATION_FILE: applicationinsights.json
API_INTEGRATION_ENABLED: false

# Pre-existing kubernetes secrets to load as environment variables in the deployment.
# namespace_secrets:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT
import org.springframework.http.HttpHeaders
import org.springframework.test.context.ActiveProfiles
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.springframework.test.web.reactive.server.WebTestClient
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.integration.testcontainers.LocalStackContainer
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.integration.testcontainers.LocalStackContainer.setLocalStackProperties
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.integration.wiremock.ExampleApiExtension
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.integration.wiremock.ExampleApiExtension.Companion.exampleApi
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.integration.wiremock.HmppsAuthApiExtension
Expand All @@ -24,6 +28,16 @@ abstract class IntegrationTestBase {
@Autowired
protected lateinit var jwtAuthHelper: JwtAuthorisationHelper

companion object {
private val localStackContainer by lazy { LocalStackContainer.instance }

@JvmStatic
@DynamicPropertySource
fun configureTestContainers(registry: DynamicPropertyRegistry) {
localStackContainer?.also { setLocalStackProperties(it, registry) }
}
}

internal fun setAuthorisation(
username: String? = "AUTH_ADM",
roles: List<String> = listOf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,23 @@ class OpenApiDocsTest : IntegrationTestBase() {
fun `the open api json path security requirements are valid`() {
val result = OpenAPIV3Parser().readLocation("http://localhost:$port/v3/api-docs", null, null)

// TODO remove these exclusions when SQS Queue Admin endpoints have been enhanced to include Security reqs.
val exclusions = setOf(
"/queue-admin/retry-dlq/{dlqName}",
"/queue-admin/retry-all-dlqs",
"/queue-admin/purge-queue/{queueName}",
"/queue-admin/get-dlq-messages/{dlqName}",
)

// The security requirements of each path don't appear to be validated like they are at https://editor.swagger.io/
// We therefore need to grab all the valid security requirements and check that each path only contains those items
val securityRequirements = result.openAPI.security.flatMap { it.keys }
result.openAPI.paths.forEach { pathItem ->
assertThat(pathItem.value.get.security.flatMap { it.keys }).isSubsetOf(securityRequirements)
}
result.openAPI.paths
.filter { !exclusions.contains(it.key) }
.forEach { pathItem ->
assertThat(pathItem.value.get).isNotNull
assertThat(pathItem.value.get.security.flatMap { it.keys }).isSubsetOf(securityRequirements)
}
}

@ParameterizedTest
Expand All @@ -94,12 +105,19 @@ class OpenApiDocsTest : IntegrationTestBase() {

@Test
fun `all endpoints have a security scheme defined`() {
// There are 4 SQS endpoints without security scheme, to be excluded from this test; all these endpoint has single tag "hmpps-queue-resource"
val queueAdminTag = "hmpps-queue-resource"
val queueAdminEndpointCount = 4

webTestClient.get()
.uri("/v3/api-docs")
.accept(MediaType.APPLICATION_JSON)
.exchange()
.expectStatus().isOk
.expectBody()
.jsonPath("$.paths[*][*][?([email protected])]").doesNotExist()
.jsonPath("$.paths[*][*][?([email protected])]..tags[0]").value<JSONArray> {
assertThat(it).hasSize(queueAdminEndpointCount)
it.forEach { tag -> assertThat(tag).isEqualTo(queueAdminTag) }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ResourceSecurityTest : IntegrationTestBase() {
"GET /swagger-ui.html",
"GET /v3/api-docs",
"GET /v3/api-docs/swagger-config",
"PUT /queue-admin/retry-all-dlqs",
" /error",
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.integration.testcontainers

import org.slf4j.LoggerFactory
import org.springframework.test.context.DynamicPropertyRegistry
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.DockerImageName
import java.io.IOException
import java.net.ServerSocket

object LocalStackContainer {
private val log = LoggerFactory.getLogger(this::class.java)
val instance by lazy { startLocalstackIfNotRunning() }

fun setLocalStackProperties(localStackContainer: LocalStackContainer, registry: DynamicPropertyRegistry) {
registry.add("hmpps.sqs.localstackUrl") { localStackContainer.getEndpointOverride(LocalStackContainer.Service.SNS) }
registry.add("hmpps.sqs.region") { localStackContainer.region }
}

private fun startLocalstackIfNotRunning(): LocalStackContainer? {
if (localstackIsRunning()) {
log.warn("Using existing localstack instance")
return null
}
log.info("Creating a localstack instance")
val logConsumer = Slf4jLogConsumer(log).withPrefix("localstack")
return LocalStackContainer(
DockerImageName.parse("localstack/localstack").withTag("4"),
).apply {
withServices(LocalStackContainer.Service.SQS)
withEnv("DEFAULT_REGION", "eu-west-2")
waitingFor(
Wait.forLogMessage(".*Ready.*", 1),
)
start()
followOutput(logConsumer)
}
}

private fun localstackIsRunning(): Boolean =
try {
val serverSocket = ServerSocket(4566)
serverSocket.localPort == 0
} catch (e: IOException) {
true
}
}
6 changes: 6 additions & 0 deletions src/integrationTest/resources/application-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,9 @@ example-api:
client:
id: "example-api-client"
secret: "example-api-client-secret"

hmpps.sqs:
provider: localstack
queues:
integrationqueue:
queueName: hmpps_jobs_board_integration_queue
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.config

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationMessageService
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.IntegrationMessageListener

@Configuration
@ConditionalOnProperty("api.integration.enabled", havingValue = "true")
class IntegrationConfiguration {
@Bean
@Qualifier("integrationServiceMap")
fun integrationServiceMap(): Map<String, IntegrationMessageService> = emptyMap()

@Bean
fun integrationMessageListener(
@Qualifier("integrationMessageService") integrationMessageService: IntegrationMessageService,
objectMapper: ObjectMapper,
) = IntegrationMessageListener(integrationMessageService, objectMapper)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.application

import org.springframework.stereotype.Component
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.TimeProvider
import java.time.LocalDateTime
import java.time.ZoneId

@Component
class DefaultTimeProvider(
override val timezoneId: ZoneId = ZoneId.systemDefault(),
) : TimeProvider {
override fun now(): LocalDateTime {
return LocalDateTime.now()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.application

import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Service
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationMessageService
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes

@Service
@Qualifier("integrationMessageService")
class IntegrationMessageServiceFacade(
@Qualifier("integrationServiceMap") private val serviceMap: Map<String, IntegrationMessageService>,
) : IntegrationMessageService {
private companion object {
private val log: Logger = LoggerFactory.getLogger(this::class.java)
}

override fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes) {
val eventId = integrationEvent.eventId
val eventType = messageAttributes.eventType
log.info("received event: id=$eventId, type=$eventType")
log.trace("received event: {}, {}", integrationEvent, messageAttributes)

messageAttributes.eventType?.also { eventTypeAttr ->
serviceMap[eventTypeAttr]?.also { service ->
service.handleMessage(integrationEvent, messageAttributes)
} ?: run {
logAndThrowArgumentError("MessageService not found for Event type=$eventType, eventId=$eventId")
}
} ?: run {
logAndThrowArgumentError("Missing event type eventId=$eventId")
}
}

private fun logAndThrowArgumentError(message: String) {
log.error(message)
throw IllegalArgumentException(message)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain

import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes
import java.time.Instant

interface IntegrationMessageService {
fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes)
}

data class IntegrationEvent(
val eventId: String,
val eventType: String,
val timestamp: Instant,
val content: String,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain

import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneId

interface TimeProvider {
val timezoneId: ZoneId

fun now(): LocalDateTime

fun nowAsInstant(): Instant = now().atZone(timezoneId).toInstant()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure

import com.fasterxml.jackson.databind.ObjectMapper
import io.awspring.cloud.sqs.annotation.SqsListener
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationMessageService

const val ATTRIBUTE_EVENT_TYPE = "eventType"

class IntegrationMessageListener(
private val integrationMessageService: IntegrationMessageService,
private val objectMapper: ObjectMapper,
) {

@SqsListener("integrationqueue", factory = "hmppsQueueContainerFactoryProxy")
fun processMessage(message: Message) {
val event = objectMapper.readValue(message.message, IntegrationEvent::class.java)
integrationMessageService.handleMessage(event, message.messageAttributes)
}
}

data class MessageAttribute(val value: String, val type: String) {
constructor(value: String) : this(value, "String")
}
typealias EventType = MessageAttribute

class MessageAttributes() : HashMap<String, MessageAttribute>() {
constructor(attribute: EventType) : this() {
put(ATTRIBUTE_EVENT_TYPE, attribute)
}

val eventType: String? get() = this[ATTRIBUTE_EVENT_TYPE]?.value
}

data class Message(
val message: String,
val messageId: String,
val messageAttributes: MessageAttributes,
)
3 changes: 3 additions & 0 deletions src/main/resources/application-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ example-api:
client:
id: "example-api-client"
secret: "example-api-client-secret"

api.integration:
enabled: false
3 changes: 3 additions & 0 deletions src/main/resources/application-localstack.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
api.integration:
enabled: true

hmpps.sqs:
provider: localstack
queues:
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,6 @@ management:
info:
cache:
time-to-live: 2000ms

api.integration:
enabled: true
Loading

0 comments on commit 6701174

Please sign in to comment.