Skip to content

Commit

Permalink
Channel dispatcher authenticates the requests with OIDC (#7445)
Browse files Browse the repository at this point in the history
* Channel dispatcher authenticates the requests with OIDC

Signed-off-by: Calum Murray <[email protected]>

* Add feature store to the context

Signed-off-by: Calum Murray <[email protected]>

* Small fixes to status update patches

Signed-off-by: Calum Murray <[email protected]>

* mostly fixed unit tests

Signed-off-by: Calum Murray <[email protected]>

* Debugged unit test

Signed-off-by: Calum Murray <[email protected]>

* Add audience to WithSubcriber function

Signed-off-by: Calum Murray <[email protected]>

* Added rekt test for channel dispatcher

Signed-off-by: Calum Murray <[email protected]>

* Added validation that the channel.spec.subscribers is only updated by eventing-controller

Signed-off-by: Calum Murray <[email protected]>

* Added Auth to subscribable spec

Signed-off-by: Calum Murray <[email protected]>

* Fix unit test

Signed-off-by: Calum Murray <[email protected]>

* use spec.auth instead of status.auth for channel subscription service account name

Signed-off-by: Calum Murray <[email protected]>

* Remove unused var

Signed-off-by: Calum Murray <[email protected]>

* fix: add auth to channel and imc CRDs

Signed-off-by: Calum Murray <[email protected]>

* fix: allow imc dispatcher to create SA tokens

Signed-off-by: Calum Murray <[email protected]>

* Move subscription update validation from channel to imc

Signed-off-by: Calum Murray <[email protected]>

* fix: use the proper kref in the channel dispatcher OIDC test

Signed-off-by: Calum Murray <[email protected]>

* Update test/auth/oidc_test.go

Co-authored-by: Christoph Stäbler <[email protected]>

* Log out which username tries to edit the subscribers

Signed-off-by: Calum Murray <[email protected]>

* Added unit test for IMC validation of user updating spec.subscribers

Signed-off-by: Calum Murray <[email protected]>

* Try new username for service account

Signed-off-by: Calum Murray <[email protected]>

* Update conformance test

Signed-off-by: Calum Murray <[email protected]>

* set permissions on clusterrole

Signed-off-by: Calum Murray <[email protected]>

* Remove unnecessary auth

Signed-off-by: Calum Murray <[email protected]>

* fix rekt conformance test

Signed-off-by: Calum Murray <[email protected]>

* Fix imports

Signed-off-by: Calum Murray <[email protected]>

* fix imports again :)

Signed-off-by: Calum Murray <[email protected]>

* maybe fix the nil pointer reference

Signed-off-by: Calum Murray <[email protected]>

* fix test

Signed-off-by: Calum Murray <[email protected]>

* fix possible panic

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
Co-authored-by: Christoph Stäbler <[email protected]>
  • Loading branch information
Cali0707 and creydr authored Nov 24, 2023
1 parent 02d572e commit bdca23d
Showing 28 changed files with 378 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -157,6 +157,13 @@ spec:
uid:
description: UID is used to understand the origin of the subscriber.
type: string
auth:
description: Auth provides the relevant information for OIDC authentication.
type: object
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
status:
description: Status represents the current state of the Channel. This data may be out of date.
type: object
Original file line number Diff line number Diff line change
@@ -43,6 +43,13 @@ rules:
verbs:
- create
- patch
# Create OIDC tokens
- apiGroups:
- ""
resources:
- "serviceaccounts/token"
verbs:
- create
# Updates the finalizer so we can remove our handlers when channel is deleted
# Patches the status.subscribers to reflect when the subscription dataplane has been
# configured.
7 changes: 7 additions & 0 deletions config/core/resources/channel.yaml
Original file line number Diff line number Diff line change
@@ -179,6 +179,13 @@ spec:
uid:
description: UID is used to understand the origin of the subscriber.
type: string
auth:
description: Auth provides the relevant information for OIDC authentication.
type: object
properties:
serviceAccountName:
description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication.
type: string
status:
description: Status represents the current state of the Channel. This data may be out of date.
type: object
14 changes: 14 additions & 0 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
@@ -728,6 +728,20 @@ DeliverySpec
<p>DeliverySpec contains options controlling the event delivery</p>
</td>
</tr>
<tr>
<td>
<code>auth</code><br/>
<em>
<a href="https://pkg.go.dev/knative.dev/pkg/apis/duck/v1#AuthStatus">
knative.dev/pkg/apis/duck/v1.AuthStatus
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Auth contains the service account name for the subscription</p>
</td>
</tr>
</tbody>
</table>
<h3 id="duck.knative.dev/v1.SubscriberStatus">SubscriberStatus
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ require (
go.uber.org/atomic v1.9.0
go.uber.org/multierr v1.10.0
go.uber.org/zap v1.26.0
golang.org/x/net v0.18.0
golang.org/x/sync v0.5.0
google.golang.org/grpc v1.59.0
google.golang.org/protobuf v1.31.0
@@ -113,7 +114,6 @@ require (
go.uber.org/automaxprocs v1.5.3 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/oauth2 v0.14.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/term v0.14.0 // indirect
3 changes: 3 additions & 0 deletions pkg/apis/duck/v1/subscribable_types.go
Original file line number Diff line number Diff line change
@@ -67,6 +67,9 @@ type SubscriberSpec struct {
// DeliverySpec contains options controlling the event delivery
// +optional
Delivery *DeliverySpec `json:"delivery,omitempty"`
// Auth contains the service account name for the subscription
// +optional
Auth *duckv1.AuthStatus `json:"auth,omitempty"`
}

// SubscriberStatus defines the status of a single subscriber to a Channel.
5 changes: 5 additions & 0 deletions pkg/apis/duck/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion pkg/apis/messaging/v1/crd_validation_test.go
Original file line number Diff line number Diff line change
@@ -29,12 +29,17 @@ type CRDTest struct {
name string
cr resourcesemantics.GenericCRD
want *apis.FieldError
ctx context.Context
}

func doValidateTest(t *testing.T, tests []CRDTest) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := test.cr.Validate(context.TODO())
ctx := test.ctx
if ctx == nil {
ctx = context.TODO()
}
got := test.cr.Validate(ctx)
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Errorf("%s: validate (-want, +got) = %v", test.name, diff)
}
50 changes: 50 additions & 0 deletions pkg/apis/messaging/v1/in_memory_channel_validation.go
Original file line number Diff line number Diff line change
@@ -21,10 +21,13 @@ import (
"fmt"

"knative.dev/pkg/apis"
"knative.dev/pkg/kmp"

"knative.dev/eventing/pkg/apis/eventing"
)

const eventingControllerSAName = "system:serviceaccount:knative-eventing:eventing-controller"

func (imc *InMemoryChannel) Validate(ctx context.Context) *apis.FieldError {
errs := imc.Spec.Validate(ctx).ViaField("spec")

@@ -39,6 +42,12 @@ func (imc *InMemoryChannel) Validate(ctx context.Context) *apis.FieldError {
}
}

if apis.IsInUpdate(ctx) {
// Validate that if any changes were made to spec.subscribers, they were made by the eventing-controller
original := apis.GetBaseline(ctx).(*InMemoryChannel)
errs = errs.Also(imc.CheckSubscribersChangeAllowed(ctx, original))
}

return errs
}

@@ -54,3 +63,44 @@ func (imcs *InMemoryChannelSpec) Validate(ctx context.Context) *apis.FieldError

return errs
}

func (imc *InMemoryChannel) CheckSubscribersChangeAllowed(ctx context.Context, original *InMemoryChannel) *apis.FieldError {
if original == nil {
return nil
}

if !canChangeChannelSpecAuth(ctx) {
return imc.checkSubsciberSpecAuthChanged(original, ctx)
}
return nil
}

func (imc *InMemoryChannel) checkSubsciberSpecAuthChanged(original *InMemoryChannel, ctx context.Context) *apis.FieldError {
if diff, err := kmp.ShortDiff(original.Spec.Subscribers, imc.Spec.Subscribers); err != nil {
return &apis.FieldError{
Message: "Failed to diff Channel.Spec.Subscribers",
Paths: []string{"spec.subscribers"},
Details: err.Error(),
}
} else if diff != "" {
user := apis.GetUserInfo(ctx)
userName := ""
if user != nil {
userName = user.Username
}
return &apis.FieldError{
Message: fmt.Sprintf("Channel.Spec.Subscribers changed by user %s which was not the %s service account", userName, eventingControllerSAName),
Paths: []string{"spec.subscribers"},
Details: diff,
}
}
return nil
}

func canChangeChannelSpecAuth(ctx context.Context) bool {
user := apis.GetUserInfo(ctx)
if user == nil {
return false
}
return user.Username == eventingControllerSAName
}
44 changes: 44 additions & 0 deletions pkg/apis/messaging/v1/in_memory_channel_validation_test.go
Original file line number Diff line number Diff line change
@@ -19,13 +19,45 @@ package v1
import (
"testing"

"golang.org/x/net/context"
authenticationv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/apis"
"knative.dev/pkg/kmp"

eventingduck "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
)

var (
validIMCSingleSubscriber = &InMemoryChannel{
Spec: InMemoryChannelSpec{
ChannelableSpec: eventingduck.ChannelableSpec{
SubscribableSpec: eventingduck.SubscribableSpec{
Subscribers: []eventingduck.SubscriberSpec{{
SubscriberURI: apis.HTTP("subscriberendpoint"),
ReplyURI: apis.HTTP("resultendpoint"),
}},
}},
},
}

validIMCTwoSubscribers = &InMemoryChannel{
Spec: InMemoryChannelSpec{
ChannelableSpec: eventingduck.ChannelableSpec{
SubscribableSpec: eventingduck.SubscribableSpec{
Subscribers: []eventingduck.SubscriberSpec{{
SubscriberURI: apis.HTTP("subscriberendpoint"),
ReplyURI: apis.HTTP("resultendpoint"),
}, {
SubscriberURI: apis.HTTP("subscriberendpoint2"),
ReplyURI: apis.HTTP("resultendpoint2"),
}},
}},
},
}
)

func TestInMemoryChannelValidation(t *testing.T) {
tests := []CRDTest{{
name: "empty",
@@ -101,6 +133,18 @@ func TestInMemoryChannelValidation(t *testing.T) {
fe.Details = "expected either 'cluster' or 'namespace'"
return fe
}(),
}, {
name: "invalid user for spec.subscribers update",
cr: validIMCTwoSubscribers,
want: func() *apis.FieldError {
diff, _ := kmp.ShortDiff(validIMCSingleSubscriber.Spec.Subscribers, validIMCTwoSubscribers.Spec.Subscribers)
return &apis.FieldError{
Message: "Channel.Spec.Subscribers changed by user test-user which was not the system:serviceaccount:knative-eventing:eventing-controller service account",
Paths: []string{"spec.subscribers"},
Details: diff,
}
}(),
ctx: apis.WithUserInfo(apis.WithinUpdate(context.TODO(), validIMCSingleSubscriber), &authenticationv1.UserInfo{Username: "test-user"}),
}}

doValidateTest(t, tests)
35 changes: 23 additions & 12 deletions pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
@@ -46,10 +46,11 @@ const (
)

type Subscription struct {
Subscriber duckv1.Addressable
Reply *duckv1.Addressable
DeadLetter *duckv1.Addressable
RetryConfig *kncloudevents.RetryConfig
Subscriber duckv1.Addressable
Reply *duckv1.Addressable
DeadLetter *duckv1.Addressable
RetryConfig *kncloudevents.RetryConfig
ServiceAccount *types.NamespacedName
}

// Config for a fanout.EventHandler.
@@ -130,24 +131,27 @@ func NewFanoutEventHandler(

func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscription, error) {
destination := duckv1.Addressable{
URL: sub.SubscriberURI,
CACerts: sub.SubscriberCACerts,
URL: sub.SubscriberURI,
CACerts: sub.SubscriberCACerts,
Audience: sub.SubscriberAudience,
}

var reply *duckv1.Addressable
if sub.ReplyURI != nil {
reply = &duckv1.Addressable{
URL: sub.ReplyURI,
CACerts: sub.ReplyCACerts,
URL: sub.ReplyURI,
CACerts: sub.ReplyCACerts,
Audience: sub.ReplyAudience,
}
}

var deadLetter *duckv1.Addressable
if sub.Delivery != nil && sub.Delivery.DeadLetterSink != nil && sub.Delivery.DeadLetterSink.URI != nil {
// Subscription reconcilers resolves the URI.
deadLetter = &duckv1.Addressable{
URL: sub.Delivery.DeadLetterSink.URI,
CACerts: sub.Delivery.DeadLetterSink.CACerts,
URL: sub.Delivery.DeadLetterSink.URI,
CACerts: sub.Delivery.DeadLetterSink.CACerts,
Audience: sub.Delivery.DeadLetterSink.Audience,
}
}

@@ -317,11 +321,18 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription,
// makeFanoutRequest sends the request to exactly one subscription. It handles both the `call` and
// the `sink` portions of the subscription.
func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.Event, additionalHeaders nethttp.Header, sub Subscription) (*kncloudevents.DispatchInfo, error) {
return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber,
dispatchOptions := []kncloudevents.SendOption{
kncloudevents.WithHeader(additionalHeaders),
kncloudevents.WithReply(sub.Reply),
kncloudevents.WithDeadLetterSink(sub.DeadLetter),
kncloudevents.WithRetryConfig(sub.RetryConfig))
kncloudevents.WithRetryConfig(sub.RetryConfig),
}

if sub.ServiceAccount != nil {
dispatchOptions = append(dispatchOptions, kncloudevents.WithOIDCAuthentication(sub.ServiceAccount))
}

return f.eventDispatcher.SendEvent(ctx, event, sub.Subscriber, dispatchOptions...)
}

type DispatchResult struct {
24 changes: 16 additions & 8 deletions pkg/reconciler/inmemorychannel/dispatcher/controller.go
Original file line number Diff line number Diff line change
@@ -130,10 +130,25 @@ func NewController(
eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider),
}

var globalResync func(obj interface{})

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(_ string, _ interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(cmw)

impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{SkipStatusUpdates: true, FinalizerName: finalizerName}
return controller.Options{SkipStatusUpdates: true, FinalizerName: finalizerName, ConfigStore: featureStore}
})

globalResync = func(_ interface{}) {
impl.GlobalResync(inmemorychannelInformer.Informer())
}

r.featureStore = featureStore

// Watch for inmemory channels.
inmemorychannelInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
@@ -144,13 +159,6 @@ func NewController(
DeleteFunc: r.deleteFunc,
}})

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
impl.GlobalResync(inmemorychannelInformer.Informer())
})
featureStore.WatchConfigs(cmw)

r.featureStore = featureStore

httpArgs := &inmemorychannel.InMemoryEventDispatcherArgs{
Port: httpPort,
ReadTimeout: readTimeout,
12 changes: 10 additions & 2 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
@@ -88,7 +88,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
return nil
}

config, err := newConfigForInMemoryChannel(imc)
config, err := newConfigForInMemoryChannel(ctx, imc)
if err != nil {
logging.FromContext(ctx).Error("Error creating config for in memory channels", zap.Error(err))
return err
@@ -209,11 +209,19 @@ func (r *Reconciler) patchSubscriberStatus(ctx context.Context, imc *v1.InMemory
}

// newConfigForInMemoryChannel creates a new Config for a single inmemory channel.
func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.ChannelConfig, error) {
func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) (*multichannelfanout.ChannelConfig, error) {
featureFlags := feature.FromContext(ctx)
isOIDCEnabled := featureFlags.IsOIDCAuthentication()
subs := make([]fanout.Subscription, len(imc.Spec.Subscribers))

for i, sub := range imc.Spec.Subscribers {
conf, err := fanout.SubscriberSpecToFanoutConfig(sub)
if isOIDCEnabled {
conf.ServiceAccount = &types.NamespacedName{
Name: *sub.Auth.ServiceAccountName,
Namespace: imc.Namespace,
}
}
if err != nil {
return nil, err
}
Loading

0 comments on commit bdca23d

Please sign in to comment.