Skip to content

Commit e385fc3

Browse files
authored
Merge pull request #677 from basho/streaming-api-2
Streaming results for List Keys/Buckets, 2i, and MR
2 parents 709ce84 + a797ba6 commit e385fc3

File tree

84 files changed

+3774
-2151
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+3774
-2151
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2013-2016 Basho Technologies Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.basho.riak.client.api;
17+
18+
import com.basho.riak.client.core.FutureOperation;
19+
import com.basho.riak.client.core.PBStreamingFutureOperation;
20+
import com.basho.riak.client.core.RiakCluster;
21+
import com.basho.riak.client.core.RiakFuture;
22+
23+
/**
24+
* @author Sergey Galkin <srggal at gmail dot com>
25+
* @since 2.1.0
26+
*/
27+
public abstract class AsIsRiakCommand<R, I> extends RiakCommand<R, I>
28+
{
29+
protected abstract FutureOperation<R, ?, I> buildCoreOperation();
30+
31+
protected RiakFuture<R,I> executeAsync(RiakCluster cluster)
32+
{
33+
final FutureOperation<R, ?, I> coreOperation = buildCoreOperation();
34+
35+
return cluster.execute(coreOperation);
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Copyright 2013-2016 Basho Technologies Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.basho.riak.client.api;
17+
18+
import com.basho.riak.client.api.commands.CoreFutureAdapter;
19+
import com.basho.riak.client.core.FutureOperation;
20+
import com.basho.riak.client.core.RiakCluster;
21+
import com.basho.riak.client.core.RiakFuture;
22+
23+
/**
24+
* @author Sergey Galkin <srggal at gmail dot com>
25+
* @since 2.1.0
26+
*/
27+
public abstract class GenericRiakCommand<R, I, CoreR, CoreI> extends RiakCommand<R, I>
28+
{
29+
public static abstract class GenericRiakCommandWithSameInfo<R, I, CoreR> extends GenericRiakCommand<R,I, CoreR, I>
30+
{
31+
@Override
32+
protected I convertInfo(I coreInfo) {
33+
return coreInfo;
34+
}
35+
}
36+
37+
protected abstract FutureOperation<CoreR, ?, CoreI> buildCoreOperation();
38+
39+
protected RiakFuture<R,I> executeAsync(RiakCluster cluster)
40+
{
41+
final FutureOperation<CoreR, ?, CoreI> coreOperation = buildCoreOperation();
42+
assert coreOperation != null;
43+
44+
final RiakFuture<CoreR, CoreI> coreFuture = cluster.execute(coreOperation);
45+
46+
assert coreFuture != null;
47+
48+
final CoreFutureAdapter<R, I, CoreR, CoreI> future =
49+
new CoreFutureAdapter<R, I, CoreR, CoreI>(coreFuture)
50+
{
51+
@Override
52+
protected R convertResponse(CoreR coreResponse)
53+
{
54+
return GenericRiakCommand.this.convertResponse(coreOperation, coreResponse);
55+
}
56+
57+
@Override
58+
protected I convertQueryInfo(CoreI coreQueryInfo)
59+
{
60+
return GenericRiakCommand.this.convertInfo(coreQueryInfo);
61+
}
62+
};
63+
coreFuture.addListener(future);
64+
return future;
65+
}
66+
67+
protected abstract R convertResponse(FutureOperation<CoreR, ?, CoreI> request, CoreR coreResponse);
68+
69+
protected abstract I convertInfo(CoreI coreInfo);
70+
}

src/main/java/com/basho/riak/client/api/RiakClient.java

+41-1
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@
155155
* </ul>
156156
* @author Dave Rusek <drusek at basho dot com>
157157
* @author Brian Roach <roach at basho dot com>
158+
* @author Alex Moore <amoore at basho.com>
158159
* @author Sergey Galkin <srggal at gmail dot com>
159160
* @since 2.0
160161
*/
@@ -277,8 +278,9 @@ public static RiakClient newClient(InetSocketAddress... addresses) throws Unknow
277278
* @return a new RiakClient instance.
278279
* @throws java.net.UnknownHostException if a supplied hostname cannot be resolved.
279280
* @since 2.0.3
280-
* @see com.basho.riak.client.core.RiakCluster.Builder#Builder(RiakNode.Builder, List)
281+
* @see com.basho.riak.client.core.RiakCluster.Builder#RiakCluster.Builder(RiakNode.Builder, List)
281282
*/
283+
// NB: IntelliJ will see the above @see statement as invalid, but it's correct: https://bugs.openjdk.java.net/browse/JDK-8031625
282284
public static RiakClient newClient(RiakNode.Builder nodeBuilder, List<String> addresses) throws UnknownHostException
283285
{
284286
final RiakCluster cluster = new RiakCluster.Builder(nodeBuilder, addresses).build();
@@ -402,6 +404,44 @@ public <T,S> RiakFuture<T,S> executeAsync(RiakCommand<T,S> command)
402404
return command.executeAsync(cluster);
403405
}
404406

407+
/**
408+
* Execute a StreamableRiakCommand asynchronously, and stream the results back before
409+
* the command {@link RiakFuture#isDone() is done}.
410+
* <p>
411+
* Calling this method causes the client to execute the provided
412+
* StreamableRiakCommand asynchronously.
413+
* It will immediately return a RiakFuture that contains an
414+
* <b>immediately</b> available result (via {@link RiakFuture#get()}) that
415+
* data will be streamed to.
416+
* The RiakFuture will also keep track of the overall operation's progress
417+
* with the {@link RiakFuture#isDone}, etc methods.
418+
* </p>
419+
* <p>
420+
* Because the consumer thread will poll for new results, it is advisable to check the
421+
* consumer thread's interrupted status via
422+
* {@link Thread#isInterrupted() Thread.currentThread().isInterrupted() }, as the result
423+
* iterator will not propagate an InterruptedException, but it will set the Thread's
424+
* interrupted flag.
425+
* </p>
426+
* @param <I> StreamableRiakCommand's immediate return type, available before the command/operation is complete.
427+
* @param <S> The RiakCommand's query info type.
428+
* @param command The RiakCommand to execute.
429+
* @param timeoutMS The polling timeout in milliseconds for each result chunk.
430+
* If the timeout is reached it will try again, instead of blocking indefinitely.
431+
* If the value is too small (less than the average chunk arrival time), the
432+
* result iterator will essentially busy wait.
433+
* If the timeout is too large (much greater than the average chunk arrival time),
434+
* the result iterator can block the consuming thread from seeing the done()
435+
* status until the timeout is reached.
436+
* @return a RiakFuture for the operation
437+
* @since 2.1.0
438+
* @see RiakFuture
439+
*/
440+
public <I extends StreamableRiakCommand.StreamableResponse,S> RiakFuture<I,S> executeAsyncStreaming(StreamableRiakCommand<I, S, ?, ?> command, int timeoutMS)
441+
{
442+
return command.executeAsyncStreaming(cluster, timeoutMS);
443+
}
444+
405445
/**
406446
* Shut down the client and the underlying RiakCluster.
407447
* <p>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright 2016 Basho Technologies, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.basho.riak.client.api;
18+
19+
import com.basho.riak.client.api.commands.ChunkedResponseIterator;
20+
import com.basho.riak.client.api.commands.ImmediateCoreFutureAdapter;
21+
import com.basho.riak.client.core.*;
22+
23+
import java.util.Iterator;
24+
25+
/*
26+
* The base class for all Streamable Riak Commands.
27+
* Allows the user to either use {@link RiakCommand#executeAsync} and return a "batch-mode" result
28+
* that is only available after the command is complete, or
29+
* use {@link StreamableRiakCommand#executeAsyncStreaming} and return a "immediate" or "stream-mode" result
30+
* that data will flow into.
31+
* @param <S> The response type returned by "streaming mode" {@link executeAsyncStreaming}
32+
* @param <R> The response type returned by the "batch mode" @{link executeAsync}
33+
* @param <I> The query info type
34+
* @author Alex Moore <amoore at basho.com>
35+
* @author Sergey Galkin <srggal at gmail dot com>
36+
* @since 2.0
37+
*/
38+
public abstract class StreamableRiakCommand<R extends StreamableRiakCommand.StreamableResponse, I, CoreR, CoreI> extends GenericRiakCommand<R, I, CoreR, CoreI>
39+
{
40+
public static abstract class StreamableRiakCommandWithSameInfo<R extends StreamableResponse, I, CoreR> extends StreamableRiakCommand<R,I, CoreR, I>
41+
{
42+
@Override
43+
protected I convertInfo(I coreInfo) {
44+
return coreInfo;
45+
}
46+
}
47+
48+
public static abstract class StreamableResponse<T, S> implements Iterable<T>
49+
{
50+
protected ChunkedResponseIterator<T, ?, S> chunkedResponseIterator;
51+
52+
/**
53+
* Constructor for streamable response
54+
* @param chunkedResponseIterator
55+
*/
56+
protected StreamableResponse(ChunkedResponseIterator<T, ?, S> chunkedResponseIterator) {
57+
this.chunkedResponseIterator = chunkedResponseIterator;
58+
}
59+
60+
/**
61+
* Constructor for not streamable response.
62+
*/
63+
protected StreamableResponse()
64+
{
65+
}
66+
67+
/**
68+
* Whether the results are to be streamed back.
69+
* If true, results will appear in this class's iterator.
70+
* If false, they will appear in the original result collection.
71+
* @return true if the results are to be streamed.
72+
*/
73+
public boolean isStreaming()
74+
{
75+
return chunkedResponseIterator != null;
76+
}
77+
78+
/**
79+
* Get an iterator over the result data.
80+
*
81+
* If using the streaming API, this method will block
82+
* and wait for more data if none is immediately available.
83+
* It is also advisable to check {@link Thread#isInterrupted()}
84+
* in environments where thread interrupts must be obeyed.
85+
*
86+
* @return an iterator over the result data.
87+
*/
88+
@Override
89+
public Iterator<T> iterator() {
90+
if (isStreaming()) {
91+
assert chunkedResponseIterator != null;
92+
return chunkedResponseIterator;
93+
}
94+
95+
throw new UnsupportedOperationException("Iterating is only supported for streamable response");
96+
}
97+
}
98+
99+
protected abstract R createResponse(int timeout, StreamingRiakFuture<CoreR, CoreI> coreFuture);
100+
101+
protected abstract PBStreamingFutureOperation<CoreR, ?, CoreI> buildCoreOperation(boolean streamResults);
102+
103+
@Override
104+
protected final FutureOperation<CoreR, ?, CoreI> buildCoreOperation() {
105+
return buildCoreOperation(false);
106+
}
107+
108+
protected final RiakFuture<R, I> executeAsyncStreaming(RiakCluster cluster, int timeout)
109+
{
110+
final PBStreamingFutureOperation<CoreR, ?, CoreI> coreOperation = buildCoreOperation(true);
111+
final StreamingRiakFuture<CoreR, CoreI> coreFuture = cluster.execute(coreOperation);
112+
113+
final R r = createResponse(timeout, coreFuture);
114+
115+
final ImmediateCoreFutureAdapter<R,I, CoreR, CoreI> future = new ImmediateCoreFutureAdapter<R,I,CoreR,CoreI>(coreFuture, r)
116+
{
117+
@Override
118+
protected R convertResponse(CoreR response)
119+
{
120+
return StreamableRiakCommand.this.convertResponse(coreOperation, response);
121+
}
122+
123+
@Override
124+
protected I convertQueryInfo(CoreI coreQueryInfo)
125+
{
126+
return StreamableRiakCommand.this.convertInfo(coreQueryInfo);
127+
}
128+
};
129+
130+
coreFuture.addListener(future);
131+
return future;
132+
}
133+
}

0 commit comments

Comments
 (0)