diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h index 72ffd3ff9ba..fbcb0bbbf77 100644 --- a/src/os/bluestore/BlueFS.h +++ b/src/os/bluestore/BlueFS.h @@ -5,6 +5,7 @@ #include <atomic> #include <mutex> +#include <limits> #include "bluefs_types.h" #include "blk/BlockDevice.h" @@ -206,15 +207,21 @@ public: // to use buffer_appender exclusively here (e.g., it's notion of // offset will remain accurate). void append(const char *buf, size_t len) { + uint64_t l0 = get_buffer_length(); + ceph_assert(l0 + len <= std::numeric_limits<unsigned>::max()); buffer_appender.append(buf, len); } // note: used internally only, for ino 1 or 0. void append(ceph::buffer::list& bl) { + uint64_t l0 = get_buffer_length(); + ceph_assert(l0 + bl.length() <= std::numeric_limits<unsigned>::max()); buffer.claim_append(bl); } void append_zero(size_t len) { + uint64_t l0 = get_buffer_length(); + ceph_assert(l0 + len <= std::numeric_limits<unsigned>::max()); buffer_appender.append_zero(len); } @@ -564,9 +571,25 @@ public: int r = _flush(h, force, l); ceph_assert(r == 0); } - void try_flush(FileWriter *h) { - if (h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size) { - flush(h, true); + + void append_try_flush(FileWriter *h, const char* buf, size_t len) { + size_t max_size = 1ull << 30; // cap to 1GB + while (len > 0) { + bool need_flush = true; + auto l0 = h->get_buffer_length(); + if (l0 < max_size) { + size_t l = std::min(len, max_size - l0); + h->append(buf, l); + buf += l; + len -= l; + need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size; + } + if (need_flush) { + flush(h, true); + // make sure we've made any progress with flush hence the + // loop doesn't iterate forever + ceph_assert(h->get_buffer_length() < max_size); + } } } void flush_range(FileWriter *h, uint64_t offset, uint64_t length) { diff --git a/src/os/bluestore/BlueRocksEnv.cc b/src/os/bluestore/BlueRocksEnv.cc index 71232b3f65f..6327fb1e8a3 100644 --- a/src/os/bluestore/BlueRocksEnv.cc +++ b/src/os/bluestore/BlueRocksEnv.cc @@ -173,10 
+173,7 @@ class BlueRocksWritableFile : public rocksdb::WritableFile { }*/ rocksdb::Status Append(const rocksdb::Slice& data) override { - h->append(data.data(), data.size()); - // Avoid calling many time Append() and then calling Flush(). - // Especially for buffer mode, flush much data will cause jitter. - fs->try_flush(h); + fs->append_try_flush(h, data.data(), data.size()); return rocksdb::Status::OK(); } diff --git a/src/test/objectstore/test_bluefs.cc b/src/test/objectstore/test_bluefs.cc index 4bbd2e6dd04..394a8e9862a 100644 --- a/src/test/objectstore/test_bluefs.cc +++ b/src/test/objectstore/test_bluefs.cc @@ -171,7 +171,7 @@ TEST(BlueFS, small_appends) { TEST(BlueFS, very_large_write) { // we'll write a ~5G file, so allocate more than that for the whole fs - uint64_t size = 1048576 * 1024 * 8ull; + uint64_t size = 1048576 * 1024 * 6ull; TempBdev bdev{size}; BlueFS fs(g_ceph_context); @@ -243,6 +243,59 @@ g_ceph_context->_conf.set_val("bluefs_buffered_io", stringify((int)old)); } +TEST(BlueFS, very_large_write2) { + // we'll write a ~5G file, so allocate more than that for the whole fs + uint64_t size_full = 1048576 * 1024 * 6ull; + uint64_t size = 1048576 * 1024 * 5ull; + TempBdev bdev{ size_full }; + BlueFS fs(g_ceph_context); + + bool old = g_ceph_context->_conf.get_val<bool>("bluefs_buffered_io"); + g_ceph_context->_conf.set_val("bluefs_buffered_io", "false"); + uint64_t total_written = 0; + + ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, bdev.path, false, 1048576)); + uuid_d fsid; + ASSERT_EQ(0, fs.mkfs(fsid, { BlueFS::BDEV_DB, false, false })); + ASSERT_EQ(0, fs.mount()); + ASSERT_EQ(0, fs.maybe_verify_layout({ BlueFS::BDEV_DB, false, false })); + + char fill_arr[1 << 20]; // 1M + for (size_t i = 0; i < sizeof(fill_arr); ++i) { + fill_arr[i] = (char)i; + } + std::unique_ptr<char[]> buf; + buf.reset(new char[size]); + for (size_t i = 0; i < size; i += sizeof(fill_arr)) { + memcpy(buf.get() + i, fill_arr, sizeof(fill_arr)); + } + 
{ + BlueFS::FileWriter* h; + ASSERT_EQ(0, fs.mkdir("dir")); + ASSERT_EQ(0, fs.open_for_write("dir", "bigfile", &h, false)); + fs.append_try_flush(h, buf.get(), size); + total_written = size; + fs.fsync(h); + fs.close_writer(h); + } + memset(buf.get(), 0, size); + { + BlueFS::FileReader* h; + ASSERT_EQ(0, fs.open_for_read("dir", "bigfile", &h)); + ASSERT_EQ(h->file->fnode.size, total_written); + auto l = h->file->fnode.size; + int64_t r = fs.read(h, 0, l, NULL, buf.get()); + ASSERT_EQ(r, l); + for (size_t i = 0; i < size; i += sizeof(fill_arr)) { + ceph_assert(memcmp(buf.get() + i, fill_arr, sizeof(fill_arr)) == 0); + } + delete h; + } + fs.umount(); + + g_ceph_context->_conf.set_val("bluefs_buffered_io", stringify((int)old)); +} + #define ALLOC_SIZE 4096 void write_data(BlueFS &fs, uint64_t rationed_bytes)