From b485bf83182c469a8877d655d3f95d1f53f75ab4 Mon Sep 17 00:00:00 2001 From: Mahati Chamarthy Date: Wed, 23 Sep 2020 15:02:17 +0530 Subject: [PATCH] librbd/cache: Refactor common code for RWL and SSD ... from AbstractWriteLog class Signed-off-by: Lisa Li Signed-off-by: Mahati Chamarthy Signed-off-by: Changcheng Liu --- src/librbd/cache/pwl/AbstractWriteLog.cc | 915 ++------------------- src/librbd/cache/pwl/AbstractWriteLog.h | 237 +++--- src/librbd/cache/pwl/ReplicatedWriteLog.cc | 874 +++++++++++++++++++- src/librbd/cache/pwl/ReplicatedWriteLog.h | 45 +- src/librbd/cache/pwl/Request.cc | 11 +- 5 files changed, 1130 insertions(+), 952 deletions(-) diff --git a/src/librbd/cache/pwl/AbstractWriteLog.cc b/src/librbd/cache/pwl/AbstractWriteLog.cc index 8bb111ed5c5..8cbdc6fce6c 100644 --- a/src/librbd/cache/pwl/AbstractWriteLog.cc +++ b/src/librbd/cache/pwl/AbstractWriteLog.cc @@ -1,7 +1,6 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -#include #include "AbstractWriteLog.h" #include "include/buffer.h" #include "include/Context.h" @@ -37,30 +36,28 @@ using namespace librbd::cache::pwl; typedef AbstractWriteLog::Extent Extent; typedef AbstractWriteLog::Extents Extents; -const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION; - template AbstractWriteLog::AbstractWriteLog(I &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state) - : m_cache_state(cache_state), + : m_write_log_guard(image_ctx.cct), + m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name( + "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))), + m_blockguard_lock(ceph::make_mutex(util::unique_lock_name( + "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))), + m_thread_pool( + image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", "tp_pwl", 4, ""), + m_cache_state(cache_state), m_pwl_pool_layout_name(POBJ_LAYOUT_NAME(rbd_pwl)), m_image_ctx(image_ctx), m_log_pool_config_size(DEFAULT_POOL_SIZE), - m_image_writeback(image_ctx), m_write_log_guard(image_ctx.cct), + m_image_writeback(image_ctx), m_log_retire_lock(ceph::make_mutex(util::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))), m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"), - m_deferred_dispatch_lock(ceph::make_mutex(util::unique_lock_name( - "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))), - m_log_append_lock(ceph::make_mutex(util::unique_lock_name( + m_log_append_lock(ceph::make_mutex(util::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))), m_lock(ceph::make_mutex(util::unique_lock_name( "librbd::cache::pwl::AbstractWriteLog::m_lock", this))), - m_blockguard_lock(ceph::make_mutex(util::unique_lock_name( - "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))), m_blocks_to_log_entries(image_ctx.cct), - m_thread_pool(image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool", "tp_pwl", - 4, - ""), m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue", ceph::make_timespan( image_ctx.config.template get_val( @@ -84,7 +81,6 @@ AbstractWriteLog::~AbstractWriteLog() { ceph_assert(m_ops_to_append.size() == 0); ceph_assert(m_flush_ops_in_flight == 0); - m_log_pool = nullptr; delete m_cache_state; m_cache_state = nullptr; } @@ -335,56 +331,12 @@ void AbstractWriteLog::arm_periodic_stats() { } } -/* - * Loads the log entries from an existing log. 
- * - * Creates the in-memory structures to represent the state of the - * re-opened log. - * - * Finds the last appended sync point, and any sync points referred to - * in log entries, but missing from the log. These missing sync points - * are created and scheduled for append. Some rudimentary consistency - * checking is done. - * - * Rebuilds the m_blocks_to_log_entries map, to make log entries - * readable. - * - * Places all writes on the dirty entries list, which causes them all - * to be flushed. - * - */ template -void AbstractWriteLog::load_existing_entries(DeferredContexts &later) { - TOID(struct WriteLogPoolRoot) pool_root; - pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); - struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries); - uint64_t entry_index = m_first_valid_entry; - /* The map below allows us to find sync point log entries by sync - * gen number, which is necessary so write entries can be linked to - * their sync points. */ - std::map> sync_point_entries; - /* The map below tracks sync points referred to in writes but not - * appearing in the sync_point_entries map. We'll use this to - * determine which sync points are missing and need to be - * created. */ - std::map missing_sync_points; - - /* - * Read the existing log entries. Construct an in-memory log entry - * object of the appropriate type for each. Add these to the global - * log entries list. - * - * Write entries will not link to their sync points yet. We'll do - * that in the next pass. Here we'll accumulate a map of sync point - * gen numbers that are referred to in writes but do not appearing in - * the log. - */ - while (entry_index != m_first_free_entry) { - WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index]; - std::shared_ptr log_entry = nullptr; +void AbstractWriteLog::update_entries(std::shared_ptr log_entry, + WriteLogPmemEntry *pmem_entry, std::map &missing_sync_points, + std::map> &sync_point_entries, + int entry_index) { bool writer = pmem_entry->is_writer(); - - ceph_assert(pmem_entry->entry_index == entry_index); if (pmem_entry->is_sync_point()) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index << " is a sync point. pmem_entry=[" << *pmem_entry << "]" << dendl; @@ -398,7 +350,7 @@ void AbstractWriteLog::load_existing_entries(DeferredContexts &later) { << " is a write. 
pmem_entry=[" << *pmem_entry << "]" << dendl; auto write_entry = std::make_shared(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes); - write_entry->pmem_buffer = D_RW(pmem_entry->write_data); + write_data_to_buffer(write_entry, pmem_entry); log_entry = write_entry; } else if (pmem_entry->is_writesame()) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index @@ -406,7 +358,7 @@ void AbstractWriteLog::load_existing_entries(DeferredContexts &later) { auto ws_entry = std::make_shared(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes, pmem_entry->ws_datalen); - ws_entry->pmem_buffer = D_RW(pmem_entry->write_data); + write_data_to_buffer(ws_entry, pmem_entry); log_entry = ws_entry; } else if (pmem_entry->is_discard()) { ldout(m_image_ctx.cct, 20) << "Entry " << entry_index @@ -427,17 +379,12 @@ void AbstractWriteLog::load_existing_entries(DeferredContexts &later) { missing_sync_points[pmem_entry->sync_gen_number] = true; } } +} - log_entry->ram_entry = *pmem_entry; - log_entry->pmem_entry = pmem_entry; - log_entry->log_entry_index = entry_index; - log_entry->completed = true; - - m_log_entries.push_back(log_entry); - - entry_index = (entry_index + 1) % m_total_log_entries; - } - +template +void AbstractWriteLog::update_sync_points(std::map &missing_sync_points, + std::map> &sync_point_entries, + DeferredContexts &later) { /* Create missing sync points. These must not be appended until the * entry reload is complete and the write map is up to * date. Currently this is handled by the deferred contexts object @@ -536,7 +483,6 @@ template void AbstractWriteLog::pwl_init(Context *on_finish, DeferredContexts &later) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << dendl; - TOID(struct WriteLogPoolRoot) pool_root; ceph_assert(m_cache_state); std::lock_guard locker(m_lock); ceph_assert(!m_initialized); @@ -550,14 +496,8 @@ void AbstractWriteLog::pwl_init(Context *on_finish, DeferredContexts &later) std::string log_poolset_name = pwl_path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".poolset"; m_log_pool_config_size = max(m_cache_state->size, MIN_POOL_SIZE); - if (access(log_poolset_name.c_str(), F_OK) == 0) { - m_log_pool_name = log_poolset_name; - m_log_is_poolset = true; - } else { - m_log_pool_name = log_pool_name; - ldout(cct, 5) << "Poolset file " << log_poolset_name - << " not present (or can't open). 
Using unreplicated pool" << dendl; - } + m_log_pool_name = log_pool_name; + get_pool_name(log_poolset_name); if ((!m_cache_state->present) && (access(m_log_pool_name.c_str(), F_OK) == 0)) { @@ -573,113 +513,7 @@ void AbstractWriteLog::pwl_init(Context *on_finish, DeferredContexts &later) } } - if (access(m_log_pool_name.c_str(), F_OK) != 0) { - if ((m_log_pool = - pmemobj_create(m_log_pool_name.c_str(), - m_pwl_pool_layout_name, - m_log_pool_config_size, - (S_IWUSR | S_IRUSR))) == NULL) { - lderr(cct) << "failed to create pool (" << m_log_pool_name << ")" - << pmemobj_errormsg() << dendl; - m_cache_state->present = false; - m_cache_state->clean = true; - m_cache_state->empty = true; - /* TODO: filter/replace errnos that are meaningless to the caller */ - on_finish->complete(-errno); - return; - } - m_cache_state->present = true; - m_cache_state->clean = true; - m_cache_state->empty = true; - pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); - - /* new pool, calculate and store metadata */ - size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE); - size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogPmemEntry); - uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size); - if (num_small_writes > MAX_LOG_ENTRIES) { - num_small_writes = MAX_LOG_ENTRIES; - } - if (num_small_writes <= 2) { - lderr(cct) << "num_small_writes needs to > 2" << dendl; - on_finish->complete(-EINVAL); - return; - } - m_log_pool_actual_size = m_log_pool_config_size; - m_bytes_allocated_cap = effective_pool_size; - /* Log ring empty */ - m_first_free_entry = 0; - m_first_valid_entry = 0; - TX_BEGIN(m_log_pool) { - TX_ADD(pool_root); - D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION; - D_RW(pool_root)->log_entries = - TX_ZALLOC(struct WriteLogPmemEntry, - sizeof(struct WriteLogPmemEntry) * num_small_writes); - D_RW(pool_root)->pool_size = m_log_pool_actual_size; - D_RW(pool_root)->flushed_sync_gen = m_flushed_sync_gen; - D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE; - D_RW(pool_root)->num_log_entries = num_small_writes; - D_RW(pool_root)->first_free_entry = m_first_free_entry; - D_RW(pool_root)->first_valid_entry = m_first_valid_entry; - } TX_ONCOMMIT { - m_total_log_entries = D_RO(pool_root)->num_log_entries; - m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free - } TX_ONABORT { - m_total_log_entries = 0; - m_free_log_entries = 0; - lderr(cct) << "failed to initialize pool (" << m_log_pool_name << ")" << dendl; - on_finish->complete(-pmemobj_tx_errno()); - return; - } TX_FINALLY { - } TX_END; - } else { - m_cache_state->present = true; - /* Open existing pool */ - if ((m_log_pool = - pmemobj_open(m_log_pool_name.c_str(), - m_pwl_pool_layout_name)) == NULL) { - lderr(cct) << "failed to open pool (" << m_log_pool_name << "): " - << pmemobj_errormsg() << dendl; - on_finish->complete(-errno); - return; - } - pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); - if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) { - // TODO: will handle upgrading version in the future - lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version - << " expected " << RWL_POOL_VERSION << dendl; - on_finish->complete(-EINVAL); - return; - } - if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) { - lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size - << " expected " << MIN_WRITE_ALLOC_SIZE << dendl; - on_finish->complete(-EINVAL); - return; - } 
- m_log_pool_actual_size = D_RO(pool_root)->pool_size; - m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen; - m_total_log_entries = D_RO(pool_root)->num_log_entries; - m_first_free_entry = D_RO(pool_root)->first_free_entry; - m_first_valid_entry = D_RO(pool_root)->first_valid_entry; - if (m_first_free_entry < m_first_valid_entry) { - /* Valid entries wrap around the end of the ring, so first_free is lower - * than first_valid. If first_valid was == first_free+1, the entry at - * first_free would be empty. The last entry is never used, so in - * that case there would be zero free log entries. */ - m_free_log_entries = m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1; - } else { - /* first_valid is <= first_free. If they are == we have zero valid log - * entries, and n-1 free log entries */ - m_free_log_entries = m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1; - } - size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE); - m_bytes_allocated_cap = effective_pool_size; - load_existing_entries(later); - m_cache_state->clean = m_dirty_log_entries.empty(); - m_cache_state->empty = m_log_entries.empty(); - } + initialize_pool(on_finish, later); ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries << " log entries, " << m_free_log_entries << " of which are free." @@ -770,31 +604,9 @@ void AbstractWriteLog::shut_down(Context *on_finish) { m_wake_up_enabled = false; m_cache_state->clean = true; m_log_entries.clear(); - if (m_log_pool) { - ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl; - pmemobj_close(m_log_pool); - } - if (m_cache_state->clean) { - if (m_log_is_poolset) { - ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl; - } else { - ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << m_log_pool_name << dendl; - if (remove(m_log_pool_name.c_str()) != 0) { - lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << m_log_pool_name << "\": " - << pmemobj_errormsg() << dendl; - } else { - m_cache_state->clean = true; - m_cache_state->empty = true; - m_cache_state->present = false; - } - } - } else { - if (m_log_is_poolset) { - ldout(m_image_ctx.cct, 5) << "Not removing poolset " << m_log_pool_name << dendl; - } else { - ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << m_log_pool_name << dendl; - } - } + + remove_pool_file(); + if (m_perfcounter) { perf_stop(); } @@ -1338,62 +1150,40 @@ void AbstractWriteLog::release_guarded_request(BlockGuardCell *released_cell) ldout(cct, 20) << "exit" << dendl; } -/* - * Performs the log event append operation for all of the scheduled - * events. - */ template -void AbstractWriteLog::append_scheduled_ops(void) +void AbstractWriteLog::append_scheduled(GenericLogOperations &ops, bool &ops_remain, + bool &appending, bool isRWL) { - GenericLogOperations ops; - int append_result = 0; - bool ops_remain = false; - bool appending = false; /* true if we set m_appending */ - ldout(m_image_ctx.cct, 20) << dendl; - do { - ops.clear(); - - { - std::lock_guard locker(m_lock); - if (!appending && m_appending) { - /* Another thread is appending */ - ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; - return; + const unsigned long int OPS_APPENDED = isRWL ? 
MAX_ALLOC_PER_TRANSACTION + : MAX_WRITES_PER_SYNC_POINT; + { + std::lock_guard locker(m_lock); + if (!appending && m_appending) { + /* Another thread is appending */ + ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl; + return; + } + if (m_ops_to_append.size()) { + appending = true; + m_appending = true; + auto last_in_batch = m_ops_to_append.begin(); + unsigned int ops_to_append = m_ops_to_append.size(); + if (ops_to_append > OPS_APPENDED) { + ops_to_append = OPS_APPENDED; } - if (m_ops_to_append.size()) { - appending = true; - m_appending = true; - auto last_in_batch = m_ops_to_append.begin(); - unsigned int ops_to_append = m_ops_to_append.size(); - if (ops_to_append > OPS_APPENDED_TOGETHER) { - ops_to_append = OPS_APPENDED_TOGETHER; - } - std::advance(last_in_batch, ops_to_append); - ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); - ops_remain = true; /* Always check again before leaving */ - ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " - << m_ops_to_append.size() << " remain" << dendl; - } else { - ops_remain = false; - if (appending) { - appending = false; - m_appending = false; - } + std::advance(last_in_batch, ops_to_append); + ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch); + ops_remain = true; /* Always check again before leaving */ + ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", " + << m_ops_to_append.size() << " remain" << dendl; + } else if (isRWL) { + ops_remain = false; + if (appending) { + appending = false; + m_appending = false; } } - - if (ops.size()) { - std::lock_guard locker(m_log_append_lock); - alloc_op_log_entries(ops); - append_result = append_op_log_entries(ops); - } - - int num_ops = ops.size(); - if (num_ops) { - /* New entries may be flushable. Completion will wake up flusher. */ - complete_op_log_entries(std::move(ops), append_result); - } - } while (ops_remain); + } } template @@ -1409,40 +1199,12 @@ void AbstractWriteLog::enlist_op_appender() m_work_queue.queue(append_ctx); } -/* - * Takes custody of ops. They'll all get their log entries appended, - * and have their on_write_persist contexts completed once they and - * all prior log entries are persisted everywhere. - */ -template -void AbstractWriteLog::schedule_append(GenericLogOperations &ops) -{ - bool need_finisher; - GenericLogOperationsVector appending; - - std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending)); - { - std::lock_guard locker(m_lock); - - need_finisher = m_ops_to_append.empty() && !m_appending; - m_ops_to_append.splice(m_ops_to_append.end(), ops); - } - - if (need_finisher) { - enlist_op_appender(); - } - - for (auto &op : appending) { - op->appending(); - } -} - template void AbstractWriteLog::schedule_append(GenericLogOperationsVector &ops) { GenericLogOperations to_append(ops.begin(), ops.end()); - schedule_append(to_append); + schedule_append_ops(to_append); } template @@ -1450,263 +1212,7 @@ void AbstractWriteLog::schedule_append(GenericLogOperationSharedPtr op) { GenericLogOperations to_append { op }; - schedule_append(to_append); -} - -const unsigned long int ops_flushed_together = 4; -/* - * Performs the pmem buffer flush on all scheduled ops, then schedules - * the log event append operation for all of them. 
- */ -template -void AbstractWriteLog::flush_then_append_scheduled_ops(void) -{ - GenericLogOperations ops; - bool ops_remain = false; - ldout(m_image_ctx.cct, 20) << dendl; - do { - { - ops.clear(); - std::lock_guard locker(m_lock); - if (m_ops_to_flush.size()) { - auto last_in_batch = m_ops_to_flush.begin(); - unsigned int ops_to_flush = m_ops_to_flush.size(); - if (ops_to_flush > ops_flushed_together) { - ops_to_flush = ops_flushed_together; - } - ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl; - std::advance(last_in_batch, ops_to_flush); - ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch); - ops_remain = !m_ops_to_flush.empty(); - ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " - << m_ops_to_flush.size() << " remain" << dendl; - } else { - ops_remain = false; - } - } - if (ops_remain) { - enlist_op_flusher(); - } - - /* Ops subsequently scheduled for flush may finish before these, - * which is fine. We're unconcerned with completion order until we - * get to the log message append step. */ - if (ops.size()) { - flush_pmem_buffer(ops); - schedule_append(ops); - } - } while (ops_remain); - append_scheduled_ops(); -} - -template -void AbstractWriteLog::enlist_op_flusher() -{ - m_async_flush_ops++; - m_async_op_tracker.start_op(); - Context *flush_ctx = new LambdaContext([this](int r) { - flush_then_append_scheduled_ops(); - m_async_flush_ops--; - m_async_op_tracker.finish_op(); - }); - m_work_queue.queue(flush_ctx); -} - -/* - * Takes custody of ops. They'll all get their pmem blocks flushed, - * then get their log entries appended. - */ -template -void AbstractWriteLog::schedule_flush_and_append(GenericLogOperationsVector &ops) -{ - GenericLogOperations to_flush(ops.begin(), ops.end()); - bool need_finisher; - ldout(m_image_ctx.cct, 20) << dendl; - { - std::lock_guard locker(m_lock); - - need_finisher = m_ops_to_flush.empty(); - m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush); - } - - if (need_finisher) { - enlist_op_flusher(); - } -} - -/* - * Flush the pmem regions for the data blocks of a set of operations - * - * V is expected to be GenericLogOperations, or GenericLogOperationsVector - */ -template -template -void AbstractWriteLog::flush_pmem_buffer(V& ops) -{ - for (auto &operation : ops) { - operation->flush_pmem_buf_to_cache(m_log_pool); - } - - /* Drain once for all */ - pmemobj_drain(m_log_pool); - - utime_t now = ceph_clock_now(); - for (auto &operation : ops) { - if (operation->reserved_allocated()) { - operation->buf_persist_comp_time = now; - } else { - ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; - } - } -} - -/* - * Allocate the (already reserved) write log entries for a set of operations. 
- * - * Locking: - * Acquires lock - */ -template -void AbstractWriteLog::alloc_op_log_entries(GenericLogOperations &ops) -{ - TOID(struct WriteLogPoolRoot) pool_root; - pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); - struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries); - - ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock)); - - /* Allocate the (already reserved) log entries */ - std::lock_guard locker(m_lock); - - for (auto &operation : ops) { - uint32_t entry_index = m_first_free_entry; - m_first_free_entry = (m_first_free_entry + 1) % m_total_log_entries; - auto &log_entry = operation->get_log_entry(); - log_entry->log_entry_index = entry_index; - log_entry->ram_entry.entry_index = entry_index; - log_entry->pmem_entry = &pmem_log_entries[entry_index]; - log_entry->ram_entry.entry_valid = 1; - m_log_entries.push_back(log_entry); - ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl; - } -} - -/* - * Flush the persistent write log entries set of ops. The entries must - * be contiguous in persistent memory. - */ -template -void AbstractWriteLog::flush_op_log_entries(GenericLogOperationsVector &ops) -{ - if (ops.empty()) { - return; - } - - if (ops.size() > 1) { - ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry); - } - - ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " " - << "start address=" - << ops.front()->get_log_entry()->pmem_entry << " " - << "bytes=" - << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)) - << dendl; - pmemobj_flush(m_log_pool, - ops.front()->get_log_entry()->pmem_entry, - ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))); -} - -/* - * Write and persist the (already allocated) write log entries and - * data buffer allocations for a set of ops. The data buffer for each - * of these must already have been persisted to its reserved area. 
- */ -template -int AbstractWriteLog::append_op_log_entries(GenericLogOperations &ops) -{ - CephContext *cct = m_image_ctx.cct; - GenericLogOperationsVector entries_to_flush; - TOID(struct WriteLogPoolRoot) pool_root; - pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); - int ret = 0; - - ceph_assert(ceph_mutex_is_locked_by_me(m_log_append_lock)); - - if (ops.empty()) { - return 0; - } - entries_to_flush.reserve(OPS_APPENDED_TOGETHER); - - /* Write log entries to ring and persist */ - utime_t now = ceph_clock_now(); - for (auto &operation : ops) { - if (!entries_to_flush.empty()) { - /* Flush these and reset the list if the current entry wraps to the - * tail of the ring */ - if (entries_to_flush.back()->get_log_entry()->log_entry_index > - operation->get_log_entry()->log_entry_index) { - ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at " - << "operation=[" << *operation << "]" << dendl; - flush_op_log_entries(entries_to_flush); - entries_to_flush.clear(); - now = ceph_clock_now(); - } - } - ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index=" - << operation->get_log_entry()->log_entry_index << " " - << "from " << &operation->get_log_entry()->ram_entry << " " - << "to " << operation->get_log_entry()->pmem_entry << " " - << "operation=[" << *operation << "]" << dendl; - ldout(m_image_ctx.cct, 05) << "APPENDING: index=" - << operation->get_log_entry()->log_entry_index << " " - << "operation=[" << *operation << "]" << dendl; - operation->log_append_time = now; - *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry; - ldout(m_image_ctx.cct, 20) << "APPENDING: index=" - << operation->get_log_entry()->log_entry_index << " " - << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry - << "]" << dendl; - entries_to_flush.push_back(operation); - } - flush_op_log_entries(entries_to_flush); - - /* Drain once for all */ - pmemobj_drain(m_log_pool); - - /* - * Atomically advance the log head pointer and publish the - * allocations for all the data buffers they refer to. 
- */ - utime_t tx_start = ceph_clock_now(); - TX_BEGIN(m_log_pool) { - D_RW(pool_root)->first_free_entry = m_first_free_entry; - for (auto &operation : ops) { - if (operation->reserved_allocated()) { - auto write_op = (std::shared_ptr&) operation; - pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1); - } else { - ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; - } - } - } TX_ONCOMMIT { - } TX_ONABORT { - lderr(cct) << "failed to commit " << ops.size() - << " log entries (" << m_log_pool_name << ")" << dendl; - ceph_assert(false); - ret = -EIO; - } TX_FINALLY { - } TX_END; - - utime_t tx_end = ceph_clock_now(); - m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start); - m_perfcounter->hinc( - l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size()); - for (auto &operation : ops) { - operation->log_append_comp_time = tx_end; - } - - return ret; + schedule_append_ops(to_append); } /* @@ -1888,20 +1394,12 @@ void AbstractWriteLog::alloc_and_dispatch_io_req(C_BlockIORequestT *req) } template -bool AbstractWriteLog::alloc_resources(C_BlockIORequestT *req) { +bool AbstractWriteLog::check_allocation(C_BlockIORequestT *req, + uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated, + uint64_t &num_lanes, uint64_t &num_log_entries, + uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap){ bool alloc_succeeds = true; bool no_space = false; - uint64_t bytes_allocated = 0; - uint64_t bytes_cached = 0; - uint64_t bytes_dirtied = 0; - uint64_t num_lanes = 0; - uint64_t num_unpublished_reserves = 0; - uint64_t num_log_entries = 0; - - // Setup buffer, and get all the number of required resources - req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated, - num_lanes, num_log_entries, num_unpublished_reserves); - { std::lock_guard locker(m_lock); if (m_free_lanes < num_lanes) { @@ -1923,11 +1421,11 @@ bool AbstractWriteLog::alloc_resources(C_BlockIORequestT *req) { no_space = true; /* Entries must be retired */ } /* Don't attempt buffer allocate if we've exceeded the "full" threshold */ - if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) { + if (m_bytes_allocated + bytes_allocated > bytes_allocated_cap) { if (!req->has_io_waited_for_buffers()) { req->set_io_waited_for_entries(true); ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap=" - << m_bytes_allocated_cap + << bytes_allocated_cap << ", allocated=" << m_bytes_allocated << ") in write [" << *req << "]" << dendl; } @@ -1936,32 +1434,8 @@ bool AbstractWriteLog::alloc_resources(C_BlockIORequestT *req) { } } - std::vector& buffers = req->get_resources_buffers(); if (alloc_succeeds) { - for (auto &buffer : buffers) { - utime_t before_reserve = ceph_clock_now(); - buffer.buffer_oid = pmemobj_reserve(m_log_pool, - &buffer.buffer_alloc_action, - buffer.allocation_size, - 0 /* Object type */); - buffer.allocation_lat = ceph_clock_now() - before_reserve; - if (TOID_IS_NULL(buffer.buffer_oid)) { - if (!req->has_io_waited_for_buffers()) { - req->set_io_waited_for_entries(true); - } - ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: " - << pmemobj_errormsg() << ". " - << *req << dendl; - alloc_succeeds = false; - no_space = true; /* Entries need to be retired */ - break; - } else { - buffer.allocated = true; - } - ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo - << "." 
<< buffer.buffer_oid.oid.off - << ", size=" << buffer.allocation_size << dendl; - } + reserve_pmem(req, alloc_succeeds, no_space); } if (alloc_succeeds) { @@ -1981,23 +1455,13 @@ bool AbstractWriteLog::alloc_resources(C_BlockIORequestT *req) { } } - if (!alloc_succeeds) { - /* On alloc failure, free any buffers we did allocate */ - for (auto &buffer : buffers) { - if (buffer.allocated) { - pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1); - } - } - if (no_space) { - /* Expedite flushing and/or retiring */ - std::lock_guard locker(m_lock); - m_alloc_failed_since_retire = true; - m_last_alloc_fail = ceph_clock_now(); - } + if (!alloc_succeeds && no_space) { + /* Expedite flushing and/or retiring */ + std::lock_guard locker(m_lock); + m_alloc_failed_since_retire = true; + m_last_alloc_fail = ceph_clock_now(); } - req->set_allocated(alloc_succeeds); - return alloc_succeeds; } @@ -2047,73 +1511,6 @@ void AbstractWriteLog::wake_up() { }), 0); } -template -void AbstractWriteLog::process_work() { - CephContext *cct = m_image_ctx.cct; - int max_iterations = 4; - bool wake_up_requested = false; - uint64_t aggressive_high_water_bytes = m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER; - uint64_t high_water_bytes = m_bytes_allocated_cap * RETIRE_HIGH_WATER; - uint64_t low_water_bytes = m_bytes_allocated_cap * RETIRE_LOW_WATER; - uint64_t aggressive_high_water_entries = m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER; - uint64_t high_water_entries = m_total_log_entries * RETIRE_HIGH_WATER; - uint64_t low_water_entries = m_total_log_entries * RETIRE_LOW_WATER; - - ldout(cct, 20) << dendl; - - do { - { - std::lock_guard locker(m_lock); - m_wake_up_requested = false; - } - if (m_alloc_failed_since_retire || m_invalidating || - m_bytes_allocated > high_water_bytes || - (m_log_entries.size() > high_water_entries)) { - int retired = 0; - utime_t started = ceph_clock_now(); - ldout(m_image_ctx.cct, 10) << "alloc_fail=" << m_alloc_failed_since_retire - << ", allocated > high_water=" - << (m_bytes_allocated > high_water_bytes) - << ", allocated_entries > high_water=" - << (m_log_entries.size() > high_water_entries) - << dendl; - while (m_alloc_failed_since_retire || m_invalidating || - (m_bytes_allocated > high_water_bytes) || - (m_log_entries.size() > high_water_entries) || - (((m_bytes_allocated > low_water_bytes) || (m_log_entries.size() > low_water_entries)) && - (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) { - if (!retire_entries((m_shutting_down || m_invalidating || - (m_bytes_allocated > aggressive_high_water_bytes) || - (m_log_entries.size() > aggressive_high_water_entries)) - ? 
MAX_ALLOC_PER_TRANSACTION - : MAX_FREE_PER_TRANSACTION)) { - break; - } - retired++; - dispatch_deferred_writes(); - process_writeback_dirty_entries(); - } - ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl; - } - dispatch_deferred_writes(); - process_writeback_dirty_entries(); - - { - std::lock_guard locker(m_lock); - wake_up_requested = m_wake_up_requested; - } - } while (wake_up_requested && --max_iterations > 0); - - { - std::lock_guard locker(m_lock); - m_wake_up_scheduled = false; - /* Reschedule if it's still requested */ - if (m_wake_up_requested) { - wake_up(); - } - } -} - template bool AbstractWriteLog::can_flush_entry(std::shared_ptr log_entry) { CephContext *cct = m_image_ctx.cct; @@ -2155,9 +1552,9 @@ bool AbstractWriteLog::can_flush_entry(std::shared_ptr log_e } template -Context* AbstractWriteLog::construct_flush_entry_ctx(std::shared_ptr log_entry) { +Context* AbstractWriteLog::construct_flush_entry(std::shared_ptr log_entry, + bool invalidating) { CephContext *cct = m_image_ctx.cct; - bool invalidating = m_invalidating; // snapshot so we behave consistently ldout(cct, 20) << "" << dendl; ceph_assert(m_entry_reader_lock.is_locked()); @@ -2167,7 +1564,7 @@ Context* AbstractWriteLog::construct_flush_entry_ctx(std::shared_ptrram_entry.sync_gen_number; } m_flush_ops_in_flight += 1; - /* For write same this is the bytes affected bt the flush op, not the bytes transferred */ + /* For write same this is the bytes affected by the flush op, not the bytes transferred */ m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes; /* Flush write completion action */ @@ -2204,19 +1601,7 @@ Context* AbstractWriteLog::construct_flush_entry_ctx(std::shared_ptrqueue(new LambdaContext( - [this, log_entry, ctx](int r) { - ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry - << " " << *log_entry << dendl; - log_entry->writeback(m_image_writeback, ctx); - }), 0); - }); + return ctx; } template @@ -2267,37 +1652,6 @@ void AbstractWriteLog::process_writeback_dirty_entries() { } } -/** - * Update/persist the last flushed sync point in the log - */ -template -void AbstractWriteLog::persist_last_flushed_sync_gen() -{ - TOID(struct WriteLogPoolRoot) pool_root; - pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); - uint64_t flushed_sync_gen; - - std::lock_guard append_locker(m_log_append_lock); - { - std::lock_guard locker(m_lock); - flushed_sync_gen = m_flushed_sync_gen; - } - - if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) { - ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from " - << D_RO(pool_root)->flushed_sync_gen << " to " - << flushed_sync_gen << dendl; - TX_BEGIN(m_log_pool) { - D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen; - } TX_ONCOMMIT { - } TX_ONABORT { - lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl; - ceph_assert(false); - } TX_FINALLY { - } TX_END; - } -} - /* Returns true if the specified SyncPointLogEntry is considered flushed, and * the log will be updated to reflect this. */ template @@ -2643,129 +1997,8 @@ bool AbstractWriteLog::can_retire_entry(std::shared_ptr log_ return log_entry->can_retire(); } -/** - * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries - * that are eligible to be retired. Returns true if anything was - * retired. 
- */ -template -bool AbstractWriteLog::retire_entries(const unsigned long int frees_per_tx) { - CephContext *cct = m_image_ctx.cct; - GenericLogEntriesVector retiring_entries; - uint32_t initial_first_valid_entry; - uint32_t first_valid_entry; - - std::lock_guard retire_locker(m_log_retire_lock); - ldout(cct, 20) << "Look for entries to retire" << dendl; - { - /* Entry readers can't be added while we hold m_entry_reader_lock */ - RWLock::WLocker entry_reader_locker(m_entry_reader_lock); - std::lock_guard locker(m_lock); - initial_first_valid_entry = m_first_valid_entry; - first_valid_entry = m_first_valid_entry; - auto entry = m_log_entries.front(); - while (!m_log_entries.empty() && - retiring_entries.size() < frees_per_tx && - can_retire_entry(entry)) { - if (entry->log_entry_index != first_valid_entry) { - lderr(cct) << "Retiring entry index (" << entry->log_entry_index - << ") and first valid log entry index (" << first_valid_entry - << ") must be ==." << dendl; - } - ceph_assert(entry->log_entry_index == first_valid_entry); - first_valid_entry = (first_valid_entry + 1) % m_total_log_entries; - m_log_entries.pop_front(); - retiring_entries.push_back(entry); - /* Remove entry from map so there will be no more readers */ - if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) { - auto gen_write_entry = static_pointer_cast(entry); - if (gen_write_entry) { - m_blocks_to_log_entries.remove_log_entry(gen_write_entry); - } - } - entry = m_log_entries.front(); - } - } - - if (retiring_entries.size()) { - ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl; - TOID(struct WriteLogPoolRoot) pool_root; - pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); - - utime_t tx_start; - utime_t tx_end; - /* Advance first valid entry and release buffers */ - { - uint64_t flushed_sync_gen; - std::lock_guard append_locker(m_log_append_lock); - { - std::lock_guard locker(m_lock); - flushed_sync_gen = m_flushed_sync_gen; - } - - tx_start = ceph_clock_now(); - TX_BEGIN(m_log_pool) { - if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) { - ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from " - << D_RO(pool_root)->flushed_sync_gen << " to " - << flushed_sync_gen << dendl; - D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen; - } - D_RW(pool_root)->first_valid_entry = first_valid_entry; - for (auto &entry: retiring_entries) { - if (entry->write_bytes()) { - ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo - << "." 
<< entry->ram_entry.write_data.oid.off << dendl; - TX_FREE(entry->ram_entry.write_data); - } else { - ldout(cct, 20) << "Retiring non-write: " << *entry << dendl; - } - } - } TX_ONCOMMIT { - } TX_ONABORT { - lderr(cct) << "failed to commit free of" << retiring_entries.size() << " log entries (" << m_log_pool_name << ")" << dendl; - ceph_assert(false); - } TX_FINALLY { - } TX_END; - tx_end = ceph_clock_now(); - } - m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start); - m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), retiring_entries.size()); - - /* Update runtime copy of first_valid, and free entries counts */ - { - std::lock_guard locker(m_lock); - - ceph_assert(m_first_valid_entry == initial_first_valid_entry); - m_first_valid_entry = first_valid_entry; - m_free_log_entries += retiring_entries.size(); - for (auto &entry: retiring_entries) { - if (entry->write_bytes()) { - ceph_assert(m_bytes_cached >= entry->write_bytes()); - m_bytes_cached -= entry->write_bytes(); - uint64_t entry_allocation_size = entry->write_bytes(); - if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) { - entry_allocation_size = MIN_WRITE_ALLOC_SIZE; - } - ceph_assert(m_bytes_allocated >= entry_allocation_size); - m_bytes_allocated -= entry_allocation_size; - } - } - m_alloc_failed_since_retire = false; - wake_up(); - } - } else { - ldout(cct, 20) << "Nothing to retire" << dendl; - return false; - } - return true; -} - } // namespace pwl } // namespace cache } // namespace librbd template class librbd::cache::pwl::AbstractWriteLog; -template void librbd::cache::pwl::AbstractWriteLog:: \ - flush_pmem_buffer(std::vector>&); diff --git a/src/librbd/cache/pwl/AbstractWriteLog.h b/src/librbd/cache/pwl/AbstractWriteLog.h index d47980cc2fd..5796e1b9df3 100644 --- a/src/librbd/cache/pwl/AbstractWriteLog.h +++ b/src/librbd/cache/pwl/AbstractWriteLog.h @@ -15,6 +15,7 @@ #include "librbd/cache/pwl/LogOperation.h" #include "librbd/cache/pwl/Request.h" #include "librbd/cache/pwl/LogMap.h" +#include "librbd/cache/pwl/Types.h" #include #include @@ -29,10 +30,11 @@ namespace cache { namespace pwl { -class SyncPointLogEntry; -class GenericWriteLogEntry; -class WriteLogEntry; class GenericLogEntry; +class GenericWriteLogEntry; +class SyncPointLogEntry; +class WriteLogEntry; +struct WriteLogPmemEntry; typedef std::list> WriteLogEntries; typedef std::list> GenericLogEntries; @@ -65,24 +67,30 @@ public: typedef io::Extents Extents; AbstractWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state); - ~AbstractWriteLog(); + virtual ~AbstractWriteLog(); AbstractWriteLog(const AbstractWriteLog&) = delete; AbstractWriteLog &operator=(const AbstractWriteLog&) = delete; /// IO methods - void read(Extents&& image_extents, ceph::bufferlist *bl, + void read( + Extents&& image_extents, ceph::bufferlist *bl, int fadvise_flags, Context *on_finish); - void write(Extents&& image_extents, ceph::bufferlist&& bl, + void write( + Extents&& image_extents, ceph::bufferlist&& bl, int fadvise_flags, Context *on_finish); - void discard(uint64_t offset, uint64_t length, + void discard( + uint64_t offset, uint64_t length, uint32_t discard_granularity_bytes, Context *on_finish); - void flush(io::FlushSource flush_source, Context *on_finish); - void writesame(uint64_t offset, uint64_t length, + void flush( + io::FlushSource flush_source, Context *on_finish); + void writesame( + uint64_t offset, uint64_t length, ceph::bufferlist&& bl, int fadvise_flags, Context *on_finish); - void 
compare_and_write(Extents&& image_extents, + void compare_and_write( + Extents&& image_extents, ceph::bufferlist&& cmp_bl, ceph::bufferlist&& bl, uint64_t *mismatch_offset,int fadvise_flags, Context *on_finish); @@ -104,13 +112,13 @@ public: CephContext * get_context(); void release_guarded_request(BlockGuardCell *cell); void release_write_lanes(C_BlockIORequestT *req); - bool alloc_resources(C_BlockIORequestT *req); - template - void flush_pmem_buffer(V& ops); + virtual bool alloc_resources(C_BlockIORequestT *req) = 0; + virtual void setup_schedule_append( + pwl::GenericLogOperationsVector &ops, bool do_early_flush) = 0; void schedule_append(pwl::GenericLogOperationsVector &ops); void schedule_append(pwl::GenericLogOperationSharedPtr op); - void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops); void flush_new_sync_point(C_FlushRequestT *flush_req, pwl::DeferredContexts &later); + std::shared_ptr get_current_sync_point() { return m_current_sync_point; } @@ -134,9 +142,67 @@ public: return m_free_log_entries; } void add_into_log_map(pwl::GenericWriteLogEntries &log_entries); -protected: - typedef std::list *> C_WriteRequests; - typedef std::list *> C_BlockIORequests; + +private: + typedef std::list *> C_WriteRequests; + typedef std::list *> C_BlockIORequests; + + std::atomic m_initialized = {false}; + + uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */ + utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */ + + pwl::WriteLogGuard m_write_log_guard; + + /* Starts at 0 for a new write log. Incremented on every flush. */ + uint64_t m_current_sync_gen = 0; + /* Starts at 0 on each sync gen increase. Incremented before applied + to an operation */ + uint64_t m_last_op_sequence_num = 0; + + bool m_persist_on_write_until_flush = true; + + /* Debug counters for the places m_async_op_tracker is used */ + std::atomic m_async_append_ops = {0}; + std::atomic m_async_complete_ops = {0}; + std::atomic m_async_null_flush_finish = {0}; + std::atomic m_async_process_work = {0}; + + /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. 
*/ + mutable ceph::mutex m_deferred_dispatch_lock; + + /* Used in release/detain to make BlockGuard preserve submission order */ + mutable ceph::mutex m_blockguard_lock; + + /* Use m_blockguard_lock for the following 3 things */ + bool m_barrier_in_progress = false; + BlockGuardCell *m_barrier_cell = nullptr; + + bool m_wake_up_enabled = true; + + Contexts m_flush_complete_contexts; + + std::shared_ptr m_current_sync_point = nullptr; + bool m_persist_on_flush = false; /* If false, persist each write before completion */ + + int m_flush_ops_in_flight = 0; + int m_flush_bytes_in_flight = 0; + uint64_t m_lowest_flushing_sync_gen = 0; + + /* Writes that have left the block guard, but are waiting for resources */ + C_BlockIORequests m_deferred_ios; + /* Throttle writes concurrently allocating & replicating */ + unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES; + + /* Initialized from config, then set false during shutdown */ + std::atomic m_periodic_stats_enabled = {false}; + SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */ + mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */ + Context *m_timer_ctx = nullptr; + + ThreadPool m_thread_pool; + + uint32_t m_discard_granularity_bytes; BlockGuardCell* detain_guarded_request_helper(pwl::GuardedRequest &req); BlockGuardCell* detain_guarded_request_barrier_helper(pwl::GuardedRequest &req); @@ -144,12 +210,34 @@ protected: pwl::GuardedRequestFunctionContext *guarded_ctx, bool is_barrier); + void perf_start(const std::string name); + void perf_stop(); + void log_perf(); + void periodic_stats(); + void arm_periodic_stats(); + + void pwl_init(Context *on_finish, pwl::DeferredContexts &later); + void update_image_cache_state(Context *on_finish); + + void flush_dirty_entries(Context *on_finish); + bool can_flush_entry(const std::shared_ptr log_entry); + bool handle_flushed_sync_point(std::shared_ptr log_entry); + void sync_point_writer_flushed(std::shared_ptr log_entry); + + void init_flush_new_sync_point(pwl::DeferredContexts &later); + void new_sync_point(pwl::DeferredContexts &later); + pwl::C_FlushRequest>* make_flush_req(Context *on_finish); + void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later); + + void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req); + void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r); + void internal_flush(bool invalidate, Context *on_finish); + +protected: librbd::cache::pwl::ImageCacheState* m_cache_state = nullptr; - std::atomic m_initialized = {false}; std::atomic m_shutting_down = {false}; std::atomic m_invalidating = {false}; - PMEMobjpool *m_log_pool = nullptr; const char* m_pwl_pool_layout_name; ImageCtxT &m_image_ctx; @@ -164,14 +252,11 @@ protected: std::atomic m_bytes_allocated = {0}; /* Total bytes allocated in write buffers */ uint64_t m_bytes_cached = 0; /* Total bytes used in write buffers */ - uint64_t m_bytes_dirty = 0; /* Total bytes yet to flush to RBD */ uint64_t m_bytes_allocated_cap = 0; - utime_t m_last_alloc_fail; /* Entry or buffer allocation fail seen */ std::atomic m_alloc_failed_since_retire = {false}; ImageWriteback m_image_writeback; - pwl::WriteLogGuard m_write_log_guard; /* * When m_first_free_entry == m_first_valid_entry, the log is * empty. 
There is always at least one free entry, which can't be @@ -180,23 +265,12 @@ protected: uint64_t m_first_free_entry = 0; /* Entries from here to m_first_valid_entry-1 are free */ uint64_t m_first_valid_entry = 0; /* Entries from here to m_first_free_entry-1 are valid */ - /* Starts at 0 for a new write log. Incremented on every flush. */ - uint64_t m_current_sync_gen = 0; - /* Starts at 0 on each sync gen increase. Incremented before applied - to an operation */ - uint64_t m_last_op_sequence_num = 0; /* All writes bearing this and all prior sync gen numbers are flushed */ uint64_t m_flushed_sync_gen = 0; - bool m_persist_on_write_until_flush = true; - AsyncOpTracker m_async_op_tracker; /* Debug counters for the places m_async_op_tracker is used */ std::atomic m_async_flush_ops = {0}; - std::atomic m_async_append_ops = {0}; - std::atomic m_async_complete_ops = {0}; - std::atomic m_async_null_flush_finish = {0}; - std::atomic m_async_process_work = {0}; /* Acquire locks in order declared here */ @@ -205,29 +279,19 @@ protected: * bufs. Hold a write lock to prevent readers from being added (e.g. when * removing log entrys from the map). No lock required to remove readers. */ mutable RWLock m_entry_reader_lock; - /* Hold m_deferred_dispatch_lock while consuming from m_deferred_ios. */ - mutable ceph::mutex m_deferred_dispatch_lock; /* Hold m_log_append_lock while appending or retiring log entries. */ mutable ceph::mutex m_log_append_lock; /* Used for most synchronization */ mutable ceph::mutex m_lock; - /* Used in release/detain to make BlockGuard preserve submission order */ - mutable ceph::mutex m_blockguard_lock; - /* Use m_blockguard_lock for the following 3 things */ pwl::WriteLogGuard::BlockOperations m_awaiting_barrier; - bool m_barrier_in_progress = false; - BlockGuardCell *m_barrier_cell = nullptr; bool m_wake_up_requested = false; bool m_wake_up_scheduled = false; - bool m_wake_up_enabled = true; bool m_appending = false; bool m_dispatching_deferred_ops = false; - Contexts m_flush_complete_contexts; - pwl::GenericLogOperations m_ops_to_flush; /* Write ops needing flush in local log */ pwl::GenericLogOperations m_ops_to_append; /* Write ops needing event append in local log */ @@ -239,70 +303,53 @@ protected: PerfCounters *m_perfcounter = nullptr; - std::shared_ptr m_current_sync_point = nullptr; - bool m_persist_on_flush = false; /* If false, persist each write before completion */ - - int m_flush_ops_in_flight = 0; - int m_flush_bytes_in_flight = 0; - uint64_t m_lowest_flushing_sync_gen = 0; - - /* Writes that have left the block guard, but are waiting for resources */ - C_BlockIORequests m_deferred_ios; - /* Throttle writes concurrently allocating & replicating */ - unsigned int m_free_lanes = pwl::MAX_CONCURRENT_WRITES; unsigned int m_unpublished_reserves = 0; - /* Initialized from config, then set false during shutdown */ - std::atomic m_periodic_stats_enabled = {false}; - SafeTimer *m_timer = nullptr; /* Used with m_timer_lock */ - mutable ceph::mutex *m_timer_lock = nullptr; /* Used with and by m_timer */ - Context *m_timer_ctx = nullptr; - - ThreadPool m_thread_pool; ContextWQ m_work_queue; - uint32_t m_discard_granularity_bytes; - - void perf_start(const std::string name); - void perf_stop(); - void log_perf(); - void periodic_stats(); - void arm_periodic_stats(); - - void pwl_init(Context *on_finish, pwl::DeferredContexts &later); - void update_image_cache_state(Context *on_finish); - void load_existing_entries(pwl::DeferredContexts &later); void wake_up(); - void 
process_work(); - void flush_dirty_entries(Context *on_finish); - bool can_flush_entry(const std::shared_ptr log_entry); - Context *construct_flush_entry_ctx(const std::shared_ptr log_entry); - void persist_last_flushed_sync_gen(); - bool handle_flushed_sync_point(std::shared_ptr log_entry); - void sync_point_writer_flushed(std::shared_ptr log_entry); + void update_entries( + std::shared_ptr log_entry, + pwl::WriteLogPmemEntry *pmem_entry, std::map &missing_sync_points, + std::map> &sync_point_entries, + int entry_index); + void update_sync_points( + std::map &missing_sync_points, + std::map> &sync_point_entries, + pwl::DeferredContexts &later); + Context *construct_flush_entry( + const std::shared_ptr log_entry, bool invalidating); void process_writeback_dirty_entries(); bool can_retire_entry(const std::shared_ptr log_entry); - bool retire_entries(const unsigned long int frees_per_tx); - - void init_flush_new_sync_point(pwl::DeferredContexts &later); - void new_sync_point(pwl::DeferredContexts &later); - pwl::C_FlushRequest>* make_flush_req(Context *on_finish); - void flush_new_sync_point_if_needed(C_FlushRequestT *flush_req, pwl::DeferredContexts &later); void dispatch_deferred_writes(void); - void alloc_and_dispatch_io_req(C_BlockIORequestT *write_req); - void append_scheduled_ops(void); void enlist_op_appender(); - void schedule_append(pwl::GenericLogOperations &ops); - void flush_then_append_scheduled_ops(void); - void enlist_op_flusher(); - void alloc_op_log_entries(pwl::GenericLogOperations &ops); - void flush_op_log_entries(pwl::GenericLogOperationsVector &ops); - int append_op_log_entries(pwl::GenericLogOperations &ops); void complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r); - void schedule_complete_op_log_entries(pwl::GenericLogOperations &&ops, const int r); - void internal_flush(bool invalidate, Context *on_finish); + + bool check_allocation( + C_BlockIORequestT *req, + uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated, + uint64_t &num_lanes, uint64_t &num_log_entries, + uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap); + void append_scheduled( + pwl::GenericLogOperations &ops, bool &ops_remain, bool &appending, bool isRWL=false); + + virtual void process_work() = 0; + virtual void append_scheduled_ops(void) = 0; + virtual void schedule_append_ops(pwl::GenericLogOperations &ops) = 0; + virtual void remove_pool_file() = 0; + virtual void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) = 0; + virtual void write_data_to_buffer( + std::shared_ptr ws_entry, pwl::WriteLogPmemEntry *pmem_entry) {} + virtual void get_pool_name(const std::string log_poolset_name) {} + virtual void alloc_op_log_entries(pwl::GenericLogOperations &ops) {} + virtual bool retire_entries(const unsigned long int frees_per_tx) {return false;} + virtual void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) {} + virtual void persist_last_flushed_sync_gen() {} + virtual void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) {} + virtual Context *construct_flush_entry_ctx( + const std::shared_ptr log_entry) {return nullptr;} }; } // namespace pwl diff --git a/src/librbd/cache/pwl/ReplicatedWriteLog.cc b/src/librbd/cache/pwl/ReplicatedWriteLog.cc index 5db62a5fd50..c094cafff21 100644 --- a/src/librbd/cache/pwl/ReplicatedWriteLog.cc +++ b/src/librbd/cache/pwl/ReplicatedWriteLog.cc @@ -1,6 +1,7 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// // vim: ts=8 sw=2 smarttab 
+// vim: ts=8 sw=2 smarttab +#include #include "ReplicatedWriteLog.h" #include "include/buffer.h" #include "include/Context.h" @@ -13,8 +14,10 @@ #include "common/Timer.h" #include "common/perf_counters.h" #include "librbd/ImageCtx.h" +#include "librbd/asio/ContextWQ.h" #include "librbd/cache/pwl/ImageCacheState.h" #include "librbd/cache/pwl/LogEntry.h" +#include "librbd/cache/pwl/Types.h" #include #include @@ -27,14 +30,881 @@ namespace librbd { namespace cache { namespace pwl { + using namespace librbd::cache::pwl; +const unsigned long int OPS_APPENDED_TOGETHER = MAX_ALLOC_PER_TRANSACTION; + template -ReplicatedWriteLog::ReplicatedWriteLog(I &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state) +ReplicatedWriteLog::ReplicatedWriteLog( + I &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state) : AbstractWriteLog(image_ctx, cache_state) { } +template +ReplicatedWriteLog::~ReplicatedWriteLog() { + m_log_pool = nullptr; +} + +/* + * Allocate the (already reserved) write log entries for a set of operations. + * + * Locking: + * Acquires lock + */ +template +void ReplicatedWriteLog::alloc_op_log_entries(GenericLogOperations &ops) +{ + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries); + + ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock)); + + /* Allocate the (already reserved) log entries */ + std::lock_guard locker(m_lock); + + for (auto &operation : ops) { + uint32_t entry_index = this->m_first_free_entry; + this->m_first_free_entry = (this->m_first_free_entry + 1) % this->m_total_log_entries; + auto &log_entry = operation->get_log_entry(); + log_entry->log_entry_index = entry_index; + log_entry->ram_entry.entry_index = entry_index; + log_entry->pmem_entry = &pmem_log_entries[entry_index]; + log_entry->ram_entry.entry_valid = 1; + m_log_entries.push_back(log_entry); + ldout(m_image_ctx.cct, 20) << "operation=[" << *operation << "]" << dendl; + } +} + +/* + * Write and persist the (already allocated) write log entries and + * data buffer allocations for a set of ops. The data buffer for each + * of these must already have been persisted to its reserved area. 
+ */ +template +int ReplicatedWriteLog::append_op_log_entries(GenericLogOperations &ops) +{ + CephContext *cct = m_image_ctx.cct; + GenericLogOperationsVector entries_to_flush; + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + int ret = 0; + + ceph_assert(ceph_mutex_is_locked_by_me(this->m_log_append_lock)); + + if (ops.empty()) { + return 0; + } + entries_to_flush.reserve(OPS_APPENDED_TOGETHER); + + /* Write log entries to ring and persist */ + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + if (!entries_to_flush.empty()) { + /* Flush these and reset the list if the current entry wraps to the + * tail of the ring */ + if (entries_to_flush.back()->get_log_entry()->log_entry_index > + operation->get_log_entry()->log_entry_index) { + ldout(m_image_ctx.cct, 20) << "entries to flush wrap around the end of the ring at " + << "operation=[" << *operation << "]" << dendl; + flush_op_log_entries(entries_to_flush); + entries_to_flush.clear(); + now = ceph_clock_now(); + } + } + ldout(m_image_ctx.cct, 20) << "Copying entry for operation at index=" + << operation->get_log_entry()->log_entry_index << " " + << "from " << &operation->get_log_entry()->ram_entry << " " + << "to " << operation->get_log_entry()->pmem_entry << " " + << "operation=[" << *operation << "]" << dendl; + ldout(m_image_ctx.cct, 05) << "APPENDING: index=" + << operation->get_log_entry()->log_entry_index << " " + << "operation=[" << *operation << "]" << dendl; + operation->log_append_time = now; + *operation->get_log_entry()->pmem_entry = operation->get_log_entry()->ram_entry; + ldout(m_image_ctx.cct, 20) << "APPENDING: index=" + << operation->get_log_entry()->log_entry_index << " " + << "pmem_entry=[" << *operation->get_log_entry()->pmem_entry + << "]" << dendl; + entries_to_flush.push_back(operation); + } + flush_op_log_entries(entries_to_flush); + + /* Drain once for all */ + pmemobj_drain(m_log_pool); + + /* + * Atomically advance the log head pointer and publish the + * allocations for all the data buffers they refer to. + */ + utime_t tx_start = ceph_clock_now(); + TX_BEGIN(m_log_pool) { + D_RW(pool_root)->first_free_entry = this->m_first_free_entry; + for (auto &operation : ops) { + if (operation->reserved_allocated()) { + auto write_op = (std::shared_ptr&) operation; + pmemobj_tx_publish(&write_op->buffer_alloc->buffer_alloc_action, 1); + } else { + ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; + } + } + } TX_ONCOMMIT { + } TX_ONABORT { + lderr(cct) << "failed to commit " << ops.size() + << " log entries (" << this->m_log_pool_name << ")" << dendl; + ceph_assert(false); + ret = -EIO; + } TX_FINALLY { + } TX_END; + + utime_t tx_end = ceph_clock_now(); + m_perfcounter->tinc(l_librbd_pwl_append_tx_t, tx_end - tx_start); + m_perfcounter->hinc( + l_librbd_pwl_append_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), ops.size()); + for (auto &operation : ops) { + operation->log_append_comp_time = tx_end; + } + + return ret; +} + +/* + * Flush the persistent write log entries set of ops. The entries must + * be contiguous in persistent memory. 
+ */ +template +void ReplicatedWriteLog::flush_op_log_entries(GenericLogOperationsVector &ops) +{ + if (ops.empty()) { + return; + } + + if (ops.size() > 1) { + ceph_assert(ops.front()->get_log_entry()->pmem_entry < ops.back()->get_log_entry()->pmem_entry); + } + + ldout(m_image_ctx.cct, 20) << "entry count=" << ops.size() << " " + << "start address=" + << ops.front()->get_log_entry()->pmem_entry << " " + << "bytes=" + << ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry)) + << dendl; + pmemobj_flush(m_log_pool, + ops.front()->get_log_entry()->pmem_entry, + ops.size() * sizeof(*(ops.front()->get_log_entry()->pmem_entry))); +} + +template +void ReplicatedWriteLog::get_pool_name(const std::string log_poolset_name) { + CephContext *cct = m_image_ctx.cct; + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + if (access(log_poolset_name.c_str(), F_OK) == 0) { + this->m_log_pool_name = log_poolset_name; + this->m_log_is_poolset = true; + } else { + ldout(cct, 5) << "Poolset file " << log_poolset_name + << " not present (or can't open). Using unreplicated pool" << dendl; + } +} + +template +void ReplicatedWriteLog::remove_pool_file() { + if (m_log_pool) { + ldout(m_image_ctx.cct, 6) << "closing pmem pool" << dendl; + pmemobj_close(m_log_pool); + } + if (m_cache_state->clean) { + if (this->m_log_is_poolset) { + ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl; + } else { + ldout(m_image_ctx.cct, 5) << "Removing empty pool file: " << this->m_log_pool_name << dendl; + if (remove(this->m_log_pool_name.c_str()) != 0) { + lderr(m_image_ctx.cct) << "failed to remove empty pool \"" << this->m_log_pool_name << "\": " + << pmemobj_errormsg() << dendl; + } else { + m_cache_state->clean = true; + m_cache_state->empty = true; + m_cache_state->present = false; + } + } + } else { + if (this->m_log_is_poolset) { + ldout(m_image_ctx.cct, 5) << "Not removing poolset " << this->m_log_pool_name << dendl; + } else { + ldout(m_image_ctx.cct, 5) << "Not removing pool file: " << this->m_log_pool_name << dendl; + } + } +} + +template +void ReplicatedWriteLog::initialize_pool(Context *on_finish, pwl::DeferredContexts &later) { + CephContext *cct = m_image_ctx.cct; + TOID(struct WriteLogPoolRoot) pool_root; + ceph_assert(ceph_mutex_is_locked_by_me(m_lock)); + if (access(this->m_log_pool_name.c_str(), F_OK) != 0) { + if ((m_log_pool = + pmemobj_create(this->m_log_pool_name.c_str(), + this->m_pwl_pool_layout_name, + this->m_log_pool_config_size, + (S_IWUSR | S_IRUSR))) == NULL) { + lderr(cct) << "failed to create pool (" << this->m_log_pool_name << ")" + << pmemobj_errormsg() << dendl; + m_cache_state->present = false; + m_cache_state->clean = true; + m_cache_state->empty = true; + /* TODO: filter/replace errnos that are meaningless to the caller */ + on_finish->complete(-errno); + return; + } + m_cache_state->present = true; + m_cache_state->clean = true; + m_cache_state->empty = true; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + + /* new pool, calculate and store metadata */ + size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE); + size_t small_write_size = MIN_WRITE_ALLOC_SIZE + BLOCK_ALLOC_OVERHEAD_BYTES + sizeof(struct WriteLogPmemEntry); + uint64_t num_small_writes = (uint64_t)(effective_pool_size / small_write_size); + if (num_small_writes > MAX_LOG_ENTRIES) { + num_small_writes = MAX_LOG_ENTRIES; + } + if (num_small_writes <= 2) { + lderr(cct) << "num_small_writes needs to > 2" << dendl; + on_finish->complete(-EINVAL); + 
return; + } + this->m_log_pool_actual_size = this->m_log_pool_config_size; + this->m_bytes_allocated_cap = effective_pool_size; + /* Log ring empty */ + m_first_free_entry = 0; + m_first_valid_entry = 0; + TX_BEGIN(m_log_pool) { + TX_ADD(pool_root); + D_RW(pool_root)->header.layout_version = RWL_POOL_VERSION; + D_RW(pool_root)->log_entries = + TX_ZALLOC(struct WriteLogPmemEntry, + sizeof(struct WriteLogPmemEntry) * num_small_writes); + D_RW(pool_root)->pool_size = this->m_log_pool_actual_size; + D_RW(pool_root)->flushed_sync_gen = this->m_flushed_sync_gen; + D_RW(pool_root)->block_size = MIN_WRITE_ALLOC_SIZE; + D_RW(pool_root)->num_log_entries = num_small_writes; + D_RW(pool_root)->first_free_entry = m_first_free_entry; + D_RW(pool_root)->first_valid_entry = m_first_valid_entry; + } TX_ONCOMMIT { + this->m_total_log_entries = D_RO(pool_root)->num_log_entries; + this->m_free_log_entries = D_RO(pool_root)->num_log_entries - 1; // leave one free + } TX_ONABORT { + this->m_total_log_entries = 0; + this->m_free_log_entries = 0; + lderr(cct) << "failed to initialize pool (" << this->m_log_pool_name << ")" << dendl; + on_finish->complete(-pmemobj_tx_errno()); + return; + } TX_FINALLY { + } TX_END; + } else { + m_cache_state->present = true; + /* Open existing pool */ + if ((m_log_pool = + pmemobj_open(this->m_log_pool_name.c_str(), + this->m_pwl_pool_layout_name)) == NULL) { + lderr(cct) << "failed to open pool (" << this->m_log_pool_name << "): " + << pmemobj_errormsg() << dendl; + on_finish->complete(-errno); + return; + } + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) { + // TODO: will handle upgrading version in the future + lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version + << " expected " << RWL_POOL_VERSION << dendl; + on_finish->complete(-EINVAL); + return; + } + if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) { + lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size + << " expected " << MIN_WRITE_ALLOC_SIZE << dendl; + on_finish->complete(-EINVAL); + return; + } + this->m_log_pool_actual_size = D_RO(pool_root)->pool_size; + this->m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen; + this->m_total_log_entries = D_RO(pool_root)->num_log_entries; + m_first_free_entry = D_RO(pool_root)->first_free_entry; + m_first_valid_entry = D_RO(pool_root)->first_valid_entry; + if (m_first_free_entry < m_first_valid_entry) { + /* Valid entries wrap around the end of the ring, so first_free is lower + * than first_valid. If first_valid was == first_free+1, the entry at + * first_free would be empty. The last entry is never used, so in + * that case there would be zero free log entries. */ + this->m_free_log_entries = this->m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1; + } else { + /* first_valid is <= first_free. If they are == we have zero valid log + * entries, and n-1 free log entries */ + this->m_free_log_entries = this->m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1; + } + size_t effective_pool_size = (size_t)(this->m_log_pool_config_size * USABLE_SIZE); + this->m_bytes_allocated_cap = effective_pool_size; + load_existing_entries(later); + m_cache_state->clean = this->m_dirty_log_entries.empty(); + m_cache_state->empty = m_log_entries.empty(); + } +} + +/* + * Loads the log entries from an existing log. + * + * Creates the in-memory structures to represent the state of the + * re-opened log. 
+ * + * Finds the last appended sync point, and any sync points referred to + * in log entries, but missing from the log. These missing sync points + * are created and scheduled for append. Some rudimentary consistency + * checking is done. + * + * Rebuilds the m_blocks_to_log_entries map, to make log entries + * readable. + * + * Places all writes on the dirty entries list, which causes them all + * to be flushed. + * + */ + +template +void ReplicatedWriteLog::load_existing_entries(DeferredContexts &later) { + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries); + uint64_t entry_index = m_first_valid_entry; + /* The map below allows us to find sync point log entries by sync + * gen number, which is necessary so write entries can be linked to + * their sync points. */ + std::map> sync_point_entries; + /* The map below tracks sync points referred to in writes but not + * appearing in the sync_point_entries map. We'll use this to + * determine which sync points are missing and need to be + * created. */ + std::map missing_sync_points; + + /* + * Read the existing log entries. Construct an in-memory log entry + * object of the appropriate type for each. Add these to the global + * log entries list. + * + * Write entries will not link to their sync points yet. We'll do + * that in the next pass. Here we'll accumulate a map of sync point + * gen numbers that are referred to in writes but do not appearing in + * the log. + */ + while (entry_index != m_first_free_entry) { + WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index]; + std::shared_ptr log_entry = nullptr; + ceph_assert(pmem_entry->entry_index == entry_index); + + this->update_entries(log_entry, pmem_entry, missing_sync_points, + sync_point_entries, entry_index); + + log_entry->ram_entry = *pmem_entry; + log_entry->pmem_entry = pmem_entry; + log_entry->log_entry_index = entry_index; + log_entry->completed = true; + + m_log_entries.push_back(log_entry); + + entry_index = (entry_index + 1) % this->m_total_log_entries; + } + + this->update_sync_points(missing_sync_points, sync_point_entries, later); +} + +template +void ReplicatedWriteLog::write_data_to_buffer(std::shared_ptr ws_entry, + WriteLogPmemEntry *pmem_entry) { + ws_entry->pmem_buffer = D_RW(pmem_entry->write_data); +} + +/** + * Retire up to MAX_ALLOC_PER_TRANSACTION of the oldest log entries + * that are eligible to be retired. Returns true if anything was + * retired. 
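Retirement only ever removes entries from the front of the in-memory log, so each retired entry's ring index must equal first_valid_entry, which then advances modulo the ring size; the data buffers are freed in a libpmemobj transaction and the in-memory byte and entry counters are adjusted afterwards. In isolation, the head-advance bookkeeping looks roughly like this sketch; retire_state, retireable and the Entry template are illustrative, not types from this patch.

#include <cstddef>
#include <cstdint>
#include <deque>

struct retire_state {
  uint32_t first_valid_entry = 0;   // oldest still-valid ring slot
  uint32_t total_entries = 0;       // ring capacity
  uint64_t free_entries = 0;        // slots available for new appends
};

// Pop retireable entries off the front of the log, advancing the ring head
// and crediting the freed slots, up to max_frees per call.
template <typename Entry>
std::size_t retire_front(std::deque<Entry> &log, retire_state &s,
                         std::size_t max_frees,
                         bool (*retireable)(const Entry &)) {
  std::size_t retired = 0;
  while (!log.empty() && retired < max_frees && retireable(log.front())) {
    s.first_valid_entry = (s.first_valid_entry + 1) % s.total_entries;
    log.pop_front();
    ++s.free_entries;
    ++retired;
  }
  return retired;
}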
+ */ +template +bool ReplicatedWriteLog::retire_entries(const unsigned long int frees_per_tx) { + CephContext *cct = m_image_ctx.cct; + GenericLogEntriesVector retiring_entries; + uint32_t initial_first_valid_entry; + uint32_t first_valid_entry; + + std::lock_guard retire_locker(this->m_log_retire_lock); + ldout(cct, 20) << "Look for entries to retire" << dendl; + { + /* Entry readers can't be added while we hold m_entry_reader_lock */ + RWLock::WLocker entry_reader_locker(this->m_entry_reader_lock); + std::lock_guard locker(m_lock); + initial_first_valid_entry = this->m_first_valid_entry; + first_valid_entry = this->m_first_valid_entry; + auto entry = m_log_entries.front(); + while (!m_log_entries.empty() && + retiring_entries.size() < frees_per_tx && + this->can_retire_entry(entry)) { + if (entry->log_entry_index != first_valid_entry) { + lderr(cct) << "Retiring entry index (" << entry->log_entry_index + << ") and first valid log entry index (" << first_valid_entry + << ") must be ==." << dendl; + } + ceph_assert(entry->log_entry_index == first_valid_entry); + first_valid_entry = (first_valid_entry + 1) % this->m_total_log_entries; + m_log_entries.pop_front(); + retiring_entries.push_back(entry); + /* Remove entry from map so there will be no more readers */ + if ((entry->write_bytes() > 0) || (entry->bytes_dirty() > 0)) { + auto gen_write_entry = static_pointer_cast(entry); + if (gen_write_entry) { + this->m_blocks_to_log_entries.remove_log_entry(gen_write_entry); + } + } + entry = m_log_entries.front(); + } + } + + if (retiring_entries.size()) { + ldout(cct, 20) << "Retiring " << retiring_entries.size() << " entries" << dendl; + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + + utime_t tx_start; + utime_t tx_end; + /* Advance first valid entry and release buffers */ + { + uint64_t flushed_sync_gen; + std::lock_guard append_locker(this->m_log_append_lock); + { + std::lock_guard locker(m_lock); + flushed_sync_gen = this->m_flushed_sync_gen; + } + + tx_start = ceph_clock_now(); + TX_BEGIN(m_log_pool) { + if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) { + ldout(m_image_ctx.cct, 20) << "flushed_sync_gen in log updated from " + << D_RO(pool_root)->flushed_sync_gen << " to " + << flushed_sync_gen << dendl; + D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen; + } + D_RW(pool_root)->first_valid_entry = first_valid_entry; + for (auto &entry: retiring_entries) { + if (entry->write_bytes()) { + ldout(cct, 20) << "Freeing " << entry->ram_entry.write_data.oid.pool_uuid_lo + << "." 
<< entry->ram_entry.write_data.oid.off << dendl; + TX_FREE(entry->ram_entry.write_data); + } else { + ldout(cct, 20) << "Retiring non-write: " << *entry << dendl; + } + } + } TX_ONCOMMIT { + } TX_ONABORT { + lderr(cct) << "failed to commit free of" << retiring_entries.size() + << " log entries (" << this->m_log_pool_name << ")" << dendl; + ceph_assert(false); + } TX_FINALLY { + } TX_END; + tx_end = ceph_clock_now(); + } + m_perfcounter->tinc(l_librbd_pwl_retire_tx_t, tx_end - tx_start); + m_perfcounter->hinc(l_librbd_pwl_retire_tx_t_hist, utime_t(tx_end - tx_start).to_nsec(), + retiring_entries.size()); + + /* Update runtime copy of first_valid, and free entries counts */ + { + std::lock_guard locker(m_lock); + + ceph_assert(this->m_first_valid_entry == initial_first_valid_entry); + this->m_first_valid_entry = first_valid_entry; + this->m_free_log_entries += retiring_entries.size(); + for (auto &entry: retiring_entries) { + if (entry->write_bytes()) { + ceph_assert(this->m_bytes_cached >= entry->write_bytes()); + this->m_bytes_cached -= entry->write_bytes(); + uint64_t entry_allocation_size = entry->write_bytes(); + if (entry_allocation_size < MIN_WRITE_ALLOC_SIZE) { + entry_allocation_size = MIN_WRITE_ALLOC_SIZE; + } + ceph_assert(this->m_bytes_allocated >= entry_allocation_size); + this->m_bytes_allocated -= entry_allocation_size; + } + } + this->m_alloc_failed_since_retire = false; + this->wake_up(); + } + } else { + ldout(cct, 20) << "Nothing to retire" << dendl; + return false; + } + return true; +} + +template +Context* ReplicatedWriteLog::construct_flush_entry_ctx( + std::shared_ptr log_entry) { + bool invalidating = this->m_invalidating; // snapshot so we behave consistently + Context *ctx = this->construct_flush_entry(log_entry, invalidating); + + if (invalidating) { + return ctx; + } + return new LambdaContext( + [this, log_entry, ctx](int r) { + m_image_ctx.op_work_queue->queue(new LambdaContext( + [this, log_entry, ctx](int r) { + ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry + << " " << *log_entry << dendl; + log_entry->writeback(this->m_image_writeback, ctx); + }), 0); + }); +} + +const unsigned long int ops_flushed_together = 4; +/* + * Performs the pmem buffer flush on all scheduled ops, then schedules + * the log event append operation for all of them. + */ +template +void ReplicatedWriteLog::flush_then_append_scheduled_ops(void) +{ + GenericLogOperations ops; + bool ops_remain = false; + ldout(m_image_ctx.cct, 20) << dendl; + do { + { + ops.clear(); + std::lock_guard locker(m_lock); + if (m_ops_to_flush.size()) { + auto last_in_batch = m_ops_to_flush.begin(); + unsigned int ops_to_flush = m_ops_to_flush.size(); + if (ops_to_flush > ops_flushed_together) { + ops_to_flush = ops_flushed_together; + } + ldout(m_image_ctx.cct, 20) << "should flush " << ops_to_flush << dendl; + std::advance(last_in_batch, ops_to_flush); + ops.splice(ops.end(), m_ops_to_flush, m_ops_to_flush.begin(), last_in_batch); + ops_remain = !m_ops_to_flush.empty(); + ldout(m_image_ctx.cct, 20) << "flushing " << ops.size() << ", " + << m_ops_to_flush.size() << " remain" << dendl; + } else { + ops_remain = false; + } + } + if (ops_remain) { + enlist_op_flusher(); + } + + /* Ops subsequently scheduled for flush may finish before these, + * which is fine. We're unconcerned with completion order until we + * get to the log message append step. 
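The loop around this point drains m_ops_to_flush in small batches: under m_lock it splices off at most ops_flushed_together ops and notes whether more remain, then performs the pmem flush and append scheduling outside the lock, re-enlisting a flusher if work is left. A stripped-down sketch of that drain step, assuming a generic op type and a hypothetical process_batch callback:

#include <algorithm>
#include <cstddef>
#include <iterator>
#include <list>
#include <mutex>

// Splice at most `batch` ops off a shared queue under the lock, process them
// with the lock released, and report whether the queue still has work.
template <typename Op>
bool drain_one_batch(std::list<Op> &shared_queue, std::mutex &lock,
                     std::size_t batch,
                     void (*process_batch)(std::list<Op> &)) {
  std::list<Op> ops;
  bool more_remain = false;
  {
    std::lock_guard<std::mutex> locker(lock);
    auto last = shared_queue.begin();
    std::advance(last, std::min(batch, shared_queue.size()));
    ops.splice(ops.end(), shared_queue, shared_queue.begin(), last);
    more_remain = !shared_queue.empty();
  }
  if (!ops.empty()) {
    process_batch(ops);   // e.g. flush pmem buffers, then schedule appends
  }
  return more_remain;     // caller re-enlists a worker when true
}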
*/ + if (ops.size()) { + flush_pmem_buffer(ops); + schedule_append_ops(ops); + } + } while (ops_remain); + append_scheduled_ops(); +} + +/* + * Performs the log event append operation for all of the scheduled + * events. + */ +template +void ReplicatedWriteLog::append_scheduled_ops(void) { + GenericLogOperations ops; + int append_result = 0; + bool ops_remain = false; + bool appending = false; /* true if we set m_appending */ + ldout(m_image_ctx.cct, 20) << dendl; + do { + ops.clear(); + this->append_scheduled(ops, ops_remain, appending, true); + + if (ops.size()) { + std::lock_guard locker(this->m_log_append_lock); + alloc_op_log_entries(ops); + append_result = append_op_log_entries(ops); + } + + int num_ops = ops.size(); + if (num_ops) { + /* New entries may be flushable. Completion will wake up flusher. */ + this->complete_op_log_entries(std::move(ops), append_result); + } + } while (ops_remain); +} + +template +void ReplicatedWriteLog::enlist_op_flusher() +{ + this->m_async_flush_ops++; + this->m_async_op_tracker.start_op(); + Context *flush_ctx = new LambdaContext([this](int r) { + flush_then_append_scheduled_ops(); + this->m_async_flush_ops--; + this->m_async_op_tracker.finish_op(); + }); + this->m_work_queue.queue(flush_ctx); +} + +template +void ReplicatedWriteLog::setup_schedule_append( + pwl::GenericLogOperationsVector &ops, bool do_early_flush) { + if (do_early_flush) { + /* This caller is waiting for persist, so we'll use their thread to + * expedite it */ + flush_pmem_buffer(ops); + this->schedule_append(ops); + } else { + /* This is probably not still the caller's thread, so do the payload + * flushing/replicating later. */ + schedule_flush_and_append(ops); + } +} + +/* + * Takes custody of ops. They'll all get their log entries appended, + * and have their on_write_persist contexts completed once they and + * all prior log entries are persisted everywhere. + */ +template +void ReplicatedWriteLog::schedule_append_ops(GenericLogOperations &ops) +{ + bool need_finisher; + GenericLogOperationsVector appending; + + std::copy(std::begin(ops), std::end(ops), std::back_inserter(appending)); + { + std::lock_guard locker(m_lock); + + need_finisher = this->m_ops_to_append.empty() && !this->m_appending; + this->m_ops_to_append.splice(this->m_ops_to_append.end(), ops); + } + + if (need_finisher) { + this->enlist_op_appender(); + } + + for (auto &op : appending) { + op->appending(); + } +} + +/* + * Takes custody of ops. They'll all get their pmem blocks flushed, + * then get their log entries appended. 
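schedule_append_ops() and schedule_flush_and_append() share the same hand-off idiom: the producer splices its ops onto a shared list under m_lock and only enlists a worker when the list was previously empty (the append path additionally checks that no appender is already running), so at most one worker is queued per empty-to-non-empty transition and it drains everything added in the meantime. A minimal sketch, assuming a generic op type and a hypothetical enlist_worker callback:

#include <list>
#include <mutex>

// Take custody of `ops` by splicing them onto the shared queue; wake a worker
// only when the queue transitions from empty to non-empty.
template <typename Op>
void hand_off(std::list<Op> &shared_queue, std::mutex &lock,
              std::list<Op> &ops, void (*enlist_worker)()) {
  bool need_worker = false;
  {
    std::lock_guard<std::mutex> locker(lock);
    need_worker = shared_queue.empty();
    shared_queue.splice(shared_queue.end(), ops);
  }
  if (need_worker) {
    enlist_worker();   // one queued worker per empty->non-empty edge
  }
}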
+ */ +template +void ReplicatedWriteLog::schedule_flush_and_append(GenericLogOperationsVector &ops) +{ + GenericLogOperations to_flush(ops.begin(), ops.end()); + bool need_finisher; + ldout(m_image_ctx.cct, 20) << dendl; + { + std::lock_guard locker(m_lock); + + need_finisher = m_ops_to_flush.empty(); + m_ops_to_flush.splice(m_ops_to_flush.end(), to_flush); + } + + if (need_finisher) { + enlist_op_flusher(); + } +} + +template +void ReplicatedWriteLog::process_work() { + CephContext *cct = m_image_ctx.cct; + int max_iterations = 4; + bool wake_up_requested = false; + uint64_t aggressive_high_water_bytes = this->m_bytes_allocated_cap * AGGRESSIVE_RETIRE_HIGH_WATER; + uint64_t high_water_bytes = this->m_bytes_allocated_cap * RETIRE_HIGH_WATER; + uint64_t low_water_bytes = this->m_bytes_allocated_cap * RETIRE_LOW_WATER; + uint64_t aggressive_high_water_entries = this->m_total_log_entries * AGGRESSIVE_RETIRE_HIGH_WATER; + uint64_t high_water_entries = this->m_total_log_entries * RETIRE_HIGH_WATER; + uint64_t low_water_entries = this->m_total_log_entries * RETIRE_LOW_WATER; + + ldout(cct, 20) << dendl; + + do { + { + std::lock_guard locker(m_lock); + this->m_wake_up_requested = false; + } + if (this->m_alloc_failed_since_retire || this->m_invalidating || + this->m_bytes_allocated > high_water_bytes || + (m_log_entries.size() > high_water_entries)) { + int retired = 0; + utime_t started = ceph_clock_now(); + ldout(m_image_ctx.cct, 10) << "alloc_fail=" << this->m_alloc_failed_since_retire + << ", allocated > high_water=" + << (this->m_bytes_allocated > high_water_bytes) + << ", allocated_entries > high_water=" + << (m_log_entries.size() > high_water_entries) + << dendl; + while (this->m_alloc_failed_since_retire || this->m_invalidating || + (this->m_bytes_allocated > high_water_bytes) || + (m_log_entries.size() > high_water_entries) || + (((this->m_bytes_allocated > low_water_bytes) || + (m_log_entries.size() > low_water_entries)) && + (utime_t(ceph_clock_now() - started).to_msec() < RETIRE_BATCH_TIME_LIMIT_MS))) { + if (!retire_entries((this->m_shutting_down || this->m_invalidating || + (this->m_bytes_allocated > aggressive_high_water_bytes) || + (m_log_entries.size() > aggressive_high_water_entries)) + ? 
MAX_ALLOC_PER_TRANSACTION + : MAX_FREE_PER_TRANSACTION)) { + break; + } + retired++; + this->dispatch_deferred_writes(); + this->process_writeback_dirty_entries(); + } + ldout(m_image_ctx.cct, 10) << "Retired " << retired << " times" << dendl; + } + this->dispatch_deferred_writes(); + this->process_writeback_dirty_entries(); + + { + std::lock_guard locker(m_lock); + wake_up_requested = this->m_wake_up_requested; + } + } while (wake_up_requested && --max_iterations > 0); + + { + std::lock_guard locker(m_lock); + this->m_wake_up_scheduled = false; + /* Reschedule if it's still requested */ + if (this->m_wake_up_requested) { + this->wake_up(); + } + } +} + +/* + * Flush the pmem regions for the data blocks of a set of operations + * + * V is expected to be GenericLogOperations, or GenericLogOperationsVector + */ +template +template +void ReplicatedWriteLog::flush_pmem_buffer(V& ops) +{ + for (auto &operation : ops) { + operation->flush_pmem_buf_to_cache(m_log_pool); + } + + /* Drain once for all */ + pmemobj_drain(m_log_pool); + + utime_t now = ceph_clock_now(); + for (auto &operation : ops) { + if (operation->reserved_allocated()) { + operation->buf_persist_comp_time = now; + } else { + ldout(m_image_ctx.cct, 20) << "skipping non-write op: " << *operation << dendl; + } + } +} + +/** + * Update/persist the last flushed sync point in the log + */ +template +void ReplicatedWriteLog::persist_last_flushed_sync_gen() +{ + TOID(struct WriteLogPoolRoot) pool_root; + pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot); + uint64_t flushed_sync_gen; + + std::lock_guard append_locker(this->m_log_append_lock); + { + std::lock_guard locker(m_lock); + flushed_sync_gen = this->m_flushed_sync_gen; + } + + if (D_RO(pool_root)->flushed_sync_gen < flushed_sync_gen) { + ldout(m_image_ctx.cct, 15) << "flushed_sync_gen in log updated from " + << D_RO(pool_root)->flushed_sync_gen << " to " + << flushed_sync_gen << dendl; + TX_BEGIN(m_log_pool) { + D_RW(pool_root)->flushed_sync_gen = flushed_sync_gen; + } TX_ONCOMMIT { + } TX_ONABORT { + lderr(m_image_ctx.cct) << "failed to commit update of flushed sync point" << dendl; + ceph_assert(false); + } TX_FINALLY { + } TX_END; + } +} + +template +void ReplicatedWriteLog::reserve_pmem(C_BlockIORequestT *req, + bool &alloc_succeeds, bool &no_space) { + std::vector& buffers = req->get_resources_buffers(); + for (auto &buffer : buffers) { + utime_t before_reserve = ceph_clock_now(); + buffer.buffer_oid = pmemobj_reserve(m_log_pool, + &buffer.buffer_alloc_action, + buffer.allocation_size, + 0 /* Object type */); + buffer.allocation_lat = ceph_clock_now() - before_reserve; + if (TOID_IS_NULL(buffer.buffer_oid)) { + if (!req->has_io_waited_for_buffers()) { + req->set_io_waited_for_entries(true); + } + ldout(m_image_ctx.cct, 5) << "can't allocate all data buffers: " + << pmemobj_errormsg() << ". " + << *req << dendl; + alloc_succeeds = false; + no_space = true; /* Entries need to be retired */ + break; + } else { + buffer.allocated = true; + } + ldout(m_image_ctx.cct, 20) << "Allocated " << buffer.buffer_oid.oid.pool_uuid_lo + << "." 
<< buffer.buffer_oid.oid.off + << ", size=" << buffer.allocation_size << dendl; + } +} + +template +bool ReplicatedWriteLog::alloc_resources(C_BlockIORequestT *req) { + bool alloc_succeeds = true; + uint64_t bytes_allocated = 0; + uint64_t bytes_cached = 0; + uint64_t bytes_dirtied = 0; + uint64_t num_lanes = 0; + uint64_t num_unpublished_reserves = 0; + uint64_t num_log_entries = 0; + + ldout(m_image_ctx.cct, 20) << dendl; + // Setup buffer, and get all the number of required resources + req->setup_buffer_resources(bytes_cached, bytes_dirtied, bytes_allocated, + num_lanes, num_log_entries, num_unpublished_reserves); + + alloc_succeeds = this->check_allocation(req, bytes_cached, bytes_dirtied, bytes_allocated, + num_lanes, num_log_entries, num_unpublished_reserves, + this->m_bytes_allocated_cap); + + std::vector& buffers = req->get_resources_buffers(); + if (!alloc_succeeds) { + /* On alloc failure, free any buffers we did allocate */ + for (auto &buffer : buffers) { + if (buffer.allocated) { + pmemobj_cancel(m_log_pool, &buffer.buffer_alloc_action, 1); + } + } + } + + req->set_allocated(alloc_succeeds); + return alloc_succeeds; +} + } // namespace pwl } // namespace cache } // namespace librbd diff --git a/src/librbd/cache/pwl/ReplicatedWriteLog.h b/src/librbd/cache/pwl/ReplicatedWriteLog.h index dc1a46a4547..339264b15ca 100644 --- a/src/librbd/cache/pwl/ReplicatedWriteLog.h +++ b/src/librbd/cache/pwl/ReplicatedWriteLog.h @@ -33,10 +33,8 @@ namespace pwl { template class ReplicatedWriteLog : public AbstractWriteLog { public: - typedef io::Extent Extent; - typedef io::Extents Extents; - - ReplicatedWriteLog(ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state); + ReplicatedWriteLog( + ImageCtxT &image_ctx, librbd::cache::pwl::ImageCacheState* cache_state); ~ReplicatedWriteLog(); ReplicatedWriteLog(const ReplicatedWriteLog&) = delete; ReplicatedWriteLog &operator=(const ReplicatedWriteLog&) = delete; @@ -50,6 +48,45 @@ private: using C_WriteSameRequestT = pwl::C_WriteSameRequest; using C_CompAndWriteRequestT = pwl::C_CompAndWriteRequest; + PMEMobjpool *m_log_pool = nullptr; + + void remove_pool_file(); + void load_existing_entries(pwl::DeferredContexts &later); + void alloc_op_log_entries(pwl::GenericLogOperations &ops); + int append_op_log_entries(pwl::GenericLogOperations &ops); + void flush_then_append_scheduled_ops(void); + void enlist_op_flusher(); + void flush_op_log_entries(pwl::GenericLogOperationsVector &ops); + template + void flush_pmem_buffer(V& ops); + +protected: + using AbstractWriteLog::m_lock; + using AbstractWriteLog::m_log_entries; + using AbstractWriteLog::m_image_ctx; + using AbstractWriteLog::m_perfcounter; + using AbstractWriteLog::m_ops_to_flush; + using AbstractWriteLog::m_cache_state; + using AbstractWriteLog::m_first_free_entry; + using AbstractWriteLog::m_first_valid_entry; + + void process_work() override; + void schedule_append_ops(pwl::GenericLogOperations &ops) override; + void append_scheduled_ops(void) override; + void reserve_pmem(C_BlockIORequestT *req, bool &alloc_succeeds, bool &no_space) override; + bool retire_entries(const unsigned long int frees_per_tx) override; + void persist_last_flushed_sync_gen() override; + bool alloc_resources(C_BlockIORequestT *req) override; + void schedule_flush_and_append(pwl::GenericLogOperationsVector &ops) override; + void setup_schedule_append( + pwl::GenericLogOperationsVector &ops, bool do_early_flush) override; + Context *construct_flush_entry_ctx( + const std::shared_ptr log_entry) override; + 
void get_pool_name(const std::string log_poolset_name) override; + void initialize_pool(Context *on_finish, pwl::DeferredContexts &later) override; + void write_data_to_buffer( + std::shared_ptr ws_entry, + pwl::WriteLogPmemEntry *pmem_entry) override; }; } // namespace pwl diff --git a/src/librbd/cache/pwl/Request.cc b/src/librbd/cache/pwl/Request.cc index c30fb2203cb..aecf3b6a3f5 100644 --- a/src/librbd/cache/pwl/Request.cc +++ b/src/librbd/cache/pwl/Request.cc @@ -275,16 +275,7 @@ bool C_WriteRequest::append_write_request(std::shared_ptr sync_poi template void C_WriteRequest::schedule_append() { ceph_assert(++m_appended == 1); - if (m_do_early_flush) { - /* This caller is waiting for persist, so we'll use their thread to - * expedite it */ - pwl.flush_pmem_buffer(this->op_set->operations); - pwl.schedule_append(this->op_set->operations); - } else { - /* This is probably not still the caller's thread, so do the payload - * flushing/replicating later. */ - pwl.schedule_flush_and_append(this->op_set->operations); - } + pwl.setup_schedule_append(this->op_set->operations, m_do_early_flush); } /**