Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ct: dl_stm mvcc snapshot #23960

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions src/v/cloud_topics/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "dl_version",
hdrs = ["dl_version.h"],
include_prefix = "cloud_topics",
deps = ["//src/v/utils:named_type"],
)

redpanda_cc_library(
name = "dl_overlay",
srcs = [
Expand Down Expand Up @@ -82,6 +75,26 @@ redpanda_cc_library(
],
)

redpanda_cc_library(
name = "dl_version",
hdrs = ["dl_version.h"],
include_prefix = "cloud_topics",
deps = ["//src/v/utils:named_type"],
)

redpanda_cc_library(
name = "dl_snapshot",
hdrs = [
"dl_snapshot.h",
],
include_prefix = "cloud_topics",
deps = [
":dl_overlay",
":dl_version",
"//src/v/serde",
],
)

redpanda_cc_library(
name = "app",
srcs = [
Expand Down
47 changes: 47 additions & 0 deletions src/v/cloud_topics/dl_snapshot.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include "cloud_topics/dl_overlay.h"
#include "cloud_topics/dl_version.h"
#include "container/fragmented_vector.h"
#include "serde/envelope.h"

namespace experimental::cloud_topics {

struct dl_snapshot_id
: serde::
envelope<dl_snapshot_id, serde::version<0>, serde::compat_version<0>> {
dl_snapshot_id() noexcept = default;

explicit dl_snapshot_id(dl_version version) noexcept
: version(version) {}

auto serde_fields() { return std::tie(version); }

bool operator==(const dl_snapshot_id& other) const noexcept = default;

/// Version for which the snapshot is created.
dl_version version;
};

struct dl_snapshot_payload
: serde::checksum_envelope<
dl_snapshot_id,
serde::version<0>,
serde::compat_version<0>> {
/// Version for which the snapshot is created.
dl_snapshot_id id;

/// Overlays visible at the snapshot version.
fragmented_vector<dl_overlay> overlays;
};

}; // namespace experimental::cloud_topics
4 changes: 4 additions & 0 deletions src/v/cloud_topics/dl_stm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ redpanda_cc_library(
visibility = [":__subpackages__"],
deps = [
"//src/v/cloud_topics:dl_overlay",
"//src/v/cloud_topics:dl_version",
"//src/v/serde",
],
)
Expand All @@ -19,6 +20,7 @@ redpanda_cc_library(
visibility = [":__subpackages__"],
deps = [
"//src/v/cloud_topics:dl_overlay",
"//src/v/cloud_topics:dl_snapshot",
"//src/v/cloud_topics:dl_version",
"//src/v/container:fragmented_vector",
"//src/v/model",
Expand Down Expand Up @@ -58,6 +60,8 @@ redpanda_cc_library(
deps = [
"//src/v/base",
"//src/v/cloud_topics:dl_overlay",
"//src/v/cloud_topics:dl_snapshot",
"//src/v/cloud_topics:dl_version",
"//src/v/model",
"@seastar",
],
Expand Down
11 changes: 11 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ ss::future<> dl_stm::do_apply(const model::record_batch& batch) {
_state.push_overlay(new_dl_version, std::move(cmd.overlay));
break;
}
case dl_stm_key::start_snapshot: {
std::ignore = serde::from_iobuf<start_snapshot_cmd>(
r.release_value());
_state.start_snapshot(new_dl_version);
break;
}
case dl_stm_key::remove_snapshots_before_version:
auto cmd = serde::from_iobuf<remove_snapshots_before_version_cmd>(
r.release_value());
_state.remove_snapshots_before(cmd.last_version_to_keep);
break;
}
});

Expand Down
79 changes: 79 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "serde/rw/uuid.h"
#include "storage/record_batch_builder.h"

#include <stdexcept>

namespace experimental::cloud_topics {

std::ostream& operator<<(std::ostream& o, dl_stm_api_errc errc) {
Expand Down Expand Up @@ -68,4 +70,81 @@ std::optional<dl_overlay> dl_stm_api::lower_bound(kafka::offset offset) const {
return _stm->_state.lower_bound(offset);
}

ss::future<checked<dl_snapshot_id, dl_stm_api_errc>>
dl_stm_api::start_snapshot() {
model::term_id term = _stm->_raft->term();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very similar to command_builder that archival_metadata_stm uses. Maybe it makes sense to add get_api method to the dl_stm which would return an instance of this class?

Right now the dl_stm_api is constructed separately but it's also a friend of the dl_stm and is the only way to interact with it. So IMO it makes sense to make this more apparent by allowing dl_stm to construct api objects.


vlog(_logger.debug, "Replicating dl_stm_cmd::start_snapshot_cmd");

storage::record_batch_builder builder(
model::record_batch_type::dl_stm_command, model::offset(0));
builder.add_raw_kv(
serde::to_iobuf(dl_stm_key::start_snapshot),
serde::to_iobuf(start_snapshot_cmd()));

auto batch = std::move(builder).build();
auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);

if (res.has_error()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably makes sense to return not_leader error code from here. The dl_stm_api_errc has two values, timeout and not-leader. The user will expect the last one to be returned when the replication fails due to lost leadership. And 99% of the time the res.error() will be equal to raft::errc::not_leader.

throw std::runtime_error(
fmt::format("Failed to replicate snapshot: {}", res.error()));
}

// We abuse knowledge of implementation detail here to construct the
// dl_snapshot_id without having to setup listeners and notifiers of command
// apply.
auto expected_id = dl_snapshot_id(dl_version(res.value().last_offset));

auto applied = co_await _stm->wait_no_throw(
res.value().last_offset, model::timeout_clock::now() + 30s);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using model::no_timeout, otherwise the command could be applied concurrently with the code that invokes start_snapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would that be a problem? PS. there are no locks taken here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, but imagine a situation when this fails. The caller is retrying the operation and replicating new command, then the original command is applied and then the retried command is applied. I guess that currently there is an implicit assumption that this is OK because the command itself is empty. It doesn't have any state on its own. If this is correct than this should be mentioned in the comment at least. Also, this may not always be the case. Eventually, some new fields could be added to the command and this implicit assumption may no longer be true.

if (!applied) {
co_return outcome::failure(dl_stm_api_errc::timeout);
}

// Ensure that the expected snapshot was created.
vassert(_stm->_state.snapshot_exists(expected_id), "Snapshot not found");
nvartolomei marked this conversation as resolved.
Show resolved Hide resolved

co_return outcome::success(expected_id);
}

std::optional<dl_snapshot_payload>
dl_stm_api::read_snapshot(dl_snapshot_id id) {
return _stm->_state.read_snapshot(id);
}

ss::future<checked<void, dl_stm_api_errc>>
dl_stm_api::remove_snapshots_before(dl_version last_version_to_keep) {
model::term_id term = _stm->_raft->term();

vlog(_logger.debug, "Replicating dl_stm_cmd::remove_snapshots_cmd");

storage::record_batch_builder builder(
model::record_batch_type::dl_stm_command, model::offset(0));
builder.add_raw_kv(
serde::to_iobuf(dl_stm_key::remove_snapshots_before_version),
serde::to_iobuf(
remove_snapshots_before_version_cmd(last_version_to_keep)));

auto batch = std::move(builder).build();
auto reader = model::make_memory_record_batch_reader(std::move(batch));

auto opts = raft::replicate_options(raft::consistency_level::quorum_ack);
opts.set_force_flush();
auto res = co_await _stm->_raft->replicate(term, std::move(reader), opts);

if (res.has_error()) {
throw std::runtime_error(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as previous

fmt::format("Failed to replicate remove snapshots: {}", res.error()));
}

co_await _stm->wait_no_throw(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this workflow (replicate and then wait until the command is applied) can be extracted and reused

res.value().last_offset, model::timeout_clock::now() + 30s);

co_return outcome::success();
}

}; // namespace experimental::cloud_topics
14 changes: 14 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include "base/outcome.h"
#include "cloud_topics/dl_overlay.h"
#include "cloud_topics/dl_snapshot.h"
#include "cloud_topics/dl_version.h"

#include <seastar/util/log.hh>

Expand Down Expand Up @@ -41,6 +43,18 @@ class dl_stm_api {
/// available offset.
std::optional<dl_overlay> lower_bound(kafka::offset offset) const;

/// Request a new snapshot to be created.
ss::future<checked<dl_snapshot_id, dl_stm_api_errc>> start_snapshot();

/// Read the payload of a snapshot.
std::optional<dl_snapshot_payload> read_snapshot(dl_snapshot_id id);

/// Remove all snapshots with version less than the given version.
/// This must be called periodically as new snapshots are being created
/// to avoid the state growing indefinitely.
ss::future<checked<void, dl_stm_api_errc>>
remove_snapshots_before(dl_version last_version_to_keep);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class needs a gate and a shutdown code path if the object is not transient (only created to replicate one message)

private:
ss::logger& _logger;

Expand Down
26 changes: 26 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once

#include "cloud_topics/dl_overlay.h"
#include "cloud_topics/dl_version.h"
#include "model/fundamental.h"
#include "model/timestamp.h"
#include "serde/envelope.h"
Expand All @@ -28,4 +29,29 @@ struct push_overlay_cmd
dl_overlay overlay;
};

struct start_snapshot_cmd
: public serde::envelope<
start_snapshot_cmd,
serde::version<0>,
serde::compat_version<0>> {
start_snapshot_cmd() noexcept = default;

auto serde_fields() { return std::tie(); }
};

struct remove_snapshots_before_version_cmd
: public serde::envelope<
remove_snapshots_before_version_cmd,
serde::version<0>,
serde::compat_version<0>> {
remove_snapshots_before_version_cmd() noexcept = default;
explicit remove_snapshots_before_version_cmd(
dl_version last_version_to_keep)
: last_version_to_keep(last_version_to_keep) {}

auto serde_fields() { return std::tie(last_version_to_keep); }

dl_version last_version_to_keep{};
};

} // namespace experimental::cloud_topics
59 changes: 59 additions & 0 deletions src/v/cloud_topics/dl_stm/dl_stm_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "cloud_topics/dl_stm/dl_stm_state.h"

#include "cloud_topics/dl_overlay.h"
#include "cloud_topics/dl_snapshot.h"
#include "model/fundamental.h"

#include <algorithm>
Expand Down Expand Up @@ -71,4 +72,62 @@ dl_stm_state::lower_bound(kafka::offset offset) const {
return best_match;
}

dl_snapshot_id dl_stm_state::start_snapshot(dl_version version) noexcept {
_version_invariant.set_last_snapshot_version(version);

auto id = dl_snapshot_id(version);
_snapshots.push_back(id);

return id;
}

bool dl_stm_state::snapshot_exists(dl_snapshot_id id) const noexcept {
return std::binary_search(
_snapshots.begin(),
_snapshots.end(),
id,
[](const dl_snapshot_id& a, const dl_snapshot_id& b) {
return a.version < b.version;
});
}

std::optional<dl_snapshot_payload>
dl_stm_state::read_snapshot(dl_snapshot_id id) const {
auto it = std::find_if(
_snapshots.begin(), _snapshots.end(), [&id](const dl_snapshot_id& s) {
return s.version == id.version;
});

// Snapshot not found.
if (it == _snapshots.end()) {
return std::nullopt;
}

// Collect overlays that are visible at the snapshot version.
fragmented_vector<dl_overlay> overlays;
for (const auto& entry : _overlays) {
if (
entry.added_at <= id.version
&& (entry.removed_at == dl_version{} || entry.removed_at > id.version)) {
overlays.push_back(entry.overlay);
}
}

return dl_snapshot_payload{
.id = *it,
.overlays = std::move(overlays),
};
}

void dl_stm_state::remove_snapshots_before(dl_version last_version_to_keep) {
auto it = std::remove_if(
_snapshots.begin(),
_snapshots.end(),
[last_version_to_keep](const dl_snapshot_id& id) {
return id.version < last_version_to_keep;
});

_snapshots.erase(it, _snapshots.end());
}

} // namespace experimental::cloud_topics
Loading