diff --git a/meson.build b/meson.build index fd1e977d..15b32149 100644 --- a/meson.build +++ b/meson.build @@ -7,7 +7,7 @@ project( 'warning_level=2', 'default_library=static', 'b_colorout=always', - # 'b_sanitize=address,undefined', + 'b_sanitize=address,undefined', 'b_asneeded=false', 'b_lto=true', 'b_thinlto_cache=true', diff --git a/src_rewrite/backend.cc b/src_rewrite/backend.cc index 92804abe..31f7bd52 100644 --- a/src_rewrite/backend.cc +++ b/src_rewrite/backend.cc @@ -11,7 +11,7 @@ #include "backend.h" #include "utils.h" -class Rados : public Backend +class Rados : public ObjStore { private: librados::IoCtx io; diff --git a/src_rewrite/backend.h b/src_rewrite/backend.h index 2c5edb72..48e53380 100644 --- a/src_rewrite/backend.h +++ b/src_rewrite/backend.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -7,20 +8,26 @@ #include "smartiov.h" #include "utils.h" -class Backend +class ObjStore { public: - virtual ~Backend() {} + virtual ~ObjStore() {} virtual ResTask get_size(str name) = 0; virtual ResTask exists(str name) = 0; + virtual ResTask> read_all(str name) = 0; virtual ResTask read(str name, off_t offset, smartiov &iov) = 0; virtual ResTask read(str name, off_t offset, iovec iov) = 0; - virtual ResTask> read_all(str name) = 0; virtual ResTask write(str name, smartiov &iov) = 0; virtual ResTask write(str name, iovec iov) = 0; + virtual ResTask remove(str name) = 0; +}; + +class FileIo +{ + virtual ResTask preadv(off_t offset, smartiov iov) = 0; + virtual ResTask pwritev(off_t offset, smartiov iov) = 0; }; -sptr make_rados_backend(rados_ioctx_t io); -Result connect_to_pool(str pool_name); +Result> connect_to_pool(str pool_name); diff --git a/src_rewrite/image.cc b/src_rewrite/image.cc index 2d36f86d..99a9de9a 100644 --- a/src_rewrite/image.cc +++ b/src_rewrite/image.cc @@ -3,25 +3,61 @@ #include "image.h" #include "representation.h" #include "smartiov.h" +#include "utils.h" -FutRes LsvdImage::read(off_t offset, smartiov iovs) +class LogObj { - return read_task(offset, std::move(iovs)).semi().via(executor); -} + public: + const seqnum_t seqnum; + + private: + u32 bytes_written = 0; + vec data; + + // We start with one to mark that we're expecting further writes, only when + // we get mark as completed to we let it go to 0 + std::atomic writes_pending = 1; + folly::coro::Baton writes_done_baton; + + public: + LogObj(seqnum_t seqnum, usize size) : seqnum(seqnum), data(size) {} + ~LogObj() { assert(writes_pending == 0); } + + auto append(usize len) -> std::pair + { + assert(bytes_written + len <= data.size()); + auto ret = bytes_written; + bytes_written += len; + return {S3Ext{seqnum, ret, len}, data.data() + ret}; + } -FutRes LsvdImage::write(off_t offset, smartiov iovs) -{ - return write_task(offset, std::move(iovs)).semi().via(executor); -} + auto remaining() { return data.size() - bytes_written; } + auto as_iov() { return iovec{data.data(), bytes_written}; } + auto at(S3Ext ext) { return data.data() + ext.offset; } + + Task wait_for_writes() { co_await writes_done_baton; } + void mark_complete() { write_end(); } + void write_start() { writes_pending++; } + void write_end() + { + if (writes_pending.fetch_sub(1) == 1) + writes_done_baton.post(); + } +}; -ResTask LsvdImage::read_task(off_t offset, smartiov iovs) +ResTask LsvdImage::read(off_t offset, smartiov iovs) { // Since backend objects are immutable, we just need a read lock for as long // as we're getting the extents. Once we have the extents, the map can be // moified and we don't care. auto exts = co_await extmap.lookup(offset, iovs.bytes()); - folly::fbvector>> tasks(exts.size()); + folly::fbvector>> tasks; + tasks.reserve(exts.size()); + + // We can hold on to this for a while since we only write to the map on + // rollover, and those are fairly rare compared to reads + auto l = co_await pending_mtx.co_scoped_lock_shared(); for (auto &[img_off, ext] : exts) { auto base = img_off - offset; @@ -34,20 +70,24 @@ ResTask LsvdImage::read_task(off_t offset, smartiov iovs) auto ext_iov = iovs.slice(base, ext.len); // First try to read from the write log in case it's in memory - auto success = co_await wlog->try_read(ext, ext_iov); - if (success) + auto it = pending_objs.find(ext.seqnum); + if (it != pending_objs.end()) { + auto ptr = it->second->at(ext); + iovs.copy_in(ptr, ext.len); continue; + } // Not in pending logobjs, get it from the read-through backend cache tasks.push_back(cache->read(ext, ext_iov).semi()); } + l.unlock(); auto all = co_await folly::collectAll(tasks); for (auto &t : all) if (t.hasException()) - co_return std::errc::io_error; - else if (t.value().has_error()) - co_return t.value().error(); + co_return outcome::failure(std::errc::io_error); + else + BOOST_OUTCOME_CO_TRYX(t.value()); co_return outcome::success(); } @@ -94,38 +134,120 @@ avoid locking anything for too long by doing this and ensure maximum parallelism. */ -ResTask LsvdImage::write_task(off_t offset, smartiov iovs) +ResTask LsvdImage::write(off_t offset, smartiov iovs) { + assert(offset >= 0); auto data_bytes = iovs.bytes(); assert(data_bytes > 0 && data_bytes % block_size == 0); - auto [h, buf] = co_await wlog->new_write_entry(offset, data_bytes); - - // we can't avoid the copy, we have to retain the write somewhere until - // we can flush it to the backend - iovs.copy_out(buf); - + // pin the current object to reserve space in the logobj, and while the + // log is held check if we have to rollover + auto lck = co_await logobj_mtx.co_scoped_lock(); + auto obj = cur_logobj; + obj->write_start(); + auto [ext, buf] = obj->append(data_bytes); + co_await log_rollover(false); + lck.unlock(); + + // Write out the data + auto *entry = reinterpret_cast(buf); + *entry = log_write_entry{ + .type = log_entry_type::WRITE, + .offset = static_cast(offset), + .len = data_bytes, + }; + iovs.copy_out(entry->data); + + // Write it to the journal BOOST_OUTCOME_CO_TRY( - co_await journal->record_write(offset, iovec{buf, data_bytes}, h.ext)); + co_await journal->record_write(offset, iovec{buf, data_bytes}, ext)); // Update the extent map and ack. The extent map takes extents that do not // include headers, so strip those off before updating. - auto data_ext = get_data_ext(h.ext); + auto data_ext = get_data_ext(ext); co_await extmap.update(offset, data_bytes, data_ext); + obj->write_end(); co_return outcome::success(); } -ResTask LsvdImage::trim_task(off_t offset, usize len) +ResTask LsvdImage::trim(off_t offset, usize len) { - auto h = co_await wlog->new_trim_entry(offset, len); - BOOST_OUTCOME_CO_TRY(co_await journal->record_trim(offset, len, h.ext)); + assert(offset >= 0); + if (len == 0) + co_return outcome::success(); + + auto ol = co_await logobj_mtx.co_scoped_lock(); + auto obj = cur_logobj; + obj->write_start(); + auto [ext, buf] = obj->append(len); + co_await log_rollover(false); + ol.unlock(); + + // Write out the data + auto *entry = reinterpret_cast(buf); + *entry = log_trim_entry{ + .type = log_entry_type::TRIM, + .offset = static_cast(offset), + .len = len, + }; + + BOOST_OUTCOME_CO_TRY(co_await journal->record_trim(offset, len, ext)); co_await extmap.unmap(offset, len); + + obj->write_end(); co_return outcome::success(); } -ResTask LsvdImage::flush_task() +ResTask LsvdImage::flush() { - co_await wlog->force_rollover(); + auto ol = co_await logobj_mtx.co_scoped_lock(); + auto obj = co_await log_rollover(true); + ol.unlock(); + + co_await obj->wait_for_writes(); co_return outcome::success(); } + +// Assumes that the logobj_mtx is held exclusively +Task> LsvdImage::log_rollover(bool force) +{ + if (!force && cur_logobj->remaining() > rollover_threshold) { + co_return cur_logobj; + } + + auto prev = cur_logobj; + auto new_seqnum = prev->seqnum + 1; + auto new_logobj = std::make_shared(new_seqnum, max_log_size); + { + auto l = co_await pending_mtx.co_scoped_lock(); + auto [it, inserted] = pending_objs.emplace(new_seqnum, new_logobj); + assert(inserted == true); + } + + cur_logobj = new_logobj; + prev->mark_complete(); + + auto exe = co_await folly::coro::co_current_executor; + flush_logobj(prev).scheduleOn(exe).start(); + co_return prev; +} + +Task LsvdImage::flush_logobj(sptr obj) +{ + co_await obj->wait_for_writes(); + auto k = get_key(obj->seqnum); + + // TODO think about what to do in the case of failure here + std::ignore = co_await s3->write(k, obj->as_iov()); + std::ignore = co_await cache->insert_obj(obj->seqnum, obj->as_iov()); + + auto l = co_await pending_mtx.co_scoped_lock(); + pending_objs.erase(obj->seqnum); +} + +Task LsvdImage::checkpoint() +{ + TODO(); + co_return; +} diff --git a/src_rewrite/image.h b/src_rewrite/image.h index 2882a832..942766b2 100644 --- a/src_rewrite/image.h +++ b/src_rewrite/image.h @@ -1,3 +1,5 @@ +#include +#include #include #include @@ -8,43 +10,73 @@ #include "read_cache.h" #include "smartiov.h" #include "utils.h" -#include "writelog.h" template using FutRes = folly::Future>; +struct SuperblockInfo { + u64 magic; + u64 version; + u64 block_size_bytes; + u64 image_size_bytes; + + folly::fbvector checkpoints; + folly::fbvector snapshots; + folly::AtomicHashMap clones; +}; + +class LogObj; + class LsvdImage { + const usize rollover_threshold = 4 * 1024 * 1024; // 4MB + const usize max_log_size = rollover_threshold * 2; // 8MB + const usize block_size = 4096; + public: const str name; const uuid_t uuid; const usize size_bytes; - const usize block_size = 4096; private: folly::Executor::KeepAlive<> &executor; LsvdConfig cfg; ExtMap extmap; - sptr s3; + sptr s3; sptr cache; sptr journal; - sptr wlog; + + // The "log" part of LSVD + folly::coro::SharedMutex logobj_mtx; + sptr cur_logobj; + folly::coro::SharedMutex pending_mtx; + folly::F14FastMap> pending_objs; + + // Clone, checkpoint, and snapshopt metadata + SuperblockInfo info; + + // Internal functions + std::string get_key(seqnum_t seqnum); + Task> log_rollover(bool force); + Task flush_logobj(sptr obj); + Task checkpoint(); + + // Cannot be copied or moved + LsvdImage(LsvdImage &) = delete; + LsvdImage(LsvdImage &&) = delete; + LsvdImage operator=(LsvdImage &) = delete; + LsvdImage operator=(LsvdImage &&) = delete; public: - static Result> mount(sptr s3, str name, + static Result> mount(sptr s3, str name, str config); + void unmount(); - FutRes read(off_t offset, smartiov iovs); - FutRes write(off_t offset, smartiov iovs); - FutRes trim(off_t offset, usize len); - FutRes flush(); - - static FutRes create(sptr s3, str name); - static FutRes remove(sptr s3, str name); - static FutRes clone(sptr s3, str src, str dst); + static FutRes create(sptr s3, str name); + static FutRes remove(sptr s3, str name); + static FutRes clone(sptr s3, str src, str dst); - private: - ResTask read_task(off_t offset, smartiov iovs); - ResTask write_task(off_t offset, smartiov iovs); - ResTask trim_task(off_t offset, usize len); - ResTask flush_task(); + ResTask read(off_t offset, smartiov iovs); + ResTask write(off_t offset, smartiov iovs); + ResTask trim(off_t offset, usize len); + ResTask flush(); }; \ No newline at end of file diff --git a/src_rewrite/journal.cc b/src_rewrite/journal.cc new file mode 100644 index 00000000..cb4c8bc2 --- /dev/null +++ b/src_rewrite/journal.cc @@ -0,0 +1,5 @@ +#include "journal.h" + +ResTask Journal::record_write(off_t offset, iovec iov, S3Ext ext) +{ +} diff --git a/src_rewrite/journal.h b/src_rewrite/journal.h index d7a3609a..c8421caf 100644 --- a/src_rewrite/journal.h +++ b/src_rewrite/journal.h @@ -1,4 +1,6 @@ #pragma once +#include +#include #include "representation.h" #include "smartiov.h" @@ -6,11 +8,12 @@ class Journal { + folly::File journal_file; + folly::IoUring uring; public: static uptr open(fspath path); - Task backpressure(); ResTask record_write(off_t offset, iovec iov, S3Ext ext); ResTask record_trim(off_t offset, usize len, S3Ext ext); }; \ No newline at end of file diff --git a/src_rewrite/meson.build b/src_rewrite/meson.build index f3d392ba..a7262688 100644 --- a/src_rewrite/meson.build +++ b/src_rewrite/meson.build @@ -1,13 +1,15 @@ -folly_sub = subproject('folly') -libfolly = folly_sub.get_variable('folly_dep') - lsvd_src = files( 'backend.cc', + 'extmap.cc', + 'image.cc', + 'journal.cc', ) -lsvd_inc = include_directories('.') lsvd_deps = [ - # libfolly, + subproject('folly').get_variable('folly_dep'), + subproject('liburing').get_variable('uring'), + subproject('cereal').get_variable('cereal_dep'), + dependency('threads', static: true), dependency('zlib', static: true), dependency('zstd', modules: ['zstd::libzstd_static'], static: true), @@ -18,14 +20,11 @@ lsvd_deps = [ modules: ['system', 'filesystem', 'program_options', 'thread', 'regex'], static: true, ), - subproject('liburing').get_variable('uring'), - subproject('cereal').get_variable('cereal_dep'), dependency('uuid'), dependency('nlohmann_json'), cxx.find_library('rados', required: true), cxx.find_library('rbd', required: true), cxx.find_library('jemalloc', required: false), - libfolly, ] spdk_fe = lsvd_src + files( diff --git a/src_rewrite/read_cache.h b/src_rewrite/read_cache.h index 37808dbd..d09d6efc 100644 --- a/src_rewrite/read_cache.h +++ b/src_rewrite/read_cache.h @@ -9,10 +9,6 @@ class ReadCache public: virtual ~ReadCache() {} - virtual bool should_bypass_cache(str img, seqnum_t seqnum, - usize offset) = 0; - virtual void served_bypass_request(str img, seqnum_t seqnum, usize offset, - usize bytes) = 0; virtual ResTask read(S3Ext ext, smartiov &dest) = 0; - virtual ResTask insert_object(str img, seqnum_t seqnum, iovec v) = 0; + virtual ResTask insert_obj(seqnum_t seqnum, iovec iov) = 0; }; diff --git a/src_rewrite/representation.h b/src_rewrite/representation.h index d23a50d5..43c28ec4 100644 --- a/src_rewrite/representation.h +++ b/src_rewrite/representation.h @@ -35,6 +35,24 @@ struct S3Ext { uint64_t len; }; +// === Log headers === + +struct __attribute((packed)) log_obj_hdr { + uint64_t magic; + uint64_t total_bytes; + seqnum_t seqnum; + uint32_t num_entries; + byte data[]; +}; + +struct __attribute((packed)) superblock_hdr { + uint64_t magic; + uint64_t total_bytes; + byte data[]; +}; + +// === Log entries === + enum log_entry_type { WRITE = 0, TRIM = 1, @@ -44,10 +62,20 @@ struct __attribute((packed)) log_write_entry { log_entry_type type : 2; uint64_t offset : 62; uint64_t len; - std::byte data[]; + byte data[]; +}; + +struct __attribute((packed)) log_trim_entry { + log_entry_type type : 2; + uint64_t offset : 62; + uint64_t len; }; const uint32_t LOG_WRITE_ENTRY_SIZE = sizeof(log_write_entry); +static_assert(LOG_WRITE_ENTRY_SIZE == 16); + +const auto LOG_TRIM_ENTRY_SIZE = sizeof(log_trim_entry); +static_assert(LOG_TRIM_ENTRY_SIZE == 16); inline auto get_data_ext(S3Ext ext) { @@ -58,10 +86,18 @@ inline auto get_data_ext(S3Ext ext) }; } -struct __attribute((packed)) log_trim_entry { +// === Backend representation === + +inline auto get_object_name(str image_name, seqnum_t seqnum) -> std::string +{ + return fmt::format("{}.{:08x}", image_name, seqnum); +} + +// === Journal entries === + +struct __attribute((packed)) journal_entry { log_entry_type type : 2; uint64_t offset : 62; uint64_t len; + byte data[]; }; - -const auto LOG_TRIM_ENTRY_SIZE = sizeof(log_trim_entry); diff --git a/src_rewrite/smartiov.h b/src_rewrite/smartiov.h index 7e18354b..64159ffa 100644 --- a/src_rewrite/smartiov.h +++ b/src_rewrite/smartiov.h @@ -7,6 +7,8 @@ #include "utils.h" +using byte = std::byte; + /* this makes readv / writev a lot easier... */ class smartiov diff --git a/src_rewrite/writelog.cc b/src_rewrite/writelog.cc deleted file mode 100644 index 5358e198..00000000 --- a/src_rewrite/writelog.cc +++ /dev/null @@ -1,21 +0,0 @@ -#include "writelog.h" - -Task LsvdLog::try_read(S3Ext ext, smartiov &iovs) -{ - auto l = co_await mtx.co_scoped_lock_shared(); - - auto it = pending_flush.find(ext.seqnum); - if (it == pending_flush.end()) { - co_return false; - } - - auto &obj = it->second; - obj.read_refs += 1; - l.unlock(); - - iovs.copy_in(obj.data.data() + ext.offset, ext.len); - obj.read_refs -= 1; - co_return true; -} - -Task LsvdLog::new_entry(usize len) {} diff --git a/src_rewrite/writelog.h b/src_rewrite/writelog.h deleted file mode 100644 index 2c8dced5..00000000 --- a/src_rewrite/writelog.h +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "representation.h" -#include "smartiov.h" -#include "utils.h" - -/** - -A wrapper around the "log-structured" part of LSVD. Owns the objects that will -be written to the backend and associated functions like backend flushes. - - */ -class LsvdLog -{ - const usize ROLLOVER_THRESHOLD = 4 * 1024 * 1024; // 4MB - const usize MAX_LOG_SIZE = ROLLOVER_THRESHOLD * 2; // 8MB - - struct LogObj { - const seqnum_t seqnum; - std::atomic read_refs; - std::atomic write_refs; - vec data; - }; - - folly::coro::SharedMutex mtx; - LogObj &cur_obj; - std::atomic cur_seqnum; - std::atomic cur_obj_offset; - - folly::F14FastMap pending_flush; - - public: - struct LogHandle { - S3Ext ext; - LogObj &obj; - - LogHandle(S3Ext ext, byte *ptr, LogObj &obj) : ext(ext), obj(obj) - { - obj.write_refs.fetch_add(1, std::memory_order_seq_cst); - } - - ~LogHandle() { obj.write_refs.fetch_sub(1, std::memory_order_seq_cst); } - }; - - /** - Tries to read a range of data from objects that are pending flush. If - successful, copy the data into the iovs and return true, otherwise return - false and do nothing else. - */ - Task try_read(S3Ext ext, smartiov &iovs); - - /** - Allocates the requested space for a new log entry. This does NOT fill it - with any kind of metadata; the caller is responsible for requesting some - additional space for metadata headers and filling them in. - */ - Task new_entry(usize len); - - /** - Record a write on the log. This will fill in the metadata as appropriate and - return a loghandle and a ptr to the buffer where it should be copied, but - the extent is NOT shrunk to ONLY include the data and not the metadata - header, it includes both. - */ - Task> new_write_entry(off_t offset, usize len); - - /** - Record a write on the log. This will fill in the metadata as appropriate - and return a loghandle to the entire entry, including the metadata. - */ - Task new_trim_entry(off_t offset, usize len); - - Task force_rollover(); -}; \ No newline at end of file