Skip to content

Commit

Permalink
Merge pull request vmware-archive#64 from alok87/s3sink
Browse files Browse the repository at this point in the history
S3 Sink
  • Loading branch information
timothysc authored Sep 13, 2018
2 parents e585d5d + 20edca3 commit de06a28
Show file tree
Hide file tree
Showing 6,827 changed files with 3,327,500 additions and 8 deletions.
The diff you're trying to view is too large. We only load the first 3000 changed files.
76 changes: 71 additions & 5 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,14 @@
"sink": "glog",
"httpSinkUrl": "http://localhost:8080",
"httpSinkBufferSize": 1500,
"httpSinkDiscardMessages": true
"httpSinkDiscardMessages": true,
"s3SinkAccessKeyID": "",
"s3SinkSecretAccessKey": "",
"s3SinkRegion": "ap-south-1",
"s3SinkBucket": "",
"s3SinkBucketDir": "",
"s3SinkBufferSize": 1500,
"s3SinkDiscardMessages": true,
"s3SinkOutputFormat": "flatjson",
"s3SinkUploadInterval": 120
}
2 changes: 1 addition & 1 deletion eventrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer glog.Infof("Shutting down EventRouter")

glog.Infof("Starting EvenRouter")
glog.Infof("Starting EventRouter")

// here is where we kick the caches into gear
if !cache.WaitForCacheSync(stopCh, er.eListerSynched) {
Expand Down
25 changes: 25 additions & 0 deletions sinks/eventdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import (
"io"

"github.com/crewjam/rfc5424"
"github.com/json-iterator/go"
"github.com/json-iterator/go/extra"
"k8s.io/api/core/v1"

"github.com/nytlabs/gojsonexplode"
)

// EventData encodes an eventrouter event and previous event, with a verb for
Expand Down Expand Up @@ -80,3 +84,24 @@ func (e *EventData) WriteRFC5424(w io.Writer) (int64, error) {

return msg.WriteTo(w)
}

// WriteFlattenedJSON writes the json to the file in the below format
// 1) Flattens the json into a not nested key:value
// 2) Convert the json into snake format
// Eg: {"event_involved_object_kind":"pod", "event_metadata_namespace":"kube-system"}
//
// It returns the number of bytes written to w and the first error
// encountered during serialization, flattening, or writing.
func (e *EventData) WriteFlattenedJSON(w io.Writer) (int64, error) {
	var eJSONBytes []byte
	var err error
	// NOTE(review): SetNamingStrategy mutates jsoniter's package-global
	// extension registry on every call to this method. Presumably every
	// caller in this process wants snake_case keys — confirm, and consider
	// moving this to an init() or guarding with sync.Once, since repeated
	// concurrent registration of a global strategy is not obviously safe.
	extra.SetNamingStrategy(extra.LowerCaseWithUnderscores)
	if eJSONBytes, err = jsoniter.Marshal(e); err != nil {
		return 0, fmt.Errorf("failed to json serialize event: %v", err)
	}

	// Collapse nested objects into a single level, joining key path
	// segments with "_" (e.g. event.metadata.namespace ->
	// event_metadata_namespace).
	result, err := gojsonexplode.Explodejsonstr(string(eJSONBytes), "_")
	if err != nil {
		return 0, fmt.Errorf("failed to flatten json: %v", err)
	}

	// w.Write returns an int; the interface promises int64, so widen it.
	written, err := w.Write([]byte(result))
	return int64(written), err
}
55 changes: 54 additions & 1 deletion sinks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func ManufactureSink() (e EventSinkInterface) {
case "http":
url := viper.GetString("httpSinkUrl")
if url == "" {
panic("http sync specified but no httpSinkUrl")
panic("http sink specified but no httpSinkUrl")
}

// By default we buffer up to 1500 events, and drop messages if more than
Expand Down Expand Up @@ -75,6 +75,59 @@ func ManufactureSink() (e EventSinkInterface) {
panic(err.Error())
}
return e
case "s3sink":
accessKeyID := viper.GetString("s3SinkAccessKeyID")
if accessKeyID == "" {
panic("s3 sink specified but s3SinkAccessKeyID not specified")
}

secretAccessKey := viper.GetString("s3SinkSecretAccessKey")
if secretAccessKey == "" {
panic("s3 sink specified but s3SinkSecretAccessKey not specified")
}

region := viper.GetString("s3SinkRegion")
if region == "" {
panic("s3 sink specified but s3SinkRegion not specified")
}

bucket := viper.GetString("s3SinkBucket")
if bucket == "" {
panic("s3 sink specified but s3SinkBucket not specified")
}

bucketDir := viper.GetString("s3SinkBucketDir")
if bucketDir == "" {
panic("s3 sink specified but s3SinkBucketDir not specified")
}

// By default the json is pushed to s3 in the non-flattened rfc5424 write format.
// The alternative is to write to s3 in the flattened json format, which helps
// when loading the data into redshift with the least effort
viper.SetDefault("s3SinkOutputFormat", "rfc5424")
outputFormat := viper.GetString("s3SinkOutputFormat")
if outputFormat != "rfc5424" && outputFormat != "flatjson" {
panic("s3 sink specified, but incorrect s3SinkOutputFormat specified. Supported formats are: rfc5424 (default) and flatjson")
}

// By default we buffer up to 1500 events, and drop messages if more than
// 1500 have come in without getting consumed
viper.SetDefault("s3SinkBufferSize", 1500)
viper.SetDefault("s3SinkDiscardMessages", true)

viper.SetDefault("s3SinkUploadInterval", 120)
uploadInterval := viper.GetInt("s3SinkUploadInterval")

bufferSize := viper.GetInt("s3SinkBufferSize")
overflow := viper.GetBool("s3SinkDiscardMessages")

s, err := NewS3Sink(accessKeyID, secretAccessKey, region, bucket, bucketDir, uploadInterval, overflow, bufferSize, outputFormat)
if err != nil {
panic(err.Error())
}

go s.Run(make(chan bool))
return s
// case "logfile"
default:
err := errors.New("Invalid Sink Specified")
Expand Down
Loading

0 comments on commit de06a28

Please sign in to comment.