Skip to content

Commit

Permalink
* Fixed goroutine leak on closing database/sql driver
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Feb 4, 2025
1 parent 55762c7 commit 23190ab
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 25 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Fixed goroutine leak on closing `database/sql` driver
* "No endpoints" is retriable error now

## v3.99.3
Expand Down
6 changes: 4 additions & 2 deletions internal/repeater/repeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,10 @@ func (r *repeater) wakeUp(e Event) (err error) {
}

func (r *repeater) worker(ctx context.Context, tick clockwork.Ticker) {
defer close(r.stopped)
defer tick.Stop()
defer func() {
close(r.stopped)
tick.Stop()
}()

// force returns backoff with delays [500ms...32s]
force := backoff.New(
Expand Down
4 changes: 2 additions & 2 deletions internal/xsql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type (
LegacyOpts []legacy.Option
Options []propose.Option
disableServerBalancer bool
onCLose []func(*Connector)
onClose []func(*Connector)

clock clockwork.Clock
idleThreshold time.Duration
Expand Down Expand Up @@ -204,7 +204,7 @@ func (c *Connector) Close() error {
default:
close(c.done)

for _, onClose := range c.onCLose {
for _, onClose := range c.onClose {
onClose(c)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/xsql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (opt traceRetryOption) Apply(c *Connector) error {
}

func (onClose onCloseOption) Apply(c *Connector) error {
c.onCLose = append(c.onCLose, onClose)
c.onClose = append(c.onClose, onClose)

return nil
}
Expand Down
44 changes: 24 additions & 20 deletions sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,6 @@ var (
_ driver.DriverContext = &sqlDriver{}
)

func (d *sqlDriver) Close() error {
var errs []error
d.connectors.Range(func(c *xsql.Connector, _ *Driver) bool {
if err := c.Close(); err != nil {
errs = append(errs, err)
}

return true
})
if len(errs) > 0 {
return xerrors.NewWithIssues("ydb legacy driver close failed", errs...)
}

return nil
}

// Open returns a new Driver to the ydb.
func (d *sqlDriver) Open(string) (driver.Conn, error) {
return nil, xsql.ErrUnsupported
Expand All @@ -68,15 +52,25 @@ func (d *sqlDriver) OpenConnector(dataSourceName string) (driver.Connector, erro
return nil, xerrors.WithStackTrace(fmt.Errorf("failed to connect by data source name '%s': %w", dataSourceName, err))
}

return Connector(db, db.databaseSQLOptions...)
c, err := connector(db, db.databaseSQLOptions...)
if err != nil {
return nil, xerrors.WithStackTrace(fmt.Errorf("failed to create connector: %w", err))
}

d.attach(c, db)

return c, nil
}

func (d *sqlDriver) attach(c *xsql.Connector, parent *Driver) {
d.connectors.Set(c, parent)
}

func (d *sqlDriver) detach(c *xsql.Connector) {
d.connectors.Delete(c)
parent, _ := d.connectors.Extract(c)
if d.connectors.Len() == 0 && parent != nil {
_ = parent.Close(context.Background())
}
}

type QueryMode int
Expand Down Expand Up @@ -235,7 +229,7 @@ type SQLConnector interface {
Close() error
}

func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) {
func connector(parent *Driver, opts ...ConnectorOption) (*xsql.Connector, error) {
c, err := xsql.Open(parent, parent.metaBalancer,
append(
append(
Expand All @@ -250,7 +244,17 @@ func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) {
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
d.attach(c, parent)

return c, nil
}

func Connector(parent *Driver, opts ...ConnectorOption) (SQLConnector, error) {
c, err := connector(parent, opts...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
}

d.attach(c, nil)

return c, nil
}
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/basic_example_database_sql_bindings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
)

func TestBasicExampleDatabaseSqlBindings(t *testing.T) {
defer simpleDetectGoroutineLeak(t)

folder := t.Name()

ctx, cancel := context.WithTimeout(xtest.Context(t), 42*time.Second)
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/basic_example_database_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
)

func TestBasicExampleDatabaseSql(t *testing.T) {
defer simpleDetectGoroutineLeak(t)

folder := t.Name()

ctx, cancel := context.WithTimeout(xtest.Context(t), 42*time.Second)
Expand Down
13 changes: 13 additions & 0 deletions tests/integration/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path"
"runtime"
"strings"
"testing"
"text/template"
Expand Down Expand Up @@ -468,3 +469,15 @@ func driverEngine(db *sql.DB) (engine xsql.Engine) {

return engine
}

func simpleDetectGoroutineLeak(t *testing.T) {
// 1) testing.go => main.main()
// 2) current test
const expectedGoroutinesCount = 2
if num := runtime.NumGoroutine(); num > expectedGoroutinesCount {
bb := make([]byte, 2<<32)
if n := runtime.Stack(bb, true); n < len(bb) {
t.Error(fmt.Sprintf("unexpected goroutines:\n%s\n", string(bb[:n])))
}
}
}

0 comments on commit 23190ab

Please sign in to comment.