Skip to content

Commit d2fa723

Browse files
feat(client)!: extract auto pagination to shared classes
refactor(client)!: refactor async auto-pagination refactor(client)!: rename `getNextPage{,Params}` to `nextPage{,Params}` refactor(client)!: swap `nextPage{,Params}` to return non-optional # Migration - If you were referencing the `AutoPager` class on a specific `*Page` or `*PageAsync` type, then you should instead reference the shared `AutoPager` and `AutoPagerAsync` types, under the `core` package - `AutoPagerAsync` now has different usage. You can call `.subscribe(...)` on the returned object instead to get called back each page item. You can also call `onCompleteFuture()` to get a future that completes when all items have been processed. Finally, you can call `.close()` on the returned object to stop auto-paginating early - If you were referencing `getNextPage` or `getNextPageParams`: - Swap to `nextPage()` and `nextPageParams()` - Note that these both now return non-optional types (use `hasNextPage()` before calling these, since they will throw if it's impossible to get another page) There are examples and further information about pagination in the readme.
1 parent 9372b90 commit d2fa723

File tree

6 files changed

+521
-4
lines changed

6 files changed

+521
-4
lines changed

README.md

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -335,8 +335,6 @@ Requests time out after 1 minute by default.
335335
To set a custom timeout, configure the method call using the `timeout` method:
336336

337337
```java
338-
import com.openlayer.api.core.JsonValue;
339-
import com.openlayer.api.models.inferencepipelines.data.DataStreamParams;
340338
import com.openlayer.api.models.inferencepipelines.data.DataStreamResponse;
341339

342340
DataStreamResponse response = client.inferencePipelines().data().stream(
@@ -602,8 +600,6 @@ DataStreamResponse response = client.inferencePipelines().data().stream(params).
602600
Or configure the method call to validate the response using the `responseValidation` method:
603601

604602
```java
605-
import com.openlayer.api.core.JsonValue;
606-
import com.openlayer.api.models.inferencepipelines.data.DataStreamParams;
607603
import com.openlayer.api.models.inferencepipelines.data.DataStreamResponse;
608604

609605
DataStreamResponse response = client.inferencePipelines().data().stream(
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package com.openlayer.api.core.http
2+
3+
import com.openlayer.api.core.http.AsyncStreamResponse.Handler
4+
import java.util.Optional
5+
import java.util.concurrent.CompletableFuture
6+
import java.util.concurrent.Executor
7+
import java.util.concurrent.atomic.AtomicReference
8+
9+
/**
10+
* A class providing access to an API response as an asynchronous stream of chunks of type [T],
11+
* where each chunk can be individually processed as soon as it arrives instead of waiting on the
12+
* full response.
13+
*/
14+
interface AsyncStreamResponse<T> {
15+
16+
/**
17+
* Registers [handler] to be called for events of this stream.
18+
*
19+
* [handler]'s methods will be called in the client's configured or default thread pool.
20+
*
21+
* @throws IllegalStateException if [subscribe] has already been called.
22+
*/
23+
fun subscribe(handler: Handler<T>): AsyncStreamResponse<T>
24+
25+
/**
26+
* Registers [handler] to be called for events of this stream.
27+
*
28+
* [handler]'s methods will be called in the given [executor].
29+
*
30+
* @throws IllegalStateException if [subscribe] has already been called.
31+
*/
32+
fun subscribe(handler: Handler<T>, executor: Executor): AsyncStreamResponse<T>
33+
34+
/**
35+
* Returns a future that completes when a stream is fully consumed, errors, or gets closed
36+
* early.
37+
*/
38+
fun onCompleteFuture(): CompletableFuture<Void?>
39+
40+
/**
41+
* Closes this resource, relinquishing any underlying resources.
42+
*
43+
* This is purposefully not inherited from [AutoCloseable] because this response should not be
44+
* synchronously closed via try-with-resources.
45+
*/
46+
fun close()
47+
48+
/** A class for handling streaming events. */
49+
fun interface Handler<in T> {
50+
51+
/** Called whenever a chunk is received. */
52+
fun onNext(value: T)
53+
54+
/**
55+
* Called when a stream is fully consumed, errors, or gets closed early.
56+
*
57+
* [onNext] will not be called once this method is called.
58+
*
59+
* @param error Non-empty if the stream completed due to an error.
60+
*/
61+
fun onComplete(error: Optional<Throwable>) {}
62+
}
63+
}
64+
65+
@JvmSynthetic
66+
internal fun <T> CompletableFuture<StreamResponse<T>>.toAsync(streamHandlerExecutor: Executor) =
67+
PhantomReachableClosingAsyncStreamResponse(
68+
object : AsyncStreamResponse<T> {
69+
70+
private val onCompleteFuture = CompletableFuture<Void?>()
71+
private val state = AtomicReference(State.NEW)
72+
73+
init {
74+
this@toAsync.whenComplete { _, error ->
75+
// If an error occurs from the original future, then we should resolve the
76+
// `onCompleteFuture` even if `subscribe` has not been called.
77+
error?.let(onCompleteFuture::completeExceptionally)
78+
}
79+
}
80+
81+
override fun subscribe(handler: Handler<T>): AsyncStreamResponse<T> =
82+
subscribe(handler, streamHandlerExecutor)
83+
84+
override fun subscribe(
85+
handler: Handler<T>,
86+
executor: Executor,
87+
): AsyncStreamResponse<T> = apply {
88+
// TODO(JDK): Use `compareAndExchange` once targeting JDK 9.
89+
check(state.compareAndSet(State.NEW, State.SUBSCRIBED)) {
90+
if (state.get() == State.SUBSCRIBED) "Cannot subscribe more than once"
91+
else "Cannot subscribe after the response is closed"
92+
}
93+
94+
this@toAsync.whenCompleteAsync(
95+
{ streamResponse, futureError ->
96+
if (state.get() == State.CLOSED) {
97+
// Avoid doing any work if `close` was called before the future
98+
// completed.
99+
return@whenCompleteAsync
100+
}
101+
102+
if (futureError != null) {
103+
// An error occurred before we started passing chunks to the handler.
104+
handler.onComplete(Optional.of(futureError))
105+
return@whenCompleteAsync
106+
}
107+
108+
var streamError: Throwable? = null
109+
try {
110+
streamResponse.stream().forEach(handler::onNext)
111+
} catch (e: Throwable) {
112+
streamError = e
113+
}
114+
115+
try {
116+
handler.onComplete(Optional.ofNullable(streamError))
117+
} finally {
118+
try {
119+
// Notify completion via the `onCompleteFuture` as well. This is in
120+
// a separate `try-finally` block so that we still complete the
121+
// future if `handler.onComplete` throws.
122+
if (streamError == null) {
123+
onCompleteFuture.complete(null)
124+
} else {
125+
onCompleteFuture.completeExceptionally(streamError)
126+
}
127+
} finally {
128+
close()
129+
}
130+
}
131+
},
132+
executor,
133+
)
134+
}
135+
136+
override fun onCompleteFuture(): CompletableFuture<Void?> = onCompleteFuture
137+
138+
override fun close() {
139+
val previousState = state.getAndSet(State.CLOSED)
140+
if (previousState == State.CLOSED) {
141+
return
142+
}
143+
144+
this@toAsync.whenComplete { streamResponse, error -> streamResponse?.close() }
145+
// When the stream is closed, we should always consider it closed. If it closed due
146+
// to an error, then we will have already completed the future earlier, and this
147+
// will be a no-op.
148+
onCompleteFuture.complete(null)
149+
}
150+
}
151+
)
152+
153+
private enum class State {
154+
NEW,
155+
SUBSCRIBED,
156+
CLOSED,
157+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.openlayer.api.core.http
2+
3+
import com.openlayer.api.core.closeWhenPhantomReachable
4+
import com.openlayer.api.core.http.AsyncStreamResponse.Handler
5+
import java.util.Optional
6+
import java.util.concurrent.CompletableFuture
7+
import java.util.concurrent.Executor
8+
9+
/**
10+
* A delegating wrapper around an `AsyncStreamResponse` that closes it once it's only phantom
11+
* reachable.
12+
*
13+
* This class ensures the `AsyncStreamResponse` is closed even if the user forgets to close it.
14+
*/
15+
internal class PhantomReachableClosingAsyncStreamResponse<T>(
16+
private val asyncStreamResponse: AsyncStreamResponse<T>
17+
) : AsyncStreamResponse<T> {
18+
19+
/**
20+
* An object used for keeping `asyncStreamResponse` open while the object is still reachable.
21+
*/
22+
private val reachabilityTracker = Object()
23+
24+
init {
25+
closeWhenPhantomReachable(reachabilityTracker, asyncStreamResponse::close)
26+
}
27+
28+
override fun subscribe(handler: Handler<T>): AsyncStreamResponse<T> = apply {
29+
asyncStreamResponse.subscribe(TrackedHandler(handler, reachabilityTracker))
30+
}
31+
32+
override fun subscribe(handler: Handler<T>, executor: Executor): AsyncStreamResponse<T> =
33+
apply {
34+
asyncStreamResponse.subscribe(TrackedHandler(handler, reachabilityTracker), executor)
35+
}
36+
37+
override fun onCompleteFuture(): CompletableFuture<Void?> =
38+
asyncStreamResponse.onCompleteFuture()
39+
40+
override fun close() = asyncStreamResponse.close()
41+
}
42+
43+
/**
44+
* A wrapper around a `Handler` that also references a `reachabilityTracker` object.
45+
*
46+
* Referencing the `reachabilityTracker` object prevents it from getting reclaimed while the handler
47+
* is still reachable.
48+
*/
49+
private class TrackedHandler<T>(
50+
private val handler: Handler<T>,
51+
private val reachabilityTracker: Any,
52+
) : Handler<T> {
53+
override fun onNext(value: T) = handler.onNext(value)
54+
55+
override fun onComplete(error: Optional<Throwable>) = handler.onComplete(error)
56+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.openlayer.api.core.http
2+
3+
import com.openlayer.api.core.closeWhenPhantomReachable
4+
import java.util.stream.Stream
5+
6+
/**
7+
* A delegating wrapper around a `StreamResponse` that closes it once it's only phantom reachable.
8+
*
9+
* This class ensures the `StreamResponse` is closed even if the user forgets to close it.
10+
*/
11+
internal class PhantomReachableClosingStreamResponse<T>(
12+
private val streamResponse: StreamResponse<T>
13+
) : StreamResponse<T> {
14+
init {
15+
closeWhenPhantomReachable(this, streamResponse)
16+
}
17+
18+
override fun stream(): Stream<T> = streamResponse.stream()
19+
20+
override fun close() = streamResponse.close()
21+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.openlayer.api.core.http
2+
3+
import java.util.stream.Stream
4+
5+
interface StreamResponse<T> : AutoCloseable {
6+
7+
fun stream(): Stream<T>
8+
9+
/** Overridden from [AutoCloseable] to not have a checked exception in its signature. */
10+
override fun close()
11+
}
12+
13+
@JvmSynthetic
14+
internal fun <T, R> StreamResponse<T>.map(transform: (T) -> R): StreamResponse<R> =
15+
object : StreamResponse<R> {
16+
override fun stream(): Stream<R> = this@map.stream().map(transform)
17+
18+
override fun close() = this@map.close()
19+
}

0 commit comments

Comments
 (0)