Skip to content

Commit

Permalink
improve topic writer logs
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Feb 3, 2025
1 parent 94a33dc commit a6d5183
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added reason of internal topic writer reconnect to logs

## v3.99.3
* Fixed potential infinity loop for local dc detection (CWE-835)
* Fixed nil pointer dereferenced in a topic listener (CWE-476)
Expand Down
25 changes: 12 additions & 13 deletions internal/topic/topicwriterinternal/writer_reconnector.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,24 @@ func (w *WriterReconnector) connectionLoop(ctx context.Context) {
}
}

writer, err := w.startWriteStream(ctx, streamCtx, attempt)
onWriterStarted := trace.TopicOnWriterReconnect(
w.cfg.Tracer,
w.writerInstanceID,
w.cfg.topic,
w.cfg.producerID,
attempt,
)

writer, err := w.startWriteStream(ctx, streamCtx)
w.onWriterChange(writer)
onStreamError := onWriterStarted(err)
if err == nil {
reconnectReason = writer.WaitClose(ctx)
startOfRetries = time.Now()
} else {
reconnectReason = err
}
onStreamError(reconnectReason)
}
}

Expand Down Expand Up @@ -467,21 +477,10 @@ func (w *WriterReconnector) handleReconnectRetry(
return false
}

func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context, attempt int) (
func (w *WriterReconnector) startWriteStream(ctx, streamCtx context.Context) (
writer *SingleStreamWriter,
err error,
) {
traceOnDone := trace.TopicOnWriterReconnect(
w.cfg.Tracer,
w.writerInstanceID,
w.cfg.topic,
w.cfg.producerID,
attempt,
)
defer func() {
traceOnDone(err)
}()

stream, err := w.connectWithTimeout(streamCtx)
if err != nil {
return nil, err
Expand Down
18 changes: 14 additions & 4 deletions log/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
///
t.OnWriterReconnect = func(
info trace.TopicWriterReconnectStartInfo,
) func(doneInfo trace.TopicWriterReconnectDoneInfo) {
) func(doneInfo trace.TopicWriterReconnectConnectedInfo) func(reconnectDoneInfo trace.TopicWriterReconnectDoneInfo) {
if d.Details()&trace.TopicWriterStreamLifeCycleEvents == 0 {
return nil
}
Expand All @@ -653,8 +653,9 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
kv.Int("attempt", info.Attempt),
)

return func(doneInfo trace.TopicWriterReconnectDoneInfo) {
if doneInfo.Error == nil {
return func(doneInfo trace.TopicWriterReconnectConnectedInfo) func(reconnectDoneInfo trace.TopicWriterReconnectDoneInfo) { //nolint:lll
connectedTime := time.Now()
if doneInfo.ConnectionResult == nil {
l.Log(WithLevel(ctx, DEBUG), "connect to topic writer stream completed",
kv.String("topic", info.Topic),
kv.String("producer_id", info.ProducerID),
Expand All @@ -664,14 +665,23 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
)
} else {
l.Log(WithLevel(ctx, WARN), "connect to topic writer stream completed",
kv.Error(doneInfo.Error),
kv.Error(doneInfo.ConnectionResult),
kv.String("topic", info.Topic),
kv.String("producer_id", info.ProducerID),
kv.String("writer_instance_id", info.WriterInstanceID),
kv.Int("attempt", info.Attempt),
kv.Latency(start),
)
}

return func(reconnectDoneInfo trace.TopicWriterReconnectDoneInfo) {
l.Log(WithLevel(ctx, INFO), "stop topic writer stream reason",
kv.String("topic", info.Topic),
kv.String("producer_id", info.ProducerID),
kv.String("writer_instance_id", info.WriterInstanceID),
kv.Duration("write with topic writer stream duration", time.Since(connectedTime)),
kv.NamedError("reason", reconnectDoneInfo.Error))
}
}
}
t.OnWriterInitStream = func(
Expand Down
6 changes: 5 additions & 1 deletion trace/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ type (
// TopicWriterStreamLifeCycleEvents

// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
OnWriterReconnect func(TopicWriterReconnectStartInfo) func(TopicWriterReconnectDoneInfo)
OnWriterReconnect func(TopicWriterReconnectStartInfo) func(TopicWriterReconnectConnectedInfo) func(TopicWriterReconnectDoneInfo) //nolint:lll
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
OnWriterInitStream func(TopicWriterInitStreamStartInfo) func(TopicWriterInitStreamDoneInfo)
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
Expand Down Expand Up @@ -424,6 +424,10 @@ type (
Attempt int
}

TopicWriterReconnectConnectedInfo struct {
ConnectionResult error
}

// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
TopicWriterReconnectDoneInfo struct {
Error error
Expand Down
68 changes: 49 additions & 19 deletions trace/topic_gtrace.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit a6d5183

Please sign in to comment.