Skip to content

Commit 9affbde

Browse files
feat(client)!: extract auto pagination to shared classes
refactor(client)!: refactor async auto-pagination refactor(client)!: rename `getNextPage{,Params}` to `nextPage{,Params}` refactor(client)!: swap `nextPage{,Params}` to return non-optional # Migration - If you were referencing the `AutoPager` class on a specific `*Page` or `*PageAsync` type, then you should instead reference the shared `AutoPager` and `AutoPagerAsync` types, under the `core` package - `AutoPagerAsync` now has different usage. You can call `.subscribe(...)` on the returned object instead to get called back each page item. You can also call `onCompleteFuture()` to get a future that completes when all items have been processed. Finally, you can call `.close()` on the returned object to stop auto-paginating early - If you were referencing `getNextPage` or `getNextPageParams`: - Swap to `nextPage()` and `nextPageParams()` - Note that these both now return non-optional types (use `hasNextPage()` before calling these, since they will throw if it's impossible to get another page) There are examples and further information about pagination in the readme.
1 parent 7ddd872 commit 9affbde

File tree

74 files changed

+1934
-2055
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+1934
-2055
lines changed

README.md

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -215,53 +215,101 @@ The SDK throws custom unchecked exception types:
215215

216216
## Pagination
217217

218-
For methods that return a paginated list of results, this library provides convenient ways access the results either one page at a time, or item-by-item across all pages.
218+
The SDK defines methods that return a paginated lists of results. It provides convenient ways to access the results either one page at a time or item-by-item across all pages.
219219

220220
### Auto-pagination
221221

222-
To iterate through all results across all pages, you can use `autoPager`, which automatically handles fetching more pages for you:
222+
To iterate through all results across all pages, use the `autoPager()` method, which automatically fetches more pages as needed.
223223

224-
### Synchronous
224+
When using the synchronous client, the method returns an [`Iterable`](https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html)
225225

226226
```java
227227
import com.withorb.api.models.Coupon;
228228
import com.withorb.api.models.CouponListPage;
229229

230-
// As an Iterable:
231-
CouponListPage page = client.coupons().list(params);
230+
CouponListPage page = client.coupons().list();
231+
232+
// Process as an Iterable
232233
for (Coupon coupon : page.autoPager()) {
233234
System.out.println(coupon);
234-
};
235+
}
235236

236-
// As a Stream:
237-
client.coupons().list(params).autoPager().stream()
237+
// Process as a Stream
238+
page.autoPager()
239+
.stream()
238240
.limit(50)
239241
.forEach(coupon -> System.out.println(coupon));
240242
```
241243

242-
### Asynchronous
244+
When using the asynchronous client, the method returns an [`AsyncStreamResponse`](orb-java-core/src/main/kotlin/com/withorb/api/core/http/AsyncStreamResponse.kt):
243245

244246
```java
245-
// Using forEach, which returns CompletableFuture<Void>:
246-
asyncClient.coupons().list(params).autoPager()
247-
.forEach(coupon -> System.out.println(coupon), executor);
247+
import com.withorb.api.core.http.AsyncStreamResponse;
248+
import com.withorb.api.models.Coupon;
249+
import com.withorb.api.models.CouponListPageAsync;
250+
import java.util.Optional;
251+
import java.util.concurrent.CompletableFuture;
252+
253+
CompletableFuture<CouponListPageAsync> pageFuture = client.async().coupons().list();
254+
255+
pageFuture.thenRun(page -> page.autoPager().subscribe(coupon -> {
256+
System.out.println(coupon);
257+
}));
258+
259+
// If you need to handle errors or completion of the stream
260+
pageFuture.thenRun(page -> page.autoPager().subscribe(new AsyncStreamResponse.Handler<>() {
261+
@Override
262+
public void onNext(Coupon coupon) {
263+
System.out.println(coupon);
264+
}
265+
266+
@Override
267+
public void onComplete(Optional<Throwable> error) {
268+
if (error.isPresent()) {
269+
System.out.println("Something went wrong!");
270+
throw new RuntimeException(error.get());
271+
} else {
272+
System.out.println("No more!");
273+
}
274+
}
275+
}));
276+
277+
// Or use futures
278+
pageFuture.thenRun(page -> page.autoPager()
279+
.subscribe(coupon -> {
280+
System.out.println(coupon);
281+
})
282+
.onCompleteFuture()
283+
.whenComplete((unused, error) -> {
284+
if (error != null) {
285+
System.out.println("Something went wrong!");
286+
throw new RuntimeException(error);
287+
} else {
288+
System.out.println("No more!");
289+
}
290+
}));
248291
```
249292

250293
### Manual pagination
251294

252-
If none of the above helpers meet your needs, you can also manually request pages one-by-one. A page of results has a `data()` method to fetch the list of objects, as well as top-level `response` and other methods to fetch top-level data about the page. It also has methods `hasNextPage`, `getNextPage`, and `getNextPageParams` methods to help with pagination.
295+
To access individual page items and manually request the next page, use the `items()`,
296+
`hasNextPage()`, and `nextPage()` methods:
253297

254298
```java
255299
import com.withorb.api.models.Coupon;
256300
import com.withorb.api.models.CouponListPage;
257301

258-
CouponListPage page = client.coupons().list(params);
259-
while (page != null) {
260-
for (Coupon coupon : page.data()) {
302+
CouponListPage page = client.coupons().list();
303+
while (true) {
304+
for (Coupon coupon : page.items()) {
261305
System.out.println(coupon);
262306
}
263307

264-
page = page.getNextPage().orElse(null);
308+
if (!page.hasNextPage()) {
309+
break;
310+
}
311+
312+
page = page.nextPage();
265313
}
266314
```
267315

@@ -338,7 +386,6 @@ To set a custom timeout, configure the method call using the `timeout` method:
338386

339387
```java
340388
import com.withorb.api.models.Customer;
341-
import com.withorb.api.models.CustomerCreateParams;
342389

343390
Customer customer = client.customers().create(
344391
params, RequestOptions.builder().timeout(Duration.ofSeconds(30)).build()
@@ -587,7 +634,6 @@ Or configure the method call to validate the response using the `responseValidat
587634

588635
```java
589636
import com.withorb.api.models.Customer;
590-
import com.withorb.api.models.CustomerCreateParams;
591637

592638
Customer customer = client.customers().create(
593639
params, RequestOptions.builder().responseValidation(true).build()

orb-java-client-okhttp/src/main/kotlin/com/withorb/api/client/okhttp/OrbOkHttpClient.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import java.net.Proxy
1313
import java.time.Clock
1414
import java.time.Duration
1515
import java.util.Optional
16+
import java.util.concurrent.Executor
1617
import kotlin.jvm.optionals.getOrNull
1718

1819
class OrbOkHttpClient private constructor() {
@@ -47,6 +48,10 @@ class OrbOkHttpClient private constructor() {
4748

4849
fun jsonMapper(jsonMapper: JsonMapper) = apply { clientOptions.jsonMapper(jsonMapper) }
4950

51+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
52+
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
53+
}
54+
5055
fun clock(clock: Clock) = apply { clientOptions.clock(clock) }
5156

5257
fun headers(headers: Headers) = apply { clientOptions.headers(headers) }

orb-java-client-okhttp/src/main/kotlin/com/withorb/api/client/okhttp/OrbOkHttpClientAsync.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import java.net.Proxy
1313
import java.time.Clock
1414
import java.time.Duration
1515
import java.util.Optional
16+
import java.util.concurrent.Executor
1617
import kotlin.jvm.optionals.getOrNull
1718

1819
class OrbOkHttpClientAsync private constructor() {
@@ -47,6 +48,10 @@ class OrbOkHttpClientAsync private constructor() {
4748

4849
fun jsonMapper(jsonMapper: JsonMapper) = apply { clientOptions.jsonMapper(jsonMapper) }
4950

51+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
52+
clientOptions.streamHandlerExecutor(streamHandlerExecutor)
53+
}
54+
5055
fun clock(clock: Clock) = apply { clientOptions.clock(clock) }
5156

5257
fun headers(headers: Headers) = apply { clientOptions.headers(headers) }
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.withorb.api.core
4+
5+
import java.util.stream.Stream
6+
import java.util.stream.StreamSupport
7+
8+
class AutoPager<T> private constructor(private val firstPage: Page<T>) : Iterable<T> {
9+
10+
companion object {
11+
12+
fun <T> from(firstPage: Page<T>): AutoPager<T> = AutoPager(firstPage)
13+
}
14+
15+
override fun iterator(): Iterator<T> =
16+
generateSequence(firstPage) { if (it.hasNextPage()) it.nextPage() else null }
17+
.flatMap { it.items() }
18+
.iterator()
19+
20+
fun stream(): Stream<T> = StreamSupport.stream(spliterator(), false)
21+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.withorb.api.core
4+
5+
import com.withorb.api.core.http.AsyncStreamResponse
6+
import java.util.Optional
7+
import java.util.concurrent.CompletableFuture
8+
import java.util.concurrent.CompletionException
9+
import java.util.concurrent.Executor
10+
import java.util.concurrent.atomic.AtomicReference
11+
12+
class AutoPagerAsync<T>
13+
private constructor(private val firstPage: PageAsync<T>, private val defaultExecutor: Executor) :
14+
AsyncStreamResponse<T> {
15+
16+
companion object {
17+
18+
fun <T> from(firstPage: PageAsync<T>, defaultExecutor: Executor): AutoPagerAsync<T> =
19+
AutoPagerAsync(firstPage, defaultExecutor)
20+
}
21+
22+
private val onCompleteFuture = CompletableFuture<Void?>()
23+
private val state = AtomicReference(State.NEW)
24+
25+
override fun subscribe(handler: AsyncStreamResponse.Handler<T>): AsyncStreamResponse<T> =
26+
subscribe(handler, defaultExecutor)
27+
28+
override fun subscribe(
29+
handler: AsyncStreamResponse.Handler<T>,
30+
executor: Executor,
31+
): AsyncStreamResponse<T> = apply {
32+
// TODO(JDK): Use `compareAndExchange` once targeting JDK 9.
33+
check(state.compareAndSet(State.NEW, State.SUBSCRIBED)) {
34+
if (state.get() == State.SUBSCRIBED) "Cannot subscribe more than once"
35+
else "Cannot subscribe after the response is closed"
36+
}
37+
38+
fun PageAsync<T>.handle(): CompletableFuture<Void?> {
39+
if (state.get() == State.CLOSED) {
40+
return CompletableFuture.completedFuture(null)
41+
}
42+
43+
items().forEach { handler.onNext(it) }
44+
return if (hasNextPage()) nextPage().thenCompose { it.handle() }
45+
else CompletableFuture.completedFuture(null)
46+
}
47+
48+
executor.execute {
49+
firstPage.handle().whenComplete { _, error ->
50+
val actualError =
51+
if (error is CompletionException && error.cause != null) error.cause else error
52+
try {
53+
handler.onComplete(Optional.ofNullable(actualError))
54+
} finally {
55+
try {
56+
if (actualError == null) {
57+
onCompleteFuture.complete(null)
58+
} else {
59+
onCompleteFuture.completeExceptionally(actualError)
60+
}
61+
} finally {
62+
close()
63+
}
64+
}
65+
}
66+
}
67+
}
68+
69+
override fun onCompleteFuture(): CompletableFuture<Void?> = onCompleteFuture
70+
71+
override fun close() {
72+
val previousState = state.getAndSet(State.CLOSED)
73+
if (previousState == State.CLOSED) {
74+
return
75+
}
76+
77+
// When the stream is closed, we should always consider it closed. If it closed due
78+
// to an error, then we will have already completed the future earlier, and this
79+
// will be a no-op.
80+
onCompleteFuture.complete(null)
81+
}
82+
}
83+
84+
private enum class State {
85+
NEW,
86+
SUBSCRIBED,
87+
CLOSED,
88+
}

orb-java-core/src/main/kotlin/com/withorb/api/core/ClientOptions.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ import com.withorb.api.core.http.QueryParams
1010
import com.withorb.api.core.http.RetryingHttpClient
1111
import java.time.Clock
1212
import java.util.Optional
13+
import java.util.concurrent.Executor
14+
import java.util.concurrent.Executors
15+
import java.util.concurrent.ThreadFactory
16+
import java.util.concurrent.atomic.AtomicLong
1317
import kotlin.jvm.optionals.getOrNull
1418

1519
class ClientOptions
@@ -18,6 +22,7 @@ private constructor(
1822
@get:JvmName("httpClient") val httpClient: HttpClient,
1923
@get:JvmName("checkJacksonVersionCompatibility") val checkJacksonVersionCompatibility: Boolean,
2024
@get:JvmName("jsonMapper") val jsonMapper: JsonMapper,
25+
@get:JvmName("streamHandlerExecutor") val streamHandlerExecutor: Executor,
2126
@get:JvmName("clock") val clock: Clock,
2227
@get:JvmName("baseUrl") val baseUrl: String,
2328
@get:JvmName("headers") val headers: Headers,
@@ -63,6 +68,7 @@ private constructor(
6368
private var httpClient: HttpClient? = null
6469
private var checkJacksonVersionCompatibility: Boolean = true
6570
private var jsonMapper: JsonMapper = jsonMapper()
71+
private var streamHandlerExecutor: Executor? = null
6672
private var clock: Clock = Clock.systemUTC()
6773
private var baseUrl: String = PRODUCTION_URL
6874
private var headers: Headers.Builder = Headers.builder()
@@ -78,6 +84,7 @@ private constructor(
7884
httpClient = clientOptions.originalHttpClient
7985
checkJacksonVersionCompatibility = clientOptions.checkJacksonVersionCompatibility
8086
jsonMapper = clientOptions.jsonMapper
87+
streamHandlerExecutor = clientOptions.streamHandlerExecutor
8188
clock = clientOptions.clock
8289
baseUrl = clientOptions.baseUrl
8390
headers = clientOptions.headers.toBuilder()
@@ -97,6 +104,10 @@ private constructor(
97104

98105
fun jsonMapper(jsonMapper: JsonMapper) = apply { this.jsonMapper = jsonMapper }
99106

107+
fun streamHandlerExecutor(streamHandlerExecutor: Executor) = apply {
108+
this.streamHandlerExecutor = streamHandlerExecutor
109+
}
110+
100111
fun clock(clock: Clock) = apply { this.clock = clock }
101112

102113
fun baseUrl(baseUrl: String) = apply { this.baseUrl = baseUrl }
@@ -251,6 +262,20 @@ private constructor(
251262
),
252263
checkJacksonVersionCompatibility,
253264
jsonMapper,
265+
streamHandlerExecutor
266+
?: Executors.newCachedThreadPool(
267+
object : ThreadFactory {
268+
269+
private val threadFactory: ThreadFactory =
270+
Executors.defaultThreadFactory()
271+
private val count = AtomicLong(0)
272+
273+
override fun newThread(runnable: Runnable): Thread =
274+
threadFactory.newThread(runnable).also {
275+
it.name = "orb-stream-handler-thread-${count.getAndIncrement()}"
276+
}
277+
}
278+
),
254279
clock,
255280
baseUrl,
256281
headers.build(),
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// File generated from our OpenAPI spec by Stainless.
2+
3+
package com.withorb.api.core
4+
5+
/**
6+
* An interface representing a single page, with items of type [T], from a paginated endpoint
7+
* response.
8+
*
9+
* Implementations of this interface are expected to request additional pages synchronously. For
10+
* asynchronous pagination, see the [PageAsync] interface.
11+
*/
12+
interface Page<T> {
13+
14+
/**
15+
* Returns whether there's another page after this one.
16+
*
17+
* The method generally doesn't make requests so the result depends entirely on the data in this
18+
* page. If a significant amount of time has passed between requesting this page and calling
19+
* this method, then the result could be stale.
20+
*/
21+
fun hasNextPage(): Boolean
22+
23+
/**
24+
* Returns the page after this one by making another request.
25+
*
26+
* @throws IllegalStateException if it's impossible to get the next page. This exception is
27+
* avoidable by calling [hasNextPage] first.
28+
*/
29+
fun nextPage(): Page<T>
30+
31+
/** Returns the items in this page. */
32+
fun items(): List<T>
33+
}

0 commit comments

Comments
 (0)