11/**
2- * Copyright 2017, 2018 IBM Corporation
2+ * Copyright 2017, 2018, 2019 IBM Corporation
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1919import java .util .List ;
2020import java .util .Map ;
2121import java .util .Map .Entry ;
22+ import java .util .concurrent .CountDownLatch ;
23+ import java .util .concurrent .atomic .AtomicBoolean ;
2224import java .util .concurrent .atomic .AtomicInteger ;
2325
2426import org .apache .kafka .connect .source .SourceRecord ;
3032public class MQSourceTask extends SourceTask {
3133 private static final Logger log = LoggerFactory .getLogger (MQSourceTask .class );
3234
33- private static int BATCH_SIZE = 100 ;
34- private static int MAX_UNCOMMITTED_MSGS = 10000 ;
35- private static int MAX_UNCOMMITTED_MSGS_DELAY_MS = 500 ;
35+ private static int BATCH_SIZE = 250 ; // The maximum number of records returned per call to poll()
36+ private CountDownLatch batchCompleteSignal = null ; // Used to signal completion of a batch
37+ private AtomicInteger pollCycle = new AtomicInteger (1 ); // Incremented each time poll() is called
38+ private int lastCommitPollCycle = 0 ; // The value of pollCycle the last time commit() was called
39+ private AtomicBoolean stopNow = new AtomicBoolean (); // Whether stop has been requested
3640
3741 private JMSReader reader ;
38- private AtomicInteger uncommittedMessages = new AtomicInteger (0 );
3942
4043 public MQSourceTask () {
4144 }
@@ -87,35 +90,51 @@ public MQSourceTask() {
8790
8891 final List <SourceRecord > msgs = new ArrayList <>();
8992 int messageCount = 0 ;
90- int uncommittedMessagesInt = this .uncommittedMessages .get ();
9193
92- if (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS ) {
93- log .info ("Polling for records" );
94+ // Resolve any in-flight transaction, committing unless there has been an error between
95+ // receiving the message from MQ and converting it
96+ if (batchCompleteSignal != null ) {
97+ log .debug ("Awaiting batch completion signal" );
98+ batchCompleteSignal .await ();
99+
100+ log .debug ("Committing records" );
101+ reader .commit ();
102+ }
103+
104+ // Increment the counter for the number of times poll is called so we can ensure we don't get stuck waiting for
105+ // commitRecord callbacks to trigger the batch complete signal
106+ int currentPollCycle = pollCycle .incrementAndGet ();
107+ log .debug ("Starting poll cycle {}" , currentPollCycle );
94108
109+ if (!stopNow .get ()) {
110+ log .info ("Polling for records" );
95111 SourceRecord src ;
96112 do {
97113 // For the first message in the batch, wait a while if no message
98114 src = reader .receive (messageCount == 0 );
99115 if (src != null ) {
100116 msgs .add (src );
101117 messageCount ++;
102- uncommittedMessagesInt = this .uncommittedMessages .incrementAndGet ();
103118 }
104- } while ((src != null ) && (messageCount < BATCH_SIZE ) && (uncommittedMessagesInt < MAX_UNCOMMITTED_MSGS ));
105-
106- log .debug ("Poll returning {} records" , messageCount );
119+ } while ((src != null ) && (messageCount < BATCH_SIZE ) && !stopNow .get ());
107120 }
108- else {
109- log .info ("Uncommitted message limit reached" );
110- Thread .sleep (MAX_UNCOMMITTED_MSGS_DELAY_MS );
121+
122+ synchronized (this ) {
123+ if (messageCount > 0 ) {
124+ batchCompleteSignal = new CountDownLatch (messageCount );
125+ }
126+ else {
127+ batchCompleteSignal = null ;
128+ }
111129 }
112130
131+ log .debug ("Poll returning {} records" , messageCount );
132+
113133 log .trace ("[{}] Exit {}.poll, retval={}" , Thread .currentThread ().getId (), this .getClass ().getName (), messageCount );
114134 return msgs ;
115135 }
116136
117-
118- /**
137+ /**
119138 * <p>
120139 * Commit the offsets, up to the offsets that have been returned by {@link #poll()}. This
121140 * method should block until the commit is complete.
@@ -129,9 +148,49 @@ public MQSourceTask() {
129148 public void commit () throws InterruptedException {
130149 log .trace ("[{}] Entry {}.commit" , Thread .currentThread ().getId (), this .getClass ().getName ());
131150
132- log .debug ("Committing records" );
133- reader .commit ();
134- this .uncommittedMessages .set (0 );
151+ // This callback is simply used to ensure that the mechanism to use commitRecord callbacks
152+ // to check that all messages in a batch are complete is not getting stuck. If this callback
153+ // is being called, it means that Kafka Connect believes that all outstanding messages have
154+ // been completed. That should mean that commitRecord has been called for all of them too.
155+ // However, if too few calls to commitRecord are received, the connector could wait indefinitely.
156+ // If this commit callback is called twice without the poll cycle increasing, trigger the
157+ // batch complete signal directly.
158+ int currentPollCycle = pollCycle .get ();
159+ log .debug ("Commit starting in poll cycle {}" , currentPollCycle );
160+ boolean willShutdown = false ;
161+
162+ if (lastCommitPollCycle == currentPollCycle )
163+ {
164+ synchronized (this ) {
165+ if (batchCompleteSignal != null ) {
166+ log .debug ("Bumping batch complete signal by {}" , batchCompleteSignal .getCount ());
167+
168+ // This means we're waiting for the signal in the poll() method and it's been
169+ // waiting for at least two calls to this commit callback. It's stuck.
170+ while (batchCompleteSignal .getCount () > 0 ) {
171+ batchCompleteSignal .countDown ();
172+ }
173+ }
174+ else if (stopNow .get ()) {
175+ log .debug ("Shutting down with empty batch after delay" );
176+ willShutdown = true ;
177+ }
178+ }
179+ }
180+ else {
181+ lastCommitPollCycle = currentPollCycle ;
182+
183+ synchronized (this ) {
184+ if ((batchCompleteSignal == null ) && stopNow .get ()) {
185+ log .debug ("Shutting down with empty batch" );
186+ willShutdown = true ;
187+ }
188+ }
189+ }
190+
191+ if (willShutdown ) {
192+ shutdown ();
193+ }
135194
136195 log .trace ("[{}] Exit {}.commit" , Thread .currentThread ().getId (), this .getClass ().getName ());
137196 }
@@ -149,10 +208,59 @@ public void commit() throws InterruptedException {
149208 @ Override public void stop () {
150209 log .trace ("[{}] Entry {}.stop" , Thread .currentThread ().getId (), this .getClass ().getName ());
151210
211+ stopNow .set (true );
212+
213+ boolean willShutdown = false ;
214+
215+ synchronized (this ) {
216+ if (batchCompleteSignal == null ) {
217+ willShutdown = true ;
218+ }
219+ }
220+
221+ if (willShutdown ) {
222+ shutdown ();
223+ }
224+
225+ log .trace ("[{}] Exit {}.stop" , Thread .currentThread ().getId (), this .getClass ().getName ());
226+ }
227+
228+ /**
229+ * <p>
230+ * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
231+ * </p>
232+ * <p>
233+ * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
234+ * automatically. This hook is provided for systems that also need to store offsets internally
235+ * in their own system.
236+ * </p>
237+ *
238+ * @param record {@link SourceRecord} that was successfully sent via the producer.
239+ * @throws InterruptedException
240+ */
241+ @ Override public void commitRecord (SourceRecord record ) throws InterruptedException {
242+ log .trace ("[{}] Entry {}.commitRecord, record={}" , Thread .currentThread ().getId (), this .getClass ().getName (), record );
243+
244+ synchronized (this ) {
245+ batchCompleteSignal .countDown ();
246+ }
247+
248+ log .trace ("[{}] Exit {}.commitRecord" , Thread .currentThread ().getId (), this .getClass ().getName ());
249+ }
250+
251+ /**
252+ * <p>
253+ * Shuts down the task, releasing any resource held by the task.
254+ * </p>
255+ */
256+ private void shutdown () {
257+ log .trace ("[{}] Entry {}.shutdown" , Thread .currentThread ().getId (), this .getClass ().getName ());
258+
259+ // Close the connection to MQ to clean up
152260 if (reader != null ) {
153261 reader .close ();
154262 }
155263
156- log .trace ("[{}] Exit {}.stop " , Thread .currentThread ().getId (), this .getClass ().getName ());
264+ log .trace ("[{}] Exit {}.shutdown " , Thread .currentThread ().getId (), this .getClass ().getName ());
157265 }
158266}
0 commit comments