KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) #17720
base: trunk
Thanks for the PR @rajinisivaram. Just some questions for my understanding.
private void rebootstrap(long now) {
    closeAll();
    metadata.rebootstrap();
    metadataAttemptStartMs = Optional.of(now);
Question: Do we need to set the time while rebootstrapping, or should it be reset to Optional.empty? Whenever there is a successful metadata response, the time will be updated anyway.
Or is this meant to handle the case where, after a rebootstrap, we never receive a successful metadata response and want to rebootstrap again?
It is to handle rebootstrapping again if a successful metadata response is not received after one rebootstrap (the second case you mentioned).
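The behaviour described in this reply can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: only `rebootstrap` and `metadataAttemptStartMs` come from the diff, while the class name, `onMetadataAttempt`, `onSuccessfulMetadataResponse`, `poll`, and the elapsed-time check itself are assumptions made for the example. It also assumes that a successful metadata response clears the timer.

```java
import java.util.Optional;

// Hypothetical sketch of a rebootstrap timer; not Kafka's actual NetworkClient.
final class RebootstrapTimerSketch {
    private final long rebootstrapTriggerMs;
    // Start of the current run of metadata attempts without a successful response.
    private Optional<Long> metadataAttemptStartMs = Optional.empty();
    private int rebootstrapCount = 0;

    RebootstrapTimerSketch(long rebootstrapTriggerMs) {
        this.rebootstrapTriggerMs = rebootstrapTriggerMs;
    }

    // Start timing only if no attempt is already being timed.
    void onMetadataAttempt(long now) {
        if (!metadataAttemptStartMs.isPresent()) {
            metadataAttemptStartMs = Optional.of(now);
        }
    }

    // Assumption: a successful response clears the timer.
    void onSuccessfulMetadataResponse() {
        metadataAttemptStartMs = Optional.empty();
    }

    // Called on each poll: rebootstrap once the timer exceeds the trigger interval.
    boolean poll(long now) {
        boolean due = metadataAttemptStartMs
                .filter(startMs -> now - startMs > rebootstrapTriggerMs)
                .isPresent();
        if (due) {
            rebootstrap(now);
        }
        return due;
    }

    private void rebootstrap(long now) {
        rebootstrapCount++;
        // Restart the timer at now instead of clearing it, so a further
        // interval without a successful response triggers another rebootstrap.
        metadataAttemptStartMs = Optional.of(now);
    }

    int rebootstrapCount() {
        return rebootstrapCount;
    }
}
```

If the timer were reset to `Optional.empty()` inside `rebootstrap`, this sketch would stop timing after the first rebootstrap until something started the timer again, which is the gap the reply is pointing at.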
        if (!cluster.nodes().isEmpty()) {
            this.cluster = cluster;
        }
    }

    public void initiateRebootstrap() {
        requestUpdate();
        this.metadataAttemptStartMs = Optional.of(0L);
Question: Why don't we set the time to now, as we do in NetworkClient?
In rebootstrap(), we set the time to now, as in NetworkClient. In both, when we receive the REBOOTSTRAP_REQUIRED error code, we set it to 0, so that rebootstrap is triggered on the next poll regardless of time.
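To make the difference between storing now and storing 0 concrete, here is a small sketch of an elapsed-time check. The helper `isRebootstrapDue` and the exact comparison are assumptions for illustration; the PR's real check may differ in detail.

```java
import java.util.Optional;

// Hypothetical helper; illustrates the effect of the stored start time only.
final class RebootstrapDueSketch {
    // Due when a start time is recorded and more than rebootstrapTriggerMs
    // has elapsed since it. An empty start time is never due.
    static boolean isRebootstrapDue(Optional<Long> metadataAttemptStartMs,
                                    long now,
                                    long rebootstrapTriggerMs) {
        return metadataAttemptStartMs
                .filter(startMs -> now - startMs > rebootstrapTriggerMs)
                .isPresent();
    }
}
```

With a wall-clock `now` in epoch milliseconds and a trigger of 5 minutes, a start time of 0 makes the elapsed time far larger than the trigger, so the check passes on the next poll. A start time of `now` requires a full trigger interval to elapse first.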
logContext,
Long.MAX_VALUE,
Seems this constructor is used by the Connect Worker, which gets passed to ConsumerNetworkClient. So should we not support the consumer client spun up in other dependent modules? Do you think a default value of Long.MAX_VALUE for this config makes sense, so we can just get the configured vs. default value?
Did you mean the NetworkClient in WorkerGroupMember? This PR changes the constructor used in WorkerGroupMember to pass in the configured value. Let me know if I have missed a different one.
@apoorvmittal10 Thanks for the review, I have responded to the comments.
38bc9dd to 1759594
Implementation of https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code
- metadata.recovery.rebootstrap.trigger.ms: new config, set to 5 minutes by default
- rebootstrap: now the default for metadata.recovery.strategy
- REBOOTSTRAP_REQUIRED: new error code; introduces a top-level error code in the metadata response. On this error, clients rebootstrap.
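As a rough illustration of the two settings named in the description, a client could set them explicitly like this. The class name and the `localhost:9092` bootstrap address are placeholders, and the values simply restate the defaults the description mentions (5 minutes expressed in milliseconds).

```java
import java.util.Properties;

// Hypothetical helper that builds client properties with the rebootstrap settings.
final class RebootstrapConfigSketch {
    static Properties clientProps() {
        Properties props = new Properties();
        // Placeholder broker address for the example.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Strategy named in the PR description as the new default.
        props.setProperty("metadata.recovery.strategy", "rebootstrap");
        // 5 minutes in milliseconds, matching the stated default.
        props.setProperty("metadata.recovery.rebootstrap.trigger.ms",
                String.valueOf(5 * 60 * 1000L));
        return props;
    }
}
```

These properties would then be passed to a client constructor along with the usual serializer or deserializer settings.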