Skip to content

Commit 2ffba6b

Browse files
committed
Add concurrent result sets in db.Query.Query(...)
1 parent 8f235f3 commit 2ffba6b

File tree

5 files changed

+65
-18
lines changed

5 files changed

+65
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
* Supported `sql.Null*` from `database/sql` as query params in `toValue` func
2+
* Added `WithConcurrentResultSets` option for `db.Query().Query()`
23

34
## v3.118.0
45
* Added support for nullable `Date32`, `Datetime64`, `Timestamp64`, and `Interval64` types in the `optional` parameter builder

internal/query/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
404404
if err != nil {
405405
return xerrors.WithStackTrace(err)
406406
}
407+
407408
defer func() {
408409
_ = streamResult.Close(ctx)
409410
}()

internal/query/client_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,24 @@ func TestClient(t *testing.T) {
922922
Status: Ydb.StatusIds_SUCCESS,
923923
ResultSetIndex: 0,
924924
ResultSet: &Ydb.ResultSet{
925+
Columns: []*Ydb.Column{
926+
{
927+
Name: "a",
928+
Type: &Ydb.Type{
929+
Type: &Ydb.Type_TypeId{
930+
TypeId: Ydb.Type_UINT64,
931+
},
932+
},
933+
},
934+
{
935+
Name: "b",
936+
Type: &Ydb.Type{
937+
Type: &Ydb.Type_TypeId{
938+
TypeId: Ydb.Type_UTF8,
939+
},
940+
},
941+
},
942+
},
925943
Rows: []*Ydb.Value{
926944
{
927945
Items: []*Ydb.Value{{

internal/query/result.go

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import (
88
"time"
99

1010
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
11+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1112
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
1213
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1314

1415
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
1516
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
1719
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1820
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
1921
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -433,34 +435,59 @@ func exactlyOneResultSetFromResult(ctx context.Context, r result.Result) (rs res
433435
return MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows), nil
434436
}
435437

436-
func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Result, error) {
437-
var resultSets []result.Set
438+
func resultToMaterializedResult(ctx context.Context, r *streamResult) (result.Result, error) {
439+
type resultSet struct {
440+
rows []query.Row
441+
columns []*Ydb.Column
442+
}
443+
resultSetByIndex := make(map[int64]resultSet)
438444

439445
for {
440-
rs, err := r.NextResultSet(ctx)
446+
if ctx.Err() != nil {
447+
return nil, xerrors.WithStackTrace(ctx.Err())
448+
}
449+
if r.closer.Err() != nil {
450+
return nil, xerrors.WithStackTrace(r.closer.Err())
451+
}
452+
453+
rs := resultSetByIndex[r.lastPart.GetResultSetIndex()]
454+
if len(rs.columns) == 0 {
455+
rs.columns = r.lastPart.GetResultSet().GetColumns()
456+
}
457+
458+
rows := make([]query.Row, len(r.lastPart.GetResultSet().GetRows()))
459+
for i := range r.lastPart.GetResultSet().GetRows() {
460+
rows[i] = NewRow(rs.columns, r.lastPart.GetResultSet().GetRows()[i])
461+
}
462+
rs.rows = append(rs.rows, rows...)
463+
464+
resultSetByIndex[r.lastPart.GetResultSetIndex()] = rs
465+
466+
var err error
467+
r.lastPart, err = r.nextPart(ctx)
441468
if err != nil {
442469
if xerrors.Is(err, io.EOF) {
443470
break
444471
}
445472

446473
return nil, xerrors.WithStackTrace(err)
447474
}
475+
if r.lastPart.GetExecStats() != nil && r.statsCallback != nil {
476+
r.statsCallback(stats.FromQueryStats(r.lastPart.GetExecStats()))
477+
}
478+
}
448479

449-
var rows []query.Row
450-
for {
451-
row, err := rs.NextRow(ctx)
452-
if err != nil {
453-
if xerrors.Is(err, io.EOF) {
454-
break
455-
}
456-
457-
return nil, xerrors.WithStackTrace(err)
458-
}
480+
resultSets := make([]result.Set, len(resultSetByIndex))
481+
for rsIndex, rs := range resultSetByIndex {
482+
columnNames := make([]string, len(rs.columns))
483+
columnTypes := make([]types.Type, len(rs.columns))
459484

460-
rows = append(rows, row)
485+
for i := range rs.columns {
486+
columnNames[i] = rs.columns[i].Name
487+
columnTypes[i] = types.TypeFromYDB(rs.columns[i].Type)
461488
}
462489

463-
resultSets = append(resultSets, MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows))
490+
resultSets[rsIndex] = MaterializedResultSet(int(rsIndex), columnNames, columnTypes, rs.rows)
464491
}
465492

466493
return &materializedResult{

query/execute_options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {
7070
return options.WithResponsePartLimitSizeBytes(size)
7171
}
7272

73-
//func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
74-
// return options.WithConcurrentResultSets(isEnabled)
75-
//}
73+
func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
74+
return options.WithConcurrentResultSets(isEnabled)
75+
}
7676

7777
func WithCallOptions(opts ...grpc.CallOption) ExecuteOption {
7878
return options.WithCallOptions(opts...)

0 commit comments

Comments
 (0)