@@ -9,20 +9,23 @@ use futures::{
9
9
} ;
10
10
use snafu:: { OptionExt , ResultExt , Snafu } ;
11
11
use stackable_operator:: {
12
- builder:: meta:: OwnerReferenceBuilder ,
12
+ builder:: meta:: ObjectMetaBuilder ,
13
+ cluster_resources:: { ClusterResourceApplyStrategy , ClusterResources } ,
13
14
commons:: listener:: {
14
15
AddressType , Listener , ListenerClass , ListenerIngress , ListenerPort , ListenerSpec ,
15
16
ListenerStatus , ServiceType ,
16
17
} ,
18
+ iter:: TryFromIterator ,
17
19
k8s_openapi:: {
18
20
api:: core:: v1:: { Endpoints , Node , PersistentVolume , Service , ServicePort , ServiceSpec } ,
19
21
apimachinery:: pkg:: apis:: meta:: v1:: LabelSelector ,
20
22
} ,
21
23
kube:: {
22
24
api:: { DynamicObject , ObjectMeta } ,
23
25
runtime:: { controller, reflector:: ObjectRef , watcher} ,
24
- ResourceExt ,
26
+ Resource , ResourceExt ,
25
27
} ,
28
+ kvp:: { Annotations , Labels } ,
26
29
logging:: controller:: { report_controller_reconciled, ReconcilerError } ,
27
30
time:: Duration ,
28
31
} ;
@@ -31,12 +34,13 @@ use strum::IntoStaticStr;
31
34
use crate :: {
32
35
csi_server:: node:: NODE_TOPOLOGY_LABEL_HOSTNAME ,
33
36
utils:: address:: { node_primary_addresses, AddressCandidates } ,
37
+ APP_NAME , OPERATOR_KEY ,
34
38
} ;
35
39
36
40
#[ cfg( doc) ]
37
41
use stackable_operator:: k8s_openapi:: api:: core:: v1:: Pod ;
38
42
39
- const FIELD_MANAGER_SCOPE : & str = "listener" ;
43
+ const CONTROLLER_NAME : & str = "listener" ;
40
44
41
45
pub async fn run ( client : stackable_operator:: client:: Client ) {
42
46
let controller =
@@ -115,6 +119,11 @@ pub enum Error {
115
119
#[ snafu( display( "object has no name" ) ) ]
116
120
NoName ,
117
121
122
+ #[ snafu( display( "failed to create cluster resources" ) ) ]
123
+ CreateClusterResources {
124
+ source : stackable_operator:: cluster_resources:: Error ,
125
+ } ,
126
+
118
127
#[ snafu( display( "object has no ListenerClass (.spec.class_name)" ) ) ]
119
128
NoListenerClass ,
120
129
@@ -133,6 +142,22 @@ pub enum Error {
133
142
source : stackable_operator:: client:: Error ,
134
143
} ,
135
144
145
+ #[ snafu( display( "failed to validate labels passed through from Listener" ) ) ]
146
+ ValidateListenerLabels {
147
+ source : stackable_operator:: kvp:: LabelError ,
148
+ } ,
149
+
150
+ #[ snafu( display( "failed to validate annotations specified by {listener_class}" ) ) ]
151
+ ValidateListenerClassAnnotations {
152
+ source : stackable_operator:: kvp:: AnnotationError ,
153
+ listener_class : ObjectRef < ListenerClass > ,
154
+ } ,
155
+
156
+ #[ snafu( display( "failed to build cluster resource labels" ) ) ]
157
+ BuildClusterResourcesLabels {
158
+ source : stackable_operator:: kvp:: LabelError ,
159
+ } ,
160
+
136
161
#[ snafu( display( "failed to get {obj}" ) ) ]
137
162
GetObject {
138
163
source : stackable_operator:: client:: Error ,
@@ -146,10 +171,15 @@ pub enum Error {
146
171
147
172
#[ snafu( display( "failed to apply {svc}" ) ) ]
148
173
ApplyService {
149
- source : stackable_operator:: client :: Error ,
174
+ source : stackable_operator:: cluster_resources :: Error ,
150
175
svc : ObjectRef < Service > ,
151
176
} ,
152
177
178
+ #[ snafu( display( "failed to delete orphaned resources" ) ) ]
179
+ DeleteOrphans {
180
+ source : stackable_operator:: cluster_resources:: Error ,
181
+ } ,
182
+
153
183
#[ snafu( display( "failed to apply status for Listener" ) ) ]
154
184
ApplyStatus {
155
185
source : stackable_operator:: client:: Error ,
@@ -165,19 +195,37 @@ impl ReconcilerError for Error {
165
195
match self {
166
196
Self :: NoNs => None ,
167
197
Self :: NoName => None ,
198
+ Self :: CreateClusterResources { source : _ } => None ,
168
199
Self :: NoListenerClass => None ,
169
200
Self :: ListenerPvSelector { source : _ } => None ,
170
201
Self :: ListenerPodSelector { source : _ } => None ,
171
202
Self :: GetListenerPvs { source : _ } => None ,
203
+ Self :: ValidateListenerLabels { source : _ } => None ,
204
+ Self :: ValidateListenerClassAnnotations {
205
+ source : _,
206
+ listener_class,
207
+ } => Some ( listener_class. clone ( ) . erase ( ) ) ,
208
+ Self :: BuildClusterResourcesLabels { source : _ } => None ,
172
209
Self :: GetObject { source : _, obj } => Some ( obj. clone ( ) ) ,
173
210
Self :: BuildListenerOwnerRef { .. } => None ,
174
211
Self :: ApplyService { source : _, svc } => Some ( svc. clone ( ) . erase ( ) ) ,
212
+ Self :: DeleteOrphans { source : _ } => None ,
175
213
Self :: ApplyStatus { source : _ } => None ,
176
214
}
177
215
}
178
216
}
179
217
180
218
pub async fn reconcile ( listener : Arc < Listener > , ctx : Arc < Ctx > ) -> Result < controller:: Action > {
219
+ let mut cluster_resources = ClusterResources :: new (
220
+ APP_NAME ,
221
+ OPERATOR_KEY ,
222
+ CONTROLLER_NAME ,
223
+ & listener. object_ref ( & ( ) ) ,
224
+ // Listeners don't currently support pausing
225
+ ClusterResourceApplyStrategy :: Default ,
226
+ )
227
+ . context ( CreateClusterResourcesSnafu ) ?;
228
+
181
229
let ns = listener. metadata . namespace . as_deref ( ) . context ( NoNsSnafu ) ?;
182
230
let listener_class_name = listener
183
231
. spec
@@ -231,18 +279,36 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
231
279
} ;
232
280
233
281
let svc = Service {
234
- metadata : ObjectMeta {
235
- namespace : Some ( ns. to_string ( ) ) ,
236
- name : Some ( svc_name. clone ( ) ) ,
237
- owner_references : Some ( vec ! [ OwnerReferenceBuilder :: new( )
238
- . initialize_from_resource( & * listener)
239
- . build( )
240
- . context( BuildListenerOwnerRefSnafu ) ?] ) ,
241
- // Propagate the labels from the Listener object to the Service object, so it can be found easier
242
- labels : listener. metadata . labels . clone ( ) ,
243
- annotations : Some ( listener_class. spec . service_annotations ) ,
244
- ..Default :: default ( )
245
- } ,
282
+ metadata : ObjectMetaBuilder :: new ( )
283
+ . namespace ( ns)
284
+ . name ( & svc_name)
285
+ . ownerreference_from_resource ( & * listener, Some ( true ) , Some ( true ) )
286
+ . context ( BuildListenerOwnerRefSnafu ) ?
287
+ . with_labels (
288
+ Labels :: try_from (
289
+ listener
290
+ . metadata
291
+ . labels
292
+ . as_ref ( )
293
+ . unwrap_or ( & BTreeMap :: new ( ) ) ,
294
+ )
295
+ . context ( ValidateListenerLabelsSnafu ) ?,
296
+ )
297
+ . with_labels (
298
+ cluster_resources
299
+ // Not using Labels::recommended, since it carries a bunch of extra information that is
300
+ // only relevant for stacklets (such as rolegroups and product versions).
301
+ . get_required_labels ( )
302
+ . context ( BuildClusterResourcesLabelsSnafu ) ?,
303
+ )
304
+ . with_annotations (
305
+ Annotations :: try_from_iter ( & listener_class. spec . service_annotations ) . context (
306
+ ValidateListenerClassAnnotationsSnafu {
307
+ listener_class : ObjectRef :: from_obj ( & listener_class) ,
308
+ } ,
309
+ ) ?,
310
+ )
311
+ . build ( ) ,
246
312
spec : Some ( ServiceSpec {
247
313
// We explicitly match here and do not implement `ToString` as there might be more (non vanilla k8s Service
248
314
// types) in the future.
@@ -264,13 +330,11 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
264
330
} ) ,
265
331
..Default :: default ( )
266
332
} ;
267
- let svc = ctx
268
- . client
269
- . apply_patch ( FIELD_MANAGER_SCOPE , & svc , & svc)
333
+ let svc_ref = ObjectRef :: from_obj ( & svc ) ;
334
+ let svc = cluster_resources
335
+ . add ( & ctx . client , svc)
270
336
. await
271
- . with_context ( |_| ApplyServiceSnafu {
272
- svc : ObjectRef :: from_obj ( & svc) ,
273
- } ) ?;
337
+ . context ( ApplyServiceSnafu { svc : svc_ref } ) ?;
274
338
275
339
let nodes: Vec < Node > ;
276
340
let kubernetes_service_fqdn: String ;
@@ -376,8 +440,14 @@ pub async fn reconcile(listener: Arc<Listener>, ctx: Arc<Ctx>) -> Result<control
376
440
) ,
377
441
node_ports : ( listener_class. spec . service_type == ServiceType :: NodePort ) . then_some ( ports) ,
378
442
} ;
443
+
444
+ cluster_resources
445
+ . delete_orphaned_resources ( & ctx. client )
446
+ . await
447
+ . context ( DeleteOrphansSnafu ) ?;
448
+
379
449
ctx. client
380
- . apply_patch_status ( FIELD_MANAGER_SCOPE , & listener_status_meta, & listener_status)
450
+ . apply_patch_status ( CONTROLLER_NAME , & listener_status_meta, & listener_status)
381
451
. await
382
452
. context ( ApplyStatusSnafu ) ?;
383
453
0 commit comments