diff --git a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/config/IntegrationConfiguration.kt b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/config/IntegrationConfiguration.kt index bb5f5a9..61bb417 100644 --- a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/config/IntegrationConfiguration.kt +++ b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/config/IntegrationConfiguration.kt @@ -32,6 +32,5 @@ class IntegrationConfiguration { @Bean fun integrationMessageListener( @Qualifier("integrationMessageService") integrationMessageService: IntegrationMessageService, - objectMapper: ObjectMapper, - ) = IntegrationMessageListener(integrationMessageService, objectMapper) + ) = IntegrationMessageListener(integrationMessageService) } diff --git a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServices.kt b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServices.kt index 85112c1..dff4c9f 100644 --- a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServices.kt +++ b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServices.kt @@ -26,22 +26,22 @@ abstract class EmployerMessageService( } override fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes) { - log.info("handle message eventId=${integrationEvent.eventId} eventType=${messageAttributes.eventType}") + log.info("handle message eventId=${integrationEvent.eventId}, eventType=${messageAttributes.eventType}, messageId=${messageAttributes.messageId}") messageAttributes.eventType?.also { - log.trace("handle message. integrationEvent={}", integrationEvent) + log.trace("handle message. integrationEvent={}, messageAttributes={}", integrationEvent, messageAttributes) if (!eventTypeTypes.contains(it)) { - throw IllegalArgumentException("Unexpected event type=$it , with eventId=${integrationEvent.eventId}") + throw IllegalArgumentException("Unexpected event type=$it , with eventId=${integrationEvent.eventId}, messageId=${messageAttributes.messageId}") } handleEvent(integrationEvent.toEmployerEvent()) } ?: run { - throw IllegalArgumentException("Missing event type eventId=${integrationEvent.eventId}") + throw IllegalArgumentException("Missing event type eventId=${integrationEvent.eventId}, messageId=${messageAttributes.messageId}") } } protected abstract fun handleEvent(employerEvent: EmployerEvent) private fun IntegrationEvent.toEmployerEvent(): EmployerEvent = - objectMapper.readValue(content, EmployerEvent::class.java) + objectMapper.readValue(message, EmployerEvent::class.java) } class EmployerCreationMessageService( diff --git a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacade.kt b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacade.kt index ddbe491..00ff3cd 100644 --- a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacade.kt +++ b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacade.kt @@ -21,18 +21,19 @@ class IntegrationMessageServiceFacade( override fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes) { val eventId = integrationEvent.eventId - val eventType = messageAttributes.eventType - log.info("received event: id=$eventId, type=$eventType") - log.trace("received event: {}, {}", integrationEvent, messageAttributes) + val eventType = integrationEvent.eventType + val messageId = messageAttributes?.messageId + log.info("received event: messageId=$messageId, eventId=$eventId, eventType=$eventType") + log.trace("received event: {}", integrationEvent) - messageAttributes.eventType?.also { eventTypeAttr -> + integrationEvent.eventType?.also { eventTypeAttr -> serviceMap[eventTypeAttr]?.also { service -> service.handleMessage(integrationEvent, messageAttributes) } ?: run { - logAndThrowArgumentError("MessageService not found for Event type=$eventType, eventId=$eventId") + logAndThrowArgumentError("MessageService not found for Event type=$eventType, eventId=$eventId, messageId=$messageId") } } ?: run { - logAndThrowArgumentError("Missing event type eventId=$eventId") + logAndThrowArgumentError("Missing event type eventId=$eventId, messageId=$messageId") } } diff --git a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/domain/MessageServices.kt b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/domain/MessageServices.kt index dcbae7d..f357403 100644 --- a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/domain/MessageServices.kt +++ b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/domain/MessageServices.kt @@ -1,15 +1,13 @@ package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes -import java.time.Instant interface IntegrationMessageService { fun handleMessage(integrationEvent: IntegrationEvent, messageAttributes: MessageAttributes) } data class IntegrationEvent( - val eventId: String, - val eventType: String, - val timestamp: Instant, - val content: String, + val eventId: String? = null, + val eventType: String? = null, + val message: String? = null, ) diff --git a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/infrastructure/MessageListeners.kt b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/infrastructure/MessageListeners.kt index 6af359b..41bdf80 100644 --- a/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/infrastructure/MessageListeners.kt +++ b/src/main/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/infrastructure/MessageListeners.kt @@ -1,39 +1,57 @@ package uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure -import com.fasterxml.jackson.databind.ObjectMapper import io.awspring.cloud.sqs.annotation.SqsListener +import org.slf4j.LoggerFactory +import org.springframework.messaging.handler.annotation.Header +import org.springframework.messaging.handler.annotation.MessageMapping import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationMessageService const val ATTRIBUTE_EVENT_TYPE = "eventType" +const val ATTRIBUTE_EVENT_ID = "eventId" +const val ATTRIBUTE_ID = "id" class IntegrationMessageListener( private val integrationMessageService: IntegrationMessageService, - private val objectMapper: ObjectMapper, ) { + companion object { + private val log = LoggerFactory.getLogger(this::class.java) + } + @MessageMapping @SqsListener("integrationqueue", factory = "hmppsQueueContainerFactoryProxy") - fun processMessage(message: Message) { - val event = objectMapper.readValue(message.message, IntegrationEvent::class.java) - integrationMessageService.handleMessage(event, message.messageAttributes) + fun processMessage( + message: String, + @Header(ATTRIBUTE_EVENT_TYPE) eventType: String?, + @Header(ATTRIBUTE_EVENT_ID) eventId: String?, + @Header(ATTRIBUTE_ID) messageId: String?, + ) { + log.info("processMessage()|Processing message:messageId=$messageId, eventType=$eventType, eventId=$eventId") + val messageAttributes = MessageAttributes().also { attributes -> + mapOf( + ATTRIBUTE_EVENT_TYPE to eventType, + ATTRIBUTE_EVENT_ID to eventId, + ATTRIBUTE_ID to messageId, + ).filterValues { it != null }.let { attributes.putAll(it) } + } + val event = IntegrationEvent( + eventType = eventType, + message = message, + ) + integrationMessageService.handleMessage(event, messageAttributes) } } -data class MessageAttribute(val value: String, val type: String) { - constructor(value: String) : this(value, "String") -} -typealias EventType = MessageAttribute +class MessageAttributes() : HashMap() { + constructor(eventType: String) : this() { + put(ATTRIBUTE_EVENT_TYPE, eventType) + } -class MessageAttributes() : HashMap() { - constructor(attribute: EventType) : this() { - put(ATTRIBUTE_EVENT_TYPE, attribute) + constructor(source: Map) : this() { + putAll(source) } - val eventType: String? get() = this[ATTRIBUTE_EVENT_TYPE]?.value + val eventType: String? get() = get(ATTRIBUTE_EVENT_TYPE) as? String? + val eventId: String? get() = get(ATTRIBUTE_EVENT_ID) as? String? + val messageId: String? get() = get(ATTRIBUTE_ID) as? String? } - -data class Message( - val message: String, - val messageId: String, - val messageAttributes: MessageAttributes, -) diff --git a/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServiceTestCase.kt b/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServiceTestCase.kt index ab4a4e0..a1621c0 100644 --- a/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServiceTestCase.kt +++ b/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/employers/application/EmployerMessageServiceTestCase.kt @@ -4,7 +4,6 @@ import org.mockito.Mock import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.employers.domain.EmployerEvent import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.application.UnitTestBase import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent -import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.EventType import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes abstract class EmployerMessageServiceTestCase : UnitTestBase() { @@ -14,14 +13,11 @@ abstract class EmployerMessageServiceTestCase : UnitTestBase() { @Mock protected lateinit var employerRegistrar: EmployerRegistrar - protected fun EmployerEvent.messageAttributes() = MessageAttributes(this.eventTypeAsAttribute()) - - protected fun EmployerEvent.eventTypeAsAttribute() = EventType(this.eventType.type) + protected fun EmployerEvent.messageAttributes() = MessageAttributes(this.eventType.type) protected fun EmployerEvent.toIntegrationEvent() = IntegrationEvent( eventId = this.eventId, eventType = this.eventType.type, - timestamp = this.timestamp, - content = objectMapper.writeValueAsString(this), + message = objectMapper.writeValueAsString(this), ) } diff --git a/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacadeShould.kt b/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacadeShould.kt index a4d6450..010be17 100644 --- a/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacadeShould.kt +++ b/src/test/kotlin/uk/gov/justice/digital/hmpps/jobsboardintegrationapi/shared/application/IntegrationMessageServiceFacadeShould.kt @@ -16,7 +16,6 @@ import org.mockito.kotlin.whenever import org.springframework.beans.factory.annotation.Qualifier import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationEvent import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.domain.IntegrationMessageService -import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.EventType import uk.gov.justice.digital.hmpps.jobsboardintegrationapi.shared.infrastructure.MessageAttributes import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -32,13 +31,13 @@ class IntegrationMessageServiceFacadeShould : UnitTestBase() { @Test fun `throw exception, when missing event type`() { - val integrationEvent = dummyIntegrationEvent() + val integrationEvent = dummyIntegrationEvent().copy(eventType = null) val messageAttributes = MessageAttributes() val exception = assertFailsWith { serviceFacade.handleMessage(integrationEvent, messageAttributes) } - val expectedError = "Missing event type eventId=${integrationEvent.eventId}" + val expectedError = "Missing event type eventId=${integrationEvent.eventId}, messageId=null" assertEquals(expectedError, exception.message) } @@ -54,13 +53,13 @@ class IntegrationMessageServiceFacadeShould : UnitTestBase() { @Test fun `throw exception, when message received but no service to handle`() { val integrationEvent = dummyIntegrationEvent() - val messageAttributes = MessageAttributes(dummyEventType()) + val messageAttributes = MessageAttributes(integrationEvent.eventType!!) val exception = assertFailsWith { serviceFacade.handleMessage(integrationEvent, messageAttributes) } val expectedError = - "MessageService not found for Event type=${messageAttributes.eventType}, eventId=${integrationEvent.eventId}" + "MessageService not found for Event type=${integrationEvent.eventType}, eventId=${integrationEvent.eventId}, messageId=null" assertEquals(expectedError, exception.message) } } @@ -93,14 +92,13 @@ class IntegrationMessageServiceFacadeShould : UnitTestBase() { } } - private fun integrationEvent(eventType: String, content: String = "") = IntegrationEvent( + private fun integrationEvent(eventType: String, content: String? = null) = IntegrationEvent( eventId = randomUUID(), eventType = eventType, - timestamp = defaultCurrentTime, - content = content, + message = content, ) - private fun dummyEventType() = EventType("mjma-jobs-board-dummy-created") + private fun dummyEventType() = "mjma-jobs-board-dummy-created" - private fun dummyIntegrationEvent() = integrationEvent("DummyCreated") + private fun dummyIntegrationEvent() = integrationEvent(dummyEventType()) }