Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serviceaccount in parallel #7373

Merged
19 changes: 18 additions & 1 deletion pkg/apis/flows/v1/parallel_lifecycle.go
creydr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
)

var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable)
var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionOIDCIdentityCreated)

const (
// ParallelConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -42,6 +42,7 @@ const (
// ParallelConditionAddressable has status true when this Parallel meets
// the Addressable contract and has a non-empty hostname.
ParallelConditionAddressable apis.ConditionType = "Addressable"
ParallelConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -195,6 +196,22 @@ func (ps *ParallelStatus) MarkAddressableNotReady(reason, messageFormat string,
pCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceeded() {
pCondSet.Manage(ps).MarkTrue(ParallelConditionOIDCIdentityCreated)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, mepsageFormat string, mepsageA ...interface{}) {
creydr marked this conversation as resolved.
Show resolved Hide resolved
pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkFalse(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkUnknown(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) {
ps.Address = address
if address == nil {
Expand Down
38 changes: 33 additions & 5 deletions pkg/reconciler/parallel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"context"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/duck"
kubeclient "knative.dev/pkg/client/injection/kube/client"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
Expand All @@ -42,14 +46,33 @@ func NewController(

parallelInformer := parallel.Get(ctx)
subscriptionInformer := subscription.Get(ctx)
serviceaccountInformer := serviceaccountinformer.Get(ctx)

var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
creydr marked this conversation as resolved.
Show resolved Hide resolved
globalResync(nil)
}
})
featureStore.WatchConfigs(cmw)

r := &Reconciler{
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
serviceAccountLister: serviceaccountInformer.Lister(),
kubeclient: kubeclient.Get(ctx),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
}
impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

globalResync = func(_ interface{}) {
impl.GlobalResync(parallelInformer.Informer())
}
impl := parallelreconciler.NewImpl(ctx, r)

r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker)
parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand All @@ -60,6 +83,11 @@ func NewController(
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
// Reconciler Parallel when the OIDC service account changes
prakrit55 marked this conversation as resolved.
Show resolved Hide resolved
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

return impl
}
1 change: 1 addition & 0 deletions pkg/reconciler/parallel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
)

func TestNew(t *testing.T) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/reconciler/parallel/parallel.go
creydr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
duckapis "knative.dev/pkg/apis/duck"

"knative.dev/pkg/kmeta"
Expand All @@ -38,16 +39,21 @@ import (

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/apis/feature"
corev1listers "k8s.io/client-go/listers/core/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel"
listers "knative.dev/eventing/pkg/client/listers/flows/v1"
messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1"
ducklib "knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler/parallel/resources"
duckv1knative "knative.dev/pkg/apis/duck/v1"
)

type Reconciler struct {
kubeclient kubernetes.Interface
// listers index properties about resources
parallelLister listers.ParallelLister
channelableTracker ducklib.ListableTracker
Expand All @@ -58,6 +64,7 @@ type Reconciler struct {

// dynamicClientSet allows us to configure pluggable Build objects
dynamicClientSet dynamic.Interface
serviceAccountLister corev1listers.ServiceAccountLister
}

// Check that our Reconciler implements parallelreconciler.Interface
Expand All @@ -71,6 +78,23 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon
// 2.2 create a Subscription to the filter Channel, subscribe the subscriber and send reply to
// either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply.
// 3. Rinse and repeat step #2 above for each branch in the list
// OIDC authentication
featureFlags := feature.FromContext(ctx)
if featureFlags.IsOIDCAuthentication() {
saName := auth.GetOIDCServiceAccountNameForResource(v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta)
p.Status.Auth = &duckv1knative.AuthStatus{
ServiceAccountName: &saName,
}
if err := auth.EnsureOIDCServiceAccountExistsForResource(ctx, r.serviceAccountLister, r.kubeclient, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta); err != nil {
p.Status.MarkOIDCIdentityCreatedFailed("Unable to resolve service account for OIDC authentication", "%v", err)
creydr marked this conversation as resolved.
Show resolved Hide resolved
return err
}
p.Status.MarkOIDCIdentityCreatedSucceeded()
} else {
p.Status.Auth = nil
p.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "")
}

if p.Status.BranchStatuses == nil {
p.Status.BranchStatuses = make([]v1.ParallelBranchStatus, 0)
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/reconciler/testing/v1/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package testing

import (
"context"
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/apis/feature"
flowsv1 "knative.dev/eventing/pkg/apis/flows/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -113,3 +115,31 @@ func WithFlowsParallelAddressableNotReady(reason, message string) FlowsParallelO
p.Status.MarkAddressableNotReady(reason, message)
}
}

func WithFlowsParallelOIDCIdentityCreatedSucceeded() FlowsParallelOption {
return func(p *flowsv1.Parallel) {
p.Status.MarkOIDCIdentityCreatedSucceeded()
}
}

func WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled() FlowsParallelOption {
return func(p *flowsv1.Parallel) {
p.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "")
}
}

func WithFlowsParallelOIDCIdentityCreatedFailed(reason, message string) FlowsParallelOption {
return func(p *flowsv1.Parallel) {
p.Status.MarkOIDCIdentityCreatedFailed(reason, message)
}
}

func WithFlowsParallelOIDCServiceAccountName(name string) FlowsParallelOption {
return func(p *flowsv1.Parallel) {
if p.Status.Auth == nil {
p.Status.Auth = &duckv1.AuthStatus{}
}

p.Status.Auth.ServiceAccountName = &name
}
}