ct: dl_stm mvcc snapshot #23960
base: dev
Conversation
Force-pushed from 5e9b39f to ec9064e
Force-pushed from ec9064e to 62dc683
the below tests from https://buildkite.com/redpanda/redpanda/builds/57612#0192fd1e-999f-4f92-bbba-a801e042a7e9 have failed and will be retried
the below tests from https://buildkite.com/redpanda/redpanda/builds/57805#01930b34-b146-4c2a-8a1a-1899f0b9b050 have failed and will be retried
the below tests from https://buildkite.com/redpanda/redpanda/builds/57951#01932068-9731-4bd9-a765-c2f38c1c431d have failed and will be retried
Force-pushed from 62dc683 to d69e291
ducktape was retried in https://buildkite.com/redpanda/redpanda/builds/57766#019306d4-ec26-4751-bedd-d29206118d6d
Force-pushed from d69e291 to ef1d9be
@nvartolomei Please add some description to the PR and to the commit message. The PR has one commit with 455 lines of code and a one-line commit message.
This is intended to be used to implement recovery from cloud storage. The detailed design can be found in the shared Redpanda gdrive under the name "Shadow Topics: Recovery RFC".

The dl_stm_api provides 3 new methods:
- `v start_snapshot()` - a metadata-only operation
- `snap read_snapshot(v)` - the actual snapshot payload generation
- `remove_snapshots_before(v)`

The design assumes a single user of MVCC snapshots: the cloud topics recovery subsystem. It will create a logical snapshot first, then read and back up the contents to cloud storage. When done, the cloud topics recovery subsystem is responsible for cleaning up older snapshots by calling `remove_snapshots_before(v)`.

After a `v' = start_snapshot()` call it is guaranteed that any cloud storage references contained in the result of `read_snapshot(v')` will be available in cloud storage, and thus recoverable, until a call to `remove_snapshots_before(v'')` where `v'' > v'`.

We don't have a garbage collection mechanism implemented yet, so there is no code related to the property described above. If we don't garbage collect anything, the property is trivially true. When we add garbage collection, it will be able to respect the property with logic as simple as

```
bool overlay_eligible_for_gc(dl_version v, dl_overlay o) {
    bool is_removed = o.dl_removed_at <= v;
    bool referenced = !_snapshots.empty()
                      && o.added_at >= _snapshots.front().version;
    return is_removed && !referenced;
}
```
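For context, here is a minimal sketch of how a caller (the recovery subsystem) might drive these three methods end to end. Everything in it is illustrative: `backup_to_cloud` is a hypothetical stand-in, and the exact parameter/return types of `read_snapshot` and the `.version` member on `dl_snapshot_id` are assumptions, not copied from the patch.

```cpp
// Illustrative consumer of the dl_stm_api snapshot workflow (sketch only).
ss::future<> backup_partition(dl_stm_api& api) {
    // 1. Create a cheap, metadata-only snapshot marker (a replicated command).
    auto id = co_await api.start_snapshot();
    if (!id.has_value()) {
        co_return; // lost leadership or timed out; the caller may retry
    }

    // 2. Materialize the snapshot payload for that version and back it up.
    auto snap = api.read_snapshot(id.value()); // assumed to return an optional
    if (snap.has_value()) {
        co_await backup_to_cloud(snap.value()); // hypothetical backup step
    }

    // 3. Let older snapshots (and objects only they reference) become
    //    eligible for garbage collection.
    co_await api.remove_snapshots_before(id.value().version);
}
```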
Force-pushed from ef1d9be to d4e0d28
@Lazin updated the commit message. PTAL.
@@ -68,4 +70,81 @@ std::optional<dl_overlay> dl_stm_api::lower_bound(kafka::offset offset) const {
return _stm->_state.lower_bound(offset);
}

ss::future<checked<dl_snapshot_id, dl_stm_api_errc>>
dl_stm_api::start_snapshot() {
model::term_id term = _stm->_raft->term();
This is very similar to the `command_builder` that `archival_metadata_stm` uses. Maybe it makes sense to add a `get_api` method to the `dl_stm` which would return an instance of this class? Right now the `dl_stm_api` is constructed separately, but it's also a friend of the `dl_stm` and is the only way to interact with it. So IMO it makes sense to make this more apparent by allowing `dl_stm` to construct api objects.
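A rough sketch of the factory being suggested, under the assumption that `dl_stm_api` can be constructed from a pointer to the stm; the constructor shape is a guess, not the actual code.

```cpp
// Sketch only: let dl_stm hand out its own api objects instead of having
// callers build dl_stm_api separately (constructor arguments assumed).
class dl_stm {
public:
    // Makes the "api is the only way to interact with the stm" relationship
    // explicit at the point of construction.
    dl_stm_api get_api() { return dl_stm_api(this); }
    // ... rest of the stm ...
};
```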
auto expected_id = dl_snapshot_id(dl_version(res.value().last_offset));

auto applied = co_await _stm->wait_no_throw(
res.value().last_offset, model::timeout_clock::now() + 30s);
Consider using `model::no_timeout`, otherwise the command could be applied concurrently with the code that invokes `start_snapshot`.
Why would that be a problem? PS. there are no locks taken here.
Agree, but imagine a situation when this fails. The caller is retrying the operation and replicating a new command, then the original command is applied and then the retried command is applied. I guess that currently there is an implicit assumption that this is OK because the command itself is empty. It doesn't have any state on its own. If this is correct then this should be mentioned in the comment at least. Also, this may not always be the case. Eventually, some new fields could be added to the command and this implicit assumption may no longer be true.
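For reference, the `model::no_timeout` variant discussed above would look roughly like this (a sketch; it assumes `wait_no_throw` accepts any `model::timeout_clock` time point, as the 30s call above suggests):

```cpp
// Wait indefinitely for the command to be applied instead of racing a fixed
// 30s deadline (sketch only).
auto applied = co_await _stm->wait_no_throw(
  res.value().last_offset, model::no_timeout);
```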
/// to avoid the state growing indefinitely.
ss::future<checked<void, dl_stm_api_errc>>
remove_snapshots_before(dl_version last_version_to_keep);
this class needs a gate and a shutdown code path if the object is not transient (only created to replicate one message)
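A minimal sketch of what that could look like, assuming the api object is long-lived; the member and method names here are illustrative, not the actual `dl_stm_api` definition.

```cpp
// Sketch: guard in-flight operations with a gate and close it on shutdown.
class dl_stm_api {
public:
    ss::future<> stop() {
        // Waits for in-flight replicate/wait calls to finish; new calls that
        // try to enter the gate afterwards fail with gate_closed_exception.
        return _gate.close();
    }

    ss::future<checked<dl_snapshot_id, dl_stm_api_errc>> start_snapshot() {
        auto holder = _gate.hold();
        // ... replicate the command and wait for it to be applied ...
        co_return dl_snapshot_id(dl_version{}); // placeholder result
    }

private:
    ss::gate _gate;
};
```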
fmt::format("Failed to replicate remove snapshots: {}", res.error()));
}
co_await _stm->wait_no_throw( |
this workflow (replicate and then wait until the command is applied) can be extracted and reused
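Something along these lines, for illustration; the helper name `replicate_and_wait` and its exact signature are assumptions, and the error mapping follows the suggestion made further down in this review.

```cpp
// Sketch of an extracted "replicate, then wait for apply" helper.
ss::future<checked<model::offset, dl_stm_api_errc>>
dl_stm_api::replicate_and_wait(
  model::term_id term, model::record_batch_reader reader) {
    raft::replicate_options opts(raft::consistency_level::quorum_ack);
    opts.set_force_flush();

    auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);
    if (res.has_error()) {
        // Replication usually fails here because leadership moved.
        co_return dl_stm_api_errc::not_leader;
    }

    // Wait for the replicated command to be applied to the in-memory state.
    co_await _stm->wait_no_throw(
      res.value().last_offset, model::timeout_clock::now() + 30s);
    co_return res.value().last_offset;
}
```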
opts.set_force_flush();
auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);
if (res.has_error()) { |
It probably makes sense to return the `not_leader` error code from here. The `dl_stm_api_errc` has two values, timeout and not-leader. The user will expect the latter to be returned when replication fails due to lost leadership, and 99% of the time `res.error()` will be equal to `raft::errc::not_leader`.
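Concretely, something like the following (enumerator names are taken from the discussion above, not verified against the header):

```cpp
// Sketch: surface replication failures as an error code instead of throwing.
if (res.has_error()) {
    if (res.error() == raft::errc::not_leader) {
        co_return dl_stm_api_errc::not_leader;
    }
    // Fallback; a richer mapping may be warranted for other raft errors.
    co_return dl_stm_api_errc::timeout;
}
```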
}

// Ensure that the expected snapshot was created.
vassert(_stm->_state.snapshot_exists(expected_id), "Snapshot not found");
Maybe exception?
Sounds good.
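The agreed change might look roughly like this (assuming `dl_snapshot_id` is formattable; otherwise format its underlying version instead):

```cpp
// Sketch: report the inconsistency as an exception rather than terminating
// the process with vassert.
if (!_stm->_state.snapshot_exists(expected_id)) {
    throw std::runtime_error(fmt::format(
      "Expected snapshot {} to exist after the command was applied",
      expected_id));
}
```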
auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);

if (res.has_error()) {
throw std::runtime_error( |
same as previous
ASSERT_EQ(snapshot3->overlays[1], overlay2) << snapshot3->overlays;
}
TEST(dl_stm_state, remove_snapshots_before) { |
do we need a test case that tries to remove all snapshots?
Can you share a bit more about what you have in mind? Currently it would be impossible to remove all snapshots except due to a bug.
if it should be impossible to remove everything then we should probably have a test that tries to remove everything
How, if there are no API calls for that and by design you can't have a dl_snapshot_id without creating one first? I feel like you want me to test a specific case but I'm not sure which one.
the case is

```
state.start_snapshot();
state.start_snapshot();
auto id = state.start_snapshot();
...
state.remove_snapshots_before(id + 1); // should fail
```

does it make sense?
do you see a problem somewhere?
this is a question
non-flaky failures in https://buildkite.com/redpanda/redpanda/builds/57951#019320c4-643a-44ad-901c-2bb6e5a76c18:
Retry command for Build #57951: please wait until all jobs are finished before running the slash command.
Backports Required
Release Notes