
Partial recovery in FLIP-27 source #663

Open
crazyzhou opened this issue Mar 21, 2022 · 7 comments

@crazyzhou
Contributor

Problem description
In the new FLIP-27 source, we still only support global recovery, which relies on Flink's externally induced source reader API (added in FLINK-20270) plus the Pravega checkpoint. This does not cover local/region/subgraph recovery, so a failure has a larger blast radius and job recovery takes longer, which is a concern for production usage.
To support local recovery, Flink wants the connector to expose a "split" abstraction that carries its own state (like a file, or a Kafka partition + offset), so that a Flink checkpoint is simply the list of all these split states. When a TaskManager fails, Flink performs a local recovery: it starts a new reader, restores every split that was assigned to the faulty reader to the state (position) stored in the last checkpoint, and also restores the affected downstream operator states.
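For illustration, here is a rough sketch of the kind of split Flink expects; the class and field names are hypothetical placeholders, not actual types from this connector.

```java
// Hypothetical split type, only to illustrate the FLIP-27 expectation that a
// split carries its own restorable state (analogous to a Kafka partition + offset).
import org.apache.flink.api.connector.source.SourceSplit;

public class HypotheticalPravegaSplit implements SourceSplit {
    private final String splitId;            // e.g. which segment/reader this split maps to
    private final long checkpointedOffset;   // position to resume from after a local recovery

    public HypotheticalPravegaSplit(String splitId, long checkpointedOffset) {
        this.splitId = splitId;
        this.checkpointedOffset = checkpointedOffset;
    }

    @Override
    public String splitId() {
        return splitId;
    }

    public long checkpointedOffset() {
        return checkpointedOffset;
    }
}
```

A Flink checkpoint of the source is then just the list of these split states, which is what makes a per-task (local) recovery possible.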
Pravega does have a story that involves a Position for local recovery, but because reads are asynchronous and segments may auto-scale, a set of positions from one reader group does not necessarily form a complete stream cut for a checkpoint. Recovering with a newly spawned reader can therefore lose data: the related operators roll their state back to the last checkpoint while the other online readers take over the segments and keep reading.
In general, the problem is that Pravega has two different stories for global and local recovery, and they cannot be combined to fit the Flink interface.

Problem location

Suggestions for an improvement

@fpj
Contributor

fpj commented Mar 21, 2022

Pravega does have a story that involves a Position for local recovery, but because reads are asynchronous and segments may auto-scale, a set of positions from one reader group does not necessarily form a complete stream cut for a checkpoint

I think I understand what you are trying to say here: a set of position objects taken independently does not necessarily form a valid stream cut.

Recovering with a newly spawned reader can therefore lose data: the related operators roll their state back to the last checkpoint

I don't really understand this part. Can you give an example of a scenario that causes such data loss? Is there a risk of duplication too?

@crazyzhou
Contributor Author

crazyzhou commented Mar 21, 2022

Recovering with a newly spawned reader can therefore lose data: the related operators roll their state back to the last checkpoint

Say one reader reads three events, 1, 2 and 3, and the last checkpoint lands after the second event. With our integration of Flink checkpointing, the downstream operators also persist state that reflects events 1 and 2, to guarantee correctness. If this reader fails after reading event 3, Flink rolls all the operators back to the last checkpoint, but the new reader will skip event 3 and start from the fourth event, so event 3 is missing from the pipeline after recovery.

Things get more complicated if there are more readers and they rebalance. The root cause is just as I said: I cannot combine partial and global recovery, because they are two different logic paths in Pravega.

@tkaitchuck
Member

tkaitchuck commented Mar 22, 2022

I think this is a basic misunderstanding of how the Pravega reader works.

If this reader fails after reading event 3, Flink rolls all the operators back to the last checkpoint, but the new reader will skip event 3 and start from the fourth event, so event 3 is missing from the pipeline after recovery.

This does not make sense. From a Pravega point of view, to declare a reader offline the application either provides an explicit position for where it failed, or provides nothing to indicate that no data has been consumed since the last checkpoint was persisted.

So if, in this scenario, the application is reporting to Pravega that event 3 was read: why is it doing that?
If it instead used the default, there would be no lost event.

If the issue is that Flink is resetting all the operators, including those with data from other readers, then it is throwing out data regardless of what Pravega is doing. But if that is what it is doing, then resetting the reader group to the last checkpoint (which we have a special API for) would be the correct thing to do.
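For reference, a rough sketch of the two reader-group calls being discussed; the exact builder usage is my assumption and has not been verified against a build.

```java
// Sketch only: the two recovery paths discussed above, using the Pravega Java client.
import io.pravega.client.stream.Checkpoint;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroup;
import io.pravega.client.stream.ReaderGroupConfig;

class RecoveryPaths {

    // Declare a single reader offline. An explicit Position says "this is exactly
    // where the reader got to"; passing null means "assume nothing was consumed
    // since the last persisted checkpoint".
    void declareReaderOffline(ReaderGroup readerGroup, String readerId, Position lastKnownPosition) {
        readerGroup.readerOffline(readerId, lastKnownPosition); // or (readerId, null)
    }

    // Reset the whole reader group to a previously completed checkpoint (the
    // "special API" mentioned above), so every reader resumes from that
    // consistent point. Builder usage here is an assumption.
    void rollBackWholeGroup(ReaderGroup readerGroup, Checkpoint lastCheckpoint) {
        readerGroup.resetReaderGroup(
                ReaderGroupConfig.builder()
                        .startFromCheckpoint(lastCheckpoint)
                        .build());
    }
}
```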

@tkaitchuck
Member

Pravega does have a story that involves a Position for local recovery, but because reads are asynchronous and segments may auto-scale, a set of positions from one reader group does not necessarily form a complete stream cut for a checkpoint. Recovering with a newly spawned reader can therefore lose data: the related operators roll their state back to the last checkpoint while the other online readers take over the segments and keep reading.

This isn't true either. The "local recovery" is not so much a recovery as declaring the reader dead. Indeed, other readers will take over the segments and continue reading. But there is no such operation as "recovering with a newly spawned reader". The only thing that can happen is that a new reader is brought online, and that does not roll back any state.

@crazyzhou
Contributor Author

I'm not hung up on the idea that it must be the same reader. I was thinking that reader recovery should involve a rollback, whereas Pravega handles all the segment redistribution and continued reading internally. I was also thinking of a close() that automatically shuts a reader down at the latest offset, hence that example. However, even though we have a readerOffline(id, position) method to roll back, there are still some challenges to solve:

  • It may cause issues when a new reader has not come up yet while the other online readers have already taken over the segments and started reading.
  • It does not fit the FLIP-27 interface well. We map the "split" to a Pravega reader, whereas the interface prefers a "start the reader from a position" style of rollback (see the sketch below).
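As a rough illustration of that "start reader from a position" style (all names below are made up for the example, not FLIP-27 or connector types):

```java
import java.util.List;

// Illustrative only: how a FLIP-27-style reader resumes after a local recovery.
// Flink hands back the splits from the last checkpoint and each one is simply
// reopened at its stored position; there is no group-level rollback call.
class HypotheticalSplitRestorer {

    interface PositionedSplit {
        String splitId();
        long checkpointedOffset();
    }

    interface SegmentReaderOpener {
        void openAt(String splitId, long offset); // start reading splitId from offset
    }

    void addSplits(List<PositionedSplit> restoredSplits, SegmentReaderOpener readers) {
        for (PositionedSplit split : restoredSplits) {
            readers.openAt(split.splitId(), split.checkpointedOffset());
        }
    }
}
```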

@tkaitchuck
Member

I think rather than framing this in terms of partial recovery, which is already possible, we need to frame it in terms of allowing application-managed group state, which it seems is what FLIP-27 is actually about.

@crazyzhou
Contributor Author

@tkaitchuck Partial recovery is the problem we want to solve in the Flink connector, not in Pravega itself, so I would still put it this way. You are right that letting the application take control of the group state is one solution, but I think bringing the Rust reader logic into the Java client, with segment readers that the application can actively acquire and release, is closer to the FLIP-27 abstraction and may be a better solution. That capability would also make for a better Spark connector.

As for Pravega's partial recovery, I'd like to share some more thoughts.

From Pravega's point of view there are two kinds of single-reader recovery: reader.close() and readerGroup.readerOffline().

The reader.close() story is complete. It fits stateless Java apps that only require completeness and no duplicates in the data being read. Whether it is an application failure or a system failure, the reader is auto-closed, so this method is eventually called, either actively or passively by the app; its open segments are then reassigned to other readers or to new readers, which continue reading.

However, the readerGroup.readerOffline() story, which involves the rollback, is not complete in my view. It is used more in stateful apps that need to roll back a whole pipeline, e.g. Flink. The readerOffline logic requires the app to persist the reader position and to call readerOffline actively, which means the application has to:

  1. Be able to detect reader status (i.e. notice that a reader has died).
  2. Know when, and for which readers, to call readerOffline when there is an application failure and the reader is closed passively.

The answers to these two questions are not clear in Pravega itself.
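To make the contrast concrete, here is a sketch of the two paths as I read the Java client API (untested; the position handling is my understanding, not a verified example):

```java
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.Position;
import io.pravega.client.stream.ReaderGroup;

class SingleReaderRecoveryPaths {

    // Path 1: stateless app. Closing the reader hands its open segments back to
    // the group; other or new readers simply continue from where it stopped.
    void statelessShutdown(EventStreamReader<byte[]> reader) {
        reader.close();
    }

    // Path 2: stateful app (e.g. Flink). The application itself has to persist
    // the position of every event it has fully processed, detect that the
    // reader is gone, and then report it offline at that persisted position.
    private Position lastProcessed; // must be stored durably with the app's own state

    void readLoop(EventStreamReader<byte[]> reader) {
        EventRead<byte[]> event = reader.readNextEvent(1000);
        // ... process the event, update downstream state ...
        lastProcessed = event.getPosition();
    }

    void onReaderFailureDetected(ReaderGroup readerGroup, String deadReaderId) {
        readerGroup.readerOffline(deadReaderId, lastProcessed);
    }
}
```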
