18
18
19
19
@ SuppressWarnings ("rawtypes" )
20
20
public class Operator implements AutoCloseable {
21
-
22
21
private static final Logger log = LoggerFactory .getLogger (Operator .class );
23
22
private final KubernetesClient k8sClient ;
24
23
private final ConfigurationService configurationService ;
25
24
private final List <Closeable > closeables ;
25
+ private final Object lock ;
26
+ private final List <ControllerRef > controllers ;
27
+ private volatile boolean started ;
26
28
27
29
public Operator (KubernetesClient k8sClient , ConfigurationService configurationService ) {
28
30
this .k8sClient = k8sClient ;
29
31
this .configurationService = configurationService ;
30
32
this .closeables = new ArrayList <>();
33
+ this .lock = new Object ();
34
+ this .controllers = new ArrayList <>();
35
+ this .started = false ;
31
36
32
37
Runtime .getRuntime ().addShutdownHook (new Thread (this ::close ));
33
38
}
@@ -45,43 +50,65 @@ public ConfigurationService getConfigurationService() {
45
50
* where there is no obvious entrypoint to the application which can trigger the injection process
46
51
* and start the cluster monitoring processes.
47
52
*/
53
+ @ SuppressWarnings ("unchecked" )
48
54
public void start () {
49
- final var version = configurationService .getVersion ();
50
- log .info (
51
- "Operator SDK {} (commit: {}) built on {} starting..." ,
52
- version .getSdkVersion (),
53
- version .getCommit (),
54
- version .getBuiltTime ());
55
- log .info ("Client version: {}" , Version .clientVersion ());
56
- try {
57
- final var k8sVersion = k8sClient .getVersion ();
58
- if (k8sVersion != null ) {
59
- log .info ("Server version: {}.{}" , k8sVersion .getMajor (), k8sVersion .getMinor ());
55
+ synchronized (lock ) {
56
+ if (started ) {
57
+ return ;
58
+ }
59
+
60
+ final var version = configurationService .getVersion ();
61
+ log .info (
62
+ "Operator SDK {} (commit: {}) built on {} starting..." ,
63
+ version .getSdkVersion (),
64
+ version .getCommit (),
65
+ version .getBuiltTime ());
66
+ log .info ("Client version: {}" , Version .clientVersion ());
67
+ try {
68
+ final var k8sVersion = k8sClient .getVersion ();
69
+ if (k8sVersion != null ) {
70
+ log .info ("Server version: {}.{}" , k8sVersion .getMajor (), k8sVersion .getMinor ());
71
+ }
72
+ } catch (Exception e ) {
73
+ log .error ("Error retrieving the server version. Exiting!" , e );
74
+ throw new OperatorException ("Error retrieving the server version" , e );
75
+ }
76
+
77
+ for (ControllerRef ref : controllers ) {
78
+ startController (ref .controller , ref .configuration );
60
79
}
61
- } catch (Exception e ) {
62
- log .error ("Error retrieving the server version. Exiting!" , e );
63
- throw new OperatorException ("Error retrieving the server version" , e );
80
+
81
+ started = true ;
64
82
}
65
83
}
66
84
67
85
/** Stop the operator. */
68
86
@ Override
69
87
public void close () {
70
- log .info (
71
- "Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
88
+ synchronized (lock ) {
89
+ if (!started ) {
90
+ return ;
91
+ }
72
92
73
- for (Closeable closeable : this .closeables ) {
74
- try {
75
- log .debug ("closing {}" , closeable );
76
- closeable .close ();
77
- } catch (IOException e ) {
78
- log .warn ("Error closing {}" , closeable , e );
93
+ log .info (
94
+ "Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
95
+
96
+ for (Closeable closeable : this .closeables ) {
97
+ try {
98
+ log .debug ("closing {}" , closeable );
99
+ closeable .close ();
100
+ } catch (IOException e ) {
101
+ log .warn ("Error closing {}" , closeable , e );
102
+ }
79
103
}
104
+
105
+ started = false ;
80
106
}
81
107
}
82
108
83
109
/**
84
- * Registers the specified controller with this operator.
110
+ * Add a registration requests for the specified controller with this operator. The effective
111
+ * registration of the controller is delayed till the operator is started.
85
112
*
86
113
* @param controller the controller to register
87
114
* @param <R> the {@code CustomResource} type associated with the controller
@@ -92,6 +119,32 @@ public <R extends CustomResource> void register(ResourceController<R> controller
92
119
register (controller , null );
93
120
}
94
121
122
+ /**
123
+ * Add a registration requests for the specified controller with this operator, overriding its
124
+ * default configuration by the specified one (usually created via {@link
125
+ * io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider#override(ControllerConfiguration)},
126
+ * passing it the controller's original configuration. The effective registration of the
127
+ * controller is delayed till the operator is started.
128
+ *
129
+ * @param controller the controller to register
130
+ * @param configuration the configuration with which we want to register the controller, if {@code
131
+ * null}, the controller's original configuration is used
132
+ * @param <R> the {@code CustomResource} type associated with the controller
133
+ * @throws OperatorException if a problem occurred during the registration process
134
+ */
135
+ public <R extends CustomResource > void register (
136
+ ResourceController <R > controller , ControllerConfiguration <R > configuration )
137
+ throws OperatorException {
138
+ synchronized (lock ) {
139
+ if (!started ) {
140
+ this .controllers .add (new ControllerRef (controller , configuration ));
141
+ } else {
142
+ this .controllers .add (new ControllerRef (controller , configuration ));
143
+ startController (controller , configuration );
144
+ }
145
+ }
146
+ }
147
+
95
148
/**
96
149
* Registers the specified controller with this operator, overriding its default configuration by
97
150
* the specified one (usually created via {@link
@@ -104,9 +157,10 @@ public <R extends CustomResource> void register(ResourceController<R> controller
104
157
* @param <R> the {@code CustomResource} type associated with the controller
105
158
* @throws OperatorException if a problem occurred during the registration process
106
159
*/
107
- public <R extends CustomResource > void register (
160
+ private <R extends CustomResource > void startController (
108
161
ResourceController <R > controller , ControllerConfiguration <R > configuration )
109
162
throws OperatorException {
163
+
110
164
final var existing = configurationService .getConfigurationFor (controller );
111
165
if (existing == null ) {
112
166
log .warn (
@@ -120,7 +174,7 @@ public <R extends CustomResource> void register(
120
174
configuration = existing ;
121
175
}
122
176
123
- Class <R > resClass = configuration .getCustomResourceClass ();
177
+ final Class <R > resClass = configuration .getCustomResourceClass ();
124
178
final String controllerName = configuration .getName ();
125
179
final var crdName = configuration .getCRDName ();
126
180
final var specVersion = "v1" ;
@@ -137,10 +191,10 @@ public <R extends CustomResource> void register(
137
191
CustomResourceUtils .assertCustomResource (resClass , crd );
138
192
}
139
193
140
- final var client = k8sClient .customResources (resClass );
141
194
try {
142
195
DefaultEventSourceManager eventSourceManager =
143
- new DefaultEventSourceManager (controller , configuration , client );
196
+ new DefaultEventSourceManager (
197
+ controller , configuration , k8sClient .customResources (resClass ));
144
198
controller .init (eventSourceManager );
145
199
closeables .add (eventSourceManager );
146
200
} catch (MissingCRDException e ) {
@@ -195,4 +249,14 @@ private static <R extends CustomResource> boolean failOnMissingCurrentNS(
195
249
}
196
250
return false ;
197
251
}
252
+
253
+ private static class ControllerRef {
254
+ public final ResourceController controller ;
255
+ public final ControllerConfiguration configuration ;
256
+
257
+ public ControllerRef (ResourceController controller , ControllerConfiguration configuration ) {
258
+ this .controller = controller ;
259
+ this .configuration = configuration ;
260
+ }
261
+ }
198
262
}
0 commit comments