Skip to content

Commit

Permalink
Implement most of the logobj code
Browse files Browse the repository at this point in the history
  • Loading branch information
IsaacKhor committed Sep 18, 2024
1 parent 67e4414 commit 76ea220
Show file tree
Hide file tree
Showing 13 changed files with 273 additions and 170 deletions.
2 changes: 1 addition & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ project(
'warning_level=2',
'default_library=static',
'b_colorout=always',
# 'b_sanitize=address,undefined',
'b_sanitize=address,undefined',
'b_asneeded=false',
'b_lto=true',
'b_thinlto_cache=true',
Expand Down
2 changes: 1 addition & 1 deletion src_rewrite/backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "backend.h"
#include "utils.h"

class Rados : public Backend
class Rados : public ObjStore
{
private:
librados::IoCtx io;
Expand Down
17 changes: 12 additions & 5 deletions src_rewrite/backend.h
Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
#pragma once

#include <folly/File.h>
#include <rados/librados.h>
#include <sys/uio.h>

#include "representation.h"
#include "smartiov.h"
#include "utils.h"

class Backend
class ObjStore
{
public:
virtual ~Backend() {}
virtual ~ObjStore() {}

virtual ResTask<usize> get_size(str name) = 0;
virtual ResTask<bool> exists(str name) = 0;

virtual ResTask<vec<byte>> read_all(str name) = 0;
virtual ResTask<usize> read(str name, off_t offset, smartiov &iov) = 0;
virtual ResTask<usize> read(str name, off_t offset, iovec iov) = 0;
virtual ResTask<vec<byte>> read_all(str name) = 0;
virtual ResTask<usize> write(str name, smartiov &iov) = 0;
virtual ResTask<usize> write(str name, iovec iov) = 0;
virtual ResTask<void> remove(str name) = 0;
};

class FileIo
{
virtual ResTask<void> preadv(off_t offset, smartiov iov) = 0;
virtual ResTask<void> pwritev(off_t offset, smartiov iov) = 0;
};

sptr<Backend> make_rados_backend(rados_ioctx_t io);
Result<rados_ioctx_t> connect_to_pool(str pool_name);
Result<uptr<ObjStore>> connect_to_pool(str pool_name);
178 changes: 150 additions & 28 deletions src_rewrite/image.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,61 @@
#include "image.h"
#include "representation.h"
#include "smartiov.h"
#include "utils.h"

FutRes<void> LsvdImage::read(off_t offset, smartiov iovs)
class LogObj
{
return read_task(offset, std::move(iovs)).semi().via(executor);
}
public:
const seqnum_t seqnum;

private:
u32 bytes_written = 0;
vec<byte> data;

// We start with one to mark that we're expecting further writes, only when
// we get mark as completed to we let it go to 0
std::atomic<u32> writes_pending = 1;
folly::coro::Baton writes_done_baton;

public:
LogObj(seqnum_t seqnum, usize size) : seqnum(seqnum), data(size) {}
~LogObj() { assert(writes_pending == 0); }

auto append(usize len) -> std::pair<S3Ext, byte *>
{
assert(bytes_written + len <= data.size());
auto ret = bytes_written;
bytes_written += len;
return {S3Ext{seqnum, ret, len}, data.data() + ret};
}

FutRes<void> LsvdImage::write(off_t offset, smartiov iovs)
{
return write_task(offset, std::move(iovs)).semi().via(executor);
}
auto remaining() { return data.size() - bytes_written; }
auto as_iov() { return iovec{data.data(), bytes_written}; }
auto at(S3Ext ext) { return data.data() + ext.offset; }

Task<void> wait_for_writes() { co_await writes_done_baton; }
void mark_complete() { write_end(); }
void write_start() { writes_pending++; }
void write_end()
{
if (writes_pending.fetch_sub(1) == 1)
writes_done_baton.post();
}
};

ResTask<void> LsvdImage::read_task(off_t offset, smartiov iovs)
ResTask<void> LsvdImage::read(off_t offset, smartiov iovs)
{
// Since backend objects are immutable, we just need a read lock for as long
// as we're getting the extents. Once we have the extents, the map can be
// moified and we don't care.
auto exts = co_await extmap.lookup(offset, iovs.bytes());

folly::fbvector<folly::SemiFuture<Result<void>>> tasks(exts.size());
folly::fbvector<folly::SemiFuture<Result<void>>> tasks;
tasks.reserve(exts.size());

// We can hold on to this for a while since we only write to the map on
// rollover, and those are fairly rare compared to reads
auto l = co_await pending_mtx.co_scoped_lock_shared();
for (auto &[img_off, ext] : exts) {
auto base = img_off - offset;

Expand All @@ -34,20 +70,24 @@ ResTask<void> LsvdImage::read_task(off_t offset, smartiov iovs)
auto ext_iov = iovs.slice(base, ext.len);

// First try to read from the write log in case it's in memory
auto success = co_await wlog->try_read(ext, ext_iov);
if (success)
auto it = pending_objs.find(ext.seqnum);
if (it != pending_objs.end()) {
auto ptr = it->second->at(ext);
iovs.copy_in(ptr, ext.len);
continue;
}

// Not in pending logobjs, get it from the read-through backend cache
tasks.push_back(cache->read(ext, ext_iov).semi());
}
l.unlock();

auto all = co_await folly::collectAll(tasks);
for (auto &t : all)
if (t.hasException())
co_return std::errc::io_error;
else if (t.value().has_error())
co_return t.value().error();
co_return outcome::failure(std::errc::io_error);
else
BOOST_OUTCOME_CO_TRYX(t.value());

co_return outcome::success();
}
Expand Down Expand Up @@ -94,38 +134,120 @@ avoid locking anything for too long by doing this and ensure maximum
parallelism.
*/
ResTask<void> LsvdImage::write_task(off_t offset, smartiov iovs)
ResTask<void> LsvdImage::write(off_t offset, smartiov iovs)
{
assert(offset >= 0);
auto data_bytes = iovs.bytes();
assert(data_bytes > 0 && data_bytes % block_size == 0);

auto [h, buf] = co_await wlog->new_write_entry(offset, data_bytes);

// we can't avoid the copy, we have to retain the write somewhere until
// we can flush it to the backend
iovs.copy_out(buf);

// pin the current object to reserve space in the logobj, and while the
// log is held check if we have to rollover
auto lck = co_await logobj_mtx.co_scoped_lock();
auto obj = cur_logobj;
obj->write_start();
auto [ext, buf] = obj->append(data_bytes);
co_await log_rollover(false);
lck.unlock();

// Write out the data
auto *entry = reinterpret_cast<log_write_entry *>(buf);
*entry = log_write_entry{
.type = log_entry_type::WRITE,
.offset = static_cast<u64>(offset),
.len = data_bytes,
};
iovs.copy_out(entry->data);

// Write it to the journal
BOOST_OUTCOME_CO_TRY(
co_await journal->record_write(offset, iovec{buf, data_bytes}, h.ext));
co_await journal->record_write(offset, iovec{buf, data_bytes}, ext));

// Update the extent map and ack. The extent map takes extents that do not
// include headers, so strip those off before updating.
auto data_ext = get_data_ext(h.ext);
auto data_ext = get_data_ext(ext);
co_await extmap.update(offset, data_bytes, data_ext);

obj->write_end();
co_return outcome::success();
}

ResTask<void> LsvdImage::trim_task(off_t offset, usize len)
ResTask<void> LsvdImage::trim(off_t offset, usize len)
{
auto h = co_await wlog->new_trim_entry(offset, len);
BOOST_OUTCOME_CO_TRY(co_await journal->record_trim(offset, len, h.ext));
assert(offset >= 0);
if (len == 0)
co_return outcome::success();

auto ol = co_await logobj_mtx.co_scoped_lock();
auto obj = cur_logobj;
obj->write_start();
auto [ext, buf] = obj->append(len);
co_await log_rollover(false);
ol.unlock();

// Write out the data
auto *entry = reinterpret_cast<log_trim_entry *>(buf);
*entry = log_trim_entry{
.type = log_entry_type::TRIM,
.offset = static_cast<u64>(offset),
.len = len,
};

BOOST_OUTCOME_CO_TRY(co_await journal->record_trim(offset, len, ext));
co_await extmap.unmap(offset, len);

obj->write_end();
co_return outcome::success();
}

ResTask<void> LsvdImage::flush_task()
ResTask<void> LsvdImage::flush()
{
co_await wlog->force_rollover();
auto ol = co_await logobj_mtx.co_scoped_lock();
auto obj = co_await log_rollover(true);
ol.unlock();

co_await obj->wait_for_writes();
co_return outcome::success();
}

// Assumes that the logobj_mtx is held exclusively
Task<sptr<LogObj>> LsvdImage::log_rollover(bool force)
{
if (!force && cur_logobj->remaining() > rollover_threshold) {
co_return cur_logobj;
}

auto prev = cur_logobj;
auto new_seqnum = prev->seqnum + 1;
auto new_logobj = std::make_shared<LogObj>(new_seqnum, max_log_size);
{
auto l = co_await pending_mtx.co_scoped_lock();
auto [it, inserted] = pending_objs.emplace(new_seqnum, new_logobj);
assert(inserted == true);
}

cur_logobj = new_logobj;
prev->mark_complete();

auto exe = co_await folly::coro::co_current_executor;
flush_logobj(prev).scheduleOn(exe).start();
co_return prev;
}

Task<void> LsvdImage::flush_logobj(sptr<LogObj> obj)
{
co_await obj->wait_for_writes();
auto k = get_key(obj->seqnum);

// TODO think about what to do in the case of failure here
std::ignore = co_await s3->write(k, obj->as_iov());
std::ignore = co_await cache->insert_obj(obj->seqnum, obj->as_iov());

auto l = co_await pending_mtx.co_scoped_lock();
pending_objs.erase(obj->seqnum);
}

Task<void> LsvdImage::checkpoint()
{
TODO();
co_return;
}
68 changes: 50 additions & 18 deletions src_rewrite/image.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <folly/AtomicHashMap.h>
#include <folly/FBVector.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <uuid.h>

Expand All @@ -8,43 +10,73 @@
#include "read_cache.h"
#include "smartiov.h"
#include "utils.h"
#include "writelog.h"

template <typename T> using FutRes = folly::Future<Result<T>>;

struct SuperblockInfo {
u64 magic;
u64 version;
u64 block_size_bytes;
u64 image_size_bytes;

folly::fbvector<seqnum_t> checkpoints;
folly::fbvector<seqnum_t> snapshots;
folly::AtomicHashMap<seqnum_t, str> clones;
};

class LogObj;

class LsvdImage
{
const usize rollover_threshold = 4 * 1024 * 1024; // 4MB
const usize max_log_size = rollover_threshold * 2; // 8MB
const usize block_size = 4096;

public:
const str name;
const uuid_t uuid;
const usize size_bytes;
const usize block_size = 4096;

private:
folly::Executor::KeepAlive<> &executor;
LsvdConfig cfg;
ExtMap extmap;
sptr<Backend> s3;
sptr<ObjStore> s3;
sptr<ReadCache> cache;
sptr<Journal> journal;
sptr<LsvdLog> wlog;

// The "log" part of LSVD
folly::coro::SharedMutex logobj_mtx;
sptr<LogObj> cur_logobj;
folly::coro::SharedMutex pending_mtx;
folly::F14FastMap<seqnum_t, sptr<LogObj>> pending_objs;

// Clone, checkpoint, and snapshopt metadata
SuperblockInfo info;

// Internal functions
std::string get_key(seqnum_t seqnum);
Task<sptr<LogObj>> log_rollover(bool force);
Task<void> flush_logobj(sptr<LogObj> obj);
Task<void> checkpoint();

// Cannot be copied or moved
LsvdImage(LsvdImage &) = delete;
LsvdImage(LsvdImage &&) = delete;
LsvdImage operator=(LsvdImage &) = delete;
LsvdImage operator=(LsvdImage &&) = delete;

public:
static Result<uptr<LsvdImage>> mount(sptr<Backend> s3, str name,
static Result<uptr<LsvdImage>> mount(sptr<ObjStore> s3, str name,
str config);
void unmount();

FutRes<void> read(off_t offset, smartiov iovs);
FutRes<void> write(off_t offset, smartiov iovs);
FutRes<void> trim(off_t offset, usize len);
FutRes<void> flush();

static FutRes<void> create(sptr<Backend> s3, str name);
static FutRes<void> remove(sptr<Backend> s3, str name);
static FutRes<void> clone(sptr<Backend> s3, str src, str dst);
static FutRes<void> create(sptr<ObjStore> s3, str name);
static FutRes<void> remove(sptr<ObjStore> s3, str name);
static FutRes<void> clone(sptr<ObjStore> s3, str src, str dst);

private:
ResTask<void> read_task(off_t offset, smartiov iovs);
ResTask<void> write_task(off_t offset, smartiov iovs);
ResTask<void> trim_task(off_t offset, usize len);
ResTask<void> flush_task();
ResTask<void> read(off_t offset, smartiov iovs);
ResTask<void> write(off_t offset, smartiov iovs);
ResTask<void> trim(off_t offset, usize len);
ResTask<void> flush();
};
5 changes: 5 additions & 0 deletions src_rewrite/journal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "journal.h"

ResTask<void> Journal::record_write(off_t offset, iovec iov, S3Ext ext)
{
}
Loading

0 comments on commit 76ea220

Please sign in to comment.