Skip to content

Commit cfd39c6

Browse files
committed
Parquet I/O, configurable JDBC idle timeout, increment version for rel
1 parent 29fe0e8 commit cfd39c6

11 files changed

Lines changed: 165 additions & 11 deletions

File tree

API/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>1.0.19</version>
10+
<version>1.0.20</version>
1111
</parent>
1212

1313
<artifactId>api</artifactId>

Core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>1.0.19</version>
10+
<version>1.0.20</version>
1111
</parent>
1212

1313
<artifactId>core</artifactId>

Example-Backbone-Configs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>1.0.19</version>
10+
<version>1.0.20</version>
1111
</parent>
1212

1313
<artifactId>example-backbone-configs</artifactId>

IO/pom.xml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>1.0.19</version>
10+
<version>1.0.20</version>
1111
</parent>
1212

1313
<groupId>org.ohnlp.backbone.io</groupId>
@@ -104,6 +104,13 @@
104104
<version>${beam.version}</version>
105105
</dependency>
106106

107+
<!-- Parquet -->
108+
<dependency>
109+
<groupId>org.apache.beam</groupId>
110+
<artifactId>beam-sdks-java-io-parquet</artifactId>
111+
<version>${beam.version}</version>
112+
</dependency>
113+
107114
<!-- Jackson -->
108115
<dependency>
109116
<groupId>com.fasterxml.jackson.datatype</groupId>

IO/src/main/java/org/ohnlp/backbone/io/jdbc/JDBCExtract.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public void initFromConfig(JsonNode config) throws ComponentInitializationExcept
7070
ds.setJdbcUrl(url);
7171
ds.setUser(user);
7272
ds.setPassword(password);
73+
ds.setMaxIdleTime(config.has("idleTimeout") ? config.get("idleTimeout").asInt() : 0);
7374
this.datasourceConfig = JdbcIO.DataSourceConfiguration
7475
.create(ds);
7576
this.identifierCol = config.has("identifier_col") ? config.get("identifier_col").asText() : null;

IO/src/main/java/org/ohnlp/backbone/io/jdbc/JDBCLoad.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.ohnlp.backbone.io.jdbc;
22

33
import com.fasterxml.jackson.databind.JsonNode;
4+
import com.mchange.v2.c3p0.ComboPooledDataSource;
45
import org.apache.beam.sdk.io.jdbc.JdbcIO;
56
import org.apache.beam.sdk.schemas.Schema;
67
import org.apache.beam.sdk.values.PCollection;
@@ -56,10 +57,13 @@ public void initFromConfig(JsonNode config) throws ComponentInitializationExcept
5657
mappingOps.add(new RowToPSMappingFunction(i, child.asText()));
5758
i++;
5859
}
59-
JdbcIO.DataSourceConfiguration datasourceConfig = JdbcIO.DataSourceConfiguration
60-
.create(driver, url)
61-
.withUsername(user)
62-
.withPassword(password);
60+
ComboPooledDataSource ds = new ComboPooledDataSource();
61+
ds.setDriverClass(driver);
62+
ds.setJdbcUrl(url);
63+
ds.setUser(user);
64+
ds.setPassword(password);
65+
ds.setMaxIdleTime(config.has("idleTimeout") ? config.get("idleTimeout").asInt() : 0);
66+
JdbcIO.DataSourceConfiguration datasourceConfig = JdbcIO.DataSourceConfiguration.create(ds);
6367
this.runnableInstance = JdbcIO.<Row>write()
6468
.withDataSourceConfiguration(datasourceConfig)
6569
.withStatement(query)
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package org.ohnlp.backbone.io.local;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import org.apache.avro.Schema;
5+
import org.apache.avro.SchemaBuilder;
6+
import org.apache.avro.generic.GenericRecord;
7+
import org.apache.beam.sdk.io.parquet.ParquetIO;
8+
import org.apache.beam.sdk.schemas.utils.AvroUtils;
9+
import org.apache.beam.sdk.transforms.DoFn;
10+
import org.apache.beam.sdk.transforms.ParDo;
11+
import org.apache.beam.sdk.values.PBegin;
12+
import org.apache.beam.sdk.values.PCollection;
13+
import org.apache.beam.sdk.values.Row;
14+
import org.ohnlp.backbone.api.Extract;
15+
import org.ohnlp.backbone.api.exceptions.ComponentInitializationException;
16+
17+
import java.util.Locale;
18+
19+
/**
20+
* Reads Parquet Formatted Files From Directory
21+
*
22+
* Expected Configuration:
23+
* {
24+
* "fileSystemPath": "path/to/read/from",
25+
* "recordName": "Schema Record Name",
26+
* "recordNamespace": "Schema Record Namespace",
27+
* "schema": {
28+
* "fieldName": "fieldType"
29+
* }
30+
* }
31+
*
32+
* For a list of field types, please consult https://avro.apache.org/docs/current/spec.html#schema_primitive.
33+
* Primitive types are the only types currently supported.
34+
*/
35+
public class ParquetExtract extends Extract {
36+
private String dir;
37+
private Schema schema;
38+
39+
@Override
40+
public void initFromConfig(JsonNode config) throws ComponentInitializationException {
41+
this.dir = config.get("fileSystemPath").asText();
42+
SchemaBuilder.FieldAssembler<Schema> avroSchemaBuilder = SchemaBuilder.builder()
43+
.record(config.get("recordName").asText())
44+
.namespace(config.get("recordNamespace").asText())
45+
.fields();
46+
config.get("schema").fields().forEachRemaining((e) -> {
47+
String field = e.getKey();
48+
avroSchemaBuilder.name(field).type(e.getValue().asText().toUpperCase(Locale.ROOT)).noDefault();
49+
});
50+
schema = avroSchemaBuilder.endRecord();
51+
52+
}
53+
54+
@Override
55+
public PCollection<Row> expand(PBegin input) {
56+
return input
57+
.apply("Parquet Read", ParquetIO.read(this.schema).from(this.dir))
58+
.apply("Convert to Beam Row", ParDo.of(new DoFn<GenericRecord, Row>() {
59+
@ProcessElement
60+
public void processElement(@Element GenericRecord input, OutputReceiver<Row> output) {
61+
output.output(AvroUtils.toBeamRowStrict(input, AvroUtils.toBeamSchema(schema)));
62+
}
63+
}));
64+
}
65+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.ohnlp.backbone.io.local;
2+
3+
import com.fasterxml.jackson.databind.JsonNode;
4+
import org.apache.avro.generic.GenericRecord;
5+
import org.apache.beam.sdk.io.FileIO;
6+
import org.apache.beam.sdk.io.parquet.ParquetIO;
7+
import org.apache.beam.sdk.schemas.Schema;
8+
import org.apache.beam.sdk.schemas.utils.AvroUtils;
9+
import org.apache.beam.sdk.transforms.DoFn;
10+
import org.apache.beam.sdk.transforms.ParDo;
11+
import org.apache.beam.sdk.values.PCollection;
12+
import org.apache.beam.sdk.values.PDone;
13+
import org.apache.beam.sdk.values.Row;
14+
import org.ohnlp.backbone.api.Load;
15+
import org.ohnlp.backbone.api.exceptions.ComponentInitializationException;
16+
17+
import java.util.ArrayList;
18+
19+
/**
20+
* Writes Parquet Formatted Files to Directory
21+
*
22+
* Expected Configuration:
23+
* {
24+
* "fileSystemPath": "path/to/write/to",
25+
* "fields": ["optional", "list", "of", "columns", "to", "write"]
26+
* }
27+
*/
28+
public class ParquetLoad extends Load {
29+
private String dir;
30+
private ArrayList<String> fields;
31+
32+
private transient Schema beamSchema;
33+
private transient org.apache.avro.Schema avroSchema;
34+
35+
@Override
36+
public void initFromConfig(JsonNode config) throws ComponentInitializationException {
37+
this.dir = config.get("fileSystemPath").asText();
38+
this.fields = new ArrayList<>();
39+
if (config.has("fields")) {
40+
config.get("fields").forEach((f) -> {
41+
String field = f.asText();
42+
fields.add(field);
43+
});
44+
}
45+
46+
}
47+
48+
@Override
49+
public PDone expand(PCollection<Row> input) {
50+
input.apply("Subselect Columns and Convert to Avro Format", ParDo.of(new DoFn<Row, GenericRecord>() {
51+
@ProcessElement
52+
public void processElement(@Element Row input, OutputReceiver<GenericRecord> output) {
53+
if (beamSchema == null) {
54+
// Assume homogeneous schema
55+
Schema sourceSchema = input.getSchema();
56+
if (!fields.isEmpty()) {
57+
Schema.Builder targetBeamSchemaBuilder = org.apache.beam.sdk.schemas.Schema.builder();
58+
for (String f : fields) {
59+
targetBeamSchemaBuilder.addField(f, sourceSchema.getField(f).getType());
60+
}
61+
beamSchema = targetBeamSchemaBuilder.build();
62+
} else {
63+
beamSchema = sourceSchema;
64+
}
65+
avroSchema = AvroUtils.toAvroSchema(beamSchema);
66+
}
67+
output.output(AvroUtils.toGenericRecord(input, avroSchema));
68+
}
69+
}))
70+
.apply("Write to Filesystem", FileIO
71+
.<GenericRecord>write()
72+
.via(ParquetIO.sink(avroSchema))
73+
.to(this.dir)
74+
);
75+
return PDone.in(input.getPipeline());
76+
}
77+
}

Plugin-Manager/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>1.0.19</version>
10+
<version>1.0.20</version>
1111
</parent>
1212

1313
<artifactId>plugin-manager</artifactId>

Transforms/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<parent>
88
<groupId>org.ohnlp.backbone</groupId>
99
<artifactId>backbone-parent</artifactId>
10-
<version>1.0.19</version>
10+
<version>1.0.20</version>
1111
</parent>
1212

1313
<groupId>org.ohnlp.backbone.transforms</groupId>

0 commit comments

Comments
 (0)