Skip to content

Commit

Permalink
Auditing with Meilisearch (#372)
Browse files Browse the repository at this point in the history
  • Loading branch information
vknabel authored Jan 27, 2023
1 parent 755db95 commit 7f9ddeb
Show file tree
Hide file tree
Showing 17 changed files with 471 additions and 23 deletions.
210 changes: 210 additions & 0 deletions cmd/metal-api/internal/auditing/auditing-interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package auditing

import (
"bytes"
"context"
"io"
"net/http"

"github.com/emicklei/go-restful/v3"
"github.com/google/uuid"
"github.com/metal-stack/metal-lib/rest"
"github.com/metal-stack/security"
"go.uber.org/zap"
"google.golang.org/grpc"
)

const (
Exclude string = "exclude-from-auditing"
)

func UnaryServerInterceptor(a Auditing, logger *zap.SugaredLogger, shouldAudit func(fullMethod string) bool) grpc.UnaryServerInterceptor {
if a == nil {
logger.Fatal("cannot use nil auditing to create unary server interceptor")
}
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if !shouldAudit(info.FullMethod) {
return handler(ctx, req)
}
requestID := uuid.New().String()
childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID)

auditReqContext := []any{
"rqid", requestID,
"method", info.FullMethod,
"kind", "grpc",
}
user := security.GetUserFromContext(ctx)
if user != nil {
auditReqContext = append(
auditReqContext,
"user", user.EMail,
"tenant", user.Tenant,
)
}
err = a.Index(auditReqContext...)
if err != nil {
return nil, err
}
resp, err = handler(childCtx, req)
if err != nil {
auditRespContext := append(auditReqContext, "err", err)
err2 := a.Index(auditRespContext...)
if err2 != nil {
logger.Errorf("unable to index error: %v", err2)
}
return nil, err
}
auditRespContext := append(auditReqContext, "resp", resp)
err = a.Index(auditRespContext...)
return resp, err
}
}

func StreamServerInterceptor(a Auditing, logger *zap.SugaredLogger, shouldAudit func(fullMethod string) bool) grpc.StreamServerInterceptor {
if a == nil {
logger.Fatal("cannot use nil auditing to create stream server interceptor")
}
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if !shouldAudit(info.FullMethod) {
return handler(srv, ss)
}
requestID := uuid.New().String()
auditReqContext := []any{
"rqid", requestID,
"method", info.FullMethod,
"kind", "grpc-stream",
}

user := security.GetUserFromContext(ss.Context())
if user != nil {
auditReqContext = append(
auditReqContext,
"user", user.EMail,
"tenant", user.Tenant,
)
}
err := a.Index(auditReqContext...)
if err != nil {
return err
}
err = handler(srv, ss)
if err != nil {
auditRespContext := append(auditReqContext, "err", err)
err2 := a.Index(auditRespContext...)
if err2 != nil {
logger.Errorf("unable to index error: %v", err2)
}
return err
}
auditRespContext := append(auditReqContext, "finished", true)
err = a.Index(auditRespContext...)
return err
}
}

func HttpFilter(a Auditing, logger *zap.SugaredLogger) restful.FilterFunction {
if a == nil {
logger.Fatal("cannot use nil auditing to create http middleware")
}
return func(request *restful.Request, response *restful.Response, chain *restful.FilterChain) {
r := request.Request

switch r.Method {
case http.MethodPost, http.MethodPut, http.MethodPatch, http.MethodDelete:
break
default:
chain.ProcessFilter(request, response)
return
}

excluded, ok := request.SelectedRoute().Metadata()[Exclude].(bool)
if ok && excluded {
logger.Debugw("excluded route from auditing through metadata annotation", "path", request.SelectedRoute().Path())
chain.ProcessFilter(request, response)
return
}

requestID := r.Context().Value(rest.RequestIDKey)
if requestID == nil {
requestID = uuid.New().String()
}
auditReqContext := []any{
"rqid", requestID,
"method", r.Method,
"path", r.URL.Path,
"forwarded-for", request.HeaderParameter("x-forwarded-for"),
"remote-addr", r.RemoteAddr,
}
user := security.GetUserFromContext(r.Context())
if user != nil {
auditReqContext = append(
auditReqContext,
"user", user.EMail,
"tenant", user.Tenant,
)
}

if r.Method != http.MethodGet && r.Body != nil {
bodyReader := r.Body
body, err := io.ReadAll(bodyReader)
r.Body = io.NopCloser(bytes.NewReader(body))
if err != nil {
logger.Errorf("unable to read request body: %v", err)
response.WriteHeader(http.StatusInternalServerError)
return
}
auditReqContext = append(auditReqContext, "body", string(body))
}

err := a.Index(auditReqContext...)
if err != nil {
logger.Errorf("unable to index error: %v", err)
response.WriteHeader(http.StatusInternalServerError)
return
}

bufferedResponseWriter := &bufferedHttpResponseWriter{
w: response.ResponseWriter,
}
response.ResponseWriter = bufferedResponseWriter

chain.ProcessFilter(request, response)

auditRespContext := append(auditReqContext,
"resp", bufferedResponseWriter.Content(),
"status-code", response.StatusCode(),
)
err = a.Index(auditRespContext...)
if err != nil {
logger.Errorf("unable to index error: %v", err)
response.WriteHeader(http.StatusInternalServerError)
return
}
}
}

type bufferedHttpResponseWriter struct {
w http.ResponseWriter

buf bytes.Buffer
header int
}

func (w *bufferedHttpResponseWriter) Header() http.Header {
return w.w.Header()
}

func (w *bufferedHttpResponseWriter) Write(b []byte) (int, error) {
(&w.buf).Write(b)
return w.w.Write(b)
}

func (w *bufferedHttpResponseWriter) WriteHeader(h int) {
w.header = h
w.w.WriteHeader(h)
}

func (w *bufferedHttpResponseWriter) Content() string {
return w.buf.String()
}
23 changes: 23 additions & 0 deletions cmd/metal-api/internal/auditing/auditing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package auditing

import "go.uber.org/zap"

type Config struct {
URL string
APIKey string
IndexPrefix string
RotationInterval Interval
Log *zap.SugaredLogger
}

type Interval string

var (
HourlyInterval Interval = "@hourly"
DailyInterval Interval = "@daily"
MonthlyInterval Interval = "@monthly"
)

type Auditing interface {
Index(...any) error
}
112 changes: 112 additions & 0 deletions cmd/metal-api/internal/auditing/meilisearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package auditing

import (
"fmt"
"time"

"github.com/google/uuid"
"github.com/meilisearch/meilisearch-go"
"github.com/robfig/cron"
"go.uber.org/zap"
)

type meiliAuditing struct {
client *meilisearch.Client
index *meilisearch.Index
log *zap.SugaredLogger
indexPrefix string
rotationInterval Interval
}

func New(c Config) (Auditing, error) {
client := meilisearch.NewClient(meilisearch.ClientConfig{
Host: c.URL,
APIKey: c.APIKey,
})
v, err := client.GetVersion()
if err != nil {
return nil, fmt.Errorf("unable to connect to meilisearch at:%s %w", c.URL, err)
}
c.Log.Infow("meilisearch", "connected to", v, "index rotated", c.RotationInterval)

index := client.Index(c.IndexPrefix)
if c.RotationInterval != "" {
index = client.Index(indexName(c.IndexPrefix, c.RotationInterval))
}

a := &meiliAuditing{
client: client,
index: index,
log: c.Log.Named("auditing"),
indexPrefix: c.IndexPrefix,
rotationInterval: c.RotationInterval,
}

if c.RotationInterval != "" {
// create a new Index every interval
cn := cron.New()
err := cn.AddFunc(string(c.RotationInterval), a.newIndex)
if err != nil {
return nil, err
}
cn.Start()
}
return a, nil
}

func (a *meiliAuditing) Index(keysAndValues ...any) error {
e := a.toMap(keysAndValues)
a.log.Debugw("index", "entry", e)
id := uuid.NewString()
e["id"] = id
e["timestamp"] = time.Now()
e["component"] = "metal-api"
documents := []map[string]any{e}

_, err := a.index.AddDocuments(documents, "id")
if err != nil {
a.log.Errorw("index", "error", err)
return err
}
return nil
}

func (a *meiliAuditing) newIndex() {
a.log.Debugw("auditing", "create new index", a.rotationInterval)
a.index = a.client.Index(indexName(a.indexPrefix, a.rotationInterval))
}

func indexName(prefix string, i Interval) string {
timeFormat := "2006-01-02"

switch i {
case HourlyInterval:
timeFormat = "2006-01-02_15"
case DailyInterval:
timeFormat = "2006-01-02"
case MonthlyInterval:
timeFormat = "2006-01"
}

indexName := prefix + "-" + time.Now().Format(timeFormat)
return indexName
}

func (a *meiliAuditing) toMap(args []any) map[string]any {
if len(args) == 0 {
return nil
}
if len(args)%2 != 0 {
a.log.Errorf("meilisearch pairs of key,value must be provided:%v, not processing", args...)
return nil
}
fields := make(map[string]any)
for i := 0; i < len(args); {
key, val := args[i], args[i+1]
if keyStr, ok := key.(string); ok {
fields[keyStr] = val
}
i += 2
}
return fields
}
Loading

0 comments on commit 7f9ddeb

Please sign in to comment.