Skip to content
This repository has been archived by the owner on Mar 15, 2022. It is now read-only.

Add TLS and SASL SCRAM auth support for the Kafka sync #107

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/rockset/rockset-go-client v0.6.0
github.com/sethgrid/pester v0.0.0-20190127155807-68a33a018ad0
github.com/spf13/viper v1.4.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
k8s.io/api v0.0.0-20190814101207-0772a1bdf941
k8s.io/apimachinery v0.0.0-20190814100815-533d101be9a6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
Expand Down
29 changes: 22 additions & 7 deletions sinks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,30 @@ func ManufactureSink() (e EventSinkInterface) {
viper.SetDefault("kafkaRetryMax", 5)
viper.SetDefault("kafkaSaslUser", "")
viper.SetDefault("kafkaSaslPwd", "")

brokers := viper.GetStringSlice("kafkaBrokers")
viper.SetDefault("kafkaSaslMechanism", "")

var tlsCfg *kafkaTLSConfig
if viper.IsSet("kafkaTLS") {
tlsCfg = &kafkaTLSConfig{}
if err := viper.UnmarshalKey("kafkaTLS", tlsCfg); err != nil {
panic(err.Error())
}
}

cfg := kafkaConfig{
Brokers: viper.GetStringSlice("kafkaBrokers"),
Async: viper.GetBool("kafkaAsync"),
RetryMax: viper.GetInt("kafkaRetryMax"),
SASL: kafkaSASLConfig{
Username: viper.GetString("kafkaSaslUser"),
Password: viper.GetString("kafkaSaslPwd"),
Mechanism: viper.GetString("kafkaSaslMechanism"),
},
TLS: tlsCfg,
}
topic := viper.GetString("kafkaTopic")
async := viper.GetBool("kakfkaAsync")
retryMax := viper.GetInt("kafkaRetryMax")
saslUser := viper.GetString("kafkaSaslUser")
saslPwd := viper.GetString("kafkaSaslPwd")

e, err := NewKafkaSink(brokers, topic, async, retryMax, saslUser, saslPwd)
e, err := NewKafkaSink(cfg, topic)
if err != nil {
panic(err.Error())
}
Expand Down
118 changes: 108 additions & 10 deletions sinks/kafkasink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ limitations under the License.
package sinks

import (
"errors"
"fmt"

"crypto/tls"
"crypto/x509"
"encoding/json"
"io/ioutil"

"github.com/Shopify/sarama"
"github.com/golang/glog"
"k8s.io/api/core/v1"
Expand All @@ -29,10 +36,43 @@ type KafkaSink struct {
producer interface{}
}

type kafkaConfig struct {
Brokers []string
Async bool
RetryMax int
SASL kafkaSASLConfig
TLS *kafkaTLSConfig
}

type kafkaSASLConfig struct {
Username string
Password string
Mechanism string
}

type kafkaTLSConfig struct {
CertFile string
KeyFile string
CACertFiles []string
}

var (
errMissingCertOrKeyFile = errors.New("one of certificate file or key file for client authentication missing")

kafkaSASLMechanisms = map[sarama.SASLMechanism]func() sarama.SCRAMClient {
sarama.SASLTypeSCRAMSHA256: func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: scramSHA256}
},
sarama.SASLTypeSCRAMSHA512: func() sarama.SCRAMClient {
return &XDGSCRAMClient{HashGeneratorFcn: scramSHA512}
},
}
)

// NewKafkaSinkSink will create a new KafkaSink with default options, returned as an EventSinkInterface
func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, saslUser string, saslPwd string) (EventSinkInterface, error) {
func NewKafkaSink(cfg kafkaConfig, topic string) (EventSinkInterface, error) {

p, err := sinkFactory(brokers, async, retryMax, saslUser, saslPwd)
p, err := sinkFactory(&cfg)

if err != nil {
return nil, err
Expand All @@ -44,23 +84,43 @@ func NewKafkaSink(brokers []string, topic string, async bool, retryMax int, sasl
}, err
}

func sinkFactory(brokers []string, async bool, retryMax int, saslUser string, saslPwd string) (interface{}, error) {
func sinkFactory(cfg *kafkaConfig) (interface{}, error) {
config := sarama.NewConfig()
config.Producer.Retry.Max = retryMax
config.Producer.Retry.Max = cfg.RetryMax
config.Producer.RequiredAcks = sarama.WaitForAll

if saslUser != "" && saslPwd != "" {
if cfg.SASL.Username != "" && cfg.SASL.Password != "" {
config.Net.SASL.Enable = true
config.Net.SASL.User = saslUser
config.Net.SASL.Password = saslPwd
config.Net.SASL.User = cfg.SASL.Username
config.Net.SASL.Password = cfg.SASL.Password

if cfg.SASL.Mechanism != "" {
mechanism := sarama.SASLMechanism(cfg.SASL.Mechanism)
generatorFunc, ok := kafkaSASLMechanisms[mechanism]
if !ok {
return nil, fmt.Errorf("unknown SASL mechanism name: %q", cfg.SASL.Mechanism)
}

config.Net.SASL.Mechanism = mechanism
config.Net.SASL.SCRAMClientGeneratorFunc = generatorFunc
}
}

if cfg.TLS != nil {
tlsConfig, err := cfg.TLS.AsTLSConfig()
if err != nil {
return nil, err
}
config.Net.TLS.Enable = true
config.Net.TLS.Config = tlsConfig
}

if async {
return sarama.NewAsyncProducer(brokers, config)
if cfg.Async {
return sarama.NewAsyncProducer(cfg.Brokers, config)
}

config.Producer.Return.Successes = true
return sarama.NewSyncProducer(brokers, config)
return sarama.NewSyncProducer(cfg.Brokers, config)

}

Expand Down Expand Up @@ -100,3 +160,41 @@ func (ks *KafkaSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) {
}

}

func (c *kafkaTLSConfig) AsTLSConfig() (*tls.Config, error) {
var certs []tls.Certificate
var certPool *x509.CertPool

if c.CertFile != "" && c.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
if err != nil {
return nil, err
}

certs = []tls.Certificate{cert}
} else if c.CertFile != "" || c.KeyFile != "" {
return nil, errMissingCertOrKeyFile
}

for _, caCertFile := range c.CACertFiles {
caCert, err := ioutil.ReadFile(caCertFile)
if err != nil {
return nil, err
}

if certPool == nil {
certPool = x509.NewCertPool()
}
certPool.AppendCertsFromPEM(caCert)
}

cfg := tls.Config{}
if certs != nil {
cfg.Certificates = certs
}
if certPool != nil {
cfg.RootCAs = certPool
}

return &cfg, nil
}
50 changes: 50 additions & 0 deletions sinks/scram_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2017 The Contributors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sinks

import (
"crypto/sha512"

"github.com/xdg/scram"
)

var (
scramSHA256 = scram.SHA256
scramSHA512 = scram.HashGeneratorFcn(sha512.New)
)

type XDGSCRAMClient struct {
*scram.Client
*scram.ClientConversation
scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
if err == nil {
x.ClientConversation = x.Client.NewConversation()
}
return
}

func (x *XDGSCRAMClient) Step(challenge string) (string, error) {
return x.ClientConversation.Step(challenge)
}

func (x *XDGSCRAMClient) Done() bool {
return x.ClientConversation.Done()
}