From e3253eed59e2b1efa0cbc694ef7598559f9778ff Mon Sep 17 00:00:00 2001 From: Aleksei Pleshakov Date: Thu, 6 Feb 2025 16:09:31 +0100 Subject: [PATCH] add context to topic logs --- internal/grpcwrapper/rawtopic/client.go | 2 + .../rawtopic/rawtopicwriter/streamwriter.go | 14 +- internal/topic/topicclientinternal/client.go | 19 +-- .../topiclistenerinternal/stream_listener.go | 1 - internal/topic/topicreadercommon/committer.go | 1 + internal/topic/topicreaderinternal/reader.go | 23 ++-- .../topicreaderinternal/stream_reader_impl.go | 27 ++-- .../topicreaderinternal/stream_reconnector.go | 15 ++- .../topic/topicwriterinternal/encoders.go | 6 + .../topicwriterinternal/encoders_test.go | 19 ++- .../topicwriterinternal/writer_config.go | 4 + .../topicwriterinternal/writer_options.go | 9 ++ .../topicwriterinternal/writer_reconnector.go | 20 ++- .../writer_single_stream.go | 11 +- log/context.go | 4 + log/topic.go | 59 ++++----- metrics/driver.go | 20 +-- topic/topicoptions/topicoptions_reader.go | 9 ++ topic/topicoptions/topicoptions_writer.go | 9 ++ trace/topic.go | 52 ++++++-- trace/topic_gtrace.go | 125 +++++++++++++----- 21 files changed, 308 insertions(+), 141 deletions(-) diff --git a/internal/grpcwrapper/rawtopic/client.go b/internal/grpcwrapper/rawtopic/client.go index 203622e85..c2a8308a4 100644 --- a/internal/grpcwrapper/rawtopic/client.go +++ b/internal/grpcwrapper/rawtopic/client.go @@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S func (c *Client) StreamWrite( ctxStreamLifeTime context.Context, tracer *trace.Topic, + logContext *context.Context, ) (*rawtopicwriter.StreamWriter, error) { protoResp, err := c.service.StreamWrite(ctxStreamLifeTime) if err != nil { @@ -118,6 +119,7 @@ func (c *Client) StreamWrite( Stream: protoResp, Tracer: tracer, InternalStreamID: uuid.New().String(), + LogContext: logContext, }, nil } diff --git a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go index 920683c38..b4c9bc65f 100644 --- a/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go +++ b/internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go @@ -1,6 +1,7 @@ package rawtopicwriter import ( + "context" "errors" "fmt" "reflect" @@ -36,6 +37,7 @@ type StreamWriter struct { readMessagesCount int writtenMessagesCount int sessionID string + LogContext *context.Context } //nolint:funlen @@ -52,7 +54,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) { defer func() { // defer needs for set good session id on first init response before trace the message trace.TopicOnWriterReceiveGRPCMessage( - w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, + w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr, ) }() if sendErr != nil { @@ -141,7 +143,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) { err = w.Stream.Send(&protoMsg) w.writtenMessagesCount++ - trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err) + trace.TopicOnWriterSentGRPCMessage( + w.Tracer, + w.LogContext, + w.InternalStreamID, + w.sessionID, + w.writtenMessagesCount, + &protoMsg, + err, + ) if err != nil { return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err))) } diff --git a/internal/topic/topicclientinternal/client.go b/internal/topic/topicclientinternal/client.go index cb4672c25..3854189db 100644 --- a/internal/topic/topicclientinternal/client.go +++ b/internal/topic/topicclientinternal/client.go @@ -311,11 +311,11 @@ func (c *Client) StartReader( } opts = append(defaultOpts, opts...) - internalReader, err := topicreaderinternal.NewReader(&c.rawClient, connector, consumer, readSelectors, opts...) + internalReader, logCtx, err := topicreaderinternal.NewReader(&c.rawClient, connector, consumer, readSelectors, opts...) if err != nil { return nil, err } - trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err) + trace.TopicOnReaderStart(internalReader.Tracer(), logCtx, internalReader.ID(), consumer, err) return topicreader.NewReader(internalReader), nil } @@ -356,15 +356,16 @@ func (c *Client) createWriterConfig( topicPath string, opts []topicoptions.WriterOption, ) topicwriterinternal.WriterReconnectorConfig { - var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( - topicwriterinternal.RawTopicWriterStream, - error, - ) { - return c.rawClient.StreamWrite(ctx, tracer) - } + //var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( + // topicwriterinternal.RawTopicWriterStream, + // error, + //) { + // return c.rawClient.StreamWrite(ctx, tracer) + //} options := []topicoptions.WriterOption{ - topicwriterinternal.WithConnectFunc(connector), + topicwriterinternal.WithRawClient(&c.rawClient), + // topicwriterinternal.WithConnectFunc(connector), topicwriterinternal.WithTopic(topicPath), topicwriterinternal.WithCommonConfig(c.cfg.Common), topicwriterinternal.WithTrace(c.cfg.Trace), diff --git a/internal/topic/topiclistenerinternal/stream_listener.go b/internal/topic/topiclistenerinternal/stream_listener.go index d86f5d593..e9e870ed1 100644 --- a/internal/topic/topiclistenerinternal/stream_listener.go +++ b/internal/topic/topiclistenerinternal/stream_listener.go @@ -141,7 +141,6 @@ func (l *streamListener) initVars(sessionIDCounter *atomic.Int64) { } } -//nolint:funlen func (l *streamListener) initStream(ctx context.Context, client TopicClient) error { streamCtx, streamClose := context.WithCancelCause(xcontext.ValueOnly(ctx)) l.streamClose = streamClose diff --git a/internal/topic/topicreadercommon/committer.go b/internal/topic/topicreadercommon/committer.go index e27ab6d6e..1ae0aca22 100644 --- a/internal/topic/topicreadercommon/committer.go +++ b/internal/topic/topicreadercommon/committer.go @@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) { onDone := trace.TopicOnReaderSendCommitMessage( c.tracer, + &ctx, &commits, ) err := c.send(commits.ToRawMessage()) diff --git a/internal/topic/topicreaderinternal/reader.go b/internal/topic/topicreaderinternal/reader.go index 1b7e995c9..baa7c6c4a 100644 --- a/internal/topic/topicreaderinternal/reader.go +++ b/internal/topic/topicreaderinternal/reader.go @@ -68,11 +68,11 @@ func NewReader( consumer string, readSelectors []topicreadercommon.PublicReadSelector, opts ...PublicReaderOption, -) (Reader, error) { +) (Reader, *context.Context, error) { cfg := convertNewParamsToStreamConfig(consumer, readSelectors, opts...) if errs := cfg.Validate(); len(errs) > 0 { - return Reader{}, xerrors.WithStackTrace(fmt.Errorf( + return Reader{}, nil, xerrors.WithStackTrace(fmt.Errorf( "ydb: failed to start topic reader, because is contains error in config: %w", errors.Join(errs...), )) @@ -89,20 +89,23 @@ func NewReader( return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig) } + reader := newReaderReconnector( + readerID, + readerConnector, + cfg.OperationTimeout(), + &cfg.BaseContext, + cfg.RetrySettings, + cfg.Trace, + ) + res := Reader{ - reader: newReaderReconnector( - readerID, - readerConnector, - cfg.OperationTimeout(), - cfg.RetrySettings, - cfg.Trace, - ), + reader: reader, defaultBatchConfig: cfg.DefaultBatchConfig, tracer: cfg.Trace, readerID: readerID, } - return res, nil + return res, reader.logContext, nil } func (r *Reader) WaitInit(ctx context.Context) error { diff --git a/internal/topic/topicreaderinternal/stream_reader_impl.go b/internal/topic/topicreaderinternal/stream_reader_impl.go index 64f8716c7..2fd165293 100644 --- a/internal/topic/topicreaderinternal/stream_reader_impl.go +++ b/internal/topic/topicreaderinternal/stream_reader_impl.go @@ -574,7 +574,7 @@ func (r *topicStreamReaderImpl) checkCommitRange(commitRange topicreadercommon.C func (r *topicStreamReaderImpl) send(msg rawtopicreader.ClientMessage) error { err := r.stream.Send(msg) if err != nil { - trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err) + trace.TopicOnReaderError(r.cfg.Trace, &r.ctx, r.readConnectionID, err) _ = r.CloseWithError(r.ctx, err) } @@ -613,7 +613,7 @@ func (r *topicStreamReaderImpl) setStarted() error { func (r *topicStreamReaderImpl) initSession() (err error) { initMessage := topicreadercommon.CreateInitMessage(r.cfg.Consumer, r.cfg.ReadSelectors) - onDone := trace.TopicOnReaderInit(r.cfg.Trace, r.readConnectionID, initMessage) + onDone := trace.TopicOnReaderInit(r.cfg.Trace, &r.ctx, r.readConnectionID, initMessage) defer func() { onDone(r.readConnectionID, err) }() @@ -664,9 +664,9 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { for { serverMessage, err := r.stream.Recv() if err != nil { - trace.TopicOnReaderError(r.cfg.Trace, r.readConnectionID, err) + trace.TopicOnReaderError(r.cfg.Trace, &ctx, r.readConnectionID, err) if errors.Is(err, rawtopicreader.ErrUnexpectedMessageType) { - trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, r.readConnectionID, err) + trace.TopicOnReaderUnknownGrpcMessage(r.cfg.Trace, &ctx, r.readConnectionID, err) // new messages can be added to protocol, it must be backward compatible to old programs // and skip message is safe continue @@ -687,7 +687,7 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { switch m := serverMessage.(type) { case *rawtopicreader.ReadResponse: - if err = r.onReadResponse(m); err != nil { + if err = r.onReadResponse(ctx, m); err != nil { _ = r.CloseWithError(ctx, err) } case *rawtopicreader.StartPartitionSessionRequest: @@ -703,7 +703,7 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { return } case *rawtopicreader.CommitOffsetResponse: - if err = r.onCommitResponse(m); err != nil { + if err = r.onCommitResponse(ctx, m); err != nil { _ = r.CloseWithError(ctx, err) return @@ -714,6 +714,7 @@ func (r *topicStreamReaderImpl) readMessagesLoop(ctx context.Context) { default: trace.TopicOnReaderUnknownGrpcMessage( r.cfg.Trace, + &ctx, r.readConnectionID, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( "ydb: unexpected message type in stream reader: %v", @@ -753,7 +754,7 @@ func (r *topicStreamReaderImpl) dataRequestLoop(ctx context.Context) { } resCapacity := r.addRestBufferBytes(sum) - trace.TopicOnReaderSentDataRequest(r.cfg.Trace, r.readConnectionID, sum, resCapacity) + trace.TopicOnReaderSentDataRequest(r.cfg.Trace, &ctx, r.readConnectionID, sum, resCapacity) if err := r.sendDataRequest(sum); err != nil { return } @@ -791,9 +792,9 @@ func (r *topicStreamReaderImpl) updateTokenLoop(ctx context.Context) { } } -func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) (err error) { +func (r *topicStreamReaderImpl) onReadResponse(ctx context.Context, msg *rawtopicreader.ReadResponse) (err error) { resCapacity := r.addRestBufferBytes(-msg.BytesSize) - onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, r.readConnectionID, resCapacity, msg) + onDone := trace.TopicOnReaderReceiveDataResponse(r.cfg.Trace, &ctx, r.readConnectionID, resCapacity, msg) defer func() { onDone(err) }() @@ -813,7 +814,7 @@ func (r *topicStreamReaderImpl) onReadResponse(msg *rawtopicreader.ReadResponse) } func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error) (closeErr error) { - onDone := trace.TopicOnReaderClose(r.cfg.Trace, r.readConnectionID, reason) + onDone := trace.TopicOnReaderClose(r.cfg.Trace, &ctx, r.readConnectionID, reason) defer onDone(closeErr) isFirstClose := false @@ -853,7 +854,7 @@ func (r *topicStreamReaderImpl) CloseWithError(ctx context.Context, reason error return closeErr } -func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffsetResponse) error { +func (r *topicStreamReaderImpl) onCommitResponse(ctx context.Context, msg *rawtopicreader.CommitOffsetResponse) error { for i := range msg.PartitionsCommittedOffsets { commit := &msg.PartitionsCommittedOffsets[i] partition, err := r.sessionController.Get(commit.PartitionSessionID) @@ -864,6 +865,7 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse trace.TopicOnReaderCommittedNotify( r.cfg.Trace, + &ctx, r.readConnectionID, partition.Topic, partition.PartitionID, @@ -880,10 +882,11 @@ func (r *topicStreamReaderImpl) onCommitResponse(msg *rawtopicreader.CommitOffse func (r *topicStreamReaderImpl) updateToken(ctx context.Context) { onUpdateToken := trace.TopicOnReaderUpdateToken( r.cfg.Trace, + &ctx, r.readConnectionID, ) token, err := r.cfg.Cred.Token(ctx) - onSent := onUpdateToken(len(token), err) + onSent := onUpdateToken(&ctx, len(token), err) if err != nil { return } diff --git a/internal/topic/topicreaderinternal/stream_reconnector.go b/internal/topic/topicreaderinternal/stream_reconnector.go index 1f41cb39b..3bf51a9ae 100644 --- a/internal/topic/topicreaderinternal/stream_reconnector.go +++ b/internal/topic/topicreaderinternal/stream_reconnector.go @@ -34,6 +34,7 @@ type readerConnectFunc func(ctx context.Context) (batchedStreamReader, error) type readerReconnector struct { background background.Worker clock clockwork.Clock + logContext *context.Context retrySettings topic.RetrySettings streamVal batchedStreamReader streamContextCancel context.CancelCauseFunc @@ -55,6 +56,7 @@ func newReaderReconnector( readerID int64, connector readerConnectFunc, connectTimeout time.Duration, + logContext *context.Context, retrySettings topic.RetrySettings, tracer *trace.Topic, ) *readerReconnector { @@ -64,6 +66,7 @@ func newReaderReconnector( readerConnect: connector, streamErr: errUnconnected, connectTimeout: connectTimeout, + logContext: logContext, tracer: tracer, retrySettings: retrySettings, } @@ -263,7 +266,7 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) { } } - onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, request.reason) + onReconnectionDone := trace.TopicOnReaderReconnect(r.tracer, &ctx, request.reason) if request.reason != nil { retryBackoff, stopRetryReason := r.checkErrRetryMode( @@ -299,7 +302,7 @@ func (r *readerReconnector) reconnectionLoop(ctx context.Context) { //nolint:funlen func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldReader batchedStreamReader) (err error) { - onDone := trace.TopicOnReaderReconnect(r.tracer, reason) + onDone := trace.TopicOnReaderReconnect(r.tracer, &ctx, reason) defer func() { onDone(err) }() if err = ctx.Err(); err != nil { @@ -339,9 +342,9 @@ func (r *readerReconnector) reconnect(ctx context.Context, reason error, oldRead r.background.Start("ydb topic reader send reconnect message", func(ctx context.Context) { select { case r.reconnectFromBadStream <- newReconnectRequest(oldReader, sendReason): - trace.TopicOnReaderReconnectRequest(r.tracer, err, true) + trace.TopicOnReaderReconnectRequest(r.tracer, &ctx, err, true) case <-ctx.Done(): - trace.TopicOnReaderReconnectRequest(r.tracer, ctx.Err(), false) + trace.TopicOnReaderReconnectRequest(r.tracer, &ctx, ctx.Err(), false) } }) default: @@ -441,10 +444,10 @@ func (r *readerReconnector) fireReconnectOnRetryableError(stream batchedStreamRe select { case r.reconnectFromBadStream <- newReconnectRequest(stream, err): // send signal - trace.TopicOnReaderReconnectRequest(r.tracer, err, true) + trace.TopicOnReaderReconnectRequest(r.tracer, r.logContext, err, true) default: // previous reconnect signal in process, no need sent signal more - trace.TopicOnReaderReconnectRequest(r.tracer, err, false) + trace.TopicOnReaderReconnectRequest(r.tracer, r.logContext, err, false) } } diff --git a/internal/topic/topicwriterinternal/encoders.go b/internal/topic/topicwriterinternal/encoders.go index 8bd5ea17b..adc369586 100644 --- a/internal/topic/topicwriterinternal/encoders.go +++ b/internal/topic/topicwriterinternal/encoders.go @@ -3,6 +3,7 @@ package topicwriterinternal import ( "bytes" "compress/gzip" + "context" "fmt" "io" "sync" @@ -164,6 +165,7 @@ type EncoderSelector struct { parallelCompressors int batchCounter int measureIntervalBatches int + logContext *context.Context } func NewEncoderSelector( @@ -172,6 +174,7 @@ func NewEncoderSelector( parallelCompressors int, tracer *trace.Topic, writerReconnectorID, sessionID string, + logContext *context.Context, ) EncoderSelector { if parallelCompressors <= 0 { panic("ydb: need leas one allowed compressor") @@ -184,6 +187,7 @@ func NewEncoderSelector( tracer: tracer, writerReconnectorID: writerReconnectorID, sessionID: sessionID, + logContext: logContext, } res.ResetAllowedCodecs(allowedCodecs) @@ -195,6 +199,7 @@ func (s *EncoderSelector) CompressMessages(messages []messageWithDataContent) (r if err == nil { onCompressDone := trace.TopicOnWriterCompressMessages( s.tracer, + s.logContext, s.writerReconnectorID, s.sessionID, codec.ToInt32(), @@ -265,6 +270,7 @@ func (s *EncoderSelector) measureCodecs(messages []messageWithDataContent) (rawt } onCompressDone := trace.TopicOnWriterCompressMessages( s.tracer, + s.logContext, s.writerReconnectorID, s.sessionID, codec.ToInt32(), diff --git a/internal/topic/topicwriterinternal/encoders_test.go b/internal/topic/topicwriterinternal/encoders_test.go index b2bf7bd97..7a790f908 100644 --- a/internal/topic/topicwriterinternal/encoders_test.go +++ b/internal/topic/topicwriterinternal/encoders_test.go @@ -3,6 +3,7 @@ package topicwriterinternal import ( "bytes" "compress/gzip" + "context" "fmt" "io" "strings" @@ -15,14 +16,26 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/trace" ) +func NewTestEncoderSelector( + m *MultiEncoder, + allowedCodecs rawtopiccommon.SupportedCodecs, + parallelCompressors int, + tracer *trace.Topic, + writerReconnectorID, sessionID string, +) EncoderSelector { + ctx := context.Background() + + return NewEncoderSelector(m, allowedCodecs, parallelCompressors, tracer, writerReconnectorID, sessionID, &ctx) +} + func TestEncoderSelector_CodecMeasure(t *testing.T) { t.Run("Empty", func(t *testing.T) { - s := NewEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "") + s := NewTestEncoderSelector(testCommonEncoders, nil, 1, &trace.Topic{}, "", "") _, err := s.measureCodecs(nil) require.Error(t, err) }) t.Run("One", func(t *testing.T) { - s := NewEncoderSelector( + s := NewTestEncoderSelector( NewMultiEncoder(), rawtopiccommon.SupportedCodecs{rawtopiccommon.CodecRaw}, 1, @@ -44,7 +57,7 @@ func TestEncoderSelector_CodecMeasure(t *testing.T) { ) testSelectCodec := func(t testing.TB, targetCodec rawtopiccommon.Codec, smallCount, largeCount int) { - s := NewEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{ + s := NewTestEncoderSelector(testCommonEncoders, rawtopiccommon.SupportedCodecs{ rawtopiccommon.CodecRaw, rawtopiccommon.CodecGzip, }, 4, diff --git a/internal/topic/topicwriterinternal/writer_config.go b/internal/topic/topicwriterinternal/writer_config.go index df6bd73cf..513e4ee05 100644 --- a/internal/topic/topicwriterinternal/writer_config.go +++ b/internal/topic/topicwriterinternal/writer_config.go @@ -1,11 +1,13 @@ package topicwriterinternal import ( + "context" "time" "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-sdk/v3/credentials" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -18,7 +20,9 @@ type WritersCommonConfig struct { defaultPartitioning rawtopicwriter.Partitioning compressorCount int + LogContext context.Context //nolint:containedctx Tracer *trace.Topic + rawTopicClient *rawtopic.Client cred credentials.Credentials credUpdateInterval time.Duration clock clockwork.Clock diff --git a/internal/topic/topicwriterinternal/writer_options.go b/internal/topic/topicwriterinternal/writer_options.go index c71a989b4..0cc5a9caa 100644 --- a/internal/topic/topicwriterinternal/writer_options.go +++ b/internal/topic/topicwriterinternal/writer_options.go @@ -7,6 +7,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/credentials" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopicwriter" "github.com/ydb-platform/ydb-go-sdk/v3/trace" @@ -71,6 +72,14 @@ func WithCredentials(cred credentials.Credentials) PublicWriterOption { } } +// WithCredentials for internal usage only +// no proxy to public interface +func WithRawClient(rawClient *rawtopic.Client) PublicWriterOption { + return func(cfg *WriterReconnectorConfig) { + cfg.rawTopicClient = rawClient + } +} + func WithCodec(codec rawtopiccommon.Codec) PublicWriterOption { return func(cfg *WriterReconnectorConfig) { cfg.forceCodec = codec diff --git a/internal/topic/topicwriterinternal/writer_reconnector.go b/internal/topic/topicwriterinternal/writer_reconnector.go index 217652cd7..b2ded7516 100644 --- a/internal/topic/topicwriterinternal/writer_reconnector.go +++ b/internal/topic/topicwriterinternal/writer_reconnector.go @@ -112,6 +112,22 @@ func NewWriterReconnectorConfig(options ...PublicWriterOption) WriterReconnector WithProducerID(uuid.NewString())(&cfg) } + if cfg.Connect == nil { + logContext := context.Background() + if cfg.LogContext != nil { + logContext = cfg.LogContext + } + + var connector ConnectFunc = func(ctx context.Context, tracer *trace.Topic) ( + RawTopicWriterStream, + error, + ) { + return cfg.rawTopicClient.StreamWrite(ctx, tracer, &logContext) + } + + cfg.Connect = connector + } + return cfg } @@ -325,6 +341,7 @@ func (w *WriterReconnector) createMessagesWithContent(messages []PublicMessage) }) onCompressDone := trace.TopicOnWriterCompressMessages( w.cfg.Tracer, + &w.cfg.LogContext, w.writerInstanceID, sessionID, w.cfg.forceCodec.ToInt32(), @@ -365,7 +382,7 @@ func (w *WriterReconnector) Close(ctx context.Context) error { } func (w *WriterReconnector) close(ctx context.Context, reason error) (resErr error) { - onDone := trace.TopicOnWriterClose(w.cfg.Tracer, w.writerInstanceID, reason) + onDone := trace.TopicOnWriterClose(w.cfg.Tracer, &w.cfg.LogContext, w.writerInstanceID, reason) defer func() { onDone(resErr) }() @@ -426,6 +443,7 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) { onWriterStarted := trace.TopicOnWriterReconnect( w.cfg.Tracer, + &w.cfg.LogContext, w.writerInstanceID, w.cfg.topic, w.cfg.producerID, diff --git a/internal/topic/topicwriterinternal/writer_single_stream.go b/internal/topic/topicwriterinternal/writer_single_stream.go index 6dbb3803c..61536b829 100644 --- a/internal/topic/topicwriterinternal/writer_single_stream.go +++ b/internal/topic/topicwriterinternal/writer_single_stream.go @@ -125,7 +125,13 @@ func (w *SingleStreamWriter) start() { } func (w *SingleStreamWriter) initStream() (err error) { - traceOnDone := trace.TopicOnWriterInitStream(w.cfg.Tracer, w.cfg.reconnectorInstanceID, w.cfg.topic, w.cfg.producerID) + traceOnDone := trace.TopicOnWriterInitStream( + w.cfg.Tracer, + &w.cfg.LogContext, + w.cfg.reconnectorInstanceID, + w.cfg.topic, + w.cfg.producerID, + ) defer traceOnDone(w.SessionID, err) req := w.createInitRequest() @@ -155,6 +161,7 @@ func (w *SingleStreamWriter) initStream() (err error) { w.cfg.Tracer, w.cfg.reconnectorInstanceID, w.SessionID, + &w.cfg.LogContext, ) w.SessionID = result.SessionID @@ -205,6 +212,7 @@ func (w *SingleStreamWriter) receiveMessagesLoop(ctx context.Context) { default: trace.TopicOnWriterReadUnknownGrpcMessage( w.cfg.Tracer, + &w.cfg.LogContext, w.cfg.reconnectorInstanceID, w.SessionID, xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf( @@ -234,6 +242,7 @@ func (w *SingleStreamWriter) sendMessagesFromQueueToStreamLoop(ctx context.Conte onSentComplete := trace.TopicOnWriterSendMessages( w.cfg.Tracer, + &w.cfg.LogContext, w.cfg.reconnectorInstanceID, w.SessionID, targetCodec.ToInt32(), diff --git a/log/context.go b/log/context.go index b724e5ec3..abf168a7e 100644 --- a/log/context.go +++ b/log/context.go @@ -37,5 +37,9 @@ func NamesFromContext(ctx context.Context) []string { } func with(ctx context.Context, lvl Level, names ...string) context.Context { + if ctx == nil { + ctx = context.Background() + } + return WithLevel(WithNames(ctx, names...), lvl) } diff --git a/log/topic.go b/log/topic.go index 99392233d..01768c056 100644 --- a/log/topic.go +++ b/log/topic.go @@ -1,7 +1,6 @@ package log import ( - "context" "time" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic" @@ -25,7 +24,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "reconnect") start := time.Now() l.Log(ctx, "start") @@ -40,7 +39,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamLifeCycleEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "reconnect", "request") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "reconnect", "request") l.Log(ctx, "start", kv.NamedError("reason", info.Reason), kv.Bool("was_sent", info.WasSent), @@ -52,7 +51,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderPartitionEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response") + ctx := with(*info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "start", "response") start := time.Now() l.Log(ctx, "start", kv.String("topic", info.Topic), @@ -97,7 +96,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderPartitionEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response") + ctx := with(info.PartitionContext, TRACE, "ydb", "topic", "reader", "partition", "read", "stop", "response") start := time.Now() l.Log(ctx, "start", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -170,7 +169,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "send", "commit", "message") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "send", "commit", "message") start := time.Now() commitInfo := info.CommitsInfo.GetCommitsInfo() @@ -211,7 +210,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "committed", "notify") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "committed", "notify") l.Log(ctx, "ack", kv.String("reader_connection_id", info.ReaderConnectionID), kv.String("topic", info.Topic), @@ -224,7 +223,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "close") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "close") start := time.Now() l.Log(ctx, "done", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -253,7 +252,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "init") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "init") start := time.Now() l.Log(ctx, "start", kv.String("pre_init_reader_connection_id", info.PreInitReaderConnectionID), @@ -284,7 +283,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "error") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "error") l.Log(WithLevel(ctx, INFO), "stream error", kv.Error(info.Error), kv.String("reader_connection_id", info.ReaderConnectionID), @@ -299,7 +298,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "update", "token") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "update", "token") start := time.Now() l.Log(ctx, "token updating...", kv.String("reader_connection_id", info.ReaderConnectionID), @@ -347,7 +346,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "sent", "data", "request") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "sent", "data", "request") l.Log(ctx, "sent data request", kv.String("reader_connection_id", info.ReaderConnectionID), kv.Int("request_bytes", info.RequestBytes), @@ -360,7 +359,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "receive", "data", "response") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "receive", "data", "response") start := time.Now() partitionsCount, batchesCount, messagesCount := info.DataResponse.GetPartitionBatchMessagesCounts() l.Log(ctx, "data response received, process starting...", @@ -436,7 +435,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicReaderMessageEvents == 0 { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "reader", "unknown", "grpc", "message") l.Log(WithLevel(ctx, INFO), "received unknown message", kv.Error(info.Error), kv.String("reader_connection_id", info.ReaderConnectionID), @@ -451,7 +450,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { } start := time.Now() - ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "customer", "popbatchtx") + ctx := with(*startInfo.LogContext, TRACE, "ydb", "topic", "reader", "customer", "popbatchtx") l.Log(WithLevel(ctx, TRACE), "starting pop batch tx", kv.Int64("reader_id", startInfo.ReaderID), kv.String("transaction_session_id", startInfo.TransactionSessionID), @@ -495,7 +494,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { } start := time.Now() - ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "popbatchtx_on_stream") + ctx := with(*startInfo.LogContext, TRACE, "ydb", "topic", "reader", "transaction", "popbatchtx_on_stream") l.Log(WithLevel(ctx, TRACE), "starting pop batch tx", kv.Int64("reader_id", startInfo.ReaderID), kv.String("reader_connection_id", startInfo.ReaderConnectionID), @@ -538,7 +537,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { } start := time.Now() - ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets") + ctx := with(*startInfo.LogContext, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets") l.Log(WithLevel(ctx, TRACE), "starting update offsets in transaction", kv.Int64("reader_id", startInfo.ReaderID), kv.String("reader_connection_id", startInfo.ReaderConnectionID), @@ -581,7 +580,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { } start := time.Now() - ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets") + ctx := with(*startInfo.LogContext, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets") l.Log(WithLevel(ctx, TRACE), "starting update offsets in transaction", kv.Int64("reader_id", startInfo.ReaderID), kv.String("reader_connection_id", startInfo.ReaderConnectionID), @@ -627,7 +626,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { start := time.Now() return func(doneInfo trace.TopicReaderTransactionCompletedDoneInfo) { - ctx := with(*startInfo.Context, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets") + ctx := with(*startInfo.LogContext, TRACE, "ydb", "topic", "reader", "transaction", "update_offsets") l.Log(WithLevel(ctx, TRACE), "starting update offsets in transaction", kv.Int64("reader_id", startInfo.ReaderID), kv.String("reader_connection_id", startInfo.ReaderConnectionID), @@ -648,7 +647,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "reconnect") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "reconnect") start := time.Now() l.Log(ctx, "connect to topic writer stream starting...", kv.String("topic", info.Topic), @@ -694,7 +693,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "stream", "init") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "stream", "init") start := time.Now() l.Log(ctx, "start", kv.String("topic", info.Topic), @@ -734,7 +733,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { start := time.Now() return func(doneInfo trace.TopicOnWriterBeforeCommitTransactionDoneInfo) { - ctx := with(*info.Ctx, TRACE, "ydb", "topic", "writer", "beforecommit") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "beforecommit") l.Log(ctx, "wait of flush messages before commit transaction", kv.String("kqp_session_id", info.KqpSessionID), kv.String("topic_session_id_start", info.TopicSessionID), @@ -750,7 +749,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { start := time.Now() return func(doneInfo trace.TopicOnWriterAfterFinishTransactionDoneInfo) { - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "beforecommit") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "beforecommit") l.Log(ctx, "close writer after transaction finished", kv.String("kqp_session_id", info.SessionID), kv.String("tx_id", info.TransactionID), @@ -762,7 +761,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "close") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "close") start := time.Now() l.Log(ctx, "start", kv.String("writer_instance_id", info.WriterInstanceID), @@ -793,7 +792,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "compress", "messages") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "compress", "messages") start := time.Now() l.Log(ctx, "start", kv.String("writer_instance_id", info.WriterInstanceID), @@ -836,7 +835,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return nil } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "send", "messages") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "send", "messages") start := time.Now() l.Log(ctx, "start", kv.String("writer_instance_id", info.WriterInstanceID), @@ -874,7 +873,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } acks := info.Acks.GetAcks() - ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "receive", "result") + ctx := with(*info.LogContext, DEBUG, "ydb", "topic", "writer", "receive", "result") l.Log(ctx, "topic writer receive result from server", kv.String("writer_instance_id", info.WriterInstanceID), kv.String("session_id", info.SessionID), @@ -895,7 +894,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "grpc") l.Log( ctx, "topic writer sent grpc message (message body and metadata are removed)", kv.String("topic_stream_internal_id", info.TopicStreamInternalID), @@ -911,7 +910,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { return } - ctx := with(context.Background(), TRACE, "ydb", "topic", "writer", "grpc") + ctx := with(*info.LogContext, TRACE, "ydb", "topic", "writer", "grpc") l.Log( ctx, "topic writer received grpc message (message body and metadata are removed)", kv.String("topic_stream_internal_id", info.TopicStreamInternalID), @@ -926,7 +925,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) { if d.Details()&trace.TopicWriterStreamEvents == 0 { return } - ctx := with(context.Background(), DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message") + ctx := with(*info.LogContext, DEBUG, "ydb", "topic", "writer", "read", "unknown", "grpc", "message") l.Log(ctx, "topic writer receive unknown message from server", kv.Error(info.Error), kv.String("writer_instance_id", info.WriterInstanceID), diff --git a/metrics/driver.go b/metrics/driver.go index 718fa2d02..eca54dbec 100644 --- a/metrics/driver.go +++ b/metrics/driver.go @@ -1,7 +1,6 @@ package metrics import ( - "strconv" "sync" "github.com/ydb-platform/ydb-go-sdk/v3/internal/repeater" @@ -19,8 +18,7 @@ func driver(config Config) (t trace.Driver) { balancerUpdates := config.WithSystem("balancer").CounterVec("updates", "cause") conns := config.GaugeVec("conns", "endpoint", "node_id") banned := config.WithSystem("conn").GaugeVec("banned", "endpoint", "node_id", "cause") - requestStatuses := config.WithSystem("conn").CounterVec("request_statuses", "status", "endpoint", "node_id") - requestMethods := config.WithSystem("conn").CounterVec("request_methods", "method", "endpoint", "node_id") + requests := config.WithSystem("conn").CounterVec("requests", "status", "method", "endpoint") tli := config.CounterVec("transaction_locks_invalidated") type endpointKey struct { @@ -33,20 +31,14 @@ func driver(config Config) (t trace.Driver) { var ( method = info.Method endpoint = info.Endpoint.Address() - nodeID = info.Endpoint.NodeID() ) return func(info trace.DriverConnInvokeDoneInfo) { if config.Details()&trace.DriverConnEvents != 0 { - requestStatuses.With(map[string]string{ + requests.With(map[string]string{ "status": errorBrief(info.Error), - "endpoint": endpoint, - "node_id": strconv.FormatUint(uint64(nodeID), 10), - }).Inc() - requestMethods.With(map[string]string{ "method": string(method), "endpoint": endpoint, - "node_id": strconv.FormatUint(uint64(nodeID), 10), }).Inc() if xerrors.IsOperationErrorTransactionLocksInvalidated(info.Error) { tli.With(nil).Inc() @@ -60,20 +52,14 @@ func driver(config Config) (t trace.Driver) { var ( method = info.Method endpoint = info.Endpoint.Address() - nodeID = info.Endpoint.NodeID() ) return func(info trace.DriverConnNewStreamDoneInfo) { if config.Details()&trace.DriverConnStreamEvents != 0 { - requestStatuses.With(map[string]string{ + requests.With(map[string]string{ "status": errorBrief(info.Error), - "endpoint": endpoint, - "node_id": strconv.FormatUint(uint64(nodeID), 10), - }).Inc() - requestMethods.With(map[string]string{ "method": string(method), "endpoint": endpoint, - "node_id": strconv.FormatUint(uint64(nodeID), 10), }).Inc() } } diff --git a/topic/topicoptions/topicoptions_reader.go b/topic/topicoptions/topicoptions_reader.go index 6569db055..cf2bee0bf 100644 --- a/topic/topicoptions/topicoptions_reader.go +++ b/topic/topicoptions/topicoptions_reader.go @@ -1,6 +1,7 @@ package topicoptions import ( + "context" "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/config" @@ -281,3 +282,11 @@ func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption { cfg.CommitMode = CommitModeNone } } + +// WithReaderLogContext allows providing a context.Context instance which will be used +// in log/topic events. +func WithReaderLogContext(ctx context.Context) ReaderOption { + return func(cfg *topicreaderinternal.ReaderConfig) { + cfg.BaseContext = ctx + } +} diff --git a/topic/topicoptions/topicoptions_writer.go b/topic/topicoptions/topicoptions_writer.go index d74befa4e..f4271e312 100644 --- a/topic/topicoptions/topicoptions_writer.go +++ b/topic/topicoptions/topicoptions_writer.go @@ -1,6 +1,7 @@ package topicoptions import ( + "context" "time" "github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon" @@ -214,3 +215,11 @@ func WithWriterTrace(t trace.Topic) WriterOption { //nolint:gocritic func WithWriterUpdateTokenInterval(interval time.Duration) WriterOption { return topicwriterinternal.WithTokenUpdateInterval(interval) } + +// WithWriterLogContext allows providing a context.Context instance which will be used +// in log/topic events. +func WithWriterLogContext(ctx context.Context) WriterOption { + return func(cfg *topicwriterinternal.WriterReconnectorConfig) { + cfg.LogContext = ctx + } +} diff --git a/trace/topic.go b/trace/topic.go index 00b41b81a..24e206666 100644 --- a/trace/topic.go +++ b/trace/topic.go @@ -153,9 +153,10 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderStartInfo struct { - ReaderID int64 - Consumer string - Error error + LogContext *context.Context + ReaderID int64 + Consumer string + Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -183,11 +184,13 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderSendCommitMessageStartInfo struct { + LogContext *context.Context CommitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderStreamCommitInfo struct { + LogContext *context.Context Topic string PartitionID int64 PartitionSessionID int64 @@ -207,6 +210,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCommittedNotifyInfo struct { + LogContext *context.Context ReaderConnectionID string Topic string PartitionID int64 @@ -216,12 +220,14 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderErrorInfo struct { + LogContext *context.Context ReaderConnectionID string Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderSentDataRequestInfo struct { + LogContext *context.Context ReaderConnectionID string RequestBytes int LocalBufferSizeAfterSent int @@ -229,6 +235,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReceiveDataResponseStartInfo struct { + LogContext *context.Context ReaderConnectionID string LocalBufferSizeAfterReceive int DataResponse TopicReaderDataResponseInfo @@ -267,13 +274,15 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUnknownGrpcMessageInfo struct { + LogContext *context.Context ReaderConnectionID string Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReconnectStartInfo struct { - Reason error + LogContext *context.Context + Reason error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -283,8 +292,9 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderReconnectRequestInfo struct { - Reason error - WasSent bool + LogContext *context.Context + Reason error + WasSent bool } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -304,6 +314,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderCloseStartInfo struct { + LogContext *context.Context ReaderConnectionID string CloseReason error } @@ -315,6 +326,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderInitStartInfo struct { + LogContext *context.Context PreInitReaderConnectionID string InitRequestInfo TopicReadStreamInitRequestInfo } @@ -333,13 +345,15 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUpdateTokenStartInfo struct { + LogContext *context.Context ReaderConnectionID string } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals OnReadUpdateTokenMiddleTokenReceivedInfo struct { - TokenLen int - Error error + LogContext *context.Context + TokenLen int + Error error } // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals @@ -349,7 +363,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderPopBatchTxStartInfo struct { - Context *context.Context + LogContext *context.Context ReaderID int64 TransactionSessionID string Tx txInfo @@ -365,7 +379,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderStreamPopBatchTxStartInfo struct { - Context *context.Context + LogContext *context.Context ReaderID int64 ReaderConnectionID string TransactionSessionID string @@ -379,7 +393,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderOnUpdateOffsetsInTransactionStartInfo struct { - Context *context.Context + LogContext *context.Context ReaderID int64 ReaderConnectionID string TransactionSessionID string @@ -393,7 +407,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderTransactionCompletedStartInfo struct { - Context *context.Context + LogContext *context.Context ReaderID int64 ReaderConnectionID string TransactionSessionID string @@ -406,7 +420,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicReaderTransactionRollbackStartInfo struct { - Context *context.Context + LogContext *context.Context ReaderID int64 ReaderConnectionID string TransactionSessionID string @@ -424,6 +438,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterReconnectStartInfo struct { + LogContext *context.Context WriterInstanceID string Topic string ProducerID string @@ -441,6 +456,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterInitStreamStartInfo struct { + LogContext *context.Context WriterInstanceID string Topic string ProducerID string @@ -454,6 +470,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterCloseStartInfo struct { + LogContext *context.Context WriterInstanceID string Reason error } @@ -465,6 +482,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterCompressMessagesStartInfo struct { + LogContext *context.Context WriterInstanceID string SessionID string Codec int32 @@ -480,6 +498,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterSendMessagesStartInfo struct { + LogContext *context.Context WriterInstanceID string SessionID string Codec int32 @@ -494,6 +513,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterResultMessagesInfo struct { + LogContext *context.Context WriterInstanceID string SessionID string PartitionID int64 @@ -516,7 +536,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterBeforeCommitTransactionStartInfo struct { - Ctx *context.Context + LogContext *context.Context KqpSessionID string TopicSessionID string TransactionID string @@ -530,6 +550,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterAfterFinishTransactionStartInfo struct { + LogContext *context.Context Error error SessionID string TransactionID string @@ -542,6 +563,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterSentGRPCMessageInfo struct { + LogContext *context.Context TopicStreamInternalID string SessionID string MessageNumber int @@ -551,6 +573,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicWriterReceiveGRPCMessageInfo struct { + LogContext *context.Context TopicStreamInternalID string SessionID string MessageNumber int @@ -560,6 +583,7 @@ type ( // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals TopicOnWriterReadUnknownGrpcMessageInfo struct { + LogContext *context.Context WriterInstanceID string SessionID string Error error diff --git a/trace/topic_gtrace.go b/trace/topic_gtrace.go index f9c98db5b..fc75522a6 100644 --- a/trace/topic_gtrace.go +++ b/trace/topic_gtrace.go @@ -1456,17 +1456,21 @@ func (t *Topic) onWriterReadUnknownGrpcMessage(t1 TopicOnWriterReadUnknownGrpcMe } fn(t1) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderStart(t *Topic, readerID int64, consumer string, e error) { +func TopicOnReaderStart(t *Topic, logContext *context.Context, readerID int64, consumer string, e error) { var p TopicReaderStartInfo + p.LogContext = logContext p.ReaderID = readerID p.Consumer = consumer p.Error = e t.onReaderStart(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReconnect(t *Topic, reason error) func(error) { +func TopicOnReaderReconnect(t *Topic, logContext *context.Context, reason error) func(error) { var p TopicReaderReconnectStartInfo + p.LogContext = logContext p.Reason = reason res := t.onReaderReconnect(p) return func(e error) { @@ -1475,13 +1479,16 @@ func TopicOnReaderReconnect(t *Topic, reason error) func(error) { res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReconnectRequest(t *Topic, reason error, wasSent bool) { +func TopicOnReaderReconnectRequest(t *Topic, logContext *context.Context, reason error, wasSent bool) { var p TopicReaderReconnectRequestInfo + p.LogContext = logContext p.Reason = reason p.WasSent = wasSent t.onReaderReconnectRequest(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string, partitionContext *context.Context, topic string, partitionID int64, partitionSessionID int64) func(readOffset *int64, commitOffset *int64, _ error) { var p TopicReaderPartitionReadStartResponseStartInfo @@ -1499,6 +1506,7 @@ func TopicOnReaderPartitionReadStartResponse(t *Topic, readerConnectionID string res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, partitionContext context.Context, topic string, partitionID int64, partitionSessionID int64, committedOffset int64, graceful bool) func(error) { var p TopicReaderPartitionReadStopResponseStartInfo @@ -1516,6 +1524,7 @@ func TopicOnReaderPartitionReadStopResponse(t *Topic, readerConnectionID string, res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string, partitionID int64, partitionSessionID int64, startOffset int64, endOffset int64) func(error) { var p TopicReaderCommitStartInfo @@ -1532,9 +1541,11 @@ func TopicOnReaderCommit(t *Topic, requestContext *context.Context, topic string res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { +func TopicOnReaderSendCommitMessage(t *Topic, logContext *context.Context, commitsInfo TopicReaderStreamSendCommitMessageStartMessageInfo) func(error) { var p TopicReaderSendCommitMessageStartInfo + p.LogContext = logContext p.CommitsInfo = commitsInfo res := t.onReaderSendCommitMessage(p) return func(e error) { @@ -1543,9 +1554,11 @@ func TopicOnReaderSendCommitMessage(t *Topic, commitsInfo TopicReaderStreamSendC res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { +func TopicOnReaderCommittedNotify(t *Topic, logContext *context.Context, readerConnectionID string, topic string, partitionID int64, partitionSessionID int64, committedOffset int64) { var p TopicReaderCommittedNotifyInfo + p.LogContext = logContext p.ReaderConnectionID = readerConnectionID p.Topic = topic p.PartitionID = partitionID @@ -1553,9 +1566,11 @@ func TopicOnReaderCommittedNotify(t *Topic, readerConnectionID string, topic str p.CommittedOffset = committedOffset t.onReaderCommittedNotify(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) func(closeError error) { +func TopicOnReaderClose(t *Topic, logContext *context.Context, readerConnectionID string, closeReason error) func(closeError error) { var p TopicReaderCloseStartInfo + p.LogContext = logContext p.ReaderConnectionID = readerConnectionID p.CloseReason = closeReason res := t.onReaderClose(p) @@ -1565,9 +1580,11 @@ func TopicOnReaderClose(t *Topic, readerConnectionID string, closeReason error) res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { +func TopicOnReaderInit(t *Topic, logContext *context.Context, preInitReaderConnectionID string, initRequestInfo TopicReadStreamInitRequestInfo) func(readerConnectionID string, _ error) { var p TopicReaderInitStartInfo + p.LogContext = logContext p.PreInitReaderConnectionID = preInitReaderConnectionID p.InitRequestInfo = initRequestInfo res := t.onReaderInit(p) @@ -1578,20 +1595,25 @@ func TopicOnReaderInit(t *Topic, preInitReaderConnectionID string, initRequestIn res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderError(t *Topic, readerConnectionID string, e error) { +func TopicOnReaderError(t *Topic, logContext *context.Context, readerConnectionID string, e error) { var p TopicReaderErrorInfo + p.LogContext = logContext p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderError(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen int, _ error) func(error) { +func TopicOnReaderUpdateToken(t *Topic, logContext *context.Context, readerConnectionID string) func(logContext *context.Context, tokenLen int, _ error) func(error) { var p OnReadUpdateTokenStartInfo + p.LogContext = logContext p.ReaderConnectionID = readerConnectionID res := t.onReaderUpdateToken(p) - return func(tokenLen int, e error) func(error) { + return func(logContext *context.Context, tokenLen int, e error) func(error) { var p OnReadUpdateTokenMiddleTokenReceivedInfo + p.LogContext = logContext p.TokenLen = tokenLen p.Error = e res := res(p) @@ -1602,10 +1624,11 @@ func TopicOnReaderUpdateToken(t *Topic, readerConnectionID string) func(tokenLen } } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderPopBatchTx(t *Topic, c *context.Context, readerID int64, transactionSessionID string, tx txInfo) func(startOffset int64, endOffset int64, messagesCount int, _ error) { +func TopicOnReaderPopBatchTx(t *Topic, logContext *context.Context, readerID int64, transactionSessionID string, tx txInfo) func(startOffset int64, endOffset int64, messagesCount int, _ error) { var p TopicReaderPopBatchTxStartInfo - p.Context = c + p.LogContext = logContext p.ReaderID = readerID p.TransactionSessionID = transactionSessionID p.Tx = tx @@ -1619,10 +1642,11 @@ func TopicOnReaderPopBatchTx(t *Topic, c *context.Context, readerID int64, trans res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderStreamPopBatchTx(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { +func TopicOnReaderStreamPopBatchTx(t *Topic, logContext *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { var p TopicReaderStreamPopBatchTxStartInfo - p.Context = c + p.LogContext = logContext p.ReaderID = readerID p.ReaderConnectionID = readerConnectionID p.TransactionSessionID = transactionSessionID @@ -1634,10 +1658,11 @@ func TopicOnReaderStreamPopBatchTx(t *Topic, c *context.Context, readerID int64, res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderUpdateOffsetsInTransaction(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { +func TopicOnReaderUpdateOffsetsInTransaction(t *Topic, logContext *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(error) { var p TopicReaderOnUpdateOffsetsInTransactionStartInfo - p.Context = c + p.LogContext = logContext p.ReaderID = readerID p.ReaderConnectionID = readerConnectionID p.TransactionSessionID = transactionSessionID @@ -1649,10 +1674,11 @@ func TopicOnReaderUpdateOffsetsInTransaction(t *Topic, c *context.Context, reade res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderTransactionCompleted(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo, transactionResult error) func() { +func TopicOnReaderTransactionCompleted(t *Topic, logContext *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo, transactionResult error) func() { var p TopicReaderTransactionCompletedStartInfo - p.Context = c + p.LogContext = logContext p.ReaderID = readerID p.ReaderConnectionID = readerConnectionID p.TransactionSessionID = transactionSessionID @@ -1664,10 +1690,11 @@ func TopicOnReaderTransactionCompleted(t *Topic, c *context.Context, readerID in res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderTransactionRollback(t *Topic, c *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(rollbackError error) { +func TopicOnReaderTransactionRollback(t *Topic, logContext *context.Context, readerID int64, readerConnectionID string, transactionSessionID string, tx txInfo) func(rollbackError error) { var p TopicReaderTransactionRollbackStartInfo - p.Context = c + p.LogContext = logContext p.ReaderID = readerID p.ReaderConnectionID = readerConnectionID p.TransactionSessionID = transactionSessionID @@ -1679,17 +1706,21 @@ func TopicOnReaderTransactionRollback(t *Topic, c *context.Context, readerID int res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderSentDataRequest(t *Topic, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { +func TopicOnReaderSentDataRequest(t *Topic, logContext *context.Context, readerConnectionID string, requestBytes int, localBufferSizeAfterSent int) { var p TopicReaderSentDataRequestInfo + p.LogContext = logContext p.ReaderConnectionID = readerConnectionID p.RequestBytes = requestBytes p.LocalBufferSizeAfterSent = localBufferSizeAfterSent t.onReaderSentDataRequest(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { +func TopicOnReaderReceiveDataResponse(t *Topic, logContext *context.Context, readerConnectionID string, localBufferSizeAfterReceive int, dataResponse TopicReaderDataResponseInfo) func(error) { var p TopicReaderReceiveDataResponseStartInfo + p.LogContext = logContext p.ReaderConnectionID = readerConnectionID p.LocalBufferSizeAfterReceive = localBufferSizeAfterReceive p.DataResponse = dataResponse @@ -1700,6 +1731,7 @@ func TopicOnReaderReceiveDataResponse(t *Topic, readerConnectionID string, local res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCount int, maxCount int, freeBufferCapacity int) func(messagesCount int, topic string, partitionID int64, partitionSessionID int64, offsetStart int64, offsetEnd int64, freeBufferCapacity int, _ error) { var p TopicReaderReadMessagesStartInfo @@ -1721,16 +1753,20 @@ func TopicOnReaderReadMessages(t *Topic, requestContext *context.Context, minCou res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnReaderUnknownGrpcMessage(t *Topic, readerConnectionID string, e error) { +func TopicOnReaderUnknownGrpcMessage(t *Topic, logContext *context.Context, readerConnectionID string, e error) { var p OnReadUnknownGrpcMessageInfo + p.LogContext = logContext p.ReaderConnectionID = readerConnectionID p.Error = e t.onReaderUnknownGrpcMessage(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { +func TopicOnWriterReconnect(t *Topic, logContext *context.Context, writerInstanceID string, topic string, producerID string, attempt int) func(connectionResult error) func(error) { var p TopicWriterReconnectStartInfo + p.LogContext = logContext p.WriterInstanceID = writerInstanceID p.Topic = topic p.ProducerID = producerID @@ -1747,9 +1783,11 @@ func TopicOnWriterReconnect(t *Topic, writerInstanceID string, topic string, pro } } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { +func TopicOnWriterInitStream(t *Topic, logContext *context.Context, writerInstanceID string, topic string, producerID string) func(sessionID string, _ error) { var p TopicWriterInitStreamStartInfo + p.LogContext = logContext p.WriterInstanceID = writerInstanceID p.Topic = topic p.ProducerID = producerID @@ -1761,9 +1799,11 @@ func TopicOnWriterInitStream(t *Topic, writerInstanceID string, topic string, pr res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(error) { +func TopicOnWriterClose(t *Topic, logContext *context.Context, writerInstanceID string, reason error) func(error) { var p TopicWriterCloseStartInfo + p.LogContext = logContext p.WriterInstanceID = writerInstanceID p.Reason = reason res := t.onWriterClose(p) @@ -1773,10 +1813,11 @@ func TopicOnWriterClose(t *Topic, writerInstanceID string, reason error) func(er res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { +func TopicOnWriterBeforeCommitTransaction(t *Topic, logContext *context.Context, kqpSessionID string, topicSessionID string, transactionID string) func(_ error, topicSessionID string) { var p TopicOnWriterBeforeCommitTransactionStartInfo - p.Ctx = ctx + p.LogContext = logContext p.KqpSessionID = kqpSessionID p.TopicSessionID = topicSessionID p.TransactionID = transactionID @@ -1788,9 +1829,11 @@ func TopicOnWriterBeforeCommitTransaction(t *Topic, ctx *context.Context, kqpSes res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, transactionID string) func(closeError error) { +func TopicOnWriterAfterFinishTransaction(t *Topic, logContext *context.Context, e error, sessionID string, transactionID string) func(closeError error) { var p TopicOnWriterAfterFinishTransactionStartInfo + p.LogContext = logContext p.Error = e p.SessionID = sessionID p.TransactionID = transactionID @@ -1801,9 +1844,11 @@ func TopicOnWriterAfterFinishTransaction(t *Topic, e error, sessionID string, tr res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { +func TopicOnWriterCompressMessages(t *Topic, logContext *context.Context, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int, reason TopicWriterCompressMessagesReason) func(error) { var p TopicWriterCompressMessagesStartInfo + p.LogContext = logContext p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Codec = codec @@ -1817,9 +1862,11 @@ func TopicOnWriterCompressMessages(t *Topic, writerInstanceID string, sessionID res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { +func TopicOnWriterSendMessages(t *Topic, logContext *context.Context, writerInstanceID string, sessionID string, codec int32, firstSeqNo int64, messagesCount int) func(error) { var p TopicWriterSendMessagesStartInfo + p.LogContext = logContext p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Codec = codec @@ -1832,18 +1879,22 @@ func TopicOnWriterSendMessages(t *Topic, writerInstanceID string, sessionID stri res(p) } } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReceiveResult(t *Topic, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { +func TopicOnWriterReceiveResult(t *Topic, logContext *context.Context, writerInstanceID string, sessionID string, partitionID int64, acks TopicWriterResultMessagesInfoAcks) { var p TopicWriterResultMessagesInfo + p.LogContext = logContext p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.PartitionID = partitionID p.Acks = acks t.onWriterReceiveResult(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { +func TopicOnWriterSentGRPCMessage(t *Topic, logContext *context.Context, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromClient, e error) { var p TopicWriterSentGRPCMessageInfo + p.LogContext = logContext p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID p.MessageNumber = messageNumber @@ -1851,9 +1902,11 @@ func TopicOnWriterSentGRPCMessage(t *Topic, topicStreamInternalID string, sessio p.Error = e t.onWriterSentGRPCMessage(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { +func TopicOnWriterReceiveGRPCMessage(t *Topic, logContext *context.Context, topicStreamInternalID string, sessionID string, messageNumber int, message *Ydb_Topic.StreamWriteMessage_FromServer, e error) { var p TopicWriterReceiveGRPCMessageInfo + p.LogContext = logContext p.TopicStreamInternalID = topicStreamInternalID p.SessionID = sessionID p.MessageNumber = messageNumber @@ -1861,9 +1914,11 @@ func TopicOnWriterReceiveGRPCMessage(t *Topic, topicStreamInternalID string, ses p.Error = e t.onWriterReceiveGRPCMessage(p) } + // Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals -func TopicOnWriterReadUnknownGrpcMessage(t *Topic, writerInstanceID string, sessionID string, e error) { +func TopicOnWriterReadUnknownGrpcMessage(t *Topic, logContext *context.Context, writerInstanceID string, sessionID string, e error) { var p TopicOnWriterReadUnknownGrpcMessageInfo + p.LogContext = logContext p.WriterInstanceID = writerInstanceID p.SessionID = sessionID p.Error = e