mirror of
https://github.com/ceph/ceph
synced 2025-02-23 11:07:35 +00:00
Merge pull request #39320 from ifed01/wip-ifed-fix-huge-bluefs-write
os/bluestore: fix huge(>4GB) writes from RocksDB to BlueFS. Reviewed-by: Adam Kupczyk <akucpzyk@redhat.com>
This commit is contained in:
commit
5b793096c8
@ -5,6 +5,7 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <limits>
|
||||
|
||||
#include "bluefs_types.h"
|
||||
#include "blk/BlockDevice.h"
|
||||
@ -206,15 +207,21 @@ public:
|
||||
// to use buffer_appender exclusively here (e.g., its notion of
|
||||
// offset will remain accurate).
|
||||
void append(const char *buf, size_t len) {
|
||||
uint64_t l0 = get_buffer_length();
|
||||
ceph_assert(l0 + len <= std::numeric_limits<unsigned>::max());
|
||||
buffer_appender.append(buf, len);
|
||||
}
|
||||
|
||||
// note: used internally only, for ino 1 or 0.
|
||||
void append(ceph::buffer::list& bl) {
  // Same size guard as the raw-pointer overload: the pending data plus the
  // incoming list must stay within the range of an `unsigned`.
  constexpr uint64_t limit = std::numeric_limits<unsigned>::max();
  const uint64_t buffered = get_buffer_length();
  ceph_assert(buffered + bl.length() <= limit);
  // Hand the contents of `bl` over to the pending buffer.
  buffer.claim_append(bl);
}
|
||||
|
||||
void append_zero(size_t len) {
|
||||
uint64_t l0 = get_buffer_length();
|
||||
ceph_assert(l0 + len <= std::numeric_limits<unsigned>::max());
|
||||
buffer_appender.append_zero(len);
|
||||
}
|
||||
|
||||
@ -564,9 +571,25 @@ public:
|
||||
int r = _flush(h, force, l);
|
||||
ceph_assert(r == 0);
|
||||
}
|
||||
void try_flush(FileWriter *h) {
|
||||
if (h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size) {
|
||||
flush(h, true);
|
||||
|
||||
/// Append `len` bytes starting at `buf` to writer `h`, flushing on the way.
///
/// The input is consumed in pieces so that the writer's pending buffer is
/// never grown past `max_size` (1GB) by a single append. After each piece the
/// writer is flushed if its buffered length has reached
/// `bluefs_min_flush_size`; if the buffer is already at or above the cap on
/// entry to an iteration, a flush is forced before any more data is appended.
///
/// @param h    writer receiving the data
/// @param buf  start of the bytes to append
/// @param len  number of bytes to append; may exceed 4GB
void append_try_flush(FileWriter *h, const char* buf, size_t len) {
  const uint64_t max_size = 1ull << 30; // cap to 1GB
  while (len > 0) {
    // Assume a flush is required; only cleared when we managed to append
    // and the buffer is still below the configured flush threshold.
    bool need_flush = true;
    const uint64_t l0 = h->get_buffer_length();
    if (l0 < max_size) {
      // Explicit template argument: `len` is size_t while the remaining room
      // is computed in 64 bits, and std::min cannot deduce a common type when
      // those are distinct types on the target platform. The result never
      // exceeds `len`, so narrowing back to size_t is lossless.
      const size_t l =
        static_cast<size_t>(std::min<uint64_t>(len, max_size - l0));
      h->append(buf, l);
      buf += l;
      len -= l;
      need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
    }
    if (need_flush) {
      flush(h, true);
      // The flush must have brought the buffer back under the cap;
      // otherwise the next iteration could not append anything and the
      // loop would spin forever.
      ceph_assert(h->get_buffer_length() < max_size);
    }
  }
}
|
||||
void flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
|
||||
|
@ -173,10 +173,7 @@ class BlueRocksWritableFile : public rocksdb::WritableFile {
|
||||
}*/
|
||||
|
||||
// Append the slice to the underlying BlueFS writer.
//
// The data is handed over exclusively through fs->append_try_flush(), which
// both buffers it and flushes as it goes. It must not additionally be
// appended to `h` directly: doing both would store the slice twice. Flushing
// incrementally also avoids piling up many Append() calls into one large
// Flush(), which causes jitter, especially in buffered mode.
//
// Always returns rocksdb::Status::OK().
rocksdb::Status Append(const rocksdb::Slice& data) override {
  fs->append_try_flush(h, data.data(), data.size());
  return rocksdb::Status::OK();
}
|
||||
|
||||
|
@ -171,7 +171,7 @@ TEST(BlueFS, small_appends) {
|
||||
|
||||
TEST(BlueFS, very_large_write) {
|
||||
// we'll write a ~5G file, so allocate more than that for the whole fs
|
||||
uint64_t size = 1048576 * 1024 * 8ull;
|
||||
uint64_t size = 1048576 * 1024 * 6ull;
|
||||
TempBdev bdev{size};
|
||||
BlueFS fs(g_ceph_context);
|
||||
|
||||
@ -243,6 +243,59 @@ TEST(BlueFS, very_large_write) {
|
||||
g_ceph_context->_conf.set_val("bluefs_buffered_io", stringify((int)old));
|
||||
}
|
||||
|
||||
// Writes a single ~5GB buffer through one append_try_flush() call, then
// reads the file back and verifies both its size and its contents.
TEST(BlueFS, very_large_write2) {
  // we'll write a ~5G file, so allocate more than that for the whole fs
  uint64_t size_full = 1048576 * 1024 * 6ull;
  uint64_t size = 1048576 * 1024 * 5ull;
  TempBdev bdev{ size_full };
  BlueFS fs(g_ceph_context);

  // Run with buffered I/O disabled; the previous setting is restored at the
  // end of the test.
  bool old = g_ceph_context->_conf.get_val<bool>("bluefs_buffered_io");
  g_ceph_context->_conf.set_val("bluefs_buffered_io", "false");
  uint64_t total_written = 0;

  ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, bdev.path, false, 1048576));
  uuid_d fsid;
  ASSERT_EQ(0, fs.mkfs(fsid, { BlueFS::BDEV_DB, false, false }));
  ASSERT_EQ(0, fs.mount());
  ASSERT_EQ(0, fs.maybe_verify_layout({ BlueFS::BDEV_DB, false, false }));

  // 1M pattern block, repeated to fill the whole payload.
  char fill_arr[1 << 20]; // 1M
  for (size_t i = 0; i < sizeof(fill_arr); ++i) {
    fill_arr[i] = static_cast<char>(i);
  }
  std::unique_ptr<char[]> buf;
  buf.reset(new char[size]);
  for (size_t i = 0; i < size; i += sizeof(fill_arr)) {
    memcpy(buf.get() + i, fill_arr, sizeof(fill_arr));
  }
  {
    BlueFS::FileWriter* h;
    ASSERT_EQ(0, fs.mkdir("dir"));
    ASSERT_EQ(0, fs.open_for_write("dir", "bigfile", &h, false));
    // The whole payload goes in with a single call.
    fs.append_try_flush(h, buf.get(), size);
    total_written = size;
    fs.fsync(h);
    fs.close_writer(h);
  }
  // Wipe the buffer so the comparison below can only succeed on data that
  // was actually read back.
  memset(buf.get(), 0, size);
  {
    BlueFS::FileReader* h;
    ASSERT_EQ(0, fs.open_for_read("dir", "bigfile", &h));
    // Own the reader so it is released even if an assertion below returns
    // from the test early.
    std::unique_ptr<BlueFS::FileReader> reader{h};
    ASSERT_EQ(h->file->fnode.size, total_written);
    auto l = h->file->fnode.size;
    int64_t r = fs.read(h, 0, l, nullptr, buf.get());
    // Compare as signed values to avoid a signed/unsigned comparison.
    ASSERT_EQ(r, static_cast<int64_t>(l));
    for (size_t i = 0; i < size; i += sizeof(fill_arr)) {
      // Report a mismatch as a test failure (with its offset) instead of
      // aborting the whole test binary.
      ASSERT_EQ(0, memcmp(buf.get() + i, fill_arr, sizeof(fill_arr)))
        << "data mismatch at offset " << i;
    }
  }
  fs.umount();

  g_ceph_context->_conf.set_val("bluefs_buffered_io",
                                stringify(static_cast<int>(old)));
}
|
||||
|
||||
#define ALLOC_SIZE 4096
|
||||
|
||||
void write_data(BlueFS &fs, uint64_t rationed_bytes)
|
||||
|
Loading…
Reference in New Issue
Block a user