|
33 | 33 | import org.slf4j.LoggerFactory; |
34 | 34 |
|
35 | 35 | import java.io.IOException; |
| 36 | +import java.sql.ResultSet; |
36 | 37 | import java.sql.SQLException; |
| 38 | +import java.sql.Statement; |
37 | 39 | import java.util.List; |
38 | 40 | import java.util.concurrent.ScheduledExecutorService; |
39 | 41 | import java.util.concurrent.ScheduledFuture; |
|
44 | 46 |
|
45 | 47 | /** |
46 | 48 | * An upsert OutputFormat for JDBC. |
| 49 | + * |
47 | 50 | * @author maqi |
48 | 51 | */ |
49 | 52 | public class JDBCUpsertOutputFormat extends AbstractJDBCOutputFormat<Tuple2<Boolean, Row>> { |
@@ -103,7 +106,7 @@ public JDBCUpsertOutputFormat( |
103 | 106 | * |
104 | 107 | * @param taskNumber The number of the parallel instance. |
105 | 108 | * @throws IOException Thrown, if the output could not be opened due to an |
106 | | - * I/O problem. |
| 109 | + * I/O problem. |
107 | 110 | */ |
108 | 111 | @Override |
109 | 112 | public void open(int taskNumber, int numTasks) throws IOException { |
@@ -167,7 +170,7 @@ public synchronized void writeRecord(Tuple2<Boolean, Row> tuple2) throws IOExcep |
167 | 170 |
|
168 | 171 | private void checkConnectionOpen() { |
169 | 172 | try { |
170 | | - if (connection.isClosed()) { |
| 173 | + if (!connection.isValid(10)) { |
171 | 174 | LOG.info("db connection reconnect.."); |
172 | 175 | establishConnection(); |
173 | 176 | jdbcWriter.prepareStatement(connection); |
@@ -270,7 +273,8 @@ public Builder setFieldTypes(int[] fieldTypes) { |
270 | 273 | } |
271 | 274 |
|
272 | 275 | /** |
273 | | - * optional, partition Fields |
| 276 | + * optional, partition Fields |
| 277 | + * |
274 | 278 | * @param partitionFields |
275 | 279 | * @return |
276 | 280 | */ |
|
0 commit comments