Skip to content

Commit

Permalink
Merge pull request #1 from skit-ai/custom-labels
Browse files Browse the repository at this point in the history
Adds custom labels & fixes latency metric
  • Loading branch information
pskrunner14 authored Jun 30, 2023
2 parents a5446fa + e88ecfc commit 6ecd634
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 19 deletions.
2 changes: 1 addition & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"testing"
"time"

pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto"
"github.com/prometheus/client_golang/prometheus"
pb_testproto "github.com/skit-ai/go-grpc-prometheus/examples/testproto"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
Expand Down
4 changes: 2 additions & 2 deletions examples/grpc-server-with-prometheus/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

"google.golang.org/grpc"

"github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/grpc-ecosystem/go-grpc-prometheus/examples/grpc-server-with-prometheus/protobuf"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/skit-ai/go-grpc-prometheus"
pb "github.com/skit-ai/go-grpc-prometheus/examples/grpc-server-with-prometheus/protobuf"
)

func main() {
Expand Down
4 changes: 2 additions & 2 deletions examples/grpc-server-with-prometheus/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

"google.golang.org/grpc"

"github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/grpc-ecosystem/go-grpc-prometheus/examples/grpc-server-with-prometheus/protobuf"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/skit-ai/go-grpc-prometheus"
pb "github.com/skit-ai/go-grpc-prometheus/examples/grpc-server-with-prometheus/protobuf"
)

// DemoServiceServer defines a Server.
Expand Down
16 changes: 15 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
module github.com/grpc-ecosystem/go-grpc-prometheus
module github.com/skit-ai/go-grpc-prometheus

go 1.18

require (
github.com/golang/protobuf v1.2.0
Expand All @@ -8,3 +10,15 @@ require (
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd
google.golang.org/grpc v1.18.0
)

require (
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 // indirect
github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a // indirect
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522 // indirect
golang.org/x/text v0.3.0 // indirect
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect
)
39 changes: 32 additions & 7 deletions server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package grpc_prometheus

import (
"context"
"github.com/grpc-ecosystem/go-grpc-prometheus/packages/grpcstatus"
"fmt"
"strings"

prom "github.com/prometheus/client_golang/prometheus"
"github.com/skit-ai/go-grpc-prometheus/packages/grpcstatus"

"google.golang.org/grpc"
)
Expand Down Expand Up @@ -31,22 +34,22 @@ func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
opts.apply(prom.CounterOpts{
Name: "grpc_server_started_total",
Help: "Total number of RPCs started on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
}), []string{"grpc_type", "grpc_service", "grpc_method", "client_uuid", "flow_uuid"}),
serverHandledCounter: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_handled_total",
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code"}),
}), []string{"grpc_type", "grpc_service", "grpc_method", "grpc_code", "client_uuid", "flow_uuid"}),
serverStreamMsgReceived: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_received_total",
Help: "Total number of RPC stream messages received on the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
}), []string{"grpc_type", "grpc_service", "grpc_method", "client_uuid", "flow_uuid"}),
serverStreamMsgSent: prom.NewCounterVec(
opts.apply(prom.CounterOpts{
Name: "grpc_server_msg_sent_total",
Help: "Total number of gRPC stream messages sent by the server.",
}), []string{"grpc_type", "grpc_service", "grpc_method"}),
}), []string{"grpc_type", "grpc_service", "grpc_method", "client_uuid", "flow_uuid"}),
serverHandledHistogramEnabled: false,
serverHandledHistogramOpts: prom.HistogramOpts{
Name: "grpc_server_handling_seconds",
Expand All @@ -68,7 +71,7 @@ func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
if !m.serverHandledHistogramEnabled {
m.serverHandledHistogram = prom.NewHistogramVec(
m.serverHandledHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
[]string{"grpc_type", "grpc_service", "grpc_method", "client_uuid", "flow_uuid"},
)
}
m.serverHandledHistogramEnabled = true
Expand Down Expand Up @@ -119,7 +122,7 @@ func (m *ServerMetrics) UnaryServerInterceptor() func(ctx context.Context, req i
func (m *ServerMetrics) StreamServerInterceptor() func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
monitor := newServerReporter(m, streamRPCType(info), info.FullMethod)
err := handler(srv, &monitoredServerStream{ss, monitor})
err := handler(srv, &monitoredServerStream{ss, monitor, true})
st, _ := grpcstatus.FromError(err)
monitor.Handled(st.Code())
return err
Expand Down Expand Up @@ -151,6 +154,7 @@ func streamRPCType(info *grpc.StreamServerInfo) grpcType {
type monitoredServerStream struct {
grpc.ServerStream
monitor *serverReporter
first bool
}

func (s *monitoredServerStream) SendMsg(m interface{}) error {
Expand All @@ -164,6 +168,27 @@ func (s *monitoredServerStream) SendMsg(m interface{}) error {
func (s *monitoredServerStream) RecvMsg(m interface{}) error {
err := s.ServerStream.RecvMsg(m)
if err == nil {
if s.first {
m_string := fmt.Sprint(m)

if strings.Contains(m_string, "client_uuid") {
splits := strings.Split(m_string, "client_uuid:")
if len(splits) > 1 {
client_uuid := strings.Split(splits[1], "\"")[1]
s.monitor.clientUuid = client_uuid
}
}

if strings.Contains(m_string, "flow_uuid") {
splits := strings.Split(m_string, "flow_uuid:")
if len(splits) > 1 {
flow_uuid := strings.Split(splits[1], "\"")[1]
s.monitor.flowUuid = flow_uuid
}
}

s.first = false
}
s.monitor.ReceivedMessage()
}
return err
Expand Down
15 changes: 10 additions & 5 deletions server_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type serverReporter struct {
serviceName string
methodName string
startTime time.Time
clientUuid string
flowUuid string
}

func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *serverReporter {
Expand All @@ -26,21 +28,24 @@ func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *s
r.startTime = time.Now()
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
r.metrics.serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
r.metrics.serverStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, r.clientUuid, r.flowUuid).Inc()
return r
}

func (r *serverReporter) ReceivedMessage() {
r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, r.clientUuid, r.flowUuid).Inc()
if r.metrics.serverHandledHistogramEnabled {
r.startTime = time.Now()
}
}

func (r *serverReporter) SentMessage() {
r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, r.clientUuid, r.flowUuid).Inc()
}

func (r *serverReporter) Handled(code codes.Code) {
r.metrics.serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
r.metrics.serverHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String(), r.clientUuid, r.flowUuid).Inc()
if r.metrics.serverHandledHistogramEnabled {
r.metrics.serverHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
r.metrics.serverHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, r.clientUuid, r.flowUuid).Observe(time.Since(r.startTime).Seconds())
}
}
2 changes: 1 addition & 1 deletion server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/client_golang/prometheus/testutil"

pb_testproto "github.com/grpc-ecosystem/go-grpc-prometheus/examples/testproto"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
pb_testproto "github.com/skit-ai/go-grpc-prometheus/examples/testproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down

0 comments on commit 6ecd634

Please sign in to comment.