Merge pull request #34959 from yison/rwl-image-writeback-cache-seq10

rbd/cache: Replicated Write Log core codes - load existing cache

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2020-05-15 10:24:40 -04:00 committed by GitHub
commit 7d6bff654b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 268 additions and 2 deletions

View File

@ -307,6 +307,203 @@ void ReplicatedWriteLog<I>::arm_periodic_stats() {
}
}
/*
* Loads the log entries from an existing log.
*
* Creates the in-memory structures to represent the state of the
* re-opened log.
*
* Finds the last appended sync point, and any sync points referred to
* in log entries, but missing from the log. These missing sync points
* are created and scheduled for append. Some rudimentary consistency
* checking is done.
*
* Rebuilds the m_blocks_to_log_entries map, to make log entries
* readable.
*
* Places all writes on the dirty entries list, which causes them all
* to be flushed.
*
*/
template <typename I>
void ReplicatedWriteLog<I>::load_existing_entries(DeferredContexts &later) {
TOID(struct WriteLogPoolRoot) pool_root;
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
struct WriteLogPmemEntry *pmem_log_entries = D_RW(D_RW(pool_root)->log_entries);
uint64_t entry_index = m_first_valid_entry;
/* The map below allows us to find sync point log entries by sync
* gen number, which is necessary so write entries can be linked to
* their sync points. */
std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> sync_point_entries;
/* The map below tracks sync points referred to in writes but not
* appearing in the sync_point_entries map. We'll use this to
* determine which sync points are missing and need to be
* created. */
std::map<uint64_t, bool> missing_sync_points;
/*
* Read the existing log entries. Construct an in-memory log entry
* object of the appropriate type for each. Add these to the global
* log entries list.
*
* Write entries will not link to their sync points yet. We'll do
* that in the next pass. Here we'll accumulate a map of sync point
* gen numbers that are referred to in writes but do not appearing in
* the log.
*/
while (entry_index != m_first_free_entry) {
WriteLogPmemEntry *pmem_entry = &pmem_log_entries[entry_index];
std::shared_ptr<GenericLogEntry> log_entry = nullptr;
bool writer = pmem_entry->is_writer();
ceph_assert(pmem_entry->entry_index == entry_index);
if (pmem_entry->is_sync_point()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
<< " is a sync point. pmem_entry=[" << *pmem_entry << "]" << dendl;
auto sync_point_entry = std::make_shared<SyncPointLogEntry>(pmem_entry->sync_gen_number);
log_entry = sync_point_entry;
sync_point_entries[pmem_entry->sync_gen_number] = sync_point_entry;
missing_sync_points.erase(pmem_entry->sync_gen_number);
m_current_sync_gen = pmem_entry->sync_gen_number;
} else if (pmem_entry->is_write()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
<< " is a write. pmem_entry=[" << *pmem_entry << "]" << dendl;
auto write_entry =
std::make_shared<WriteLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes);
write_entry->pmem_buffer = D_RW(pmem_entry->write_data);
log_entry = write_entry;
} else if (pmem_entry->is_writesame()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
<< " is a write same. pmem_entry=[" << *pmem_entry << "]" << dendl;
auto ws_entry =
std::make_shared<WriteSameLogEntry>(nullptr, pmem_entry->image_offset_bytes,
pmem_entry->write_bytes, pmem_entry->ws_datalen);
ws_entry->pmem_buffer = D_RW(pmem_entry->write_data);
log_entry = ws_entry;
} else if (pmem_entry->is_discard()) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
<< " is a discard. pmem_entry=[" << *pmem_entry << "]" << dendl;
auto discard_entry =
std::make_shared<DiscardLogEntry>(nullptr, pmem_entry->image_offset_bytes, pmem_entry->write_bytes,
m_discard_granularity_bytes);
log_entry = discard_entry;
} else {
lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
<< ", pmem_entry=[" << *pmem_entry << "]" << dendl;
}
if (writer) {
ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
<< " writes. pmem_entry=[" << *pmem_entry << "]" << dendl;
if (!sync_point_entries[pmem_entry->sync_gen_number]) {
missing_sync_points[pmem_entry->sync_gen_number] = true;
}
}
log_entry->ram_entry = *pmem_entry;
log_entry->pmem_entry = pmem_entry;
log_entry->log_entry_index = entry_index;
log_entry->completed = true;
m_log_entries.push_back(log_entry);
entry_index = (entry_index + 1) % m_total_log_entries;
}
/* Create missing sync points. These must not be appended until the
* entry reload is complete and the write map is up to
* date. Currently this is handled by the deferred contexts object
* passed to new_sync_point(). These contexts won't be completed
* until this function returns. */
for (auto &kv : missing_sync_points) {
ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
if (0 == m_current_sync_gen) {
/* The unlikely case where the log contains writing entries, but no sync
* points (e.g. because they were all retired) */
m_current_sync_gen = kv.first-1;
}
ceph_assert(kv.first == m_current_sync_gen+1);
init_flush_new_sync_point(later);
ceph_assert(kv.first == m_current_sync_gen);
sync_point_entries[kv.first] = m_current_sync_point->log_entry;;
}
/*
* Iterate over the log entries again (this time via the global
* entries list), connecting write entries to their sync points and
* updating the sync point stats.
*
* Add writes to the write log map.
*/
std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
for (auto &log_entry : m_log_entries) {
if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
/* This entry is one of the types that write */
auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
if (gen_write_entry) {
auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
if (!sync_point_entry) {
lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
ceph_assert(false);
} else {
gen_write_entry->sync_point_entry = sync_point_entry;
sync_point_entry->writes++;
sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
sync_point_entry->writes_completed++;
m_blocks_to_log_entries.add_log_entry(gen_write_entry);
/* This entry is only dirty if its sync gen number is > the flushed
* sync gen number from the root object. */
if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
m_dirty_log_entries.push_back(log_entry);
m_bytes_dirty += gen_write_entry->bytes_dirty();
} else {
gen_write_entry->set_flushed(true);
sync_point_entry->writes_flushed++;
}
if (log_entry->write_bytes() == log_entry->bytes_dirty()) {
/* This entry is a basic write */
uint64_t bytes_allocated = MIN_WRITE_ALLOC_SIZE;
if (gen_write_entry->ram_entry.write_bytes > bytes_allocated) {
bytes_allocated = gen_write_entry->ram_entry.write_bytes;
}
m_bytes_allocated += bytes_allocated;
m_bytes_cached += gen_write_entry->ram_entry.write_bytes;
}
}
}
} else {
/* This entry is sync point entry */
auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
if (sync_point_entry) {
if (previous_sync_point_entry) {
previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
sync_point_entry->prior_sync_point_flushed = false;
ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
(0 == previous_sync_point_entry->writes) ||
(previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
} else {
sync_point_entry->prior_sync_point_flushed = true;
ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
}
previous_sync_point_entry = sync_point_entry;
} else {
/* There are no previous sync points, so we'll consider them flushed */
sync_point_entry->prior_sync_point_flushed = true;
}
ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << dendl;
}
}
}
if (0 == m_current_sync_gen) {
/* If a re-opened log was completely flushed, we'll have found no sync point entries here,
* and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
* point recorded in the log. */
m_current_sync_gen = m_flushed_sync_gen;
}
}
template <typename I>
void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later) {
CephContext *cct = m_image_ctx.cct;
@ -409,7 +606,51 @@ void ReplicatedWriteLog<I>::rwl_init(Context *on_finish, DeferredContexts &later
} TX_FINALLY {
} TX_END;
} else {
// TODO: load existed cache. This will be covered in later PR.
m_cache_state->present = true;
/* Open existing pool */
if ((m_log_pool =
pmemobj_open(m_log_pool_name.c_str(),
m_rwl_pool_layout_name)) == NULL) {
lderr(cct) << "failed to open pool (" << m_log_pool_name << "): "
<< pmemobj_errormsg() << dendl;
on_finish->complete(-errno);
return;
}
pool_root = POBJ_ROOT(m_log_pool, struct WriteLogPoolRoot);
if (D_RO(pool_root)->header.layout_version != RWL_POOL_VERSION) {
// TODO: will handle upgrading version in the future
lderr(cct) << "Pool layout version is " << D_RO(pool_root)->header.layout_version
<< " expected " << RWL_POOL_VERSION << dendl;
on_finish->complete(-EINVAL);
return;
}
if (D_RO(pool_root)->block_size != MIN_WRITE_ALLOC_SIZE) {
lderr(cct) << "Pool block size is " << D_RO(pool_root)->block_size
<< " expected " << MIN_WRITE_ALLOC_SIZE << dendl;
on_finish->complete(-EINVAL);
return;
}
m_log_pool_actual_size = D_RO(pool_root)->pool_size;
m_flushed_sync_gen = D_RO(pool_root)->flushed_sync_gen;
m_total_log_entries = D_RO(pool_root)->num_log_entries;
m_first_free_entry = D_RO(pool_root)->first_free_entry;
m_first_valid_entry = D_RO(pool_root)->first_valid_entry;
if (m_first_free_entry < m_first_valid_entry) {
/* Valid entries wrap around the end of the ring, so first_free is lower
* than first_valid. If first_valid was == first_free+1, the entry at
* first_free would be empty. The last entry is never used, so in
* that case there would be zero free log entries. */
m_free_log_entries = m_total_log_entries - (m_first_valid_entry - m_first_free_entry) -1;
} else {
/* first_valid is <= first_free. If they are == we have zero valid log
* entries, and n-1 free log entries */
m_free_log_entries = m_total_log_entries - (m_first_free_entry - m_first_valid_entry) -1;
}
size_t effective_pool_size = (size_t)(m_log_pool_config_size * USABLE_SIZE);
m_bytes_allocated_cap = effective_pool_size;
load_existing_entries(later);
m_cache_state->clean = m_dirty_log_entries.empty();
m_cache_state->empty = m_log_entries.empty();
}
ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
@ -498,6 +739,7 @@ void ReplicatedWriteLog<I>::shut_down(Context *on_finish) {
{
std::lock_guard locker(m_lock);
ceph_assert(m_dirty_log_entries.size() == 0);
m_wake_up_enabled = false;
m_cache_state->clean = true;
m_log_entries.clear();
if (m_log_pool) {
@ -735,6 +977,7 @@ void ReplicatedWriteLog<I>::aio_discard(uint64_t offset, uint64_t length,
utime_t now = ceph_clock_now();
m_perfcounter->inc(l_librbd_rwl_discard, 1);
Extents discard_extents = {{offset, length}};
m_discard_granularity_bytes = discard_granularity_bytes;
ceph_assert(m_initialized);

View File

@ -262,6 +262,8 @@ private:
ThreadPool m_thread_pool;
ContextWQ m_work_queue;
uint32_t m_discard_granularity_bytes;
void perf_start(const std::string name);
void perf_stop();
void log_perf();
@ -270,6 +272,7 @@ private:
void rwl_init(Context *on_finish, rwl::DeferredContexts &later);
void update_image_cache_state(Context *on_finish);
void load_existing_entries(rwl::DeferredContexts &later);
void wake_up();
void process_work();

View File

@ -36,7 +36,6 @@ public:
virtual bool can_writeback() const {
return false;
}
// TODO: discard need to override this
virtual bool can_retire() const {
return false;
}
@ -210,6 +209,9 @@ public:
/* The bytes in the image this op makes dirty. */
return ram_entry.write_bytes;
};
bool can_retire() const override {
return this->completed;
}
void copy_pmem_bl(bufferlist *out_bl) override {
ceph_assert(false);
}

View File

@ -606,6 +606,7 @@ void C_WriteSameRequest<T>::setup_buffer_resources(
if (pattern_length > buffer.allocation_size) {
buffer.allocation_size = pattern_length;
}
bytes_allocated += buffer.allocation_size;
}
template <typename T>

View File

@ -212,6 +212,23 @@ struct WriteLogPmemEntry {
BlockExtent block_extent();
uint64_t get_offset_bytes();
uint64_t get_write_bytes();
bool is_sync_point() {
return sync_point;
}
bool is_discard() {
return discard;
}
bool is_writesame() {
return writesame;
}
bool is_write() {
/* Log entry is a basic write */
return !is_sync_point() && !is_discard() && !is_writesame();
}
bool is_writer() {
/* Log entry is any type that writes data */
return is_write() || is_discard() || is_writesame();
}
friend std::ostream& operator<<(std::ostream& os,
const WriteLogPmemEntry &entry);
};