Skip to content

Commit

Permalink
Merge pull request #1629 from ydb-platform/fix-close
Browse files Browse the repository at this point in the history
* Fixed goroutine leak on closing `database/sql` driver
  • Loading branch information
asmyasnikov authored Feb 4, 2025
2 parents e7acaa3 + 4bf75c2 commit dd9f27b
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed goroutine leak on closing `database/sql` driver
* "No endpoints" is retriable error now

## v3.99.3
Expand Down
4 changes: 2 additions & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ func (d *Driver) Close(ctx context.Context) (finalErr error) {
d.ctxCancel()

defer func() {
for _, f := range d.onClose {
f(d)
for _, onClose := range d.onClose {
onClose(d)
}
}()

Expand Down
6 changes: 4 additions & 2 deletions internal/repeater/repeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ func (r *repeater) wakeUp(e Event) (err error) {
}

func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {
defer close(r.stopped)
defer tick.Stop()
defer func() {
close(r.stopped)
tick.Stop()
}()

// force returns backoff with delays [500ms...32s]
force := backoff.New(
Expand Down
4 changes: 2 additions & 2 deletions internal/xsql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type (
LegacyOpts []legacy.Option
Options []propose.Option
disableServerBalancer bool
onCLose []func(*Connector)
onClose []func(*Connector)

clock clockwork.Clock
idleThreshold time.Duration
Expand Down Expand Up @@ -204,7 +204,7 @@ func (c *Connector) Close() error {
default:
close(c.done)

for _, onClose := range c.onCLose {
for _, onClose := range c.onClose {
onClose(c)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/xsql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (opt traceRetryOption) Apply(c *Connector) error {
}

func (onClose onCloseOption) Apply(c *Connector) error {
c.onCLose = append(c.onCLose, onClose)
c.onClose = append(c.onClose, onClose)

return nil
}
Expand Down
40 changes: 10 additions & 30 deletions sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/legacy"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql/propose"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
Expand All @@ -32,31 +31,13 @@ func withConnectorOptions(opts ...ConnectorOption) Option {
}
}

type sqlDriver struct {
connectors xsync.Map[*xsql.Connector, *Driver]
}
type sqlDriver struct{}

var (
_ driver.Driver = &sqlDriver{}
_ driver.DriverContext = &sqlDriver{}
)

func (d *sqlDriver) Close() error {
var errs []error
d.connectors.Range(func(c *xsql.Connector, _ *Driver) bool {
if err := c.Close(); err != nil {
errs = append(errs, err)
}

return true
})
if len(errs) > 0 {
return xerrors.NewWithIssues("ydb legacy driver close failed", errs...)
}

return nil
}

// Open returns a new Driver to the ydb.
func (d *sqlDriver) Open(string) (driver.Conn, error) {
return nil, xsql.ErrUnsupported
Expand All @@ -68,15 +49,16 @@ func (d *sqlDriver) OpenConnector(dataSourceName string) (driver.Connector, erro
return nil, xerrors.WithStackTrace(fmt.Errorf("failed to connect by data source name '%s': %w", dataSourceName, err))
}

return Connector(db, db.databaseSQLOptions...)
}

func (d *sqlDriver) attach(c *xsql.Connector, parent *Driver) {
d.connectors.Set(c, parent)
}
c, err := Connector(db, append(db.databaseSQLOptions,
xsql.WithOnClose(func(connector *xsql.Connector) {
_ = db.Close(context.Background())
}),
)...)
if err != nil {
return nil, xerrors.WithStackTrace(fmt.Errorf("failed to create connector: %w", err))
}

func (d *sqlDriver) detach(c *xsql.Connector) {
d.connectors.Delete(c)
return c, nil
}

type QueryMode int
Expand Down Expand Up @@ -242,15 +224,13 @@ func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) {
parent.databaseSQLOptions,
opts...,
),
xsql.WithOnClose(d.detach),
xsql.WithTraceRetry(parent.config.TraceRetry()),
xsql.WithRetryBudget(parent.config.RetryBudget()),
)...,
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.attach(c, parent)

return c, nil
}
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/basic_example_database_sql_bindings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
)

func TestBasicExampleDatabaseSqlBindings(t *testing.T) {
defer simpleDetectGoroutineLeak(t)

folder := t.Name()

ctx, cancel := context.WithTimeout(xtest.Context(t), 42*time.Second)
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/basic_example_database_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
)

func TestBasicExampleDatabaseSql(t *testing.T) {
defer simpleDetectGoroutineLeak(t)

folder := t.Name()

ctx, cancel := context.WithTimeout(xtest.Context(t), 42*time.Second)
Expand Down
14 changes: 14 additions & 0 deletions tests/integration/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path"
"runtime"
"strings"
"testing"
"text/template"
Expand Down Expand Up @@ -468,3 +469,16 @@ func driverEngine(db *sql.DB) (engine xsql.Engine) {

return engine
}

func simpleDetectGoroutineLeak(t *testing.T) {
// 1) testing.go => main.main()
// 2) current test
const expectedGoroutinesCount = 2
if num := runtime.NumGoroutine(); num > expectedGoroutinesCount {
bb := make([]byte, 2<<32)
if n := runtime.Stack(bb, true); n < len(bb) {
bb = bb[:n]
}
t.Error(fmt.Sprintf("unexpected goroutines:\n%s\n", string(bb[runtime.Stack(bb, false)+1:])))
}
}

0 comments on commit dd9f27b

Please sign in to comment.