From af320d61d8903e9636ce37aef42dcfbc5c551c3d Mon Sep 17 00:00:00 2001 From: bartekdrobczyk <165910486+bartekdrobczyk@users.noreply.github.com> Date: Mon, 19 Aug 2024 16:28:21 +0200 Subject: [PATCH] Offline retransmission extension with view parameter (#1881) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * SPAW-942 implementation draft * SPAW-942 test coverage & cleanup * SPAW-942 test coverage & cleanup * commit msg adjustment * test coverage & cleanup * test coverage & cleanup * checkstyle * retransmission view name refactor * pr adjustments * checkstyle * jdk8Module registered in ObjectMapper * validation check fix in shouldReturnClientErrorWhenRequestingRetransmissionWithEmptyData test * validation adjustment * validation adjustment * validation testing * validation testing * validation testing * validation testing * pr adjustments * checkstyle * Update hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/OfflineRetransmissionEndpoint.java Co-authored-by: Maciej Moscicki * permission method better naming --------- Co-authored-by: Adam Izydorczyk Co-authored-by: Maciej Moscicki Co-authored-by: Daniel FÄ…derski --- .../api/OfflineRetransmissionRequest.java | 16 +++++-- .../hermes/api/OfflineRetransmissionTask.java | 9 +++- .../pl/allegro/tech/hermes/api/TopicName.java | 2 +- .../constraints/OneSourceRetransmission.java | 22 +++++++++ .../OneSourceRetransmissionValidator.java | 24 ++++++++++ ...neSourceRetransmissionValidatorTest.groovy | 37 +++++++++++++++ .../api/OfflineRetransmissionEndpoint.java | 26 ++++++---- .../config/ManagementConfiguration.java | 3 +- .../OfflineRetransmissionService.java | 9 ++-- .../OfflineRetransmissionManagementTest.java | 47 ++++++++++++++----- 10 files changed, 164 insertions(+), 31 deletions(-) create mode 100644 hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmission.java create mode 100644 hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidator.java create mode 100644 hermes-api/src/test/groovy/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidatorTest.groovy diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionRequest.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionRequest.java index e21fcc2d31..a63b80f073 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionRequest.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionRequest.java @@ -7,6 +7,7 @@ import jakarta.validation.constraints.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import pl.allegro.tech.hermes.api.constraints.OneSourceRetransmission; import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer; import java.time.Instant; @@ -14,7 +15,9 @@ import java.time.format.DateTimeFormatter; import java.time.format.DateTimeParseException; import java.util.List; +import java.util.Optional; +@OneSourceRetransmission public class OfflineRetransmissionRequest { private static final List formatters = List.of( @@ -24,7 +27,7 @@ public class OfflineRetransmissionRequest { ); private static final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionRequest.class); - @NotEmpty + private final String sourceViewPath; private final String sourceTopic; @NotEmpty private final String targetTopic; @@ -35,10 +38,12 @@ public class OfflineRetransmissionRequest { @JsonCreator public OfflineRetransmissionRequest( + @JsonProperty("sourceViewPath") String sourceViewPath, @JsonProperty("sourceTopic") String sourceTopic, @JsonProperty("targetTopic") String targetTopic, @JsonProperty("startTimestamp") String startTimestamp, @JsonProperty("endTimestamp") String endTimestamp) { + this.sourceViewPath = sourceViewPath; this.sourceTopic = sourceTopic; this.targetTopic = targetTopic; this.startTimestamp = initializeTimestamp(startTimestamp); @@ -62,8 +67,12 @@ private Instant initializeTimestamp(String timestamp) { return null; } - public String getSourceTopic() { - return sourceTopic; + public Optional getSourceViewPath() { + return Optional.ofNullable(sourceViewPath); + } + + public Optional getSourceTopic() { + return Optional.ofNullable(sourceTopic); } public String getTargetTopic() { @@ -84,6 +93,7 @@ public Instant getEndTimestamp() { public String toString() { return "OfflineRetransmissionRequest{" + "sourceTopic='" + sourceTopic + '\'' + + ", sourceViewPath='" + sourceViewPath + '\'' + ", targetTopic='" + targetTopic + '\'' + ", startTimestamp=" + startTimestamp + ", endTimestamp=" + endTimestamp diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionTask.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionTask.java index 9f7d68d0a3..c3fb94f05c 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionTask.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/OfflineRetransmissionTask.java @@ -7,6 +7,7 @@ import pl.allegro.tech.hermes.api.jackson.InstantIsoSerializer; import java.time.Instant; +import java.util.Optional; public class OfflineRetransmissionTask { private final String taskId; @@ -16,12 +17,14 @@ public class OfflineRetransmissionTask { @JsonCreator public OfflineRetransmissionTask( @JsonProperty("taskId") String taskId, + @JsonProperty("sourceViewPath") String sourceViewPath, @JsonProperty("sourceTopic") String sourceTopic, @JsonProperty("targetTopic") String targetTopic, @JsonProperty("startTimestamp") Instant startTimestamp, @JsonProperty("endTimestamp") Instant endTimestamp, @JsonProperty("createdAt") Instant createdAt) { this(taskId, new OfflineRetransmissionRequest( + sourceViewPath, sourceTopic, targetTopic, startTimestamp.toString(), @@ -39,10 +42,14 @@ public String getTaskId() { return taskId; } - public String getSourceTopic() { + public Optional getSourceTopic() { return request.getSourceTopic(); } + public Optional getSourceViewPath() { + return request.getSourceViewPath(); + } + public String getTargetTopic() { return request.getTargetTopic(); } diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicName.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicName.java index 32c903fdb2..3c9704f211 100644 --- a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicName.java +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/TopicName.java @@ -39,7 +39,7 @@ public static TopicName fromQualifiedName(String qualifiedName) { int index = qualifiedName.lastIndexOf(GROUP_SEPARATOR); if (index == -1) { - throw new IllegalArgumentException("Missing group"); + throw new IllegalArgumentException("Invalid qualified name " + qualifiedName); } String groupName = qualifiedName.substring(0, index); diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmission.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmission.java new file mode 100644 index 0000000000..9c920a322b --- /dev/null +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmission.java @@ -0,0 +1,22 @@ +package pl.allegro.tech.hermes.api.constraints; + +import jakarta.validation.Constraint; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.TYPE; + +@Documented +@Retention(RetentionPolicy.RUNTIME) +@Target({TYPE}) +@Constraint(validatedBy = OneSourceRetransmissionValidator.class) +public @interface OneSourceRetransmission { + String message() default "must contain one defined source of retransmission data - source topic or source view"; + + Class[] groups() default {}; + + Class[] payload() default {}; +} diff --git a/hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidator.java b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidator.java new file mode 100644 index 0000000000..a311b4b263 --- /dev/null +++ b/hermes-api/src/main/java/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidator.java @@ -0,0 +1,24 @@ +package pl.allegro.tech.hermes.api.constraints; + +import jakarta.validation.ConstraintValidator; +import jakarta.validation.ConstraintValidatorContext; +import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest; + +public class OneSourceRetransmissionValidator implements ConstraintValidator { + + public static final String EMPTY_STRING = ""; + + @Override + public boolean isValid(OfflineRetransmissionRequest offlineRetransmissionRequest, ConstraintValidatorContext context) { + var sourceViewPath = offlineRetransmissionRequest.getSourceViewPath(); + var sourceTopic = offlineRetransmissionRequest.getSourceTopic(); + + return (nonBlank(sourceViewPath.orElse(EMPTY_STRING)) && sourceTopic.isEmpty()) + || (nonBlank(sourceTopic.orElse(EMPTY_STRING)) && sourceViewPath.isEmpty()); + } + + private static boolean nonBlank(String value) { + return value != null && !value.isBlank(); + } + +} diff --git a/hermes-api/src/test/groovy/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidatorTest.groovy b/hermes-api/src/test/groovy/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidatorTest.groovy new file mode 100644 index 0000000000..e7aa7a42c3 --- /dev/null +++ b/hermes-api/src/test/groovy/pl/allegro/tech/hermes/api/constraints/OneSourceRetransmissionValidatorTest.groovy @@ -0,0 +1,37 @@ +package pl.allegro.tech.hermes.api.constraints + + +import jakarta.validation.ConstraintValidatorContext +import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest +import spock.lang.Specification + +class OneSourceRetransmissionValidatorTest extends Specification { + + OneSourceRetransmissionValidator validator = new OneSourceRetransmissionValidator() + ConstraintValidatorContext mockContext = Mock() + + def "Validator should validate retransmission request when sourceViewPath is '#sourceViewPath' and sourceTopic is '#sourceTopic'"() { + given: + def request = new OfflineRetransmissionRequest( + sourceViewPath, + sourceTopic, + "someTargetTopic", + "2024-07-08T12:00:00", + "2024-07-08T13:00:00" + ) + expect: + validator.isValid(request, mockContext) == isValid + + where: + sourceViewPath | sourceTopic | isValid + null | "testTopic" | true + "testView" | null | true + null | null | false + "testView" | "testTopic" | false + "" | "" | false + " " | " " | false + "" | "testTopic" | false + "testView" | " " | false + } + +} diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/OfflineRetransmissionEndpoint.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/OfflineRetransmissionEndpoint.java index 23662d469d..2cbd5ea6ed 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/OfflineRetransmissionEndpoint.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/api/OfflineRetransmissionEndpoint.java @@ -17,7 +17,6 @@ import org.springframework.stereotype.Component; import pl.allegro.tech.hermes.api.OfflineRetransmissionRequest; import pl.allegro.tech.hermes.api.OfflineRetransmissionTask; -import pl.allegro.tech.hermes.api.Topic; import pl.allegro.tech.hermes.api.TopicName; import pl.allegro.tech.hermes.domain.topic.TopicRepository; import pl.allegro.tech.hermes.management.api.auth.ManagementRights; @@ -25,6 +24,7 @@ import pl.allegro.tech.hermes.management.domain.retransmit.OfflineRetransmissionService; import java.util.List; +import java.util.Optional; import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; @@ -36,6 +36,7 @@ public class OfflineRetransmissionEndpoint { private final OfflineRetransmissionService retransmissionService; private final RetransmissionPermissions permissions; private final OfflineRetransmissionAuditor auditor; + private final Logger logger = LoggerFactory.getLogger(OfflineRetransmissionEndpoint.class); public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmissionService, TopicRepository topicRepository, ManagementRights managementRights) { @@ -47,9 +48,10 @@ public OfflineRetransmissionEndpoint(OfflineRetransmissionService retransmission @POST @Consumes(APPLICATION_JSON) public Response createRetransmissionTask(@Valid OfflineRetransmissionRequest request, @Context ContainerRequestContext requestContext) { + logger.info("Offline retransmission request: {}", request); retransmissionService.validateRequest(request); - permissions.ensurePermissionsToBothTopics(request, requestContext); - OfflineRetransmissionTask task = retransmissionService.createTask(request); + permissions.ensurePermissions(request, requestContext); + var task = retransmissionService.createTask(request); auditor.auditRetransmissionCreation(request, requestContext, task); return Response.status(Response.Status.CREATED).build(); } @@ -68,24 +70,30 @@ public Response deleteRetransmissionTask(@PathParam("taskId") String taskId) { } private static class RetransmissionPermissions { + private final Logger logger = LoggerFactory.getLogger(RetransmissionPermissions.class); private final TopicRepository topicRepository; private final ManagementRights managementRights; - private RetransmissionPermissions(TopicRepository topicRepository, ManagementRights managementRights) { this.topicRepository = topicRepository; this.managementRights = managementRights; } - private void ensurePermissionsToBothTopics(OfflineRetransmissionRequest request, ContainerRequestContext requestContext) { - Topic sourceTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getSourceTopic())); - Topic targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic())); - boolean hasPermissions = managementRights.isUserAllowedToManageTopic(sourceTopic, requestContext) - && managementRights.isUserAllowedToManageTopic(targetTopic, requestContext); + private void ensurePermissions(OfflineRetransmissionRequest request, ContainerRequestContext requestContext) { + var targetTopic = topicRepository.getTopicDetails(TopicName.fromQualifiedName(request.getTargetTopic())); + var hasPermissions = validateSourceTopic(request.getSourceTopic(), requestContext) && managementRights.isUserAllowedToManageTopic(targetTopic, requestContext); if (!hasPermissions) { + logger.info("User {} has no permissions to make offline retransmission {}", requestContext.getSecurityContext().getUserPrincipal(), request); throw new PermissionDeniedException("User needs permissions to source and target topics."); } } + + private boolean validateSourceTopic(Optional sourceTopic, ContainerRequestContext requestContext) { + return sourceTopic.isEmpty() || managementRights.isUserAllowedToManageTopic( + topicRepository.getTopicDetails(TopicName.fromQualifiedName(sourceTopic.get())), + requestContext + ); + } } private static class OfflineRetransmissionAuditor { diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java index e4cc665d9a..ffaaf0ba99 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/config/ManagementConfiguration.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import io.micrometer.core.instrument.MeterRegistry; import org.springframework.beans.factory.annotation.Autowired; @@ -42,7 +43,7 @@ public ObjectMapper objectMapper() { mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); mapper.disable(SerializationFeature.WRITE_NULL_MAP_VALUES); - mapper.registerModule(new JavaTimeModule()); + mapper.registerModules(new JavaTimeModule(), new Jdk8Module()); // Jdk8Module is required for Jackson to serialize & deserialize Optional type final InjectableValues defaultSchemaIdAwareSerializationEnabled = new InjectableValues.Std().addValue( Topic.DEFAULT_SCHEMA_ID_SERIALIZATION_ENABLED_KEY, diff --git a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/retransmit/OfflineRetransmissionService.java b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/retransmit/OfflineRetransmissionService.java index 6d5633e25e..0c7259da15 100644 --- a/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/retransmit/OfflineRetransmissionService.java +++ b/hermes-management/src/main/java/pl/allegro/tech/hermes/management/domain/retransmit/OfflineRetransmissionService.java @@ -21,7 +21,7 @@ public OfflineRetransmissionService(OfflineRetransmissionRepository offlineRetra } public void validateRequest(OfflineRetransmissionRequest request) { - TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic()); + TopicName sourceTopicName = TopicName.fromQualifiedName(request.getSourceTopic().orElse(null)); TopicName targetTopicName = TopicName.fromQualifiedName(request.getTargetTopic()); ensureTopicsExist(sourceTopicName, targetTopicName); @@ -49,12 +49,11 @@ public void deleteTask(String taskId) { } private void ensureTopicsExist(TopicName sourceTopicName, TopicName targetTopicName) { - boolean sourceTopicExists = topicRepository.topicExists(sourceTopicName); - boolean targetTopicExists = topicRepository.topicExists(targetTopicName); - if (!sourceTopicExists) { + if (sourceTopicName != null && !topicRepository.topicExists(sourceTopicName)) { throw new OfflineRetransmissionValidationException("Source topic does not exist"); } - if (!targetTopicExists) { + + if (!topicRepository.topicExists(targetTopicName)) { throw new OfflineRetransmissionValidationException("Target topic does not exist"); } } diff --git a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/OfflineRetransmissionManagementTest.java b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/OfflineRetransmissionManagementTest.java index 27847ad2e6..27739c7f72 100644 --- a/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/OfflineRetransmissionManagementTest.java +++ b/integration-tests/src/integrationTest/java/pl/allegro/tech/hermes/integrationtests/management/OfflineRetransmissionManagementTest.java @@ -45,7 +45,7 @@ public void shouldCreateRetransmissionTask() { // when OfflineRetransmissionRequest request = createRequest( sourceTopic.getQualifiedName(), - targetTopic.getQualifiedName()); + targetTopic.getQualifiedName(), null); WebTestClient.ResponseSpec response = hermes.api().createOfflineRetransmissionTask(request); Instant now = Instant.now(); @@ -62,6 +62,29 @@ public void shouldCreateRetransmissionTask() { assertThat(allTasks.get(0).getCreatedAt()).isBefore(now); } + @Test + public void shouldCreateRetransmissionTaskWithViewInsteadTopic() { + // given + var targetTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); + + // when + var request = createRequest(null, targetTopic.getQualifiedName(), "testViewPath"); + var response = hermes.api().createOfflineRetransmissionTask(request); + var now = Instant.now(); + + //then + response.expectStatus().isCreated(); + + // and + var allTasks = getOfflineRetransmissionTasks(); + assertThat(allTasks.size()).isEqualTo(1); + assertThat(allTasks.get(0).getStartTimestamp()).isEqualTo(request.getStartTimestamp()); + assertThat(allTasks.get(0).getEndTimestamp()).isEqualTo(request.getEndTimestamp()); + assertThat(allTasks.get(0).getSourceTopic()).isEmpty(); + assertThat(allTasks.get(0).getSourceViewPath()).hasValue("testViewPath"); + assertThat(allTasks.get(0).getTargetTopic()).isEqualTo(request.getTargetTopic()); + assertThat(allTasks.get(0).getCreatedAt()).isBefore(now); + } @Test public void shouldReturnEmptyListIfThereAreNoTasks() { @@ -73,6 +96,7 @@ public void shouldReturnEmptyListIfThereAreNoTasks() { public void shouldReturnClientErrorWhenRequestingRetransmissionWithEmptyData() { // given OfflineRetransmissionRequest request = new OfflineRetransmissionRequest( + null, "", "", null, @@ -86,10 +110,9 @@ public void shouldReturnClientErrorWhenRequestingRetransmissionWithEmptyData() { response.expectStatus().isBadRequest(); assertThat(response.expectBody(String.class).returnResult().getResponseBody()).contains( List.of( - "sourceTopic must not be empty", - "targetTopic must not be empty", - "startTimestamp must not be null", - "endTimestamp must not be null") + "must contain one defined source of retransmission data - source topic or source view", + "startTimestamp must not be null", + "endTimestamp must not be null") ); } @@ -98,7 +121,7 @@ public void shouldReturnClientErrorWhenRequestingRetransmissionWithNotExistingSo // given Topic targetTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); OfflineRetransmissionRequest request = createRequest("not.existing.sourceTopic", - targetTopic.getQualifiedName()); + targetTopic.getQualifiedName(), null); // when WebTestClient.ResponseSpec response = hermes.api().createOfflineRetransmissionTask(request); @@ -114,7 +137,7 @@ public void shouldReturnClientErrorWhenRequestingRetransmissionWithNotExistingTa // given Topic sourceTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); OfflineRetransmissionRequest request = createRequest( - sourceTopic.getQualifiedName(), "not.existing.targetTopic"); + sourceTopic.getQualifiedName(), "not.existing.targetTopic", null); // when WebTestClient.ResponseSpec response = hermes.api().createOfflineRetransmissionTask(request); @@ -131,6 +154,7 @@ public void shouldReturnClientErrorWhenRequestingRetransmissionWithNegativeTimeR Topic sourceTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); Topic targetTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); OfflineRetransmissionRequest request = new OfflineRetransmissionRequest( + null, sourceTopic.getQualifiedName(), targetTopic.getQualifiedName(), Instant.now().toString(), @@ -151,7 +175,7 @@ public void shouldReturnClientErrorWhenRequestingRetransmissionWithTargetTopicSt Topic sourceTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); Topic targetTopic = hermes.initHelper().createTopic(topicWithRandomName().withOfflineStorage(1).build()); OfflineRetransmissionRequest request = createRequest( - sourceTopic.getQualifiedName(), targetTopic.getQualifiedName()); + sourceTopic.getQualifiedName(), targetTopic.getQualifiedName(), null); // when WebTestClient.ResponseSpec response = hermes.api().createOfflineRetransmissionTask(request); @@ -170,7 +194,7 @@ public void shouldDeleteRetransmissionTask() { Topic targetTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); OfflineRetransmissionRequest request = createRequest( - sourceTopic.getQualifiedName(), targetTopic.getQualifiedName()); + sourceTopic.getQualifiedName(), targetTopic.getQualifiedName(), null); hermes.api().createOfflineRetransmissionTask(request); List allTasks = getOfflineRetransmissionTasks(); @@ -203,7 +227,7 @@ public void shouldThrowAccessDeniedWhenTryingToCreateTaskWithoutPermissionsToSou Topic targetTopic = hermes.initHelper().createTopic(topicWithRandomName().build()); TestSecurityProvider.setUserIsAdmin(false); OfflineRetransmissionRequest request = createRequest( - sourceTopic.getQualifiedName(), targetTopic.getQualifiedName()); + sourceTopic.getQualifiedName(), targetTopic.getQualifiedName(), null); // when WebTestClient.ResponseSpec response = hermes.api().createOfflineRetransmissionTask(request); @@ -218,8 +242,9 @@ public void shouldThrowAccessDeniedWhenTryingToCreateTaskWithoutPermissionsToSou TestSecurityProvider.reset(); } - private OfflineRetransmissionRequest createRequest(String sourceTopic, String targetTopic) { + private OfflineRetransmissionRequest createRequest(String sourceTopic, String targetTopic, String sourceViewPath) { return new OfflineRetransmissionRequest( + sourceViewPath, sourceTopic, targetTopic, Instant.now().minusSeconds(1).toString(),