diff --git a/pom.xml b/pom.xml index 3bcca0ca8..7e66372f2 100644 --- a/pom.xml +++ b/pom.xml @@ -199,6 +199,10 @@ org.springframework.boot spring-boot-starter-actuator + + org.springframework.boot + spring-boot-starter-amqp + diff --git a/src/main/java/org/gridsuite/modification/server/config/RabbitConsumerConfiguration.java b/src/main/java/org/gridsuite/modification/server/config/RabbitConsumerConfiguration.java new file mode 100644 index 000000000..f063434b0 --- /dev/null +++ b/src/main/java/org/gridsuite/modification/server/config/RabbitConsumerConfiguration.java @@ -0,0 +1,37 @@ +package org.gridsuite.modification.server.config; + +import org.springframework.amqp.rabbit.listener.MessageListenerContainer; +import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; +import org.springframework.cloud.stream.config.BindingServiceProperties; +import org.springframework.cloud.stream.config.ListenerContainerCustomizer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicInteger; + +@Configuration +public class RabbitConsumerConfiguration { + /* + * RabbitMQ consumer priority: + * https://www.rabbitmq.com/docs/consumer-priority + * + * Each container creates exactly one AMQP consumer with prefetch=1 and its own priority. + * When dispatching messages, RabbitMQ always selects the highest-priority consumer + * that is available. + */ + @Bean + public ListenerContainerCustomizer customizer(BindingServiceProperties bindingServiceProperties) { + /* + * Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java + * We expect cloud stream to call our customizer exactly once in order for each container so it will produce a sequence of increasing priorities + */ + AtomicInteger counter = new AtomicInteger(0); + return (container, destination, group) -> { + if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, "buildGroup")) { + smlc.setConsumerArguments(Map.of("x-priority", counter.getAndIncrement())); + } + }; + } +} diff --git a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java index 9a8a7397f..0cc258a3d 100644 --- a/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java +++ b/src/main/java/org/gridsuite/modification/server/service/BuildWorkerService.java @@ -81,6 +81,15 @@ private CompletableFuture execBuildVariant(BuildExecC } @Bean + public Consumer> consumeBuild1() { + return consumeBuild(); + } + + @Bean + public Consumer> consumeBuild2() { + return consumeBuild(); + } + public Consumer> consumeBuild() { return message -> networkModificationObserver.observeFullBuild(() -> { BuildExecContext execContext; diff --git a/src/main/resources/config/application.yaml b/src/main/resources/config/application.yaml index 5bd6027e6..f68627ccf 100644 --- a/src/main/resources/config/application.yaml +++ b/src/main/resources/config/application.yaml @@ -18,15 +18,16 @@ spring: cloud: function: - definition: consumeBuild;consumeCancelBuild + definition: consumeBuild1;consumeBuild2;consumeCancelBuild stream: bindings: - consumeBuild-in-0: + consumeBuild1-in-0: &consumeBuildConfig destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run group: buildGroup consumer: - concurrency: 2 max-attempts: 1 + consumeBuild2-in-0: + <<: *consumeBuildConfig publishBuild-out-0: destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run publishResultBuild-out-0: @@ -40,7 +41,7 @@ spring: output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0 rabbit: bindings: - consumeBuild-in-0: + consumeBuild1-in-0: &consumeBuildRabbitConfig consumer: auto-bind-dlq: true dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx @@ -49,6 +50,8 @@ spring: quorum: enabled: true delivery-limit: 2 + consumeBuild2-in-0: + <<: *consumeBuildRabbitConfig powsybl-ws: database: diff --git a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java index 131bc198f..9816f2b60 100644 --- a/src/test/java/org/gridsuite/modification/server/service/BuildTest.java +++ b/src/test/java/org/gridsuite/modification/server/service/BuildTest.java @@ -110,7 +110,7 @@ class BuildTest { private CountDownLatch waitStartBuild; private CountDownLatch blockBuild; - @Value("${spring.cloud.stream.bindings.consumeBuild-in-0.destination}") + @Value("${spring.cloud.stream.bindings.consumeBuild1-in-0.destination}") private String consumeBuildDestination; @Value("${spring.cloud.stream.bindings.consumeCancelBuild-in-0.destination}") diff --git a/src/test/resources/application-default.yml b/src/test/resources/application-default.yml index 3f2a04d86..64281ab8c 100644 --- a/src/test/resources/application-default.yml +++ b/src/test/resources/application-default.yml @@ -10,6 +10,10 @@ spring: hibernate: #to turn off schema validation that fails (because of clob types) and blocks tests even if the schema is compatible ddl-auto: none + cloud: + function: + # disable all #2 consumers during test - all messages are consumed multiple times otherwise + definition: consumeBuild1;consumeCancelBuild logging: level: org.springframework.orm.jpa: INFO