Skip to content
This repository has been archived by the owner on Jan 27, 2022. It is now read-only.

Commit

Permalink
pubsub: rename Iterator to MessageIterator
Browse files Browse the repository at this point in the history
BREAKING CHANGE.

Change-Id: I2c256d44b92772a917ae197977c1dd8fcd82cc95
Reviewed-on: https://code-review.googlesource.com/7618
Reviewed-by: Michael McGreevy <[email protected]>
  • Loading branch information
jba committed Sep 15, 2016
1 parent 64568e4 commit 6d35c7b
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 51 deletions.
10 changes: 5 additions & 5 deletions pubsub/endtoend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (mc *messageCounter) Inc(msgID string) {
}

// process pulls messages from an iterator and records them in mc.
func process(t *testing.T, it *Iterator, mc *messageCounter) {
func process(t *testing.T, it *MessageIterator, mc *messageCounter) {
for {
m, err := it.Next()
if err == Done {
Expand All @@ -72,17 +72,17 @@ func process(t *testing.T, it *Iterator, mc *messageCounter) {
}
}

// newIter constructs a new Iterator.
func newIter(t *testing.T, ctx context.Context, sub *Subscription) *Iterator {
// newIter constructs a new MessageIterator.
func newIter(t *testing.T, ctx context.Context, sub *Subscription) *MessageIterator {
it, err := sub.Pull(ctx)
if err != nil {
t.Fatalf("error constructing iterator: %v", err)
}
return it
}

// launchIter launches a number of goroutines to pull from the supplied Iterator.
func launchIter(t *testing.T, ctx context.Context, it *Iterator, mc *messageCounter, n int, wg *sync.WaitGroup) {
// launchIter launches a number of goroutines to pull from the supplied MessageIterator.
func launchIter(t *testing.T, ctx context.Context, it *MessageIterator, mc *messageCounter, n int, wg *sync.WaitGroup) {
for j := 0; j < n; j++ {
wg.Add(1)
go func() {
Expand Down
8 changes: 4 additions & 4 deletions pubsub/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func ExampleSubscription_ModifyPushConfig() {
}
}

func ExampleIterator_Next() {
func ExampleMessageIterator_Next() {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, "project-id")
if err != nil {
Expand Down Expand Up @@ -239,7 +239,7 @@ func ExampleIterator_Next() {
}
}

func ExampleIterator_Stop_defer() {
func ExampleMessageIterator_Stop_defer() {
// If all uses of the iterator occur within the lifetime of a single
// function, stop it with defer.
ctx := context.Background()
Expand All @@ -255,10 +255,10 @@ func ExampleIterator_Stop_defer() {
// Ensure that the iterator is closed down cleanly.
defer it.Stop()

// TODO: Use the iterator (see the example for Iterator.Next).
// TODO: Use the iterator (see the example for MessageIterator.Next).
}

func ExampleIterator_Stop_goroutine() *pubsub.Iterator {
func ExampleMessageIterator_Stop_goroutine() *pubsub.MessageIterator {
// If you use the iterator outside the lifetime of a single function, you
// must still stop it.
// This (contrived) example returns an iterator that will yield messages
Expand Down
18 changes: 9 additions & 9 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// Done is returned when an iteration is complete.
var Done = iterator.Done

type Iterator struct {
type MessageIterator struct {
// kaTicker controls how often we send an ack deadline extension request.
kaTicker *time.Ticker
// ackTicker controls how often we acknowledge a batch of messages.
Expand All @@ -43,11 +43,11 @@ type Iterator struct {
closed chan struct{}
}

// newIterator starts a new Iterator. Stop must be called on the Iterator
// newMessageIterator starts a new MessageIterator. Stop must be called on the MessageIterator
// when it is no longer needed.
// subName is the full name of the subscription to pull messages from.
// ctx is the context to use for acking messages and extending message deadlines.
func newIterator(ctx context.Context, s service, subName string, po *pullOptions) *Iterator {
func newMessageIterator(ctx context.Context, s service, subName string, po *pullOptions) *MessageIterator {
// TODO: make kaTicker frequency more configurable.
// (ackDeadline - 5s) is a reasonable default for now, because the minimum ack period is 10s. This gives us 5s grace.
keepAlivePeriod := po.ackDeadline - 5*time.Second
Expand Down Expand Up @@ -79,7 +79,7 @@ func newIterator(ctx context.Context, s service, subName string, po *pullOptions

ka.Start()
ack.Start()
return &Iterator{
return &MessageIterator{
kaTicker: kaTicker,
ackTicker: ackTicker,
ka: ka,
Expand All @@ -92,7 +92,7 @@ func newIterator(ctx context.Context, s service, subName string, po *pullOptions
// Next returns the next Message to be processed. The caller must call
// Message.Done when finished with it.
// Once Stop has been called, calls to Next will return Done.
func (it *Iterator) Next() (*Message, error) {
func (it *MessageIterator) Next() (*Message, error) {
m, err := it.puller.Next()

if err == nil {
Expand All @@ -109,13 +109,13 @@ func (it *Iterator) Next() (*Message, error) {
}
}

// Client code must call Stop on an Iterator when finished with it.
// Client code must call Stop on a MessageIterator when finished with it.
// Stop will block until Done has been called on all Messages that have been
// returned by Next, or until the context with which the Iterator was created
// returned by Next, or until the context with which the MessageIterator was created
// is cancelled or exceeds its deadline.
// Stop need only be called once, but may be called multiple times from
// multiple goroutines.
func (it *Iterator) Stop() {
func (it *MessageIterator) Stop() {
it.mu.Lock()
defer it.mu.Unlock()

Expand Down Expand Up @@ -151,7 +151,7 @@ func (it *Iterator) Stop() {
it.ackTicker.Stop()
}

func (it *Iterator) done(ackID string, ack bool) {
func (it *MessageIterator) done(ackID string, ack bool) {
if ack {
it.acker.Ack(ackID)
// There's no need to call it.ka.Remove here, as acker will
Expand Down
30 changes: 15 additions & 15 deletions pubsub/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,32 @@ import (

func TestReturnsDoneOnStop(t *testing.T) {
type testCase struct {
abort func(*Iterator, context.CancelFunc)
abort func(*MessageIterator, context.CancelFunc)
want error
}

for _, tc := range []testCase{
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
it.Stop()
},
want: Done,
},
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
cancel()
},
want: context.Canceled,
},
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
it.Stop()
cancel()
},
want: Done,
},
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
cancel()
it.Stop()
},
Expand All @@ -59,7 +59,7 @@ func TestReturnsDoneOnStop(t *testing.T) {
} {
s := &blockingFetch{}
ctx, cancel := context.WithCancel(context.Background())
it := newIterator(ctx, s, "subname", &pullOptions{ackDeadline: time.Second * 10, maxExtension: time.Hour})
it := newMessageIterator(ctx, s, "subname", &pullOptions{ackDeadline: time.Second * 10, maxExtension: time.Hour})
defer it.Stop()
tc.abort(it, cancel)

Expand Down Expand Up @@ -107,37 +107,37 @@ func (s *justInTimeFetch) modifyAckDeadline(ctx context.Context, subName string,
}

func TestAfterAbortReturnsNoMoreThanOneMessage(t *testing.T) {
// Each test case is excercised by making two concurrent blocking calls on an
// Iterator, and then aborting the iterator.
// Each test case is excercised by making two concurrent blocking calls on a
// MessageIterator, and then aborting the iterator.
// The result should be one call to Next returning a message, and the other returning an error.
type testCase struct {
abort func(*Iterator, context.CancelFunc)
abort func(*MessageIterator, context.CancelFunc)
// want is the error that should be returned from one Next invocation.
want error
}
for n := 1; n < 3; n++ {
for _, tc := range []testCase{
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
it.Stop()
},
want: Done,
},
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
cancel()
},
want: context.Canceled,
},
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
it.Stop()
cancel()
},
want: Done,
},
{
abort: func(it *Iterator, cancel context.CancelFunc) {
abort: func(it *MessageIterator, cancel context.CancelFunc) {
cancel()
it.Stop()
},
Expand All @@ -154,7 +154,7 @@ func TestAfterAbortReturnsNoMoreThanOneMessage(t *testing.T) {
maxExtension: time.Hour,
maxPrefetch: n,
}
it := newIterator(ctx, s, "subname", po)
it := newMessageIterator(ctx, s, "subname", po)
defer it.Stop()

type result struct {
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestMultipleStopCallsBlockUntilMessageDone(t *testing.T) {
}

ctx := context.Background()
it := newIterator(ctx, s, "subname", &pullOptions{ackDeadline: time.Second * 10, maxExtension: 0})
it := newMessageIterator(ctx, s, "subname", &pullOptions{ackDeadline: time.Second * 10, maxExtension: 0})

m, err := it.Next()
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions pubsub/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Message struct {
calledDone bool

// The iterator that created this Message.
it *Iterator
it *MessageIterator
}

func toMessage(resp *raw.ReceivedMessage) (*Message, error) {
Expand All @@ -61,14 +61,14 @@ func toMessage(resp *raw.ReceivedMessage) (*Message, error) {
}, nil
}

// Done completes the processing of a Message that was returned from an Iterator.
// Done completes the processing of a Message that was returned from a MessageIterator.
// ack indicates whether the message should be acknowledged.
// Client code must call Done when finished for each Message returned by an iterator.
// Done may only be called on Messages returned by an iterator.
// Done may only be called on Messages returned by a MessageIterator.
// If message acknowledgement fails, the Message will be redelivered.
// Calls to Done have no effect after the first call.
//
// See Iterator.Next for an example.
// See MessageIterator.Next for an example.
func (m *Message) Done(ack bool) {
if m.calledDone {
return
Expand Down
28 changes: 14 additions & 14 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ type SubscriptionConfig struct {
Topic *Topic
PushConfig PushConfig

// The default maximum time after a subscriber receives a message
// before the subscriber should acknowledge the message. Note:
// messages which are obtained via an Iterator need not be acknowledged
// within this deadline, as the deadline will be automatically
// extended.
// The default maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. Note: messages which are
// obtained via a MessageIterator need not be acknowledged within this
// deadline, as the deadline will be automatically extended.
AckDeadline time.Duration
}

Expand All @@ -133,23 +132,23 @@ func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error)
return conf, nil
}

// Pull returns an Iterator that can be used to fetch Messages. The Iterator
// Pull returns a MessageIterator that can be used to fetch Messages. The MessageIterator
// will automatically extend the ack deadline of all fetched Messages, for the
// period specified by DefaultMaxExtension. This may be overridden by supplying
// a MaxExtension pull option.
//
// If ctx is cancelled or exceeds its deadline, outstanding acks or deadline
// extensions will fail.
//
// The caller must call Stop on the Iterator once finished with it.
func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*Iterator, error) {
// The caller must call Stop on the MessageIterator once finished with it.
func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*MessageIterator, error) {
config, err := s.Config(ctx)
if err != nil {
return nil, err
}
po := processPullOptions(opts)
po.ackDeadline = config.AckDeadline
return newIterator(ctx, s.s, s.name, po), nil
return newMessageIterator(ctx, s.s, s.name, po), nil
}

// ModifyPushConfig updates the endpoint URL and other attributes of a push subscription.
Expand All @@ -172,7 +171,7 @@ type pullOptions struct {
maxExtension time.Duration

// maxPrefetch is the maximum number of Messages to have in flight, to
// be returned by Iterator.Next.
// be returned by MessageIterator.Next.
maxPrefetch int

// ackDeadline is the default ack deadline for the subscription. Not
Expand Down Expand Up @@ -204,7 +203,7 @@ func (max maxPrefetch) setOptions(o *pullOptions) {
// MaxPrefetch returns a PullOption that limits Message prefetching.
//
// For performance reasons, the pubsub library may prefetch a pool of Messages
// to be returned serially from Iterator.Next. MaxPrefetch is used to limit the
// to be returned serially from MessageIterator.Next. MaxPrefetch is used to limit the
// the size of this pool.
//
// If num is less than 1, it will be treated as if it were 1.
Expand All @@ -223,7 +222,7 @@ func (max maxExtension) setOptions(o *pullOptions) {
// MaxExtension returns a PullOption that limits how long acks deadlines are
// extended for.
//
// An Iterator will automatically extend the ack deadline of all fetched
// A MessageIterator will automatically extend the ack deadline of all fetched
// Messages for the duration specified. Automatic deadline extension may be
// disabled by specifying a duration of 0.
func MaxExtension(duration time.Duration) PullOption {
Expand All @@ -245,8 +244,9 @@ func MaxExtension(duration time.Duration) PullOption {
// the subscriber should acknowledge the message. It must be between 10 and 600
// seconds (inclusive), and is rounded down to the nearest second. If the
// provided ackDeadline is 0, then the default value of 10 seconds is used.
// Note: messages which are obtained via an Iterator need not be acknowledged
// within this deadline, as the deadline will be automatically extended.
// Note: messages which are obtained via a MessageIterator need not be
// acknowledged within this deadline, as the deadline will be automatically
// extended.
//
// pushConfig may be set to configure this subscription for push delivery.
//
Expand Down

0 comments on commit 6d35c7b

Please sign in to comment.