Skip to content

Commit

Permalink
integration: allow to specify a custom sink (#362)
Browse files Browse the repository at this point in the history
  • Loading branch information
Roberto Santalla committed Feb 22, 2022
1 parent 276da45 commit d84cea9
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 38 deletions.
22 changes: 18 additions & 4 deletions cmd/nri-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (

_ = iota
exitClients
exitConfig
exitIntegration
exitLoop
exitSetup
Expand All @@ -54,7 +55,7 @@ type clusterClients struct {
func main() {
logger = log.StandardLogger()

c, err := config.LoadConfig(config.DefaultFilePath, config.DefaultFileName)
c, err := config.LoadConfig(config.DefaultConfigFolderName, config.DefaultConfigFileName)
if err != nil {
log.Error(err.Error())
os.Exit(exitIntegration)
Expand All @@ -73,19 +74,32 @@ func main() {
}
}

iw, err := integration.NewWrapper(
integrationOptions := []integration.OptionFunc{
integration.WithLogger(logger),
integration.WithMetadata(integration.Metadata{
Name: integrationName,
Version: integrationVersion,
}),
)
}

switch c.Sink.Type {
case config.SinkTypeHTTP:
integrationOptions = append(integrationOptions, integration.WithHTTPSink(c.Sink.HTTP))
case config.SinkTypeStdout:
// We don't need to do anything here to sink to stdout, as it's the default behavior of integration.Wrapper.
logger.Warn("Sinking metrics to stdout")
default:
log.Errorf("Unknown sink type %s", c.Sink.Type)
os.Exit(exitConfig)
}

iw, err := integration.NewWrapper(integrationOptions...)
if err != nil {
logger.Errorf("creating integration wrapper: %v", err)
os.Exit(exitIntegration)
}

i, err := iw.Integration(c.Sink.HTTP)
i, err := iw.Integration()
if err != nil {
logger.Errorf("creating integration with http sink: %v", err)
os.Exit(exitIntegration)
Expand Down
16 changes: 13 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ import (
)

const (
DefaultFileName = "nri-kubernetes"
DefaultFilePath = "/etc/newrelic-infra"
DefaultConfigFileName = "nri-kubernetes"
DefaultConfigFolderName = "/etc/newrelic-infra"

DefaultTimeout = 10 * time.Second
DefaultRetries = 3
DefaultAgentTimeout = 3 * time.Second

DefaultNetworkRouteFile = "/proc/net/route"

SinkTypeHTTP = "http"
SinkTypeStdout = "stdout"
)

type Config struct {
Expand All @@ -34,6 +40,9 @@ type Config struct {

// Sink defines where the integration will report the metrics to.
Sink struct {
// Type allows selecting which of the supported sinks will be used by the integration.
// Supported values are `http` and `stdout`.
Type string `mapstructure:"type"`
// HTTP stores the configuration for the HTTP sink.
HTTP HTTPSink `mapstructure:"http"`
} `mapstructure:"sink"`
Expand Down Expand Up @@ -203,11 +212,12 @@ func LoadConfig(filePath string, fileName string) (*Config, error) {
// https://github.com/spf13/viper/issues/584
v.SetDefault("clusterName", "cluster")
v.SetDefault("verbose", false)
v.SetDefault("kubelet.networkRouteFile", "/proc/net/route")
v.SetDefault("kubelet.networkRouteFile", DefaultNetworkRouteFile)
v.SetDefault("nodeName", "node")
v.SetDefault("nodeIP", "node")

// Sane connection defaults
v.SetDefault("sink.type", SinkTypeHTTP)
v.SetDefault("sink.http.port", 0)
v.SetDefault("sink.http.timeout", DefaultAgentTimeout)
v.SetDefault("sink.http.retries", DefaultRetries)
Expand Down
76 changes: 45 additions & 31 deletions src/integration/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package integration

import (
"fmt"
"io"
"net"
"os"
"strconv"
"time"

sdk "github.com/newrelic/infra-integrations-sdk/integration"
"github.com/sethgrid/pester"
log "github.com/sirupsen/logrus"

"github.com/newrelic/nri-kubernetes/v3/internal/config"
"github.com/newrelic/nri-kubernetes/v3/internal/logutil"
"github.com/newrelic/nri-kubernetes/v3/internal/storer"
"github.com/newrelic/nri-kubernetes/v3/src/integration/prober"
"github.com/newrelic/nri-kubernetes/v3/src/integration/sink"
"github.com/sethgrid/pester"
log "github.com/sirupsen/logrus"
)

const defaultProbeTimeout = 90 * time.Second
Expand All @@ -28,6 +31,7 @@ type Wrapper struct {
metadata Metadata
probeTimeout time.Duration
probeBackoff time.Duration
sink io.Writer
}

// OptionFunc is an option func for the Wrapper.
Expand All @@ -48,6 +52,42 @@ func WithMetadata(metadata Metadata) OptionFunc {
}
}

// WithHTTPSink configures the wrapper to use an HTTP Sink for metrics.
// If this option is not specified, Wrapper will configure the integration.Integration to sink metrics to stdout.
func WithHTTPSink(sinkConfig config.HTTPSink) OptionFunc {
return func(iw *Wrapper) error {
hostPort := net.JoinHostPort(sink.DefaultAgentForwarderhost, strconv.Itoa(sinkConfig.Port))

prober := prober.New(iw.probeTimeout, iw.probeBackoff)
iw.logger.Info("Waiting for agent container to be ready...")

err := prober.Probe(fmt.Sprintf("http://%s%s", hostPort, agentReadyPath))
if err != nil {
return fmt.Errorf("timeout waiting for agent: %w", err)
}

c := pester.New()
c.Backoff = pester.LinearBackoff
c.MaxRetries = sinkConfig.Retries
c.Timeout = sinkConfig.Timeout
c.LogHook = func(e pester.ErrEntry) {
// LogHook is invoked only when an error happens
iw.logger.Warnf("Error sending data to agent sink: %q", e)
}

h, err := sink.New(sink.HTTPSinkOptions{
URL: fmt.Sprintf("http://%s%s", hostPort, sink.DefaultAgentForwarderPath),
Client: c,
})
if err != nil {
return fmt.Errorf("creating HTTP Sink: %w", err)
}

iw.sink = h
return nil
}
}

// Metadata contains the integration name and version that is passed down to the integration SDK.
type Metadata struct {
Name string
Expand All @@ -58,6 +98,7 @@ type Metadata struct {
func NewWrapper(opts ...OptionFunc) (*Wrapper, error) {
intgr := &Wrapper{
logger: logutil.Discard,
sink: os.Stdout,
probeTimeout: defaultProbeTimeout,
probeBackoff: defaultProbeBackoff,
}
Expand All @@ -74,34 +115,7 @@ func NewWrapper(opts ...OptionFunc) (*Wrapper, error) {

// Integration returns a sdk.Integration, configured to output data to the specified agent.
// Integration will block and wait until the specified server is ready, up to a maximum timeout.
func (iw *Wrapper) Integration(sinkConfig config.HTTPSink) (*sdk.Integration, error) {
hostPort := net.JoinHostPort(sink.DefaultAgentForwarderhost, strconv.Itoa(sinkConfig.Port))

prober := prober.New(iw.probeTimeout, iw.probeBackoff)
iw.logger.Info("Waiting for agent container to be ready...")

err := prober.Probe(fmt.Sprintf("http://%s%s", hostPort, agentReadyPath))
if err != nil {
return nil, fmt.Errorf("timeout waiting for agent: %w", err)
}

c := pester.New()
c.Backoff = pester.LinearBackoff
c.MaxRetries = sinkConfig.Retries
c.Timeout = sinkConfig.Timeout
c.LogHook = func(e pester.ErrEntry) {
// LogHook is invoked only when an error happens
iw.logger.Warnf("Error sending data to agent sink: %q", e)
}

h, err := sink.New(sink.HTTPSinkOptions{
URL: fmt.Sprintf("http://%s%s", hostPort, sink.DefaultAgentForwarderPath),
Client: c,
})
if err != nil {
return nil, fmt.Errorf("creating HTTPSink: %w", err)
}

func (iw *Wrapper) Integration() (*sdk.Integration, error) {
cache := storer.NewInMemoryStore(storer.DefaultTTL, storer.DefaultInterval, iw.logger)
return sdk.New(iw.metadata.Name, iw.metadata.Version, sdk.Writer(h), sdk.Storer(cache))
return sdk.New(iw.metadata.Name, iw.metadata.Version, sdk.Writer(iw.sink), sdk.Storer(cache))
}

0 comments on commit d84cea9

Please sign in to comment.