
Commit 49b7550

[FLINK-32446][connectors/mongodb] MongoWriter should regularly check whether the batch flush interval has elapsed
This closes #12.
1 parent fe65806 commit 49b7550
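
In short: before this change, the writer only checked the batch interval when a new record happened to arrive, so a quiet stream could leave buffered writes sitting unflushed indefinitely. The fix registers a scheduled task that locks the writer, flushes once the interval has elapsed, and parks any failure in a volatile field to be rethrown on the caller's next write/flush call. A minimal standalone sketch of that pattern (the PeriodicBatchWriter class and its sendBatch() hook are illustrative names, not the connector's API):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of a buffering writer flushed by a background timer. */
final class PeriodicBatchWriter {
    private final List<String> buffer = new ArrayList<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final long batchIntervalMs;
    private volatile long lastSendTime = 0L;
    private volatile boolean closed = false;
    private volatile Exception flushException; // surfaced on the caller's thread

    PeriodicBatchWriter(long batchIntervalMs) {
        this.batchIntervalMs = batchIntervalMs;
        // Check periodically whether the interval has elapsed, even if no
        // new records arrive; this is exactly the gap the commit closes.
        scheduler.scheduleWithFixedDelay(
                () -> {
                    synchronized (this) {
                        if (!closed
                                && System.currentTimeMillis() - lastSendTime >= batchIntervalMs) {
                            try {
                                sendBatch();
                            } catch (Exception e) {
                                flushException = e; // rethrown by the next write()
                            }
                        }
                    }
                },
                batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
    }

    synchronized void write(String record) {
        if (flushException != null) {
            throw new RuntimeException("Background flush failed.", flushException);
        }
        buffer.add(record);
    }

    private synchronized void sendBatch() {
        buffer.clear(); // stand-in for the real bulk write
        lastSendTime = System.currentTimeMillis();
    }

    synchronized void close() {
        closed = true;
        scheduler.shutdown();
    }
}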

3 files changed, +92 -11 lines changed


flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java

Lines changed: 75 additions & 8 deletions
@@ -33,6 +33,7 @@
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import com.mongodb.MongoException;
 import com.mongodb.client.MongoClient;
@@ -45,6 +46,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -68,11 +73,18 @@ public class MongoWriter<IN> implements SinkWriter<IN> {
     private final Collector<WriteModel<BsonDocument>> collector;
     private final Counter numRecordsOut;
     private final MongoClient mongoClient;
+    private final long batchIntervalMs;
+    private final int batchSize;
 
     private boolean checkpointInProgress = false;
     private volatile long lastSendTime = 0L;
     private volatile long ackTime = Long.MAX_VALUE;
 
+    private transient volatile boolean closed = false;
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
     public MongoWriter(
             MongoConnectionOptions connectionOptions,
             MongoWriteOptions writeOptions,
@@ -83,6 +95,8 @@ public MongoWriter(
         this.writeOptions = checkNotNull(writeOptions);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.flushOnCheckpoint = flushOnCheckpoint;
+        this.batchIntervalMs = writeOptions.getBatchIntervalMs();
+        this.batchSize = writeOptions.getBatchSize();
 
         checkNotNull(initContext);
         this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());
@@ -105,10 +119,37 @@ public MongoWriter(
 
         // Initialize the mongo client.
         this.mongoClient = MongoClients.create(connectionOptions.getUri());
+
+        boolean flushOnlyOnCheckpoint = batchIntervalMs == -1 && batchSize == -1;
+
+        if (!flushOnlyOnCheckpoint && batchIntervalMs > 0) {
+            this.scheduler =
+                    Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("mongo-writer"));
+
+            this.scheduledFuture =
+                    this.scheduler.scheduleWithFixedDelay(
+                            () -> {
+                                synchronized (MongoWriter.this) {
+                                    if (!closed && isOverMaxBatchIntervalLimit()) {
+                                        try {
+                                            doBulkWrite();
+                                        } catch (Exception e) {
+                                            flushException = e;
+                                        }
+                                    }
+                                }
+                            },
+                            batchIntervalMs,
+                            batchIntervalMs,
+                            TimeUnit.MILLISECONDS);
+        }
     }
 
     @Override
-    public void write(IN element, Context context) throws IOException, InterruptedException {
+    public synchronized void write(IN element, Context context)
+            throws IOException, InterruptedException {
+        checkFlushException();
+
         // do not allow new bulk writes until all actions are flushed
         while (checkpointInProgress) {
             mailboxExecutor.yield();
@@ -122,7 +163,9 @@ public void write(IN element, Context context) throws IOException, InterruptedException {
     }
 
     @Override
-    public void flush(boolean endOfInput) throws IOException {
+    public synchronized void flush(boolean endOfInput) throws IOException {
+        checkFlushException();
+
         checkpointInProgress = true;
         while (!bulkRequests.isEmpty() && (flushOnCheckpoint || endOfInput)) {
             doBulkWrite();
@@ -131,8 +174,28 @@ public void flush(boolean endOfInput) throws IOException {
     }
 
     @Override
-    public void close() {
-        mongoClient.close();
+    public synchronized void close() throws Exception {
+        if (!closed) {
+            if (scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                scheduler.shutdown();
+            }
+
+            if (!bulkRequests.isEmpty()) {
+                try {
+                    doBulkWrite();
+                } catch (Exception e) {
+                    LOG.error("Writing records to MongoDB failed when closing MongoWriter", e);
+                    throw new IOException("Writing records to MongoDB failed.", e);
+                } finally {
+                    mongoClient.close();
+                    closed = true;
+                }
+            } else {
+                mongoClient.close();
+                closed = true;
+            }
+        }
     }
 
     @VisibleForTesting
@@ -172,13 +235,17 @@ void doBulkWrite() throws IOException {
     }
 
     private boolean isOverMaxBatchSizeLimit() {
-        int bulkActions = writeOptions.getBatchSize();
-        return bulkActions != -1 && bulkRequests.size() >= bulkActions;
+        return batchSize != -1 && bulkRequests.size() >= batchSize;
     }
 
     private boolean isOverMaxBatchIntervalLimit() {
-        long bulkFlushInterval = writeOptions.getBatchIntervalMs();
         long lastSentInterval = System.currentTimeMillis() - lastSendTime;
-        return bulkFlushInterval != -1 && lastSentInterval >= bulkFlushInterval;
+        return batchIntervalMs != -1 && lastSentInterval >= batchIntervalMs;
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to MongoDB failed.", flushException);
+        }
     }
 }
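
Note that the timer is only created when batchIntervalMs > 0; with both batchIntervalMs and batchSize left at -1 the writer keeps its flush-only-on-checkpoint behavior. To exercise the new path, the write options need an interval configured, roughly like the following sketch (the builder method names are assumed from the option getters used above and may differ by connector version):

// Hypothetical configuration: flush every 1000 records or every second,
// whichever comes first, so the background timer has work to do.
MongoWriteOptions writeOptions =
        MongoWriteOptions.builder()
                .setBatchSize(1000)
                .setBatchIntervalMs(1000L)
                .build();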

flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.java

Lines changed: 4 additions & 3 deletions
@@ -53,6 +53,7 @@
 
 import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreNotWritten;
 import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWritten;
+import static org.apache.flink.connector.mongodb.testutils.MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -138,12 +139,12 @@ void testWriteOnBatchIntervalFlush() throws Exception {
                 createWriter(collection, batchSize, batchIntervalMs, flushOnCheckpoint)) {
             writer.write(buildMessage(1), null);
             writer.write(buildMessage(2), null);
+            writer.doBulkWrite();
             writer.write(buildMessage(3), null);
             writer.write(buildMessage(4), null);
-            writer.doBulkWrite();
-        }
 
-        assertThatIdsAreWritten(collectionOf(collection), 1, 2, 3, 4);
+            assertThatIdsAreWrittenWithMaxWaitTime(collectionOf(collection), 10000L, 1, 2, 3, 4);
+        }
     }
 
     @Test

flink-connector-mongodb/src/test/java/org/apache/flink/connector/mongodb/testutils/MongoTestUtil.java

Lines changed: 13 additions & 0 deletions
@@ -77,4 +77,17 @@ public static void assertThatIdsAreWritten(MongoCollection<Document> coll, Integer... ids)
 
         assertThat(actualIds).containsExactlyInAnyOrder(ids);
     }
+
+    public static void assertThatIdsAreWrittenWithMaxWaitTime(
+            MongoCollection<Document> coll, long maxWaitTimeMs, Integer... ids)
+            throws InterruptedException {
+        long startTimeMillis = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTimeMillis < maxWaitTimeMs) {
+            if (coll.countDocuments(Filters.in("_id", ids)) == ids.length) {
+                break;
+            }
+            Thread.sleep(1000L);
+        }
+        assertThatIdsAreWritten(coll, ids);
+    }
 }
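
A note on the helper above: it polls at one-second granularity until either all ids are visible or maxWaitTimeMs elapses, then delegates to assertThatIdsAreWritten so a timeout still fails with the exact-match message. Keeping the loop hand-rolled avoids a new test dependency; with Awaitility on the classpath an equivalent wait could be written as follows (a sketch, assuming coll and ids are in scope):

// Equivalent polling assertion using Awaitility (not what the connector's tests use).
Awaitility.await()
        .atMost(Duration.ofMillis(maxWaitTimeMs))
        .untilAsserted(() -> assertThatIdsAreWritten(coll, ids));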
