Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Jan 27, 2025
1 parent 86394b3 commit 2fd3c4e
Show file tree
Hide file tree
Showing 14 changed files with 361 additions and 386 deletions.
24 changes: 12 additions & 12 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
Expand All @@ -35,6 +34,7 @@ var (
)

type (
Status uint32
sessionPool interface {
closer.Closer

Expand Down Expand Up @@ -208,14 +208,14 @@ func do(
opts ...retry.Option,
) (finalErr error) {
err := pool.With(ctx, func(ctx context.Context, s *Session) error {
s.SetStatus(session.StatusInUse)
s.SetStatus(StatusInUse)

err := op(ctx, s)
if err != nil {
return xerrors.WithStackTrace(err)
}

s.SetStatus(session.StatusIdle)
s.SetStatus(StatusIdle)

return nil
}, opts...)
Expand Down Expand Up @@ -326,7 +326,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
onDone(finalErr)
}()

row, err := clientQueryRow(ctx, c.pool, q, options.ExecuteSettings(opts...), WithTrace(c.config.Trace()))
row, err := clientQueryRow(ctx, c.pool, q, options.ExecuteSettings(opts...), withTrace(c.config.Trace()))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand All @@ -337,7 +337,7 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, WithTrace(s.trace))
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -382,7 +382,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := execute(ctx, s.ID(), s.client, q,
options.ExecuteSettings(opts...), WithTrace(s.trace),
options.ExecuteSettings(opts...), withTrace(s.trace),
)
if err != nil {
return xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -467,7 +467,7 @@ func (c *Client) QueryResultSet(
onDone(finalErr)
}()

rs, err := clientQueryResultSet(ctx, c.pool, q, options.ExecuteSettings(opts...), WithTrace(c.config.Trace()))
rs, err := clientQueryResultSet(ctx, c.pool, q, options.ExecuteSettings(opts...), withTrace(c.config.Trace()))
if err != nil {
return nil, xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -526,8 +526,8 @@ func CreateSession(ctx context.Context, c *Client) (*Session, error) {
defer cancelCreate()

s, err := createSession(createCtx, c.client,
session.WithDeleteTimeout(c.config.SessionDeleteTimeout()),
session.WithTrace(c.config.Trace()),
WithDeleteTimeout(c.config.SessionDeleteTimeout()),
WithTrace(c.config.Trace()),
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand Down Expand Up @@ -576,9 +576,9 @@ func New(ctx context.Context, cc grpc.ClientConnInterface, cfg *config.Config) *
defer cancelCreate()

s, err := createSession(createCtx, client,
session.WithConn(cc),
session.WithDeleteTimeout(cfg.SessionDeleteTimeout()),
session.WithTrace(cfg.Trace()),
WithConn(cc),
WithDeleteTimeout(cfg.SessionDeleteTimeout()),
WithTrace(cfg.Trace()),
)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand Down
9 changes: 4 additions & 5 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3/internal/pool"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/session"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/tx"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
Expand All @@ -48,7 +47,7 @@ func TestClient(t *testing.T) {
Status: Ydb.StatusIds_SUCCESS,
}, nil)
attached := 0
s, err := createSession(ctx, client, session.WithTrace(
s, err := createSession(ctx, client, WithTrace(
&trace.Query{
OnSessionAttach: func(info trace.QuerySessionAttachStartInfo) func(info trace.QuerySessionAttachDoneInfo) {
return func(info trace.QuerySessionAttachDoneInfo) {
Expand Down Expand Up @@ -1541,18 +1540,18 @@ func TestClient(t *testing.T) {

type sessionControllerMock struct {
id string
status session.Status
status Status
}

func (s *sessionControllerMock) IsAlive() bool {
return session.IsAlive(s.status)
return IsAlive(s.status)
}

func (s *sessionControllerMock) Close(ctx context.Context) error {
return nil
}

func (s *sessionControllerMock) SetStatus(status session.Status) {
func (s *sessionControllerMock) SetStatus(status Status) {
s.status = status
}

Expand Down
4 changes: 2 additions & 2 deletions internal/query/execute_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func execute(
}

r, err := newResult(ctx, stream, append(opts,
WithStatsCallback(settings.StatsCallback()),
WithOnClose(executeCancel),
withStatsCallback(settings.StatsCallback()),
withOnClose(executeCancel),
)...)
if err != nil {
return nil, xerrors.WithStackTrace(err)
Expand Down
6 changes: 3 additions & 3 deletions internal/query/execute_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func TestExecute(t *testing.T) {
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
var txID string
r, err := execute(ctx, "123", client, "", options.ExecuteSettings(),
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
txID = txMeta.GetId()
}),
)
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestExecute(t *testing.T) {
t.Log("execute")
var txID string
r, err := execute(ctx, "123", client, "", options.ExecuteSettings(),
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
txID = txMeta.GetId()
}),
)
Expand Down Expand Up @@ -718,7 +718,7 @@ func TestExecute(t *testing.T) {
t.Log("execute")
var txID string
r, err := execute(ctx, "123", client, "", options.ExecuteSettings(),
OnTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
onTxMeta(func(txMeta *Ydb_Query.TransactionMeta) {
txID = txMeta.GetId()
}),
)
Expand Down
10 changes: 5 additions & 5 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,31 +87,31 @@ func (r *materializedResult) NextResultSet(ctx context.Context) (result.Set, err
return r.resultSets[r.idx], nil
}

func WithTrace(t *trace.Query) resultOption {
func withTrace(t *trace.Query) resultOption {
return func(s *streamResult) {
s.trace = t
}
}

func WithStatsCallback(callback func(queryStats stats.QueryStats)) resultOption {
func withStatsCallback(callback func(queryStats stats.QueryStats)) resultOption {
return func(s *streamResult) {
s.statsCallback = callback
}
}

func WithOnClose(onClose func()) resultOption {
func withOnClose(onClose func()) resultOption {
return func(s *streamResult) {
s.onClose = append(s.onClose, onClose)
}
}

func OnNextPartErr(callback func(err error)) resultOption {
func onNextPartErr(callback func(err error)) resultOption {
return func(s *streamResult) {
s.onNextPartErr = append(s.onNextPartErr, callback)
}
}

func OnTxMeta(callback func(txMeta *Ydb_Query.TransactionMeta)) resultOption {
func onTxMeta(callback func(txMeta *Ydb_Query.TransactionMeta)) resultOption {
return func(s *streamResult) {
s.onTxMeta = append(s.onTxMeta, callback)
}
Expand Down
18 changes: 9 additions & 9 deletions internal/query/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,7 +1544,7 @@ func TestCloseResultOnCloseClosableResultSet(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var closed bool
r, err := newResult(ctx, stream, WithTrace(&trace.Query{
r, err := newResult(ctx, stream, withTrace(&trace.Query{
OnResultClose: func(info trace.QueryResultCloseStartInfo) func(info trace.QueryResultCloseDoneInfo) {
require.False(t, closed)
closed = true
Expand Down Expand Up @@ -1927,7 +1927,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2288,7 +2288,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2650,7 +2650,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -2987,7 +2987,7 @@ func TestResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -3359,7 +3359,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -3720,7 +3720,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -4082,7 +4082,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down Expand Up @@ -4419,7 +4419,7 @@ func TestMaterializedResultStats(t *testing.T) {
}, nil)
stream.EXPECT().Recv().Return(nil, io.EOF)
var s stats.QueryStats
result, err := newResult(ctx, stream, WithStatsCallback(func(queryStats stats.QueryStats) {
result, err := newResult(ctx, stream, withStatsCallback(func(queryStats stats.QueryStats) {
s = queryStats
}))
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 2fd3c4e

Please sign in to comment.