Skip to content

Commit

Permalink
Fix template config parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Mar 1, 2024
1 parent a2a1df8 commit 8700f6f
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 48 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.

## Unreleased

## 4.25.1 - 2024-03-01

### Fixed

- Fixed a regression in v4.25.0 where [template based components](https://www.benthos.dev/docs/configuration/templating) were not parsing correctly from configs.

## 4.25.0 - 2024-03-01

### Added
Expand Down
10 changes: 9 additions & 1 deletion config/template_examples/input_stdin_uppercase.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mapping: |
root.processors = []
root.processors."-".bloblang = """
root = content().uppercase().string()
"""
""".trim()
metrics_mapping: |
map decrement_processor {
Expand All @@ -29,3 +29,11 @@ metrics_mapping: |
root = if this.contains("processor") {
this.apply("decrement_processor")
}
tests:
- name: no fields
config: {}
expected:
stdin: {}
processors:
- bloblang: "root = content().uppercase().string()"
38 changes: 27 additions & 11 deletions internal/impl/splunk/template_output.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ mapping: |
root.source = if $config_source != "" { $config_source}
root.sourcetype = if $config_sourcetype != "" { $config_sourcetype }
root.index = if $config_index != "" { $config_index }
""".format(this.event_host, this.event_source, this.event_sourcetype, this.event_index)
""".format(this.event_host, this.event_source, this.event_sourcetype, this.event_index).re_replace_all("\n ", "\n").trim_prefix("\n")
tests:
- name: Basic fields
Expand Down Expand Up @@ -134,11 +134,19 @@ tests:
tls:
enabled: true
skip_cert_verify: false
processors: [
{
"bloblang": "let config_host = \"\"\nlet config_source = \"\"\nlet config_sourcetype = \"\"\nlet config_index = \"\"\nroot = if (this | {}).exists(\"event\") { this } else {\n { \"event\": content().string() }\n}\nroot.host = if $config_host != \"\" { $config_host }\nroot.source = if $config_source != \"\" { $config_source}\nroot.sourcetype = if $config_sourcetype != \"\" { $config_sourcetype }\nroot.index = if $config_index != \"\" { $config_index }\n"
}
]
processors:
- bloblang: |
let config_host = ""
let config_source = ""
let config_sourcetype = ""
let config_index = ""
root = if (this | {}).exists("event") { this } else {
{ "event": content().string() }
}
root.host = if $config_host != "" { $config_host }
root.source = if $config_source != "" { $config_source}
root.sourcetype = if $config_sourcetype != "" { $config_sourcetype }
root.index = if $config_index != "" { $config_index }
- name: gzip
config:
Expand Down Expand Up @@ -171,8 +179,16 @@ tests:
tls:
enabled: true
skip_cert_verify: false
processors: [
{
"bloblang": "let config_host = \"\"\nlet config_source = \"\"\nlet config_sourcetype = \"\"\nlet config_index = \"\"\nroot = if (this | {}).exists(\"event\") { this } else {\n { \"event\": content().string() }\n}\nroot.host = if $config_host != \"\" { $config_host }\nroot.source = if $config_source != \"\" { $config_source}\nroot.sourcetype = if $config_sourcetype != \"\" { $config_sourcetype }\nroot.index = if $config_index != \"\" { $config_index }\n"
}
]
processors:
- bloblang: |
let config_host = ""
let config_source = ""
let config_sourcetype = ""
let config_index = ""
root = if (this | {}).exists("event") { this } else {
{ "event": content().string() }
}
root.host = if $config_host != "" { $config_host }
root.source = if $config_source != "" { $config_source}
root.sourcetype = if $config_sourcetype != "" { $config_sourcetype }
root.index = if $config_index != "" { $config_index }
21 changes: 12 additions & 9 deletions internal/template/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,20 +124,17 @@ func (c Config) compile() (*compiled, error) {
return &compiled{spec: spec, mapping: mapping, metricsMapping: metricsMapping}, nil
}

func diffYAMLNodesAsJSON(expNode, actNode *yaml.Node) (string, error) {
var iexp, iact any
func diffYAMLNodesAsJSON(expNode *yaml.Node, actNode any) (string, error) {
var iexp any
if err := expNode.Decode(&iexp); err != nil {
return "", fmt.Errorf("failed to marshal expected %w", err)
}
if err := actNode.Decode(&iact); err != nil {
return "", fmt.Errorf("failed to marshal actual %w", err)
}

expBytes, err := json.Marshal(iexp)
if err != nil {
return "", fmt.Errorf("failed to marshal expected %w", err)
}
actBytes, err := json.Marshal(iact)
actBytes, err := json.Marshal(actNode)
if err != nil {
return "", fmt.Errorf("failed to marshal actual %w", err)
}
Expand All @@ -160,12 +157,18 @@ func (c Config) Test() ([]string, error) {

var failures []string
for _, test := range c.Tests {
outConf, err := compiled.ExpandToNode(&test.Config)
outConf, err := compiled.Render(&test.Config)
if err != nil {
return nil, fmt.Errorf("test '%v': %w", test.Name, err)
}
for _, lint := range docs.LintYAML(docs.NewLintContext(docs.NewLintConfig(bundle.GlobalEnvironment)), docs.Type(c.Type), outConf) {
failures = append(failures, fmt.Sprintf("test '%v': lint error in resulting config: %v", test.Name, lint.Error()))

var yNode yaml.Node
if err := yNode.Encode(outConf); err == nil {
for _, lint := range docs.LintYAML(docs.NewLintContext(docs.NewLintConfig(bundle.GlobalEnvironment)), docs.Type(c.Type), &yNode) {
failures = append(failures, fmt.Sprintf("test '%v': lint error in resulting config: %v", test.Name, lint.Error()))
}
} else {
failures = append(failures, fmt.Sprintf("test '%v': failed to encode resulting config as YAML: %v", test.Name, err.Error()))
}
if len(test.Expected.Content) > 0 {
diff, err := diffYAMLNodesAsJSON(&test.Expected, outConf)
Expand Down
54 changes: 27 additions & 27 deletions internal/template/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,22 @@ type compiled struct {
metricsMapping *metrics.Mapping
}

// ExpandToNode attempts to apply the template to a provided YAML node and
// returns the new expanded configuration.
func (c *compiled) ExpandToNode(node *yaml.Node) (*yaml.Node, error) {
generic, err := c.spec.Config.Children.YAMLToMap(node, docs.ToValueConfig{})
// Render a compiled template by providing a generic config.
func (c *compiled) Render(node any) (any, error) {
var genericConf any
var err error
switch t := node.(type) {
case *yaml.Node:
genericConf, err = c.spec.Config.Children.YAMLToMap(t, docs.ToValueConfig{})
default:
genericConf, err = c.spec.Config.Children.AnyToMap(t, docs.ToValueConfig{})
}
if err != nil {
return nil, fmt.Errorf("invalid config for template component: %w", err)
}

part := message.NewPart(nil)
part.SetStructuredMut(generic)
part.SetStructuredMut(genericConf)
msg := message.Batch{part}

newPart, err := c.mapping.MapPart(0, msg)
Expand All @@ -73,13 +79,7 @@ func (c *compiled) ExpandToNode(node *yaml.Node) (*yaml.Node, error) {
if err != nil {
return nil, fmt.Errorf("mapping for template component resulted in invalid config: %w", err)
}

var resultNode yaml.Node
if err := resultNode.Encode(resultGeneric); err != nil {
return nil, fmt.Errorf("mapping for template component resulted in invalid yaml: %w", err)
}

return &resultNode, nil
return resultGeneric, nil
}

//------------------------------------------------------------------------------
Expand Down Expand Up @@ -129,13 +129,13 @@ func WithMetricsMapping(nm bundle.NewManagement, m *metrics.Mapping) bundle.NewM

func registerCacheTemplate(tmpl *compiled, env *bundle.Environment) error {
return env.CacheAdd(func(c cache.Config, nm bundle.NewManagement) (cache.V1, error) {
newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node))
newConf, err := tmpl.Render(c.Plugin)
if err != nil {
return nil, err
}

conf := cache.NewConfig()
if err := newNode.Decode(&conf); err != nil {
conf, err := cache.FromAny(env, newConf)
if err != nil {
return nil, err
}

Expand All @@ -150,13 +150,13 @@ func registerCacheTemplate(tmpl *compiled, env *bundle.Environment) error {

func registerInputTemplate(tmpl *compiled, env *bundle.Environment) error {
return env.InputAdd(func(c input.Config, nm bundle.NewManagement) (input.Streamed, error) {
newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node))
newConf, err := tmpl.Render(c.Plugin)
if err != nil {
return nil, err
}

conf := input.NewConfig()
if err := newNode.Decode(&conf); err != nil {
conf, err := input.FromAny(env, newConf)
if err != nil {
return nil, err
}

Expand All @@ -174,13 +174,13 @@ func registerInputTemplate(tmpl *compiled, env *bundle.Environment) error {

func registerOutputTemplate(tmpl *compiled, env *bundle.Environment) error {
return env.OutputAdd(func(c output.Config, nm bundle.NewManagement, pcf ...processor.PipelineConstructorFunc) (output.Streamed, error) {
newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node))
newConf, err := tmpl.Render(c.Plugin)
if err != nil {
return nil, err
}

conf := output.NewConfig()
if err := newNode.Decode(&conf); err != nil {
conf, err := output.FromAny(env, newConf)
if err != nil {
return nil, err
}

Expand All @@ -198,13 +198,13 @@ func registerOutputTemplate(tmpl *compiled, env *bundle.Environment) error {

func registerProcessorTemplate(tmpl *compiled, env *bundle.Environment) error {
return env.ProcessorAdd(func(c processor.Config, nm bundle.NewManagement) (processor.V1, error) {
newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node))
newConf, err := tmpl.Render(c.Plugin)
if err != nil {
return nil, err
}

conf := processor.NewConfig()
if err := newNode.Decode(&conf); err != nil {
conf, err := processor.FromAny(env, newConf)
if err != nil {
return nil, err
}

Expand All @@ -219,13 +219,13 @@ func registerProcessorTemplate(tmpl *compiled, env *bundle.Environment) error {

func registerRateLimitTemplate(tmpl *compiled, env *bundle.Environment) error {
return env.RateLimitAdd(func(c ratelimit.Config, nm bundle.NewManagement) (ratelimit.V1, error) {
newNode, err := tmpl.ExpandToNode(c.Plugin.(*yaml.Node))
newConf, err := tmpl.Render(c.Plugin)
if err != nil {
return nil, err
}

conf := ratelimit.NewConfig()
if err := newNode.Decode(&conf); err != nil {
conf, err := ratelimit.FromAny(env, newConf)
if err != nil {
return nil, err
}

Expand Down
Loading

1 comment on commit 8700f6f

@peczenyj
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️‍🔥

Please sign in to comment.