From 2fa050a421ba962e6fa94eeed133c9dbecef43f1 Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Fri, 17 Jan 2025 19:39:18 +0000 Subject: [PATCH 1/4] feat: allow users to proxy schema registry --- cmd/kafka-proxy/server.go | 67 +++++++++++++----- config/config.go | 13 +++- proxy/schema_registry.go | 143 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 206 insertions(+), 17 deletions(-) create mode 100644 proxy/schema_registry.go diff --git a/cmd/kafka-proxy/server.go b/cmd/kafka-proxy/server.go index f5811c7a..63bf2fcd 100644 --- a/cmd/kafka-proxy/server.go +++ b/cmd/kafka-proxy/server.go @@ -1,40 +1,43 @@ package server import ( + "context" + "errors" "fmt" - - "github.com/grepplabs/kafka-proxy/config" - "github.com/grepplabs/kafka-proxy/proxy" - "github.com/oklog/run" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "github.com/sirupsen/logrus" - "github.com/spf13/cobra" - "net" "net/http" _ "net/http/pprof" "os" "os/exec" "os/signal" + "strings" "syscall" "time" - "errors" - "strings" + "github.com/oklog/run" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + + "github.com/grepplabs/kafka-proxy/config" + "github.com/grepplabs/kafka-proxy/proxy" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-plugin" "github.com/grepplabs/kafka-proxy/pkg/apis" localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared" tokeninfo "github.com/grepplabs/kafka-proxy/plugin/token-info/shared" tokenprovider "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/go-plugin" + + "github.com/spf13/viper" "github.com/grepplabs/kafka-proxy/pkg/registry" + // built-in plugins _ "github.com/grepplabs/kafka-proxy/pkg/libs/googleid-info" _ "github.com/grepplabs/kafka-proxy/pkg/libs/googleid-provider" - "github.com/spf13/viper" ) var ( @@ -156,7 +159,7 @@ func initFlags() { Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", os.Getenv("TLS_CLIENT_KEY_PASSWORD"), "Password to decrypt rsa private key") Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file") - //Same TLS client cert tls-same-client-cert-enable + // Same TLS client cert tls-same-client-cert-enable Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "tls-same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)") // SASL by Proxy @@ -189,6 +192,14 @@ func initFlags() { Server.Flags().StringVar(&c.Kafka.SASL.Plugin.LogLevel, "sasl-plugin-log-level", "trace", "Log level of the auth plugin") Server.Flags().DurationVar(&c.Kafka.SASL.Plugin.Timeout, "sasl-plugin-timeout", 10*time.Second, "Authentication timeout") + // Schema Registry + Server.Flags().BoolVar(&c.SchemaRegistry.Enable, "schema-registry-enable", false, "Proxy Schema Registry") + Server.Flags().StringVar(&c.SchemaRegistry.Url, "schema-registry-url", "", "The Schema Registry URL without port") + Server.Flags().IntVar(&c.SchemaRegistry.Port, "schema-registry-port", 8080, "The Schema Registry port") + Server.Flags().StringVar(&c.SchemaRegistry.Username, "schema-registry-username", "", "The Schema Registry username") + Server.Flags().StringVar(&c.SchemaRegistry.Password, "schema-registry-password", "", "The Schema Registry password") + Server.Flags().IntVar(&c.SchemaRegistry.ProxyPort, "schema-registry-proxy-port", 8080, "The port to expose the proxy to") + // Web Server.Flags().BoolVar(&c.Http.Disable, "http-disable", false, "Disable HTTP endpoints") Server.Flags().StringVar(&c.Http.ListenAddress, "http-listen-address", "0.0.0.0:9080", "Address that kafka-proxy is listening on") @@ -218,6 +229,31 @@ func Run(_ *cobra.Command, _ []string) { var localPasswordAuthenticator apis.PasswordAuthenticator var localTokenAuthenticator apis.TokenInfo + var g run.Group + + if c.SchemaRegistry.Enable { + srProxy, err := proxy.NewSchemaRegistryProxy( + c.SchemaRegistry.Url, + c.SchemaRegistry.Username, + c.SchemaRegistry.Password, + c.SchemaRegistry.Port, + c.SchemaRegistry.ProxyPort, + ) + if err != nil { + logrus.Fatal(err) + } + + g.Add(func() error { + return srProxy.Start() + }, func(error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := srProxy.Stop(ctx); err != nil { + logrus.WithError(err).Error("Failed to stop Schema Registry proxy gracefully") + } + }) + } + if c.Auth.Local.Enable { switch c.Auth.Local.Mechanism { case "PLAIN": @@ -372,7 +408,6 @@ func Run(_ *cobra.Command, _ []string) { } } - var g run.Group { // All active connections are stored in this variable. connset := proxy.NewConnSet() diff --git a/config/config.go b/config/config.go index d8f958fa..d3fa4af6 100644 --- a/config/config.go +++ b/config/config.go @@ -7,8 +7,9 @@ import ( "strings" "time" - "github.com/grepplabs/kafka-proxy/pkg/libs/util" "github.com/pkg/errors" + + "github.com/grepplabs/kafka-proxy/pkg/libs/util" ) const ( @@ -172,6 +173,16 @@ type Config struct { Acks0Disabled bool } } + + SchemaRegistry struct { + Enable bool + Url string + Port int + Username string + Password string + ProxyPort int + } + ForwardProxy struct { Url string diff --git a/proxy/schema_registry.go b/proxy/schema_registry.go new file mode 100644 index 00000000..44ae4817 --- /dev/null +++ b/proxy/schema_registry.go @@ -0,0 +1,143 @@ +package proxy + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httputil" + "net/url" + "time" + + "github.com/sirupsen/logrus" +) + +type SchemaRegistryProxy struct { + url string + username string + password string + port int + proxyPort int + httpServer *http.Server +} + +func NewSchemaRegistryProxy(url, username, password string, port, proxyPort int) (*SchemaRegistryProxy, error) { + if err := validateSchemaRegistryCreds(username, password); err != nil { + return nil, err + } + + return &SchemaRegistryProxy{ + url: url, + username: username, + password: password, + port: port, + proxyPort: proxyPort, + }, nil +} + +func validateSchemaRegistryCreds(username, password string) error { + if username == "" || password == "" { + return fmt.Errorf("schema Registry proxy requires both username and password") + } + return nil +} + +func (s *SchemaRegistryProxy) Start() error { + remote, err := url.Parse(fmt.Sprintf("https://%s:%d", s.url, s.port)) + if err != nil { + return fmt.Errorf("invalid Schema Registry URL: %w", err) + } + + proxy := httputil.NewSingleHostReverseProxy(remote) + proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) { + logrus.WithError(err).Error("Schema Registry proxy error") + w.WriteHeader(http.StatusBadGateway) + } + + // Setup proxy handler with logging and auth + handler := s.createProxyHandler(proxy, remote) + + s.httpServer = &http.Server{ + Addr: fmt.Sprintf(":%d", s.proxyPort), + Handler: handler, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, + IdleTimeout: 120 * time.Second, + } + + logrus.WithFields(logrus.Fields{ + "listen_port": s.proxyPort, + "target_url": remote.String(), + }).Info("Starting Schema Registry proxy") + + if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(http.ErrServerClosed, err) { + return fmt.Errorf("schema Registry proxy server error: %w", err) + } + return nil +} + +func (s *SchemaRegistryProxy) Stop(ctx context.Context) error { + if s.httpServer != nil { + return s.httpServer.Shutdown(ctx) + } + return nil +} + +func (s *SchemaRegistryProxy) createProxyHandler(proxy *httputil.ReverseProxy, remote *url.URL) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logrus.WithFields(logrus.Fields{ + "method": r.Method, + "path": r.URL.Path, + }).Debug("Schema Registry proxy request") + + r.Host = remote.Host + r.SetBasicAuth(s.username, s.password) + + proxy.ServeHTTP(w, r) + }) +} +func ExposeSchemaRegistry(schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword string, schemaRegistryPort, schemaRegistryProxyPort int) { + proxy, err := NewSchemaRegistryProxy( + schemaRegistryUrl, + schemaRegistryUsername, + schemaRegistryPassword, + schemaRegistryPort, + schemaRegistryProxyPort, + ) + if err != nil { + logrus.Fatal(err) + } + + if err := proxy.Start(); err != nil { + logrus.Fatal(err) + } +} + +// func ExposeSchemaRegistry(schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword string, schemaRegistryPort, schemaRegistryProxyPort int) { +// e := validateCreds(schemaRegistryUsername, schemaRegistryPassword) +// if e != nil { +// panic(e) +// } +// +// remote, err := url.Parse(fmt.Sprintf("https://%s:%d", schemaRegistryUrl, schemaRegistryPort)) +// if err != nil { +// panic(err) +// } +// +// handler := func(p *httputil.ReverseProxy) func(w http.ResponseWriter, r *http.Request) { +// return func(w http.ResponseWriter, r *http.Request) { +// logrus.Infof("Schema Registry port: %s", r.URL) +// r.Host = remote.Host +// r.SetBasicAuth(schemaRegistryUsername, schemaRegistryPassword) +// p.ServeHTTP(w, r) +// +// } +// } +// +// proxy := httputil.NewSingleHostReverseProxy(remote) +// http.HandleFunc("/", handler(proxy)) +// err = http.ListenAndServe(fmt.Sprintf(":%d", schemaRegistryProxyPort), nil) +// if err != nil { +// panic(err) +// } +// } From a56f614bead71f13797d92e693e03e1d7ae2d5bc Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Fri, 17 Jan 2025 19:42:56 +0000 Subject: [PATCH 2/4] remove unused function --- proxy/schema_registry.go | 83 +++++++++------------------------------- 1 file changed, 19 insertions(+), 64 deletions(-) diff --git a/proxy/schema_registry.go b/proxy/schema_registry.go index 44ae4817..0d78165b 100644 --- a/proxy/schema_registry.go +++ b/proxy/schema_registry.go @@ -21,6 +21,13 @@ type SchemaRegistryProxy struct { httpServer *http.Server } +func validateSchemaRegistryCreds(username, password string) error { + if username == "" || password == "" { + return fmt.Errorf("schema Registry proxy requires both username and password") + } + return nil +} + func NewSchemaRegistryProxy(url, username, password string, port, proxyPort int) (*SchemaRegistryProxy, error) { if err := validateSchemaRegistryCreds(username, password); err != nil { return nil, err @@ -35,11 +42,18 @@ func NewSchemaRegistryProxy(url, username, password string, port, proxyPort int) }, nil } -func validateSchemaRegistryCreds(username, password string) error { - if username == "" || password == "" { - return fmt.Errorf("schema Registry proxy requires both username and password") - } - return nil +func (s *SchemaRegistryProxy) createProxyHandler(proxy *httputil.ReverseProxy, remote *url.URL) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + logrus.WithFields(logrus.Fields{ + "method": r.Method, + "path": r.URL.Path, + }).Debug("Schema Registry proxy request") + + r.Host = remote.Host + r.SetBasicAuth(s.username, s.password) + + proxy.ServeHTTP(w, r) + }) } func (s *SchemaRegistryProxy) Start() error { @@ -82,62 +96,3 @@ func (s *SchemaRegistryProxy) Stop(ctx context.Context) error { } return nil } - -func (s *SchemaRegistryProxy) createProxyHandler(proxy *httputil.ReverseProxy, remote *url.URL) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - logrus.WithFields(logrus.Fields{ - "method": r.Method, - "path": r.URL.Path, - }).Debug("Schema Registry proxy request") - - r.Host = remote.Host - r.SetBasicAuth(s.username, s.password) - - proxy.ServeHTTP(w, r) - }) -} -func ExposeSchemaRegistry(schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword string, schemaRegistryPort, schemaRegistryProxyPort int) { - proxy, err := NewSchemaRegistryProxy( - schemaRegistryUrl, - schemaRegistryUsername, - schemaRegistryPassword, - schemaRegistryPort, - schemaRegistryProxyPort, - ) - if err != nil { - logrus.Fatal(err) - } - - if err := proxy.Start(); err != nil { - logrus.Fatal(err) - } -} - -// func ExposeSchemaRegistry(schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword string, schemaRegistryPort, schemaRegistryProxyPort int) { -// e := validateCreds(schemaRegistryUsername, schemaRegistryPassword) -// if e != nil { -// panic(e) -// } -// -// remote, err := url.Parse(fmt.Sprintf("https://%s:%d", schemaRegistryUrl, schemaRegistryPort)) -// if err != nil { -// panic(err) -// } -// -// handler := func(p *httputil.ReverseProxy) func(w http.ResponseWriter, r *http.Request) { -// return func(w http.ResponseWriter, r *http.Request) { -// logrus.Infof("Schema Registry port: %s", r.URL) -// r.Host = remote.Host -// r.SetBasicAuth(schemaRegistryUsername, schemaRegistryPassword) -// p.ServeHTTP(w, r) -// -// } -// } -// -// proxy := httputil.NewSingleHostReverseProxy(remote) -// http.HandleFunc("/", handler(proxy)) -// err = http.ListenAndServe(fmt.Sprintf(":%d", schemaRegistryProxyPort), nil) -// if err != nil { -// panic(err) -// } -// } From 8e220fa7c66c069cabc9db1a60375484d9bee1f6 Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Sat, 18 Jan 2025 10:21:22 +0000 Subject: [PATCH 3/4] update doc --- README.md | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index d285415e..b6553e0e 100644 --- a/README.md +++ b/README.md @@ -201,6 +201,12 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w --sasl-plugin-param stringArray Authentication plugin parameter --sasl-plugin-timeout duration Authentication timeout (default 10s) --sasl-username string SASL user name + --schema-registry-enable Proxy Schema Registry + --schema-registry-password string The Schema Registry password + --schema-registry-port int The Schema Registry port (default 8080) + --schema-registry-proxy-port int The port to expose the proxy to (default 8080) + --schema-registry-url string The Schema Registry URL without port + --schema-registry-username string The Schema Registry username --tls-ca-chain-cert-file string PEM encoded CA's certificate file --tls-client-cert-file string PEM encoded file with client certificate --tls-client-key-file string PEM encoded file with private key for the client certificate @@ -383,7 +389,7 @@ Connect through test SOCKS5 Proxy server kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \ --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \ - --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" + --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \ --forward-proxy socks5://localhost:1080 ``` @@ -405,7 +411,7 @@ Connect through test HTTP Proxy server using CONNECT method kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \ --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \ - --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" + --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \ --forward-proxy http://localhost:3128 ``` @@ -441,6 +447,23 @@ By setting `--proxy-listener-tls-client-cert-validate-subject true`, Kafka Proxy --proxy-listener-tls-required-client-subject-organization grepplabs ``` +### Expose Schema Registry endpoint + +Proxy Kafka cluster as usual, but another endpoint for Schema Registry + +``` + kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \ + --bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \ + --bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \ + --schema-registry-enable \ + --schema-registry-password "myPassword" \ + --schema-registry-username "myUsername" \ + --schema-registry-url "schemaregistry.prod.svc.cluster.local" \ + --schema-registry-port 8888 \ + --schema-registry-proxy-port 8080 +``` + + ### Kubernetes sidecar container example ```yaml From 54c99e08c3c1effa9f2edf807ce4c7ac63aa67ee Mon Sep 17 00:00:00 2001 From: Elad Leev Date: Sun, 19 Jan 2025 18:37:55 +0000 Subject: [PATCH 4/4] add SR proxy tests --- proxy/schema_registry_test.go | 80 +++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 proxy/schema_registry_test.go diff --git a/proxy/schema_registry_test.go b/proxy/schema_registry_test.go new file mode 100644 index 00000000..0ef6be72 --- /dev/null +++ b/proxy/schema_registry_test.go @@ -0,0 +1,80 @@ +package proxy + +import ( + "context" + "net/http" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestSchemaRegistryProxy_ValidateCredentials(t *testing.T) { + tests := []struct { + name string + username string + password string + wantErr bool + errMsg string + }{ + { + name: "valid credentials", + username: "user", + password: "pass", + wantErr: false, + }, + { + name: "empty username", + username: "", + password: "pass", + wantErr: true, + errMsg: "schema Registry proxy requires both username and password", + }, + { + name: "empty password", + username: "user", + password: "", + wantErr: true, + errMsg: "schema Registry proxy requires both username and password", + }, + { + name: "both empty", + username: "", + password: "", + wantErr: true, + errMsg: "schema Registry proxy requires both username and password", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewSchemaRegistryProxy("localhost", tt.username, tt.password, 8081, 8082) + if tt.wantErr { + assert.Error(t, err) + assert.Equal(t, tt.errMsg, err.Error()) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestSchemaRegistryProxy_Shutdown(t *testing.T) { + proxy, err := NewSchemaRegistryProxy("localhost", "user", "pass", 8081, 8082) + assert.NoError(t, err) + + go func() { + err := proxy.Start() + if err != nil && err != http.ErrServerClosed { + t.Error(err) + } + }() + + time.Sleep(100 * time.Millisecond) + + // Test graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err = proxy.Stop(ctx) + assert.NoError(t, err) +}