Skip to content

Commit a7f0c1a

Browse files
committed
[client] Throw Exception after retrying several times.
1 parent 2d69732 commit a7f0c1a

7 files changed

Lines changed: 245 additions & 40 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetchBuffer.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
package org.apache.fluss.client.table.scanner.log;
1919

2020
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.exception.FetchException;
2122
import org.apache.fluss.exception.WakeupException;
2223
import org.apache.fluss.metadata.TableBucket;
24+
import org.apache.fluss.utils.ExceptionUtils;
2325

2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
@@ -73,17 +75,19 @@ public class LogFetchBuffer implements AutoCloseable {
7375
@GuardedBy("lock")
7476
private @Nullable CompletedFetch nextInLineFetch;
7577

78+
@GuardedBy("lock")
79+
private @Nullable Throwable throwable;
80+
7681
public LogFetchBuffer() {
7782
this.completedFetches = new LinkedList<>();
7883
}
7984

8085
/**
81-
* Returns {@code true} if there are no completed fetches pending to return to the user.
82-
*
83-
* @return {@code true} if the buffer is empty, {@code false} otherwise
86+
* @return {@code true} if there are no completed fetches pending to return to the user and no
87+
* error has been recorded, {@code false} otherwise
8488
*/
8589
boolean isEmpty() {
86-
return inLock(lock, completedFetches::isEmpty);
90+
return inLock(lock, () -> completedFetches.isEmpty() && throwable == null);
8791
}
8892

8993
void pend(PendingFetch pendingFetch) {
@@ -100,10 +104,14 @@ void pend(PendingFetch pendingFetch) {
100104
* Tries to complete the pending fetches in order, converting them into completed fetches in the
101105
* buffer.
102106
*/
103-
void tryComplete(TableBucket tableBucket) {
107+
void tryComplete(TableBucket tableBucket, Throwable t) {
104108
inLock(
105109
lock,
106110
() -> {
111+
if (t != null) {
112+
this.throwable = t;
113+
}
114+
107115
boolean hasCompleted = false;
108116
LinkedList<PendingFetch> pendings = this.pendingFetches.get(tableBucket);
109117
while (pendings != null && !pendings.isEmpty()) {
@@ -157,12 +165,22 @@ void setNextInLineFetch(@Nullable CompletedFetch nextInLineFetch) {
157165
inLock(lock, () -> this.nextInLineFetch = nextInLineFetch);
158166
}
159167

160-
CompletedFetch peek() {
161-
return inLock(lock, completedFetches::peek);
168+
CompletedFetch peek() throws FetchException {
169+
return inLock(
170+
lock,
171+
() -> {
172+
checkException();
173+
return completedFetches.peek();
174+
});
162175
}
163176

164-
CompletedFetch poll() {
165-
return inLock(lock, completedFetches::poll);
177+
CompletedFetch poll() throws FetchException {
178+
return inLock(
179+
lock,
180+
() -> {
181+
checkException();
182+
return completedFetches.poll();
183+
});
166184
}
167185

168186
/**
@@ -273,6 +291,12 @@ Set<TableBucket> pendedBuckets() {
273291
return inLock(lock, pendingFetches::keySet);
274292
}
275293

294+
void checkException() throws FetchException {
295+
if (throwable != null) {
296+
throw new FetchException(ExceptionUtils.stripCompletionException(throwable));
297+
}
298+
}
299+
276300
@Override
277301
public void close() throws Exception {
278302
inLock(lock, () -> retainAll(Collections.emptySet()));

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.fluss.config.ConfigOptions;
2828
import org.apache.fluss.config.Configuration;
2929
import org.apache.fluss.exception.ApiException;
30+
import org.apache.fluss.exception.FetchException;
3031
import org.apache.fluss.exception.InvalidMetadataException;
3132
import org.apache.fluss.exception.LeaderNotAvailableException;
3233
import org.apache.fluss.exception.PartitionNotExistException;
@@ -161,7 +162,7 @@ public boolean hasAvailableFetches() {
161162
return !logFetchBuffer.isEmpty();
162163
}
163164

164-
public Map<TableBucket, List<ScanRecord>> collectFetch() {
165+
public Map<TableBucket, List<ScanRecord>> collectFetch() throws FetchException {
165166
return logFetchCollector.collectFetch(logFetchBuffer);
166167
}
167168

@@ -460,7 +461,8 @@ private void pendRemoteFetches(
460461
logScannerStatus,
461462
isCheckCrcs);
462463
logFetchBuffer.pend(pendingFetch);
463-
downloadFuture.onComplete(() -> logFetchBuffer.tryComplete(segment.tableBucket()));
464+
downloadFuture.whenComplete(
465+
(throwable) -> logFetchBuffer.tryComplete(segment.tableBucket(), throwable));
464466
}
465467
}
466468

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloadFuture.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.File;
2424
import java.io.IOException;
2525
import java.util.concurrent.CompletableFuture;
26+
import java.util.function.Consumer;
2627

2728
/** Represents the future of a remote log download request. */
2829
public class RemoteLogDownloadFuture {
@@ -57,7 +58,7 @@ public Runnable getRecycleCallback() {
5758
return recycleCallback;
5859
}
5960

60-
public void onComplete(Runnable callback) {
61-
logFileFuture.thenRun(callback);
61+
public void whenComplete(Consumer<Throwable> callback) {
62+
logFileFuture.whenComplete((file, throwable) -> callback.accept(throwable));
6263
}
6364
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/RemoteLogDownloader.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class RemoteLogDownloader implements Closeable {
6262
private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloader.class);
6363

6464
private static final long POLL_TIMEOUT = 5000L;
65+
private static final int MAX_RETRY_COUNT = 5;
6566

6667
private final Path localLogDir;
6768

@@ -151,31 +152,24 @@ void fetchOnce() throws Exception {
151152
return;
152153
}
153154

155+
downloadRemoteLog(request, MAX_RETRY_COUNT, System.currentTimeMillis());
156+
}
157+
158+
private void downloadRemoteLog(
159+
RemoteLogDownloadRequest request, int retryCount, long startTime) {
154160
try {
155161
// 1. cleanup the finished logs first to free up disk space
156162
cleanupRemoteLogs();
157163

158164
// 2. do the actual download work
159165
FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
160166
scannerMetricGroup.remoteFetchRequestCount().inc();
161-
162-
long startTime = System.currentTimeMillis();
163-
// download the remote file to local
164167
remoteFileDownloader
165168
.downloadFileAsync(fsPathAndFileName, localLogDir)
166169
.whenComplete(
167170
(bytes, throwable) -> {
168171
if (throwable != null) {
169-
LOG.error(
170-
"Failed to download remote log segment file {}.",
171-
fsPathAndFileName.getFileName(),
172-
ExceptionUtils.stripExecutionException(throwable));
173-
// release the semaphore for the failed request
174-
prefetchSemaphore.release();
175-
// add back the request to the queue,
176-
// so we do not complete the request.future here
177-
segmentsToFetch.add(request);
178-
scannerMetricGroup.remoteFetchErrorCount().inc();
172+
handleFetchException(request, throwable, retryCount, startTime);
179173
} else {
180174
LOG.info(
181175
"Successfully downloaded remote log segment file {} to local cost {} ms.",
@@ -190,12 +184,27 @@ void fetchOnce() throws Exception {
190184
}
191185
});
192186
} catch (Throwable t) {
193-
prefetchSemaphore.release();
194-
// add back the request to the queue
195-
segmentsToFetch.add(request);
196-
scannerMetricGroup.remoteFetchErrorCount().inc();
197-
// log the error and continue instead of shutdown the download thread
198-
LOG.error("Failed to download remote log segment.", t);
187+
handleFetchException(request, t, retryCount, startTime);
188+
}
189+
}
190+
191+
private void handleFetchException(
192+
RemoteLogDownloadRequest request, Throwable throwable, int retryCount, long startTime) {
193+
194+
LOG.error(
195+
"Failed to download remote log segment file {}.",
196+
request.getFsPathAndFileName().getFileName(),
197+
ExceptionUtils.stripExecutionException(throwable));
198+
scannerMetricGroup.remoteFetchErrorCount().inc();
199+
if (retryCount >= 1) {
200+
downloadRemoteLog(request, retryCount - 1, startTime);
201+
} else {
202+
request.future.completeExceptionally(
203+
new IOException(
204+
String.format(
205+
"Failed to download remote log segment file %s, retry count %d",
206+
request.getFsPathAndFileName().getFileName(), MAX_RETRY_COUNT),
207+
throwable));
199208
}
200209
}
201210

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/LogFetchBufferTest.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.client.table.scanner.log;
1919

20+
import org.apache.fluss.exception.FetchException;
2021
import org.apache.fluss.exception.WakeupException;
2122
import org.apache.fluss.metadata.TableBucket;
2223
import org.apache.fluss.record.LogRecordReadContext;
@@ -26,6 +27,7 @@
2627
import org.junit.jupiter.api.BeforeEach;
2728
import org.junit.jupiter.api.Test;
2829

30+
import java.io.IOException;
2931
import java.time.Duration;
3032
import java.util.Arrays;
3133
import java.util.Collections;
@@ -45,6 +47,7 @@
4547
import static org.apache.fluss.record.TestData.TEST_SCHEMA_GETTER;
4648
import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
4749
import static org.assertj.core.api.Assertions.assertThat;
50+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4851

4952
/** Test for {@link LogFetchBuffer}. */
5053
public class LogFetchBufferTest {
@@ -223,15 +226,15 @@ void testPendFetches() throws Exception {
223226

224227
Future<Boolean> signal =
225228
service.submit(() -> await(logFetchBuffer, Duration.ofSeconds(1)));
226-
logFetchBuffer.tryComplete(pending1.tableBucket());
229+
logFetchBuffer.tryComplete(pending1.tableBucket(), null);
227230
// nothing happen, as pending1 is not completed
228231
assertThat(logFetchBuffer.isEmpty()).isTrue();
229232
// no condition signal
230233
assertThat(signal.get()).isFalse();
231234

232235
signal = service.submit(() -> await(logFetchBuffer, Duration.ofMinutes(1)));
233236
completed1.set(true);
234-
logFetchBuffer.tryComplete(pending1.tableBucket());
237+
logFetchBuffer.tryComplete(pending1.tableBucket(), null);
235238
assertThat(signal.get()).isTrue();
236239
assertThat(logFetchBuffer.isEmpty()).isFalse();
237240
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket1);
@@ -241,18 +244,42 @@ void testPendFetches() throws Exception {
241244

242245
signal = service.submit(() -> await(logFetchBuffer, Duration.ofMinutes(1)));
243246
completed2.set(true);
244-
logFetchBuffer.tryComplete(pending2.tableBucket());
247+
logFetchBuffer.tryComplete(pending2.tableBucket(), null);
245248
assertThat(signal.get()).isTrue();
246249
assertThat(logFetchBuffer.isEmpty()).isFalse();
247-
logFetchBuffer.tryComplete(pending3.tableBucket());
248-
logFetchBuffer.tryComplete(pending4.tableBucket());
250+
logFetchBuffer.tryComplete(pending3.tableBucket(), null);
251+
logFetchBuffer.tryComplete(pending4.tableBucket(), null);
249252
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket2);
250253
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
251254
assertThat(logFetchBuffer.poll().tableBucket).isEqualTo(tableBucket3);
252255
assertThat(logFetchBuffer.isEmpty()).isTrue();
253256
}
254257
}
255258

259+
@Test
260+
void testFetchException() throws Exception {
261+
try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
262+
AtomicBoolean completed = new AtomicBoolean(false);
263+
PendingFetch pendingFetch = makePendingFetch(tableBucket1, completed);
264+
265+
logFetchBuffer.tryComplete(pendingFetch.tableBucket(), null);
266+
assertThat(logFetchBuffer.isEmpty()).isTrue();
267+
logFetchBuffer.pend(pendingFetch);
268+
assertThat(logFetchBuffer.isEmpty()).isTrue();
269+
270+
completed.set(true);
271+
logFetchBuffer.tryComplete(
272+
pendingFetch.tableBucket(), new IOException("Test fetch exception"));
273+
assertThat(logFetchBuffer.isEmpty()).isFalse();
274+
assertThatThrownBy(logFetchBuffer::poll)
275+
.isExactlyInstanceOf(FetchException.class)
276+
.hasMessageContaining("Test fetch exception");
277+
assertThatThrownBy(logFetchBuffer::peek)
278+
.isExactlyInstanceOf(FetchException.class)
279+
.hasMessageContaining("Test fetch exception");
280+
}
281+
}
282+
256283
private boolean await(LogFetchBuffer buffer, Duration waitTime) throws InterruptedException {
257284
return buffer.awaitNotEmpty(System.nanoTime() + waitTime.toNanos());
258285
}

0 commit comments

Comments
 (0)