Skip to content

feat: Proxy Schema Registry #189

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ You can launch a kafka-proxy container with auth-ldap plugin for trying it out w
--sasl-plugin-param stringArray Authentication plugin parameter
--sasl-plugin-timeout duration Authentication timeout (default 10s)
--sasl-username string SASL user name
--schema-registry-enable Proxy Schema Registry
--schema-registry-password string The Schema Registry password
--schema-registry-port int The Schema Registry port (default 8080)
--schema-registry-proxy-port int The port to expose the proxy to (default 8080)
--schema-registry-url string The Schema Registry URL without port
--schema-registry-username string The Schema Registry username
--tls-ca-chain-cert-file string PEM encoded CA's certificate file
--tls-client-cert-file string PEM encoded file with client certificate
--tls-client-key-file string PEM encoded file with private key for the client certificate
Expand Down Expand Up @@ -383,7 +389,7 @@ Connect through test SOCKS5 Proxy server

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
--forward-proxy socks5://localhost:1080
```

Expand All @@ -405,7 +411,7 @@ Connect through test HTTP Proxy server using CONNECT method

kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502"
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
--forward-proxy http://localhost:3128
```

Expand Down Expand Up @@ -441,6 +447,23 @@ By setting `--proxy-listener-tls-client-cert-validate-subject true`, Kafka Proxy
--proxy-listener-tls-required-client-subject-organization grepplabs
```

### Expose Schema Registry endpoint

Proxy Kafka cluster as usual, but another endpoint for Schema Registry

```
kafka-proxy server --bootstrap-server-mapping "kafka-0.grepplabs.com:9092,127.0.0.1:32500" \
--bootstrap-server-mapping "kafka-1.grepplabs.com:9092,127.0.0.1:32501" \
--bootstrap-server-mapping "kafka-2.grepplabs.com:9092,127.0.0.1:32502" \
--schema-registry-enable \
--schema-registry-password "myPassword" \
--schema-registry-username "myUsername" \
--schema-registry-url "schemaregistry.prod.svc.cluster.local" \
--schema-registry-port 8888 \
--schema-registry-proxy-port 8080
```


### Kubernetes sidecar container example

```yaml
Expand Down
67 changes: 51 additions & 16 deletions cmd/kafka-proxy/server.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,43 @@
package server

import (
"context"
"errors"
"fmt"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/proxy"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"net"
"net/http"
_ "net/http/pprof"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"

"errors"
"strings"
"github.com/oklog/run"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/proxy"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"

"github.com/grepplabs/kafka-proxy/pkg/apis"
localauth "github.com/grepplabs/kafka-proxy/plugin/local-auth/shared"
tokeninfo "github.com/grepplabs/kafka-proxy/plugin/token-info/shared"
tokenprovider "github.com/grepplabs/kafka-proxy/plugin/token-provider/shared"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"

"github.com/spf13/viper"

"github.com/grepplabs/kafka-proxy/pkg/registry"

// built-in plugins
_ "github.com/grepplabs/kafka-proxy/pkg/libs/googleid-info"
_ "github.com/grepplabs/kafka-proxy/pkg/libs/googleid-provider"
"github.com/spf13/viper"
)

var (
Expand Down Expand Up @@ -156,7 +159,7 @@ func initFlags() {
Server.Flags().StringVar(&c.Kafka.TLS.ClientKeyPassword, "tls-client-key-password", os.Getenv("TLS_CLIENT_KEY_PASSWORD"), "Password to decrypt rsa private key")
Server.Flags().StringVar(&c.Kafka.TLS.CAChainCertFile, "tls-ca-chain-cert-file", "", "PEM encoded CA's certificate file")

//Same TLS client cert tls-same-client-cert-enable
// Same TLS client cert tls-same-client-cert-enable
Server.Flags().BoolVar(&c.Kafka.TLS.SameClientCertEnable, "tls-same-client-cert-enable", false, "Use only when mutual TLS is enabled on proxy and broker. It controls whether a proxy validates if proxy client certificate exactly matches brokers client cert (tls-client-cert-file)")

// SASL by Proxy
Expand Down Expand Up @@ -189,6 +192,14 @@ func initFlags() {
Server.Flags().StringVar(&c.Kafka.SASL.Plugin.LogLevel, "sasl-plugin-log-level", "trace", "Log level of the auth plugin")
Server.Flags().DurationVar(&c.Kafka.SASL.Plugin.Timeout, "sasl-plugin-timeout", 10*time.Second, "Authentication timeout")

// Schema Registry
Server.Flags().BoolVar(&c.SchemaRegistry.Enable, "schema-registry-enable", false, "Proxy Schema Registry")
Server.Flags().StringVar(&c.SchemaRegistry.Url, "schema-registry-url", "", "The Schema Registry URL without port")
Server.Flags().IntVar(&c.SchemaRegistry.Port, "schema-registry-port", 8080, "The Schema Registry port")
Server.Flags().StringVar(&c.SchemaRegistry.Username, "schema-registry-username", "", "The Schema Registry username")
Server.Flags().StringVar(&c.SchemaRegistry.Password, "schema-registry-password", "", "The Schema Registry password")
Server.Flags().IntVar(&c.SchemaRegistry.ProxyPort, "schema-registry-proxy-port", 8080, "The port to expose the proxy to")

// Web
Server.Flags().BoolVar(&c.Http.Disable, "http-disable", false, "Disable HTTP endpoints")
Server.Flags().StringVar(&c.Http.ListenAddress, "http-listen-address", "0.0.0.0:9080", "Address that kafka-proxy is listening on")
Expand Down Expand Up @@ -218,6 +229,31 @@ func Run(_ *cobra.Command, _ []string) {

var localPasswordAuthenticator apis.PasswordAuthenticator
var localTokenAuthenticator apis.TokenInfo
var g run.Group

if c.SchemaRegistry.Enable {
srProxy, err := proxy.NewSchemaRegistryProxy(
c.SchemaRegistry.Url,
c.SchemaRegistry.Username,
c.SchemaRegistry.Password,
c.SchemaRegistry.Port,
c.SchemaRegistry.ProxyPort,
)
if err != nil {
logrus.Fatal(err)
}

g.Add(func() error {
return srProxy.Start()
}, func(error) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := srProxy.Stop(ctx); err != nil {
logrus.WithError(err).Error("Failed to stop Schema Registry proxy gracefully")
}
})
}

if c.Auth.Local.Enable {
switch c.Auth.Local.Mechanism {
case "PLAIN":
Expand Down Expand Up @@ -372,7 +408,6 @@ func Run(_ *cobra.Command, _ []string) {
}
}

var g run.Group
{
// All active connections are stored in this variable.
connset := proxy.NewConnSet()
Expand Down
13 changes: 12 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"strings"
"time"

"github.com/grepplabs/kafka-proxy/pkg/libs/util"
"github.com/pkg/errors"

"github.com/grepplabs/kafka-proxy/pkg/libs/util"
)

const (
Expand Down Expand Up @@ -172,6 +173,16 @@ type Config struct {
Acks0Disabled bool
}
}

SchemaRegistry struct {
Enable bool
Url string
Port int
Username string
Password string
ProxyPort int
}

ForwardProxy struct {
Url string

Expand Down
98 changes: 98 additions & 0 deletions proxy/schema_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package proxy

import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httputil"
"net/url"
"time"

"github.com/sirupsen/logrus"
)

type SchemaRegistryProxy struct {
url string
username string
password string
port int
proxyPort int
httpServer *http.Server
}

func validateSchemaRegistryCreds(username, password string) error {
if username == "" || password == "" {
return fmt.Errorf("schema Registry proxy requires both username and password")
}
return nil
}

func NewSchemaRegistryProxy(url, username, password string, port, proxyPort int) (*SchemaRegistryProxy, error) {
if err := validateSchemaRegistryCreds(username, password); err != nil {
return nil, err
}

return &SchemaRegistryProxy{
url: url,
username: username,
password: password,
port: port,
proxyPort: proxyPort,
}, nil
}

func (s *SchemaRegistryProxy) createProxyHandler(proxy *httputil.ReverseProxy, remote *url.URL) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logrus.WithFields(logrus.Fields{
"method": r.Method,
"path": r.URL.Path,
}).Debug("Schema Registry proxy request")

r.Host = remote.Host
r.SetBasicAuth(s.username, s.password)

proxy.ServeHTTP(w, r)
})
}

func (s *SchemaRegistryProxy) Start() error {
remote, err := url.Parse(fmt.Sprintf("https://%s:%d", s.url, s.port))
if err != nil {
return fmt.Errorf("invalid Schema Registry URL: %w", err)
}

proxy := httputil.NewSingleHostReverseProxy(remote)
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
logrus.WithError(err).Error("Schema Registry proxy error")
w.WriteHeader(http.StatusBadGateway)
}

// Setup proxy handler with logging and auth
handler := s.createProxyHandler(proxy, remote)

s.httpServer = &http.Server{
Addr: fmt.Sprintf(":%d", s.proxyPort),
Handler: handler,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 120 * time.Second,
}

logrus.WithFields(logrus.Fields{
"listen_port": s.proxyPort,
"target_url": remote.String(),
}).Info("Starting Schema Registry proxy")

if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(http.ErrServerClosed, err) {
return fmt.Errorf("schema Registry proxy server error: %w", err)
}
return nil
}

func (s *SchemaRegistryProxy) Stop(ctx context.Context) error {
if s.httpServer != nil {
return s.httpServer.Shutdown(ctx)
}
return nil
}
80 changes: 80 additions & 0 deletions proxy/schema_registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package proxy

import (
"context"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSchemaRegistryProxy_ValidateCredentials(t *testing.T) {
tests := []struct {
name string
username string
password string
wantErr bool
errMsg string
}{
{
name: "valid credentials",
username: "user",
password: "pass",
wantErr: false,
},
{
name: "empty username",
username: "",
password: "pass",
wantErr: true,
errMsg: "schema Registry proxy requires both username and password",
},
{
name: "empty password",
username: "user",
password: "",
wantErr: true,
errMsg: "schema Registry proxy requires both username and password",
},
{
name: "both empty",
username: "",
password: "",
wantErr: true,
errMsg: "schema Registry proxy requires both username and password",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewSchemaRegistryProxy("localhost", tt.username, tt.password, 8081, 8082)
if tt.wantErr {
assert.Error(t, err)
assert.Equal(t, tt.errMsg, err.Error())
} else {
assert.NoError(t, err)
}
})
}
}

func TestSchemaRegistryProxy_Shutdown(t *testing.T) {
proxy, err := NewSchemaRegistryProxy("localhost", "user", "pass", 8081, 8082)
assert.NoError(t, err)

go func() {
err := proxy.Start()
if err != nil && err != http.ErrServerClosed {
t.Error(err)
}
}()

time.Sleep(100 * time.Millisecond)

// Test graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err = proxy.Stop(ctx)
assert.NoError(t, err)
}