Skip to content

Commit

Permalink
Merge pull request #3049 from redpanda-data/licensing
Browse files Browse the repository at this point in the history
Add license checks for enterprise features
  • Loading branch information
Jeffail authored Dec 2, 2024
2 parents 7f02041 + 288e4a3 commit 8b73381
Show file tree
Hide file tree
Showing 38 changed files with 764 additions and 4 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.42.0 - TBD
## 4.42.0 - 2024-12-02

### Added

- Add support for `spanner` driver to SQL plugins. (@yufeng-deng)
- Add support for complex database types (JSONB, TEXT[], INET, TSVECTOR, TSRANGE, POINT, INTEGER[]) for `pg_stream` input. (@le-vlad)
- Add support for Parquet files to `bigquery` output (@rockwotj)
- (Benthos) New `exists` operator added to the `cache` processor. (@mihaitodor)
- New CLI flag `redpanda-license` added as an alternative way to specify a Redpanda license. (@Jeffail)

### Fixed

Expand All @@ -19,6 +21,7 @@ All notable changes to this project will be documented in this file.
### Changed

- The `redpanda_migrator` output now registers destination schemas with all the subjects associated with the source schema ID extracted from each message. (@mihaitodor)
- Enterprise features will now only run when a valid Redpanda license is present. More information can be found at [the licenses getting started guide](https://docs.redpanda.com/current/get-started/licenses/). (@Jeffail)

## 4.41.0 - 2024-11-25

Expand Down
6 changes: 6 additions & 0 deletions docs/modules/components/pages/processors/cache.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Options:
, `add`
, `get`
, `delete`
, `exists`
.
=== `key`
Expand Down Expand Up @@ -241,3 +242,8 @@ can be detected with xref:configuration:error_handling.adoc[processor error hand
Delete a key and its contents from the cache. If the key does not exist the
action is a no-op and will not fail with an error.
=== `exists`
Check if a given key exists in the cache and replace the original message payload
with `true` or `false`.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ require (
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/redis/go-redis/v9 v9.7.0
github.com/redpanda-data/benthos/v4 v4.41.1
github.com/redpanda-data/benthos/v4 v4.42.0
github.com/redpanda-data/common-go/secrets v0.1.2
github.com/redpanda-data/connect/public/bundle/free/v4 v4.31.0
github.com/rs/xid v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1733,8 +1733,8 @@ github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5X
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
github.com/redpanda-data/benthos/v4 v4.41.1 h1:kvhPIW7uJUj8ImVooR/ExYG2NZJ5r0U4bH6EEm9N8Dw=
github.com/redpanda-data/benthos/v4 v4.41.1/go.mod h1:T5Nb0hH1Sa1ChlH4hLW7+nA1+jQ/3CP/cVFI73z6ZIM=
github.com/redpanda-data/benthos/v4 v4.42.0 h1:3sKmHhdC1t/IH63oTzlYurfJaO0TsEWSEKeiE6FIvG8=
github.com/redpanda-data/benthos/v4 v4.42.0/go.mod h1:T5Nb0hH1Sa1ChlH4hLW7+nA1+jQ/3CP/cVFI73z6ZIM=
github.com/redpanda-data/common-go/secrets v0.1.2 h1:UCDLN/yL8yjSIYhS5MB+2Am1Jy4XZMZPtuuCRL/82Rw=
github.com/redpanda-data/common-go/secrets v0.1.2/go.mod h1:WjaDI39reE/GPRPHTsaYmiMjhHj+qsSJLe+kHsPKsXk=
github.com/redpanda-data/connect/public/bundle/free/v4 v4.31.0 h1:Qiz4Q8ZO17n8797hgDdJ2f1XN7wh6J2hIRgeeSw4F24=
Expand Down
12 changes: 12 additions & 0 deletions internal/cli/enterprise.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/urfave/cli/v2"

"github.com/redpanda-data/connect/v4/internal/impl/kafka/enterprise"
"github.com/redpanda-data/connect/v4/internal/license"
"github.com/redpanda-data/connect/v4/internal/secrets"
"github.com/redpanda-data/connect/v4/internal/telemetry"
)
Expand Down Expand Up @@ -46,6 +47,9 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co
}

var disableTelemetry bool
licenseConfig := license.Config{
LicenseFilepath: os.Getenv("REDPANDA_LICENSE_FILEPATH"),
}

opts = append(opts,
service.CLIOptSetVersion(version, dateBuilt),
Expand Down Expand Up @@ -81,6 +85,9 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co
}),
service.CLIOptAddTeeLogger(slog.New(rpLogger)),
service.CLIOptOnConfigParse(func(pConf *service.ParsedConfig) error {
// Kick off license service.
license.RegisterService(pConf.Resources(), licenseConfig)

// Kick off telemetry exporter.
if !disableTelemetry {
telemetry.ActivateExporter(instanceID, version, fbLogger, schema, pConf)
Expand All @@ -103,8 +110,13 @@ func InitEnterpriseCLI(binaryName, version, dateBuilt string, schema *service.Co
Name: "disable-telemetry",
Usage: "Disable anonymous telemetry from being emitted by this Connect instance.",
},
&cli.StringFlag{
Name: "redpanda-license",
Usage: "Provide an explicit Redpanda License, which enables enterprise functionality. By default licenses found at the path `/etc/redpanda/redpanda.license` are applied.",
},
}, func(c *cli.Context) error {
disableTelemetry = c.Bool("disable-telemetry")
licenseConfig.License = c.String("redpanda-license")

if secretsURNs := c.StringSlice("secrets"); len(secretsURNs) > 0 {
var err error
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/aws/enterprise/processor_bedrock_chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/redpanda-data/connect/v4/internal/impl/aws"
"github.com/redpanda-data/connect/v4/internal/impl/aws/config"
"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -76,6 +77,10 @@ For more information, see the https://docs.aws.amazon.com/bedrock/latest/usergui
}

func newBedrockChatProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

aconf, err := aws.GetSession(context.Background(), conf)
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/aws/enterprise/processor_bedrock_embeddings.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/redpanda-data/connect/v4/internal/impl/aws"
"github.com/redpanda-data/connect/v4/internal/impl/aws/config"
"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -78,6 +79,10 @@ output:
}

func newBedrockEmbeddingsProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

aconf, err := aws.GetSession(context.Background(), conf)
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/cohere/chat_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/confluent/sr"
"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -144,6 +145,10 @@ We generally recommend altering this or temperature but not both.`).
}

func makeChatProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

b, err := newBaseProcessor(conf)
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions internal/impl/cohere/embeddings_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
cohere "github.com/cohere-ai/cohere-go/v2"
"github.com/redpanda-data/benthos/v4/public/bloblang"
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -87,6 +89,10 @@ output:
}

func makeEmbeddingsProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

b, err := newBaseProcessor(conf)
if err != nil {
return nil, err
Expand Down
6 changes: 6 additions & 0 deletions internal/impl/gcp/enterprise/processor_vertex_ai_chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/redpanda-data/benthos/v4/public/bloblang"
"github.com/redpanda-data/benthos/v4/public/service"
"google.golang.org/api/option"

"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -122,6 +124,10 @@ For more information, see the https://cloud.google.com/vertex-ai/docs[Vertex AI
}

func newVertexAIProcessor(conf *service.ParsedConfig, mgr *service.Resources) (p service.Processor, err error) {
if err = license.CheckRunningEnterprise(mgr); err != nil {
return
}

ctx := context.Background()
proc := &vertexAIChatProcessor{}
var project string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/license"

aiplatform "cloud.google.com/go/aiplatform/apiv1"
"cloud.google.com/go/aiplatform/apiv1/aiplatformpb"
"cloud.google.com/go/vertexai/genai"
Expand Down Expand Up @@ -88,6 +90,10 @@ For more information, see the https://cloud.google.com/vertex-ai/generative-ai/d
}

func newVertexAIEmbeddingsProcessor(conf *service.ParsedConfig, mgr *service.Resources) (p service.Processor, err error) {
if err = license.CheckRunningEnterprise(mgr); err != nil {
return
}

ctx := context.Background()
proc := &vertexAIEmbeddingsProcessor{}
var project string
Expand Down
11 changes: 11 additions & 0 deletions internal/impl/kafka/enterprise/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/redpanda-data/benthos/v4/public/service/integration"

"github.com/redpanda-data/connect/v4/internal/impl/kafka/enterprise"
"github.com/redpanda-data/connect/v4/internal/license"
"github.com/redpanda-data/connect/v4/internal/protoconnect"
)

Expand Down Expand Up @@ -166,6 +167,8 @@ max_message_bytes: 1MB
`, brokerAddr, logsTopic, statusTopic), nil)
require.NoError(t, err)

license.InjectTestService(conf.Resources())

logger := enterprise.NewTopicLogger("foo")
require.NoError(t, logger.InitOutputFromParsed(conf))

Expand Down Expand Up @@ -213,6 +216,8 @@ max_message_bytes: 1MB
`, brokerAddr, logsTopic, statusTopic), nil)
require.NoError(t, err)

license.InjectTestService(conf.Resources())

logger := enterprise.NewTopicLogger("foo")
require.NoError(t, logger.InitOutputFromParsed(conf))

Expand Down Expand Up @@ -262,6 +267,8 @@ max_message_bytes: 1MB
`, brokerAddr, logsTopic, statusTopic), nil)
require.NoError(t, err)

license.InjectTestService(conf.Resources())

logger := enterprise.NewTopicLogger("baz")
require.NoError(t, logger.InitOutputFromParsed(conf))

Expand Down Expand Up @@ -471,6 +478,8 @@ output:
stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
defer done()

Expand Down Expand Up @@ -567,6 +576,8 @@ output:
stream, err := streamBuilder.Build()
require.NoError(t, err)

license.InjectTestService(stream.Resources())

ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
defer done()

Expand Down
5 changes: 5 additions & 0 deletions internal/impl/kafka/enterprise/redpanda_common_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"

"github.com/redpanda-data/connect/v4/internal/impl/kafka"
"github.com/redpanda-data/connect/v4/internal/license"
)

func redpandaCommonInputConfig() *service.ConfigSpec {
Expand Down Expand Up @@ -94,6 +95,10 @@ root = if $has_topic_partitions {
func init() {
err := service.RegisterBatchInput("redpanda_common", redpandaCommonInputConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

tmpOpts, err := kafka.FranzConsumerOptsFromConfig(conf)
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/kafka/enterprise/redpanda_common_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/kafka"
"github.com/redpanda-data/connect/v4/internal/license"
)

func redpandaCommonOutputConfig() *service.ConfigSpec {
Expand Down Expand Up @@ -68,6 +69,10 @@ func init() {
maxInFlight int,
err error,
) {
if err = license.CheckRunningEnterprise(mgr); err != nil {
return
}

if maxInFlight, err = conf.FieldMaxInFlight(); err != nil {
return
}
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/kafka/enterprise/redpanda_migrator_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/kafka"
"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -138,6 +139,10 @@ func RedpandaMigratorInputConfigFields() []*service.ConfigField {
func init() {
err := service.RegisterBatchInput("redpanda_migrator", redpandaMigratorInputConfig(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

rdr, err := NewRedpandaMigratorReaderFromConfig(conf, mgr)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/kafka"
"github.com/redpanda-data/connect/v4/internal/license"
"github.com/redpanda-data/connect/v4/internal/retries"
)

Expand Down Expand Up @@ -65,6 +66,10 @@ func init() {
maxInFlight int,
err error,
) {
if err = license.CheckRunningEnterprise(mgr); err != nil {
return
}

if maxInFlight, err = conf.FieldInt(rmooFieldMaxInFlight); err != nil {
return
}
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/kafka/enterprise/redpanda_migrator_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/redpanda-data/connect/v4/internal/impl/confluent/sr"
"github.com/redpanda-data/connect/v4/internal/impl/kafka"
"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -119,6 +120,10 @@ func init() {
maxInFlight int,
err error,
) {
if err = license.CheckRunningEnterprise(mgr); err != nil {
return
}

if maxInFlight, err = conf.FieldInt(rmoFieldMaxInFlight); err != nil {
return
}
Expand Down
5 changes: 5 additions & 0 deletions internal/impl/kafka/enterprise/schema_registry_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/confluent/sr"
"github.com/redpanda-data/connect/v4/internal/license"
)

const (
Expand Down Expand Up @@ -86,6 +87,10 @@ func schemaRegistryInputConfigFields() []*service.ConfigField {
func init() {
err := service.RegisterInput("schema_registry", schemaRegistryInputSpec(),
func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
if err := license.CheckRunningEnterprise(mgr); err != nil {
return nil, err
}

i, err := inputFromParsed(conf, mgr)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 8b73381

Please sign in to comment.