From 8700f6f601bc2fe7650ef7f0fd06ae69b9b774d1 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Fri, 1 Mar 2024 17:30:47 +0000 Subject: [PATCH] Fix template config parsing --- CHANGELOG.md | 6 + .../input_stdin_uppercase.yaml | 10 +- internal/impl/splunk/template_output.yaml | 38 ++- internal/template/config.go | 21 +- internal/template/template.go | 54 ++-- internal/template/template_test.go | 301 ++++++++++++++++++ 6 files changed, 382 insertions(+), 48 deletions(-) create mode 100644 internal/template/template_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a17dbaceb..ccfb806a79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. ## Unreleased +## 4.25.1 - 2024-03-01 + +### Fixed + +- Fixed a regression in v4.25.0 where [template based components](https://www.benthos.dev/docs/configuration/templating) were not parsing correctly from configs. + ## 4.25.0 - 2024-03-01 ### Added diff --git a/config/template_examples/input_stdin_uppercase.yaml b/config/template_examples/input_stdin_uppercase.yaml index 0fe610ef5a..96267f380c 100644 --- a/config/template_examples/input_stdin_uppercase.yaml +++ b/config/template_examples/input_stdin_uppercase.yaml @@ -9,7 +9,7 @@ mapping: | root.processors = [] root.processors."-".bloblang = """ root = content().uppercase().string() - """ + """.trim() metrics_mapping: | map decrement_processor { @@ -29,3 +29,11 @@ metrics_mapping: | root = if this.contains("processor") { this.apply("decrement_processor") } + +tests: + - name: no fields + config: {} + expected: + stdin: {} + processors: + - bloblang: "root = content().uppercase().string()" diff --git a/internal/impl/splunk/template_output.yaml b/internal/impl/splunk/template_output.yaml index 4dbddd2da8..9be37781df 100644 --- a/internal/impl/splunk/template_output.yaml +++ b/internal/impl/splunk/template_output.yaml @@ -103,7 +103,7 @@ mapping: | root.source = if $config_source != "" { $config_source} root.sourcetype = if $config_sourcetype != "" { $config_sourcetype } root.index = if $config_index != "" { $config_index } - """.format(this.event_host, this.event_source, this.event_sourcetype, this.event_index) + """.format(this.event_host, this.event_source, this.event_sourcetype, this.event_index).re_replace_all("\n ", "\n").trim_prefix("\n") tests: - name: Basic fields @@ -134,11 +134,19 @@ tests: tls: enabled: true skip_cert_verify: false - processors: [ - { - "bloblang": "let config_host = \"\"\nlet config_source = \"\"\nlet config_sourcetype = \"\"\nlet config_index = \"\"\nroot = if (this | {}).exists(\"event\") { this } else {\n { \"event\": content().string() }\n}\nroot.host = if $config_host != \"\" { $config_host }\nroot.source = if $config_source != \"\" { $config_source}\nroot.sourcetype = if $config_sourcetype != \"\" { $config_sourcetype }\nroot.index = if $config_index != \"\" { $config_index }\n" - } - ] + processors: + - bloblang: | + let config_host = "" + let config_source = "" + let config_sourcetype = "" + let config_index = "" + root = if (this | {}).exists("event") { this } else { + { "event": content().string() } + } + root.host = if $config_host != "" { $config_host } + root.source = if $config_source != "" { $config_source} + root.sourcetype = if $config_sourcetype != "" { $config_sourcetype } + root.index = if $config_index != "" { $config_index } - name: gzip config: @@ -171,8 +179,16 @@ tests: tls: enabled: true skip_cert_verify: false - processors: [ - { - "bloblang": "let config_host = \"\"\nlet config_source = \"\"\nlet config_sourcetype = \"\"\nlet config_index = \"\"\nroot = if (this | {}).exists(\"event\") { this } else {\n { \"event\": content().string() }\n}\nroot.host = if $config_host != \"\" { $config_host }\nroot.source = if $config_source != \"\" { $config_source}\nroot.sourcetype = if $config_sourcetype != \"\" { $config_sourcetype }\nroot.index = if $config_index != \"\" { $config_index }\n" - } - ] \ No newline at end of file + processors: + - bloblang: | + let config_host = "" + let config_source = "" + let config_sourcetype = "" + let config_index = "" + root = if (this | {}).exists("event") { this } else { + { "event": content().string() } + } + root.host = if $config_host != "" { $config_host } + root.source = if $config_source != "" { $config_source} + root.sourcetype = if $config_sourcetype != "" { $config_sourcetype } + root.index = if $config_index != "" { $config_index } diff --git a/internal/template/config.go b/internal/template/config.go index 931e5f53dc..9f805507dd 100644 --- a/internal/template/config.go +++ b/internal/template/config.go @@ -124,20 +124,17 @@ func (c Config) compile() (*compiled, error) { return &compiled{spec: spec, mapping: mapping, metricsMapping: metricsMapping}, nil } -func diffYAMLNodesAsJSON(expNode, actNode *yaml.Node) (string, error) { - var iexp, iact any +func diffYAMLNodesAsJSON(expNode *yaml.Node, actNode any) (string, error) { + var iexp any if err := expNode.Decode(&iexp); err != nil { return "", fmt.Errorf("failed to marshal expected %w", err) } - if err := actNode.Decode(&iact); err != nil { - return "", fmt.Errorf("failed to marshal actual %w", err) - } expBytes, err := json.Marshal(iexp) if err != nil { return "", fmt.Errorf("failed to marshal expected %w", err) } - actBytes, err := json.Marshal(iact) + actBytes, err := json.Marshal(actNode) if err != nil { return "", fmt.Errorf("failed to marshal actual %w", err) } @@ -160,12 +157,18 @@ func (c Config) Test() ([]string, error) { var failures []string for _, test := range c.Tests { - outConf, err := compiled.ExpandToNode(&test.Config) + outConf, err := compiled.Render(&test.Config) if err != nil { return nil, fmt.Errorf("test '%v': %w", test.Name, err) } - for _, lint := range docs.LintYAML(docs.NewLintContext(docs.NewLintConfig(bundle.GlobalEnvironment)), docs.Type(c.Type), outConf) { - failures = append(failures, fmt.Sprintf("test '%v': lint error in resulting config: %v", test.Name, lint.Error())) + + var yNode yaml.Node + if err := yNode.Encode(outConf); err == nil { + for _, lint := range docs.LintYAML(docs.NewLintContext(docs.NewLintConfig(bundle.GlobalEnvironment)), docs.Type(c.Type), &yNode) { + failures = append(failures, fmt.Sprintf("test '%v': lint error in resulting config: %v", test.Name, lint.Error())) + } + } else { + failures = append(failures, fmt.Sprintf("test '%v': failed to encode resulting config as YAML: %v", test.Name, err.Error())) } if len(test.Expected.Content) > 0 { diff, err := diffYAMLNodesAsJSON(&test.Expected, outConf) diff --git a/internal/template/template.go b/internal/template/template.go index 0adcd1ac81..3228f01093 100644 --- a/internal/template/template.go +++ b/internal/template/template.go @@ -52,16 +52,22 @@ type compiled struct { metricsMapping *metrics.Mapping } -// ExpandToNode attempts to apply the template to a provided YAML node and -// returns the new expanded configuration. -func (c *compiled) ExpandToNode(node *yaml.Node) (*yaml.Node, error) { - generic, err := c.spec.Config.Children.YAMLToMap(node, docs.ToValueConfig{}) +// Render a compiled template by providing a generic config. +func (c *compiled) Render(node any) (any, error) { + var genericConf any + var err error + switch t := node.(type) { + case *yaml.Node: + genericConf, err = c.spec.Config.Children.YAMLToMap(t, docs.ToValueConfig{}) + default: + genericConf, err = c.spec.Config.Children.AnyToMap(t, docs.ToValueConfig{}) + } if err != nil { return nil, fmt.Errorf("invalid config for template component: %w", err) } part := message.NewPart(nil) - part.SetStructuredMut(generic) + part.SetStructuredMut(genericConf) msg := message.Batch{part} newPart, err := c.mapping.MapPart(0, msg) @@ -73,13 +79,7 @@ func (c *compiled) ExpandToNode(node *yaml.Node) (*yaml.Node, error) { if err != nil { return nil, fmt.Errorf("mapping for template component resulted in invalid config: %w", err) } - - var resultNode yaml.Node - if err := resultNode.Encode(resultGeneric); err != nil { - return nil, fmt.Errorf("mapping for template component resulted in invalid yaml: %w", err) - } - - return &resultNode, nil + return resultGeneric, nil } //------------------------------------------------------------------------------ @@ -129,13 +129,13 @@ func WithMetricsMapping(nm bundle.NewManagement, m *metrics.Mapping) bundle.NewM func registerCacheTemplate(tmpl *compiled, env *bundle.Environment) error { return env.CacheAdd(func(c cache.Config, nm bundle.NewManagement) (cache.V1, error) { - newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node)) + newConf, err := tmpl.Render(c.Plugin) if err != nil { return nil, err } - conf := cache.NewConfig() - if err := newNode.Decode(&conf); err != nil { + conf, err := cache.FromAny(env, newConf) + if err != nil { return nil, err } @@ -150,13 +150,13 @@ func registerCacheTemplate(tmpl *compiled, env *bundle.Environment) error { func registerInputTemplate(tmpl *compiled, env *bundle.Environment) error { return env.InputAdd(func(c input.Config, nm bundle.NewManagement) (input.Streamed, error) { - newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node)) + newConf, err := tmpl.Render(c.Plugin) if err != nil { return nil, err } - conf := input.NewConfig() - if err := newNode.Decode(&conf); err != nil { + conf, err := input.FromAny(env, newConf) + if err != nil { return nil, err } @@ -174,13 +174,13 @@ func registerInputTemplate(tmpl *compiled, env *bundle.Environment) error { func registerOutputTemplate(tmpl *compiled, env *bundle.Environment) error { return env.OutputAdd(func(c output.Config, nm bundle.NewManagement, pcf ...processor.PipelineConstructorFunc) (output.Streamed, error) { - newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node)) + newConf, err := tmpl.Render(c.Plugin) if err != nil { return nil, err } - conf := output.NewConfig() - if err := newNode.Decode(&conf); err != nil { + conf, err := output.FromAny(env, newConf) + if err != nil { return nil, err } @@ -198,13 +198,13 @@ func registerOutputTemplate(tmpl *compiled, env *bundle.Environment) error { func registerProcessorTemplate(tmpl *compiled, env *bundle.Environment) error { return env.ProcessorAdd(func(c processor.Config, nm bundle.NewManagement) (processor.V1, error) { - newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node)) + newConf, err := tmpl.Render(c.Plugin) if err != nil { return nil, err } - conf := processor.NewConfig() - if err := newNode.Decode(&conf); err != nil { + conf, err := processor.FromAny(env, newConf) + if err != nil { return nil, err } @@ -219,13 +219,13 @@ func registerProcessorTemplate(tmpl *compiled, env *bundle.Environment) error { func registerRateLimitTemplate(tmpl *compiled, env *bundle.Environment) error { return env.RateLimitAdd(func(c ratelimit.Config, nm bundle.NewManagement) (ratelimit.V1, error) { - newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node)) + newConf, err := tmpl.Render(c.Plugin) if err != nil { return nil, err } - conf := ratelimit.NewConfig() - if err := newNode.Decode(&conf); err != nil { + conf, err := ratelimit.FromAny(env, newConf) + if err != nil { return nil, err } diff --git a/internal/template/template_test.go b/internal/template/template_test.go new file mode 100644 index 0000000000..2ec0fc4b28 --- /dev/null +++ b/internal/template/template_test.go @@ -0,0 +1,301 @@ +package template_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/benthosdev/benthos/v4/internal/component/cache" + "github.com/benthosdev/benthos/v4/internal/component/input" + "github.com/benthosdev/benthos/v4/internal/component/output" + "github.com/benthosdev/benthos/v4/internal/component/processor" + "github.com/benthosdev/benthos/v4/internal/component/ratelimit" + "github.com/benthosdev/benthos/v4/internal/manager" + "github.com/benthosdev/benthos/v4/internal/message" + "github.com/benthosdev/benthos/v4/internal/template" + + _ "github.com/benthosdev/benthos/v4/public/components/pure" +) + +func TestCacheTemplate(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + require.NoError(t, template.RegisterTemplateYAML(mgr.Environment(), []byte(` +name: foo_memory +type: cache + +fields: + - name: foovalue + type: string + +mapping: | + root.memory.init_values.foo = this.foovalue +`))) + + conf, err := cache.FromAny(mgr, map[string]any{ + "foo_memory": map[string]any{ + "foovalue": "meow", + }, + }) + require.NoError(t, err) + + c, err := mgr.NewCache(conf) + require.NoError(t, err) + + res, err := c.Get(context.Background(), "foo") + require.NoError(t, err) + + assert.Equal(t, "meow", string(res)) +} + +func TestInputTemplate(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + require.NoError(t, template.RegisterTemplateYAML(mgr.Environment(), []byte(` +name: generate_a_foo +type: input + +fields: + - name: name + type: string + +mapping: | + root.generate.count = 1 + root.generate.interval = "1ms" + root.generate.mapping = """root.foo = "%v" """.format(this.name) + root.processors = [ + { + "mutation": """root.bar = "and this too" """, + }, + ] +`))) + + conf, err := input.FromAny(mgr, map[string]any{ + "generate_a_foo": map[string]any{ + "name": "meow", + }, + "processors": []any{ + map[string]any{ + "mutation": "root.bar = this.bar.uppercase()", + }, + }, + }) + require.NoError(t, err) + + strm, err := mgr.NewInput(conf) + require.NoError(t, err) + + var tran message.Transaction + var open bool + select { + case tran, open = <-strm.TransactionChan(): + case <-time.After(time.Second): + t.Fatal("timed out") + } + require.True(t, open) + + require.Len(t, tran.Payload, 1) + assert.Equal(t, `{"bar":"AND THIS TOO","foo":"meow"}`, string(tran.Payload[0].AsBytes())) + + require.NoError(t, tran.Ack(context.Background(), nil)) + + select { + case _, open = <-strm.TransactionChan(): + case <-time.After(time.Second): + t.Fatal("timed out") + } + require.False(t, open) +} + +func TestOutputTemplate(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + require.NoError(t, template.RegisterTemplateYAML(mgr.Environment(), []byte(` +name: write_inproc +type: output + +fields: + - name: name + type: string + +mapping: | + root.inproc = this.name + root.processors = [ + { + "mapping": "root = content().uppercase()", + }, + ] +`))) + + conf, err := output.FromAny(mgr, map[string]any{ + "write_inproc": map[string]any{ + "name": "foos", + }, + "processors": []any{ + map[string]any{ + "mapping": `root = content() + " woof"`, + }, + }, + }) + require.NoError(t, err) + + strm, err := mgr.NewOutput(conf) + require.NoError(t, err) + + tInChan := make(chan message.Transaction) + require.NoError(t, strm.Consume(tInChan)) + + tOutChan, err := mgr.GetPipe("foos") + require.NoError(t, err) + + select { + case tInChan <- message.NewTransactionFunc(message.Batch{ + message.NewPart([]byte("meow")), + }, func(ctx context.Context, err error) error { + return nil + }): + case <-time.After(time.Second): + t.Fatal("timed out") + } + + var tran message.Transaction + var open bool + select { + case tran, open = <-tOutChan: + case <-time.After(time.Second): + t.Fatal("timed out") + } + require.True(t, open) + + require.Len(t, tran.Payload, 1) + assert.Equal(t, `MEOW WOOF`, string(tran.Payload[0].AsBytes())) + + require.NoError(t, tran.Ack(context.Background(), nil)) + + close(tInChan) + strm.TriggerCloseNow() + + ctx, done := context.WithTimeout(context.Background(), time.Second) + defer done() + require.NoError(t, strm.WaitForClose(ctx)) +} + +func TestProcessorTemplate(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + require.NoError(t, template.RegisterTemplateYAML(mgr.Environment(), []byte(` +name: append_foo +type: processor + +fields: + - name: foo + type: string + +mapping: | + root.mapping = """root = content() + "%v" """.format(this.foo) +`))) + + conf, err := processor.FromAny(mgr, map[string]any{ + "append_foo": map[string]any{ + "foo": " meow", + }, + }) + require.NoError(t, err) + + p, err := mgr.NewProcessor(conf) + require.NoError(t, err) + + res, err := p.ProcessBatch(context.Background(), message.Batch{ + message.NewPart([]byte("woof")), + }) + require.NoError(t, err) + require.Len(t, res, 1) + require.Len(t, res[0], 1) + assert.Equal(t, `woof meow`, string(res[0][0].AsBytes())) +} + +func TestProcessorTemplateOddIndentation(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + require.NoError(t, template.RegisterTemplateYAML(mgr.Environment(), []byte(` +name: meow +type: processor + +mapping: | + map switch_if { + root.check = "this.go == true" + + root.processors = [ + { + "mutation": """ + root.id = this.id.uppercase() + """ + }, + ] + } + root.switch = [ + this.apply("switch_if") + ] +`))) + + conf, err := processor.FromAny(mgr, map[string]any{ + "meow": map[string]any{}, + }) + require.NoError(t, err) + + p, err := mgr.NewProcessor(conf) + require.NoError(t, err) + + res, err := p.ProcessBatch(context.Background(), message.Batch{ + message.NewPart([]byte(`{"go":true,"id":"aaa"}`)), + }) + require.NoError(t, err) + require.Len(t, res, 1) + require.Len(t, res[0], 1) + assert.Equal(t, `{"go":true,"id":"AAA"}`, string(res[0][0].AsBytes())) +} + +func TestRateLimitTemplate(t *testing.T) { + mgr, err := manager.New(manager.NewResourceConfig()) + require.NoError(t, err) + + require.NoError(t, template.RegisterTemplateYAML(mgr.Environment(), []byte(` +name: foo +type: rate_limit + +fields: + - name: i + type: string + +mapping: | + root.local.count = 1 + root.local.interval = this.i +`))) + + conf, err := ratelimit.FromAny(mgr, map[string]any{ + "foo": map[string]any{ + "i": "1h", + }, + }) + require.NoError(t, err) + + r, err := mgr.NewRateLimit(conf) + require.NoError(t, err) + + d, err := r.Access(context.Background()) + require.NoError(t, err) + assert.Equal(t, d, time.Duration(0)) + + d, err = r.Access(context.Background()) + require.NoError(t, err) + assert.Greater(t, d, time.Hour-time.Minute) + assert.Less(t, d, time.Hour+time.Minute) +}