From 3713b3e2ad65686d59e1e20df126cc2de71e6c7f Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov Date: Fri, 24 Jan 2025 10:18:38 +0300 Subject: [PATCH] fixed TestIssue798TruncatedError --- CHANGELOG.md | 2 +- internal/table/config/config.go | 17 +- internal/table/session.go | 2 +- options.go | 4 +- tests/integration/table_truncated_err_test.go | 570 ++++++++++++------ 5 files changed, 393 insertions(+), 202 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e5e49c8a..4a826f11b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -* Added environment variable `YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE` for execute data queries from table service client using query client API +* Added `ydb.WithExecuteDataQueryOverQueryClient(bool)` option for execute data queries from table service client using query client API ## v3.98.0 * Supported pool of encoders, which implement ResetableWriter interface diff --git a/internal/table/config/config.go b/internal/table/config/config.go index b8906df5c..e3836ecc2 100644 --- a/internal/table/config/config.go +++ b/internal/table/config/config.go @@ -1,7 +1,6 @@ package config import ( - "os" "time" "github.com/jonboulle/clockwork" @@ -290,16 +289,12 @@ func (c *Config) DeleteTimeout() time.Duration { } func defaults() *Config { - executeDataQueryOverQueryService := os.Getenv("YDB_EXECUTE_DATA_QUERY_OVER_QUERY_SERVICE") != "" - return &Config{ - sizeLimit: DefaultSessionPoolSizeLimit, - createSessionTimeout: DefaultSessionPoolCreateSessionTimeout, - deleteTimeout: DefaultSessionPoolDeleteTimeout, - idleThreshold: DefaultSessionPoolIdleThreshold, - clock: clockwork.NewRealClock(), - trace: &trace.Table{}, - useQuerySession: executeDataQueryOverQueryService, - executeDataQueryOverQueryService: executeDataQueryOverQueryService, + sizeLimit: DefaultSessionPoolSizeLimit, + createSessionTimeout: DefaultSessionPoolCreateSessionTimeout, + deleteTimeout: DefaultSessionPoolDeleteTimeout, + idleThreshold: DefaultSessionPoolIdleThreshold, + clock: clockwork.NewRealClock(), + trace: &trace.Table{}, } } diff --git a/internal/table/session.go b/internal/table/session.go index 4068ced0e..9c8db9c53 100644 --- a/internal/table/session.go +++ b/internal/table/session.go @@ -378,7 +378,7 @@ func newTableSession(ctx context.Context, cc grpc.ClientConnInterface, config *c ) s.executor = tableExecutor{ client: s.client, - ignoreTruncated: false, + ignoreTruncated: s.config.IgnoreTruncated(), } return s, nil diff --git a/options.go b/options.go index 330669ce9..fcda0572c 100644 --- a/options.go +++ b/options.go @@ -558,9 +558,9 @@ func WithSessionPoolIdleThreshold(idleThreshold time.Duration) Option { // WithExecuteDataQueryOverQueryClient overrides table.Session.Execute with query service // execute with materialized result -func WithExecuteDataQueryOverQueryClient() Option { +func WithExecuteDataQueryOverQueryClient(b bool) Option { return func(ctx context.Context, d *Driver) error { - d.tableOptions = append(d.tableOptions, tableConfig.ExecuteDataQueryOverQueryService(true)) + d.tableOptions = append(d.tableOptions, tableConfig.ExecuteDataQueryOverQueryService(b)) return nil } diff --git a/tests/integration/table_truncated_err_test.go b/tests/integration/table_truncated_err_test.go index 6a1a512a7..ba84fa218 100644 --- a/tests/integration/table_truncated_err_test.go +++ b/tests/integration/table_truncated_err_test.go @@ -7,12 +7,10 @@ import ( "context" "database/sql" "fmt" - "slices" "strconv" "testing" "github.com/ydb-platform/ydb-go-sdk/v3" - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsql" "github.com/ydb-platform/ydb-go-sdk/v3/retry" "github.com/ydb-platform/ydb-go-sdk/v3/table" "github.com/ydb-platform/ydb-go-sdk/v3/table/options" @@ -23,110 +21,167 @@ import ( // https://github.com/ydb-platform/ydb-go-sdk/issues/798 func TestIssue798TruncatedError(t *testing.T) { const rowsLimit = 1000 - var ( - scope = newScope(t) - driver = scope.Driver() - db = scope.SQLDriver() - tablePath = scope.TablePath() - ) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // clear table - { - driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, _, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("DELETE FROM `%s`;", tablePath), - nil, - ) - if err != nil { + t.Run("TruncatedErrorOverTableService", func(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver(ydb.WithExecuteDataQueryOverQueryClient(false)) + db = scope.SQLDriver(ydb.WithQueryService(false)) + tablePath = scope.TablePath() + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // clear table + { + driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, _, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("DELETE FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + return err - } + }, table.WithIdempotent()) + } - return err - }, table.WithIdempotent()) - } - - // upsert rows - { - rows := make([]types.Value, rowsLimit) - for i := range rows { - rows[i] = types.StructValue( - types.StructFieldValue("id", types.Int64Value(int64(i))), - types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), + // upsert rows + { + rows := make([]types.Value, rowsLimit) + for i := range rows { + rows[i] = types.StructValue( + types.StructFieldValue("id", types.Int64Value(int64(i))), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), + ) + } + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), ) + scope.Require.NoError(err) } - err := driver.Table().BulkUpsert(ctx, tablePath, - table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), - ) - scope.Require.NoError(err) - } - - // select rows without truncated error - { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, + + // select rows without truncated error + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + + return results.Err() + }, table.WithIdempotent()) + scope.Require.NoError(err) + + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + count := 0 + for rows.Next() { + count++ + } + if count != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", count) + } + + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) + } + + // upsert 1 row for get 1001 rows and truncated error + { + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(types.StructValue( + types.StructFieldValue("id", types.Int64Value(rowsLimit)), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), + ))), + table.WithIdempotent(), ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if results.CurrentResultSet().RowCount() != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - return results.Err() - }, table.WithIdempotent()) - scope.Require.NoError(err) + scope.Require.NoError(err) + } - err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) - if err != nil { - return err - } - defer func() { - _ = rows.Close() - }() - count := 0 - for rows.Next() { - count++ - } - if count != rowsLimit { - return fmt.Errorf("unexpected rows count: %d", count) + // select all rows with truncated result error + { + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + + rowsCount := results.CurrentResultSet().RowCount() + + if rowsCount != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } + + return results.Err() // expected truncated error for execute data query using table client + }, table.WithIdempotent()) + scope.Require.ErrorIs(err, result.ErrTruncated) } - return rows.Err() - }, retry.WithIdempotent(true)) - scope.Require.NoError(err) - } - - // upsert 1 row for get 1001 rows and truncated error - { - err := driver.Table().BulkUpsert(ctx, tablePath, - table.BulkUpsertDataRows(types.ListValue(types.StructValue( - types.StructFieldValue("id", types.Int64Value(rowsLimit)), - types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), - ))), - table.WithIdempotent(), - ) - scope.Require.NoError(err) - } + { + var rowsCount int + err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rowsCount = 0 + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + for rows.Next() { + rowsCount++ + } + + if err := rows.Err(); err != nil { + return err + } - // select all rows with truncated result error - { + if rowsCount != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } + + return nil + }, retry.WithIdempotent(true)) + scope.Require.ErrorIs(err, result.ErrTruncated) + } + } + + // select all rows without truncated result error { - var rowsCount int err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { _, results, err := s.Execute(ctx, table.DefaultTxControl(), fmt.Sprintf("SELECT * FROM `%s`;", tablePath), nil, + options.WithIgnoreTruncated(), ) if err != nil { return err @@ -134,29 +189,122 @@ func TestIssue798TruncatedError(t *testing.T) { if err = results.NextResultSetErr(ctx); err != nil { return fmt.Errorf("no result sets: %w", err) } - - rowsCount = results.CurrentResultSet().RowCount() - if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, rowsCount) { + if rowsCount := results.CurrentResultSet().RowCount(); rowsCount != rowsLimit { return fmt.Errorf("unexpected rows count: %d", rowsCount) } - return results.Err() // expected truncated error + return nil }, table.WithIdempotent()) - switch rowsCount { - case rowsLimit: - scope.Require.ErrorIs(err, result.ErrTruncated) - case rowsLimit + 1: - scope.Require.NoError(err) - default: - scope.Require.Error(err) - scope.Require.FailNow("unexpected rows count: %d", rowsCount) + scope.Require.NoError(err) + } + + // connect with default option ignore truncated without truncated result error + { + driver, err := driver.With(ctx, ydb.WithIgnoreTruncated()) + scope.Require.NoError(err) + + err = driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + return results.Err() // expected nil + }, table.WithIdempotent()) + scope.Require.NoError(err) + + db = sql.OpenDB(ydb.MustConnector(driver)) + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + count := 0 + for rows.Next() { + count++ + } + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) + } + }) + t.Run("NoTruncatedErrorOverQueryService", func(t *testing.T) { + var ( + scope = newScope(t) + driver = scope.Driver(ydb.WithExecuteDataQueryOverQueryClient(true)) + db = scope.SQLDriver(ydb.WithQueryService(true)) + tablePath = scope.TablePath() + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // clear table + { + driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, _, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("DELETE FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + + return err + }, table.WithIdempotent()) + } + + // upsert rows + { + rows := make([]types.Value, rowsLimit) + for i := range rows { + rows[i] = types.StructValue( + types.StructFieldValue("id", types.Int64Value(int64(i))), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(i))), + ) } + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(rows...)), table.WithIdempotent(), + ) + scope.Require.NoError(err) } + // select rows without truncated error { - var rowsCount int - err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rowsCount = 0 + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + + return results.Err() + }, table.WithIdempotent()) + scope.Require.NoError(err) + + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) if err != nil { return err @@ -164,95 +312,143 @@ func TestIssue798TruncatedError(t *testing.T) { defer func() { _ = rows.Close() }() + count := 0 for rows.Next() { - rowsCount++ + count++ } - if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, rowsCount) { - return fmt.Errorf("unexpected rows count: %d", rowsCount) + if count != rowsLimit { + return fmt.Errorf("unexpected rows count: %d", count) } return rows.Err() }, retry.WithIdempotent(true)) - switch driverEngine(db) { - case xsql.LEGACY: - switch rowsCount { - case rowsLimit: - scope.Require.ErrorIs(err, result.ErrTruncated) - case rowsLimit + 1: - scope.Require.NoError(err) - default: - scope.Require.Error(err) - scope.Require.FailNow("unexpected rows count: %d", rowsCount) - } - scope.Require.ErrorIs(err, result.ErrTruncated) - case xsql.PROPOSE: - scope.Require.NoError(err) - } + scope.Require.NoError(err) } - } - - // select all rows without truncated result error - { - err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, - options.WithIgnoreTruncated(), - ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, results.CurrentResultSet().RowCount()) { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - return results.Err() // expected nil - }, table.WithIdempotent()) - scope.Require.NoError(err) - } - - // connect with default option ignore truncated without truncated result error - { - driver, err := driver.With(ctx, ydb.WithIgnoreTruncated()) - scope.Require.NoError(err) - - err = driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { - _, results, err := s.Execute(ctx, - table.DefaultTxControl(), - fmt.Sprintf("SELECT * FROM `%s`;", tablePath), - nil, + + // upsert 1 row for get 1001 rows and truncated error + { + err := driver.Table().BulkUpsert(ctx, tablePath, + table.BulkUpsertDataRows(types.ListValue(types.StructValue( + types.StructFieldValue("id", types.Int64Value(rowsLimit)), + types.StructFieldValue("val", types.TextValue(strconv.Itoa(rowsLimit))), + ))), + table.WithIdempotent(), ) - if err != nil { - return err - } - if err = results.NextResultSetErr(ctx); err != nil { - return fmt.Errorf("no result sets: %w", err) - } - if !slices.Contains([]int{rowsLimit, rowsLimit + 1}, results.CurrentResultSet().RowCount()) { - return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) - } - return results.Err() // expected nil - }, table.WithIdempotent()) - scope.Require.NoError(err) - - db = sql.OpenDB(ydb.MustConnector(driver)) - err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { - rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) - if err != nil { - return err + scope.Require.NoError(err) + } + + // select all rows with truncated result error + { + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + + rowsCount := results.CurrentResultSet().RowCount() + if rowsCount != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } + + return results.Err() // expected truncated error for execute data query using table client + }, table.WithIdempotent()) + scope.Require.NoError(err) } - defer func() { - _ = rows.Close() - }() - count := 0 - for rows.Next() { - count++ + + { + var rowsCount int + err := retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rowsCount = 0 + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + for rows.Next() { + rowsCount++ + } + if rowsCount != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", rowsCount) + } + + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) } - return rows.Err() - }, retry.WithIdempotent(true)) - scope.Require.NoError(err) - } + } + + // select all rows without truncated result error + { + err := driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + return results.Err() // expected nil + }, table.WithIdempotent()) + scope.Require.NoError(err) + } + + // connect with default option ignore truncated without truncated result error + { + driver, err := driver.With(ctx, ydb.WithIgnoreTruncated()) + scope.Require.NoError(err) + + err = driver.Table().Do(ctx, func(ctx context.Context, s table.Session) error { + _, results, err := s.Execute(ctx, + table.DefaultTxControl(), + fmt.Sprintf("SELECT * FROM `%s`;", tablePath), + nil, + ) + if err != nil { + return err + } + if err = results.NextResultSetErr(ctx); err != nil { + return fmt.Errorf("no result sets: %w", err) + } + if results.CurrentResultSet().RowCount() != rowsLimit+1 { + return fmt.Errorf("unexpected rows count: %d", results.CurrentResultSet().RowCount()) + } + return results.Err() // expected nil + }, table.WithIdempotent()) + scope.Require.NoError(err) + + db = sql.OpenDB(ydb.MustConnector(driver)) + err = retry.Do(ctx, db, func(ctx context.Context, cc *sql.Conn) error { + rows, err := cc.QueryContext(ctx, fmt.Sprintf("SELECT * FROM `%s`;", tablePath)) + if err != nil { + return err + } + defer func() { + _ = rows.Close() + }() + count := 0 + for rows.Next() { + count++ + } + return rows.Err() + }, retry.WithIdempotent(true)) + scope.Require.NoError(err) + } + }) }