-
Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-17178: Internal KTable#transformValues refactoring #20910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
KAFKA-17178: Internal KTable#transformValues refactoring #20910
Conversation
| final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal, | ||
| final NamedInternal namedInternal, | ||
| final String... stateStoreNames) { | ||
| Objects.requireNonNull(transformerSupplier, "transformerSupplier"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because we add the adaptor below, the existing not-null check (which is inside KTableProcessValues constructor) does not fire any longer, so adding the check here to ensure tests don't fail
|
|
||
| final KTableProcessorSupplier<K, V, K, VR> processorSupplier = new KTableTransformValues<>( | ||
| final FixedKeyProcessorSupplier<? super K, ? super V, ? extends VR> fixedKeyProcessorSupplier = | ||
| createValueTransformerWithKeySupplierToFixedProcessorSupplierAdaptor(transformerSupplier); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To keep the refactoring internal, we put an adopter around the user provided "transformer supplier".
|
|
||
| @Override | ||
| public <K, V> void forward(final K key, final V value) { | ||
| throw new StreamsException("Disabled"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cf deleted ForwardingDisabledProcessorContext. (same below)
|
|
||
| return new FixedKeyProcessor<>() { | ||
| private final AtomicReference<Long> timestamp = new AtomicReference<>(); | ||
| private final AtomicReference<Headers> headers = new AtomicReference<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is some workaround -- in the new PAPI, we get the timestamps and headers as part of the input Record. To be backward compatible with the old "Transformers", we need to ensure that both timestamp and header are accessible via the "context" object.
Not sure if there would be better way to do this?
| @Override | ||
| public String topic() { | ||
| return context.recordMetadata().orElse( | ||
| new RecordMetadata() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might need to do some more cleanup here? (cf. similar methods below)
| private class KTableTransformValuesProcessor extends ContextualProcessor<K, Change<V>, K, Change<VOut>> { | ||
| private final ValueTransformerWithKey<? super K, ? super V, ? extends VOut> valueTransformer; | ||
| private KeyValueStoreWrapper<K, VOut> store; | ||
| private TimestampedTupleForwarder<K, VOut> tupleForwarder; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We move both KeyValueStoreWrapper store and TimestampedTupleForwarder tupleForwarder into the newly created ForwardCaptureProcessorContext.
The reason is, that when forward is called inside FixedKeyProcessor, we want to execute the "depth first" forward directly, so we need to do it inside ForwardCaptureProcessorContext. For the old "valueTransformer", the "valueTransformer" just terminated, returning the single result value back.
This change in control flow actually raises a user facing API question (ie, when we change the public user API from "transform" to "process"): in the old code, it was ensured that transformValues would return a single result per input record. Using the new processValues interface, user invoke context.forward() and it's possible to call it more than ones. Of course, because the key cannot be modified, the latest forward() call would win (assuming the user does not use the child parameter; enabling the usage of child parameter is a second change we would get down the line).
So the question arises, if we would really want to move to processValues? Or maybe introduce something entirely new? It some new variation of the old "transformValues" but base on the new PAPI, and tailored to the KTable case.
Asking this question right away, because it could change how we want to setup the internal code to begin with, and could make this refactoring obsolete.
| super.init(context); | ||
| final InternalProcessorContext<K, Change<VOut>> internalProcessorContext = (InternalProcessorContext<K, Change<VOut>>) context; | ||
| valueTransformer.init(new ForwardingDisabledProcessorContext(internalProcessorContext)); | ||
| if (queryableName != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This moved into ForwardCaptureProcessorContext
| public void process(final Record<K, Change<V>> record) { | ||
| final VOut newValue = valueTransformer.transform(record.key(), record.value().newValue); | ||
|
|
||
| if (queryableName != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This moves into ForwardCaptureProcessorContext
|
|
||
| captureContext.forward = false; | ||
| fixedKeyProcessor.process(InternalFixedKeyRecordFactory.create( | ||
| new Record<>(key, getValueOrNull(valueAndTimestamp), timestamp) // TODO: we might pass in -1L here, which would lead to an exception |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not super happy about this... Not sure what to do here.
| new RecordHeaders() | ||
| )); | ||
|
|
||
| captureContext.forward = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need two modes for the ForwardCaptureProcessorContext -- one mode (defaults), which forward directly, ie, executes the direct depth-first "forward" processor chaining execution path. However here, for the "value getter" we only want to do a lookup into the parent processor, process the value, and capture it w/o forwarding is, because the result is the "oldValue" of what we want to forward.... (Also a little bit hacky... suggestions how to do this better are welcome).
Similar to above: we would only capture the last call to forward()...
| public <KForward extends K, VForward extends VOut> void forward(final FixedKeyRecord<KForward, VForward> record, final String childName) { | ||
| capturedKey = record.key(); | ||
| capturedValue = record.value(); | ||
| capturedTimestamp = record.timestamp(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In all case, we always capture the result record the user code forwards.
| final long putReturnCode = store.put(capturedKey, capturedValue, capturedTimestamp); | ||
| // if not put to store, do not forward downstream either | ||
| if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) { | ||
| tupleForwarder.maybeForward(new Record<>(capturedKey, new Change<>(capturedValue, oldValue, putReturnCode == PUT_RETURN_CODE_IS_LATEST), capturedTimestamp, record.headers())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: add support for childName parameter ?
| final long timestamp = capturedTimestamp; | ||
|
|
||
| if (sendOldValues) { | ||
| forward = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to "value getter" case, we need to reprocess the "oldValue" from the original input record, to be able to get the "previous result" (ie, new "oldValue" for the new result), and thus don't want to forward directly, and not execute the depth-first forward call, but only capture the result.
Note: if context.forward() would be executed more than once inside the user provided FixedKeyProcessor we would capture only the last call to forward().
Kafka Streams replaced the DSL "KStream.transformXxx" methods with the
new PAPI "process[Values]"methods. To prepare a similar change for
"KTable.transformValues", this PR does an internal refactoring moving
internals off the old transformValues code, in favor of the new PAPI.