44import com .google .api .services .bigquery .model .TableRow ;
55import org .apache .beam .sdk .io .gcp .bigquery .BigQueryIO ;
66import org .apache .beam .sdk .io .gcp .bigquery .BigQueryUtils ;
7+ import org .apache .beam .sdk .schemas .Schema ;
78import org .apache .beam .sdk .schemas .transforms .Select ;
89import org .apache .beam .sdk .transforms .DoFn ;
910import org .apache .beam .sdk .transforms .ParDo ;
1011import org .apache .beam .sdk .values .PCollection ;
1112import org .apache .beam .sdk .values .PDone ;
1213import org .apache .beam .sdk .values .POutput ;
1314import org .apache .beam .sdk .values .Row ;
15+ import org .joda .time .Duration ;
1416import org .ohnlp .backbone .api .Load ;
1517import org .ohnlp .backbone .api .exceptions .ComponentInitializationException ;
1618
@@ -36,13 +38,32 @@ public void initFromConfig(JsonNode config) throws ComponentInitializationExcept
3638 @ Override
3739 public POutput expand (PCollection <Row > input ) {
3840 if (this .selectedFields .size () > 0 ) {
39- input = input .apply ("Subset Columns" , Select .fieldNames (this .selectedFields .toArray (new String [0 ])));
41+ input = input .apply ("Subset Columns" , ParDo .of (new DoFn <Row , Row >() {
42+ @ ProcessElement
43+ public void process (ProcessContext c ) {
44+ Row input = c .element ();
45+ // We have to dynamically resolve schema row by row because
46+ // we don't store schema in the collection itself as it is user-configurable/dynamic
47+ Schema inputSchema = input .getSchema ();
48+ List <Schema .Field > outputSchemaFields = new ArrayList <>();
49+ for (String fieldName : selectedFields ) {
50+ outputSchemaFields .add (inputSchema .getField (fieldName ));
51+ }
52+ Schema outSchema = Schema .of (outputSchemaFields .toArray (new Schema .Field [0 ]));
53+ // And now just map the values
54+ List <Object > outputValues = new ArrayList <>();
55+ for (String s : selectedFields ) {
56+ outputValues .add (input .getValue (s ));
57+ }
58+ c .output (Row .withSchema (outSchema ).addValues (outputValues ).build ());
59+ }
60+ }));
4061 }
4162 return input .apply (
4263 "Transform output rows to BigQuery TableRow format" , ParDo .of (
4364 new DoFn <Row , TableRow >() {
4465 @ ProcessElement
45- public void processELement (@ Element Row row , OutputReceiver <TableRow > out ) {
66+ public void processElement (@ Element Row row , OutputReceiver <TableRow > out ) {
4667 out .output (BigQueryUtils .toTableRow (row ));
4768 }
4869 }
0 commit comments