From b7d2de806cf22f9a70eb4ea242b2c386a497418a Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 4 Feb 2025 16:56:48 +0300 Subject: [PATCH 1/5] check context before call --- internal/query/session_core.go | 4 ++++ internal/table/session.go | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/internal/query/session_core.go b/internal/query/session_core.go index e0f1ca768..c388901f5 100644 --- a/internal/query/session_core.go +++ b/internal/query/session_core.go @@ -240,6 +240,10 @@ func (core *sessionCore) deleteSession(ctx context.Context) (finalErr error) { defer cancel() } + if err := ctx.Err(); err != nil { + return xerrors.WithStackTrace(err) + } + _, err := core.Client.DeleteSession(ctx, &Ydb_Query.DeleteSessionRequest{ SessionId: core.id, diff --git a/internal/table/session.go b/internal/table/session.go index 063ddf6ea..ef49db10a 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -357,6 +357,10 @@ func newTableSession( //nolint:funlen status: table.SessionReady, onClose: []func(s *Session) error{ func(s *Session) error { + if err := ctx.Err(); err != nil { + return xerrors.WithStackTrace(err) + } + _, err = s.client.DeleteSession(ctx, &Ydb_Table.DeleteSessionRequest{ SessionId: s.id, @@ -471,13 +475,13 @@ func (s *Session) ID() string { return s.id } -func (s *Session) Close(ctx context.Context) (err error) { +func (s *Session) Close(ctx context.Context) (finalErr error) { onDone := trace.TableOnSessionDelete(s.config.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/v3/internal/table.(*Session).Close"), s, ) defer func() { - onDone(err) + onDone(finalErr) s.SetStatus(table.SessionClosed) }() From 7c73ec58e03abc10e571d431963c061d26e5eb8c Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 4 Feb 2025 17:15:28 +0300 Subject: [PATCH 2/5] test of Close --- internal/xsql/connector.go | 2 +- internal/xsql/errors.go | 1 - .../basic_example_database_sql_test.go | 18 ++++++------------ 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/internal/xsql/connector.go b/internal/xsql/connector.go index 5936c321e..089edb6bd 100644 --- a/internal/xsql/connector.go +++ b/internal/xsql/connector.go @@ -200,7 +200,7 @@ func (c *Connector) Parent() ydbDriver { func (c *Connector) Close() error { select { case <-c.done: - return xerrors.WithStackTrace(errAlreadyClosed) + return nil default: close(c.done) diff --git a/internal/xsql/errors.go b/internal/xsql/errors.go index 719d62522..dc9f4a0dd 100644 --- a/internal/xsql/errors.go +++ b/internal/xsql/errors.go @@ -10,7 +10,6 @@ import ( var ( ErrUnsupported = driver.ErrSkip errDeprecated = driver.ErrSkip - errAlreadyClosed = errors.New("already closed") errWrongQueryProcessor = errors.New("wrong query processor") errNotReadyConn = xerrors.Retryable(errors.New("iface not ready"), xerrors.InvalidObject()) ) diff --git a/tests/integration/basic_example_database_sql_test.go b/tests/integration/basic_example_database_sql_test.go index 2a51e4dfe..858993adf 100644 --- a/tests/integration/basic_example_database_sql_test.go +++ b/tests/integration/basic_example_database_sql_test.go @@ -46,14 +46,12 @@ func TestBasicExampleDatabaseSql(t *testing.T) { db, err := sql.Open("ydb", os.Getenv("YDB_CONNECTION_STRING")) require.NoError(t, err) - err = db.PingContext(ctx) - require.NoError(t, err) + require.NoError(t, db.PingContext(ctx)) _, err = ydb.Unwrap(db) require.NoError(t, err) - err = db.Close() - require.NoError(t, err) + require.NoError(t, db.Close()) }) t.Run("sql.OpenDB", func(t *testing.T) { @@ -64,26 +62,22 @@ func TestBasicExampleDatabaseSql(t *testing.T) { require.NoError(t, err) defer func() { - // cleanup - _ = nativeDriver.Close(ctx) + require.NoError(t, nativeDriver.Close(ctx)) }() c, err := ydb.Connector(nativeDriver) require.NoError(t, err) defer func() { - // cleanup - _ = c.Close() + require.NoError(t, c.Close()) }() db := sql.OpenDB(c) defer func() { - // cleanup - _ = db.Close() + require.NoError(t, db.Close()) }() - err = db.PingContext(ctx) - require.NoError(t, err) + require.NoError(t, db.PingContext(ctx)) db.SetMaxOpenConns(50) db.SetMaxIdleConns(50) From caf6f23e8501c8b33f32ed6313b86a869fc1e5b8 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 4 Feb 2025 17:47:07 +0300 Subject: [PATCH 3/5] fixed context usage for closing of sessions --- internal/table/retry_test.go | 4 +- internal/table/session.go | 91 ++++++++++++++++++++---------------- 2 files changed, 52 insertions(+), 43 deletions(-) diff --git a/internal/table/retry_test.go b/internal/table/retry_test.go index 8d7ba91e1..681537518 100644 --- a/internal/table/retry_test.go +++ b/internal/table/retry_test.go @@ -88,11 +88,11 @@ func TestDoBadSession(t *testing.T) { p := pool.New[*Session, Session](ctx, pool.WithCreateItemFunc[*Session, Session](func(ctx context.Context) (*Session, error) { s := simpleSession(t) - s.onClose = append(s.onClose, func(s *Session) error { + s.closeOnce = func(_ context.Context) error { closed[s] = true return nil - }) + } return s, nil }), diff --git a/internal/table/session.go b/internal/table/session.go index ef49db10a..9a3407b2a 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -35,6 +35,7 @@ import ( "github.com/ydb-platform/ydb-go-sdk/v3/internal/value" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext" "github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors" + "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" @@ -248,7 +249,7 @@ var ( // Note that after session is no longer needed it should be destroyed by // Close() call. type Session struct { - onClose []func(s *Session) error + closeOnce func(ctx context.Context) error id string client Ydb_Table_V1.TableServiceClient status table.SessionStatus @@ -329,7 +330,7 @@ func newSession(ctx context.Context, cc grpc.ClientConnInterface, config *config return newTableSession(ctx, cc, config) } -func newTableSession( //nolint:funlen +func newTableSession( ctx context.Context, cc grpc.ClientConnInterface, config *config.Config, ) (*Session, error) { response, err := Ydb_Table_V1.NewTableServiceClient(cc).CreateSession(ctx, @@ -355,29 +356,6 @@ func newTableSession( //nolint:funlen id: result.GetSessionId(), config: config, status: table.SessionReady, - onClose: []func(s *Session) error{ - func(s *Session) error { - if err := ctx.Err(); err != nil { - return xerrors.WithStackTrace(err) - } - - _, err = s.client.DeleteSession(ctx, - &Ydb_Table.DeleteSessionRequest{ - SessionId: s.id, - OperationParams: operation.Params(ctx, - s.config.OperationTimeout(), - s.config.OperationCancelAfter(), - operation.ModeSync, - ), - }, - ) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - }, - }, } s.lastUsage.Store(time.Now().Unix()) @@ -391,6 +369,7 @@ func newTableSession( //nolint:funlen }, ), ) + s.closeOnce = xsync.OnceFunc(closeTableSession(s.client, s.config, s.id)) s.dataQuery = tableClientExecutor{ client: s.client, ignoreTruncated: s.config.IgnoreTruncated(), @@ -399,7 +378,37 @@ func newTableSession( //nolint:funlen return s, nil } -func newQuerySession( //nolint:funlen +func closeTableSession(c Ydb_Table_V1.TableServiceClient, cfg *config.Config, id string) func(context.Context) error { + return func(ctx context.Context) error { + if err := ctx.Err(); err != nil { + return xerrors.WithStackTrace(err) + } + + if t := cfg.DeleteTimeout(); t > 0 { + var cancel context.CancelFunc + ctx, cancel = xcontext.WithTimeout(ctx, t) + defer cancel() + } + + _, err := c.DeleteSession(ctx, + &Ydb_Table.DeleteSessionRequest{ + SessionId: id, + OperationParams: operation.Params(ctx, + cfg.OperationTimeout(), + cfg.OperationCancelAfter(), + operation.ModeSync, + ), + }, + ) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + } +} + +func newQuerySession( ctx context.Context, cc grpc.ClientConnInterface, config *config.Config, ) (*Session, error) { s := &Session{ @@ -442,16 +451,7 @@ func newQuerySession( //nolint:funlen }, ), ) - s.onClose = []func(s *Session) error{ - func(s *Session) error { - err := core.Close(ctx) - if err != nil { - return xerrors.WithStackTrace(err) - } - - return nil - }, - } + s.closeOnce = xsync.OnceFunc(closeQuerySession(core)) if config.ExecuteDataQueryOverQueryService() { s.dataQuery = queryClientExecutor{ core: core, @@ -467,6 +467,17 @@ func newQuerySession( //nolint:funlen return s, nil } +func closeQuerySession(core query.Core) func(context.Context) error { + return func(ctx context.Context) error { + err := core.Close(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } + + return nil + } +} + func (s *Session) ID() string { if s == nil { return "" @@ -485,11 +496,9 @@ func (s *Session) Close(ctx context.Context) (finalErr error) { s.SetStatus(table.SessionClosed) }() - for _, onClose := range s.onClose { - err := onClose(s) - if err != nil { - return xerrors.WithStackTrace(err) - } + err := s.closeOnce(ctx) + if err != nil { + return xerrors.WithStackTrace(err) } return nil From b1ca5be8669f8b84351b02bb8ae633bfce73980f Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 4 Feb 2025 17:49:48 +0300 Subject: [PATCH 4/5] CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d57b0fa6..31408b949 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,4 @@ +* Fixed bug with wrong context on session closing * Fixed goroutine leak on closing `database/sql` driver * "No endpoints" is retriable error now From 54669b414e381131931a66c54b50d45ccff60df1 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Tue, 4 Feb 2025 18:03:48 +0300 Subject: [PATCH 5/5] fixed test --- internal/table/session.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/table/session.go b/internal/table/session.go index 9a3407b2a..f60ee4f51 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -496,9 +496,11 @@ func (s *Session) Close(ctx context.Context) (finalErr error) { s.SetStatus(table.SessionClosed) }() - err := s.closeOnce(ctx) - if err != nil { - return xerrors.WithStackTrace(err) + if s.closeOnce != nil { + err := s.closeOnce(ctx) + if err != nil { + return xerrors.WithStackTrace(err) + } } return nil