Skip to content

Commit

Permalink
Merge pull request #1598 from ydb-platform/fix-goroutine-leak
Browse files Browse the repository at this point in the history
* Fixed goroutine leak on failed execute call in query client
  • Loading branch information
asmyasnikov authored Dec 23, 2024
2 parents 2855451 + 45f8ca2 commit 2e23e14
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 11 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Fixed goroutine leak on failed execute call in query client

## v3.95.4
* Fixed connections pool leak on closing sessions
* Fixed an error in logging session deletion events
Expand Down
12 changes: 10 additions & 2 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,22 @@ func execute(
return nil, xerrors.WithStackTrace(err)
}

executeCtx := xcontext.ValueOnly(ctx)
executeCtx, executeCancel := xcontext.WithCancel(xcontext.ValueOnly(ctx))
defer func() {
if finalErr != nil {
executeCancel()
}
}()

stream, err := c.ExecuteQuery(executeCtx, request, callOptions...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

r, err := newResult(ctx, stream, append(opts, withStatsCallback(settings.StatsCallback()))...)
r, err := newResult(ctx, stream, append(opts,
withStatsCallback(settings.StatsCallback()),
withOnClose(executeCancel),
)...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down
36 changes: 27 additions & 9 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type (
closed chan struct{}
trace *trace.Query
statsCallback func(queryStats stats.QueryStats)
onClose []func()
onNextPartErr []func(err error)
onTxMeta []func(txMeta *Ydb_Query.TransactionMeta)
}
Expand Down Expand Up @@ -98,6 +99,12 @@ func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption
}
}

func withOnClose(onClose func()) resultOption {
return func(s *streamResult) {
s.onClose = append(s.onClose, onClose)
}
}

func onNextPartErr(callback func(err error)) resultOption {
return func(s *streamResult) {
s.onNextPartErr = append(s.onNextPartErr, callback)
Expand All @@ -115,22 +122,33 @@ func newResult(
stream Ydb_Query_V1.QueryService_ExecuteQueryClient,
opts ...resultOption,
) (_ *streamResult, finalErr error) {
r := streamResult{
stream: stream,
closed: make(chan struct{}),
resultSetIndex: -1,
}
r.closeOnce = sync.OnceFunc(func() {
close(r.closed)
r.stream = nil
})
var (
closed = make(chan struct{})
r = streamResult{
stream: stream,
onClose: []func(){
func() {
close(closed)
},
},
closed: closed,
resultSetIndex: -1,
}
)

for _, opt := range opts {
if opt != nil {
opt(&r)
}
}

r.closeOnce = sync.OnceFunc(func() {
for _, onClose := range r.onClose {
onClose()
}
r.stream = nil
})

if r.trace != nil {
onDone := trace.QueryOnResultNew(r.trace, &ctx,
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/query.newResult"),
Expand Down

0 comments on commit 2e23e14

Please sign in to comment.