diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/EventAggrStrategy.java b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/EventAggrStrategy.java new file mode 100644 index 00000000..a531e4e1 --- /dev/null +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/EventAggrStrategy.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2004-2022, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package org.hisp.dhis.integration.rapidpro.aggregationStrategy; + +import org.apache.camel.Exchange; +import org.springframework.stereotype.Component; + +import java.util.Map; + +@Component +public class EventAggrStrategy extends AbstractAggregationStrategy +{ + @Override + public Exchange doAggregate( Exchange original, Exchange resource ) + throws + Exception + { + Map body = original.getMessage().getBody( Map.class ); + Map fetchedEvent = objectMapper.readValue( resource.getMessage().getBody( String.class ), + Map.class ); + body.putAll( fetchedEvent ); + original.getIn().setBody( body ); + return original; + + } +} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ProgramStageDataElementCodesAggrStrategy.java b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ProgramStageDataElementCodesAggrStrategy.java new file mode 100644 index 00000000..4e4d8855 --- /dev/null +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/aggregationStrategy/ProgramStageDataElementCodesAggrStrategy.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2004-2022, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package org.hisp.dhis.integration.rapidpro.aggregationStrategy; + +import com.jayway.jsonpath.JsonPath; +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.Map; + +@Component +public class ProgramStageDataElementCodesAggrStrategy extends AbstractAggregationStrategy +{ + @Override + public Exchange doAggregate( Exchange oldExchange, Exchange newExchange ) + { + String programStageDataElements = newExchange.getMessage().getBody( String.class ); + Map body = oldExchange.getMessage().getBody( Map.class ); + List dataElementCodes = JsonPath.read( programStageDataElements, "$.programStageDataElements[*].dataElement.code" ); + body.put( "programStageDataElementCodes", dataElementCodes ); + oldExchange.getMessage().setBody( body ); + return oldExchange; + } +} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventUpdateQueryParamSetter.java b/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventUpdateQueryParamSetter.java new file mode 100644 index 00000000..b2c2d53a --- /dev/null +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/processor/EventUpdateQueryParamSetter.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2004-2022, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package org.hisp.dhis.integration.rapidpro.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +@Component +public class EventUpdateQueryParamSetter implements Processor +{ + + @Override + public void process( Exchange exchange ) + { + Map queryParams = new HashMap<>(); + queryParams.put( "async", "false" ); + queryParams.put( "dataElementIdScheme", "CODE" ); + queryParams.put( "importStrategy", "UPDATE" ); + exchange.getMessage().setHeader( "CamelDhis2.queryParams", queryParams ); + } + +} diff --git a/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverEventRouteBuilder.java b/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverEventRouteBuilder.java new file mode 100644 index 00000000..cb1a5e17 --- /dev/null +++ b/src/main/java/org/hisp/dhis/integration/rapidpro/route/DeliverEventRouteBuilder.java @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2004-2022, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package org.hisp.dhis.integration.rapidpro.route; + +import org.apache.camel.ErrorHandlerFactory; +import org.apache.camel.Exchange; +import org.apache.camel.LoggingLevel; +import org.hisp.dhis.integration.rapidpro.aggregationStrategy.EventAggrStrategy; +import org.hisp.dhis.integration.rapidpro.aggregationStrategy.ProgramStageDataElementCodesAggrStrategy; +import org.hisp.dhis.integration.rapidpro.expression.RootCauseExpr; +import org.hisp.dhis.integration.rapidpro.processor.EventUpdateQueryParamSetter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.function.Function; + +@Component +public class DeliverEventRouteBuilder extends AbstractRouteBuilder +{ + @Autowired + private RootCauseExpr rootCauseExpr; + + @Autowired + private EventAggrStrategy eventAggrStrategy; + + @Autowired + private ProgramStageDataElementCodesAggrStrategy programStageDataElementCodesAggrStrategy; + + @Autowired + private EventUpdateQueryParamSetter eventUpdateQueryParamSetter; + + @Override + protected void doConfigure() + { + ErrorHandlerFactory errorHandlerDefinition = deadLetterChannel( + "direct:failedEventDelivery" ).maximumRedeliveries( 3 ).useExponentialBackOff().useCollisionAvoidance() + .allowRedeliveryWhileStopping( false ); + + from( "timer://retryEvents?fixedRate=true&period=5000" ) + .routeId( "Retry Events" ) + .setBody( simple( "${properties:event.retry.dlc.select.{{spring.sql.init.platform}}}" ) ) + .to( "jdbc:dataSource" ) + .split().body() + .setHeader( "id", simple( "${body['id']}" ) ) + .log( LoggingLevel.INFO, LOGGER, "Retrying row with ID ${header.id}" ) + .setHeader( "eventId", simple( "${body['event_id']}" ) ) + .setBody( simple( "${body['payload']}" ) ) + .to( "jms:queue:dhis2ProgramStageEvents?exchangePattern=InOnly" ) + .setBody( simple( "${properties:event.processed.dlc.update.{{spring.sql.init.platform}}}" ) ) + .to( "jdbc:dataSource?useHeadersAsParameters=true" ) + .end(); + + from( "jms:queue:dhis2ProgramStageEvents" ) + .routeId( "Deliver Event" ) + .to( "direct:transformEvent" ) + .to( "direct:transmitEvent" ); + + from( "direct:transformEvent" ) + .routeId( "Transform Event" ) + .errorHandler( errorHandlerDefinition ) + .streamCaching() + .setHeader( "originalPayload", simple( "${body}" ) ) + .unmarshal().json() + .enrich() + .simple( "dhis2://get/resource?path=tracker/events/${header.eventId}&fields=event,program,programStage,enrollment,orgUnit&client=#dhis2Client" ) + .aggregationStrategy( eventAggrStrategy ) + .enrich() + .simple("dhis2://get/resource?path=programStages/${body[programStage]}&fields=programStageDataElements[dataElement[code]]&client=#dhis2Client") + .aggregationStrategy( programStageDataElementCodesAggrStrategy ) + .transform( datasonnet( "resource:classpath:dhis2Event.ds", Map.class, "application/x-java-object", + "application/x-java-object" ) ) + .process( eventUpdateQueryParamSetter ) + .marshal().json().transform().body( String.class ); + + from( "direct:transmitEvent" ) + .routeId( "Transmit Event" ) + .errorHandler( errorHandlerDefinition ) + .log( LoggingLevel.INFO, LOGGER, "Updating program stage event => ${body}" ) + .setHeader( "dhisRequest", simple( "${body}" ) ) + .toD("dhis2://post/resource?path=tracker&inBody=resource&client=#dhis2Client") + .setBody( (Function) exchange -> exchange.getMessage().getBody( String.class ) ) + .setHeader( "dhisResponse", simple( "${body}" ) ) + .unmarshal().json() + .choice() + .when( simple( "${body['status']} == 'SUCCESS' || ${body['status']} == 'OK'" ) ) + .log( LoggingLevel.INFO, LOGGER, "Successfully updated event => ${header.eventId} in dhis2 with response => ${header.dhisResponse}" ) + .setHeader( "rapidProPayload", header( "originalPayload" ) ) + .setBody( simple( "${properties:event.success.log.insert.{{spring.sql.init.platform}}}" ) ) + .to( "jdbc:dataSource?useHeadersAsParameters=true" ) + .otherwise() + .log(LoggingLevel.ERROR, LOGGER, "Import error from DHIS2 while saving program stage event => ${body}") + .to( "direct:failedEventDelivery" ) + .end(); + + from( "direct:failedEventDelivery" ) + .routeId( "Save Failed Event" ) + .setHeader( "errorMessage", rootCauseExpr ) + .setHeader( "payload", header( "originalPayload" ) ) + .setHeader( "eventId" ).ognl( "request.headers.eventId" ) + .setBody( simple( "${properties:event.error.dlc.insert.{{spring.sql.init.platform}}}" ) ) + .to( "jdbc:dataSource?useHeadersAsParameters=true" ); + } +} diff --git a/src/main/resources/dhis2Event.ds b/src/main/resources/dhis2Event.ds new file mode 100644 index 00000000..c00ef62a --- /dev/null +++ b/src/main/resources/dhis2Event.ds @@ -0,0 +1,30 @@ +local normaliseDeCodeFn(dataElementCode) = ds.replace(ds.lower(dataElementCode), ' ', '_'); + +local normaliseDeCodesFn(dataElementCodes) = ds.map(dataElementCodes, function(v, i) normaliseDeCodeFn(v)); + +local getResultName(result) = if std.objectHas(result.value, 'name') then result.value.name else result.key; + +local dataValueFn(result) = [ + { + dataElement: ds.filter(body.programStageDataElementCodes, function(v, i) normaliseDeCodeFn(v) == ds.lower(native.truncateCatOptComboSuffix(getResultName(result))))[0], + value: result.value.value, + comment: 'RapidPro contact details: %s' % std.escapeStringJson(std.manifestJsonEx(payload.contact, ' ')), + [if native.isCatOptCombo(getResultName(result)) then 'categoryOptionCombo']: native.getCatOptComboCode(getResultName(result)) + } +]; + + +{ + "events": [ + { + "event": body.event, + "program": body.program, + "programStage": body.programStage, + "enrollment": body.enrollment, + "orgUnit": body.orgUnit, + "status": "COMPLETED", + "occurredAt": ds.datetime.format(ds.datetime.now(), 'yyyy-MM-dd'), + "dataValues": std.flatMap(dataValueFn, ds.filter(ds.entriesOf(payload.results), function(v, i) if ds.contains(normaliseDeCodesFn(body.programStageDataElementCodes), ds.lower(native.truncateCatOptComboSuffix(getResultName(v)))) then true else native.logWarning("Ignoring data value because of unknown DHIS2 program stage data element code '" + native.truncateCatOptComboSuffix(getResultName(v)) + "'. Hint: ensure that the RapidPro result name matches the corresponding DHIS2 program stage data element code"))) + } + ] +} diff --git a/src/main/resources/sql.properties b/src/main/resources/sql.properties index 83996718..f21f7615 100644 --- a/src/main/resources/sql.properties +++ b/src/main/resources/sql.properties @@ -15,7 +15,7 @@ report.error.dlc.insert.postgresql=INSERT INTO REPORT_DEAD_LETTER_CHANNEL (paylo report.processed.dlc.update.postgresql=UPDATE REPORT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id event.success.log.insert.postgresql=INSERT INTO EVENT_SUCCESS_LOG (dhis_request, dhis_response, rapidpro_payload, event_id) VALUES (:?dhisRequest, :?dhisResponse, :?rapidProPayload, :?eventId) event.retry.dlc.select.postgresql=SELECT * FROM EVENT_DEAD_LETTER_CHANNEL WHERE status = 'RETRY' LIMIT 100 -event.error.dlc.insert.postgresql=INSERT INTO EVENT_DEAD_LETTER_CHANNEL (payload, event_id, status, error_message) VALUES (:?payload, :?event_id, 'ERROR', :?errorMessage) +event.error.dlc.insert.postgresql=INSERT INTO EVENT_DEAD_LETTER_CHANNEL (payload, event_id, status, error_message) VALUES (:?payload, :?eventId, 'ERROR', :?errorMessage) event.processed.dlc.update.postgresql=UPDATE EVENT_DEAD_LETTER_CHANNEL SET status = 'PROCESSED', last_processed_at = CURRENT_TIMESTAMP WHERE id = :?id last.run.select.postgresql=SELECT * FROM POLLER WHERE flow_uuid = :?flowUuid last.run.upsert.postgresql=INSERT INTO POLLER (flow_uuid, last_run_at) VALUES (:?flowUuid, :?newLastRunAt) ON CONFLICT (flow_uuid) DO UPDATE SET last_run_at = :?newLastRunAt diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java index d17a0bd0..cb7a3315 100644 --- a/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/AbstractFunctionalTestCase.java @@ -120,6 +120,8 @@ public void beforeEach() jdbcTemplate.execute( "TRUNCATE TABLE REPORT_DEAD_LETTER_CHANNEL" ); jdbcTemplate.execute( "TRUNCATE TABLE REPORT_SUCCESS_LOG" ); + jdbcTemplate.execute( "TRUNCATE TABLE EVENT_DEAD_LETTER_CHANNEL" ); + jdbcTemplate.execute( "TRUNCATE TABLE EVENT_SUCCESS_LOG" ); jdbcTemplate.execute( "TRUNCATE TABLE MESSAGES" ); for ( Map contact : fetchRapidProContacts() ) diff --git a/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverEventRouteBuilderFunctionalTestCase.java b/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverEventRouteBuilderFunctionalTestCase.java new file mode 100644 index 00000000..5d1a0e99 --- /dev/null +++ b/src/test/java/org/hisp/dhis/integration/rapidpro/route/DeliverEventRouteBuilderFunctionalTestCase.java @@ -0,0 +1,250 @@ +/* + * Copyright (c) 2004-2022, University of Oslo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright notice, this + * list of conditions and the following disclaimer. + * + * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * Neither the name of the HISP project nor the names of its contributors may + * be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED + * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE + * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR + * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON + * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package org.hisp.dhis.integration.rapidpro.route; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.builder.AdviceWith; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.spi.CamelLogger; +import org.apache.camel.spring.boot.SpringBootCamelContext; +import org.hisp.dhis.api.model.v40_0.DataValue; +import org.hisp.dhis.api.model.v40_0.DataValueSet; +import org.hisp.dhis.api.model.v40_0.Dxf2EventsEventDataValue; +import org.hisp.dhis.api.model.v40_0.Event; +import org.hisp.dhis.api.model.v40_0.WebMessage; +import org.hisp.dhis.integration.rapidpro.AbstractFunctionalTestCase; +import org.hisp.dhis.integration.rapidpro.Environment; +import org.hisp.dhis.integration.sdk.support.period.PeriodBuilder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.util.StreamUtils; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.sql.Timestamp; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.hisp.dhis.integration.rapidpro.Environment.DHIS_IMAGE_NAME; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class DeliverEventRouteBuilderFunctionalTestCase extends AbstractFunctionalTestCase +{ + @Autowired + private ObjectMapper objectMapper; + + @Test + public void testEventIsUpdated() + throws + Exception + { + System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); + AdviceWith.adviceWith( camelContext, "Transmit Event", r -> r.weaveAddLast().to( "mock:spy" ) ); + MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); + spyEndpoint.setExpectedCount( 1 ); + String eventId = createTrackedEntityAndFetchEventId( "12345678" ); + syncTrackedEntityContact( "whatsapp:12345678" ); + camelContext.start(); + + String webhookMessage = StreamUtils.copyToString( + Thread.currentThread().getContextClassLoader().getResourceAsStream( "eventWebhook.json" ), + Charset.defaultCharset() ); + producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2ProgramStageEvents", + ExchangePattern.InOnly, String.format( webhookMessage, eventId ), + Map.of( "eventId", eventId ) ); + + spyEndpoint.await( 30, TimeUnit.SECONDS ); + + Event event = Environment.DHIS2_CLIENT.get( + "events/{eventId}", eventId ).withFields( "event", "status", "datavalues" ) + .transfer() + .returnAs( + Event.class ); + + Optional dataValue = event.getDataValues().get().stream() + .filter( v -> v.getDataElement().get().equals( "at60cTl6IdS" ) ).findFirst(); + assertTrue( dataValue.isPresent() ); + assertEquals( "12:00", dataValue.get().getValue().get() ); + + Map eventSuccessLog = jdbcTemplate.queryForList( "SELECT * FROM EVENT_SUCCESS_LOG" ).get( 0 ); + + String dhisRequest = (String) eventSuccessLog.get( "DHIS_REQUEST" ); + String dhisResponse = (String) eventSuccessLog.get( "DHIS_RESPONSE" ); + String rapidProPayload = (String) eventSuccessLog.get( "RAPIDPRO_PAYLOAD" ); + String successEventLogEventId = (String) eventSuccessLog.get( "EVENT_ID" ); + List> events = (List>) objectMapper.readValue( dhisRequest, Map.class ) + .get( "events" ); + assertEquals( eventId, events.get( 0 ).get( "event" ) ); + assertEquals( eventId, successEventLogEventId ); + assertEquals( "COMPLETED", events.get( 0 ).get( "status" ) ); + if ( DHIS_IMAGE_NAME.startsWith( "2.36" ) || DHIS_IMAGE_NAME.startsWith( "2.37" ) ) + { + assertEquals( "SUCCESS", objectMapper.readValue( dhisResponse, Map.class ).get( "status" ) ); + } + else + { + assertEquals( "OK", objectMapper.readValue( dhisResponse, Map.class ).get( "status" ) ); + } + assertEquals( "whatsapp:12345678", + ((Map) objectMapper.readValue( rapidProPayload, Map.class ).get( "contact" )).get( "urn" ) ); + } + + @Test + public void testRecordInFailedEventDeliveryIsCreatedGivenWebMessageErrorWhileUpdatingEvent() + throws + Exception + { + System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); + AdviceWith.adviceWith( camelContext, "Transmit Event", + r -> r.weaveByToUri( "dhis2://post/resource?path=tracker&inBody=resource&client=#dhis2Client" ) + .replace().to( "mock:dhis2" ) ); + MockEndpoint fakeDhis2Endpoint = camelContext.getEndpoint( "mock:dhis2", MockEndpoint.class ); + fakeDhis2Endpoint.whenAnyExchangeReceived( + exchange -> exchange.getMessage().setBody( objectMapper.writeValueAsString( + new WebMessage().withStatus( WebMessage.Status.ERROR ) ) ) ); + String eventId = createTrackedEntityAndFetchEventId( "12345678" ); + syncTrackedEntityContact( "whatsapp:12345678" ); + camelContext.start(); + + String webhookMessage = StreamUtils.copyToString( + Thread.currentThread().getContextClassLoader().getResourceAsStream( "eventWebhook.json" ), + Charset.defaultCharset() ); + producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2ProgramStageEvents", + ExchangePattern.InOut, String.format( webhookMessage, eventId ), + Map.of( "eventId", eventId ) ); + List> failedEventDelivery = jdbcTemplate.queryForList( + "SELECT * FROM EVENT_DEAD_LETTER_CHANNEL" ); + assertEquals( 1, failedEventDelivery.size() ); + assertEquals( "ERROR", + objectMapper.readValue( (String) failedEventDelivery.get( 0 ).get( "error_message" ), + WebMessage.class ) + .getStatus().value() ); + } + + @Test + public void testRecordInFailedEventDeliveryIsCreatedGivenInvalidEventId() + throws + Exception + { + System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); + AdviceWith.adviceWith( camelContext, "Transmit Event", r -> r.weaveAddLast().to( "mock:spy" ) ); + MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); + spyEndpoint.setExpectedCount( 1 ); + String eventId = createTrackedEntityAndFetchEventId( "12345678" ); + syncTrackedEntityContact( "whatsapp:12345678" ); + camelContext.start(); + + String invalidId = "trigger.params.eventId"; + String webhookMessage = StreamUtils.copyToString( + Thread.currentThread().getContextClassLoader().getResourceAsStream( "eventWebhook.json" ), + Charset.defaultCharset() ); + producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2ProgramStageEvents", + ExchangePattern.InOut, String.format( webhookMessage, invalidId ), + Map.of( "eventId", invalidId ) ); + spyEndpoint.await( 30, TimeUnit.SECONDS ); + List> failedEventDelivery = jdbcTemplate.queryForList( + "SELECT * FROM EVENT_DEAD_LETTER_CHANNEL" ); + assertEquals( 1, failedEventDelivery.size() ); + assertEquals( invalidId, failedEventDelivery.get( 0 ).get( "event_id" ) ); + } + + @Test + public void testRetryRecordInFailedEventDeliveryIsReProcessed() + throws + Exception + { + System.setProperty( "sync.dhis2.events.to.rapidpro.flows", "true" ); + AdviceWith.adviceWith( camelContext, "Transmit Event", r -> r.weaveAddLast().to( "mock:spy" ) ); + MockEndpoint spyEndpoint = camelContext.getEndpoint( "mock:spy", MockEndpoint.class ); + spyEndpoint.setExpectedCount( 1 ); + String eventId = createTrackedEntityAndFetchEventId( "12345678" ); + syncTrackedEntityContact( "whatsapp:12345678" ); + camelContext.start(); + + String webhookMessage = StreamUtils.copyToString( + Thread.currentThread().getContextClassLoader().getResourceAsStream( "eventWebhook.json" ), + Charset.defaultCharset() ); + + String invalidEventId = UUID.randomUUID().toString(); + producerTemplate.sendBodyAndHeaders( "jms:queue:dhis2ProgramStageEvents", ExchangePattern.InOut, + String.format( webhookMessage, invalidEventId ), Map.of( "eventId", invalidEventId ) ); + assertEquals( 0, spyEndpoint.getReceivedCounter() ); + + String payload = (String) jdbcTemplate.queryForList( "SELECT event_id FROM EVENT_DEAD_LETTER_CHANNEL" ).get( 0 ) + .get( "EVENT_ID" ); + jdbcTemplate.execute( + String.format( + "UPDATE EVENT_DEAD_LETTER_CHANNEL SET STATUS = 'RETRY', EVENT_ID = '%s' WHERE STATUS = 'ERROR'", + payload.replace( invalidEventId, eventId ) ) ); + + spyEndpoint.await( 30, TimeUnit.SECONDS ); + + assertEquals( 1, spyEndpoint.getReceivedCounter() ); + List> failedEventDelivery = jdbcTemplate.queryForList( + "SELECT * FROM EVENT_DEAD_LETTER_CHANNEL" ); + assertEquals( 1, failedEventDelivery.size() ); + assertEquals( "PROCESSED", failedEventDelivery.get( 0 ).get( "STATUS" ) ); + + Object lastProcessedAt = failedEventDelivery.get( 0 ).get( "LAST_PROCESSED_AT" ); + Instant lastProcessedAsInstant; + if ( lastProcessedAt instanceof OffsetDateTime ) + { + lastProcessedAsInstant = ((OffsetDateTime) lastProcessedAt).toInstant(); + } + else + { + lastProcessedAsInstant = ((Timestamp) lastProcessedAt).toInstant(); + } + + Object createdAt = failedEventDelivery.get( 0 ).get( "CREATED_AT" ); + Instant createdAtAsInstant; + if ( createdAt instanceof OffsetDateTime ) + { + createdAtAsInstant = ((OffsetDateTime) createdAt).toInstant(); + } + else + { + createdAtAsInstant = ((Timestamp) createdAt).toInstant(); + } + + assertTrue( lastProcessedAsInstant.isAfter( createdAtAsInstant ) ); + } + +} diff --git a/src/test/resources/eventWebhook.json b/src/test/resources/eventWebhook.json new file mode 100644 index 00000000..b969ca4a --- /dev/null +++ b/src/test/resources/eventWebhook.json @@ -0,0 +1,40 @@ +{ + "flow": { + "uuid": "837bad3d-bf01-4bb1-9f9f-ecb95aa3b8a2", + "name": "Program Stage Event Flow Under Test" + }, + "contact": { + "uuid": "ece108cc-4b20-4389-8778-a8a11bdd8105", + "urn": "whatsapp:12345678", + "name": null + }, + "results": { + "event_id": { + "value": "%s" + }, + "ids_afi_specimen_collection_blood": { + "value": "true" + }, + "ids_afi_specimen_collection_serum": { + "value": "true" + }, + "ids_afi_specimen_collector_family_name": { + "value": "John" + }, + "ids_afi_specimen_collector_given_name": { + "value": "Doe" + }, + "ids_afi_specimen_date_collection": { + "value": "2023-10-19" + }, + "ids_afi_specimen_date_transport": { + "value": "2023-10-19" + }, + "ids_afi_specimen_time_collection": { + "value": "12:00" + }, + "ids_afi_specimen_time_transport": { + "value": "12:00" + } + } +}