Skip to content

Commit 4e64955

Browse files
committed
Add concurrent result sets in db.Query.Query(...)
1 parent b47b59e commit 4e64955

File tree

9 files changed

+172
-12
lines changed

9 files changed

+172
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `WithCocurrentResultSets` option for `db.Query().Query()`
2+
13
## v3.117.1
24
* Fixed scan a column of type `Decimal(precision,scale)` into a struct field of type `types.Decimal{}` using `ScanStruct()`
35
* Fixed race in integration test `TestTopicWriterLogMessagesWithoutData`

internal/query/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,11 +401,12 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
401401
if err != nil {
402402
return xerrors.WithStackTrace(err)
403403
}
404+
404405
defer func() {
405406
_ = streamResult.Close(ctx)
406407
}()
407408

408-
r, err = resultToMaterializedResult(ctx, streamResult)
409+
r, err = concurrentResultToMaterializedResult(ctx, streamResult)
409410
if err != nil {
410411
return xerrors.WithStackTrace(err)
411412
}

internal/query/client_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,24 @@ func TestClient(t *testing.T) {
922922
Status: Ydb.StatusIds_SUCCESS,
923923
ResultSetIndex: 0,
924924
ResultSet: &Ydb.ResultSet{
925+
Columns: []*Ydb.Column{
926+
{
927+
Name: "a",
928+
Type: &Ydb.Type{
929+
Type: &Ydb.Type_TypeId{
930+
TypeId: Ydb.Type_UINT64,
931+
},
932+
},
933+
},
934+
{
935+
Name: "b",
936+
Type: &Ydb.Type{
937+
Type: &Ydb.Type_TypeId{
938+
TypeId: Ydb.Type_UTF8,
939+
},
940+
},
941+
},
942+
},
925943
Rows: []*Ydb.Value{
926944
{
927945
Items: []*Ydb.Value{{

internal/query/execute_query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func executeQueryRequest(sessionID, q string, cfg executeSettings) (
9393
},
9494
Parameters: params,
9595
StatsMode: Ydb_Query.StatsMode(cfg.StatsMode()),
96-
ConcurrentResultSets: cfg.ConcurrentResultSets(),
96+
ConcurrentResultSets: false,
9797
PoolId: cfg.ResourcePool(),
9898
ResponsePartLimitBytes: cfg.ResponsePartLimitSizeBytes(),
9999
}

internal/query/result.go

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -341,6 +342,33 @@ func (r *streamResult) nextPartFunc(
341342
}
342343
}
343344

345+
func (r *streamResult) NextPart(ctx context.Context) (_ result.Part, err error) {
346+
if r.lastPart == nil {
347+
return nil, xerrors.WithStackTrace(io.EOF)
348+
}
349+
350+
select {
351+
case <-r.closer.Done():
352+
return nil, xerrors.WithStackTrace(r.closer.Err())
353+
case <-ctx.Done():
354+
return nil, xerrors.WithStackTrace(ctx.Err())
355+
default:
356+
part, err := r.nextPart(ctx)
357+
if err != nil && !xerrors.Is(err, io.EOF) {
358+
return nil, xerrors.WithStackTrace(err)
359+
}
360+
if part.GetExecStats() != nil && r.statsCallback != nil {
361+
r.statsCallback(stats.FromQueryStats(part.GetExecStats()))
362+
}
363+
defer func() {
364+
r.lastPart = part
365+
r.resultSetIndex = part.GetResultSetIndex()
366+
}()
367+
368+
return newResultPart(r.lastPart), nil
369+
}
370+
}
371+
344372
func (r *streamResult) NextResultSet(ctx context.Context) (_ result.Set, err error) {
345373
if r.trace != nil {
346374
onDone := trace.QueryOnResultNextResultSet(r.trace, &ctx,
@@ -425,11 +453,20 @@ func exactlyOneResultSetFromResult(ctx context.Context, r result.Result) (rs res
425453
return MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows), nil
426454
}
427455

428-
func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Result, error) {
429-
var resultSets []result.Set
456+
func concurrentResultToMaterializedResult(ctx context.Context, r result.ConcurrentResult) (result.Result, error) {
457+
type resultSet struct {
458+
rows []query.Row
459+
columnNames []string
460+
columnTypes []types.Type
461+
}
462+
resultSetByIndex := make(map[int64]resultSet)
430463

431464
for {
432-
rs, err := r.NextResultSet(ctx)
465+
if ctx.Err() != nil {
466+
return nil, xerrors.WithStackTrace(ctx.Err())
467+
}
468+
469+
part, err := r.NextPart(ctx)
433470
if err != nil {
434471
if xerrors.Is(err, io.EOF) {
435472
break
@@ -438,21 +475,32 @@ func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Re
438475
return nil, xerrors.WithStackTrace(err)
439476
}
440477

441-
var rows []query.Row
478+
rs := resultSetByIndex[part.ResultSetIndex()]
479+
if len(rs.columnNames) == 0 {
480+
rs.columnTypes = part.ColumnTypes()
481+
rs.columnNames = part.ColumnNames()
482+
}
483+
484+
rows := make([]query.Row, 0)
442485
for {
443-
row, err := rs.NextRow(ctx)
486+
row, err := part.NextRow(ctx)
444487
if err != nil {
445488
if xerrors.Is(err, io.EOF) {
446489
break
447490
}
448491

449492
return nil, xerrors.WithStackTrace(err)
450493
}
451-
452494
rows = append(rows, row)
453495
}
496+
rs.rows = append(rs.rows, rows...)
497+
498+
resultSetByIndex[part.ResultSetIndex()] = rs
499+
}
454500

455-
resultSets = append(resultSets, MaterializedResultSet(rs.Index(), rs.Columns(), rs.ColumnTypes(), rows))
501+
resultSets := make([]result.Set, len(resultSetByIndex))
502+
for rsIndex, rs := range resultSetByIndex {
503+
resultSets[rsIndex] = MaterializedResultSet(int(rsIndex), rs.columnNames, rs.columnTypes, rs.rows)
456504
}
457505

458506
return &materializedResult{

internal/query/result/result.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ type (
2121
// with Go version 1.23+
2222
ResultSets(ctx context.Context) xiter.Seq2[Set, error]
2323
}
24+
ConcurrentResult interface {
25+
closer.Closer
26+
27+
NextPart(ctx context.Context) (Part, error)
28+
}
2429
Set interface {
2530
Index() int
2631
Columns() []string
@@ -34,6 +39,13 @@ type (
3439
Set
3540
closer.Closer
3641
}
42+
Part interface {
43+
ResultSetIndex() int64
44+
ColumnNames() []string
45+
ColumnTypes() []types.Type
46+
47+
NextRow(ctx context.Context) (Row, error)
48+
}
3749
Row interface {
3850
Values() []value.Value
3951

internal/query/result_part.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package query
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
8+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
9+
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/query"
13+
)
14+
15+
var _ query.Part = (*resultPart)(nil)
16+
17+
type (
18+
resultPart struct {
19+
resultSetIndex int64
20+
columns []*Ydb.Column
21+
rows []*Ydb.Value
22+
columnNames []string
23+
columnTypes []types.Type
24+
rowIndex int
25+
}
26+
)
27+
28+
func (p *resultPart) ResultSetIndex() int64 {
29+
return p.resultSetIndex
30+
}
31+
32+
func (p *resultPart) ColumnNames() []string {
33+
if len(p.columnNames) != 0 {
34+
return p.columnNames
35+
}
36+
names := make([]string, len(p.columns))
37+
for i, col := range p.columns {
38+
names[i] = col.GetName()
39+
}
40+
p.columnNames = names
41+
42+
return names
43+
}
44+
45+
func (p *resultPart) ColumnTypes() []types.Type {
46+
if len(p.columnTypes) != 0 {
47+
return p.columnTypes
48+
}
49+
colTypes := make([]types.Type, len(p.columns))
50+
for i, col := range p.columns {
51+
colTypes[i] = types.TypeFromYDB(col.GetType())
52+
}
53+
p.columnTypes = colTypes
54+
55+
return colTypes
56+
}
57+
58+
func (p *resultPart) NextRow(ctx context.Context) (query.Row, error) {
59+
if p.rowIndex == len(p.rows) {
60+
return nil, xerrors.WithStackTrace(io.EOF)
61+
}
62+
63+
defer func() {
64+
p.rowIndex++
65+
}()
66+
67+
return NewRow(p.columns, p.rows[p.rowIndex]), nil
68+
}
69+
70+
func newResultPart(part *Ydb_Query.ExecuteQueryResponsePart) *resultPart {
71+
return &resultPart{
72+
resultSetIndex: part.GetResultSetIndex(),
73+
columns: part.GetResultSet().GetColumns(),
74+
rows: part.GetResultSet().GetRows(),
75+
rowIndex: 0,
76+
}
77+
}

query/execute_options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {
6363
return options.WithResponsePartLimitSizeBytes(size)
6464
}
6565

66-
//func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
67-
// return options.WithConcurrentResultSets(isEnabled)
68-
//}
66+
func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
67+
return options.WithConcurrentResultSets(isEnabled)
68+
}
6969

7070
func WithCallOptions(opts ...grpc.CallOption) ExecuteOption {
7171
return options.WithCallOptions(opts...)

query/result.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88

99
type (
1010
Result = result.Result
11+
ConcurrentResult = result.ConcurrentResult
1112
ResultSet = result.Set
1213
ClosableResultSet = result.ClosableResultSet
14+
Part = result.Part
1315
Row = result.Row
1416
Type = types.Type
1517
NamedDestination = scanner.NamedDestination

0 commit comments

Comments
 (0)