diff --git a/src/os/newstore/NewStore.cc b/src/os/newstore/NewStore.cc index 17109ba9645..44bdbe0882a 100644 --- a/src/os/newstore/NewStore.cc +++ b/src/os/newstore/NewStore.cc @@ -2359,7 +2359,8 @@ void NewStore::_kv_sync_thread() kv_lock.Lock(); while (true) { assert(kv_committing.empty()); - if (kv_queue.empty()) { + assert(wal_cleaning.empty()); + if (kv_queue.empty() && wal_cleanup_queue.empty()) { if (kv_stop) break; dout(20) << __func__ << " sleep" << dendl; @@ -2367,20 +2368,37 @@ void NewStore::_kv_sync_thread() kv_cond.Wait(kv_lock); dout(20) << __func__ << " wake" << dendl; } else { - dout(20) << __func__ << " committing " << kv_queue.size() << dendl; + dout(20) << __func__ << " committing " << kv_queue.size() << " cleaning " << wal_cleanup_queue.size() << dendl; kv_committing.swap(kv_queue); + wal_cleaning.swap(wal_cleanup_queue); utime_t start = ceph_clock_now(NULL); kv_lock.Unlock(); - db->submit_transaction_sync(db->get_transaction()); + KeyValueDB::Transaction txc_cleanup_sync = db->get_transaction(); + //adding wal cleanup op + for (std::deque::iterator it = wal_cleaning.begin(); + it != wal_cleaning.end(); + it++) { + wal_transaction_t& wt =*(*it)->wal_txn; + string key; + get_wal_key(wt.seq, &key); + txc_cleanup_sync->rmkey(PREFIX_WAL, key); + } + + db->submit_transaction_sync(txc_cleanup_sync); utime_t finish = ceph_clock_now(NULL); utime_t dur = finish - start; - dout(20) << __func__ << " committed " << kv_committing.size() + dout(20) << __func__ << " committed " << kv_committing.size() << "cleaned " << wal_cleaning.size() << " in " << dur << dendl; while (!kv_committing.empty()) { TransContext *txc = kv_committing.front(); _txc_state_proc(txc); kv_committing.pop_front(); } + while (!wal_cleaning.empty()) { + TransContext *txc = wal_cleaning.front(); + _txc_state_proc(txc); + wal_cleaning.pop_front(); + } // this is as good a place as any ... _reap_collections(); @@ -2420,16 +2438,9 @@ int NewStore::_wal_finish(TransContext *txc) wal_transaction_t& wt = *txc->wal_txn; dout(20) << __func__ << " txc " << " seq " << wt.seq << txc << dendl; - string key; - get_wal_key(wt.seq, &key); - KeyValueDB::Transaction cleanup = db->get_transaction(); - cleanup->rmkey(PREFIX_WAL, key); - - txc->state = TransContext::STATE_WAL_CLEANUP; - Mutex::Locker l(kv_lock); - db->submit_transaction(cleanup); - kv_queue.push_back(txc); + txc->state = TransContext::STATE_WAL_CLEANUP; + wal_cleanup_queue.push_back(txc); kv_cond.SignalOne(); return 0; } diff --git a/src/os/newstore/NewStore.h b/src/os/newstore/NewStore.h index 55b73f7fd0b..ce6dfac17d0 100644 --- a/src/os/newstore/NewStore.h +++ b/src/os/newstore/NewStore.h @@ -499,6 +499,7 @@ private: Cond kv_cond, kv_sync_cond; bool kv_stop; deque kv_queue, kv_committing; + deque wal_cleanup_queue, wal_cleaning; Logger *logger;