From 952bea7d11373e189f67270e7d871f52407f7899 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Fri, 29 Nov 2024 15:28:42 +0000 Subject: [PATCH 1/5] Upgrade benthos --- CHANGELOG.md | 3 ++- docs/modules/components/pages/processors/cache.adoc | 6 ++++++ go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 393199e0dc..2d2c8d41b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,13 +3,14 @@ Changelog All notable changes to this project will be documented in this file. -## 4.42.0 - TBD +## 4.42.0 - 2024-12-02 ### Added - Add support for `spanner` driver to SQL plugins. (@yufeng-deng) - Add support for complex database types (JSONB, TEXT[], INET, TSVECTOR, TSRANGE, POINT, INTEGER[]) for `pg_stream` input. (@le-vlad) - Add support for Parquet files to `bigquery` output (@rockwotj) +- (Benthos) New `exists` operator added to the `cache` processor. (@mihaitodor) ### Fixed diff --git a/docs/modules/components/pages/processors/cache.adoc b/docs/modules/components/pages/processors/cache.adoc index aedcf0ef38..592fb3e177 100644 --- a/docs/modules/components/pages/processors/cache.adoc +++ b/docs/modules/components/pages/processors/cache.adoc @@ -177,6 +177,7 @@ Options: , `add` , `get` , `delete` +, `exists` . === `key` @@ -241,3 +242,8 @@ can be detected with xref:configuration:error_handling.adoc[processor error hand Delete a key and its contents from the cache. If the key does not exist the action is a no-op and will not fail with an error. +=== `exists` + +Check if a given key exists in the cache and replace the original message payload +with `true` or `false`. + diff --git a/go.mod b/go.mod index 2f2c60bed2..9b6858ee41 100644 --- a/go.mod +++ b/go.mod @@ -106,7 +106,7 @@ require ( github.com/rabbitmq/amqp091-go v1.10.0 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 github.com/redis/go-redis/v9 v9.7.0 - github.com/redpanda-data/benthos/v4 v4.41.1 + github.com/redpanda-data/benthos/v4 v4.42.0 github.com/redpanda-data/common-go/secrets v0.1.2 github.com/redpanda-data/connect/public/bundle/free/v4 v4.31.0 github.com/rs/xid v1.5.0 diff --git a/go.sum b/go.sum index 62eaf4c8e2..f59df136f6 100644 --- a/go.sum +++ b/go.sum @@ -1733,8 +1733,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= -github.com/redpanda-data/benthos/v4 v4.41.1 h1:kvhPIW7uJUj8ImVooR/ExYG2NZJ5r0U4bH6EEm9N8Dw= -github.com/redpanda-data/benthos/v4 v4.41.1/go.mod h1:T5Nb0hH1Sa1ChlH4hLW7+nA1+jQ/3CP/cVFI73z6ZIM= +github.com/redpanda-data/benthos/v4 v4.42.0 h1:3sKmHhdC1t/IH63oTzlYurfJaO0TsEWSEKeiE6FIvG8= +github.com/redpanda-data/benthos/v4 v4.42.0/go.mod h1:T5Nb0hH1Sa1ChlH4hLW7+nA1+jQ/3CP/cVFI73z6ZIM= github.com/redpanda-data/common-go/secrets v0.1.2 h1:UCDLN/yL8yjSIYhS5MB+2Am1Jy4XZMZPtuuCRL/82Rw= github.com/redpanda-data/common-go/secrets v0.1.2/go.mod h1:WjaDI39reE/GPRPHTsaYmiMjhHj+qsSJLe+kHsPKsXk= github.com/redpanda-data/connect/public/bundle/free/v4 v4.31.0 h1:Qiz4Q8ZO17n8797hgDdJ2f1XN7wh6J2hIRgeeSw4F24= From 0ce2254e6a2ee23ea157849c0e0892939342900a Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Fri, 29 Nov 2024 15:29:19 +0000 Subject: [PATCH 2/5] Add license service to enterprise cli --- internal/cli/enterprise.go | 12 ++ internal/license/public_key.pem | 9 + internal/license/redpanda_license.go | 52 ++++++ internal/license/service.go | 215 ++++++++++++++++++++++++ internal/license/service_test.go | 241 +++++++++++++++++++++++++++ internal/license/shared_service.go | 62 +++++++ 6 files changed, 591 insertions(+) create mode 100644 internal/license/public_key.pem create mode 100644 internal/license/redpanda_license.go create mode 100644 internal/license/service.go create mode 100644 internal/license/service_test.go create mode 100644 internal/license/shared_service.go diff --git a/internal/cli/enterprise.go b/internal/cli/enterprise.go index 3a155fb87e..72a4e01c03 100644 --- a/internal/cli/enterprise.go +++ b/internal/cli/enterprise.go @@ -19,6 +19,7 @@ import ( "github.com/urfave/cli/v2" "github.com/redpanda-data/connect/v4/internal/impl/kafka/enterprise" + "github.com/redpanda-data/connect/v4/internal/license" "github.com/redpanda-data/connect/v4/internal/secrets" "github.com/redpanda-data/connect/v4/internal/telemetry" ) @@ -46,6 +47,9 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co } var disableTelemetry bool + licenseConfig := license.Config{ + LicenseFilepath: os.Getenv("REDPANDA_LICENSE_FILEPATH"), + } opts = append(opts, service.CLIOptSetVersion(version, dateBuilt), @@ -81,6 +85,9 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co }), service.CLIOptAddTeeLogger(slog.New(rpLogger)), service.CLIOptOnConfigParse(func(pConf *service.ParsedConfig) error { + // Kick off license service. + license.RegisterService(pConf.Resources(), licenseConfig) + // Kick off telemetry exporter. if !disableTelemetry { telemetry.ActivateExporter(instanceID, version, fbLogger, schema, pConf) @@ -103,8 +110,13 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co Name: "disable-telemetry", Usage: "Disable anonymous telemetry from being emitted by this Connect instance.", }, + &cli.StringFlag{ + Name: "redpanda-license", + Usage: "Provide an explicit Redpanda License, which enables enterprise functionality. By default licenses found at the path `/etc/redpanda/redpanda.license` are applied.", + }, }, func(c *cli.Context) error { disableTelemetry = c.Bool("disable-telemetry") + licenseConfig.License = c.String("redpanda-license") if secretsURNs := c.StringSlice("secrets"); len(secretsURNs) > 0 { var err error diff --git a/internal/license/public_key.pem b/internal/license/public_key.pem new file mode 100644 index 0000000000..cd8efc4232 --- /dev/null +++ b/internal/license/public_key.pem @@ -0,0 +1,9 @@ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAt0Y2jGOLI70xkF4rmpNM +hBqU3cUrwYCREgjT9TT77KusvhPVc16cdK83bpaGQy+Or1WyZpN+TCxT2vlaZet6 +RDo+55jRk7epazAHx9s+DLd6IzhSXakf6Sxh5JRK7Zn/75C1hYJMspcJ75EhLv4H +qXj12dkyivcLAecGhWdIGK95J0P7f4EQQGwGL3rilCSlfkVVmE4qaPUaLqULKelq +7T2d+AklR+KwgtHINyKDPJ9+cCAMoEOrRBDPjcQ79k0yvP3BdHV394F+2Vt/AYOL +dcVQBm3tqIySLGFtiJp+RIa+nJhMrd+G4sqwm4FhsmG35Fbr0XQJY0sM6MaFJcDH +swIDAQAB +-----END PUBLIC KEY----- \ No newline at end of file diff --git a/internal/license/redpanda_license.go b/internal/license/redpanda_license.go new file mode 100644 index 0000000000..311eedc4ed --- /dev/null +++ b/internal/license/redpanda_license.go @@ -0,0 +1,52 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package license + +import ( + "fmt" + "time" +) + +// RedpandaLicense is the payload that will be decoded from a license file. +type RedpandaLicense struct { + Version int `json:"version"` + Organization string `json:"org"` + + // 0 = FreeTrial; 1 = Enterprise + Type int `json:"type"` + + // Unix epoch + Expiry int64 `json:"expiry"` +} + +// AllowsEnterpriseFeatures returns true if license type allows enterprise features. +func (r *RedpandaLicense) AllowsEnterpriseFeatures() bool { + return r.Type == 1 +} + +func typeDisplayName(t int) string { + switch t { + case 0: + return "free trial" + case 1: + return "enterprise" + default: + return "open source" + } +} + +// CheckExpiry returns nil if the license is still valid (not expired). Otherwise, +// it will return an error that provides context when the license expired. +func (r *RedpandaLicense) CheckExpiry() error { + expires := time.Unix(r.Expiry, 0) + if expires.Before(time.Now().UTC()) { + return fmt.Errorf("license expired on %q", expires.Format(time.RFC3339)) + } + return nil +} diff --git a/internal/license/service.go b/internal/license/service.go new file mode 100644 index 0000000000..8e4c3c683f --- /dev/null +++ b/internal/license/service.go @@ -0,0 +1,215 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package license + +import ( + "bytes" + "crypto" + "crypto/rsa" + "crypto/sha256" + "crypto/x509" + "encoding/base64" + "encoding/json" + "encoding/pem" + "errors" + "fmt" + "os" + "sync/atomic" + "time" + + "github.com/redpanda-data/benthos/v4/public/service" + + _ "embed" +) + +//go:embed public_key.pem +var licensePublicKeyPem []byte + +const defaultLicenseFilepath = "/etc/redpanda/redpanda.license" + +// Service is the license service. +type Service struct { + logger *service.Logger + loadedLicense *atomic.Pointer[RedpandaLicense] + conf Config +} + +// Config is a struct used to provide configuration to a license service. +type Config struct { + License string + LicenseFilepath string + + // Just for testing + customPublicKeyPem []byte + customDefaultLicenseFilepath string +} + +func (c Config) publicKeyPem() []byte { + if len(c.customPublicKeyPem) > 0 { + return c.customPublicKeyPem + } + return licensePublicKeyPem +} + +func (c Config) defaultLicenseFilepath() string { + if c.customDefaultLicenseFilepath != "" { + return c.customDefaultLicenseFilepath + } + return defaultLicenseFilepath +} + +// RegisterService creates a new license service and registers it to the +// provided resources pointer. +func RegisterService(res *service.Resources, conf Config) { + s := &Service{ + logger: res.Logger(), + loadedLicense: &atomic.Pointer[RedpandaLicense]{}, + conf: conf, + } + + license, err := s.readAndValidateLicense() + if err != nil { + res.Logger().With("error", err).Error("Failed to read Redpanda License") + } + s.loadedLicense.Store(&license) + + setSharedService(res, s) +} + +// InjectTestService inserts an enterprise license into a resources pointer in +// order to provide testing frameworks a way to test enterprise components. +func InjectTestService(res *service.Resources) { + s := &Service{ + logger: res.Logger(), + loadedLicense: &atomic.Pointer[RedpandaLicense]{}, + } + s.loadedLicense.Store(&RedpandaLicense{ + Version: 1, + Organization: "test", + Type: 1, + Expiry: time.Now().Add(time.Hour).Unix(), + }) + setSharedService(res, s) +} + +func (s *Service) readAndValidateLicense() (RedpandaLicense, error) { + licenseBytes, err := s.readLicense() + if err != nil { + return RedpandaLicense{}, err + } + + var license RedpandaLicense + if len(licenseBytes) > 0 { + if license, err = s.validateLicense(licenseBytes); err != nil { + return RedpandaLicense{}, fmt.Errorf("failed to validate license: %w", err) + } + } else { + // An open source license is the final fall back. + year := time.Hour * 24 * 365 + license = RedpandaLicense{ + Expiry: time.Now().Add(10 * year).Unix(), + Type: -1, + } + } + + if err := license.CheckExpiry(); err != nil { + return RedpandaLicense{}, err + } + + s.logger.With( + "license_org", license.Organization, + "license_type", typeDisplayName(license.Type), + "expires_at", time.Unix(license.Expiry, 0).Format(time.RFC3339), + ).Debug("Successfully loaded Redpanda license") + + return license, nil +} + +func (s *Service) readLicense() (licenseFileContents []byte, err error) { + // Explicit license takes priority. + if s.conf.License != "" { + s.logger.Debug("Loading explicitly defined Redpanda Enterprise license") + + licenseFileContents = []byte(s.conf.License) + return + } + + // Followed by explicit license file path. + if s.conf.LicenseFilepath != "" { + s.logger.Debug("Loading Redpanda Enterprise license from explicit file path") + + licenseFileContents, err = os.ReadFile(s.conf.LicenseFilepath) + if err != nil { + return nil, fmt.Errorf("failed to read license file: %w", err) + } + return + } + + // Followed by the default file path. + if licenseFileContents, err = os.ReadFile(s.conf.defaultLicenseFilepath()); err != nil { + if !os.IsNotExist(err) { + return nil, fmt.Errorf("failed to read default path license file: %w", err) + } + return nil, nil + } else { + s.logger.Debug("Loaded Redpanda Enterprise license from default file path") + } + return +} + +func (s *Service) validateLicense(license []byte) (RedpandaLicense, error) { + publicKeyBytes := s.conf.publicKeyPem() + + // 1. Try to parse embedded public key + block, _ := pem.Decode(publicKeyBytes) + publicKey, err := x509.ParsePKIXPublicKey(block.Bytes) + if err != nil { + return RedpandaLicense{}, fmt.Errorf("failed to parse public key: %w", err) + } + publicKeyRSA, ok := publicKey.(*rsa.PublicKey) + if !ok { + return RedpandaLicense{}, errors.New("failed to parse public key, expected dateFormat is not RSA") + } + + // Trim Whitespace and Linebreaks for input license + license = bytes.TrimSpace(license) + + // 2. Split license contents by delimiter + splitParts := bytes.Split(license, []byte(".")) + if len(splitParts) != 2 { + return RedpandaLicense{}, errors.New("failed to split license contents by delimiter") + } + + licenseDataEncoded := splitParts[0] + signatureEncoded := splitParts[1] + + licenseData, err := base64.StdEncoding.DecodeString(string(licenseDataEncoded)) + if err != nil { + return RedpandaLicense{}, fmt.Errorf("failed to decode license data: %w", err) + } + + signature, err := base64.StdEncoding.DecodeString(string(signatureEncoded)) + if err != nil { + return RedpandaLicense{}, fmt.Errorf("failed to decode license signature: %w", err) + } + hash := sha256.Sum256(licenseDataEncoded) + + // 3. Verify license contents with static public key + if err := rsa.VerifyPKCS1v15(publicKeyRSA, crypto.SHA256, hash[:], signature); err != nil { + return RedpandaLicense{}, fmt.Errorf("failed to verify license signature: %w", err) + } + + // 4. If license contents seem to be legit, we will continue unpacking the license + var rpLicense RedpandaLicense + if err := json.Unmarshal(licenseData, &rpLicense); err != nil { + return RedpandaLicense{}, fmt.Errorf("failed to unmarshal license data: %w", err) + } + + return rpLicense, nil +} diff --git a/internal/license/service_test.go b/internal/license/service_test.go new file mode 100644 index 0000000000..eee593fc16 --- /dev/null +++ b/internal/license/service_test.go @@ -0,0 +1,241 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package license + +import ( + "bytes" + "crypto" + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "crypto/x509" + "encoding/base64" + "encoding/json" + "encoding/pem" + "os" + "path/filepath" + "testing" + "time" + + "github.com/redpanda-data/benthos/v4/public/service" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func createLicense(t testing.TB, license RedpandaLicense) (pubKey []byte, licenseStr string) { + t.Helper() + + licenseBytes, err := json.Marshal(license) + require.NoError(t, err) + + licenseBytesEncoded := base64.StdEncoding.AppendEncode(nil, bytes.TrimSpace(licenseBytes)) + licenseEncodedBytesHash := sha256.Sum256(licenseBytesEncoded) + + privKeyRSA, err := rsa.GenerateKey(rand.Reader, 2048) + require.NoError(t, err) + + pubKeyMarshalled, err := x509.MarshalPKIXPublicKey(&privKeyRSA.PublicKey) + require.NoError(t, err) + + var pemBuf bytes.Buffer + require.NoError(t, pem.Encode(&pemBuf, &pem.Block{ + Type: "PUBLIC KEY", + Bytes: pubKeyMarshalled, + })) + + signature, err := rsa.SignPKCS1v15(nil, privKeyRSA, crypto.SHA256, licenseEncodedBytesHash[:]) + require.NoError(t, err) + + return pemBuf.Bytes(), string(licenseBytesEncoded) + "." + base64.StdEncoding.EncodeToString(signature) +} + +func TestLicenseEnterpriseValidation(t *testing.T) { + for _, test := range []struct { + Name string + License RedpandaLicense + IsEnterprise bool + }{ + { + Name: "expired license", + License: RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 1, + Expiry: time.Now().Add(-time.Hour).Unix(), + }, + IsEnterprise: false, + }, + { + Name: "free trial license", + License: RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 0, + Expiry: time.Now().Add(time.Hour).Unix(), + }, + IsEnterprise: false, + }, + { + Name: "enterprise license", + License: RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 1, + Expiry: time.Now().Add(time.Hour).Unix(), + }, + IsEnterprise: true, + }, + } { + t.Run(test.Name, func(t *testing.T) { + pubKey, license := createLicense(t, test.License) + + res := service.MockResources() + RegisterService(res, Config{ + License: license, + customPublicKeyPem: pubKey, + }) + + loaded, err := LoadFromResources(res) + require.NoError(t, err) + + assert.Equal(t, test.IsEnterprise, loaded.AllowsEnterpriseFeatures()) + }) + } +} + +func TestLicenseEnterpriseDefaultPath(t *testing.T) { + tmpDir := t.TempDir() + tmpLicensePath := filepath.Join(tmpDir, "foo.license") + + pubKey, license := createLicense(t, RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 1, + Expiry: time.Now().Add(time.Hour).Unix(), + }) + + // No file created + + res := service.MockResources() + RegisterService(res, Config{ + customPublicKeyPem: pubKey, + customDefaultLicenseFilepath: tmpLicensePath, + }) + + loaded, err := LoadFromResources(res) + require.NoError(t, err) + + assert.False(t, loaded.AllowsEnterpriseFeatures()) + + // File created + + require.NoError(t, os.WriteFile(tmpLicensePath, []byte(license), 0o777)) + + res = service.MockResources() + RegisterService(res, Config{ + customPublicKeyPem: pubKey, + customDefaultLicenseFilepath: tmpLicensePath, + }) + + loaded, err = LoadFromResources(res) + require.NoError(t, err) + + assert.True(t, loaded.AllowsEnterpriseFeatures()) +} + +func TestLicenseEnterpriseCustomPath(t *testing.T) { + tmpDir := t.TempDir() + tmpBadLicensePath := filepath.Join(tmpDir, "bad.license") + tmpGoodLicensePath := filepath.Join(tmpDir, "good.license") + + _, license := createLicense(t, RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 1, + Expiry: time.Now().Add(-time.Hour).Unix(), + }) + require.NoError(t, os.WriteFile(tmpBadLicensePath, []byte(license), 0o777)) + + pubKey, license := createLicense(t, RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 1, + Expiry: time.Now().Add(time.Hour).Unix(), + }) + require.NoError(t, os.WriteFile(tmpGoodLicensePath, []byte(license), 0o777)) + + res := service.MockResources() + RegisterService(res, Config{ + LicenseFilepath: tmpGoodLicensePath, + customPublicKeyPem: pubKey, + customDefaultLicenseFilepath: tmpBadLicensePath, + }) + + loaded, err := LoadFromResources(res) + require.NoError(t, err) + + assert.True(t, loaded.AllowsEnterpriseFeatures()) +} + +func TestLicenseEnterpriseExplicit(t *testing.T) { + tmpDir := t.TempDir() + tmpBadLicensePath := filepath.Join(tmpDir, "bad.license") + tmpAlsoBadLicensePath := filepath.Join(tmpDir, "alsobad.license") + + _, license := createLicense(t, RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 1, + Expiry: time.Now().Add(-time.Hour).Unix(), + }) + require.NoError(t, os.WriteFile(tmpBadLicensePath, []byte(license), 0o777)) + + _, license = createLicense(t, RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 0, + Expiry: time.Now().Add(time.Hour).Unix(), + }) + require.NoError(t, os.WriteFile(tmpAlsoBadLicensePath, []byte(license), 0o777)) + + pubKey, license := createLicense(t, RedpandaLicense{ + Version: 3, + Organization: "meow", + Type: 1, + Expiry: time.Now().Add(time.Hour).Unix(), + }) + + res := service.MockResources() + RegisterService(res, Config{ + License: license, + LicenseFilepath: tmpAlsoBadLicensePath, + customPublicKeyPem: pubKey, + customDefaultLicenseFilepath: tmpBadLicensePath, + }) + + loaded, err := LoadFromResources(res) + require.NoError(t, err) + + assert.True(t, loaded.AllowsEnterpriseFeatures()) +} + +func TestLicenseEnterpriseNoLicense(t *testing.T) { + tmpDir := t.TempDir() + tmpBadLicensePath := filepath.Join(tmpDir, "bad.license") + + res := service.MockResources() + RegisterService(res, Config{ + customDefaultLicenseFilepath: tmpBadLicensePath, + }) + + loaded, err := LoadFromResources(res) + require.NoError(t, err) + + assert.False(t, loaded.AllowsEnterpriseFeatures()) +} diff --git a/internal/license/shared_service.go b/internal/license/shared_service.go new file mode 100644 index 0000000000..9316f9f824 --- /dev/null +++ b/internal/license/shared_service.go @@ -0,0 +1,62 @@ +// Copyright 2024 Redpanda Data, Inc. +// +// Licensed as a Redpanda Enterprise file under the Redpanda Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md + +package license + +import ( + "errors" + + "github.com/redpanda-data/benthos/v4/public/service" +) + +// LoadFromResources attempts to access a license service from a provided +// resources handle and returns the current license it tracks. An error is +// returned if the license service cannot be accessed or cannot provide license +// information. +func LoadFromResources(res *service.Resources) (RedpandaLicense, error) { + svc := getSharedService(res) + if svc == nil { + return RedpandaLicense{}, errors.New("unable to access license service") + } + + l := svc.loadedLicense.Load() + if l == nil { + return RedpandaLicense{}, errors.New("unable to access license information") + } + + return *l, nil +} + +// CheckRunningEnterprise returns a non-nil error if the instance of Redpanda +// Connect is not operating with a valid enterprise license. +func CheckRunningEnterprise(res *service.Resources) error { + l, err := LoadFromResources(res) + if err != nil { + return err + } + if !l.AllowsEnterpriseFeatures() { + return errors.New("this feature requires a valid enterprise license ") + } + return nil +} + +type sharedServiceKeyType int + +var sharedServiceKey sharedServiceKeyType + +func setSharedService(res *service.Resources, svc *Service) { + res.SetGeneric(sharedServiceKey, svc) +} + +func getSharedService(res *service.Resources) *Service { + reg, _ := res.GetGeneric(sharedServiceKey) + if reg == nil { + return nil + } + return reg.(*Service) +} From 883b2010225f5859d90f6e297a02875ff69f1b84 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Fri, 29 Nov 2024 15:29:44 +0000 Subject: [PATCH 3/5] Add enterprise license checks to all enterprise plugins --- .../impl/aws/enterprise/processor_bedrock_chat.go | 5 +++++ .../aws/enterprise/processor_bedrock_embeddings.go | 5 +++++ internal/impl/cohere/chat_processor.go | 5 +++++ internal/impl/cohere/embeddings_processor.go | 6 ++++++ .../impl/gcp/enterprise/processor_vertex_ai_chat.go | 6 ++++++ .../gcp/enterprise/processor_vertex_ai_embeddings.go | 6 ++++++ internal/impl/kafka/enterprise/integration_test.go | 11 +++++++++++ .../impl/kafka/enterprise/redpanda_common_input.go | 5 +++++ .../impl/kafka/enterprise/redpanda_common_output.go | 5 +++++ .../impl/kafka/enterprise/redpanda_migrator_input.go | 5 +++++ .../enterprise/redpanda_migrator_offsets_output.go | 5 +++++ .../impl/kafka/enterprise/redpanda_migrator_output.go | 5 +++++ .../impl/kafka/enterprise/schema_registry_input.go | 5 +++++ .../impl/kafka/enterprise/schema_registry_output.go | 5 +++++ .../impl/kafka/enterprise/schema_registry_test.go | 3 +++ internal/impl/kafka/enterprise/topic_logger.go | 7 +++++++ internal/impl/ollama/chat_processor.go | 6 ++++++ internal/impl/ollama/embeddings_processor.go | 6 ++++++ internal/impl/openai/chat_processor.go | 5 +++++ internal/impl/openai/embeddings_processor.go | 6 ++++++ internal/impl/openai/image_processor.go | 6 ++++++ internal/impl/openai/speech_processor.go | 6 ++++++ internal/impl/openai/transcription_processor.go | 6 ++++++ internal/impl/snowflake/output_snowflake_put.go | 6 ++++++ internal/impl/snowflake/output_snowflake_streaming.go | 5 +++++ internal/impl/splunk/input.go | 6 ++++++ internal/impl/splunk/integration_test.go | 7 +++++++ internal/impl/splunk/output.go | 6 ++++++ 28 files changed, 160 insertions(+) diff --git a/internal/impl/aws/enterprise/processor_bedrock_chat.go b/internal/impl/aws/enterprise/processor_bedrock_chat.go index af8045ee6f..120463255f 100644 --- a/internal/impl/aws/enterprise/processor_bedrock_chat.go +++ b/internal/impl/aws/enterprise/processor_bedrock_chat.go @@ -20,6 +20,7 @@ import ( "github.com/redpanda-data/connect/v4/internal/impl/aws" "github.com/redpanda-data/connect/v4/internal/impl/aws/config" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -76,6 +77,10 @@ For more information, see the https://docs.aws.amazon.com/bedrock/latest/usergui } func newBedrockChatProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + aconf, err := aws.GetSession(context.Background(), conf) if err != nil { return nil, err diff --git a/internal/impl/aws/enterprise/processor_bedrock_embeddings.go b/internal/impl/aws/enterprise/processor_bedrock_embeddings.go index cd670dee36..fa723a8296 100644 --- a/internal/impl/aws/enterprise/processor_bedrock_embeddings.go +++ b/internal/impl/aws/enterprise/processor_bedrock_embeddings.go @@ -21,6 +21,7 @@ import ( "github.com/redpanda-data/connect/v4/internal/impl/aws" "github.com/redpanda-data/connect/v4/internal/impl/aws/config" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -78,6 +79,10 @@ output: } func newBedrockEmbeddingsProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + aconf, err := aws.GetSession(context.Background(), conf) if err != nil { return nil, err diff --git a/internal/impl/cohere/chat_processor.go b/internal/impl/cohere/chat_processor.go index 32a10bdf93..c298aabf14 100644 --- a/internal/impl/cohere/chat_processor.go +++ b/internal/impl/cohere/chat_processor.go @@ -19,6 +19,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/confluent/sr" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -144,6 +145,10 @@ We generally recommend altering this or temperature but not both.`). } func makeChatProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + b, err := newBaseProcessor(conf) if err != nil { return nil, err diff --git a/internal/impl/cohere/embeddings_processor.go b/internal/impl/cohere/embeddings_processor.go index c74242ebce..46e576d667 100644 --- a/internal/impl/cohere/embeddings_processor.go +++ b/internal/impl/cohere/embeddings_processor.go @@ -16,6 +16,8 @@ import ( cohere "github.com/cohere-ai/cohere-go/v2" "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -87,6 +89,10 @@ output: } func makeEmbeddingsProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + b, err := newBaseProcessor(conf) if err != nil { return nil, err diff --git a/internal/impl/gcp/enterprise/processor_vertex_ai_chat.go b/internal/impl/gcp/enterprise/processor_vertex_ai_chat.go index 3ba5cdd5be..730408da5d 100644 --- a/internal/impl/gcp/enterprise/processor_vertex_ai_chat.go +++ b/internal/impl/gcp/enterprise/processor_vertex_ai_chat.go @@ -19,6 +19,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" "google.golang.org/api/option" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -122,6 +124,10 @@ For more information, see the https://cloud.google.com/vertex-ai/docs[Vertex AI } func newVertexAIProcessor(conf *service.ParsedConfig, mgr *service.Resources) (p service.Processor, err error) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + ctx := context.Background() proc := &vertexAIChatProcessor{} var project string diff --git a/internal/impl/gcp/enterprise/processor_vertex_ai_embeddings.go b/internal/impl/gcp/enterprise/processor_vertex_ai_embeddings.go index 2cb31c77e0..c78fe0e74d 100644 --- a/internal/impl/gcp/enterprise/processor_vertex_ai_embeddings.go +++ b/internal/impl/gcp/enterprise/processor_vertex_ai_embeddings.go @@ -16,6 +16,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" + "github.com/redpanda-data/connect/v4/internal/license" + aiplatform "cloud.google.com/go/aiplatform/apiv1" "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" "cloud.google.com/go/vertexai/genai" @@ -88,6 +90,10 @@ For more information, see the https://cloud.google.com/vertex-ai/generative-ai/d } func newVertexAIEmbeddingsProcessor(conf *service.ParsedConfig, mgr *service.Resources) (p service.Processor, err error) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + ctx := context.Background() proc := &vertexAIEmbeddingsProcessor{} var project string diff --git a/internal/impl/kafka/enterprise/integration_test.go b/internal/impl/kafka/enterprise/integration_test.go index a49eda926d..f5a077f533 100644 --- a/internal/impl/kafka/enterprise/integration_test.go +++ b/internal/impl/kafka/enterprise/integration_test.go @@ -36,6 +36,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service/integration" "github.com/redpanda-data/connect/v4/internal/impl/kafka/enterprise" + "github.com/redpanda-data/connect/v4/internal/license" "github.com/redpanda-data/connect/v4/internal/protoconnect" ) @@ -166,6 +167,8 @@ max_message_bytes: 1MB `, brokerAddr, logsTopic, statusTopic), nil) require.NoError(t, err) + license.InjectTestService(conf.Resources()) + logger := enterprise.NewTopicLogger("foo") require.NoError(t, logger.InitOutputFromParsed(conf)) @@ -213,6 +216,8 @@ max_message_bytes: 1MB `, brokerAddr, logsTopic, statusTopic), nil) require.NoError(t, err) + license.InjectTestService(conf.Resources()) + logger := enterprise.NewTopicLogger("foo") require.NoError(t, logger.InitOutputFromParsed(conf)) @@ -262,6 +267,8 @@ max_message_bytes: 1MB `, brokerAddr, logsTopic, statusTopic), nil) require.NoError(t, err) + license.InjectTestService(conf.Resources()) + logger := enterprise.NewTopicLogger("baz") require.NoError(t, logger.InitOutputFromParsed(conf)) @@ -471,6 +478,8 @@ output: stream, err := streamBuilder.Build() require.NoError(t, err) + license.InjectTestService(stream.Resources()) + ctx, done := context.WithTimeout(context.Background(), 3*time.Second) defer done() @@ -567,6 +576,8 @@ output: stream, err := streamBuilder.Build() require.NoError(t, err) + license.InjectTestService(stream.Resources()) + ctx, done := context.WithTimeout(context.Background(), 3*time.Second) defer done() diff --git a/internal/impl/kafka/enterprise/redpanda_common_input.go b/internal/impl/kafka/enterprise/redpanda_common_input.go index 1798940c71..c9e852d9e5 100644 --- a/internal/impl/kafka/enterprise/redpanda_common_input.go +++ b/internal/impl/kafka/enterprise/redpanda_common_input.go @@ -16,6 +16,7 @@ import ( "github.com/twmb/franz-go/pkg/kgo" "github.com/redpanda-data/connect/v4/internal/impl/kafka" + "github.com/redpanda-data/connect/v4/internal/license" ) func redpandaCommonInputConfig() *service.ConfigSpec { @@ -94,6 +95,10 @@ root = if $has_topic_partitions { func init() { err := service.RegisterBatchInput("redpanda_common", redpandaCommonInputConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + tmpOpts, err := kafka.FranzConsumerOptsFromConfig(conf) if err != nil { return nil, err diff --git a/internal/impl/kafka/enterprise/redpanda_common_output.go b/internal/impl/kafka/enterprise/redpanda_common_output.go index fea495f48f..8b3fdcc109 100644 --- a/internal/impl/kafka/enterprise/redpanda_common_output.go +++ b/internal/impl/kafka/enterprise/redpanda_common_output.go @@ -14,6 +14,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/kafka" + "github.com/redpanda-data/connect/v4/internal/license" ) func redpandaCommonOutputConfig() *service.ConfigSpec { @@ -68,6 +69,10 @@ func init() { maxInFlight int, err error, ) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + if maxInFlight, err = conf.FieldMaxInFlight(); err != nil { return } diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_input.go b/internal/impl/kafka/enterprise/redpanda_migrator_input.go index 3149929f65..e66b9ab138 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_input.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_input.go @@ -25,6 +25,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/kafka" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -138,6 +139,10 @@ func RedpandaMigratorInputConfigFields() []*service.ConfigField { func init() { err := service.RegisterBatchInput("redpanda_migrator", redpandaMigratorInputConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + rdr, err := NewRedpandaMigratorReaderFromConfig(conf, mgr) if err != nil { return nil, err diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go b/internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go index 524e24788c..6167bba732 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go @@ -23,6 +23,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/kafka" + "github.com/redpanda-data/connect/v4/internal/license" "github.com/redpanda-data/connect/v4/internal/retries" ) @@ -65,6 +66,10 @@ func init() { maxInFlight int, err error, ) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + if maxInFlight, err = conf.FieldInt(rmooFieldMaxInFlight); err != nil { return } diff --git a/internal/impl/kafka/enterprise/redpanda_migrator_output.go b/internal/impl/kafka/enterprise/redpanda_migrator_output.go index 5fc13cbc52..866388e0b2 100644 --- a/internal/impl/kafka/enterprise/redpanda_migrator_output.go +++ b/internal/impl/kafka/enterprise/redpanda_migrator_output.go @@ -21,6 +21,7 @@ import ( "github.com/redpanda-data/connect/v4/internal/impl/confluent/sr" "github.com/redpanda-data/connect/v4/internal/impl/kafka" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -119,6 +120,10 @@ func init() { maxInFlight int, err error, ) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + if maxInFlight, err = conf.FieldInt(rmoFieldMaxInFlight); err != nil { return } diff --git a/internal/impl/kafka/enterprise/schema_registry_input.go b/internal/impl/kafka/enterprise/schema_registry_input.go index 4be874e0ee..fdf27b9f5b 100644 --- a/internal/impl/kafka/enterprise/schema_registry_input.go +++ b/internal/impl/kafka/enterprise/schema_registry_input.go @@ -25,6 +25,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/confluent/sr" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -86,6 +87,10 @@ func schemaRegistryInputConfigFields() []*service.ConfigField { func init() { err := service.RegisterInput("schema_registry", schemaRegistryInputSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + i, err := inputFromParsed(conf, mgr) if err != nil { return nil, err diff --git a/internal/impl/kafka/enterprise/schema_registry_output.go b/internal/impl/kafka/enterprise/schema_registry_output.go index eb33a9c1d7..ed1b935509 100644 --- a/internal/impl/kafka/enterprise/schema_registry_output.go +++ b/internal/impl/kafka/enterprise/schema_registry_output.go @@ -25,6 +25,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/confluent/sr" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -87,6 +88,10 @@ func schemaRegistryOutputConfigFields() []*service.ConfigField { func init() { err := service.RegisterOutput("schema_registry", schemaRegistryOutputSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, maxInFlight int, err error) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + if maxInFlight, err = conf.FieldMaxInFlight(); err != nil { return } diff --git a/internal/impl/kafka/enterprise/schema_registry_test.go b/internal/impl/kafka/enterprise/schema_registry_test.go index 1e579b0b42..32ba490046 100644 --- a/internal/impl/kafka/enterprise/schema_registry_test.go +++ b/internal/impl/kafka/enterprise/schema_registry_test.go @@ -21,6 +21,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/sr" + + "github.com/redpanda-data/connect/v4/internal/license" ) func TestSchemaRegistry(t *testing.T) { @@ -98,6 +100,7 @@ func TestSchemaRegistry(t *testing.T) { t.Cleanup(ts.Close) mgr := service.MockResources() + license.InjectTestService(mgr) inputConf, err := schemaRegistryInputSpec().ParseYAML(fmt.Sprintf(` url: %s diff --git a/internal/impl/kafka/enterprise/topic_logger.go b/internal/impl/kafka/enterprise/topic_logger.go index b9a3437e9d..74a4a080b1 100644 --- a/internal/impl/kafka/enterprise/topic_logger.go +++ b/internal/impl/kafka/enterprise/topic_logger.go @@ -22,6 +22,7 @@ import ( "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/connect/v4/internal/impl/kafka" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -120,6 +121,12 @@ func (l *TopicLogger) InitOutputFromParsed(pConf *service.ParsedConfig) error { return err } + if l.logsTopic != "" || l.statusTopic != "" { + if err := license.CheckRunningEnterprise(pConf.Resources()); err != nil { + return fmt.Errorf("unable to send logs or status updates to redpanda: %w", err) + } + } + lvlStr, err := pConf.FieldString("logs_level") if err != nil { return err diff --git a/internal/impl/ollama/chat_processor.go b/internal/impl/ollama/chat_processor.go index f145741fdd..d9bbb996dc 100644 --- a/internal/impl/ollama/chat_processor.go +++ b/internal/impl/ollama/chat_processor.go @@ -17,6 +17,8 @@ import ( "github.com/ollama/ollama/api" "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -150,6 +152,10 @@ output: } func makeOllamaCompletionProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + p := ollamaCompletionProcessor{} if conf.Contains(ocpFieldUserPrompt) { pf, err := conf.FieldInterpolatedString(ocpFieldUserPrompt) diff --git a/internal/impl/ollama/embeddings_processor.go b/internal/impl/ollama/embeddings_processor.go index 7c6b458495..aedcc63a12 100644 --- a/internal/impl/ollama/embeddings_processor.go +++ b/internal/impl/ollama/embeddings_processor.go @@ -15,6 +15,8 @@ import ( "github.com/ollama/ollama/api" "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -98,6 +100,10 @@ output: } func makeOllamaEmbeddingProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + p := ollamaEmbeddingProcessor{} if conf.Contains(oepFieldText) { pf, err := conf.FieldInterpolatedString(oepFieldText) diff --git a/internal/impl/openai/chat_processor.go b/internal/impl/openai/chat_processor.go index f420310bd3..e92bbe4947 100644 --- a/internal/impl/openai/chat_processor.go +++ b/internal/impl/openai/chat_processor.go @@ -23,6 +23,7 @@ import ( oai "github.com/sashabaranov/go-openai" "github.com/redpanda-data/connect/v4/internal/impl/confluent/sr" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -191,6 +192,10 @@ output: } func makeChatProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + b, err := newBaseProcessor(conf) if err != nil { return nil, err diff --git a/internal/impl/openai/embeddings_processor.go b/internal/impl/openai/embeddings_processor.go index d9aa7b3738..21f19053dc 100644 --- a/internal/impl/openai/embeddings_processor.go +++ b/internal/impl/openai/embeddings_processor.go @@ -15,6 +15,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" oai "github.com/sashabaranov/go-openai" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -80,6 +82,10 @@ output: } func makeEmbeddingsProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + b, err := newBaseProcessor(conf) if err != nil { return nil, err diff --git a/internal/impl/openai/image_processor.go b/internal/impl/openai/image_processor.go index c56d09e0d8..1357106a3f 100644 --- a/internal/impl/openai/image_processor.go +++ b/internal/impl/openai/image_processor.go @@ -17,6 +17,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" oai "github.com/sashabaranov/go-openai" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -75,6 +77,10 @@ To learn more about image generation, see the https://platform.openai.com/docs/g } func makeImageProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + b, err := newBaseProcessor(conf) if err != nil { return nil, err diff --git a/internal/impl/openai/speech_processor.go b/internal/impl/openai/speech_processor.go index b89209b3a5..8712013b37 100644 --- a/internal/impl/openai/speech_processor.go +++ b/internal/impl/openai/speech_processor.go @@ -16,6 +16,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" oai "github.com/sashabaranov/go-openai" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -66,6 +68,10 @@ To learn more about turning text into spoken audio, see the https://platform.ope } func makeSpeechProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + b, err := newBaseProcessor(conf) if err != nil { return nil, err diff --git a/internal/impl/openai/transcription_processor.go b/internal/impl/openai/transcription_processor.go index 10acaf81c6..a9b58c1bab 100644 --- a/internal/impl/openai/transcription_processor.go +++ b/internal/impl/openai/transcription_processor.go @@ -16,6 +16,8 @@ import ( "github.com/redpanda-data/benthos/v4/public/bloblang" "github.com/redpanda-data/benthos/v4/public/service" oai "github.com/sashabaranov/go-openai" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -65,6 +67,10 @@ To learn more about audio transcription, see the: https://platform.openai.com/do } func makeTranscriptionProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + b, err := newBaseProcessor(conf) if err != nil { return nil, err diff --git a/internal/impl/snowflake/output_snowflake_put.go b/internal/impl/snowflake/output_snowflake_put.go index d4c5716cd9..faae19e344 100644 --- a/internal/impl/snowflake/output_snowflake_put.go +++ b/internal/impl/snowflake/output_snowflake_put.go @@ -35,6 +35,8 @@ import ( "golang.org/x/crypto/ssh" "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -401,6 +403,10 @@ func init() { maxInFlight int, err error, ) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + if maxInFlight, err = conf.FieldInt("max_in_flight"); err != nil { return } diff --git a/internal/impl/snowflake/output_snowflake_streaming.go b/internal/impl/snowflake/output_snowflake_streaming.go index f84f97b544..f89fe7fba9 100644 --- a/internal/impl/snowflake/output_snowflake_streaming.go +++ b/internal/impl/snowflake/output_snowflake_streaming.go @@ -23,6 +23,7 @@ import ( "github.com/redpanda-data/connect/v4/internal/impl/snowflake/capped" "github.com/redpanda-data/connect/v4/internal/impl/snowflake/streaming" + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -219,6 +220,10 @@ func init() { maxInFlight int, err error, ) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + if maxInFlight, err = conf.FieldMaxInFlight(); err != nil { return } diff --git a/internal/impl/splunk/input.go b/internal/impl/splunk/input.go index 8b899b7af8..60bdf6f916 100644 --- a/internal/impl/splunk/input.go +++ b/internal/impl/splunk/input.go @@ -22,6 +22,8 @@ import ( "github.com/Jeffail/shutdown" "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -53,6 +55,10 @@ func inputSpec() *service.ConfigSpec { func init() { err := service.RegisterInput("splunk", inputSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + if err := license.CheckRunningEnterprise(mgr); err != nil { + return nil, err + } + i, err := inputFromParsed(conf, mgr.Logger()) if err != nil { return nil, err diff --git a/internal/impl/splunk/integration_test.go b/internal/impl/splunk/integration_test.go index 85ec377a2f..323eb06b2e 100644 --- a/internal/impl/splunk/integration_test.go +++ b/internal/impl/splunk/integration_test.go @@ -16,10 +16,13 @@ import ( "time" "github.com/ory/dockertest/v3" + "github.com/redpanda-data/benthos/v4/public/service" "github.com/redpanda-data/benthos/v4/public/service/integration" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/redpanda-data/connect/v4/internal/license" + _ "github.com/redpanda-data/benthos/v4/public/components/pure" ) @@ -135,6 +138,10 @@ input: integration.StreamTestOptVarSet("VAR1", serviceInputPort), integration.StreamTestOptVarSet("VAR2", serviceOutputPort), integration.StreamTestOptVarSet("VAR3", dummySplunkPassword), + integration.StreamTestOptOnResourcesInit(func(res *service.Resources) error { + license.InjectTestService(res) + return nil + }), ) }) } diff --git a/internal/impl/splunk/output.go b/internal/impl/splunk/output.go index b2bb99d92a..42b61a5875 100644 --- a/internal/impl/splunk/output.go +++ b/internal/impl/splunk/output.go @@ -20,6 +20,8 @@ import ( "net/http/httputil" "github.com/redpanda-data/benthos/v4/public/service" + + "github.com/redpanda-data/connect/v4/internal/license" ) const ( @@ -84,6 +86,10 @@ func outputSpec() *service.ConfigSpec { func init() { err := service.RegisterBatchOutput("splunk_hec", outputSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (out service.BatchOutput, batchPolicy service.BatchPolicy, maxInFlight int, err error) { + if err = license.CheckRunningEnterprise(mgr); err != nil { + return + } + if maxInFlight, err = conf.FieldMaxInFlight(); err != nil { return } From 8657b3d00638b21006cdee78b8fc799bacc8c640 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Fri, 29 Nov 2024 15:43:46 +0000 Subject: [PATCH 4/5] Update CHANGELOG --- CHANGELOG.md | 2 ++ internal/license/shared_service.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d2c8d41b9..a2eef18b36 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file. - Add support for complex database types (JSONB, TEXT[], INET, TSVECTOR, TSRANGE, POINT, INTEGER[]) for `pg_stream` input. (@le-vlad) - Add support for Parquet files to `bigquery` output (@rockwotj) - (Benthos) New `exists` operator added to the `cache` processor. (@mihaitodor) +- New CLI flag `redpanda-license` added as an alternative way to specify a Redpanda license. (@Jeffail) ### Fixed @@ -20,6 +21,7 @@ All notable changes to this project will be documented in this file. ### Changed - The `redpanda_migrator` output now registers destination schemas with all the subjects associated with the source schema ID extracted from each message. (@mihaitodor) +- Enterprise features will now only run when a valid Redpanda license is present. More information can be found at [the licenses getting started guide](https://docs.redpanda.com/current/get-started/licenses/). (@Jeffail) ## 4.41.0 - 2024-11-25 diff --git a/internal/license/shared_service.go b/internal/license/shared_service.go index 9316f9f824..bdb25f7679 100644 --- a/internal/license/shared_service.go +++ b/internal/license/shared_service.go @@ -40,7 +40,7 @@ func CheckRunningEnterprise(res *service.Resources) error { return err } if !l.AllowsEnterpriseFeatures() { - return errors.New("this feature requires a valid enterprise license ") + return errors.New("this feature requires a valid enterprise license https://docs.redpanda.com/current/get-started/licenses/") } return nil } From 288e4a33dc00b6fde2b47df9d3cb3705c8eeb4db Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Mon, 2 Dec 2024 10:41:20 +0000 Subject: [PATCH 5/5] Address nits --- internal/license/service.go | 4 ++-- internal/license/shared_service.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/license/service.go b/internal/license/service.go index 8e4c3c683f..e2bd523e9a 100644 --- a/internal/license/service.go +++ b/internal/license/service.go @@ -157,9 +157,9 @@ func (s *Service) readLicense() (licenseFileContents []byte, err error) { return nil, fmt.Errorf("failed to read default path license file: %w", err) } return nil, nil - } else { - s.logger.Debug("Loaded Redpanda Enterprise license from default file path") } + + s.logger.Debug("Loaded Redpanda Enterprise license from default file path") return } diff --git a/internal/license/shared_service.go b/internal/license/shared_service.go index bdb25f7679..3349fdaf7f 100644 --- a/internal/license/shared_service.go +++ b/internal/license/shared_service.go @@ -40,7 +40,7 @@ func CheckRunningEnterprise(res *service.Resources) error { return err } if !l.AllowsEnterpriseFeatures() { - return errors.New("this feature requires a valid enterprise license https://docs.redpanda.com/current/get-started/licenses/") + return errors.New("this feature requires a valid Redpanda Enterprise Edition license from https://redpanda.com/try-enterprise?origin=rpcn. For more information check out: https://docs.redpanda.com/current/get-started/licenses/") } return nil }