Skip to content

Commit

Permalink
add support for async queries (#54)
Browse files Browse the repository at this point in the history
* add support for async queries
* add test case for query validation
* address code review feedback
* increase go test timeout
  • Loading branch information
pmenglund authored Sep 22, 2022
1 parent e60455c commit d30348d
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
steps:
- checkout
- run: go install github.com/mattn/goveralls@latest
- run: go test -v -cover -race -coverprofile=cover.out
- run: go test -v -cover -timeout 20m -race -coverprofile=cover.out
- run: goveralls -coverprofile=cover.out -service=circle-ci
release:
docker:
Expand Down
49 changes: 42 additions & 7 deletions option/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,61 @@ package option

import "github.com/rockset/rockset-go-client/openapi"

type QueryOption func(request *openapi.QueryRequestSql)
type QueryOption func(request *openapi.QueryRequest)

func WithWarnings() QueryOption {
return func(o *openapi.QueryRequestSql) {
o.GenerateWarnings = openapi.PtrBool(true)
return func(o *openapi.QueryRequest) {
o.Sql.GenerateWarnings = openapi.PtrBool(true)
}
}

func WithRowLimit(limit int32) QueryOption {
return func(o *openapi.QueryRequestSql) {
o.DefaultRowLimit = &limit
return func(o *openapi.QueryRequest) {
o.Sql.DefaultRowLimit = &limit
}
}

func WithParameter(name, valueType, value string) QueryOption {
return func(o *openapi.QueryRequestSql) {
o.Parameters = append(o.Parameters, openapi.QueryParameter{
return func(o *openapi.QueryRequest) {
o.Sql.Parameters = append(o.Sql.Parameters, openapi.QueryParameter{
Name: name,
Type: valueType,
Value: value,
})
}
}

// WithAsyncClientTimeout is maximum amount of time that the client is willing to wait for the query to complete.
// If the query is not complete by this timeout, a response will be returned with a query_id that can be used to
// check the status of the query and retrieve results once the query has completed.
func WithAsyncClientTimeout(timeout int64) QueryOption {
return func(o *openapi.QueryRequest) {
if o.AsyncOptions == nil {
o.AsyncOptions = &openapi.AsyncQueryOptions{}
}
o.AsyncOptions.ClientTimeoutMs = &timeout
}
}

// WithAsyncTimeout maximum amount of time that Rockset will attempt to complete query execution before
// aborting the query and returning an error.
func WithAsyncTimeout(timeout int64) QueryOption {
return func(o *openapi.QueryRequest) {
if o.AsyncOptions == nil {
o.AsyncOptions = &openapi.AsyncQueryOptions{}
}
o.AsyncOptions.TimeoutMs = &timeout
}
}

// WithAsyncMaxInitialResults maximum number of results you will receive as a client. If the query exceeds this limit,
// the remaining results can be requested using a returned pagination cursor. In addition, there is a maximum response
// size of 100MiB so fewer than max_results may be returned.
func WithAsyncMaxInitialResults(timeout int64) QueryOption {
return func(o *openapi.QueryRequest) {
if o.AsyncOptions == nil {
o.AsyncOptions = &openapi.AsyncQueryOptions{}
}
o.AsyncOptions.MaxInitialResults = &timeout
}
}
82 changes: 80 additions & 2 deletions queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ import (
"github.com/rockset/rockset-go-client/option"
)

type QueryState string

const (
QueryQueued QueryState = "QUEUED"
QueryRunning QueryState = "RUNNING"
QueryError QueryState = "ERROR"
QueryCompleted QueryState = "COMPLETED"
QueryCancelled QueryState = "CANCELLED"
)

// Query executes a sql query with optional option.QueryOption
func (rc *RockClient) Query(ctx context.Context, sql string,
options ...option.QueryOption) (openapi.QueryResponse, error) {
Expand All @@ -19,7 +29,7 @@ func (rc *RockClient) Query(ctx context.Context, sql string,
rq.Sql.Parameters = []openapi.QueryParameter{}

for _, o := range options {
o(&rq.Sql)
o(rq)
}

err = rc.Retry(ctx, func() error {
Expand Down Expand Up @@ -47,7 +57,7 @@ func (rc *RockClient) ValidateQuery(ctx context.Context, sql string,
rq.Sql.Parameters = []openapi.QueryParameter{}

for _, o := range options {
o(&rq.Sql)
o(rq)
}

err = rc.Retry(ctx, func() error {
Expand All @@ -61,3 +71,71 @@ func (rc *RockClient) ValidateQuery(ctx context.Context, sql string,

return *r, nil
}

// GetQueryInfo retrieves information about a query.
func (rc *RockClient) GetQueryInfo(ctx context.Context, queryID string) (openapi.QueryInfo, error) {
var err error
var response *openapi.GetQueryResponse

q := rc.QueriesApi.GetQuery(ctx, queryID)

err = rc.Retry(ctx, func() error {
response, _, err = q.Execute()
return err
})

if err != nil {
return openapi.QueryInfo{}, err
}

return *response.Data, nil
}

// GetQueryResults retrieves the results of a completed async query.
func (rc *RockClient) GetQueryResults(ctx context.Context, queryID string) (openapi.QueryPaginationResponse, error) {
var err error
var response *openapi.QueryPaginationResponse

q := rc.QueriesApi.GetQueryResults(ctx, queryID)

err = rc.Retry(ctx, func() error {
response, _, err = q.Execute()
return err
})

if err != nil {
return openapi.QueryPaginationResponse{}, err
}

return *response, nil
}

// ListActiveQueries lists all active queries, i.e. queued or running.
func (rc *RockClient) ListActiveQueries(ctx context.Context) ([]openapi.QueryInfo, error) {
var err error
var response *openapi.ListQueriesResponse

q := rc.QueriesApi.ListActiveQueries(ctx)

err = rc.Retry(ctx, func() error {
response, _, err = q.Execute()
return err
})

return response.Data, nil
}

// CancelQuery cancels a queued or running query.
func (rc *RockClient) CancelQuery(ctx context.Context, queryID string) (openapi.QueryInfo, error) {
var err error
var response *openapi.CancelQueryResponse

q := rc.QueriesApi.CancelQuery(ctx, queryID)

err = rc.Retry(ctx, func() error {
response, _, err = q.Execute()
return err
})

return *response.Data, nil
}
79 changes: 79 additions & 0 deletions queries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package rockset_test

import (
"github.com/rockset/rockset-go-client"
"github.com/rockset/rockset-go-client/option"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
)

const slowQuery = `script {{{ import * as rockset from "/rockset"; export function delay(x) { rockset.sleep(x); return x; } }}} select _script.delay(2000, q) from unnest([1] as q)`

type QueryTestSuite struct {
suite.Suite
rc *rockset.RockClient
}

func TestQueryTestSuite(t *testing.T) {
skipUnlessIntegrationTest(t)

rc, err := rockset.NewClient()
require.NoError(t, err)

s := QueryTestSuite{
rc: rc,
}
suite.Run(t, &s)
}

func (s *QueryTestSuite) TestQuery() {
ctx := testCtx()

_, err := s.rc.Query(ctx, "SELECT 1")
s.Require().NoError(err)
}

func (s *QueryTestSuite) TestListQueries() {
ctx := testCtx()

_, err := s.rc.ListActiveQueries(ctx)
s.Require().NoError(err)
}

func (s *QueryTestSuite) TestAsyncQuery() {
ctx := testCtx()

resp, err := s.rc.Query(ctx, slowQuery,
option.WithAsyncClientTimeout(100),
option.WithAsyncMaxInitialResults(10),
)
s.Require().NoError(err)

err = s.rc.WaitForQuery(ctx, *resp.QueryId)
s.Require().NoError(err)

_, err = s.rc.GetQueryResults(ctx, *resp.QueryId)
s.Require().NoError(err)
}

func (s *QueryTestSuite) TestCancelQuery() {
ctx := testCtx()

resp, err := s.rc.Query(ctx, slowQuery,
option.WithAsyncClientTimeout(100),
option.WithAsyncMaxInitialResults(10),
)
s.Require().NoError(err)

info, err := s.rc.CancelQuery(ctx, *resp.QueryId)
s.Require().NoError(err)
s.Require().Equal("CANCELLED", info.GetStatus())
}

func (s *QueryTestSuite) TestValidateQuery() {
ctx := testCtx()

_, err := s.rc.ValidateQuery(ctx, "SELECT 1")
s.Require().NoError(err)
}
23 changes: 23 additions & 0 deletions wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ func (rc *RockClient) WaitUntilAliasAvailable(ctx context.Context, workspace, al
})
}

// WaitForQuery waits until queryID has either completed, errored, or been cancelled.
func (rc *RockClient) WaitForQuery(ctx context.Context, queryID string) error {
// TODO should this only wait for COMPLETED and return an error for ERROR and CANCELLED?
return rc.RetryWithCheck(ctx, rc.queryHasStatus(ctx, queryID, []QueryState{QueryCompleted, QueryError, QueryCancelled}))
}

// WaitUntilCollectionReady waits until the collection is ready.
func (rc *RockClient) WaitUntilCollectionReady(ctx context.Context, workspace, name string) error {
return rc.RetryWithCheck(ctx, rc.collectionHasState(ctx, workspace, name, collectionStatusREADY))
Expand Down Expand Up @@ -62,6 +68,23 @@ func (rc *RockClient) WaitUntilWorkspaceAvailable(ctx context.Context, workspace

// TODO(pme) refactor viewIsGone() and collectionIsGone() to be DRY

func (rc *RockClient) queryHasStatus(ctx context.Context, queryID string, statuses []QueryState) RetryCheck {
return func() (bool, error) {
res, err := rc.GetQueryInfo(ctx, queryID)
if err != nil {
return false, err
}

for _, s := range statuses {
if string(s) == res.GetStatus() {
return false, nil
}
}

return true, nil
}
}

func (rc *RockClient) workspaceIsAvailable(ctx context.Context, workspace string) RetryCheck {
return func() (bool, error) {
_, err := rc.GetWorkspace(ctx, workspace)
Expand Down

0 comments on commit d30348d

Please sign in to comment.