Merge pull request #42750 from aclamk/wip-aclamk-bluefs-log-incremental

os/bluestore: incremental update mode for bluefs log

Reviewed-by: Igor Fedotov <igor.fedotov@croit.io>
This commit is contained in:
Neha Ojha 2021-12-20 17:15:13 -08:00 committed by GitHub
commit 1f28c06f2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 207 additions and 8 deletions

View File

@ -927,6 +927,7 @@ int BlueFS::mount()
log_writer = _create_writer(_get_file(1));
ceph_assert(log_writer->file->fnode.ino == 1);
log_writer->pos = log_writer->file->fnode.size;
log_writer->file->fnode.reset_delta();
dout(10) << __func__ << " log write pos set to 0x"
<< std::hex << log_writer->pos << std::dec
<< dendl;
@ -1542,6 +1543,69 @@ int BlueFS::_replay(bool noop, bool to_stdout)
}
}
break;
case bluefs_transaction_t::OP_FILE_UPDATE_INC:
{
bluefs_fnode_delta_t delta;
decode(delta, p);
dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
<< ": op_file_update_inc " << " " << delta << " " << dendl;
if (unlikely(to_stdout)) {
std::cout << " 0x" << std::hex << pos << std::dec
<< ": op_file_update_inc " << " " << delta << std::endl;
}
if (!noop) {
FileRef f = _get_file(delta.ino);
bluefs_fnode_t& fnode = f->fnode;
if (delta.offset != fnode.allocated) {
derr << __func__ << " invalid op_file_update_inc, new extents miss end of file"
<< " fnode=" << fnode
<< " delta=" << delta
<< dendl;
ceph_assert(delta.offset == fnode.allocated);
}
if (cct->_conf->bluefs_log_replay_check_allocations) {
int r = _check_allocations(fnode,
used_blocks, false, "OP_FILE_UPDATE_INC");
if (r < 0) {
return r;
}
}
fnode.ino = delta.ino;
fnode.mtime = delta.mtime;
if (fnode.ino != 1) {
vselector->sub_usage(f->vselector_hint, fnode);
}
fnode.size = delta.size;
fnode.claim_extents(delta.extents);
dout(20) << __func__ << " 0x" << std::hex << pos << std::dec
<< ": op_file_update_inc produced " << " " << fnode << " " << dendl;
if (fnode.ino != 1) {
vselector->add_usage(f->vselector_hint, fnode);
}
if (fnode.ino > ino_last) {
ino_last = fnode.ino;
}
if (cct->_conf->bluefs_log_replay_check_allocations) {
int r = _check_allocations(f->fnode,
used_blocks, true, "OP_FILE_UPDATE_INC");
if (r < 0) {
return r;
}
}
} else if (noop && delta.ino == 1) {
// we need to track bluefs log, even in noop mode
FileRef f = _get_file(1);
bluefs_fnode_t& fnode = f->fnode;
fnode.ino = delta.ino;
fnode.mtime = delta.mtime;
fnode.size = delta.size;
fnode.claim_extents(delta.extents);
}
}
break;
case bluefs_transaction_t::OP_FILE_REMOVE:
{
@ -2344,6 +2408,8 @@ void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback,
_close_writer(log_writer);
// we will write it to super
log_file->fnode.reset_delta();
log_file->fnode.size = bl.length();
vselector->sub_usage(log_file->vselector_hint, old_fnode);
vselector->add_usage(log_file->vselector_hint, log_file->fnode);
@ -2521,6 +2587,8 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
new_log->fnode.append_extent(*from);
++from;
}
// we will write it to super
new_log->fnode.reset_delta();
vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
@ -2611,8 +2679,8 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
if (lsi != dirty_files.end()) {
dout(20) << __func__ << " " << lsi->second.size() << " dirty_files" << dendl;
for (auto &f : lsi->second) {
dout(20) << __func__ << " op_file_update " << f.fnode << dendl;
log_t.op_file_update(f.fnode);
dout(20) << __func__ << " op_file_update_inc " << f.fnode << dendl;
log_t.op_file_update_inc(f.fnode);
}
}
@ -2638,7 +2706,7 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
&log_writer->file->fnode);
ceph_assert(r == 0);
vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
log_t.op_file_update(log_writer->file->fnode);
log_t.op_file_update_inc(log_writer->file->fnode);
just_expanded_log = true;
}
@ -3047,7 +3115,8 @@ int BlueFS::_truncate(FileWriter *h, uint64_t offset)
vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size);
h->file->fnode.size = offset;
vselector->add_usage(h->file->vselector_hint, h->file->fnode.size);
log_t.op_file_update(h->file->fnode);
log_t.op_file_update_inc(h->file->fnode);
return 0;
}
@ -3241,15 +3310,15 @@ int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
uint64_t allocated = f->fnode.get_allocated();
if (off + len > allocated) {
uint64_t want = off + len - allocated;
vselector->sub_usage(f->vselector_hint, f->fnode);
vselector->sub_usage(f->vselector_hint, f->fnode);
int r = _allocate(vselector->select_prefer_bdev(f->vselector_hint),
want,
&f->fnode);
vselector->add_usage(f->vselector_hint, f->fnode);
if (r < 0)
return r;
log_t.op_file_update(f->fnode);
log_t.op_file_update_inc(f->fnode);
}
return 0;
}

View File

@ -147,6 +147,31 @@ mempool::bluefs::vector<bluefs_extent_t>::iterator bluefs_fnode_t::seek(
return p;
}
/// Fill *delta with the changes made to this fnode since its commit point.
///
/// ino, size and mtime are always copied, and delta->offset is set to
/// allocated_commited. If space was allocated past that point, delta->extents
/// receives the extents from the position seek() reports for
/// allocated_commited up to the end of the extent list; when that position
/// falls inside an extent, only the remaining tail of that extent is emitted.
/// After extents have been emitted the commit point is advanced with
/// reset_delta().
///
/// @param delta  destination, must be non-null; extents it held are discarded.
/// @return the pointer that was passed in.
bluefs_fnode_delta_t* bluefs_fnode_t::make_delta(bluefs_fnode_delta_t* delta) {
  ceph_assert(delta);
  delta->extents.clear();
  delta->offset = allocated_commited;
  delta->mtime = mtime;
  delta->size = size;
  delta->ino = ino;

  if (allocated_commited >= allocated) {
    // no allocation beyond the commit point: scalar fields only
    return delta;
  }

  uint64_t x_off = 0;
  auto p = seek(allocated_commited, &x_off);
  ceph_assert(p != extents.end());
  if (x_off > 0) {
    // commit point lies inside *p: emit only the part after x_off
    ceph_assert(x_off < p->length);
    delta->extents.emplace_back(p->bdev, p->offset + x_off, p->length - x_off);
    ++p;
  }
  for (; p != extents.end(); ++p) {
    delta->extents.push_back(*p);
  }
  reset_delta();
  return delta;
}
void bluefs_fnode_t::dump(Formatter *f) const
{
f->dump_unsigned("ino", ino);
@ -175,10 +200,22 @@ ostream& operator<<(ostream& out, const bluefs_fnode_t& file)
<< " size 0x" << std::hex << file.size << std::dec
<< " mtime " << file.mtime
<< " allocated " << std::hex << file.allocated << std::dec
<< " alloc_commit " << std::hex << file.allocated_commited << std::dec
<< " extents " << file.extents
<< ")";
}
// bluefs_fnode_delta_t
/// Print a bluefs_fnode_delta_t as
/// "delta(ino N size 0xH mtime T offset H extents [...])".
/// size and offset are printed in hex; the stream is switched back to
/// decimal after each of them.
std::ostream& operator<<(std::ostream& out, const bluefs_fnode_delta_t& delta)
{
  out << "delta(ino " << delta.ino;
  out << " size 0x" << std::hex << delta.size << std::dec;
  out << " mtime " << delta.mtime;
  out << " offset " << std::hex << delta.offset << std::dec;
  out << " extents " << delta.extents;
  out << ")";
  return out;
}
// bluefs_transaction_t

View File

@ -35,6 +35,29 @@ WRITE_CLASS_DENC(bluefs_extent_t)
std::ostream& operator<<(std::ostream& out, const bluefs_extent_t& e);
// Incremental change record for a single file's fnode, carried by
// OP_FILE_UPDATE_INC log entries. It is filled by
// bluefs_fnode_t::make_delta() and, on log replay, its scalar fields are
// copied into the fnode while its extents are appended with
// bluefs_fnode_t::claim_extents().
struct bluefs_fnode_delta_t {
uint64_t ino;
uint64_t size;
utime_t mtime;
uint64_t offset; // File offset at which 'extents' begin.
// make_delta() sets it to the source fnode's 'allocated_commited';
// replay asserts that it equals the target fnode's 'allocated'
// before the extents are appended (consistency check).
mempool::bluefs::vector<bluefs_extent_t> extents;
// Encoder/decoder, struct version 1 / compat 1.
// ino and size go through denc_varint(); mtime, offset and extents
// go through plain denc().
// NOTE(review): the order of the calls below presumably defines the
// encoded field order - confirm before reordering anything here.
DENC(bluefs_fnode_delta_t, v, p) {
DENC_START(1, 1, p);
denc_varint(v.ino, p);
denc_varint(v.size, p);
denc(v.mtime, p);
denc(v.offset, p);
denc(v.extents, p);
DENC_FINISH(p);
}
};
WRITE_CLASS_DENC(bluefs_fnode_delta_t)
std::ostream& operator<<(std::ostream& out, const bluefs_fnode_delta_t& delta);
struct bluefs_fnode_t {
uint64_t ino;
uint64_t size;
@ -47,8 +70,9 @@ struct bluefs_fnode_t {
mempool::bluefs::vector<uint64_t> extents_index;
uint64_t allocated;
uint64_t allocated_commited;
bluefs_fnode_t() : ino(0), size(0), __unused__(0), allocated(0) {}
bluefs_fnode_t() : ino(0), size(0), __unused__(0), allocated(0), allocated_commited(0) {}
uint64_t get_allocated() const {
return allocated;
@ -61,6 +85,7 @@ struct bluefs_fnode_t {
extents_index.emplace_back(allocated);
allocated += p.length;
}
allocated_commited = allocated;
}
DENC_HELPERS
@ -87,6 +112,15 @@ struct bluefs_fnode_t {
DENC_FINISH(p);
}
void reset_delta() {
allocated_commited = allocated;
}
/// Append every extent of the given vector to this fnode, in order, via
/// append_extent(), then empty the given vector.
///
/// @param extents  source list; left empty on return. Note that this
///                 parameter shadows the fnode's own 'extents' member
///                 inside the body.
void claim_extents(mempool::bluefs::vector<bluefs_extent_t>& extents) {
  for (auto it = extents.begin(); it != extents.end(); ++it) {
    append_extent(*it);
  }
  extents.clear();
}
void append_extent(const bluefs_extent_t& ext) {
if (!extents.empty() &&
extents.back().end() == ext.offset &&
@ -114,15 +148,18 @@ struct bluefs_fnode_t {
other.extents.swap(extents);
other.extents_index.swap(extents_index);
std::swap(allocated, other.allocated);
std::swap(allocated_commited, other.allocated_commited);
}
/// Drop all extents and the extent index, and zero both the allocated
/// size and the commit point (allocated_commited).
void clear_extents() {
  allocated_commited = 0;
  allocated = 0;
  extents.clear();
  extents_index.clear();
}
mempool::bluefs::vector<bluefs_extent_t>::iterator seek(
uint64_t off, uint64_t *x_off);
bluefs_fnode_delta_t* make_delta(bluefs_fnode_delta_t* delta);
void dump(ceph::Formatter *f) const;
static void generate_test_instances(std::list<bluefs_fnode_t*>& ls);
@ -195,6 +232,7 @@ struct bluefs_transaction_t {
OP_FILE_REMOVE, ///< remove file (ino)
OP_JUMP, ///< jump the seq # and offset
OP_JUMP_SEQ, ///< jump the seq #
OP_FILE_UPDATE_INC, ///< incremental update file metadata (file)
} op_t;
uuid_d uuid; ///< fs uuid
@ -237,10 +275,19 @@ struct bluefs_transaction_t {
encode(dir, op_bl);
encode(file, op_bl);
}
void op_file_update(const bluefs_fnode_t& file) {
/// Append a full OP_FILE_UPDATE record (opcode byte followed by the whole
/// encoded fnode) to op_bl, then call file.reset_delta() since the complete
/// state has just been recorded.
/// The fnode is taken by non-const reference because of that reset.
void op_file_update(bluefs_fnode_t& file) {
  using ceph::encode;
  encode(static_cast<__u8>(OP_FILE_UPDATE), op_bl);
  encode(file, op_bl);
  // full state recorded: nothing is pending as a delta any more
  file.reset_delta();
}
/* Append an OP_FILE_UPDATE_INC record (opcode byte followed by the encoded
 * delta of the fnode) to op_bl.
 * Building the delta has a side effect on 'file': make_delta() advances the
 * fnode's commit point whenever it emits extents. */
void op_file_update_inc(bluefs_fnode_t& file) {
  using ceph::encode;
  encode(static_cast<__u8>(OP_FILE_UPDATE_INC), op_bl);
  bluefs_fnode_delta_t delta;
  file.make_delta(&delta);  // also resets the fnode's pending delta
  encode(delta, op_bl);
}
void op_file_remove(uint64_t ino) {
using ceph::encode;

View File

@ -913,6 +913,52 @@ TEST(BlueFS, test_truncate_stable_53129) {
fs.umount();
}
// Regression test for incremental bluefs log updates across a remount.
// Scenario: write and fsync a file many times, unmount without compacting
// the log, mount again (which replays the log), write and fsync a second
// file, unmount, then mount a third time and verify the layout.
// The small alloc size and log runway settings are chosen by the test; the
// test name indicates the target is the delta state of the log file (ino 1)
// after a replay.
TEST(BlueFS, test_update_ino1_delta_after_replay) {
  uint64_t size = 1048576LL * (2 * 1024 + 128);
  TempBdev bdev{size};

  ConfSaver conf(g_ceph_context->_conf);
  conf.SetVal("bluefs_alloc_size", "4096");
  conf.SetVal("bluefs_shared_alloc_size", "4096");
  conf.SetVal("bluefs_compact_log_sync", "false");
  conf.SetVal("bluefs_min_log_runway", "32768");
  conf.SetVal("bluefs_max_log_runway", "65536");
  conf.SetVal("bluefs_allocator", "stupid");
  conf.ApplyChanges();

  BlueFS fs(g_ceph_context);
  ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, bdev.path, false, 1048576));
  uuid_d fsid;
  ASSERT_EQ(0, fs.mkfs(fsid, { BlueFS::BDEV_DB, false, false }));
  ASSERT_EQ(0, fs.mount());
  ASSERT_EQ(0, fs.maybe_verify_layout({ BlueFS::BDEV_DB, false, false }));
  ASSERT_EQ(0, fs.mkdir("dir"));

  // Zero-filled payload: the buffer content is irrelevant to the test, but
  // it must not be indeterminate stack memory when it is appended.
  char data[2000] = {};
  BlueFS::FileWriter *h = nullptr;
  ASSERT_EQ(0, fs.open_for_write("dir", "file", &h, false));
  for (size_t i = 0; i < 100; i++) {
    h->append(data, sizeof(data));
    fs.fsync(h);
  }
  fs.close_writer(h);
  fs.umount(true); //do not compact on exit!

  ASSERT_EQ(0, fs.mount());
  ASSERT_EQ(0, fs.open_for_write("dir", "file2", &h, false));
  for (size_t i = 0; i < 100; i++) {
    h->append(data, sizeof(data));
    fs.fsync(h);
  }
  fs.close_writer(h);
  fs.umount();

  // Remount once more and check that the log replays cleanly.
  ASSERT_EQ(0, fs.mount());
  ASSERT_EQ(0, fs.maybe_verify_layout({ BlueFS::BDEV_DB, false, false }));
  fs.umount();
}
int main(int argc, char **argv) {
auto args = argv_to_vec(argc, argv);
map<string,string> defaults = {