Skip to content

Commit 777fa85

Browse files
committed
Add checking for required columns
1 parent 2584014 commit 777fa85

1 file changed

Lines changed: 27 additions & 3 deletions

File tree

Core/src/main/java/org/ohnlp/backbone/core/pipeline/ExecutionDAG.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@
22

33
import org.apache.beam.sdk.Pipeline;
44
import org.apache.beam.sdk.schemas.Schema;
5-
import org.apache.beam.sdk.values.PBegin;
6-
import org.apache.beam.sdk.values.PCollectionRowTuple;
7-
import org.apache.beam.sdk.values.POutput;
5+
import org.apache.beam.sdk.values.*;
86
import org.ohnlp.backbone.api.Extract;
97
import org.ohnlp.backbone.api.Load;
108
import org.ohnlp.backbone.api.components.*;
9+
import org.ohnlp.backbone.api.exceptions.ComponentInitializationException;
1110
import org.ohnlp.backbone.core.PipelineBuilder;
1211
import org.ohnlp.backbone.core.config.BackbonePipelineComponentConfiguration;
1312

@@ -122,6 +121,31 @@ private Set<PipelineBuilder.InitializedPipelineComponent> runNextBatchOrDefer(
122121
initSchema = true;
123122
}
124123
}
124+
// Check schema for required columns if any
125+
if (component.getComponent() instanceof HasInputs) {
126+
Map<String, PCollection<Row>> inputs = inputToNextStep.get().getAll();
127+
inputs.forEach((tag, coll) -> {
128+
Schema s = ((HasInputs) component.getComponent()).getRequiredColumns(tag);
129+
if (s != null) {
130+
// Has required columns
131+
Schema inputSchema = coll.getSchema();
132+
List<String> requiredButMissing = new ArrayList<>();
133+
for (Schema.Field f : s.getFields()) {
134+
if (!inputSchema.hasField(f.getName())) {
135+
// TODO do type checking
136+
requiredButMissing.add(f.getName());
137+
}
138+
}
139+
if (!requiredButMissing.isEmpty()) {
140+
throw new IllegalArgumentException(component.getComponentID()
141+
+ " requires the following fields that are missing from the supplied input: " +
142+
"[" + String.join(",", requiredButMissing) + "]. Available Fields: [" +
143+
String.join(",", inputSchema.getFieldNames()) + "]");
144+
}
145+
146+
}
147+
});
148+
}
125149
if (component.getComponent() instanceof TransformComponent) {
126150
if (initSchema) {
127151
nextSchema = ((TransformComponent) component.getComponent()).calculateOutputSchema(inputSchemas);

0 commit comments

Comments
 (0)