diff --git a/monitor-commons/src/main/java/org/gridsuite/monitor/commons/MessageType.java b/monitor-commons/src/main/java/org/gridsuite/monitor/commons/MessageType.java index aed280fa..a9b2cb3e 100644 --- a/monitor-commons/src/main/java/org/gridsuite/monitor/commons/MessageType.java +++ b/monitor-commons/src/main/java/org/gridsuite/monitor/commons/MessageType.java @@ -10,7 +10,6 @@ * @author Antoine Bouhours */ public enum MessageType { - EXECUTION_STATUS_UPDATE, - STEP_STATUS_UPDATE, - STEPS_STATUSES_UPDATE, + EXECUTION_UPDATE, + STEPS_UPSERT, } diff --git a/monitor-server/src/main/java/org/gridsuite/monitor/server/services/ConsumerService.java b/monitor-server/src/main/java/org/gridsuite/monitor/server/services/ConsumerService.java index d134038f..9e00e15c 100644 --- a/monitor-server/src/main/java/org/gridsuite/monitor/server/services/ConsumerService.java +++ b/monitor-server/src/main/java/org/gridsuite/monitor/server/services/ConsumerService.java @@ -53,27 +53,21 @@ public Consumer> consumeMonitorUpdate() { UUID executionId = UUID.fromString(executionIdStr); switch (messageType) { - case EXECUTION_STATUS_UPDATE -> handleExecutionStatusUpdate(executionId, message); - case STEP_STATUS_UPDATE -> handleStepStatusUpdate(executionId, message); - case STEPS_STATUSES_UPDATE -> handleStepsStatusesUpdate(executionId, message); + case EXECUTION_UPDATE -> handleExecutionUpdate(executionId, message); + case STEPS_UPSERT -> handleStepsUpsert(executionId, message); default -> LOGGER.warn("Unknown message type: {}", messageType); } }; } - private void handleExecutionStatusUpdate(UUID executionId, Message message) { + private void handleExecutionUpdate(UUID executionId, Message message) { ProcessExecutionStatusUpdate payload = parsePayload(message.getPayload(), ProcessExecutionStatusUpdate.class); - monitorService.updateExecutionStatus(executionId, payload.getStatus(), payload.getExecutionEnvName(), payload.getStartedAt(), payload.getCompletedAt()); + monitorService.updateExecution(executionId, payload.getStatus(), payload.getExecutionEnvName(), payload.getStartedAt(), payload.getCompletedAt()); } - private void handleStepStatusUpdate(UUID executionId, Message message) { - ProcessExecutionStep processExecutionStep = parsePayload(message.getPayload(), ProcessExecutionStep.class); - monitorService.updateStepStatus(executionId, processExecutionStep); - } - - private void handleStepsStatusesUpdate(UUID executionId, Message message) { + private void handleStepsUpsert(UUID executionId, Message message) { List processExecutionSteps = parsePayload(message.getPayload(), new TypeReference>() { }); - monitorService.updateStepsStatuses(executionId, processExecutionSteps); + monitorService.upsertSteps(executionId, processExecutionSteps); } private T parsePayload(String payload, Class clazz) { diff --git a/monitor-server/src/main/java/org/gridsuite/monitor/server/services/MonitorService.java b/monitor-server/src/main/java/org/gridsuite/monitor/server/services/MonitorService.java index d299114e..5dc635c6 100644 --- a/monitor-server/src/main/java/org/gridsuite/monitor/server/services/MonitorService.java +++ b/monitor-server/src/main/java/org/gridsuite/monitor/server/services/MonitorService.java @@ -76,7 +76,7 @@ public UUID executeProcess(UUID caseUuid, String userId, ProcessConfig processCo } @Transactional - public void updateExecutionStatus(UUID executionId, ProcessStatus status, String executionEnvName, Instant startedAt, Instant completedAt) { + public void updateExecution(UUID executionId, ProcessStatus status, String executionEnvName, Instant startedAt, Instant completedAt) { executionRepository.findById(executionId).ifPresent(execution -> { execution.setStatus(status); if (executionEnvName != null) { @@ -116,16 +116,7 @@ private void updateStep(ProcessExecutionEntity execution, ProcessExecutionStepEn } @Transactional - public void updateStepStatus(UUID executionId, ProcessExecutionStep processExecutionStep) { - executionRepository.findById(executionId).ifPresent(execution -> { - ProcessExecutionStepEntity stepEntity = toStepEntity(processExecutionStep); - updateStep(execution, stepEntity); - executionRepository.save(execution); - }); - } - - @Transactional - public void updateStepsStatuses(UUID executionId, List processExecutionSteps) { + public void upsertSteps(UUID executionId, List processExecutionSteps) { executionRepository.findById(executionId).ifPresent(execution -> { processExecutionSteps.forEach(processExecutionStep -> { ProcessExecutionStepEntity stepEntity = toStepEntity(processExecutionStep); diff --git a/monitor-server/src/test/java/org/gridsuite/monitor/server/MonitorIntegrationTest.java b/monitor-server/src/test/java/org/gridsuite/monitor/server/MonitorIntegrationTest.java index aa7fad2a..c2d67932 100644 --- a/monitor-server/src/test/java/org/gridsuite/monitor/server/MonitorIntegrationTest.java +++ b/monitor-server/src/test/java/org/gridsuite/monitor/server/MonitorIntegrationTest.java @@ -128,7 +128,7 @@ void securityAnalysisProcessIT() throws Exception { .startedAt(Instant.now()) .completedAt(Instant.now()) .build(); - sendMessage(executionId, step0, MessageType.STEP_STATUS_UPDATE); + sendMessage(executionId, List.of(step0), MessageType.STEPS_UPSERT); // Simulate second step creation via message with both report and result UUID stepId1 = UUID.randomUUID(); @@ -145,7 +145,7 @@ void securityAnalysisProcessIT() throws Exception { .startedAt(Instant.now()) .completedAt(Instant.now()) .build(); - sendMessage(executionId, step1, MessageType.STEP_STATUS_UPDATE); + sendMessage(executionId, List.of(step1), MessageType.STEPS_UPSERT); // Verify both steps were added to database with correct data execution = executionRepository.findById(executionId).orElse(null); @@ -167,7 +167,7 @@ void securityAnalysisProcessIT() throws Exception { .startedAt(startedAt) .completedAt(completedAt) .build(); - sendMessage(executionId, finalStatus, MessageType.EXECUTION_STATUS_UPDATE); + sendMessage(executionId, finalStatus, MessageType.EXECUTION_UPDATE); // Verify final state persisted execution = executionRepository.findById(executionId).orElse(null); diff --git a/monitor-server/src/test/java/org/gridsuite/monitor/server/services/ConsumerServiceTest.java b/monitor-server/src/test/java/org/gridsuite/monitor/server/services/ConsumerServiceTest.java index ffb86b88..b48bc44b 100644 --- a/monitor-server/src/test/java/org/gridsuite/monitor/server/services/ConsumerServiceTest.java +++ b/monitor-server/src/test/java/org/gridsuite/monitor/server/services/ConsumerServiceTest.java @@ -64,21 +64,21 @@ void consumeProcessExecutionStatusUpdateMessage() throws JsonProcessingException .build(); String payload = objectMapper.writeValueAsString(statusUpdate); Map headers = new HashMap<>(); - headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_STATUS_UPDATE.toString()); + headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_UPDATE.toString()); headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString()); Message message = new GenericMessage<>(payload, headers); Consumer> consumer = consumerService.consumeMonitorUpdate(); consumer.accept(message); - verify(monitorService).updateExecutionStatus( + verify(monitorService).updateExecution( executionId, ProcessStatus.RUNNING, "env-1", startedAt, completedAt ); - verify(monitorService, never()).updateStepStatus(any(), any()); + verify(monitorService, never()).upsertSteps(any(), any()); } @Test @@ -86,7 +86,7 @@ void consumeMonitorUpdateThrowsOnInvalidJson() { UUID executionId = UUID.randomUUID(); String invalidPayload = "{invalid json}"; Map headers = new HashMap<>(); - headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_STATUS_UPDATE.toString()); + headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_UPDATE.toString()); headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString()); Message message = new GenericMessage<>(invalidPayload, headers); Consumer> consumer = consumerService.consumeMonitorUpdate(); @@ -95,16 +95,16 @@ void consumeMonitorUpdateThrowsOnInvalidJson() { .isInstanceOf(UncheckedIOException.class) .hasMessageContaining("Failed to parse payload as ProcessExecutionStatusUpdate"); - verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any()); - verify(monitorService, never()).updateStepStatus(any(), any()); + verify(monitorService, never()).updateExecution(any(), any(), any(), any(), any()); + verify(monitorService, never()).upsertSteps(any(), any()); } @Test - void consumeMonitorUpdateStepsStatusesThrowsOnInvalidJson() { + void consumeMonitorUpdateStepsUpsertThrowsOnInvalidJson() { UUID executionId = UUID.randomUUID(); String invalidPayload = "{invalid json}"; Map headers = new HashMap<>(); - headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_STATUSES_UPDATE.toString()); + headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_UPSERT.toString()); headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString()); Message message = new GenericMessage<>(invalidPayload, headers); Consumer> consumer = consumerService.consumeMonitorUpdate(); @@ -113,31 +113,8 @@ void consumeMonitorUpdateStepsStatusesThrowsOnInvalidJson() { .isInstanceOf(UncheckedIOException.class) .hasMessageContaining("Failed to parse payload as java.util.List"); - verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any()); - verify(monitorService, never()).updateStepsStatuses(any(), any()); - } - - @Test - void consumeProcessExecutionStepUpdateMessage() throws JsonProcessingException { - UUID executionId = UUID.randomUUID(); - UUID stepId = UUID.randomUUID(); - ProcessExecutionStep stepUpdate = ProcessExecutionStep.builder() - .id(stepId) - .stepType("LOAD_FLOW") - .status(StepStatus.RUNNING) - .startedAt(Instant.now()) - .build(); - String payload = objectMapper.writeValueAsString(stepUpdate); - Map headers = new HashMap<>(); - headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEP_STATUS_UPDATE.toString()); - headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString()); - Message message = new GenericMessage<>(payload, headers); - Consumer> consumer = consumerService.consumeMonitorUpdate(); - - consumer.accept(message); - - verify(monitorService).updateStepStatus(eq(executionId), any(ProcessExecutionStep.class)); - verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any()); + verify(monitorService, never()).updateExecution(any(), any(), any(), any(), any()); + verify(monitorService, never()).upsertSteps(any(), any()); } @Test @@ -159,15 +136,14 @@ void consumeProcessExecutionStepsUpdateMessage() throws JsonProcessingException .build(); String payload = objectMapper.writeValueAsString(List.of(stepUpdate1, stepUpdate2)); Map headers = new HashMap<>(); - headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_STATUSES_UPDATE.toString()); + headers.put(ConsumerService.HEADER_MESSAGE_TYPE, MessageType.STEPS_UPSERT.toString()); headers.put(ConsumerService.HEADER_EXECUTION_ID, executionId.toString()); Message message = new GenericMessage<>(payload, headers); Consumer> consumer = consumerService.consumeMonitorUpdate(); consumer.accept(message); - verify(monitorService).updateStepsStatuses(eq(executionId), any(List.class)); - verify(monitorService, never()).updateStepStatus(any(), any()); - verify(monitorService, never()).updateExecutionStatus(any(), any(), any(), any(), any()); + verify(monitorService).upsertSteps(eq(executionId), any(List.class)); + verify(monitorService, never()).updateExecution(any(), any(), any(), any(), any()); } } diff --git a/monitor-server/src/test/java/org/gridsuite/monitor/server/services/MonitorServiceTest.java b/monitor-server/src/test/java/org/gridsuite/monitor/server/services/MonitorServiceTest.java index f3a8e9c3..13ac9dec 100644 --- a/monitor-server/src/test/java/org/gridsuite/monitor/server/services/MonitorServiceTest.java +++ b/monitor-server/src/test/java/org/gridsuite/monitor/server/services/MonitorServiceTest.java @@ -106,7 +106,7 @@ void executeProcessCreateExecutionAndSendNotification() { } @Test - void updateExecutionStatusShouldUpdateStatusOnly() { + void updateExecutionShouldUpdateStatusOnly() { ProcessExecutionEntity execution = ProcessExecutionEntity.builder() .id(executionId) .type(ProcessType.SECURITY_ANALYSIS.name()) @@ -117,7 +117,7 @@ void updateExecutionStatusShouldUpdateStatusOnly() { .build(); when(executionRepository.findById(executionId)).thenReturn(Optional.of(execution)); - monitorService.updateExecutionStatus(executionId, ProcessStatus.RUNNING, null, null, null); + monitorService.updateExecution(executionId, ProcessStatus.RUNNING, null, null, null); verify(executionRepository).findById(executionId); assertThat(execution.getStatus()).isEqualTo(ProcessStatus.RUNNING); @@ -128,7 +128,7 @@ void updateExecutionStatusShouldUpdateStatusOnly() { } @Test - void updateExecutionStatusShouldUpdateAllFields() { + void updateExecutionShouldUpdateAllFields() { ProcessExecutionEntity execution = ProcessExecutionEntity.builder() .id(executionId) .type(ProcessType.SECURITY_ANALYSIS.name()) @@ -142,7 +142,7 @@ void updateExecutionStatusShouldUpdateAllFields() { Instant startedAt = Instant.now().minusSeconds(60); Instant completedAt = Instant.now(); - monitorService.updateExecutionStatus(executionId, ProcessStatus.COMPLETED, envName, startedAt, completedAt); + monitorService.updateExecution(executionId, ProcessStatus.COMPLETED, envName, startedAt, completedAt); verify(executionRepository).findById(executionId); assertThat(execution.getStatus()).isEqualTo(ProcessStatus.COMPLETED); @@ -153,17 +153,17 @@ void updateExecutionStatusShouldUpdateAllFields() { } @Test - void updateExecutionStatusShouldHandleExecutionNotFound() { + void updateExecutionShouldHandleExecutionNotFound() { when(executionRepository.findById(executionId)).thenReturn(Optional.empty()); - monitorService.updateExecutionStatus(executionId, ProcessStatus.COMPLETED, "env", Instant.now(), Instant.now()); + monitorService.updateExecution(executionId, ProcessStatus.COMPLETED, "env", Instant.now(), Instant.now()); verify(executionRepository).findById(executionId); verify(executionRepository, never()).save(any()); } @Test - void updateStepStatusShouldAddNewStep() { + void upsertStepShouldAddNewStep() { ProcessExecutionEntity execution = ProcessExecutionEntity.builder() .id(executionId) .type(ProcessType.SECURITY_ANALYSIS.name()) @@ -187,7 +187,7 @@ void updateStepStatusShouldAddNewStep() { .startedAt(startedAt) .build(); - monitorService.updateStepStatus(executionId, processExecutionStep); + monitorService.upsertSteps(executionId, List.of(processExecutionStep)); verify(executionRepository).findById(executionId); assertThat(execution.getSteps()).hasSize(1); @@ -203,7 +203,7 @@ void updateStepStatusShouldAddNewStep() { } @Test - void updateStepStatusShouldUpdateExistingStep() { + void upsertStepShouldUpdateExistingStep() { UUID stepId = UUID.randomUUID(); UUID originalResultId = UUID.randomUUID(); UUID newResultId = UUID.randomUUID(); @@ -237,7 +237,7 @@ void updateStepStatusShouldUpdateExistingStep() { .completedAt(completedAt) .build(); - monitorService.updateStepStatus(executionId, updateDto); + monitorService.upsertSteps(executionId, List.of(updateDto)); verify(executionRepository).findById(executionId); assertThat(execution.getSteps()).hasSize(1); @@ -253,7 +253,7 @@ void updateStepStatusShouldUpdateExistingStep() { } @Test - void updateStepsStatusesShouldUpdateExistingSteps() { + void upsertStepsShouldUpdateExistingSteps() { UUID stepId1 = UUID.randomUUID(); UUID stepId2 = UUID.randomUUID(); UUID originalResultId1 = UUID.randomUUID(); @@ -310,7 +310,7 @@ void updateStepsStatusesShouldUpdateExistingSteps() { .startedAt(startedAt2) .completedAt(completedAt2) .build(); - monitorService.updateStepsStatuses(executionId, List.of(updateDto1, updateDto2)); + monitorService.upsertSteps(executionId, List.of(updateDto1, updateDto2)); verify(executionRepository).findById(executionId); assertThat(execution.getSteps()).hasSize(2); diff --git a/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/NotificationService.java b/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/NotificationService.java index 61b6c6e4..731df225 100644 --- a/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/NotificationService.java +++ b/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/NotificationService.java @@ -46,26 +46,18 @@ private void sendMonitorUpdate( updatePublisher.send(PROCESS_UPDATE_BINDING, message); } - public void updateExecutionStatus(UUID executionId, ProcessExecutionStatusUpdate processExecutionStatusUpdate) { + public void sendExecution(UUID executionId, ProcessExecutionStatusUpdate processExecutionStatusUpdate) { sendMonitorUpdate( executionId, - MessageType.EXECUTION_STATUS_UPDATE, + MessageType.EXECUTION_UPDATE, processExecutionStatusUpdate ); } - public void updateStepStatus(UUID executionId, ProcessExecutionStep processExecutionStep) { - sendMonitorUpdate( - executionId, - MessageType.STEP_STATUS_UPDATE, - processExecutionStep - ); - } - - public void updateStepsStatuses(UUID executionId, List processExecutionSteps) { + public void notifySteps(UUID executionId, List processExecutionSteps) { sendMonitorUpdate( executionId, - MessageType.STEPS_STATUSES_UPDATE, + MessageType.STEPS_UPSERT, processExecutionSteps ); } diff --git a/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionService.java b/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionService.java index fa9f5c9b..b2a91637 100644 --- a/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionService.java +++ b/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionService.java @@ -64,14 +64,14 @@ public void executeProcess(ProcessRunMessage runMes initializeSteps(process, context); executeSteps(process, context); } catch (Exception e) { - updateExecutionStatus(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.FAILED); + sendExecution(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.FAILED); throw e; } } private void initializeSteps(Process process, ProcessExecutionContext context) { List> steps = process.getSteps(); - notificationService.updateStepsStatuses(context.getExecutionId(), + notificationService.notifySteps(context.getExecutionId(), IntStream.range(0, steps.size()) .mapToObj(i -> ProcessExecutionStep.builder() .id(steps.get(i).getId()) @@ -83,12 +83,12 @@ private void initializeSteps(Process process, Proce } private void executeSteps(Process process, ProcessExecutionContext context) { - updateExecutionStatus(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.RUNNING); + sendExecution(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.RUNNING); process.execute(context); - updateExecutionStatus(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.COMPLETED); + sendExecution(context.getExecutionId(), context.getExecutionEnvName(), ProcessStatus.COMPLETED); } - private void updateExecutionStatus(UUID executionId, String envName, ProcessStatus status) { + private void sendExecution(UUID executionId, String envName, ProcessStatus status) { ProcessExecutionStatusUpdate processExecutionStatusUpdate = new ProcessExecutionStatusUpdate( status, envName, @@ -96,6 +96,6 @@ private void updateExecutionStatus(UUID executionId, String envName, ProcessStat status == ProcessStatus.COMPLETED || status == ProcessStatus.FAILED ? Instant.now() : null ); - notificationService.updateExecutionStatus(executionId, processExecutionStatusUpdate); + notificationService.sendExecution(executionId, processExecutionStatusUpdate); } } diff --git a/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/StepExecutionService.java b/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/StepExecutionService.java index bac5754b..f15f51b0 100644 --- a/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/StepExecutionService.java +++ b/monitor-worker-server/src/main/java/org/gridsuite/monitor/worker/server/services/StepExecutionService.java @@ -15,6 +15,7 @@ import org.springframework.stereotype.Service; import java.time.Instant; +import java.util.List; /** * @author Antoine Bouhours @@ -35,7 +36,7 @@ public void skipStep(ProcessStepExecutionContext context, ProcessStep step .startedAt(context.getStartedAt()) .completedAt(Instant.now()) .build(); - notificationService.updateStepStatus(context.getProcessExecutionId(), executionStep); + notificationService.notifySteps(context.getProcessExecutionId(), List.of(executionStep)); } public void executeStep(ProcessStepExecutionContext context, ProcessStep step) { @@ -47,19 +48,19 @@ public void executeStep(ProcessStepExecutionContext context, ProcessStep s .reportId(context.getReportInfos().reportUuid()) .startedAt(context.getStartedAt()) .build(); - notificationService.updateStepStatus(context.getProcessExecutionId(), executionStep); + notificationService.notifySteps(context.getProcessExecutionId(), List.of(executionStep)); try { step.execute(context); reportService.sendReport(context.getReportInfos()); - updateStepStatus(context, StepStatus.COMPLETED, step); + publishStep(context, StepStatus.COMPLETED, step); } catch (Exception e) { - updateStepStatus(context, StepStatus.FAILED, step); + publishStep(context, StepStatus.FAILED, step); throw e; } } - private void updateStepStatus(ProcessStepExecutionContext context, StepStatus status, ProcessStep step) { + private void publishStep(ProcessStepExecutionContext context, StepStatus status, ProcessStep step) { ProcessExecutionStep updated = ProcessExecutionStep.builder() .id(context.getStepExecutionId()) .stepType(step.getType().getName()) @@ -71,6 +72,6 @@ private void updateStepStatus(ProcessStepExecutionContext context, StepStatus .startedAt(context.getStartedAt()) .completedAt(Instant.now()) .build(); - notificationService.updateStepStatus(context.getProcessExecutionId(), updated); + notificationService.notifySteps(context.getProcessExecutionId(), List.of(updated)); } } diff --git a/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/NotificationServiceTest.java b/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/NotificationServiceTest.java index 87bf06c7..adc9672d 100644 --- a/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/NotificationServiceTest.java +++ b/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/NotificationServiceTest.java @@ -42,17 +42,17 @@ void setUp() { } @Test - void updateExecutionStatusShouldSendExecutionStatusUpdateMessage() { + void sendExecutionShouldSendExecutionUpdateMessage() { UUID executionId = UUID.randomUUID(); ProcessExecutionStatusUpdate payload = new ProcessExecutionStatusUpdate(); - notificationService.updateExecutionStatus(executionId, payload); + notificationService.sendExecution(executionId, payload); verify(streamBridge).send( eq("publishMonitorUpdate-out-0"), argThat((Message message) -> { assertThat(message.getPayload()).isSameAs(payload); - assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_STATUS_UPDATE); + assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_MESSAGE_TYPE, MessageType.EXECUTION_UPDATE); assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_EXECUTION_ID, executionId.toString()); return true; }) @@ -60,35 +60,17 @@ void updateExecutionStatusShouldSendExecutionStatusUpdateMessage() { } @Test - void updateStepStatusShouldSendStepStatusUpdateMessage() { - UUID executionId = UUID.randomUUID(); - ProcessExecutionStep payload = new ProcessExecutionStep(); - - notificationService.updateStepStatus(executionId, payload); - - verify(streamBridge).send( - eq("publishMonitorUpdate-out-0"), - argThat((Message message) -> { - assertThat(message.getPayload()).isSameAs(payload); - assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_MESSAGE_TYPE, MessageType.STEP_STATUS_UPDATE); - assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_EXECUTION_ID, executionId.toString()); - return true; - }) - ); - } - - @Test - void updateStepsStatusesShouldSendStepStatusUpdateMessage() { + void notifyStepsShouldSendStepsUpsertMessage() { UUID executionId = UUID.randomUUID(); List payload = List.of(new ProcessExecutionStep(), new ProcessExecutionStep()); - notificationService.updateStepsStatuses(executionId, payload); + notificationService.notifySteps(executionId, payload); verify(streamBridge).send( eq("publishMonitorUpdate-out-0"), argThat((Message message) -> { assertThat(message.getPayload()).isSameAs(payload); - assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_MESSAGE_TYPE, MessageType.STEPS_STATUSES_UPDATE); + assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_MESSAGE_TYPE, MessageType.STEPS_UPSERT); assertThat(message.getHeaders()).containsEntry(NotificationService.HEADER_EXECUTION_ID, executionId.toString()); return true; }) diff --git a/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionServiceTest.java b/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionServiceTest.java index 8bc8c811..4e57b213 100644 --- a/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionServiceTest.java +++ b/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/ProcessExecutionServiceTest.java @@ -99,7 +99,7 @@ void executeProcessShouldCompleteSuccessfullyWhenProcessExecutesWithoutError() { processExecutionService.executeProcess(runMessage); verify(process, times(1)).getSteps(); - verify(notificationService, times(1)).updateStepsStatuses(eq(executionId), argThat(steps -> + verify(notificationService, times(1)).notifySteps(eq(executionId), argThat(steps -> steps.size() == 3 && steps.get(0).getStatus() == StepStatus.SCHEDULED && steps.get(0).getId().equals(loadNetworkStep.getId()) && @@ -121,14 +121,14 @@ void executeProcessShouldCompleteSuccessfullyWhenProcessExecutesWithoutError() { context.getConfig().equals(processConfig) && context.getExecutionEnvName().equals(EXECUTION_ENV_NAME) )); - verify(notificationService, times(2)).updateExecutionStatus(eq(executionId), any(ProcessExecutionStatusUpdate.class)); + verify(notificationService, times(2)).sendExecution(eq(executionId), any(ProcessExecutionStatusUpdate.class)); InOrder inOrder = inOrder(notificationService); - inOrder.verify(notificationService).updateExecutionStatus(eq(executionId), argThat(update -> + inOrder.verify(notificationService).sendExecution(eq(executionId), argThat(update -> update.getStatus() == ProcessStatus.RUNNING && update.getExecutionEnvName().equals(EXECUTION_ENV_NAME) && update.getCompletedAt() == null )); - inOrder.verify(notificationService).updateExecutionStatus(eq(executionId), argThat(update -> + inOrder.verify(notificationService).sendExecution(eq(executionId), argThat(update -> update.getStatus() == ProcessStatus.COMPLETED && update.getExecutionEnvName().equals(EXECUTION_ENV_NAME) && update.getCompletedAt() != null @@ -147,12 +147,12 @@ void executeProcessShouldSendFailedStatusWhenProcessThrowsException() { assertThrows(RuntimeException.class, () -> processExecutionService.executeProcess(runMessage)); verify(process).execute(any(ProcessExecutionContext.class)); - verify(notificationService, times(2)).updateExecutionStatus(eq(executionId), any(ProcessExecutionStatusUpdate.class)); + verify(notificationService, times(2)).sendExecution(eq(executionId), any(ProcessExecutionStatusUpdate.class)); InOrder inOrder = inOrder(notificationService); - inOrder.verify(notificationService).updateExecutionStatus(eq(executionId), argThat(update -> + inOrder.verify(notificationService).sendExecution(eq(executionId), argThat(update -> update.getStatus() == ProcessStatus.RUNNING )); - inOrder.verify(notificationService).updateExecutionStatus(eq(executionId), argThat(update -> + inOrder.verify(notificationService).sendExecution(eq(executionId), argThat(update -> update.getStatus() == ProcessStatus.FAILED && update.getCompletedAt() != null )); diff --git a/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/StepExecutionServiceTest.java b/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/StepExecutionServiceTest.java index c437cd38..37d2df25 100644 --- a/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/StepExecutionServiceTest.java +++ b/monitor-worker-server/src/test/java/org/gridsuite/monitor/worker/server/services/StepExecutionServiceTest.java @@ -8,7 +8,6 @@ import com.powsybl.commons.report.ReportNode; import org.gridsuite.monitor.commons.ProcessConfig; -import org.gridsuite.monitor.commons.ProcessExecutionStep; import org.gridsuite.monitor.commons.StepStatus; import org.gridsuite.monitor.worker.server.core.ProcessStep; import org.gridsuite.monitor.worker.server.core.ProcessStepExecutionContext; @@ -21,6 +20,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.util.List; import java.util.UUID; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -70,17 +70,19 @@ void executeStepShouldCompleteSuccessfullyWhenNoExceptionThrown() { verify(processStep).execute(context); verify(reportService).sendReport(any(ReportInfos.class)); - verify(notificationService, times(2)).updateStepStatus(eq(executionId), any(ProcessExecutionStep.class)); + verify(notificationService, times(2)).notifySteps(eq(executionId), any(List.class)); InOrder inOrder = inOrder(notificationService); - inOrder.verify(notificationService).updateStepStatus(eq(executionId), argThat(step -> - step.getStatus() == StepStatus.RUNNING && - "TEST_STEP".equals(step.getStepType()) && - stepOrder == step.getStepOrder() - )); - inOrder.verify(notificationService).updateStepStatus(eq(executionId), argThat(step -> - step.getStatus() == StepStatus.COMPLETED && - step.getCompletedAt() != null - )); + inOrder.verify(notificationService).notifySteps(eq(executionId), argThat(steps -> { + var step = steps.get(0); + return step.getStatus() == StepStatus.RUNNING && + "TEST_STEP".equals(step.getStepType()) && + stepOrder == step.getStepOrder(); + })); + inOrder.verify(notificationService).notifySteps(eq(executionId), argThat(steps -> { + var step = steps.get(0); + return step.getStatus() == StepStatus.COMPLETED && + step.getCompletedAt() != null; + })); } @Test @@ -99,17 +101,19 @@ void executeStepShouldSendFailedStatusWhenExceptionThrown() { () -> stepExecutionService.executeStep(context, processStep) ); assertEquals("Step execution failed", thrownException.getMessage()); - verify(notificationService, times(2)).updateStepStatus(eq(executionId), any(ProcessExecutionStep.class)); + verify(notificationService, times(2)).notifySteps(eq(executionId), any(List.class)); InOrder inOrder = inOrder(notificationService); - inOrder.verify(notificationService).updateStepStatus(eq(executionId), argThat(step -> - step.getStatus() == StepStatus.RUNNING && - "FAILING_STEP".equals(step.getStepType()) && - stepOrder == step.getStepOrder() - )); - inOrder.verify(notificationService).updateStepStatus(eq(executionId), argThat(step -> - step.getStatus() == StepStatus.FAILED && - step.getCompletedAt() != null - )); + inOrder.verify(notificationService).notifySteps(eq(executionId), argThat(steps -> { + var step = steps.get(0); + return step.getStatus() == StepStatus.RUNNING && + "FAILING_STEP".equals(step.getStepType()) && + stepOrder == step.getStepOrder(); + })); + inOrder.verify(notificationService).notifySteps(eq(executionId), argThat(steps -> { + var step = steps.get(0); + return step.getStatus() == StepStatus.FAILED && + step.getCompletedAt() != null; + })); // Verify report was NOT sent on failure verify(reportService, never()).sendReport(any(ReportInfos.class)); } @@ -127,11 +131,12 @@ void skipStepShouldSendSkippedStatusWithoutExecutingStep() { verify(processStep, never()).execute(any()); // Verify report was NOT sent on skip verify(reportService, never()).sendReport(any(ReportInfos.class)); - verify(notificationService).updateStepStatus(eq(executionId), argThat(step -> - step.getStatus() == StepStatus.SKIPPED && - "SKIPPED_STEP".equals(step.getStepType()) && - step.getStepOrder() == 3 - )); + verify(notificationService).notifySteps(eq(executionId), argThat(steps -> { + var step = steps.get(0); + return step.getStatus() == StepStatus.SKIPPED && + "SKIPPED_STEP".equals(step.getStepType()) && + step.getStepOrder() == 3; + })); } private ProcessStepExecutionContext createStepExecutionContext(UUID executionId, UUID reportUuid, int stepOrder) {