Skip to content

Commit

Permalink
add context to topic logs
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Feb 13, 2025
1 parent e27fd65 commit 190a2af
Show file tree
Hide file tree
Showing 24 changed files with 556 additions and 175 deletions.
2 changes: 2 additions & 0 deletions internal/grpcwrapper/rawtopic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
func (c *Client) StreamWrite(
ctxStreamLifeTime context.Context,
tracer *trace.Topic,
logContext *context.Context,
) (*rawtopicwriter.StreamWriter, error) {
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
if err != nil {
Expand All @@ -118,6 +119,7 @@ func (c *Client) StreamWrite(
Stream: protoResp,
Tracer: tracer,
InternalStreamID: uuid.New().String(),
LogContext: logContext,
}, nil
}

Expand Down
14 changes: 12 additions & 2 deletions internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rawtopicwriter

import (
"context"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -36,6 +37,7 @@ type StreamWriter struct {
readMessagesCount int
writtenMessagesCount int
sessionID string
LogContext *context.Context
}

//nolint:funlen
Expand All @@ -52,7 +54,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
defer func() {
// defer needs for set good session id on first init response before trace the message
trace.TopicOnWriterReceiveGRPCMessage(
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
)
}()
if sendErr != nil {
Expand Down Expand Up @@ -141,7 +143,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {

err = w.Stream.Send(&protoMsg)
w.writtenMessagesCount++
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
trace.TopicOnWriterSentGRPCMessage(
w.Tracer,
w.LogContext,
w.InternalStreamID,
w.sessionID,
w.writtenMessagesCount,
&protoMsg,
err,
)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
}
Expand Down
20 changes: 12 additions & 8 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ func (c *Client) StartReader(
if err != nil {
return nil, err
}
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)

logCtx := internalReader.GetLogContext()
trace.TopicOnReaderStart(internalReader.Tracer(), &logCtx, internalReader.ID(), consumer, err)
internalReader.SetLogContext(logCtx)

return topicreader.NewReader(internalReader), nil
}
Expand Down Expand Up @@ -356,15 +359,16 @@ func (c *Client) createWriterConfig(
topicPath string,
opts []topicoptions.WriterOption,
) topicwriterinternal.WriterReconnectorConfig {
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
topicwriterinternal.RawTopicWriterStream,
error,
) {
return c.rawClient.StreamWrite(ctx, tracer)
}
//var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
// topicwriterinternal.RawTopicWriterStream,
// error,
//) {
// return c.rawClient.StreamWrite(ctx, tracer)
//}

options := []topicoptions.WriterOption{
topicwriterinternal.WithConnectFunc(connector),
topicwriterinternal.WithRawClient(&c.rawClient),
// topicwriterinternal.WithConnectFunc(connector),
topicwriterinternal.WithTopic(topicPath),
topicwriterinternal.WithCommonConfig(c.cfg.Common),
topicwriterinternal.WithTrace(c.cfg.Trace),
Expand Down
1 change: 0 additions & 1 deletion internal/topic/topiclistenerinternal/stream_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (l *streamListener) initVars(sessionIDCounter *atomic.Int64) {
}
}

//nolint:funlen
func (l *streamListener) initStream(ctx context.Context, client TopicClient) error {

Check failure on line 144 in internal/topic/topiclistenerinternal/stream_listener.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Function 'initStream' is too long (63 > 60) (funlen)
streamCtx, streamClose := context.WithCancelCause(xcontext.ValueOnly(ctx))
l.streamClose = streamClose
Expand Down
1 change: 1 addition & 0 deletions internal/topic/topicreadercommon/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {

onDone := trace.TopicOnReaderSendCommitMessage(
c.tracer,
&ctx,
&commits,
)
err := c.send(commits.ToRawMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ type batchedStreamReader interface {
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
CloseWithError(ctx context.Context, err error) error
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
GetLogContext() context.Context
SetLogContext(ctx context.Context)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 17 additions & 7 deletions internal/topic/topicreaderinternal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ type Reader struct {
readerID int64
}

func (r *Reader) GetLogContext() context.Context {
return r.reader.GetLogContext()
}

Check failure on line 40 in internal/topic/topicreaderinternal/reader.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)
func (r *Reader) SetLogContext(ctx context.Context) {
r.reader.SetLogContext(ctx)
}

type ReadMessageBatchOptions struct {
batcherGetOptions
}
Expand Down Expand Up @@ -89,14 +96,17 @@ func NewReader(
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
}

reader := newReaderReconnector(
readerID,
readerConnector,
cfg.OperationTimeout(),
cfg.BaseContext,
cfg.RetrySettings,
cfg.Trace,
)

res := Reader{
reader: newReaderReconnector(
readerID,
readerConnector,
cfg.OperationTimeout(),
cfg.RetrySettings,
cfg.Trace,
),
reader: reader,
defaultBatchConfig: cfg.DefaultBatchConfig,
tracer: cfg.Trace,
readerID: readerID,
Expand Down
Loading

0 comments on commit 190a2af

Please sign in to comment.