Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
* @author Antoine Bouhours <antoine.bouhours at rte-france.com>
*/
public enum MessageType {
EXECUTION_STATUS_UPDATE,
STEP_STATUS_UPDATE,
STEPS_STATUSES_UPDATE,
EXECUTION_UPDATE,
STEPS_UPSERT,
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,27 +53,21 @@ public Consumer<Message<String>> consumeMonitorUpdate() {
UUID executionId = UUID.fromString(executionIdStr);

switch (messageType) {
case EXECUTION_STATUS_UPDATE -> handleExecutionStatusUpdate(executionId, message);
case STEP_STATUS_UPDATE -> handleStepStatusUpdate(executionId, message);
case STEPS_STATUSES_UPDATE -> handleStepsStatusesUpdate(executionId, message);
case EXECUTION_UPDATE -> handleExecutionUpdate(executionId, message);
case STEPS_UPSERT -> handleStepsUpsert(executionId, message);
default -> LOGGER.warn("Unknown message type: {}", messageType);
}
};
}

private void handleExecutionStatusUpdate(UUID executionId, Message<String> message) {
private void handleExecutionUpdate(UUID executionId, Message<String> message) {
ProcessExecutionStatusUpdate payload = parsePayload(message.getPayload(), ProcessExecutionStatusUpdate.class);
monitorService.updateExecutionStatus(executionId, payload.getStatus(), payload.getExecutionEnvName(), payload.getStartedAt(), payload.getCompletedAt());
monitorService.updateExecution(executionId, payload.getStatus(), payload.getExecutionEnvName(), payload.getStartedAt(), payload.getCompletedAt());
}

private void handleStepStatusUpdate(UUID executionId, Message<String> message) {
ProcessExecutionStep processExecutionStep = parsePayload(message.getPayload(), ProcessExecutionStep.class);
monitorService.updateStepStatus(executionId, processExecutionStep);
}

private void handleStepsStatusesUpdate(UUID executionId, Message<String> message) {
private void handleStepsUpsert(UUID executionId, Message<String> message) {
List<ProcessExecutionStep> processExecutionSteps = parsePayload(message.getPayload(), new TypeReference<List<ProcessExecutionStep>>() { });
monitorService.updateStepsStatuses(executionId, processExecutionSteps);
monitorService.upsertSteps(executionId, processExecutionSteps);
}

private <T> T parsePayload(String payload, Class<T> clazz) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public UUID executeProcess(UUID caseUuid, String userId, ProcessConfig processCo
}

@Transactional
public void updateExecutionStatus(UUID executionId, ProcessStatus status, String executionEnvName, Instant startedAt, Instant completedAt) {
public void updateExecution(UUID executionId, ProcessStatus status, String executionEnvName, Instant startedAt, Instant completedAt) {
executionRepository.findById(executionId).ifPresent(execution -> {
execution.setStatus(status);
if (executionEnvName != null) {
Expand Down Expand Up @@ -116,16 +116,7 @@ private void updateStep(ProcessExecutionEntity execution, ProcessExecutionStepEn
}

@Transactional
public void updateStepStatus(UUID executionId, ProcessExecutionStep processExecutionStep) {
executionRepository.findById(executionId).ifPresent(execution -> {
ProcessExecutionStepEntity stepEntity = toStepEntity(processExecutionStep);
updateStep(execution, stepEntity);
executionRepository.save(execution);
});
}

@Transactional
public void updateStepsStatuses(UUID executionId, List<ProcessExecutionStep> processExecutionSteps) {
public void upsertSteps(UUID executionId, List<ProcessExecutionStep> processExecutionSteps) {
executionRepository.findById(executionId).ifPresent(execution -> {
processExecutionSteps.forEach(processExecutionStep -> {
ProcessExecutionStepEntity stepEntity = toStepEntity(processExecutionStep);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void securityAnalysisProcessIT() throws Exception {
.startedAt(Instant.now())
.completedAt(Instant.now())
.build();
sendMessage(executionId, step0, MessageType.STEP_STATUS_UPDATE);
sendMessage(executionId, List.of(step0), MessageType.STEPS_UPSERT);

// Simulate second step creation via message with both report and result
UUID stepId1 = UUID.randomUUID();
Expand All @@ -145,7 +145,7 @@ void securityAnalysisProcessIT() throws Exception {
.startedAt(Instant.now())
.completedAt(Instant.now())
.build();
sendMessage(executionId, step1, MessageType.STEP_STATUS_UPDATE);
sendMessage(executionId, List.of(step1), MessageType.STEPS_UPSERT);

// Verify both steps were added to database with correct data
execution = executionRepository.findById(executionId).orElse(null);
Expand All @@ -167,7 +167,7 @@ void securityAnalysisProcessIT() throws Exception {
.startedAt(startedAt)
.completedAt(completedAt)
.build();
sendMessage(executionId, finalStatus, MessageType.EXECUTION_STATUS_UPDATE);
sendMessage(executionId, finalStatus, MessageType.EXECUTION_UPDATE);

// Verify final state persisted
execution = executionRepository.findById(executionId).orElse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,29 @@ void consumeProcessExecutionStatusUpdateMessage() throws JsonProcessingException
.build();
String payload = objectMapper.writeValueAsString(statusUpdate);
Map<String, Object> headers = new HashMap<>();
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_STATUS_UPDATE.toString());
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_UPDATE.toString());
headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString());
Message<String> message = new GenericMessage<>(payload, headers);
Consumer<Message<String>> consumer = consumerService.consumeMonitorUpdate();

consumer.accept(message);

verify(monitorService).updateExecutionStatus(
verify(monitorService).updateExecution(
executionId,
ProcessStatus.RUNNING,
"env-1",
startedAt,
completedAt
);
verify(monitorService, never()).updateStepStatus(any(), any());
verify(monitorService, never()).upsertSteps(any(), any());
}

@Test
void consumeMonitorUpdateThrowsOnInvalidJson() {
UUID executionId = UUID.randomUUID();
String invalidPayload = "{invalid json}";
Map<String, Object> headers = new HashMap<>();
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_STATUS_UPDATE.toString());
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_UPDATE.toString());
headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString());
Message<String> message = new GenericMessage<>(invalidPayload, headers);
Consumer<Message<String>> consumer = consumerService.consumeMonitorUpdate();
Expand All @@ -95,16 +95,16 @@ void consumeMonitorUpdateThrowsOnInvalidJson() {
.isInstanceOf(UncheckedIOException.class)
.hasMessageContaining("Failed to parse payload as ProcessExecutionStatusUpdate");

verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any());
verify(monitorService, never()).updateStepStatus(any(), any());
verify(monitorService, never()).updateExecution(any(), any(), any(), any(), any());
verify(monitorService, never()).upsertSteps(any(), any());
}

@Test
void consumeMonitorUpdateStepsStatusesThrowsOnInvalidJson() {
void consumeMonitorUpdateStepsUpsertThrowsOnInvalidJson() {
UUID executionId = UUID.randomUUID();
String invalidPayload = "{invalid json}";
Map<String, Object> headers = new HashMap<>();
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_STATUSES_UPDATE.toString());
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_UPSERT.toString());
headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString());
Message<String> message = new GenericMessage<>(invalidPayload, headers);
Consumer<Message<String>> consumer = consumerService.consumeMonitorUpdate();
Expand All @@ -113,31 +113,8 @@ void consumeMonitorUpdateStepsStatusesThrowsOnInvalidJson() {
.isInstanceOf(UncheckedIOException.class)
.hasMessageContaining("Failed to parse payload as java.util.List<org.gridsuite.monitor.commons.ProcessExecutionStep>");

verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any());
verify(monitorService, never()).updateStepsStatuses(any(), any());
}

@Test
void consumeProcessExecutionStepUpdateMessage() throws JsonProcessingException {
UUID executionId = UUID.randomUUID();
UUID stepId = UUID.randomUUID();
ProcessExecutionStep stepUpdate = ProcessExecutionStep.builder()
.id(stepId)
.stepType("LOAD_FLOW")
.status(StepStatus.RUNNING)
.startedAt(Instant.now())
.build();
String payload = objectMapper.writeValueAsString(stepUpdate);
Map<String, Object> headers = new HashMap<>();
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEP_STATUS_UPDATE.toString());
headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString());
Message<String> message = new GenericMessage<>(payload, headers);
Consumer<Message<String>> consumer = consumerService.consumeMonitorUpdate();

consumer.accept(message);

verify(monitorService).updateStepStatus(eq(executionId), any(ProcessExecutionStep.class));
verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any());
verify(monitorService, never()).updateExecution(any(), any(), any(), any(), any());
verify(monitorService, never()).upsertSteps(any(), any());
}

@Test
Expand All @@ -159,15 +136,14 @@ void consumeProcessExecutionStepsUpdateMessage() throws JsonProcessingException
.build();
String payload = objectMapper.writeValueAsString(List.of(stepUpdate1, stepUpdate2));
Map<String, Object> headers = new HashMap<>();
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_STATUSES_UPDATE.toString());
headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_UPSERT.toString());
headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString());
Message<String> message = new GenericMessage<>(payload, headers);
Consumer<Message<String>> consumer = consumerService.consumeMonitorUpdate();

consumer.accept(message);

verify(monitorService).updateStepsStatuses(eq(executionId), any(List.class));
verify(monitorService, never()).updateStepStatus(any(), any());
verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any());
verify(monitorService).upsertSteps(eq(executionId), any(List.class));
verify(monitorService, never()).updateExecution(any(), any(), any(), any(), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void executeProcessCreateExecutionAndSendNotification() {
}

@Test
void updateExecutionStatusShouldUpdateStatusOnly() {
void updateExecutionShouldUpdateStatusOnly() {
ProcessExecutionEntity execution = ProcessExecutionEntity.builder()
.id(executionId)
.type(ProcessType.SECURITY_ANALYSIS.name())
Expand All @@ -117,7 +117,7 @@ void updateExecutionStatusShouldUpdateStatusOnly() {
.build();
when(executionRepository.findById(executionId)).thenReturn(Optional.of(execution));

monitorService.updateExecutionStatus(executionId, ProcessStatus.RUNNING, null, null, null);
monitorService.updateExecution(executionId, ProcessStatus.RUNNING, null, null, null);

verify(executionRepository).findById(executionId);
assertThat(execution.getStatus()).isEqualTo(ProcessStatus.RUNNING);
Expand All @@ -128,7 +128,7 @@ void updateExecutionStatusShouldUpdateStatusOnly() {
}

@Test
void updateExecutionStatusShouldUpdateAllFields() {
void updateExecutionShouldUpdateAllFields() {
ProcessExecutionEntity execution = ProcessExecutionEntity.builder()
.id(executionId)
.type(ProcessType.SECURITY_ANALYSIS.name())
Expand All @@ -142,7 +142,7 @@ void updateExecutionStatusShouldUpdateAllFields() {
Instant startedAt = Instant.now().minusSeconds(60);
Instant completedAt = Instant.now();

monitorService.updateExecutionStatus(executionId, ProcessStatus.COMPLETED, envName, startedAt, completedAt);
monitorService.updateExecution(executionId, ProcessStatus.COMPLETED, envName, startedAt, completedAt);

verify(executionRepository).findById(executionId);
assertThat(execution.getStatus()).isEqualTo(ProcessStatus.COMPLETED);
Expand All @@ -153,17 +153,17 @@ void updateExecutionStatusShouldUpdateAllFields() {
}

@Test
void updateExecutionStatusShouldHandleExecutionNotFound() {
void updateExecutionShouldHandleExecutionNotFound() {
when(executionRepository.findById(executionId)).thenReturn(Optional.empty());

monitorService.updateExecutionStatus(executionId, ProcessStatus.COMPLETED, "env", Instant.now(), Instant.now());
monitorService.updateExecution(executionId, ProcessStatus.COMPLETED, "env", Instant.now(), Instant.now());

verify(executionRepository).findById(executionId);
verify(executionRepository, never()).save(any());
}

@Test
void updateStepStatusShouldAddNewStep() {
void upsertStepShouldAddNewStep() {
ProcessExecutionEntity execution = ProcessExecutionEntity.builder()
.id(executionId)
.type(ProcessType.SECURITY_ANALYSIS.name())
Expand All @@ -187,7 +187,7 @@ void updateStepStatusShouldAddNewStep() {
.startedAt(startedAt)
.build();

monitorService.updateStepStatus(executionId, processExecutionStep);
monitorService.upsertSteps(executionId, List.of(processExecutionStep));

verify(executionRepository).findById(executionId);
assertThat(execution.getSteps()).hasSize(1);
Expand All @@ -203,7 +203,7 @@ void updateStepStatusShouldAddNewStep() {
}

@Test
void updateStepStatusShouldUpdateExistingStep() {
void upsertStepShouldUpdateExistingStep() {
UUID stepId = UUID.randomUUID();
UUID originalResultId = UUID.randomUUID();
UUID newResultId = UUID.randomUUID();
Expand Down Expand Up @@ -237,7 +237,7 @@ void updateStepStatusShouldUpdateExistingStep() {
.completedAt(completedAt)
.build();

monitorService.updateStepStatus(executionId, updateDto);
monitorService.upsertSteps(executionId, List.of(updateDto));

verify(executionRepository).findById(executionId);
assertThat(execution.getSteps()).hasSize(1);
Expand All @@ -253,7 +253,7 @@ void updateStepStatusShouldUpdateExistingStep() {
}

@Test
void updateStepsStatusesShouldUpdateExistingSteps() {
void upsertStepsShouldUpdateExistingSteps() {
UUID stepId1 = UUID.randomUUID();
UUID stepId2 = UUID.randomUUID();
UUID originalResultId1 = UUID.randomUUID();
Expand Down Expand Up @@ -310,7 +310,7 @@ void updateStepsStatusesShouldUpdateExistingSteps() {
.startedAt(startedAt2)
.completedAt(completedAt2)
.build();
monitorService.updateStepsStatuses(executionId, List.of(updateDto1, updateDto2));
monitorService.upsertSteps(executionId, List.of(updateDto1, updateDto2));

verify(executionRepository).findById(executionId);
assertThat(execution.getSteps()).hasSize(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,18 @@ private <T> void sendMonitorUpdate(
updatePublisher.send(PROCESS_UPDATE_BINDING, message);
}

public void updateExecutionStatus(UUID executionId, ProcessExecutionStatusUpdate processExecutionStatusUpdate) {
public void sendExecution(UUID executionId, ProcessExecutionStatusUpdate processExecutionStatusUpdate) {
sendMonitorUpdate(
executionId,
MessageType.EXECUTION_STATUS_UPDATE,
MessageType.EXECUTION_UPDATE,
processExecutionStatusUpdate
);
}

public void updateStepStatus(UUID executionId, ProcessExecutionStep processExecutionStep) {
sendMonitorUpdate(
executionId,
MessageType.STEP_STATUS_UPDATE,
processExecutionStep
);
}

public void updateStepsStatuses(UUID executionId, List<ProcessExecutionStep> processExecutionSteps) {
public void notifySteps(UUID executionId, List<ProcessExecutionStep> processExecutionSteps) {
sendMonitorUpdate(
executionId,
MessageType.STEPS_STATUSES_UPDATE,
MessageType.STEPS_UPSERT,
processExecutionSteps
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ public <T extends ProcessConfig> void executeProcess(ProcessRunMessage<T> runMes
initializeSteps(process, context);
executeSteps(process, context);
} catch (Exception e) {
updateExecutionStatus(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.FAILED);
sendExecution(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.FAILED);
throw e;
}
}

private <T extends ProcessConfig> void initializeSteps(Process<T> process, ProcessExecutionContext<T> context) {
List<ProcessStep<T>> steps = process.getSteps();
notificationService.updateStepsStatuses(context.getExecutionId(),
notificationService.notifySteps(context.getExecutionId(),
IntStream.range(0, steps.size())
.mapToObj(i -> ProcessExecutionStep.builder()
.id(steps.get(i).getId())
Expand All @@ -83,19 +83,19 @@ private <T extends ProcessConfig> void initializeSteps(Process<T> process, Proce
}

private <T extends ProcessConfig> void executeSteps(Process<T> process, ProcessExecutionContext<T> context) {
updateExecutionStatus(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.RUNNING);
sendExecution(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.RUNNING);
process.execute(context);
updateExecutionStatus(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.COMPLETED);
sendExecution(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.COMPLETED);
}

private void updateExecutionStatus(UUID executionId, String envName, ProcessStatus status) {
private void sendExecution(UUID executionId, String envName, ProcessStatus status) {
ProcessExecutionStatusUpdate processExecutionStatusUpdate = new ProcessExecutionStatusUpdate(
status,
envName,
status == ProcessStatus.RUNNING ? Instant.now() : null,
status == ProcessStatus.COMPLETED || status == ProcessStatus.FAILED ? Instant.now() : null
);

notificationService.updateExecutionStatus(executionId, processExecutionStatusUpdate);
notificationService.sendExecution(executionId, processExecutionStatusUpdate);
}
}
Loading