Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- elasticsearch -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.gridsuite.modification.server.config;

import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

@Configuration
public class RabbitConsumerConfiguration {
/*
* RabbitMQ consumer priority:
* https://www.rabbitmq.com/docs/consumer-priority
*
* Each container creates exactly one AMQP consumer with prefetch=1 and its own priority.
* When dispatching messages, RabbitMQ always selects the highest-priority consumer
* that is available.
*/
@Bean
public ListenerContainerCustomizer<MessageListenerContainer> customizer(BindingServiceProperties bindingServiceProperties) {
/*
* Using AtomicInteger as in org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java
* We expect cloud stream to call our customizer exactly once in order for each container so it will produce a sequence of increasing priorities
*/
AtomicInteger counter = new AtomicInteger(0);
return (container, destination, group) -> {
if (container instanceof SimpleMessageListenerContainer smlc && Objects.equals(group, "buildGroup")) {
smlc.setConsumerArguments(Map.of("x-priority", counter.getAndIncrement()));
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ private CompletableFuture<NetworkModificationResult> execBuildVariant(BuildExecC
}

@Bean
public Consumer<Message<String>> consumeBuild1() {
return consumeBuild();
}

@Bean
public Consumer<Message<String>> consumeBuild2() {
return consumeBuild();
}

public Consumer<Message<String>> consumeBuild() {
return message -> networkModificationObserver.observeFullBuild(() -> {
BuildExecContext execContext;
Expand Down
11 changes: 7 additions & 4 deletions src/main/resources/config/application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ spring:

cloud:
function:
definition: consumeBuild;consumeCancelBuild
definition: consumeBuild1;consumeBuild2;consumeCancelBuild
stream:
bindings:
consumeBuild-in-0:
consumeBuild1-in-0: &consumeBuildConfig
destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run
group: buildGroup
consumer:
concurrency: 2
max-attempts: 1
consumeBuild2-in-0:
<<: *consumeBuildConfig
publishBuild-out-0:
destination: ${powsybl-ws.rabbitmq.destination.prefix:}build.run
publishResultBuild-out-0:
Expand All @@ -40,7 +41,7 @@ spring:
output-bindings: publishBuild-out-0;publishResultBuild-out-0;publishCancelBuild-out-0;publishStoppedBuild-out-0
rabbit:
bindings:
consumeBuild-in-0:
consumeBuild1-in-0: &consumeBuildRabbitConfig
consumer:
auto-bind-dlq: true
dead-letter-exchange: ${powsybl-ws.rabbitmq.destination.prefix:}build.run.dlx
Expand All @@ -49,6 +50,8 @@ spring:
quorum:
enabled: true
delivery-limit: 2
consumeBuild2-in-0:
<<: *consumeBuildRabbitConfig

powsybl-ws:
database:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class BuildTest {
private CountDownLatch waitStartBuild;
private CountDownLatch blockBuild;

@Value("${spring.cloud.stream.bindings.consumeBuild-in-0.destination}")
@Value("${spring.cloud.stream.bindings.consumeBuild1-in-0.destination}")
private String consumeBuildDestination;

@Value("${spring.cloud.stream.bindings.consumeCancelBuild-in-0.destination}")
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/application-default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ spring:
hibernate:
#to turn off schema validation that fails (because of clob types) and blocks tests even if the schema is compatible
ddl-auto: none
cloud:
function:
# disable all #2 consumers during test - all messages are consumed multiple times otherwise
definition: consumeBuild1;consumeCancelBuild
logging:
level:
org.springframework.orm.jpa: INFO
Expand Down
Loading