Skip to content

Commit d960a99

Browse files
authored
fix: improvements event source management (#1239)
1 parent 6fbb56e commit d960a99

File tree

4 files changed

+72
-26
lines changed

4 files changed

+72
-26
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,15 @@ public EventSourceManager(Controller<R> controller) {
3939
this.eventSources = eventSources;
4040
this.controller = controller;
4141
// controller event source needs to be available before we create the event processor
42-
final var controllerEventSource = eventSources.initControllerEventSource(controller);
42+
eventSources.initControllerEventSource(controller);
4343
this.eventProcessor = new EventProcessor<>(this);
4444

45-
// sources need to be registered after the event processor is created since it's set on the
46-
// event source
47-
registerEventSource(eventSources.retryEventSource());
48-
registerEventSource(controllerEventSource);
45+
postProcessDefaultEventSources();
46+
}
47+
48+
private void postProcessDefaultEventSources() {
49+
eventSources.controllerResourceEventSource().setEventHandler(eventProcessor);
50+
eventSources.retryEventSource().setEventHandler(eventProcessor);
4951
}
5052

5153
/**
@@ -146,7 +148,7 @@ public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldRes
146148
public void changeNamespaces(Set<String> namespaces) {
147149
eventProcessor.stop();
148150
eventSources
149-
.allEventSources()
151+
.eventSources()
150152
.filter(NamespaceChangeable.class::isInstance)
151153
.map(NamespaceChangeable.class::cast)
152154
.filter(NamespaceChangeable::allowsNamespaceChanges)

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

+12-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
class EventSources<R extends HasMetadata> implements Iterable<NamedEventSource> {
1818

19-
private static final String CONTROLLER_EVENT_SOURCE_KEY = "0";
19+
public static final String CONTROLLER_RESOURCE_EVENT_SOURCE_NAME =
20+
"ControllerResourceEventSource";
21+
public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME =
22+
"RetryAndRescheduleTimerEventSource";
23+
2024
private final ConcurrentNavigableMap<String, Map<String, EventSource>> sources =
2125
new ConcurrentSkipListMap<>();
2226
private final TimerEventSource<R> retryAndRescheduleTimerEventSource = new TimerEventSource<>();
@@ -38,17 +42,21 @@ TimerEventSource<R> retryEventSource() {
3842

3943
@Override
4044
public Iterator<NamedEventSource> iterator() {
41-
return flatMappedSources().iterator();
45+
return Stream.concat(Stream.of(
46+
new NamedEventSource(controllerResourceEventSource, CONTROLLER_RESOURCE_EVENT_SOURCE_NAME),
47+
new NamedEventSource(retryAndRescheduleTimerEventSource,
48+
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
49+
flatMappedSources()).iterator();
4250
}
4351

4452
Stream<NamedEventSource> flatMappedSources() {
4553
return sources.values().stream().flatMap(c -> c.entrySet().stream()
4654
.map(esEntry -> new NamedEventSource(esEntry.getValue(), esEntry.getKey())));
4755
}
4856

49-
Stream<EventSource> allEventSources() {
57+
Stream<EventSource> eventSources() {
5058
return Stream.concat(
51-
Stream.of(retryEventSource(), controllerResourceEventSource()).filter(Objects::nonNull),
59+
Stream.of(controllerResourceEventSource(), retryEventSource()).filter(Objects::nonNull),
5260
sources.values().stream().flatMap(c -> c.values().stream()));
5361
}
5462

@@ -81,9 +89,6 @@ private Class<?> getResourceType(EventSource source) {
8189
}
8290

8391
private String keyFor(EventSource source) {
84-
if (source instanceof ControllerResourceEventSource) {
85-
return CONTROLLER_EVENT_SOURCE_KEY;
86-
}
8792
return keyFor(getResourceType(source));
8893
}
8994

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java

+17
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.util.Objects;
4+
35
import io.javaoperatorsdk.operator.OperatorException;
46
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
57

@@ -40,4 +42,19 @@ public String toString() {
4042
public EventSource original() {
4143
return original;
4244
}
45+
46+
@Override
47+
public boolean equals(Object o) {
48+
if (this == o)
49+
return true;
50+
if (o == null || getClass() != o.getClass())
51+
return false;
52+
NamedEventSource that = (NamedEventSource) o;
53+
return Objects.equals(original, that.original) && Objects.equals(name, that.name);
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
return Objects.hash(original, name);
59+
}
4360
}
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
import java.util.Set;
4-
import java.util.stream.Collectors;
5-
63
import org.junit.jupiter.api.Test;
74

85
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -12,12 +9,16 @@
129
import io.javaoperatorsdk.operator.processing.Controller;
1310
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
1411

12+
import static io.javaoperatorsdk.operator.processing.event.EventSources.CONTROLLER_RESOURCE_EVENT_SOURCE_NAME;
13+
import static io.javaoperatorsdk.operator.processing.event.EventSources.RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME;
14+
import static org.assertj.core.api.Assertions.assertThat;
1515
import static org.junit.jupiter.api.Assertions.*;
1616
import static org.mockito.Mockito.mock;
1717

1818
@SuppressWarnings({"unchecked", "rawtypes"})
1919
class EventSourcesTest {
2020

21+
public static final String EVENT_SOURCE_NAME = "foo";
2122
EventSources eventSources = new EventSources();
2223

2324
@Test
@@ -31,20 +32,41 @@ void cannotAddTwoEventSourcesWithSameName() {
3132
@Test
3233
void allEventSourcesShouldReturnAll() {
3334
// initial state doesn't have ControllerResourceEventSource
34-
assertEquals(Set.of(eventSources.retryEventSource()), eventSources.allEventSources().collect(
35-
Collectors.toSet()));
35+
assertThat(eventSources.eventSources()).containsExactly(eventSources.retryEventSource());
36+
37+
initControllerEventSource();
38+
39+
assertThat(eventSources.eventSources()).containsExactly(
40+
eventSources.controllerResourceEventSource(),
41+
eventSources.retryEventSource());
42+
43+
final var source = mock(EventSource.class);
44+
eventSources.add(EVENT_SOURCE_NAME, source);
45+
// order matters
46+
assertThat(eventSources.eventSources())
47+
.containsExactly(eventSources.controllerResourceEventSource(),
48+
eventSources.retryEventSource(), source);
49+
}
50+
51+
@Test
52+
void eventSourcesIteratorShouldReturnControllerEventSourceAsFirst() {
53+
initControllerEventSource();
54+
final var source = mock(EventSource.class);
55+
eventSources.add(EVENT_SOURCE_NAME, source);
56+
57+
assertThat(eventSources.iterator()).toIterable().containsExactly(
58+
new NamedEventSource(eventSources.controllerResourceEventSource(),
59+
CONTROLLER_RESOURCE_EVENT_SOURCE_NAME),
60+
new NamedEventSource(eventSources.retryEventSource(),
61+
RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME),
62+
new NamedEventSource(source, EVENT_SOURCE_NAME));
63+
}
64+
65+
private void initControllerEventSource() {
3666
final var configuration = MockControllerConfiguration.forResource(HasMetadata.class);
3767
final var controller = new Controller(mock(Reconciler.class), configuration,
3868
MockKubernetesClient.client(HasMetadata.class));
3969
eventSources.initControllerEventSource(controller);
40-
assertEquals(
41-
Set.of(eventSources.retryEventSource(), eventSources.controllerResourceEventSource()),
42-
eventSources.allEventSources().collect(Collectors.toSet()));
43-
final var source = mock(EventSource.class);
44-
eventSources.add("foo", source);
45-
assertEquals(Set.of(eventSources.retryEventSource(),
46-
eventSources.controllerResourceEventSource(), source),
47-
eventSources.allEventSources().collect(Collectors.toSet()));
4870
}
4971

5072
}

0 commit comments

Comments
 (0)