Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
asmyasnikov committed Dec 11, 2024
1 parent 43abd71 commit 7de1f3b
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 13 deletions.
10 changes: 10 additions & 0 deletions internal/xsql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ var (

type (
connWrapper struct {
processor Engine

cc conn.Conn
currentTx *txWrapper

Expand All @@ -45,6 +47,10 @@ type (
}
)

func (c *connWrapper) Engine() Engine {
return c.processor
}

func (c *connWrapper) Ping(ctx context.Context) error {
return c.cc.Ping(ctx)
}
Expand Down Expand Up @@ -82,6 +88,10 @@ func (c *connWrapper) Close() error {
return nil
}

func (c *connWrapper) Connector() *Connector {
return c.connector
}

func (c *connWrapper) Begin() (driver.Tx, error) {
return nil, xerrors.WithStackTrace(errDeprecated)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/xsql/conn/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ import (
var (
ErrUnsupported = driver.ErrSkip
errDeprecated = driver.ErrSkip
errConnClosedEarly = xerrors.Retryable(errors.New("Conn closed early"), xerrors.InvalidObject())
errNotReadyConn = xerrors.Retryable(errors.New("Conn not ready"), xerrors.InvalidObject())
errConnClosedEarly = xerrors.Retryable(errors.New("conn closed early"), xerrors.InvalidObject())
errNotReadyConn = xerrors.Retryable(errors.New("conn not ready"), xerrors.InvalidObject())
)
15 changes: 9 additions & 6 deletions internal/xsql/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql/driver"
"io"
"os"
"strconv"
"time"

"github.com/google/uuid"
Expand All @@ -31,12 +32,12 @@ var (
)

type (
queryProcessor uint8
Connector struct {
Engine uint8
Connector struct {
parent ydbDriver
balancer grpc.ClientConnInterface

queryProcessor queryProcessor
processor Engine

TableOpts []connOverTableServiceClient.Option
QueryOpts []connOverQueryServiceClient.Option
Expand Down Expand Up @@ -112,7 +113,7 @@ func (c *Connector) Open(name string) (driver.Conn, error) {
}

func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
switch c.queryProcessor {
switch c.processor {
case QUERY_SERVICE:
s, err := query.CreateSession(ctx, c.Query())
if err != nil {
Expand All @@ -122,6 +123,7 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
id := uuid.New()

conn := &connWrapper{
processor: QUERY_SERVICE,
cc: connOverQueryServiceClient.New(ctx, c, s, append(
c.QueryOpts,
connOverQueryServiceClient.WithOnClose(func() {
Expand All @@ -145,6 +147,7 @@ func (c *Connector) Connect(ctx context.Context) (driver.Conn, error) {
id := uuid.New()

conn := &connWrapper{
processor: TABLE_SERVICE,
cc: connOverTableServiceClient.New(ctx, c, s, append(c.TableOpts,
connOverTableServiceClient.WithOnClose(func() {
c.conns.Delete(id)
Expand Down Expand Up @@ -189,8 +192,8 @@ func Open(parent ydbDriver, balancer grpc.ClientConnInterface, opts ...Option) (
c := &Connector{
parent: parent,
balancer: balancer,
queryProcessor: func() queryProcessor {
if v, has := os.LookupEnv("YDB_DATABASE_SQL_OVER_QUERY_SERVICE"); has && v != "" {
processor: func() Engine {
if overQueryService, _ := strconv.ParseBool(os.Getenv("YDB_DATABASE_SQL_OVER_QUERY_SERVICE")); overQueryService {
return QUERY_SERVICE
}

Expand Down
4 changes: 2 additions & 2 deletions internal/xsql/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type (
bindOption struct {
bind.Bind
}
queryProcessorOption queryProcessor
queryProcessorOption Engine
)

func (t tablePathPrefixOption) Apply(c *Connector) error {
Expand All @@ -52,7 +52,7 @@ func (t tablePathPrefixOption) Apply(c *Connector) error {
}

func (processor queryProcessorOption) Apply(c *Connector) error {
c.queryProcessor = queryProcessor(processor)
c.processor = Engine(processor)

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions retry/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func WithDoRetryOptions(opts ...Option) doRetryOptionsOption {
return opts
}

// Do is a retryer of database/sql Conn with fallbacks on errors
// Do is a retryer of database/sql conn with fallbacks on errors
func Do(ctx context.Context, db *sql.DB, op func(ctx context.Context, cc *sql.Conn) error, opts ...doOption) error {
_, err := DoWithResult(ctx, db, func(ctx context.Context, cc *sql.Conn) (*struct{}, error) {
err := op(ctx, cc)
Expand All @@ -57,7 +57,7 @@ func Do(ctx context.Context, db *sql.DB, op func(ctx context.Context, cc *sql.Co
return nil
}

// DoWithResult is a retryer of database/sql Conn with fallbacks on errors
// DoWithResult is a retryer of database/sql conn with fallbacks on errors
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func DoWithResult[T any](ctx context.Context, db *sql.DB,
Expand Down
8 changes: 8 additions & 0 deletions sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ func WithDefaultQueryMode(mode QueryMode) ConnectorOption {
)
}

// OverQueryService is an experimental flag for create database/sql driver over query service client
//
// By default database/sql driver works over table service client
// Default will be changed to `OverQueryService` after March 2025
func OverQueryService() ConnectorOption {
return xsql.OverQueryService()
}

func WithFakeTx(modes ...QueryMode) ConnectorOption {
opts := make([]ConnectorOption, 0, len(modes))

Expand Down
24 changes: 24 additions & 0 deletions tests/integration/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
"github.com/ydb-platform/ydb-go-sdk/v3/log"
Expand Down Expand Up @@ -443,3 +444,26 @@ func (t *testLogger) flush() {
t.test.Log(message)
})
}

func driverEngine(db *sql.DB) (engine xsql.Engine) {
cc, err := db.Conn(context.Background())
if err != nil {
return engine
}

defer func() {
_ = cc.Close()
}()

cc.Raw(func(driverConn any) error {
if ccc, has := driverConn.(interface {
Engine() xsql.Engine
}); has {
engine = ccc.Engine()
}

return nil
})

return engine
}
8 changes: 7 additions & 1 deletion tests/integration/table_truncated_err_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
"github.com/ydb-platform/ydb-go-sdk/v3/table"
"github.com/ydb-platform/ydb-go-sdk/v3/table/options"
Expand Down Expand Up @@ -133,7 +134,12 @@ func TestIssue798TruncatedError(t *testing.T) {
}
return rows.Err()
}, retry.WithIdempotent(true))
scope.Require.ErrorIs(err, result.ErrTruncated)
switch driverEngine(db) {
case xsql.TABLE_SERVICE:
scope.Require.ErrorIs(err, result.ErrTruncated)
case xsql.QUERY_SERVICE:
scope.Require.NoError(err)
}
}

// select all rows without truncated result error
Expand Down

0 comments on commit 7de1f3b

Please sign in to comment.