Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink exactly-once Reader and Writer #4

Merged
merged 2 commits into from
May 4, 2017
Merged

Flink exactly-once Reader and Writer #4

merged 2 commits into from
May 4, 2017

Conversation

StephanEwen
Copy link
Contributor

@StephanEwen StephanEwen commented May 2, 2017

Moved code for the exactly-once reader and writer.

Fixes #3

@StephanEwen StephanEwen changed the title Flink reader writer Flink exactly-once Reader and Writer May 2, 2017
@StephanEwen
Copy link
Contributor Author

Addresses comments that were left on the original pull request on pravega/pravega and made checkstyle pass.

private volatile boolean running = true;

// checkpoint trigger callback, invoked when a checkpoint event is received.
// no need to be volatile, the source is driven by but one thread
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

text - driven by only one thread

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixing that. I thought that was valid English, though...

try {
Exceptions.handleInterrupted( txn::abort );
} catch (Exception e) {
suppressed = e;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a log here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is actually a bug in my code that the suppressed exception is not always re-thrown. Fixing that.

Do you want me to log it anyways (that would double-log it, because suppressed exceptions are anyways printed in the stack trace of the parent exception).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought not rethrowning was on purpose, that's why wanted it logged. No need to log now since exception is not lost.

final Transaction<T> txn = this.currentTxn;
Preconditions.checkState(txn != null, "bug: no transaction object when performing state snapshot");

if (log.isDebugEnabled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could remove this check

Copy link
Contributor Author

@StephanEwen StephanEwen May 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I often do these guards when there are more than two arguments to the log statement parameters, because that involves packaging them as parameters into an array. The overhead is admittedly little, and readability is probably more important here.

// ==> There should never be a case where we have no pending transaction here
//

if (txnsPendingCommit.isEmpty()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Preconditions.checkState

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's nicer


// the big assumption is that this now actually works and that the transaction has not timed out, yet

// TODO: currently, if this fails, there is actually data loss
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please create an issue for this and link it here, will be easier to track. This is the approach we take in pravega for TODOs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, see #5

// checkpoint can be null when restoring from a savepoint that
// did not include any state for that particular reader name
if (checkpoint != null) {
this.readerGroup.resetReadersToCheckpoint(checkpoint);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the case where checkpoint is null (if failure happened even before the first checkpoint succeeded), the readergroup still needs to be reset to its initial position right? because readers might have already read some data which will be discarded, hence they have to reread them

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, it is one of the followup questions that I also raised.

Unfortunately, we cannot do the call in that particular place, because if there was never a complete checkpoint, no checkpoint will be restored and the hooks will not be called.

Copy link
Contributor

@skrishnappa skrishnappa May 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please file an issue for this and we can address it as a separate task.

// Setup utility.
private static final SetupUtils SETUP_UTILS = new SetupUtils();

//Ensure each test completes within 30 seconds.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

600 seconds

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, thanks!


return (Checkpoint) SerializationUtils.deserialize(bytes);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add newline for consistency, for other files too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@StephanEwen
Copy link
Contributor Author

Addressed all comments except one, for which I would need some input:
Making sure readers are reset to the beginning, when recovering before a checkpoint was taken.
It looks like there is no call on a reader group to to that.

Is the only way to do this to create a new reader group each time a recovery happens? Or should we call 'readerOffline' with a zero position on each reader (currently the checkpoint coordinator has no view on how many readers exist - all pravega structures are sort of opaque to it).

@skrishnappa skrishnappa merged commit 9418482 into pravega:master May 4, 2017
crazyzhou pushed a commit to crazyzhou/flink-connectors that referenced this pull request Oct 20, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants