@@ -82,11 +82,8 @@ public class JDBCExtract extends ExtractToOne {
8282 private int idleTimeout = 0 ;
8383
8484 private JdbcIO .DataSourceConfiguration datasourceConfig ;
85- private long numBatches ;
8685 private ComboPooledDataSource initializationDS ;
87- private String [] orderByCols ;
8886 private String viewName ;
89- private String orderedQuery ;
9087 private Schema schema ;
9188 private String keyValueQuery ;
9289 private Schema keyValueSchema ;
@@ -135,43 +132,7 @@ public void init() throws ComponentInitializationException {
135132 // of batches
136133 String runId = UUID .randomUUID ().toString ().replaceAll ("-" , "_" );
137134 this .viewName = "backbone_jdbcextract_" + runId ;
138- if (this .identifierCol == null ) {
139- // No identifier column provided so we can only do a full-form sort.
140- // TODO find a better solution for this
141- //noinspection SqlResolve
142- String countQuery = "SELECT COUNT(*) FROM (" + query + ") bckbone_preflight_query_" + runId ;
143- // Find appropriate columns to order by so that pagination results are consistent
144- this .orderByCols = findPaginationOrderingColumns (this .query );
145- // Get record count so that we know how many batches are going to be needed
146- try (Connection conn = initializationDS .getConnection ()) {
147- ResultSet rs = conn .createStatement ().executeQuery (countQuery );
148- rs .next ();
149- int resultCount = rs .getInt (1 );
150- this .numBatches = Math .round (Math .ceil ((double ) resultCount / this .batchSize ));
151- }
152- // Normally I would say use Strings.join for the below, but this was causing cross-jvm issues
153- // so we use the more portable stringbuilder instead...
154- StringBuilder sB = new StringBuilder ();
155- boolean flag = false ;
156- for (String s : this .orderByCols ) {
157- if (flag ) {
158- sB .append (", " );
159- }
160- sB .append (s );
161- flag = true ;
162- }
163- this .orderedQuery = "SELECT * FROM (" + this .query + ") " + this .viewName
164- + " ORDER BY " + sB .toString () + " " ;
165- // Now we have to add the offset/fetch in the dialect local format..
166- // Specifically, postgres and MySQL are special in that they do not conform to the
167- // SQL:2011 standard syntax
168- if (driver .equals ("org.postgresql.Driver" ) || driver .equals ("com.mysql.jdbc.Driver" )
169- || driver .equals ("com.mysql.cj.jdbc.Driver" ) || driver .equals ("org.sqlite.JDBC" )) {
170- this .orderedQuery += "LIMIT " + batchSize + " OFFSET ?" ;
171- } else { // This is the SQL:2011 standard definition of an offset...fetch syntax
172- this .orderedQuery += "OFFSET ? ROWS FETCH NEXT " + batchSize + " ROWS ONLY" ;
173- }
174- } else {
135+ if (this .identifierCol != null ) {
175136 this .keyValueQuery = "SELECT DISTINCT " + identifierCol + " FROM (" + query + ") " + viewName ;
176137 this .keyValueSchema = getIdentifierColumnsSchema ();
177138 }
@@ -201,10 +162,6 @@ public Schema calculateOutputSchema() {
201162 @ Override
202163 public PCollection <Row > begin (PBegin input ) {
203164 if (this .identifierCol == null ) {
204- List <Integer > offsets = new ArrayList <>();
205- for (int i = 0 ; i < numBatches ; i ++) {
206- offsets .add (i * batchSize ); // Create a sequence of batches at the appropriate offset
207- }
208165 return input .apply (
209166 "Read from JDBC" ,
210167 JdbcIO .<Row >read ()
0 commit comments