3232import com .dtstack .flink .sql .util .RowDataComplete ;
3333import com .google .common .collect .Lists ;
3434import com .google .common .collect .Maps ;
35- import io .vertx .core .DeploymentOptions ;
3635import io .vertx .core .Vertx ;
3736import io .vertx .core .VertxOptions ;
3837import io .vertx .core .json .JsonArray ;
@@ -95,13 +94,9 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
9594
9695 private transient Vertx vertx ;
9796
98- private transient VertxOptions vertxOptions ;
99-
100- private transient JsonObject jdbcConfig ;
101-
10297 private int asyncPoolSize = 1 ;
10398
104- private int errorLogPrintNum = 3 ;
99+ private final int errorLogPrintNum = 3 ;
105100
106101 private final AtomicBoolean connectionStatus = new AtomicBoolean (true );
107102
@@ -114,9 +109,9 @@ public class RdbAsyncReqRow extends BaseAsyncReqRow {
114109 public void open (Configuration parameters ) throws Exception {
115110 super .open (parameters );
116111
117- vertxOptions = new VertxOptions ();
112+ VertxOptions vertxOptions = new VertxOptions ();
118113
119- jdbcConfig = buildJdbcConfig ();
114+ JsonObject jdbcConfig = buildJdbcConfig ();
120115 System .setProperty ("vertx.disableFileCPResolving" , "true" );
121116 vertxOptions
122117 .setEventLoopPoolSize (DEFAULT_VERTX_EVENT_LOOP_POOL_SIZE )
@@ -131,6 +126,9 @@ public void open(Configuration parameters) throws Exception {
131126 new LinkedBlockingQueue <>(MAX_TASK_QUEUE_SIZE ),
132127 new DTThreadFactory ("rdbAsyncExec" ),
133128 new ThreadPoolExecutor .CallerRunsPolicy ());
129+
130+ vertx = Vertx .vertx (vertxOptions );
131+ rdbSqlClient = JDBCClient .createNonShared (vertx , jdbcConfig );
134132 }
135133
136134 public RdbAsyncReqRow (BaseSideInfo sideInfo ) {
@@ -153,8 +151,6 @@ public JsonObject buildJdbcConfig() {
153151
154152 @ Override
155153 protected void preInvoke (BaseRow input , ResultFuture <BaseRow > resultFuture ) {
156- vertx = Vertx .vertx (vertxOptions );
157- rdbSqlClient = JDBCClient .createNonShared (vertx , jdbcConfig );
158154 }
159155
160156 @ Override
@@ -163,7 +159,7 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
163159 //network is unhealthy
164160 while (!connectionStatus .get ()) {
165161 if (networkLogCounter .getAndIncrement () % 1000 == 0 ) {
166- LOG .info ("network unhealth to block task" );
162+ LOG .info ("network unhealthy to block task" );
167163 }
168164 Thread .sleep (100 );
169165 }
@@ -178,12 +174,14 @@ protected void asyncQueryData(Map<String, Object> inputParams,
178174 AtomicLong failCounter ,
179175 AtomicBoolean finishFlag ,
180176 CountDownLatch latch ) {
181- doAsyncQueryData (inputParams ,
182- input , resultFuture ,
183- rdbSqlClient ,
184- failCounter ,
185- finishFlag ,
186- latch );
177+ doAsyncQueryData (
178+ inputParams ,
179+ input ,
180+ resultFuture ,
181+ rdbSqlClient ,
182+ failCounter ,
183+ finishFlag ,
184+ latch );
187185 }
188186
189187 final protected void doAsyncQueryData (
@@ -198,18 +196,23 @@ final protected void doAsyncQueryData(
198196 try {
199197 String errorMsg ;
200198 Integer retryMaxNum = sideInfo .getSideTableInfo ().getConnectRetryMaxNum (3 );
199+ int logPrintTime = retryMaxNum / errorLogPrintNum == 0 ?
200+ retryMaxNum : retryMaxNum / errorLogPrintNum ;
201201 if (conn .failed ()) {
202202 connectionStatus .set (false );
203203 errorMsg = ExceptionTrace .traceOriginalCause (conn .cause ());
204- if (failCounter .getAndIncrement () %
205- (retryMaxNum / 3 == 0 ? retryMaxNum : retryMaxNum / 3 ) == 0 ) {
204+ if (failCounter .getAndIncrement () % logPrintTime == 0 ) {
206205 LOG .error ("getConnection error. cause by " + errorMsg );
207206 }
208207 LOG .error (String .format ("retry ... current time [%s]" , failCounter .get ()));
209208 if (failCounter .get () >= retryMaxNum ) {
210209 resultFuture .completeExceptionally (
211210 new SuppressRestartsException (
212- new Throwable (ExceptionTrace .traceOriginalCause (conn .cause ()))));
211+ new Throwable (
212+ ExceptionTrace .traceOriginalCause (conn .cause ())
213+ )
214+ )
215+ );
213216 finishFlag .set (true );
214217 }
215218 return ;
@@ -233,12 +236,14 @@ private void connectWithRetry(Map<String, Object> inputParams, BaseRow input, Re
233236 while (!finishFlag .get ()) {
234237 try {
235238 CountDownLatch latch = new CountDownLatch (1 );
236- asyncQueryData (inputParams ,
237- input , resultFuture ,
238- rdbSqlClient ,
239- failCounter ,
240- finishFlag ,
241- latch );
239+ asyncQueryData (
240+ inputParams ,
241+ input ,
242+ resultFuture ,
243+ rdbSqlClient ,
244+ failCounter ,
245+ finishFlag ,
246+ latch );
242247 try {
243248 latch .await ();
244249 } catch (InterruptedException e ) {
@@ -331,7 +336,9 @@ public void close() throws Exception {
331336 if (rdbSqlClient != null ) {
332337 rdbSqlClient .close (done -> {
333338 if (done .failed ()) {
334- LOG .error ("sql client close failed! " , done .cause ());
339+ LOG .error ("sql client close failed! " +
340+ ExceptionTrace .traceOriginalCause (done .cause ())
341+ );
335342 }
336343
337344 if (done .succeeded ()) {
@@ -348,7 +355,9 @@ public void close() throws Exception {
348355 if (Objects .nonNull (vertx )) {
349356 vertx .close (done -> {
350357 if (done .failed ()) {
351- LOG .error ("vert.x close error. cause by " + done .cause ().getMessage ());
358+ LOG .error ("vert.x close error. cause by " +
359+ ExceptionTrace .traceOriginalCause (done .cause ())
360+ );
352361 }
353362 });
354363 }
@@ -363,7 +372,9 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
363372 LOG .error (
364373 String .format ("\n get data with sql [%s] failed! \n cause: [%s]" ,
365374 sideInfo .getSqlCondition (),
366- rs .cause ().getMessage ()));
375+ rs .cause ().getMessage ()
376+ )
377+ );
367378 dealFillDataError (input , resultFuture , rs .cause ());
368379 return ;
369380 }
@@ -397,7 +408,10 @@ private void handleQuery(SQLConnection connection, Map<String, Object> inputPara
397408 connection .close (done -> {
398409 if (done .failed ()) {
399410 throw new SuppressRestartsException (
400- new Throwable (ExceptionTrace .traceOriginalCause (done .cause ())));
411+ new Throwable (
412+ ExceptionTrace .traceOriginalCause (done .cause ())
413+ )
414+ );
401415 }
402416 });
403417 }
@@ -411,8 +425,4 @@ private Map<String, Object> formatInputParam(Map<String, Object> inputParam) {
411425 });
412426 return result ;
413427 }
414-
415- protected int getAsyncPoolSize () {
416- return asyncPoolSize ;
417- }
418428}
0 commit comments