Skip to content

Commit

Permalink
added rockset sink
Browse files Browse the repository at this point in the history
Signed-off-by: Rui Aguiar <[email protected]>

added workspace and vendor folder

Signed-off-by: Rui Aguiar <[email protected]>

fixed import

Signed-off-by: Rui Aguiar <[email protected]>

updated config to have rockset workspace

Signed-off-by: Rui Aguiar <[email protected]>

took out print statments

Signed-off-by: Rui Aguiar <[email protected]>
  • Loading branch information
Rui Aguiar committed Aug 13, 2019
1 parent 7310ab6 commit 5655e74
Show file tree
Hide file tree
Showing 110 changed files with 8,814 additions and 5 deletions.
24 changes: 22 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,7 @@
[[constraint]]
name = "github.com/influxdata/influxdb"
version = "1.7.4"

[[constraint]]
name = "github.com/rockset/rockset-go-client"
version = "0.6.0"
3 changes: 3 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"httpSinkUrl": "http://localhost:8080",
"httpSinkBufferSize": 1500,
"httpSinkDiscardMessages": true,
"rocksetAPIKey": "",
"rocksetCollectionName": "",
"rocksetWorkspaceName": "",
"s3SinkAccessKeyID": "",
"s3SinkSecretAccessKey": "",
"s3SinkRegion": "ap-south-1",
Expand Down
2 changes: 1 addition & 1 deletion eventrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/heptiolabs/eventrouter/sinks"
"github.com/prometheus/client_golang/prometheus"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
Expand Down
18 changes: 16 additions & 2 deletions sinks/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (

"github.com/golang/glog"
"github.com/spf13/viper"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

// EventSinkInterface is the interface used to shunt events
Expand Down Expand Up @@ -181,6 +180,21 @@ func ManufactureSink() (e EventSinkInterface) {
panic(err.Error())
}
return influx
case "rockset":
rocksetAPIKey := viper.GetString("rocksetAPIKey")
if rocksetAPIKey == "" {
panic("Rockset sink specified but rocksetAPIKey not specified")
}

rocksetCollectionName := viper.GetString("rocksetCollectionName")
if rocksetCollectionName == "" {
panic("Rockset sink specified but rocksetCollectionName not specified")
}
rocksetWorkspaceName := viper.GetString("rocksetWorkspaceName")
if rocksetCollectionName == "" {
panic("Rockset sink specified but rocksetWorkspaceName not specified")
}
e = NewRocksetSink(rocksetAPIKey, rocksetCollectionName, rocksetWorkspaceName)
// case "logfile"
default:
err := errors.New("Invalid Sink Specified")
Expand Down
70 changes: 70 additions & 0 deletions sinks/rocksetsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2019 The Contributors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package sinks

import (
"encoding/json"
"fmt"
"os"

apiclient "github.com/rockset/rockset-go-client"
models "github.com/rockset/rockset-go-client/lib/go"
v1 "k8s.io/api/core/v1"
)

/*
RocksetSink is a sink that uploads the kubernetes events as json object
and converts them to documents inside of a Rockset collection.
Rockset can later be used with
many different connectors such as Tableau or Redash to use this data.
*/
type RocksetSink struct {
client *apiclient.RockClient
rocksetCollectionName string
rocksetWorkspaceName string
}

// NewRocksetSink will create a new RocksetSink with default options, returned as
// an EventSinkInterface
func NewRocksetSink(rocksetAPIKey string, rocksetCollectionName string, rocksetWorkspaceName string) EventSinkInterface {
client := apiclient.Client(rocksetAPIKey, "https://api.rs2.usw2.rockset.com")
return &RocksetSink{
client: client,
rocksetCollectionName: rocksetCollectionName,
rocksetWorkspaceName: rocksetWorkspaceName,
}
}

// UpdateEvents implements the EventSinkInterface
func (rs *RocksetSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) {
eData := NewEventData(eNew, eOld)

if eJSONBytes, err := json.Marshal(eData); err == nil {
var m map[string]interface{}
json.Unmarshal(eJSONBytes, &m)
docs := []interface{}{
m,
}
dinfo := models.AddDocumentsRequest{
Data: docs,
}
rs.client.Documents.Add(rs.rocksetWorkspaceName, rs.rocksetCollectionName, dinfo)
} else {
fmt.Fprintf(os.Stderr, "Failed to json serialize event: %v", err)
}
}
70 changes: 70 additions & 0 deletions tests/rockset/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2017 Heptio Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"eventrouter/sinks"
"fmt"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"

ref "k8s.io/client-go/tools/reference"
)

func main() {
testPod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
},
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/api/version/pods/foo",
Name: "foo",
Namespace: "baz",
UID: "bar",
},
Spec: v1.PodSpec{},
}
podRef, err := ref.GetReference(scheme.Scheme, testPod)
if err != nil {
panic(err.Error())
}

evt := makeFakeEvent(podRef, v1.EventTypeWarning, "CreateInCluster", "Fake pod creation event")

sink := sinks.NewRocksetSink("key", "collection", "commons")

sink.UpdateEvents(evt, nil)
}

func makeFakeEvent(ref *v1.ObjectReference, eventtype, reason, message string) *v1.Event {
tm := metav1.Time{
Time: time.Now(),
}
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", ref.Name, tm.UnixNano()),
Namespace: ref.Namespace,
},
InvolvedObject: *ref,
Reason: reason,
Message: message,
FirstTimestamp: tm,
LastTimestamp: tm,
Count: 1,
Type: eventtype,
}
}
15 changes: 15 additions & 0 deletions vendor/github.com/rockset/rockset-go-client/CHANGELOG.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions vendor/github.com/rockset/rockset-go-client/Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions vendor/github.com/rockset/rockset-go-client/Gopkg.toml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5655e74

Please sign in to comment.