Skip to content

Commit

Permalink
add context to topic logs
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Feb 10, 2025
1 parent e5c21a8 commit e5def5a
Show file tree
Hide file tree
Showing 21 changed files with 310 additions and 141 deletions.
2 changes: 2 additions & 0 deletions internal/grpcwrapper/rawtopic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
func (c *Client) StreamWrite(
ctxStreamLifeTime context.Context,
tracer *trace.Topic,
logContext *context.Context,
) (*rawtopicwriter.StreamWriter, error) {
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
if err != nil {
Expand All @@ -118,6 +119,7 @@ func (c *Client) StreamWrite(
Stream: protoResp,
Tracer: tracer,
InternalStreamID: uuid.New().String(),
LogContext: logContext,
}, nil
}

Expand Down
14 changes: 12 additions & 2 deletions internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rawtopicwriter

import (
"context"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -36,6 +37,7 @@ type StreamWriter struct {
readMessagesCount int
writtenMessagesCount int
sessionID string
LogContext *context.Context
}

//nolint:funlen
Expand All @@ -52,7 +54,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
defer func() {
// defer needs for set good session id on first init response before trace the message
trace.TopicOnWriterReceiveGRPCMessage(
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
)
}()
if sendErr != nil {
Expand Down Expand Up @@ -141,7 +143,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {

err = w.Stream.Send(&protoMsg)
w.writtenMessagesCount++
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
trace.TopicOnWriterSentGRPCMessage(
w.Tracer,
w.LogContext,
w.InternalStreamID,
w.sessionID,
w.writtenMessagesCount,
&protoMsg,
err,
)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
}
Expand Down
19 changes: 10 additions & 9 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,11 @@ func (c *Client) StartReader(
}
opts = append(defaultOpts, opts...)

internalReader, err := topicreaderinternal.NewReader(&c.rawClient, connector, consumer, readSelectors, opts...)
internalReader, logCtx, err := topicreaderinternal.NewReader(&c.rawClient, connector, consumer, readSelectors, opts...)
if err != nil {
return nil, err
}
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)
trace.TopicOnReaderStart(internalReader.Tracer(), logCtx, internalReader.ID(), consumer, err)

return topicreader.NewReader(internalReader), nil
}
Expand Down Expand Up @@ -356,15 +356,16 @@ func (c *Client) createWriterConfig(
topicPath string,
opts []topicoptions.WriterOption,
) topicwriterinternal.WriterReconnectorConfig {
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
topicwriterinternal.RawTopicWriterStream,
error,
) {
return c.rawClient.StreamWrite(ctx, tracer)
}
//var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
// topicwriterinternal.RawTopicWriterStream,
// error,
//) {
// return c.rawClient.StreamWrite(ctx, tracer)
//}

options := []topicoptions.WriterOption{
topicwriterinternal.WithConnectFunc(connector),
topicwriterinternal.WithRawClient(&c.rawClient),
// topicwriterinternal.WithConnectFunc(connector),
topicwriterinternal.WithTopic(topicPath),
topicwriterinternal.WithCommonConfig(c.cfg.Common),
topicwriterinternal.WithTrace(c.cfg.Trace),
Expand Down
1 change: 0 additions & 1 deletion internal/topic/topiclistenerinternal/stream_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ func (l *streamListener) initVars(sessionIDCounter *atomic.Int64) {
}
}

//nolint:funlen
func (l *streamListener) initStream(ctx context.Context, client TopicClient) error {

Check failure on line 144 in internal/topic/topiclistenerinternal/stream_listener.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Function 'initStream' is too long (63 > 60) (funlen)
streamCtx, streamClose := context.WithCancelCause(xcontext.ValueOnly(ctx))
l.streamClose = streamClose
Expand Down
1 change: 1 addition & 0 deletions internal/topic/topicreadercommon/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {

onDone := trace.TopicOnReaderSendCommitMessage(
c.tracer,
&ctx,
&commits,
)
err := c.send(commits.ToRawMessage())
Expand Down
23 changes: 13 additions & 10 deletions internal/topic/topicreaderinternal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ func NewReader(
consumer string,
readSelectors []topicreadercommon.PublicReadSelector,
opts ...PublicReaderOption,
) (Reader, error) {
) (Reader, *context.Context, error) {
cfg := convertNewParamsToStreamConfig(consumer, readSelectors, opts...)

if errs := cfg.Validate(); len(errs) > 0 {
return Reader{}, xerrors.WithStackTrace(fmt.Errorf(
return Reader{}, nil, xerrors.WithStackTrace(fmt.Errorf(
"ydb: failed to start topic reader, because is contains error in config: %w",
errors.Join(errs...),
))
Expand All @@ -89,20 +89,23 @@ func NewReader(
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
}

reader := newReaderReconnector(
readerID,
readerConnector,
cfg.OperationTimeout(),
&cfg.BaseContext,
cfg.RetrySettings,
cfg.Trace,
)

res := Reader{
reader: newReaderReconnector(
readerID,
readerConnector,
cfg.OperationTimeout(),
cfg.RetrySettings,
cfg.Trace,
),
reader: reader,
defaultBatchConfig: cfg.DefaultBatchConfig,
tracer: cfg.Trace,
readerID: readerID,
}

return res, nil
return res, reader.logContext, nil
}

func (r *Reader) WaitInit(ctx context.Context) error {
Expand Down
27 changes: 15 additions & 12 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange topicreadercommon.C
func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error {
err := r.stream.Send(msg)
if err != nil {
trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err)
trace.TopicOnReaderError(r.cfg.Trace, &r.ctx, r.readConnectionID, err)
_ = r.CloseWithError(r.ctx, err)
}

Expand Down Expand Up @@ -613,7 +613,7 @@ func (r *topicStreamReaderImpl) setStarted() error {
func (r *topicStreamReaderImpl) initSession() (err error) {
initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.ReadSelectors)

onDone := trace.TopicOnReaderInit(r.cfg.Trace, r.readConnectionID, initMessage)
onDone := trace.TopicOnReaderInit(r.cfg.Trace, &r.ctx, r.readConnectionID, initMessage)
defer func() {
onDone(r.readConnectionID, err)
}()
Expand Down Expand Up @@ -664,9 +664,9 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
for {
serverMessage, err := r.stream.Recv()
if err != nil {
trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err)
trace.TopicOnReaderError(r.cfg.Trace, &ctx, r.readConnectionID, err)
if errors.Is(err, rawtopicreader.ErrUnexpectedMessageType) {
trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, r.readConnectionID, err)
trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, &ctx, r.readConnectionID, err)
// new messages can be added to protocol, it must be backward compatible to old programs
// and skip message is safe
continue
Expand All @@ -687,7 +687,7 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {

switch m := serverMessage.(type) {
case *rawtopicreader.ReadResponse:
if err = r.onReadResponse(m); err != nil {
if err = r.onReadResponse(ctx, m); err != nil {
_ = r.CloseWithError(ctx, err)
}
case *rawtopicreader.StartPartitionSessionRequest:
Expand All @@ -703,7 +703,7 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
return
}
case *rawtopicreader.CommitOffsetResponse:
if err = r.onCommitResponse(m); err != nil {
if err = r.onCommitResponse(ctx, m); err != nil {
_ = r.CloseWithError(ctx, err)

return
Expand All @@ -714,6 +714,7 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) {
default:
trace.TopicOnReaderUnknownGrpcMessage(
r.cfg.Trace,
&ctx,
r.readConnectionID,
xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf(
"ydb: unexpected message type in stream reader: %v",
Expand Down Expand Up @@ -753,7 +754,7 @@ func (r *topicStreamReaderImpl) dataRequestLoop(ctx context.Context) {
}

resCapacity := r.addRestBufferBytes(sum)
trace.TopicOnReaderSentDataRequest(r.cfg.Trace, r.readConnectionID, sum, resCapacity)
trace.TopicOnReaderSentDataRequest(r.cfg.Trace, &ctx, r.readConnectionID, sum, resCapacity)
if err := r.sendDataRequest(sum); err != nil {
return
}
Expand Down Expand Up @@ -791,9 +792,9 @@ func (r *topicStreamReaderImpl) updateTokenLoop(ctx context.Context) {
}
}

func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) (err error) {
func (r *topicStreamReaderImpl) onReadResponse(ctx context.Context, msg *rawtopicreader.ReadResponse) (err error) {
resCapacity := r.addRestBufferBytes(-msg.BytesSize)
onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, r.readConnectionID, resCapacity, msg)
onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, &ctx, r.readConnectionID, resCapacity, msg)
defer func() {
onDone(err)
}()
Expand All @@ -813,7 +814,7 @@ func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse)
}

func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error) (closeErr error) {
onDone := trace.TopicOnReaderClose(r.cfg.Trace, r.readConnectionID, reason)
onDone := trace.TopicOnReaderClose(r.cfg.Trace, &ctx, r.readConnectionID, reason)
defer onDone(closeErr)

isFirstClose := false
Expand Down Expand Up @@ -853,7 +854,7 @@ func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error
return closeErr
}

func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffsetResponse) error {
func (r *topicStreamReaderImpl) onCommitResponse(ctx context.Context, msg *rawtopicreader.CommitOffsetResponse) error {
for i := range msg.PartitionsCommittedOffsets {
commit := &msg.PartitionsCommittedOffsets[i]
partition, err := r.sessionController.Get(commit.PartitionSessionID)
Expand All @@ -864,6 +865,7 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse

trace.TopicOnReaderCommittedNotify(
r.cfg.Trace,
&ctx,
r.readConnectionID,
partition.Topic,
partition.PartitionID,
Expand All @@ -880,10 +882,11 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse
func (r *topicStreamReaderImpl) updateToken(ctx context.Context) {
onUpdateToken := trace.TopicOnReaderUpdateToken(
r.cfg.Trace,
&ctx,
r.readConnectionID,
)
token, err := r.cfg.Cred.Token(ctx)
onSent := onUpdateToken(len(token), err)
onSent := onUpdateToken(&ctx, len(token), err)
if err != nil {
return
}
Expand Down
15 changes: 9 additions & 6 deletions internal/topic/topicreaderinternal/stream_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error)
type readerReconnector struct {
background background.Worker
clock clockwork.Clock
logContext *context.Context
retrySettings topic.RetrySettings
streamVal batchedStreamReader
streamContextCancel context.CancelCauseFunc
Expand All @@ -55,6 +56,7 @@ func newReaderReconnector(
readerID int64,
connector readerConnectFunc,
connectTimeout time.Duration,
logContext *context.Context,
retrySettings topic.RetrySettings,
tracer *trace.Topic,
) *readerReconnector {
Expand All @@ -64,6 +66,7 @@ func newReaderReconnector(
readerConnect: connector,
streamErr: errUnconnected,
connectTimeout: connectTimeout,
logContext: logContext,
tracer: tracer,
retrySettings: retrySettings,
}
Expand Down Expand Up @@ -263,7 +266,7 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {
}
}

onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, request.reason)
onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, &ctx, request.reason)

if request.reason != nil {
retryBackoff, stopRetryReason := r.checkErrRetryMode(
Expand Down Expand Up @@ -299,7 +302,7 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) {

//nolint:funlen
func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldReader batchedStreamReader) (err error) {
onDone := trace.TopicOnReaderReconnect(r.tracer, reason)
onDone := trace.TopicOnReaderReconnect(r.tracer, &ctx, reason)
defer func() { onDone(err) }()

if err = ctx.Err(); err != nil {
Expand Down Expand Up @@ -339,9 +342,9 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead
r.background.Start("ydb topic reader send reconnect message", func(ctx context.Context) {
select {
case r.reconnectFromBadStream <- newReconnectRequest(oldReader, sendReason):
trace.TopicOnReaderReconnectRequest(r.tracer, err, true)
trace.TopicOnReaderReconnectRequest(r.tracer, &ctx, err, true)
case <-ctx.Done():
trace.TopicOnReaderReconnectRequest(r.tracer, ctx.Err(), false)
trace.TopicOnReaderReconnectRequest(r.tracer, &ctx, ctx.Err(), false)
}
})
default:
Expand Down Expand Up @@ -441,10 +444,10 @@ func (r *readerReconnector) fireReconnectOnRetryableError(stream batchedStreamRe
select {
case r.reconnectFromBadStream <- newReconnectRequest(stream, err):
// send signal
trace.TopicOnReaderReconnectRequest(r.tracer, err, true)
trace.TopicOnReaderReconnectRequest(r.tracer, r.logContext, err, true)
default:
// previous reconnect signal in process, no need sent signal more
trace.TopicOnReaderReconnectRequest(r.tracer, err, false)
trace.TopicOnReaderReconnectRequest(r.tracer, r.logContext, err, false)
}
}

Expand Down
6 changes: 6 additions & 0 deletions internal/topic/topicwriterinternal/encoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package topicwriterinternal
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"sync"
Expand Down Expand Up @@ -164,6 +165,7 @@ type EncoderSelector struct {
parallelCompressors int
batchCounter int
measureIntervalBatches int
logContext *context.Context
}

func NewEncoderSelector(
Expand All @@ -172,6 +174,7 @@ func NewEncoderSelector(
parallelCompressors int,
tracer *trace.Topic,
writerReconnectorID, sessionID string,
logContext *context.Context,
) EncoderSelector {
if parallelCompressors <= 0 {
panic("ydb: need leas one allowed compressor")
Expand All @@ -184,6 +187,7 @@ func NewEncoderSelector(
tracer: tracer,
writerReconnectorID: writerReconnectorID,
sessionID: sessionID,
logContext: logContext,
}
res.ResetAllowedCodecs(allowedCodecs)

Expand All @@ -195,6 +199,7 @@ func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (r
if err == nil {
onCompressDone := trace.TopicOnWriterCompressMessages(
s.tracer,
s.logContext,
s.writerReconnectorID,
s.sessionID,
codec.ToInt32(),
Expand Down Expand Up @@ -265,6 +270,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt
}
onCompressDone := trace.TopicOnWriterCompressMessages(
s.tracer,
s.logContext,
s.writerReconnectorID,
s.sessionID,
codec.ToInt32(),
Expand Down
Loading

0 comments on commit e5def5a

Please sign in to comment.