Merge pull request #28529 from trociny/wip-journal-in_flight_advance_sets

journal: wait for in flight advance sets on stopping recorder

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2019-06-17 09:01:04 -04:00 committed by GitHub
commit 34f2ba4e8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 11 deletions

View File

@ -85,6 +85,29 @@ JournalRecorder::~JournalRecorder() {
ceph_assert(m_in_flight_object_closes == 0);
}
void JournalRecorder::shut_down(Context *on_safe) {
on_safe = new FunctionContext(
[this, on_safe](int r) {
Context *ctx = nullptr;
{
Mutex::Locker locker(m_lock);
if (m_in_flight_advance_sets != 0) {
ceph_assert(m_on_object_set_advanced == nullptr);
m_on_object_set_advanced = new FunctionContext(
[on_safe, r](int) {
on_safe->complete(r);
});
} else {
ctx = on_safe;
}
}
if (ctx != nullptr) {
ctx->complete(r);
}
});
flush(on_safe);
}
Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
@ -181,19 +204,26 @@ void JournalRecorder::advance_object_set() {
}
void JournalRecorder::handle_advance_object_set(int r) {
Mutex::Locker locker(m_lock);
ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
Context *on_object_set_advanced = nullptr;
{
Mutex::Locker locker(m_lock);
ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
ceph_assert(m_in_flight_advance_sets > 0);
--m_in_flight_advance_sets;
ceph_assert(m_in_flight_advance_sets > 0);
--m_in_flight_advance_sets;
if (r < 0 && r != -ESTALE) {
lderr(m_cct) << __func__ << ": failed to advance object set: "
<< cpp_strerror(r) << dendl;
if (r < 0 && r != -ESTALE) {
lderr(m_cct) << __func__ << ": failed to advance object set: "
<< cpp_strerror(r) << dendl;
}
if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
open_object_set();
std::swap(on_object_set_advanced, m_on_object_set_advanced);
}
}
if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
open_object_set();
if (on_object_set_advanced != nullptr) {
on_object_set_advanced->complete(0);
}
}

View File

@ -27,6 +27,7 @@ public:
double flush_age, uint64_t max_in_flight_appends);
~JournalRecorder();
void shut_down(Context *on_safe);
Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
@ -96,6 +97,8 @@ private:
FutureImplPtr m_prev_future;
Context *m_on_object_set_advanced = nullptr;
void open_object_set();
bool close_object_set(uint64_t active_set);

View File

@ -412,7 +412,7 @@ void Journaler::stop_append(Context *on_safe) {
delete recorder;
on_safe->complete(r);
});
recorder->flush(on_safe);
recorder->shut_down(on_safe);
}
uint64_t Journaler::get_max_append_size() const {