mirror of
https://github.com/ceph/ceph
synced 2024-12-26 21:43:10 +00:00
journal: update JournalTrimmer to support new commit tracking
Signed-off-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
parent
df25867527
commit
7906737a45
@ -18,13 +18,18 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx,
|
||||
const std::string &object_oid_prefix,
|
||||
const JournalMetadataPtr &journal_metadata)
|
||||
: m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
|
||||
m_journal_metadata(journal_metadata), m_lock("JournalTrimmer::m_lock"),
|
||||
m_remove_set_pending(false), m_remove_set(0), m_remove_set_ctx(NULL) {
|
||||
m_journal_metadata(journal_metadata), m_metadata_listener(this),
|
||||
m_lock("JournalTrimmer::m_lock"), m_remove_set_pending(false),
|
||||
m_remove_set(0), m_remove_set_ctx(NULL) {
|
||||
m_ioctx.dup(ioctx);
|
||||
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
|
||||
|
||||
m_journal_metadata->add_listener(&m_metadata_listener);
|
||||
}
|
||||
|
||||
JournalTrimmer::~JournalTrimmer() {
|
||||
m_journal_metadata->remove_listener(&m_metadata_listener);
|
||||
|
||||
m_journal_metadata->flush_commit_position();
|
||||
m_async_op_tracker.wait_for_ops();
|
||||
}
|
||||
@ -110,39 +115,43 @@ void JournalTrimmer::remove_set(uint64_t object_set) {
|
||||
}
|
||||
}
|
||||
|
||||
void JournalTrimmer::handle_commit_position_safe(int r) {
|
||||
ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
|
||||
void JournalTrimmer::handle_metadata_updated() {
|
||||
ldout(m_cct, 20) << __func__ << dendl;
|
||||
|
||||
Mutex::Locker locker(m_lock);
|
||||
if (r == 0) {
|
||||
// TODO
|
||||
/*
|
||||
uint8_t splay_width = m_journal_metadata->get_splay_width();
|
||||
uint64_t object_set = object_set_position.object_number / splay_width;
|
||||
|
||||
JournalMetadata::RegisteredClients registered_clients;
|
||||
m_journal_metadata->get_registered_clients(®istered_clients);
|
||||
JournalMetadata::RegisteredClients registered_clients;
|
||||
m_journal_metadata->get_registered_clients(®istered_clients);
|
||||
|
||||
bool trim_permitted = true;
|
||||
for (JournalMetadata::RegisteredClients::iterator it =
|
||||
registered_clients.begin();
|
||||
it != registered_clients.end(); ++it) {
|
||||
const JournalMetadata::Client &client = *it;
|
||||
uint64_t client_object_set = client.commit_position.object_number /
|
||||
splay_width;
|
||||
if (client.id != m_journal_metadata->get_client_id() &&
|
||||
client_object_set < object_set) {
|
||||
ldout(m_cct, 20) << "object set " << client_object_set << " still "
|
||||
<< "in-use by client " << client.id << dendl;
|
||||
trim_permitted = false;
|
||||
break;
|
||||
uint8_t splay_width = m_journal_metadata->get_splay_width();
|
||||
uint64_t minimum_set = m_journal_metadata->get_minimum_set();
|
||||
uint64_t active_set = m_journal_metadata->get_active_set();
|
||||
uint64_t minimum_commit_set = active_set;
|
||||
std::string minimum_client_id;
|
||||
|
||||
// TODO: add support for trimming past "laggy" clients
|
||||
for (auto &client : registered_clients) {
|
||||
if (client.commit_position.object_positions.empty()) {
|
||||
// client hasn't recorded any commits
|
||||
minimum_commit_set = minimum_set;
|
||||
minimum_client_id = client.id;
|
||||
break;
|
||||
}
|
||||
|
||||
for (auto &position : client.commit_position.object_positions) {
|
||||
uint64_t object_set = position.object_number / splay_width;
|
||||
if (object_set < minimum_commit_set) {
|
||||
minimum_client_id = client.id;
|
||||
minimum_commit_set = object_set;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (trim_permitted) {
|
||||
trim_objects(object_set_position.object_number / splay_width);
|
||||
}
|
||||
*/
|
||||
if (minimum_commit_set > minimum_set) {
|
||||
trim_objects(minimum_commit_set);
|
||||
} else {
|
||||
ldout(m_cct, 20) << "object set " << minimum_commit_set << " still "
|
||||
<< "in-use by client " << minimum_client_id << dendl;
|
||||
}
|
||||
}
|
||||
|
||||
@ -153,23 +162,26 @@ void JournalTrimmer::handle_set_removed(int r, uint64_t object_set) {
|
||||
Mutex::Locker locker(m_lock);
|
||||
m_remove_set_pending = false;
|
||||
|
||||
if (r == 0 || (r == -ENOENT && m_remove_set_ctx == NULL)) {
|
||||
// advance the minimum set to the next set
|
||||
m_journal_metadata->set_minimum_set(object_set + 1);
|
||||
uint64_t minimum_set = m_journal_metadata->get_minimum_set();
|
||||
|
||||
if (m_remove_set > minimum_set) {
|
||||
m_remove_set_pending = true;
|
||||
remove_set(minimum_set);
|
||||
}
|
||||
} else if (r == -ENOENT) {
|
||||
if (r == -ENOENT) {
|
||||
// no objects within the set existed
|
||||
r = 0;
|
||||
}
|
||||
if (r == 0) {
|
||||
// advance the minimum set to the next set
|
||||
m_journal_metadata->set_minimum_set(object_set + 1);
|
||||
uint64_t active_set = m_journal_metadata->get_active_set();
|
||||
uint64_t minimum_set = m_journal_metadata->get_minimum_set();
|
||||
|
||||
if (m_remove_set_ctx != NULL && !m_remove_set_pending) {
|
||||
if (m_remove_set > minimum_set && minimum_set <= active_set) {
|
||||
m_remove_set_pending = true;
|
||||
remove_set(minimum_set);
|
||||
}
|
||||
}
|
||||
|
||||
if (m_remove_set_ctx != nullptr && !m_remove_set_pending) {
|
||||
ldout(m_cct, 20) << "completing remove set context" << dendl;
|
||||
m_remove_set_ctx->complete(r);
|
||||
m_remove_set_ctx = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,12 +29,22 @@ public:
|
||||
private:
|
||||
typedef std::function<Context*()> CreateContext;
|
||||
|
||||
struct MetadataListener : public JournalMetadata::Listener {
|
||||
JournalTrimmer *journal_trimmmer;
|
||||
|
||||
MetadataListener(JournalTrimmer *journal_trimmmer)
|
||||
: journal_trimmmer(journal_trimmmer) {
|
||||
}
|
||||
void handle_update(JournalMetadata *) {
|
||||
journal_trimmmer->handle_metadata_updated();
|
||||
}
|
||||
};
|
||||
|
||||
struct C_CommitPositionSafe : public Context {
|
||||
JournalTrimmer *journal_trimmer;
|
||||
|
||||
C_CommitPositionSafe(JournalTrimmer *_journal_trimmer)
|
||||
: journal_trimmer(_journal_trimmer) {
|
||||
Mutex::Locker locker(journal_trimmer->m_lock);
|
||||
journal_trimmer->m_async_op_tracker.start_op();
|
||||
}
|
||||
virtual ~C_CommitPositionSafe() {
|
||||
@ -42,7 +52,6 @@ private:
|
||||
}
|
||||
|
||||
virtual void finish(int r) {
|
||||
journal_trimmer->handle_commit_position_safe(r);
|
||||
}
|
||||
};
|
||||
struct C_RemoveSet : public Context {
|
||||
@ -66,6 +75,7 @@ private:
|
||||
std::string m_object_oid_prefix;
|
||||
|
||||
JournalMetadataPtr m_journal_metadata;
|
||||
MetadataListener m_metadata_listener;
|
||||
|
||||
AsyncOpTracker m_async_op_tracker;
|
||||
|
||||
@ -82,8 +92,7 @@ private:
|
||||
void trim_objects(uint64_t minimum_set);
|
||||
void remove_set(uint64_t object_set);
|
||||
|
||||
void handle_commit_position_safe(int r);
|
||||
|
||||
void handle_metadata_updated();
|
||||
void handle_set_removed(int r, uint64_t object_set);
|
||||
};
|
||||
|
||||
|
@ -85,10 +85,10 @@ TEST_F(TestJournalTrimmer, Committed) {
|
||||
uint64_t commit_tid5;
|
||||
uint64_t commit_tid6;
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid1));
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid2));
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid2));
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid3));
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 0, "payload", &commit_tid4));
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 2, "payload", &commit_tid5));
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 4, "payload", &commit_tid5));
|
||||
ASSERT_EQ(0, append_payload(metadata, oid, 5, "payload", &commit_tid6));
|
||||
|
||||
journal::JournalTrimmer *trimmer = create_trimmer(oid, metadata);
|
||||
|
Loading…
Reference in New Issue
Block a user