Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add WithReaderLogContext, WithWriterLogContext options to Topic API #1642

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/grpcwrapper/rawtopic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (c *Client) StreamRead(ctxStreamLifeTime context.Context) (rawtopicreader.S
func (c *Client) StreamWrite(
ctxStreamLifeTime context.Context,
tracer *trace.Topic,
logContext *context.Context,
) (*rawtopicwriter.StreamWriter, error) {
protoResp, err := c.service.StreamWrite(ctxStreamLifeTime)
if err != nil {
Expand All @@ -118,6 +119,7 @@ func (c *Client) StreamWrite(
Stream: protoResp,
Tracer: tracer,
InternalStreamID: uuid.New().String(),
LogContext: logContext,
}, nil
}

Expand Down
14 changes: 12 additions & 2 deletions internal/grpcwrapper/rawtopic/rawtopicwriter/streamwriter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rawtopicwriter

import (
"context"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -36,6 +37,7 @@ type StreamWriter struct {
readMessagesCount int
writtenMessagesCount int
sessionID string
LogContext *context.Context
}

//nolint:funlen
Expand All @@ -52,7 +54,7 @@ func (w *StreamWriter) Recv() (ServerMessage, error) {
defer func() {
// defer needs for set good session id on first init response before trace the message
trace.TopicOnWriterReceiveGRPCMessage(
w.Tracer, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
w.Tracer, w.LogContext, w.InternalStreamID, w.sessionID, w.readMessagesCount, grpcMsg, sendErr,
)
}()
if sendErr != nil {
Expand Down Expand Up @@ -141,7 +143,15 @@ func (w *StreamWriter) Send(rawMsg ClientMessage) (err error) {

err = w.Stream.Send(&protoMsg)
w.writtenMessagesCount++
trace.TopicOnWriterSentGRPCMessage(w.Tracer, w.InternalStreamID, w.sessionID, w.writtenMessagesCount, &protoMsg, err)
trace.TopicOnWriterSentGRPCMessage(
w.Tracer,
w.LogContext,
w.InternalStreamID,
w.sessionID,
w.writtenMessagesCount,
&protoMsg,
err,
)
if err != nil {
return xerrors.WithStackTrace(xerrors.Wrap(fmt.Errorf("ydb: failed to send grpc message to writer stream: %w", err)))
}
Expand Down
20 changes: 12 additions & 8 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ func (c *Client) StartReader(
if err != nil {
return nil, err
}
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)

logCtx := internalReader.GetLogContext()
trace.TopicOnReaderStart(internalReader.Tracer(), &logCtx, internalReader.ID(), consumer, err)
internalReader.SetLogContext(logCtx)

return topicreader.NewReader(internalReader), nil
}
Expand Down Expand Up @@ -356,15 +359,16 @@ func (c *Client) createWriterConfig(
topicPath string,
opts []topicoptions.WriterOption,
) topicwriterinternal.WriterReconnectorConfig {
var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
topicwriterinternal.RawTopicWriterStream,
error,
) {
return c.rawClient.StreamWrite(ctx, tracer)
}
//var connector topicwriterinternal.ConnectFunc = func(ctx context.Context, tracer *trace.Topic) (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the code instead of comment please

// topicwriterinternal.RawTopicWriterStream,
// error,
//) {
// return c.rawClient.StreamWrite(ctx, tracer)
//}

options := []topicoptions.WriterOption{
topicwriterinternal.WithConnectFunc(connector),
topicwriterinternal.WithRawClient(&c.rawClient),
// topicwriterinternal.WithConnectFunc(connector),
topicwriterinternal.WithTopic(topicPath),
topicwriterinternal.WithCommonConfig(c.cfg.Common),
topicwriterinternal.WithTrace(c.cfg.Trace),
Expand Down
1 change: 0 additions & 1 deletion internal/topic/topiclistenerinternal/stream_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,7 @@
}
}

//nolint:funlen
func (l *streamListener) initStream(ctx context.Context, client TopicClient) error {

Check failure on line 144 in internal/topic/topiclistenerinternal/stream_listener.go

View workflow job for this annotation

GitHub Actions / golangci-lint

Function 'initStream' is too long (63 > 60) (funlen)
streamCtx, streamClose := context.WithCancelCause(xcontext.ValueOnly(ctx))
l.streamClose = streamClose
initDone := make(empty.Chan)
Expand Down
1 change: 1 addition & 0 deletions internal/topic/topicreadercommon/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func (c *Committer) pushCommitsLoop(ctx context.Context) {

onDone := trace.TopicOnReaderSendCommitMessage(
c.tracer,
&ctx,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and in other place.
Don't transfer pointer to owned context directly - you shold copy local context, pass pointer to copy, them copy result to local context.

Pointer to context may be stored and then context can be changed by other code and data race may be happen.

example

&commits,
)
err := c.send(commits.ToRawMessage())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,6 @@ type batchedStreamReader interface {
Commit(ctx context.Context, commitRange topicreadercommon.CommitRange) error
CloseWithError(ctx context.Context, err error) error
PopMessagesBatchTx(ctx context.Context, tx tx.Transaction, opts ReadMessageBatchOptions) (*topicreadercommon.PublicBatch, error) //nolint:lll
GetLogContext() context.Context
SetLogContext(ctx context.Context)
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 17 additions & 7 deletions internal/topic/topicreaderinternal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@
readerID int64
}

func (r *Reader) GetLogContext() context.Context {
return r.reader.GetLogContext()
}

Check failure on line 40 in internal/topic/topicreaderinternal/reader.go

View workflow job for this annotation

GitHub Actions / golangci-lint

File is not `gofumpt`-ed (gofumpt)
func (r *Reader) SetLogContext(ctx context.Context) {
r.reader.SetLogContext(ctx)
}

type ReadMessageBatchOptions struct {
batcherGetOptions
}
Expand Down Expand Up @@ -89,14 +96,17 @@
return newTopicStreamReader(client, readerID, stream, cfg.topicStreamReaderConfig)
}

reader := newReaderReconnector(
readerID,
readerConnector,
cfg.OperationTimeout(),
cfg.BaseContext,
cfg.RetrySettings,
cfg.Trace,
)

res := Reader{
reader: newReaderReconnector(
readerID,
readerConnector,
cfg.OperationTimeout(),
cfg.RetrySettings,
cfg.Trace,
),
reader: reader,
defaultBatchConfig: cfg.DefaultBatchConfig,
tracer: cfg.Trace,
readerID: readerID,
Expand Down
Loading
Loading