Skip to content

Commit c4f03ae

Browse files
committed
ingest: better support for conditionals with simulate?verbose (elastic#34155)
This commit introduces two corrections to the way simulate?verbose handles conditionals on processors. 1) Prior to this change when executing simulate?verbose for processors with conditionals that evaluate to false, that processor would still be displayed in the result set. What was displayed was correct, such that no changes to the document occurred. However, if the conditional evaluates to false, the processor should not even be displayed. 2) Prior to this change when executing simulate?verbose for pipeline processors with conditionals, the individual steps would no longer be displayed. Commit e37e5df addressed the issue, but failed account for a conditional on the pipeline processor. Since a pipeline processor can introduce cycles and is effectively a single processor that encapsulates multiple other processors that are potentially guarded by a single conditional, special handling is needed to for pipeline and conditional pipeline processors.
1 parent dd2bbfd commit c4f03ae

File tree

5 files changed

+275
-71
lines changed

5 files changed

+275
-71
lines changed

modules/ingest-common/src/test/resources/rest-api-spec/test/ingest/90_simulate.yml

+4-3
Original file line numberDiff line numberDiff line change
@@ -641,7 +641,6 @@ teardown:
641641
- match: { acknowledged: true }
642642

643643
- do:
644-
catch: /illegal_state_exception/
645644
ingest.simulate:
646645
verbose: true
647646
body: >
@@ -667,8 +666,10 @@ teardown:
667666
}
668667
]
669668
}
670-
- match: { error.root_cause.0.type: "illegal_state_exception" }
671-
- match: { error.root_cause.0.reason: "Cycle detected for pipeline: inner" }
669+
- length: { docs: 1 }
670+
- length: { docs.0.processor_results: 1 }
671+
- match: { docs.0.processor_results.0.error.reason: "java.lang.IllegalArgumentException: java.lang.IllegalStateException: Cycle detected for pipeline: outer" }
672+
- match: { docs.0.processor_results.0.error.caused_by.caused_by.reason: "Cycle detected for pipeline: outer" }
672673

673674
---
674675
"Test verbose simulate with Pipeline Processor with Multiple Pipelines":

server/src/main/java/org/elasticsearch/action/ingest/SimulateExecutionService.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,13 @@
2121

2222
import org.elasticsearch.action.ActionListener;
2323
import org.elasticsearch.action.ActionRunnable;
24+
import org.elasticsearch.ingest.CompoundProcessor;
2425
import org.elasticsearch.ingest.IngestDocument;
2526
import org.elasticsearch.ingest.Pipeline;
26-
import org.elasticsearch.ingest.CompoundProcessor;
27-
import org.elasticsearch.ingest.PipelineProcessor;
2827
import org.elasticsearch.threadpool.ThreadPool;
2928

3029
import java.util.ArrayList;
31-
import java.util.Collections;
32-
import java.util.IdentityHashMap;
3330
import java.util.List;
34-
import java.util.Set;
3531

3632
import static org.elasticsearch.ingest.TrackingResultProcessor.decorate;
3733

@@ -46,11 +42,9 @@ class SimulateExecutionService {
4642
}
4743

4844
SimulateDocumentResult executeDocument(Pipeline pipeline, IngestDocument ingestDocument, boolean verbose) {
49-
// Prevent cycles in pipeline decoration
50-
final Set<PipelineProcessor> pipelinesSeen = Collections.newSetFromMap(new IdentityHashMap<>());
5145
if (verbose) {
5246
List<SimulateProcessorResult> processorResultList = new ArrayList<>();
53-
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList, pipelinesSeen);
47+
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
5448
try {
5549
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
5650
verbosePipelineProcessor);

server/src/main/java/org/elasticsearch/ingest/ConditionalProcessor.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,7 @@ public class ConditionalProcessor extends AbstractProcessor {
6262

6363
@Override
6464
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
65-
IngestConditionalScript script =
66-
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
67-
if (script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()))) {
68-
// Only record metric if the script evaluates to true
65+
if (evaluate(ingestDocument)) {
6966
long startTimeInNanos = relativeTimeProvider.getAsLong();
7067
try {
7168
metric.preIngest();
@@ -81,6 +78,12 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
8178
return ingestDocument;
8279
}
8380

81+
boolean evaluate(IngestDocument ingestDocument) {
82+
IngestConditionalScript script =
83+
scriptService.compile(condition, IngestConditionalScript.CONTEXT).newInstance(condition.getParams());
84+
return script.execute(new UnmodifiableIngestData(ingestDocument.getSourceAndMetadata()));
85+
}
86+
8487
Processor getProcessor() {
8588
return processor;
8689
}

server/src/main/java/org/elasticsearch/ingest/TrackingResultProcessor.java

+42-26
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
package org.elasticsearch.ingest;
2121

22+
import org.elasticsearch.ElasticsearchException;
2223
import org.elasticsearch.action.ingest.SimulateProcessorResult;
2324

2425
import java.util.ArrayList;
2526
import java.util.List;
26-
import java.util.Set;
2727

2828
/**
2929
* Processor to be used within Simulate API to keep track of processors executed in pipeline.
@@ -42,14 +42,46 @@ public final class TrackingResultProcessor implements Processor {
4242

4343
@Override
4444
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
45+
Processor processor = actualProcessor;
4546
try {
46-
actualProcessor.execute(ingestDocument);
47-
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument)));
47+
if (processor instanceof ConditionalProcessor) {
48+
ConditionalProcessor conditionalProcessor = (ConditionalProcessor) processor;
49+
if (conditionalProcessor.evaluate(ingestDocument) == false) {
50+
return ingestDocument;
51+
}
52+
if (conditionalProcessor.getProcessor() instanceof PipelineProcessor) {
53+
processor = conditionalProcessor.getProcessor();
54+
}
55+
}
56+
if (processor instanceof PipelineProcessor) {
57+
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
58+
Pipeline pipeline = pipelineProcessor.getPipeline();
59+
//runtime check for cycles against a copy of the document. This is needed to properly handle conditionals around pipelines
60+
try {
61+
IngestDocument ingestDocumentCopy = new IngestDocument(ingestDocument);
62+
ingestDocumentCopy.executePipeline(pipelineProcessor.getPipeline());
63+
} catch (ElasticsearchException elasticsearchException) {
64+
if (elasticsearchException.getCause().getCause() instanceof IllegalStateException) {
65+
throw elasticsearchException;
66+
}
67+
//else do nothing, let the tracking processors throw the exception while recording the path up to the failure
68+
} catch (Exception e) {
69+
// do nothing, let the tracking processors throw the exception while recording the path up to the failure
70+
}
71+
//now that we know that there are no cycles between pipelines, decorate the processors for this pipeline and execute it
72+
CompoundProcessor verbosePipelineProcessor = decorate(pipeline.getCompoundProcessor(), processorResultList);
73+
Pipeline verbosePipeline = new Pipeline(pipeline.getId(), pipeline.getDescription(), pipeline.getVersion(),
74+
verbosePipelineProcessor);
75+
ingestDocument.executePipeline(verbosePipeline);
76+
} else {
77+
processor.execute(ingestDocument);
78+
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument)));
79+
}
4880
} catch (Exception e) {
4981
if (ignoreFailure) {
50-
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), new IngestDocument(ingestDocument), e));
82+
processorResultList.add(new SimulateProcessorResult(processor.getTag(), new IngestDocument(ingestDocument), e));
5183
} else {
52-
processorResultList.add(new SimulateProcessorResult(actualProcessor.getTag(), e));
84+
processorResultList.add(new SimulateProcessorResult(processor.getTag(), e));
5385
}
5486
throw e;
5587
}
@@ -66,35 +98,19 @@ public String getTag() {
6698
return actualProcessor.getTag();
6799
}
68100

69-
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList,
70-
Set<PipelineProcessor> pipelinesSeen) {
101+
public static CompoundProcessor decorate(CompoundProcessor compoundProcessor, List<SimulateProcessorResult> processorResultList) {
71102
List<Processor> processors = new ArrayList<>(compoundProcessor.getProcessors().size());
72103
for (Processor processor : compoundProcessor.getProcessors()) {
73-
if (processor instanceof PipelineProcessor) {
74-
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
75-
if (pipelinesSeen.add(pipelineProcessor) == false) {
76-
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
77-
}
78-
processors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList, pipelinesSeen));
79-
pipelinesSeen.remove(pipelineProcessor);
80-
} else if (processor instanceof CompoundProcessor) {
81-
processors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
104+
if (processor instanceof CompoundProcessor) {
105+
processors.add(decorate((CompoundProcessor) processor, processorResultList));
82106
} else {
83107
processors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
84108
}
85109
}
86110
List<Processor> onFailureProcessors = new ArrayList<>(compoundProcessor.getProcessors().size());
87111
for (Processor processor : compoundProcessor.getOnFailureProcessors()) {
88-
if (processor instanceof PipelineProcessor) {
89-
PipelineProcessor pipelineProcessor = ((PipelineProcessor) processor);
90-
if (pipelinesSeen.add(pipelineProcessor) == false) {
91-
throw new IllegalStateException("Cycle detected for pipeline: " + pipelineProcessor.getPipeline().getId());
92-
}
93-
onFailureProcessors.add(decorate(pipelineProcessor.getPipeline().getCompoundProcessor(), processorResultList,
94-
pipelinesSeen));
95-
pipelinesSeen.remove(pipelineProcessor);
96-
} else if (processor instanceof CompoundProcessor) {
97-
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList, pipelinesSeen));
112+
if (processor instanceof CompoundProcessor) {
113+
onFailureProcessors.add(decorate((CompoundProcessor) processor, processorResultList));
98114
} else {
99115
onFailureProcessors.add(new TrackingResultProcessor(compoundProcessor.isIgnoreFailure(), processor, processorResultList));
100116
}

0 commit comments

Comments
 (0)