spanner: get read timestamp early
For single-use transactions, the read timestamp wasn't available until
after Stop was called on the iterator.

Now it is available after the first row is read, as advertised.

Fixes Google-internal bug 38008348.

Change-Id: Id5ce338e3c610d889003c0d9f9b9a1d4b4de468f
Reviewed-on: https://code-review.googlesource.com/13370
Reviewed-by: kokoro <[email protected]>
Reviewed-by: Vikas Kedia <[email protected]>
jba committed Jun 1, 2017
1 parent 0a3a2cd commit 3d69fa2
Showing 4 changed files with 97 additions and 25 deletions.
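As context for the change described above, here is a minimal caller-side sketch of the new behavior, modeled on the TestEarlyTimestamp test added in this commit. It is illustrative only: it assumes an already-initialized *spanner.Client, and the table and column names are invented.

package example

import (
	"context"
	"errors"
	"time"

	"cloud.google.com/go/spanner"
)

// readTimestampEarly reads one row in a single-use read-only transaction and
// then asks for the transaction's read timestamp. With this change the
// timestamp is available right after the first call to Next; previously it
// only became available after Stop.
func readTimestampEarly(ctx context.Context, client *spanner.Client) (time.Time, error) {
	txn := client.Single()
	iter := txn.Read(ctx, "Singers", spanner.AllKeys(), []string{"SingerId", "Name"})
	defer iter.Stop()

	// Before any row has been read, a single-use transaction does not know
	// its read timestamp yet, so Timestamp returns an error.
	if _, err := txn.Timestamp(); err == nil {
		return time.Time{}, errors.New("timestamp unexpectedly known before the first row")
	}

	// Reading the first row makes the timestamp available.
	if _, err := iter.Next(); err != nil {
		return time.Time{}, err // includes iterator.Done when the table is empty
	}
	return txn.Timestamp()
}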
30 changes: 18 additions & 12 deletions spanner/read.go
@@ -45,24 +45,26 @@ func errEarlyReadEnd() error {
 
 // stream is the internal fault tolerant method for streaming data from
 // Cloud Spanner.
-func stream(ctx context.Context, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), release func(time.Time, error)) *RowIterator {
+func stream(ctx context.Context, rpc func(ct context.Context, resumeToken []byte) (streamingReceiver, error), setTimestamp func(time.Time), release func(error)) *RowIterator {
 	ctx, cancel := context.WithCancel(ctx)
 	return &RowIterator{
-		streamd: newResumableStreamDecoder(ctx, rpc),
-		rowd:    &partialResultSetDecoder{},
-		release: release,
-		cancel:  cancel,
+		streamd:      newResumableStreamDecoder(ctx, rpc),
+		rowd:         &partialResultSetDecoder{},
+		setTimestamp: setTimestamp,
+		release:      release,
+		cancel:       cancel,
 	}
 }
 
 // RowIterator is an iterator over Rows.
 type RowIterator struct {
-	streamd *resumableStreamDecoder
-	rowd    *partialResultSetDecoder
-	release func(time.Time, error)
-	cancel  func()
-	err     error
-	rows    []*Row
+	streamd      *resumableStreamDecoder
+	rowd         *partialResultSetDecoder
+	setTimestamp func(time.Time)
+	release      func(error)
+	cancel       func()
+	err          error
+	rows         []*Row
 }
 
 // Next returns the next result. Its second return value is iterator.Done if
@@ -77,6 +79,10 @@ func (r *RowIterator) Next() (*Row, error) {
 		if r.err != nil {
 			return nil, r.err
 		}
+		if !r.rowd.ts.IsZero() && r.setTimestamp != nil {
+			r.setTimestamp(r.rowd.ts)
+			r.setTimestamp = nil
+		}
 	}
 	if len(r.rows) > 0 {
 		row := r.rows[0]
@@ -123,7 +129,7 @@ func (r *RowIterator) Stop() {
 		r.cancel()
 	}
 	if r.release != nil {
-		r.release(r.rowd.ts, r.err)
+		r.release(r.err)
 		if r.err == nil {
 			r.err = spannerErrorf(codes.FailedPrecondition, "Next called after Stop")
 		}
18 changes: 12 additions & 6 deletions spanner/read_test.go
@@ -1317,7 +1317,8 @@ func TestResumeToken(t *testing.T) {
 			sr.rpcReceiver = r
 			return sr, err
 		},
-		func(time.Time, error) {})
+		nil,
+		func(error) {})
 	defer iter.Stop()
 	for {
 		var row *Row
@@ -1515,7 +1516,8 @@ func TestGrpcReconnect(t *testing.T) {
 			})
 
 		},
-		func(time.Time, error) {})
+		nil,
+		func(error) {})
 	defer iter.Stop()
 	for {
 		_, err = iter.Next()
@@ -1585,7 +1587,8 @@ func TestCancelTimeout(t *testing.T) {
 				ResumeToken: resumeToken,
 			})
 		},
-		func(time.Time, error) {})
+		nil,
+		func(error) {})
 	defer iter.Stop()
 	for {
 		_, err = iter.Next()
@@ -1618,7 +1621,8 @@ func TestCancelTimeout(t *testing.T) {
 				ResumeToken: resumeToken,
 			})
 		},
-		func(time.Time, error) {})
+		nil,
+		func(error) {})
 	defer iter.Stop()
 	for {
 		_, err = iter.Next()
@@ -1667,7 +1671,8 @@ func TestRowIteratorDo(t *testing.T) {
 				ResumeToken: resumeToken,
 			})
 		},
-		func(time.Time, error) {})
+		nil,
+		func(error) {})
 	err = iter.Do(func(r *Row) error { nRows++; return nil })
 	if err != nil {
 		t.Errorf("Using Do: %v", err)
@@ -1702,7 +1707,8 @@ func TestIteratorStopEarly(t *testing.T) {
 				ResumeToken: resumeToken,
 			})
 		},
-		func(time.Time, error) {})
+		nil,
+		func(error) {})
 	_, err = iter.Next()
 	if err != nil {
 		t.Fatalf("before Stop: %v", err)
51 changes: 51 additions & 0 deletions spanner/spanner_test.go
@@ -814,6 +814,57 @@ func compareRows(iter *RowIterator, wantNums []int) (string, bool) {
 	return "", true
 }
 
+func TestEarlyTimestamp(t *testing.T) {
+	// Test that we can get the timestamp from a read-only transaction as
+	// soon as we have read at least one row.
+	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
+	defer cancel()
+	// Set up testing environment.
+	if err := prepare(ctx, t, readDBStatements); err != nil {
+		// If prepare() fails, tear down whatever that's already up.
+		tearDown(ctx, t)
+		t.Fatalf("cannot set up testing environment: %v", err)
+	}
+	// After all tests, tear down testing environment.
+	defer tearDown(ctx, t)
+
+	var ms []*Mutation
+	for i := 0; i < 3; i++ {
+		ms = append(ms, InsertOrUpdate(testTable,
+			testTableColumns,
+			[]interface{}{fmt.Sprintf("k%d", i), fmt.Sprintf("v%d", i)}))
+	}
+	if _, err := client.Apply(ctx, ms, ApplyAtLeastOnce()); err != nil {
+		t.Fatal(err)
+	}
+
+	txn := client.Single()
+	iter := txn.Read(ctx, testTable, AllKeys(), testTableColumns)
+	defer iter.Stop()
+	// In single-use transaction, we should get an error before reading anything.
+	if _, err := txn.Timestamp(); err == nil {
+		t.Error("wanted error, got nil")
+	}
+	// After reading one row, the timestamp should be available.
+	_, err := iter.Next()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err := txn.Timestamp(); err != nil {
+		t.Errorf("got %v, want nil", err)
+	}
+
+	txn = client.ReadOnlyTransaction()
+	defer txn.Close()
+	iter = txn.Read(ctx, testTable, AllKeys(), testTableColumns)
+	defer iter.Stop()
+	// In an ordinary read-only transaction, the timestamp should be
+	// available immediately.
+	if _, err := txn.Timestamp(); err != nil {
+		t.Errorf("got %v, want nil", err)
+	}
+}
+
 func TestNestedTransaction(t *testing.T) {
 	// You cannot use a transaction from inside a read-write transaction.
 	ctx := context.Background()
23 changes: 16 additions & 7 deletions spanner/transaction.go
@@ -36,8 +36,10 @@ type transactionID []byte
 type txReadEnv interface {
 	// acquire returns a read-transaction environment that can be used to perform a transactional read.
 	acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error)
-	// release should be called at the end of every transactional read to deal with session recycling and read timestamp recording.
-	release(time.Time, error)
+	// sets the transaction's read timestamp
+	setTimestamp(time.Time)
+	// release should be called at the end of every transactional read to deal with session recycling.
+	release(error)
 }
 
 // txReadOnly contains methods for doing transactional reads.
@@ -97,6 +99,7 @@ func (t *txReadOnly) ReadUsingIndex(ctx context.Context, table, index string, keys KeySet, columns []string) *RowIterator {
 				ResumeToken: resumeToken,
 			})
 		},
+		t.setTimestamp,
 		t.release,
 	)
 }
@@ -155,6 +158,7 @@ func (t *txReadOnly) Query(ctx context.Context, statement Statement) *RowIterator {
 			req.ResumeToken = resumeToken
 			return client.ExecuteStreamingSql(ctx, req)
 		},
+		t.setTimestamp,
 		t.release)
 }
 
@@ -409,12 +413,17 @@ func (t *ReadOnlyTransaction) acquireMultiUse(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
 	}
 }
 
-// release implements txReadEnv.release.
-func (t *ReadOnlyTransaction) release(rts time.Time, err error) {
+func (t *ReadOnlyTransaction) setTimestamp(ts time.Time) {
 	t.mu.Lock()
-	if t.singleUse && !rts.IsZero() {
-		t.rts = rts
+	defer t.mu.Unlock()
+	if t.rts.IsZero() {
+		t.rts = ts
 	}
+}
+
+// release implements txReadEnv.release.
+func (t *ReadOnlyTransaction) release(err error) {
+	t.mu.Lock()
 	sh := t.sh
 	t.mu.Unlock()
 	if sh != nil { // sh could be nil if t.acquire() fails.
@@ -599,7 +608,7 @@ func (t *ReadWriteTransaction) acquire(ctx context.Context) (*sessionHandle, *sppb.TransactionSelector, error) {
 }
 
 // release implements txReadEnv.release.
-func (t *ReadWriteTransaction) release(_ time.Time, err error) {
+func (t *ReadWriteTransaction) release(err error) {
 	t.mu.Lock()
 	sh := t.sh
 	t.mu.Unlock()
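A note on the new ReadOnlyTransaction.setTimestamp above: the IsZero guard keeps the first recorded timestamp and ignores later ones, so a value that is already known (for example, one recorded when a multi-use read-only transaction begins) is never overwritten. A standalone sketch of that pattern, using an invented tsRecorder type rather than the real transaction struct:

package example

import (
	"sync"
	"time"
)

// tsRecorder is an invented type illustrating the "first timestamp wins"
// pattern used by ReadOnlyTransaction.setTimestamp.
type tsRecorder struct {
	mu  sync.Mutex
	rts time.Time
}

// setTimestamp records ts only if no read timestamp is known yet, so repeated
// or concurrent calls cannot change the value once it has been set.
func (r *tsRecorder) setTimestamp(ts time.Time) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.rts.IsZero() {
		r.rts = ts
	}
}

// timestamp returns the recorded value and whether one is available.
func (r *tsRecorder) timestamp() (time.Time, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.rts, !r.rts.IsZero()
}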
