Skip to content

Commit

Permalink
[ESWE-1181] Fix MessageListener issues (#21)
Browse files Browse the repository at this point in the history
fix the listener issues when parsing message body and getting message
attributes (from header)
  • Loading branch information
rickchoijd authored Jan 28, 2025
1 parent e7373a2 commit 1ab1fee
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,5 @@ class IntegrationConfiguration {
@Bean
fun integrationMessageListener(
@Qualifier("integrationMessageService") integrationMessageService: IntegrationMessageService,
objectMapper: ObjectMapper,
) = IntegrationMessageListener(integrationMessageService, objectMapper)
) = IntegrationMessageListener(integrationMessageService)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@ abstract class EmployerMessageService(
}

override fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes) {
log.info("handle message eventId=${integrationEvent.eventId} eventType=${messageAttributes.eventType}")
log.info("handle message eventId=${integrationEvent.eventId}, eventType=${messageAttributes.eventType}, messageId=${messageAttributes.messageId}")
messageAttributes.eventType?.also {
log.trace("handle message. integrationEvent={}", integrationEvent)
log.trace("handle message. integrationEvent={}, messageAttributes={}", integrationEvent, messageAttributes)
if (!eventTypeTypes.contains(it)) {
throw IllegalArgumentException("Unexpected event type=$it , with eventId=${integrationEvent.eventId}")
throw IllegalArgumentException("Unexpected event type=$it , with eventId=${integrationEvent.eventId}, messageId=${messageAttributes.messageId}")
}
handleEvent(integrationEvent.toEmployerEvent())
} ?: run {
throw IllegalArgumentException("Missing event type eventId=${integrationEvent.eventId}")
throw IllegalArgumentException("Missing event type eventId=${integrationEvent.eventId}, messageId=${messageAttributes.messageId}")
}
}

protected abstract fun handleEvent(employerEvent: EmployerEvent)

private fun IntegrationEvent.toEmployerEvent(): EmployerEvent =
objectMapper.readValue(content, EmployerEvent::class.java)
objectMapper.readValue(message, EmployerEvent::class.java)
}

class EmployerCreationMessageService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,19 @@ class IntegrationMessageServiceFacade(

override fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes) {
val eventId = integrationEvent.eventId
val eventType = messageAttributes.eventType
log.info("received event: id=$eventId, type=$eventType")
log.trace("received event: {}, {}", integrationEvent, messageAttributes)
val eventType = integrationEvent.eventType
val messageId = messageAttributes?.messageId
log.info("received event: messageId=$messageId, eventId=$eventId, eventType=$eventType")
log.trace("received event: {}", integrationEvent)

messageAttributes.eventType?.also { eventTypeAttr ->
integrationEvent.eventType?.also { eventTypeAttr ->
serviceMap[eventTypeAttr]?.also { service ->
service.handleMessage(integrationEvent, messageAttributes)
} ?: run {
logAndThrowArgumentError("MessageService not found for Event type=$eventType, eventId=$eventId")
logAndThrowArgumentError("MessageService not found for Event type=$eventType, eventId=$eventId, messageId=$messageId")
}
} ?: run {
logAndThrowArgumentError("Missing event type eventId=$eventId")
logAndThrowArgumentError("Missing event type eventId=$eventId, messageId=$messageId")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain

import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes
import java.time.Instant

interface IntegrationMessageService {
fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes)
}

data class IntegrationEvent(
val eventId: String,
val eventType: String,
val timestamp: Instant,
val content: String,
val eventId: String? = null,
val eventType: String? = null,
val message: String? = null,
)
Original file line number Diff line number Diff line change
@@ -1,39 +1,57 @@
package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure

import com.fasterxml.jackson.databind.ObjectMapper
import io.awspring.cloud.sqs.annotation.SqsListener
import org.slf4j.LoggerFactory
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.MessageMapping
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationMessageService

const val ATTRIBUTE_EVENT_TYPE = "eventType"
const val ATTRIBUTE_EVENT_ID = "eventId"
const val ATTRIBUTE_ID = "id"

class IntegrationMessageListener(
private val integrationMessageService: IntegrationMessageService,
private val objectMapper: ObjectMapper,
) {
companion object {
private val log = LoggerFactory.getLogger(this::class.java)
}

@MessageMapping
@SqsListener("integrationqueue", factory = "hmppsQueueContainerFactoryProxy")
fun processMessage(message: Message) {
val event = objectMapper.readValue(message.message, IntegrationEvent::class.java)
integrationMessageService.handleMessage(event, message.messageAttributes)
fun processMessage(
message: String,
@Header(ATTRIBUTE_EVENT_TYPE) eventType: String?,
@Header(ATTRIBUTE_EVENT_ID) eventId: String?,
@Header(ATTRIBUTE_ID) messageId: String?,
) {
log.info("processMessage()|Processing message:messageId=$messageId, eventType=$eventType, eventId=$eventId")
val messageAttributes = MessageAttributes().also { attributes ->
mapOf(
ATTRIBUTE_EVENT_TYPE to eventType,
ATTRIBUTE_EVENT_ID to eventId,
ATTRIBUTE_ID to messageId,
).filterValues { it != null }.let { attributes.putAll(it) }
}
val event = IntegrationEvent(
eventType = eventType,
message = message,
)
integrationMessageService.handleMessage(event, messageAttributes)
}
}

data class MessageAttribute(val value: String, val type: String) {
constructor(value: String) : this(value, "String")
}
typealias EventType = MessageAttribute
class MessageAttributes() : HashMap<String, Any?>() {
constructor(eventType: String) : this() {
put(ATTRIBUTE_EVENT_TYPE, eventType)
}

class MessageAttributes() : HashMap<String, MessageAttribute>() {
constructor(attribute: EventType) : this() {
put(ATTRIBUTE_EVENT_TYPE, attribute)
constructor(source: Map<String, Any?>) : this() {
putAll(source)
}

val eventType: String? get() = this[ATTRIBUTE_EVENT_TYPE]?.value
val eventType: String? get() = get(ATTRIBUTE_EVENT_TYPE) as? String?
val eventId: String? get() = get(ATTRIBUTE_EVENT_ID) as? String?
val messageId: String? get() = get(ATTRIBUTE_ID) as? String?
}

data class Message(
val message: String,
val messageId: String,
val messageAttributes: MessageAttributes,
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import org.mockito.Mock
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.employers.domain.EmployerEvent
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.application.UnitTestBase
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.EventType
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes

abstract class EmployerMessageServiceTestCase : UnitTestBase() {
Expand All @@ -14,14 +13,11 @@ abstract class EmployerMessageServiceTestCase : UnitTestBase() {
@Mock
protected lateinit var employerRegistrar: EmployerRegistrar

protected fun EmployerEvent.messageAttributes() = MessageAttributes(this.eventTypeAsAttribute())

protected fun EmployerEvent.eventTypeAsAttribute() = EventType(this.eventType.type)
protected fun EmployerEvent.messageAttributes() = MessageAttributes(this.eventType.type)

protected fun EmployerEvent.toIntegrationEvent() = IntegrationEvent(
eventId = this.eventId,
eventType = this.eventType.type,
timestamp = this.timestamp,
content = objectMapper.writeValueAsString(this),
message = objectMapper.writeValueAsString(this),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import org.mockito.kotlin.whenever
import org.springframework.beans.factory.annotation.Qualifier
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationMessageService
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.EventType
import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand All @@ -32,13 +31,13 @@ class IntegrationMessageServiceFacadeShould : UnitTestBase() {

@Test
fun `throw exception, when missing event type`() {
val integrationEvent = dummyIntegrationEvent()
val integrationEvent = dummyIntegrationEvent().copy(eventType = null)
val messageAttributes = MessageAttributes()
val exception = assertFailsWith<IllegalArgumentException> {
serviceFacade.handleMessage(integrationEvent, messageAttributes)
}

val expectedError = "Missing event type eventId=${integrationEvent.eventId}"
val expectedError = "Missing event type eventId=${integrationEvent.eventId}, messageId=null"
assertEquals(expectedError, exception.message)
}

Expand All @@ -54,13 +53,13 @@ class IntegrationMessageServiceFacadeShould : UnitTestBase() {
@Test
fun `throw exception, when message received but no service to handle`() {
val integrationEvent = dummyIntegrationEvent()
val messageAttributes = MessageAttributes(dummyEventType())
val messageAttributes = MessageAttributes(integrationEvent.eventType!!)
val exception = assertFailsWith<IllegalArgumentException> {
serviceFacade.handleMessage(integrationEvent, messageAttributes)
}

val expectedError =
"MessageService not found for Event type=${messageAttributes.eventType}, eventId=${integrationEvent.eventId}"
"MessageService not found for Event type=${integrationEvent.eventType}, eventId=${integrationEvent.eventId}, messageId=null"
assertEquals(expectedError, exception.message)
}
}
Expand Down Expand Up @@ -93,14 +92,13 @@ class IntegrationMessageServiceFacadeShould : UnitTestBase() {
}
}

private fun integrationEvent(eventType: String, content: String = "") = IntegrationEvent(
private fun integrationEvent(eventType: String, content: String? = null) = IntegrationEvent(
eventId = randomUUID(),
eventType = eventType,
timestamp = defaultCurrentTime,
content = content,
message = content,
)

private fun dummyEventType() = EventType("mjma-jobs-board-dummy-created")
private fun dummyEventType() = "mjma-jobs-board-dummy-created"

private fun dummyIntegrationEvent() = integrationEvent("DummyCreated")
private fun dummyIntegrationEvent() = integrationEvent(dummyEventType())
}

0 comments on commit 1ab1fee

Please sign in to comment.