mirror of
https://github.com/ceph/ceph
synced 2024-12-18 09:25:49 +00:00
osd: remove replay_queue
Signed-off-by: Sage Weil <sage@redhat.com>
This commit is contained in:
parent
5aca4ebce6
commit
3269c52128
@ -1716,7 +1716,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
|
||||
cct->_conf->osd_command_thread_timeout,
|
||||
cct->_conf->osd_command_thread_suicide_timeout,
|
||||
&command_tp),
|
||||
replay_queue_lock("OSD::replay_queue_lock"),
|
||||
remove_wq(
|
||||
cct,
|
||||
store,
|
||||
@ -4385,10 +4384,6 @@ void OSD::tick()
|
||||
start_boot();
|
||||
}
|
||||
|
||||
if (is_active()) {
|
||||
check_replay_queue();
|
||||
}
|
||||
|
||||
do_waiters();
|
||||
|
||||
tick_timer.add_event_after(OSD_TICK_INTERVAL, new C_Tick(this));
|
||||
@ -8448,44 +8443,6 @@ void OSD::_remove_pg(PG *pg)
|
||||
// =========================================================
|
||||
// RECOVERY
|
||||
|
||||
/*
|
||||
* caller holds osd_lock
|
||||
*/
|
||||
void OSD::check_replay_queue()
|
||||
{
|
||||
assert(osd_lock.is_locked());
|
||||
|
||||
utime_t now = ceph_clock_now();
|
||||
list< pair<spg_t,utime_t> > pgids;
|
||||
replay_queue_lock.Lock();
|
||||
while (!replay_queue.empty() &&
|
||||
replay_queue.front().second <= now) {
|
||||
pgids.push_back(replay_queue.front());
|
||||
replay_queue.pop_front();
|
||||
}
|
||||
replay_queue_lock.Unlock();
|
||||
|
||||
for (list< pair<spg_t,utime_t> >::iterator p = pgids.begin(); p != pgids.end(); ++p) {
|
||||
spg_t pgid = p->first;
|
||||
pg_map_lock.get_read();
|
||||
if (pg_map.count(pgid)) {
|
||||
PG *pg = _lookup_lock_pg_with_map_lock_held(pgid);
|
||||
pg_map_lock.unlock();
|
||||
dout(10) << "check_replay_queue " << *pg << dendl;
|
||||
if ((pg->is_active() || pg->is_activating()) &&
|
||||
pg->is_replay() &&
|
||||
pg->is_primary() &&
|
||||
pg->replay_until == p->second) {
|
||||
pg->replay_queued_ops();
|
||||
}
|
||||
pg->unlock();
|
||||
} else {
|
||||
pg_map_lock.unlock();
|
||||
dout(10) << "check_replay_queue pgid " << pgid << " (not found)" << dendl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void OSDService::_maybe_queue_recovery() {
|
||||
assert(recovery_lock.is_locked_by_me());
|
||||
uint64_t available_pushes;
|
||||
|
@ -2311,10 +2311,6 @@ protected:
|
||||
void do_recovery(PG *pg, epoch_t epoch_queued, uint64_t pushes_reserved,
|
||||
ThreadPool::TPHandle &handle);
|
||||
|
||||
Mutex replay_queue_lock;
|
||||
list< pair<spg_t, utime_t > > replay_queue;
|
||||
|
||||
void check_replay_queue();
|
||||
|
||||
// -- scrubbing --
|
||||
void sched_scrub();
|
||||
|
@ -1921,41 +1921,6 @@ void PG::queue_op(OpRequestRef& op)
|
||||
}
|
||||
}
|
||||
|
||||
void PG::replay_queued_ops()
|
||||
{
|
||||
assert(is_replay());
|
||||
assert(is_active() || is_activating());
|
||||
eversion_t c = info.last_update;
|
||||
list<OpRequestRef> replay;
|
||||
dout(10) << "replay_queued_ops" << dendl;
|
||||
state_clear(PG_STATE_REPLAY);
|
||||
|
||||
for (map<eversion_t,OpRequestRef>::iterator p = replay_queue.begin();
|
||||
p != replay_queue.end();
|
||||
++p) {
|
||||
if (p->first.version != c.version+1) {
|
||||
dout(10) << "activate replay " << p->first
|
||||
<< " skipping " << c.version+1 - p->first.version
|
||||
<< " ops"
|
||||
<< dendl;
|
||||
c = p->first;
|
||||
}
|
||||
dout(10) << "activate replay " << p->first << " "
|
||||
<< *p->second->get_req() << dendl;
|
||||
replay.push_back(p->second);
|
||||
}
|
||||
replay_queue.clear();
|
||||
if (is_active()) {
|
||||
requeue_ops(replay);
|
||||
requeue_ops(waiting_for_active);
|
||||
assert(waiting_for_peered.empty());
|
||||
} else {
|
||||
waiting_for_active.splice(waiting_for_active.begin(), replay);
|
||||
}
|
||||
|
||||
publish_stats_to_osd();
|
||||
}
|
||||
|
||||
void PG::_activate_committed(epoch_t epoch, epoch_t activation_epoch)
|
||||
{
|
||||
lock();
|
||||
@ -2241,24 +2206,6 @@ void PG::finish_recovery_op(const hobject_t& soid, bool dequeue)
|
||||
}
|
||||
}
|
||||
|
||||
static void split_replay_queue(
|
||||
map<eversion_t, OpRequestRef> *from,
|
||||
map<eversion_t, OpRequestRef> *to,
|
||||
unsigned match,
|
||||
unsigned bits)
|
||||
{
|
||||
for (map<eversion_t, OpRequestRef>::iterator i = from->begin();
|
||||
i != from->end();
|
||||
) {
|
||||
if (OSD::split_request(i->second, match, bits)) {
|
||||
to->insert(*i);
|
||||
from->erase(i++);
|
||||
} else {
|
||||
++i;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void PG::split_ops(PG *child, unsigned split_bits) {
|
||||
unsigned match = child->info.pgid.ps();
|
||||
assert(waiting_for_all_missing.empty());
|
||||
@ -2268,7 +2215,6 @@ void PG::split_ops(PG *child, unsigned split_bits) {
|
||||
assert(waiting_for_ack.empty());
|
||||
assert(waiting_for_ondisk.empty());
|
||||
assert(waiting_for_active.empty());
|
||||
split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
|
||||
|
||||
osd->dequeue_pg(this, &waiting_for_peered);
|
||||
|
||||
@ -5099,15 +5045,6 @@ void PG::start_peering_interval(
|
||||
if (was_old_primary != is_primary()) {
|
||||
state_clear(PG_STATE_CLEAN);
|
||||
clear_publish_stats();
|
||||
|
||||
// take replay queue waiters
|
||||
list<OpRequestRef> ls;
|
||||
for (map<eversion_t,OpRequestRef>::iterator it = replay_queue.begin();
|
||||
it != replay_queue.end();
|
||||
++it)
|
||||
ls.push_back(it->second);
|
||||
replay_queue.clear();
|
||||
requeue_ops(ls);
|
||||
}
|
||||
|
||||
on_role_change();
|
||||
|
@ -856,7 +856,6 @@ protected:
|
||||
map<eversion_t,
|
||||
list<pair<OpRequestRef, version_t> > > waiting_for_ack, waiting_for_ondisk;
|
||||
|
||||
map<eversion_t,OpRequestRef> replay_queue;
|
||||
void split_ops(PG *child, unsigned split_bits);
|
||||
|
||||
void requeue_object_waiters(map<hobject_t, list<OpRequestRef>, hobject_t::BitwiseComparator>& m);
|
||||
@ -1032,7 +1031,6 @@ public:
|
||||
bool choose_acting(pg_shard_t &auth_log_shard,
|
||||
bool *history_les_bound);
|
||||
void build_might_have_unfound();
|
||||
void replay_queued_ops();
|
||||
void activate(
|
||||
ObjectStore::Transaction& t,
|
||||
epoch_t activation_epoch,
|
||||
|
Loading…
Reference in New Issue
Block a user