diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc index 0998c259fb0..f00925303c0 100644 --- a/src/journal/JournalTrimmer.cc +++ b/src/journal/JournalTrimmer.cc @@ -18,13 +18,18 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, const std::string &object_oid_prefix, const JournalMetadataPtr &journal_metadata) : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), - m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"), - m_remove_set_pending(false), m_remove_set(0), m_remove_set_ctx(NULL) { + m_journal_metadata(journal_metadata), m_metadata_listener(this), + m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false), + m_remove_set(0), m_remove_set_ctx(NULL) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); + + m_journal_metadata->add_listener(&m_metadata_listener); } JournalTrimmer::~JournalTrimmer() { + m_journal_metadata->remove_listener(&m_metadata_listener); + m_journal_metadata->flush_commit_position(); m_async_op_tracker.wait_for_ops(); } @@ -110,39 +115,43 @@ void JournalTrimmer::remove_set(uint64_t object_set) { } } -void JournalTrimmer::handle_commit_position_safe(int r) { - ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; +void JournalTrimmer::handle_metadata_updated() { + ldout(m_cct, 20) << __func__ << dendl; Mutex::Locker locker(m_lock); - if (r == 0) { - // TODO - /* - uint8_t splay_width = m_journal_metadata->get_splay_width(); - uint64_t object_set = object_set_position.object_number / splay_width; - JournalMetadata::RegisteredClients registered_clients; - m_journal_metadata->get_registered_clients(®istered_clients); + JournalMetadata::RegisteredClients registered_clients; + m_journal_metadata->get_registered_clients(®istered_clients); - bool trim_permitted = true; - for (JournalMetadata::RegisteredClients::iterator it = - registered_clients.begin(); - it != registered_clients.end(); ++it) { - const JournalMetadata::Client &client = *it; - uint64_t client_object_set = client.commit_position.object_number / - splay_width; - if (client.id != m_journal_metadata->get_client_id() && - client_object_set < object_set) { - ldout(m_cct, 20) << "object set " << client_object_set << " still " - << "in-use by client " << client.id << dendl; - trim_permitted = false; - break; + uint8_t splay_width = m_journal_metadata->get_splay_width(); + uint64_t minimum_set = m_journal_metadata->get_minimum_set(); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t minimum_commit_set = active_set; + std::string minimum_client_id; + + // TODO: add support for trimming past "laggy" clients + for (auto &client : registered_clients) { + if (client.commit_position.object_positions.empty()) { + // client hasn't recorded any commits + minimum_commit_set = minimum_set; + minimum_client_id = client.id; + break; + } + + for (auto &position : client.commit_position.object_positions) { + uint64_t object_set = position.object_number / splay_width; + if (object_set < minimum_commit_set) { + minimum_client_id = client.id; + minimum_commit_set = object_set; } } + } - if (trim_permitted) { - trim_objects(object_set_position.object_number / splay_width); - } - */ + if (minimum_commit_set > minimum_set) { + trim_objects(minimum_commit_set); + } else { + ldout(m_cct, 20) << "object set " << minimum_commit_set << " still " + << "in-use by client " << minimum_client_id << dendl; } } @@ -153,23 +162,26 @@ void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) { Mutex::Locker locker(m_lock); m_remove_set_pending = false; - if (r == 0 || (r == -ENOENT && m_remove_set_ctx == NULL)) { - // advance the minimum set to the next set - m_journal_metadata->set_minimum_set(object_set + 1); - uint64_t minimum_set = m_journal_metadata->get_minimum_set(); - - if (m_remove_set > minimum_set) { - m_remove_set_pending = true; - remove_set(minimum_set); - } - } else if (r == -ENOENT) { + if (r == -ENOENT) { // no objects within the set existed r = 0; } + if (r == 0) { + // advance the minimum set to the next set + m_journal_metadata->set_minimum_set(object_set + 1); + uint64_t active_set = m_journal_metadata->get_active_set(); + uint64_t minimum_set = m_journal_metadata->get_minimum_set(); - if (m_remove_set_ctx != NULL && !m_remove_set_pending) { + if (m_remove_set > minimum_set && minimum_set <= active_set) { + m_remove_set_pending = true; + remove_set(minimum_set); + } + } + + if (m_remove_set_ctx != nullptr && !m_remove_set_pending) { ldout(m_cct, 20) << "completing remove set context" << dendl; m_remove_set_ctx->complete(r); + m_remove_set_ctx = nullptr; } } diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h index 349592b36e4..8c577767d1f 100644 --- a/src/journal/JournalTrimmer.h +++ b/src/journal/JournalTrimmer.h @@ -29,12 +29,22 @@ public: private: typedef std::function CreateContext; + struct MetadataListener : public JournalMetadata::Listener { + JournalTrimmer *journal_trimmmer; + + MetadataListener(JournalTrimmer *journal_trimmmer) + : journal_trimmmer(journal_trimmmer) { + } + void handle_update(JournalMetadata *) { + journal_trimmmer->handle_metadata_updated(); + } + }; + struct C_CommitPositionSafe : public Context { JournalTrimmer *journal_trimmer; C_CommitPositionSafe(JournalTrimmer *_journal_trimmer) : journal_trimmer(_journal_trimmer) { - Mutex::Locker locker(journal_trimmer->m_lock); journal_trimmer->m_async_op_tracker.start_op(); } virtual ~C_CommitPositionSafe() { @@ -42,7 +52,6 @@ private: } virtual void finish(int r) { - journal_trimmer->handle_commit_position_safe(r); } }; struct C_RemoveSet : public Context { @@ -66,6 +75,7 @@ private: std::string m_object_oid_prefix; JournalMetadataPtr m_journal_metadata; + MetadataListener m_metadata_listener; AsyncOpTracker m_async_op_tracker; @@ -82,8 +92,7 @@ private: void trim_objects(uint64_t minimum_set); void remove_set(uint64_t object_set); - void handle_commit_position_safe(int r); - + void handle_metadata_updated(); void handle_set_removed(int r, uint64_t object_set); }; diff --git a/src/test/journal/test_JournalTrimmer.cc b/src/test/journal/test_JournalTrimmer.cc index 896f80c88fa..62a6a2751b3 100644 --- a/src/test/journal/test_JournalTrimmer.cc +++ b/src/test/journal/test_JournalTrimmer.cc @@ -85,10 +85,10 @@ TEST_F(TestJournalTrimmer, Committed) { uint64_t commit_tid5; uint64_t commit_tid6; ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1)); - ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid2)); + ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid2)); ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid3)); ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid4)); - ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid5)); + ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid5)); ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid6)); journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);