Merge pull request #37406 from MahatiC/wip-refactor-rwl

librbd/cache: Refactor common code for RWL and SSD

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
Jason Dillaman 2020-10-01 11:39:01 -04:00 committed by GitHub
commit 116283abac
5 changed files with 1130 additions and 952 deletions

File diff suppressed because it is too large.


@@ -15,6 +1,7 @@
#include "librbd/cache/pwl/LogOperation.h"
#include "librbd/cache/pwl/Request.h"
#include "librbd/cache/pwl/LogMap.h"
#include "librbd/cache/pwl/Types.h"
#include <functional>
#include <list>
@@ -29,10 +30,11 @@ namespace cache {
namespace pwl {
class SyncPointLogEntry;
class GenericWriteLogEntry;
class WriteLogEntry;
class GenericLogEntry;
class GenericWriteLogEntry;
class SyncPointLogEntry;
class WriteLogEntry;
struct WriteLogPmemEntry;
typedef std::list<std::shared_ptr<WriteLogEntry>> WriteLogEntries;
typedef std::list<std::shared_ptr<GenericLogEntry>> GenericLogEntries;
@@ -65,24 +67,30 @@ public:
typedef io::Extents Extents;
AbstractWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
~AbstractWriteLog();
virtual ~AbstractWriteLog();
AbstractWriteLog(const AbstractWriteLog&) = delete;
AbstractWriteLog &operator=(const AbstractWriteLog&) = delete;
/// IO methods
void read(Extents&& image_extents, ceph::bufferlist *bl,
void read(
Extents&& image_extents, ceph::bufferlist *bl,
int fadvise_flags, Context *on_finish);
void write(Extents&& image_extents, ceph::bufferlist&& bl,
void write(
Extents&& image_extents, ceph::bufferlist&& bl,
int fadvise_flags,
Context *on_finish);
void discard(uint64_t offset, uint64_t length,
void discard(
uint64_t offset, uint64_t length,
uint32_t discard_granularity_bytes,
Context *on_finish);
void flush(io::FlushSource flush_source, Context *on_finish);
void writesame(uint64_t offset, uint64_t length,
void flush(
io::FlushSource flush_source, Context *on_finish);
void writesame(
uint64_t offset, uint64_t length,
ceph::bufferlist&& bl,
int fadvise_flags, Context *on_finish);
void compare_and_write(Extents&& image_extents,
void compare_and_write(
Extents&& image_extents,
ceph::bufferlist&& cmp_bl, ceph::bufferlist&& bl,
uint64_t *mismatch_offset, int fadvise_flags,
Context *on_finish);
@@ -104,13 +112,13 @@ public:
CephContext * get_context();
void release_guarded_request(BlockGuardCell *cell);
void release_write_lanes(C_BlockIORequestT *req);
bool alloc_resources(C_BlockIORequestT *req);
template <typename V>
void flush_pmem_buffer(V& ops);
virtual bool alloc_resources(C_BlockIORequestT *req) = 0;
virtual void setup_schedule_append(
pwl::GenericLogOperationsVector &ops, bool do_early_flush) = 0;
void schedule_append(pwl::GenericLogOperationsVector &ops);
void schedule_append(pwl::GenericLogOperationSharedPtr op);
void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops);
void flush_new_sync_point(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
std::shared_ptr<pwl::SyncPoint> get_current_sync_point() {
return m_current_sync_point;
}
@@ -134,22 +142,102 @@ public:
return m_free_log_entries;
}
void add_into_log_map(pwl::GenericWriteLogEntries &log_entries);
protected:
private:
typedef std::list<pwl::C_WriteRequest<This> *> C_WriteRequests;
typedef std::list<pwl::C_BlockIORequest<This> *> C_BlockIORequests;
std::atomic<bool> m_initialized = {false};
uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */
utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */
pwl::WriteLogGuard m_write_log_guard;
/* Starts at 0 for a new write log. Incremented on every flush. */
uint64_t m_current_sync_gen = 0;
/* Starts at 0 on each sync gen increase. Incremented before being applied
to an operation */
uint64_t m_last_op_sequence_num = 0;
bool m_persist_on_write_until_flush = true;
/* Debug counters for the places m_async_op_tracker is used */
std::atomic<int> m_async_append_ops = {0};
std::atomic<int> m_async_complete_ops = {0};
std::atomic<int> m_async_null_flush_finish = {0};
std::atomic<int> m_async_process_work = {0};
/* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
mutable ceph::mutex m_deferred_dispatch_lock;
/* Used in release/detain to make BlockGuard preserve submission order */
mutable ceph::mutex m_blockguard_lock;
/* Use m_blockguard_lock for the following 3 things */
bool m_barrier_in_progress = false;
BlockGuardCell *m_barrier_cell = nullptr;
bool m_wake_up_enabled = true;
Contexts m_flush_complete_contexts;
std::shared_ptr<pwl::SyncPoint> m_current_sync_point = nullptr;
bool m_persist_on_flush = false; /* If false, persist each write before completion */
int m_flush_ops_in_flight = 0;
int m_flush_bytes_in_flight = 0;
uint64_t m_lowest_flushing_sync_gen = 0;
/* Writes that have left the block guard, but are waiting for resources */
C_BlockIORequests m_deferred_ios;
/* Throttle writes concurrently allocating & replicating */
unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES;
/* Initialized from config, then set false during shutdown */
std::atomic<bool> m_periodic_stats_enabled = {false};
SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */
mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */
Context *m_timer_ctx = nullptr;
ThreadPool m_thread_pool;
uint32_t m_discard_granularity_bytes;
BlockGuardCell* detain_guarded_request_helper(pwl::GuardedRequest &req);
BlockGuardCell* detain_guarded_request_barrier_helper(pwl::GuardedRequest &req);
void detain_guarded_request(C_BlockIORequestT *request,
pwl::GuardedRequestFunctionContext *guarded_ctx,
bool is_barrier);
void perf_start(const std::string name);
void perf_stop();
void log_perf();
void periodic_stats();
void arm_periodic_stats();
void pwl_init(Context *on_finish, pwl::DeferredContexts &later);
void update_image_cache_state(Context *on_finish);
void flush_dirty_entries(Context *on_finish);
bool can_flush_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
bool handle_flushed_sync_point(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
void sync_point_writer_flushed(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
void init_flush_new_sync_point(pwl::DeferredContexts &later);
void new_sync_point(pwl::DeferredContexts &later);
pwl::C_FlushRequest<AbstractWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
void internal_flush(bool invalidate, Context *on_finish);
protected:
librbd::cache::pwl::ImageCacheState<ImageCtxT>* m_cache_state = nullptr;
std::atomic<bool> m_initialized = {false};
std::atomic<bool> m_shutting_down = {false};
std::atomic<bool> m_invalidating = {false};
PMEMobjpool *m_log_pool = nullptr;
const char* m_pwl_pool_layout_name;
ImageCtxT &m_image_ctx;
@@ -164,14 +252,11 @@ protected:
std::atomic<uint64_t> m_bytes_allocated = {0}; /* Total bytes allocated in write buffers */
uint64_t m_bytes_cached = 0; /* Total bytes used in write buffers */
uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */
uint64_t m_bytes_allocated_cap = 0;
utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */
std::atomic<bool> m_alloc_failed_since_retire = {false};
ImageWriteback<ImageCtxT> m_image_writeback;
pwl::WriteLogGuard m_write_log_guard;
/*
* When m_first_free_entry == m_first_valid_entry, the log is
* empty. There is always at least one free entry, which can't be
@@ -180,23 +265,12 @@ protected:
uint64_t m_first_free_entry = 0; /* Entries from here to m_first_valid_entry-1 are free */
uint64_t m_first_valid_entry = 0; /* Entries from here to m_first_free_entry-1 are valid */
/* Starts at 0 for a new write log. Incremented on every flush. */
uint64_t m_current_sync_gen = 0;
/* Starts at 0 on each sync gen increase. Incremented before being applied
to an operation */
uint64_t m_last_op_sequence_num = 0;
/* All writes bearing this and all prior sync gen numbers are flushed */
uint64_t m_flushed_sync_gen = 0;
bool m_persist_on_write_until_flush = true;
AsyncOpTracker m_async_op_tracker;
/* Debug counters for the places m_async_op_tracker is used */
std::atomic<int> m_async_flush_ops = {0};
std::atomic<int> m_async_append_ops = {0};
std::atomic<int> m_async_complete_ops = {0};
std::atomic<int> m_async_null_flush_finish = {0};
std::atomic<int> m_async_process_work = {0};
/* Acquire locks in order declared here */
@@ -205,29 +279,19 @@ protected:
* bufs. Hold a write lock to prevent readers from being added (e.g. when
* removing log entries from the map). No lock required to remove readers. */
mutable RWLock m_entry_reader_lock;
/* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */
mutable ceph::mutex m_deferred_dispatch_lock;
/* Hold m_log_append_lock while appending or retiring log entries. */
mutable ceph::mutex m_log_append_lock;
/* Used for most synchronization */
mutable ceph::mutex m_lock;
/* Used in release/detain to make BlockGuard preserve submission order */
mutable ceph::mutex m_blockguard_lock;
/* Use m_blockguard_lock for the following 3 things */
pwl::WriteLogGuard::BlockOperations m_awaiting_barrier;
bool m_barrier_in_progress = false;
BlockGuardCell *m_barrier_cell = nullptr;
bool m_wake_up_requested = false;
bool m_wake_up_scheduled = false;
bool m_wake_up_enabled = true;
bool m_appending = false;
bool m_dispatching_deferred_ops = false;
Contexts m_flush_complete_contexts;
pwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */
pwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */
@@ -239,70 +303,53 @@ protected:
PerfCounters *m_perfcounter = nullptr;
std::shared_ptr<pwl::SyncPoint> m_current_sync_point = nullptr;
bool m_persist_on_flush = false; /* If false, persist each write before completion */
int m_flush_ops_in_flight = 0;
int m_flush_bytes_in_flight = 0;
uint64_t m_lowest_flushing_sync_gen = 0;
/* Writes that have left the block guard, but are waiting for resources */
C_BlockIORequests m_deferred_ios;
/* Throttle writes concurrently allocating & replicating */
unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES;
unsigned int m_unpublished_reserves = 0;
/* Initialized from config, then set false during shutdown */
std::atomic<bool> m_periodic_stats_enabled = {false};
SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */
mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */
Context *m_timer_ctx = nullptr;
ThreadPool m_thread_pool;
ContextWQ m_work_queue;
uint32_t m_discard_granularity_bytes;
void perf_start(const std::string name);
void perf_stop();
void log_perf();
void periodic_stats();
void arm_periodic_stats();
void pwl_init(Context *on_finish, pwl::DeferredContexts &later);
void update_image_cache_state(Context *on_finish);
void load_existing_entries(pwl::DeferredContexts &later);
void wake_up();
void process_work();
void flush_dirty_entries(Context *on_finish);
bool can_flush_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
Context *construct_flush_entry_ctx(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
void persist_last_flushed_sync_gen();
bool handle_flushed_sync_point(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
void sync_point_writer_flushed(std::shared_ptr<pwl::SyncPointLogEntry> log_entry);
void update_entries(
std::shared_ptr<pwl::GenericLogEntry> log_entry,
pwl::WriteLogPmemEntry *pmem_entry, std::map<uint64_t, bool> &missing_sync_points,
std::map<uint64_t, std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
int entry_index);
void update_sync_points(
std::map<uint64_t, bool> &missing_sync_points,
std::map<uint64_t, std::shared_ptr<pwl::SyncPointLogEntry>> &sync_point_entries,
pwl::DeferredContexts &later);
Context *construct_flush_entry(
const std::shared_ptr<pwl::GenericLogEntry> log_entry, bool invalidating);
void process_writeback_dirty_entries();
bool can_retire_entry(const std::shared_ptr<pwl::GenericLogEntry> log_entry);
bool retire_entries(const unsigned long int frees_per_tx);
void init_flush_new_sync_point(pwl::DeferredContexts &later);
void new_sync_point(pwl::DeferredContexts &later);
pwl::C_FlushRequest<AbstractWriteLog<ImageCtxT>>* make_flush_req(Context *on_finish);
void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later);
void dispatch_deferred_writes(void);
void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req);
void append_scheduled_ops(void);
void enlist_op_appender();
void schedule_append(pwl::GenericLogOperations &ops);
void flush_then_append_scheduled_ops(void);
void enlist_op_flusher();
void alloc_op_log_entries(pwl::GenericLogOperations &ops);
void flush_op_log_entries(pwl::GenericLogOperationsVector &ops);
int append_op_log_entries(pwl::GenericLogOperations &ops);
void complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r);
void internal_flush(bool invalidate, Context *on_finish);
bool check_allocation(
C_BlockIORequestT *req,
uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
uint64_t &num_lanes, uint64_t &num_log_entries,
uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap);
void append_scheduled(
pwl::GenericLogOperations &ops, bool &ops_remain, bool &appending, bool isRWL=false);
virtual void process_work() = 0;
virtual void append_scheduled_ops(void) = 0;
virtual void schedule_append_ops(pwl::GenericLogOperations &ops) = 0;
virtual void remove_pool_file() = 0;
virtual void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) = 0;
virtual void write_data_to_buffer(
std::shared_ptr<pwl::WriteLogEntry> ws_entry, pwl::WriteLogPmemEntry *pmem_entry) {}
virtual void get_pool_name(const std::string log_poolset_name) {}
virtual void alloc_op_log_entries(pwl::GenericLogOperations &ops) {}
virtual bool retire_entries(const unsigned long int frees_per_tx) {return false;}
virtual void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) {}
virtual void persist_last_flushed_sync_gen() {}
virtual void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) {}
virtual Context *construct_flush_entry_ctx(
const std::shared_ptr<pwl::GenericLogEntry> log_entry) {return nullptr;}
};
} // namespace pwl
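
To make the shape of the refactor concrete, here is a minimal sketch (not part of this change) of a hypothetical backend deriving from AbstractWriteLog: it only needs to implement the pure-virtual hooks above, while the pmem-specific virtuals keep their empty defaults. The class name ExampleWriteLog, its trivial bodies, and the local aliases are illustrative assumptions; includes are omitted.

// Illustrative sketch only -- ExampleWriteLog is not part of this PR.
template <typename ImageCtxT>
class ExampleWriteLog : public AbstractWriteLog<ImageCtxT> {
public:
  using This = AbstractWriteLog<ImageCtxT>;
  using C_BlockIORequestT = pwl::C_BlockIORequest<This>;
  ExampleWriteLog(ImageCtxT &image_ctx,
                  librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state)
    : AbstractWriteLog<ImageCtxT>(image_ctx, cache_state) {}
  bool alloc_resources(C_BlockIORequestT *req) override {
    return false;  /* toy backend: defer every request */
  }
  void setup_schedule_append(
      pwl::GenericLogOperationsVector &ops, bool do_early_flush) override {
    this->schedule_append(ops);  /* no early-flush fast path here */
  }
protected:
  void process_work() override {}
  void append_scheduled_ops(void) override {}
  void schedule_append_ops(pwl::GenericLogOperations &ops) override {}
  void remove_pool_file() override {}
  void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override {
    /* pool setup elided in this sketch */
  }
};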


@@ -1,6 +1,7 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// // vim: ts=8 sw=2 smarttab
// vim: ts=8 sw=2 smarttab
#include <libpmemobj.h>
#include "ReplicatedWriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
@@ -13,8 +14,10 @@
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/cache/pwl/Types.h"
#include <map>
#include <vector>
@@ -27,14 +30,881 @@
namespace librbd {
namespace cache {
namespace pwl {
using namespace librbd::cache::pwl;
const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION;
template <typename I>
ReplicatedWriteLog<I>::ReplicatedWriteLog(I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
ReplicatedWriteLog<I>::ReplicatedWriteLog(
I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state)
: AbstractWriteLog<I>(image_ctx, cache_state)
{
}
template <typename I>
ReplicatedWriteLog<I>::~ReplicatedWriteLog() {
m_log_pool = nullptr;
}
/*
* Allocate the (already reserved) write log entries for a set of operations.
*
* Locking:
* Acquires lock
*/
template <typename I>
void ReplicatedWriteLog<I>::alloc_op_log_entries(GenericLogOperations &ops)
{
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
/* Allocate the (already reserved) log entries */
std::lock_guard locker(m_lock);
for (auto &operation : ops) {
uint32_t entry_index = this->m_first_free_entry;
this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries;
auto &log_entry = operation->get_log_entry();
log_entry->log_entry_index = entry_index;
log_entry->ram_entry.entry_index = entry_index;
log_entry->pmem_entry = &pmem_log_entries[entry_index];
log_entry->ram_entry.entry_valid = 1;
m_log_entries.push_back(log_entry);
ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl;
}
}
/*
* Write and persist the (already allocated) write log entries and
* data buffer allocations for a set of ops. The data buffer for each
* of these must already have been persisted to its reserved area.
*/
template <typename I>
int ReplicatedWriteLog<I>::append_op_log_entries(GenericLogOperations &ops)
{
CephContext *cct = m_image_ctx.cct;
GenericLogOperationsVector entries_to_flush;
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
int ret = 0;
ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock));
if (ops.empty()) {
return 0;
}
entries_to_flush.reserve(OPS_APPENDED_TOGETHER);
/* Write log entries to ring and persist */
utime_t now = ceph_clock_now();
for (auto &operation : ops) {
if (!entries_to_flush.empty()) {
/* Flush these and reset the list if the current entry wraps to the
* tail of the ring */
if (entries_to_flush.back()->get_log_entry()->log_entry_index >
operation->get_log_entry()->log_entry_index) {
ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at "
<< "operation=[" << *operation << "]" << dendl;
flush_op_log_entries(entries_to_flush);
entries_to_flush.clear();
now = ceph_clock_now();
}
}
ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index="
<< operation->get_log_entry()->log_entry_index << " "
<< "from " << &operation->get_log_entry()->ram_entry << " "
<< "to " << operation->get_log_entry()->pmem_entry << " "
<< "operation=[" << *operation << "]" << dendl;
ldout(m_image_ctx.cct, 05) << "APPENDING: index="
<< operation->get_log_entry()->log_entry_index << " "
<< "operation=[" << *operation << "]" << dendl;
operation->log_append_time = now;
*operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry;
ldout(m_image_ctx.cct, 20) << "APPENDING: index="
<< operation->get_log_entry()->log_entry_index << " "
<< "pmem_entry=[" << *operation->get_log_entry()->pmem_entry
<< "]" << dendl;
entries_to_flush.push_back(operation);
}
flush_op_log_entries(entries_to_flush);
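/* flush_op_log_entries() only issues pmemobj_flush() for each batch of
 * entries copied above; the single pmemobj_drain() below provides the store
 * fence for all of them at once. */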
/* Drain once for all */
pmemobj_drain(m_log_pool);
/*
* Atomically advance the log head pointer and publish the
* allocations for all the data buffers they refer to.
*/
utime_t tx_start = ceph_clock_now();
TX_BEGIN(m_log_pool) {
D_RW(pool_root)->first_free_entry = this->m_first_free_entry;
for (auto &operation : ops) {
if (operation->reserved_allocated()) {
auto write_op = (std::shared_ptr<WriteLogOperation>&) operation;
pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1);
} else {
ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
}
}
} TX_ONCOMMIT {
} TX_ONABORT {
lderr(cct) << "failed to commit " << ops.size()
<< " log entries (" << this->m_log_pool_name << ")" << dendl;
ceph_assert(false);
ret = -EIO;
} TX_FINALLY {
} TX_END;
utime_t tx_end = ceph_clock_now();
m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start);
m_perfcounter->hinc(
l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size());
for (auto &operation : ops) {
operation->log_append_comp_time = tx_end;
}
return ret;
}
/*
* Flush the persistent write log entries for a set of ops. The entries must
* be contiguous in persistent memory.
*/
template <typename I>
void ReplicatedWriteLog<I>::flush_op_log_entries(GenericLogOperationsVector &ops)
{
if (ops.empty()) {
return;
}
if (ops.size() > 1) {
ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry);
}
ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " "
<< "start address="
<< ops.front()->get_log_entry()->pmem_entry << " "
<< "bytes="
<< ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))
<< dendl;
pmemobj_flush(m_log_pool,
ops.front()->get_log_entry()->pmem_entry,
ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)));
}
template <typename I>
void ReplicatedWriteLog<I>::get_pool_name(const std::string log_poolset_name) {
CephContext *cct = m_image_ctx.cct;
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
if (access(log_poolset_name.c_str(), F_OK) == 0) {
this->m_log_pool_name = log_poolset_name;
this->m_log_is_poolset = true;
} else {
ldout(cct, 5) << "Poolset file " << log_poolset_name
<< " not present (or can't open). Using unreplicated pool" << dendl;
}
}
template <typename I>
void ReplicatedWriteLog<I>::remove_pool_file() {
if (m_log_pool) {
ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl;
pmemobj_close(m_log_pool);
}
if (m_cache_state->clean) {
if (this->m_log_is_poolset) {
ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl;
} else {
ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl;
if (remove(this->m_log_pool_name.c_str()) != 0) {
lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": "
<< pmemobj_errormsg() << dendl;
} else {
m_cache_state->clean = true;
m_cache_state->empty = true;
m_cache_state->present = false;
}
}
} else {
if (this->m_log_is_poolset) {
ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl;
} else {
ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl;
}
}
}
template <typename I>
void ReplicatedWriteLog<I>::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) {
CephContext *cct = m_image_ctx.cct;
TOID(struct WriteLogPoolRoot) pool_root;
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
if (access(this->m_log_pool_name.c_str(), F_OK) != 0) {
if ((m_log_pool =
pmemobj_create(this->m_log_pool_name.c_str(),
this->m_pwl_pool_layout_name,
this->m_log_pool_config_size,
(S_IWUSR | S_IRUSR))) == NULL) {
lderr(cct) << "failed to create pool (" << this->m_log_pool_name << ")"
<< pmemobj_errormsg() << dendl;
m_cache_state->present = false;
m_cache_state->clean = true;
m_cache_state->empty = true;
/* TODO: filter/replace errnos that are meaningless to the caller */
on_finish->complete(-errno);
return;
}
m_cache_state->present = true;
m_cache_state->clean = true;
m_cache_state->empty = true;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
/* new pool, calculate and store metadata */
size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogPmemEntry);
uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size);
if (num_small_writes > MAX_LOG_ENTRIES) {
num_small_writes = MAX_LOG_ENTRIES;
}
if (num_small_writes <= 2) {
lderr(cct) << "num_small_writes needs to > 2" << dendl;
on_finish->complete(-EINVAL);
return;
}
this->m_log_pool_actual_size = this->m_log_pool_config_size;
this->m_bytes_allocated_cap = effective_pool_size;
/* Log ring empty */
m_first_free_entry = 0;
m_first_valid_entry = 0;
TX_BEGIN(m_log_pool) {
TX_ADD(pool_root);
D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION;
D_RW(pool_root)->log_entries =
TX_ZALLOC(struct WriteLogPmemEntry,
sizeof(struct WriteLogPmemEntry) * num_small_writes);
D_RW(pool_root)->pool_size = this->m_log_pool_actual_size;
D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen;
D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE;
D_RW(pool_root)->num_log_entries = num_small_writes;
D_RW(pool_root)->first_free_entry = m_first_free_entry;
D_RW(pool_root)->first_valid_entry = m_first_valid_entry;
} TX_ONCOMMIT {
this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free
} TX_ONABORT {
this->m_total_log_entries = 0;
this->m_free_log_entries = 0;
lderr(cct) << "failed to initialize pool (" << this->m_log_pool_name << ")" << dendl;
on_finish->complete(-pmemobj_tx_errno());
return;
} TX_FINALLY {
} TX_END;
} else {
m_cache_state->present = true;
/* Open existing pool */
if ((m_log_pool =
pmemobj_open(this->m_log_pool_name.c_str(),
this->m_pwl_pool_layout_name)) == NULL) {
lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): "
<< pmemobj_errormsg() << dendl;
on_finish->complete(-errno);
return;
}
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
// TODO: will handle upgrading version in the future
lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
<< " expected " << RWL_POOL_VERSION << dendl;
on_finish->complete(-EINVAL);
return;
}
if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
<< " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
on_finish->complete(-EINVAL);
return;
}
this->m_log_pool_actual_size = D_RO(pool_root)->pool_size;
this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
this->m_total_log_entries = D_RO(pool_root)->num_log_entries;
m_first_free_entry = D_RO(pool_root)->first_free_entry;
m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
if (m_first_free_entry < m_first_valid_entry) {
/* Valid entries wrap around the end of the ring, so first_free is lower
* than first_valid. If first_valid was == first_free+1, the entry at
* first_free would be empty. The last entry is never used, so in
* that case there would be zero free log entries. */
this->m_free_log_entries = this->m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1;
} else {
/* first_valid is <= first_free. If they are == we have zero valid log
* entries, and n-1 free log entries */
this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
}
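/* Worked example (illustrative): with m_total_log_entries == 10,
 * m_first_valid_entry == 7 and m_first_free_entry == 2, the valid entries
 * are 7,8,9,0,1, so free = 10 - (7 - 2) - 1 = 4; one slot always stays
 * unused so a full ring can be distinguished from an empty one. */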
size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE);
this->m_bytes_allocated_cap = effective_pool_size;
load_existing_entries(later);
m_cache_state->clean = this->m_dirty_log_entries.empty();
m_cache_state->empty = m_log_entries.empty();
}
}
/*
* Loads the log entries from an existing log.
*
* Creates the in-memory structures to represent the state of the
* re-opened log.
*
* Finds the last appended sync point, and any sync points referred to
* in log entries, but missing from the log. These missing sync points
* are created and scheduled for append. Some rudimentary consistency
* checking is done.
*
* Rebuilds the m_blocks_to_log_entries map, to make log entries
* readable.
*
* Places all writes on the dirty entries list, which causes them all
* to be flushed.
*
*/
template <typename I>
void ReplicatedWriteLog<I>::load_existing_entries(DeferredContexts &later) {
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
uint64_t entry_index = m_first_valid_entry;
/* The map below allows us to find sync point log entries by sync
* gen number, which is necessary so write entries can be linked to
* their sync points. */
std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
/* The map below tracks sync points referred to in writes but not
* appearing in the sync_point_entries map. We'll use this to
* determine which sync points are missing and need to be
* created. */
std::map<uint64_t, bool> missing_sync_points;
/*
* Read the existing log entries. Construct an in-memory log entry
* object of the appropriate type for each. Add these to the global
* log entries list.
*
* Write entries will not link to their sync points yet. We'll do
* that in the next pass. Here we'll accumulate a map of sync point
* gen numbers that are referred to in writes but do not appear in
* the log.
*/
while (entry_index != m_first_free_entry) {
WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
std::shared_ptr<GenericLogEntry> log_entry = nullptr;
ceph_assert(pmem_entry->entry_index == entry_index);
this->update_entries(log_entry, pmem_entry, missing_sync_points,
sync_point_entries, entry_index);
log_entry->ram_entry = *pmem_entry;
log_entry->pmem_entry = pmem_entry;
log_entry->log_entry_index = entry_index;
log_entry->completed = true;
m_log_entries.push_back(log_entry);
entry_index = (entry_index + 1) % this->m_total_log_entries;
}
this->update_sync_points(missing_sync_points, sync_point_entries, later);
}
template <typename I>
void ReplicatedWriteLog<I>::write_data_to_buffer(std::shared_ptr<WriteLogEntry> ws_entry,
WriteLogPmemEntry *pmem_entry) {
ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
}
/**
* Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries
* that are eligible to be retired. Returns true if anything was
* retired.
*/
template <typename I>
bool ReplicatedWriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
CephContext *cct = m_image_ctx.cct;
GenericLogEntriesVector retiring_entries;
uint32_t initial_first_valid_entry;
uint32_t first_valid_entry;
std::lock_guard retire_locker(this->m_log_retire_lock);
ldout(cct, 20) << "Look for entries to retire" << dendl;
{
/* Entry readers can't be added while we hold m_entry_reader_lock */
RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock);
std::lock_guard locker(m_lock);
initial_first_valid_entry = this->m_first_valid_entry;
first_valid_entry = this->m_first_valid_entry;
auto entry = m_log_entries.front();
while (!m_log_entries.empty() &&
retiring_entries.size() < frees_per_tx &&
this->can_retire_entry(entry)) {
if (entry->log_entry_index != first_valid_entry) {
lderr(cct) << "Retiring entry index (" << entry->log_entry_index
<< ") and first valid log entry index (" << first_valid_entry
<< ") must be ==." << dendl;
}
ceph_assert(entry->log_entry_index == first_valid_entry);
first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries;
m_log_entries.pop_front();
retiring_entries.push_back(entry);
/* Remove entry from map so there will be no more readers */
if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) {
auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(entry);
if (gen_write_entry) {
this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry);
}
}
entry = m_log_entries.front();
}
}
if (retiring_entries.size()) {
ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl;
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
utime_t tx_start;
utime_t tx_end;
/* Advance first valid entry and release buffers */
{
uint64_t flushed_sync_gen;
std::lock_guard append_locker(this->m_log_append_lock);
{
std::lock_guard locker(m_lock);
flushed_sync_gen = this->m_flushed_sync_gen;
}
tx_start = ceph_clock_now();
TX_BEGIN(m_log_pool) {
if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from "
<< D_RO(pool_root)->flushed_sync_gen << " to "
<< flushed_sync_gen << dendl;
D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
}
D_RW(pool_root)->first_valid_entry = first_valid_entry;
for (auto &entry: retiring_entries) {
if (entry->write_bytes()) {
ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo
<< "." << entry->ram_entry.write_data.oid.off << dendl;
TX_FREE(entry->ram_entry.write_data);
} else {
ldout(cct, 20) << "Retiring non-write: " << *entry << dendl;
}
}
} TX_ONCOMMIT {
} TX_ONABORT {
lderr(cct) << "failed to commit free of" << retiring_entries.size()
<< " log entries (" << this->m_log_pool_name << ")" << dendl;
ceph_assert(false);
} TX_FINALLY {
} TX_END;
tx_end = ceph_clock_now();
}
m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start);
m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(),
retiring_entries.size());
/* Update runtime copy of first_valid, and free entries counts */
{
std::lock_guard locker(m_lock);
ceph_assert(this->m_first_valid_entry == initial_first_valid_entry);
this->m_first_valid_entry = first_valid_entry;
this->m_free_log_entries += retiring_entries.size();
for (auto &entry: retiring_entries) {
if (entry->write_bytes()) {
ceph_assert(this->m_bytes_cached >= entry->write_bytes());
this->m_bytes_cached -= entry->write_bytes();
uint64_t entry_allocation_size = entry->write_bytes();
if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) {
entry_allocation_size = MIN_WRITE_ALLOC_SIZE;
}
ceph_assert(this->m_bytes_allocated >= entry_allocation_size);
this->m_bytes_allocated -= entry_allocation_size;
}
}
this->m_alloc_failed_since_retire = false;
this->wake_up();
}
} else {
ldout(cct, 20) << "Nothing to retire" << dendl;
return false;
}
return true;
}
template <typename I>
Context* ReplicatedWriteLog<I>::construct_flush_entry_ctx(
std::shared_ptr<GenericLogEntry> log_entry) {
bool invalidating = this->m_invalidating; // snapshot so we behave consistently
Context *ctx = this->construct_flush_entry(log_entry, invalidating);
if (invalidating) {
return ctx;
}
return new LambdaContext(
[this, log_entry, ctx](int r) {
m_image_ctx.op_work_queue->queue(new LambdaContext(
[this, log_entry, ctx](int r) {
ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
<< " " << *log_entry << dendl;
log_entry->writeback(this->m_image_writeback, ctx);
}), 0);
});
}
const unsigned long int ops_flushed_together = 4;
/*
* Performs the pmem buffer flush on all scheduled ops, then schedules
* the log event append operation for all of them.
*/
template <typename I>
void ReplicatedWriteLog<I>::flush_then_append_scheduled_ops(void)
{
GenericLogOperations ops;
bool ops_remain = false;
ldout(m_image_ctx.cct, 20) << dendl;
do {
{
ops.clear();
std::lock_guard locker(m_lock);
if (m_ops_to_flush.size()) {
auto last_in_batch = m_ops_to_flush.begin();
unsigned int ops_to_flush = m_ops_to_flush.size();
if (ops_to_flush > ops_flushed_together) {
ops_to_flush = ops_flushed_together;
}
ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl;
std::advance(last_in_batch, ops_to_flush);
ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch);
ops_remain = !m_ops_to_flush.empty();
ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", "
<< m_ops_to_flush.size() << " remain" << dendl;
} else {
ops_remain = false;
}
}
if (ops_remain) {
enlist_op_flusher();
}
/* Ops subsequently scheduled for flush may finish before these,
* which is fine. We're unconcerned with completion order until we
* get to the log message append step. */
if (ops.size()) {
flush_pmem_buffer(ops);
schedule_append_ops(ops);
}
} while (ops_remain);
append_scheduled_ops();
}
/*
* Performs the log event append operation for all of the scheduled
* events.
*/
template <typename I>
void ReplicatedWriteLog<I>::append_scheduled_ops(void) {
GenericLogOperations ops;
int append_result = 0;
bool ops_remain = false;
bool appending = false; /* true if we set m_appending */
ldout(m_image_ctx.cct, 20) << dendl;
do {
ops.clear();
this->append_scheduled(ops, ops_remain, appending, true);
if (ops.size()) {
std::lock_guard locker(this->m_log_append_lock);
alloc_op_log_entries(ops);
append_result = append_op_log_entries(ops);
}
int num_ops = ops.size();
if (num_ops) {
/* New entries may be flushable. Completion will wake up flusher. */
this->complete_op_log_entries(std::move(ops), append_result);
}
} while (ops_remain);
}
template <typename I>
void ReplicatedWriteLog<I>::enlist_op_flusher()
{
this->m_async_flush_ops++;
this->m_async_op_tracker.start_op();
Context *flush_ctx = new LambdaContext([this](int r) {
flush_then_append_scheduled_ops();
this->m_async_flush_ops--;
this->m_async_op_tracker.finish_op();
});
this->m_work_queue.queue(flush_ctx);
}
template <typename I>
void ReplicatedWriteLog<I>::setup_schedule_append(
pwl::GenericLogOperationsVector &ops, bool do_early_flush) {
if (do_early_flush) {
/* This caller is waiting for persist, so we'll use their thread to
* expedite it */
flush_pmem_buffer(ops);
this->schedule_append(ops);
} else {
/* This is probably not still the caller's thread, so do the payload
* flushing/replicating later. */
schedule_flush_and_append(ops);
}
}
/*
* Takes custody of ops. They'll all get their log entries appended,
* and have their on_write_persist contexts completed once they and
* all prior log entries are persisted everywhere.
*/
template <typename I>
void ReplicatedWriteLog<I>::schedule_append_ops(GenericLogOperations &ops)
{
bool need_finisher;
GenericLogOperationsVector appending;
std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending));
{
std::lock_guard locker(m_lock);
need_finisher = this->m_ops_to_append.empty() && !this->m_appending;
this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops);
}
if (need_finisher) {
this->enlist_op_appender();
}
for (auto &op : appending) {
op->appending();
}
}
/*
* Takes custody of ops. They'll all get their pmem blocks flushed,
* then get their log entries appended.
*/
template <typename I>
void ReplicatedWriteLog<I>::schedule_flush_and_append(GenericLogOperationsVector &ops)
{
GenericLogOperations to_flush(ops.begin(), ops.end());
bool need_finisher;
ldout(m_image_ctx.cct, 20) << dendl;
{
std::lock_guard locker(m_lock);
need_finisher = m_ops_to_flush.empty();
m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush);
}
if (need_finisher) {
enlist_op_flusher();
}
}
template <typename I>
void ReplicatedWriteLog<I>::process_work() {
CephContext *cct = m_image_ctx.cct;
int max_iterations = 4;
bool wake_up_requested = false;
uint64_t aggressive_high_water_bytes = this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER;
uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER;
uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER;
uint64_t aggressive_high_water_entries = this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER;
uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER;
uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER;
ldout(cct, 20) << dendl;
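/* Retirement policy for the loop below: retire while the high water marks
 * are exceeded (or an allocation has failed, or we are invalidating), then
 * keep retiring toward the low water marks only while within
 * RETIRE_BATCH_TIME_LIMIT_MS; batches are larger when shutting down,
 * invalidating, or above the aggressive high water marks. */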
do {
{
std::lock_guard locker(m_lock);
this->m_wake_up_requested = false;
}
if (this->m_alloc_failed_since_retire || this->m_invalidating ||
this->m_bytes_allocated > high_water_bytes ||
(m_log_entries.size() > high_water_entries)) {
int retired = 0;
utime_t started = ceph_clock_now();
ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire
<< ", allocated > high_water="
<< (this->m_bytes_allocated > high_water_bytes)
<< ", allocated_entries > high_water="
<< (m_log_entries.size() > high_water_entries)
<< dendl;
while (this->m_alloc_failed_since_retire || this->m_invalidating ||
(this->m_bytes_allocated > high_water_bytes) ||
(m_log_entries.size() > high_water_entries) ||
(((this->m_bytes_allocated > low_water_bytes) ||
(m_log_entries.size() > low_water_entries)) &&
(utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) {
if (!retire_entries((this->m_shutting_down || this->m_invalidating ||
(this->m_bytes_allocated > aggressive_high_water_bytes) ||
(m_log_entries.size() > aggressive_high_water_entries))
? MAX_ALLOC_PER_TRANSACTION
: MAX_FREE_PER_TRANSACTION)) {
break;
}
retired++;
this->dispatch_deferred_writes();
this->process_writeback_dirty_entries();
}
ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl;
}
this->dispatch_deferred_writes();
this->process_writeback_dirty_entries();
{
std::lock_guard locker(m_lock);
wake_up_requested = this->m_wake_up_requested;
}
} while (wake_up_requested && --max_iterations > 0);
{
std::lock_guard locker(m_lock);
this->m_wake_up_scheduled = false;
/* Reschedule if it's still requested */
if (this->m_wake_up_requested) {
this->wake_up();
}
}
}
/*
* Flush the pmem regions for the data blocks of a set of operations
*
* V is expected to be GenericLogOperations<I> or GenericLogOperationsVector<I>
*/
template <typename I>
template <typename V>
void ReplicatedWriteLog<I>::flush_pmem_buffer(V& ops)
{
for (auto &operation : ops) {
operation->flush_pmem_buf_to_cache(m_log_pool);
}
/* Drain once for all */
pmemobj_drain(m_log_pool);
utime_t now = ceph_clock_now();
for (auto &operation : ops) {
if (operation->reserved_allocated()) {
operation->buf_persist_comp_time = now;
} else {
ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl;
}
}
}
/**
* Update/persist the last flushed sync point in the log
*/
template <typename I>
void ReplicatedWriteLog<I>::persist_last_flushed_sync_gen()
{
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
uint64_t flushed_sync_gen;
std::lock_guard append_locker(this->m_log_append_lock);
{
std::lock_guard locker(m_lock);
flushed_sync_gen = this->m_flushed_sync_gen;
}
if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) {
ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from "
<< D_RO(pool_root)->flushed_sync_gen << " to "
<< flushed_sync_gen << dendl;
TX_BEGIN(m_log_pool) {
D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen;
} TX_ONCOMMIT {
} TX_ONABORT {
lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl;
ceph_assert(false);
} TX_FINALLY {
} TX_END;
}
}
template <typename I>
void ReplicatedWriteLog<I>::reserve_pmem(C_BlockIORequestT *req,
bool &alloc_succeeds, bool &no_space) {
std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
for (auto &buffer : buffers) {
utime_t before_reserve = ceph_clock_now();
buffer.buffer_oid = pmemobj_reserve(m_log_pool,
&buffer.buffer_alloc_action,
buffer.allocation_size,
0 /* Object type */);
buffer.allocation_lat = ceph_clock_now() - before_reserve;
if (TOID_IS_NULL(buffer.buffer_oid)) {
if (!req->has_io_waited_for_buffers()) {
req->set_io_waited_for_entries(true);
}
ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: "
<< pmemobj_errormsg() << ". "
<< *req << dendl;
alloc_succeeds = false;
no_space = true; /* Entries need to be retired */
break;
} else {
buffer.allocated = true;
}
ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo
<< "." << buffer.buffer_oid.oid.off
<< ", size=" << buffer.allocation_size << dendl;
}
}
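/* Note: pmemobj_reserve() only stages these buffer allocations. They become
 * durable when append_op_log_entries() publishes them with pmemobj_tx_publish()
 * inside the append transaction, and are released with pmemobj_cancel() in
 * alloc_resources() if overall allocation fails. */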
template <typename I>
bool ReplicatedWriteLog<I>::alloc_resources(C_BlockIORequestT *req) {
bool alloc_succeeds = true;
uint64_t bytes_allocated = 0;
uint64_t bytes_cached = 0;
uint64_t bytes_dirtied = 0;
uint64_t num_lanes = 0;
uint64_t num_unpublished_reserves = 0;
uint64_t num_log_entries = 0;
ldout(m_image_ctx.cct, 20) << dendl;
// Set up the buffer and get the number of each required resource
req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated,
num_lanes, num_log_entries, num_unpublished_reserves);
alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied, bytes_allocated,
num_lanes, num_log_entries, num_unpublished_reserves,
this->m_bytes_allocated_cap);
std::vector<WriteBufferAllocation>& buffers = req->get_resources_buffers();
if (!alloc_succeeds) {
/* On alloc failure, free any buffers we did allocate */
for (auto &buffer : buffers) {
if (buffer.allocated) {
pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1);
}
}
}
req->set_allocated(alloc_succeeds);
return alloc_succeeds;
}
} // namespace pwl
} // namespace cache
} // namespace librbd


@@ -33,10 +33,8 @@ namespace pwl {
template <typename ImageCtxT>
class ReplicatedWriteLog : public AbstractWriteLog<ImageCtxT> {
public:
typedef io::Extent Extent;
typedef io::Extents Extents;
ReplicatedWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
ReplicatedWriteLog(
ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState<ImageCtxT>* cache_state);
~ReplicatedWriteLog();
ReplicatedWriteLog(const ReplicatedWriteLog&) = delete;
ReplicatedWriteLog &operator=(const ReplicatedWriteLog&) = delete;
@@ -50,6 +48,45 @@ private:
using C_WriteSameRequestT = pwl::C_WriteSameRequest<This>;
using C_CompAndWriteRequestT = pwl::C_CompAndWriteRequest<This>;
PMEMobjpool *m_log_pool = nullptr;
void remove_pool_file();
void load_existing_entries(pwl::DeferredContexts &later);
void alloc_op_log_entries(pwl::GenericLogOperations &ops);
int append_op_log_entries(pwl::GenericLogOperations &ops);
void flush_then_append_scheduled_ops(void);
void enlist_op_flusher();
void flush_op_log_entries(pwl::GenericLogOperationsVector &ops);
template <typename V>
void flush_pmem_buffer(V& ops);
protected:
using AbstractWriteLog<ImageCtxT>::m_lock;
using AbstractWriteLog<ImageCtxT>::m_log_entries;
using AbstractWriteLog<ImageCtxT>::m_image_ctx;
using AbstractWriteLog<ImageCtxT>::m_perfcounter;
using AbstractWriteLog<ImageCtxT>::m_ops_to_flush;
using AbstractWriteLog<ImageCtxT>::m_cache_state;
using AbstractWriteLog<ImageCtxT>::m_first_free_entry;
using AbstractWriteLog<ImageCtxT>::m_first_valid_entry;
void process_work() override;
void schedule_append_ops(pwl::GenericLogOperations &ops) override;
void append_scheduled_ops(void) override;
void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) override;
bool retire_entries(const unsigned long int frees_per_tx) override;
void persist_last_flushed_sync_gen() override;
bool alloc_resources(C_BlockIORequestT *req) override;
void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) override;
void setup_schedule_append(
pwl::GenericLogOperationsVector &ops, bool do_early_flush) override;
Context *construct_flush_entry_ctx(
const std::shared_ptr<pwl::GenericLogEntry> log_entry) override;
void get_pool_name(const std::string log_poolset_name) override;
void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override;
void write_data_to_buffer(
std::shared_ptr<pwl::WriteLogEntry> ws_entry,
pwl::WriteLogPmemEntry *pmem_entry) override;
};
} // namespace pwl


@@ -275,16 +275,7 @@ bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_poi
template <typename T>
void C_WriteRequest<T>::schedule_append() {
ceph_assert(++m_appended == 1);
if (m_do_early_flush) {
/* This caller is waiting for persist, so we'll use their thread to
* expedite it */
pwl.flush_pmem_buffer(this->op_set->operations);
pwl.schedule_append(this->op_set->operations);
} else {
/* This is probably not still the caller's thread, so do the payload
* flushing/replicating later. */
pwl.schedule_flush_and_append(this->op_set->operations);
}
pwl.setup_schedule_append(this->op_set->operations, m_do_early_flush);
}
/**