Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Jan 28, 2025
1 parent d63e923 commit 45d5c94
Show file tree
Hide file tree
Showing 10 changed files with 475 additions and 352 deletions.
4 changes: 3 additions & 1 deletion internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/jonboulle/clockwork"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/closer"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
Expand All @@ -20,8 +21,9 @@ import (

type (
Item interface {
closer.Closer

IsAlive() bool
Close(ctx context.Context) error
NodeID() uint32
}
ItemConstraint[T any] interface {
Expand Down
19 changes: 9 additions & 10 deletions internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ var (
)

type (
Status uint32
sessionPool interface {
closer.Closer

Expand Down Expand Up @@ -337,10 +336,13 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
settings := options.ExecuteSettings(opts...)
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
streamResult, err := s.execute(ctx, q, settings, withTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}
defer func() {
_ = streamResult.Close(ctx)
}()

err = readAll(ctx, streamResult)
if err != nil {
Expand Down Expand Up @@ -381,13 +383,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
) {
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := execute(ctx, s.ID(), s.client, q,
options.ExecuteSettings(opts...), withTrace(s.trace),
)
if err != nil {
return xerrors.WithStackTrace(err)
}

streamResult, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -433,10 +429,13 @@ func clientQueryResultSet(
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
) (rs result.ClosableResultSet, finalErr error) {
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
streamResult, err := s.execute(ctx, q, settings, resultOpts...)
if err != nil {
return xerrors.WithStackTrace(err)
}
defer func() {
_ = streamResult.Close(ctx)
}()

rs, err = readMaterializedResultSet(ctx, streamResult)
if err != nil {
Expand Down
15 changes: 13 additions & 2 deletions internal/query/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1541,6 +1541,7 @@ func TestClient(t *testing.T) {
type sessionControllerMock struct {
id string
status Status
done chan struct{}
}

func (s *sessionControllerMock) IsAlive() bool {
Expand All @@ -1567,16 +1568,26 @@ func (s sessionControllerMock) Status() string {
return s.status.String()
}

func (s sessionControllerMock) Done() <-chan struct{} {
return s.done
}

func newTestSession(id string) *Session {
return &Session{
Core: &sessionControllerMock{id: id},
Core: &sessionControllerMock{
id: id,
done: make(chan struct{}),
},
trace: &trace.Query{},
}
}

func newTestSessionWithClient(id string, client Ydb_Query_V1.QueryServiceClient, lazyTx bool) *Session {
return &Session{
Core: &sessionControllerMock{id: id},
Core: &sessionControllerMock{
id: id,
done: make(chan struct{}),
},
client: client,
trace: &trace.Query{},
lazyTx: lazyTx,
Expand Down
3 changes: 2 additions & 1 deletion internal/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

var (
errNilClient = xerrors.Wrap(errors.New("table client is not initialized"))
ErrTransactionRollingBack = xerrors.Wrap(errors.New("ydb: the transaction is rolling back"))
ErrTransactionRollingBack = xerrors.Wrap(errors.New("the transaction is rolling back"))
errWrongNextResultSetIndex = errors.New("wrong result set index")
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
Expand All @@ -17,4 +17,5 @@ var (
errNilOption = errors.New("nil option")
ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction")
errExecuteOnCompletedTx = errors.New("execute on completed transaction")
errSessionClosed = errors.New("session is closed")
)
Loading

0 comments on commit 45d5c94

Please sign in to comment.