Merge pull request from athanatos/wip-8635

Wip 8635 -- Move scrub, snap_trim into a unified queue

Reviewed-by: Kefu Chai <kchai@redhat.com>
Reviewed-by: Sage Weil <sage@redhat.com>
Samuel Just 2015-06-04 11:25:12 -07:00
commit b7abc398c0
7 changed files with 354 additions and 369 deletions
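
The heart of the change is in the src/osd/OSD.h hunk below: client ops, snap trims, and scrubs become alternatives of a boost::variant wrapped in a new PGQueueable type, and a boost::static_visitor (PGQueueable::RunVis) dispatches each kind of work, so one sharded, prioritized queue can carry all three with a per-item priority and cost. A minimal self-contained sketch of that pattern, using hypothetical simplified types rather than the real Ceph ones:

#include <boost/variant.hpp>
#include <iostream>

// Hypothetical stand-ins for OpRequestRef, PGSnapTrim, PGScrub.
struct ClientOp { int id; };
struct SnapTrim { unsigned epoch_queued; };
struct Scrub    { unsigned epoch_queued; };

class Queueable {
  typedef boost::variant<ClientOp, SnapTrim, Scrub> QVariant;
  QVariant qvariant;
  unsigned priority;

  // Visitor routes each alternative to its handler, like PGQueueable::RunVis.
  struct RunVis : public boost::static_visitor<> {
    void operator()(ClientOp &op) { std::cout << "client op " << op.id << "\n"; }
    void operator()(SnapTrim &op) { std::cout << "snap trim @" << op.epoch_queued << "\n"; }
    void operator()(Scrub &op)    { std::cout << "scrub @" << op.epoch_queued << "\n"; }
  };

public:
  template <typename T>
  Queueable(const T &t, unsigned prio) : qvariant(t), priority(prio) {}
  unsigned get_priority() const { return priority; }
  void run() {
    RunVis v;
    boost::apply_visitor(v, qvariant);  // one queue type, three kinds of work
  }
};

int main() {
  Queueable q1(ClientOp{1}, 63), q2(SnapTrim{100}, 5), q3(Scrub{100}, 5);
  q1.run(); q2.run(); q3.run();
  return 0;
}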

View File: src/common/config_opts.h

@@ -587,12 +587,7 @@ OPTION(osd_op_thread_suicide_timeout, OPT_INT, 150)
OPTION(osd_recovery_thread_timeout, OPT_INT, 30)
OPTION(osd_recovery_thread_suicide_timeout, OPT_INT, 300)
OPTION(osd_recovery_sleep, OPT_FLOAT, 0) // seconds to sleep between recovery ops
OPTION(osd_snap_trim_thread_timeout, OPT_INT, 60*60*1)
OPTION(osd_snap_trim_thread_suicide_timeout, OPT_INT, 60*60*10)
OPTION(osd_snap_trim_sleep, OPT_FLOAT, 0)
OPTION(osd_scrub_thread_timeout, OPT_INT, 60)
OPTION(osd_scrub_thread_suicide_timeout, OPT_INT, 60)
OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10)
OPTION(osd_scrub_invalid_stats, OPT_BOOL, true)
OPTION(osd_remove_thread_timeout, OPT_INT, 60*60)
OPTION(osd_remove_thread_suicide_timeout, OPT_INT, 10*60*60)
@@ -716,18 +711,27 @@ OPTION(filestore_rocksdb_options, OPT_STR, "")
OPTION(mon_rocksdb_options, OPT_STR, "")
/**
* osd_client_op_priority and osd_recovery_op_priority adjust the relative
* priority of client io vs recovery io.
* osd_*_priority adjust the relative priority of client io, recovery io,
* snaptrim io, etc
*
* osd_client_op_priority/osd_recovery_op_priority determines the ratio of
* available io between client and recovery. Each option may be set between
* osd_*_priority determines the ratio of available io between client and
* recovery. Each option may be set between
* 1..63.
*
* osd_recovery_op_warn_multiple scales the normal warning threshold,
* osd_op_complaint_time, so that slow recovery ops won't cause noise
*/
OPTION(osd_client_op_priority, OPT_U32, 63)
OPTION(osd_recovery_op_priority, OPT_U32, 10)
OPTION(osd_snap_trim_priority, OPT_U32, 5)
OPTION(osd_snap_trim_cost, OPT_U32, 1<<20) // set default cost equal to 1MB io
OPTION(osd_scrub_priority, OPT_U32, 5)
// set default cost equal to 50MB io
OPTION(osd_scrub_cost, OPT_U32, 50<<20)
/**
* osd_recovery_op_warn_multiple scales the normal warning threshold,
* osd_op_complaint_time, so that slow recovery ops won't cause noise
*/
OPTION(osd_recovery_op_warn_multiple, OPT_U32, 16)
// Max time to wait between notifying mon of shutdown and shutting down
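
As I read the token-based PrioritizedQueue, bandwidth under contention is shared roughly in proportion to priority, so the defaults above (client 63, recovery 10, snap trim 5, scrub 5) keep background trim and scrub to a small slice of queue bandwidth, while the cost values bill each queued snap-trim item as 1 MB and each scrub item as 50 MB of io against the token bucket. Expressed as hypothetical ceph.conf overrides (the values shown are just the shipped defaults):

[osd]
osd snap trim priority = 5      ; relative weight vs client io, 1..63
osd snap trim cost = 1048576    ; 1 << 20: bill each trim item as 1 MB of io
osd scrub priority = 5
osd scrub cost = 52428800       ; 50 << 20: bill each scrub item as 50 MB of io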

View File: src/osd/OSD.cc

@@ -152,6 +152,18 @@ static ostream& _prefix(std::ostream* _dout, int whoami, epoch_t epoch) {
return *_dout << "osd." << whoami << " " << epoch << " ";
}
void PGQueueable::RunVis::operator()(OpRequestRef &op) {
return osd->dequeue_op(pg, op, handle);
}
void PGQueueable::RunVis::operator()(PGSnapTrim &op) {
return pg->snap_trimmer(op.epoch_queued);
}
void PGQueueable::RunVis::operator()(PGScrub &op) {
return pg->scrub(op.epoch_queued, handle);
}
//Initial features in new superblock.
//Features here are also automatically upgraded
CompatSet OSD::get_osd_initial_compat_set() {
@@ -196,9 +208,6 @@ OSDService::OSDService(OSD *osd) :
op_wq(osd->op_shardedwq),
peering_wq(osd->peering_wq),
recovery_wq(osd->recovery_wq),
snap_trim_wq(osd->snap_trim_wq),
scrub_wq(osd->scrub_wq),
rep_scrub_wq(osd->rep_scrub_wq),
recovery_gen_wq("recovery_gen_wq", cct->_conf->osd_recovery_thread_timeout,
&osd->recovery_tp),
op_gen_wq("op_gen_wq", cct->_conf->osd_recovery_thread_timeout, &osd->osd_tp),
@@ -1273,7 +1282,10 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
void OSDService::dequeue_pg(PG *pg, list<OpRequestRef> *dequeued)
{
osd->op_shardedwq.dequeue(pg, dequeued);
if (dequeued)
osd->op_shardedwq.dequeue_and_get_ops(pg, dequeued);
else
osd->op_shardedwq.dequeue(pg);
}
void OSDService::queue_for_peering(PG *pg)
@@ -1535,21 +1547,6 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
cct->_conf->osd_recovery_thread_suicide_timeout,
&recovery_tp),
replay_queue_lock("OSD::replay_queue_lock"),
snap_trim_wq(
this,
cct->_conf->osd_snap_trim_thread_timeout,
cct->_conf->osd_snap_trim_thread_suicide_timeout,
&disk_tp),
scrub_wq(
this,
cct->_conf->osd_scrub_thread_timeout,
cct->_conf->osd_scrub_thread_suicide_timeout,
&disk_tp),
rep_scrub_wq(
this,
cct->_conf->osd_scrub_thread_timeout,
cct->_conf->osd_scrub_thread_suicide_timeout,
&disk_tp),
remove_wq(
store,
cct->_conf->osd_remove_thread_timeout,
@@ -5582,6 +5579,8 @@ epoch_t op_required_epoch(OpRequestRef op)
return replica_op_required_epoch<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
case MSG_OSD_EC_READ_REPLY:
return replica_op_required_epoch<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
case MSG_OSD_REP_SCRUB:
return replica_op_required_epoch<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
default:
assert(0);
return 0;
@@ -5595,7 +5594,6 @@ void OSD::dispatch_op(OpRequestRef op)
case MSG_OSD_PG_CREATE:
handle_pg_create(op);
break;
case MSG_OSD_PG_NOTIFY:
handle_pg_notify(op);
break;
@@ -5696,6 +5694,9 @@ bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)
case MSG_OSD_EC_READ_REPLY:
handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
break;
case MSG_OSD_REP_SCRUB:
handle_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op, osdmap);
break;
default:
assert(0);
}
@@ -5740,10 +5741,6 @@ void OSD::_dispatch(Message *m)
handle_scrub(static_cast<MOSDScrub*>(m));
break;
case MSG_OSD_REP_SCRUB:
handle_rep_scrub(static_cast<MOSDRepScrub*>(m));
break;
// -- need OSDMap --
default:
@@ -5766,26 +5763,6 @@ void OSD::_dispatch(Message *m)
}
void OSD::handle_rep_scrub(MOSDRepScrub *m)
{
dout(10) << __func__ << " " << *m << dendl;
if (!require_self_aliveness(m, m->map_epoch)) {
m->put();
return;
}
if (!require_osd_peer(m)) {
m->put();
return;
}
if (osdmap->get_epoch() >= m->map_epoch &&
!require_same_peer_instance(m, osdmap, true)) {
m->put();
return;
}
rep_scrub_wq.queue(m);
}
void OSD::handle_scrub(MOSDScrub *m)
{
dout(10) << "handle_scrub " << *m << dendl;
@@ -8231,7 +8208,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
return;
}
}
pair<PGRef, OpRequestRef> item = sdata->pqueue.dequeue();
pair<PGRef, PGQueueable> item = sdata->pqueue.dequeue();
sdata->pg_for_processing[&*(item.first)].push_back(item.second);
sdata->sdata_op_ordering_lock.Unlock();
ThreadPool::TPHandle tp_handle(osd->cct, hb, timeout_interval,
@@ -8239,7 +8216,7 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
(item.first)->lock_suspend_timeout(tp_handle);
OpRequestRef op;
boost::optional<PGQueueable> op;
{
Mutex::Locker l(sdata->sdata_op_ordering_lock);
if (!sdata->pg_for_processing.count(&*(item.first))) {
@@ -8257,7 +8234,10 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
// and will begin to be handled by a worker thread.
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = op->get_reqid();
osd_reqid_t reqid;
if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
reqid = (*_op)->get_reqid();
}
#endif
tracepoint(osd, opwq_process_start, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
@@ -8272,11 +8252,14 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
delete f;
*_dout << dendl;
osd->dequeue_op(item.first, op, tp_handle);
op->run(osd, item.first, tp_handle);
{
#ifdef WITH_LTTNG
osd_reqid_t reqid = op->get_reqid();
osd_reqid_t reqid;
if (boost::optional<OpRequestRef> _op = op->maybe_get_op()) {
reqid = (*_op)->get_reqid();
}
#endif
tracepoint(osd, opwq_process_finish, reqid.name._type,
reqid.name._num, reqid.tid, reqid.inc);
@@ -8285,21 +8268,22 @@ void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
(item.first)->unlock();
}
void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
void OSD::ShardedOpWQ::_enqueue(pair<PGRef, PGQueueable> item) {
uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
ShardData* sdata = shard_list[shard_index];
assert (NULL != sdata);
unsigned priority = item.second->get_req()->get_priority();
unsigned cost = item.second->get_req()->get_cost();
unsigned priority = item.second.get_priority();
unsigned cost = item.second.get_cost();
sdata->sdata_op_ordering_lock.Lock();
if (priority >= CEPH_MSG_PRIO_LOW)
sdata->pqueue.enqueue_strict(
item.second->get_req()->get_source_inst(), priority, item);
item.second.get_owner(), priority, item);
else
sdata->pqueue.enqueue(item.second->get_req()->get_source_inst(),
sdata->pqueue.enqueue(
item.second.get_owner(),
priority, cost, item);
sdata->sdata_op_ordering_lock.Unlock();
@@ -8309,7 +8293,7 @@ void OSD::ShardedOpWQ::_enqueue(pair<PGRef, OpRequestRef> item) {
}
void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, PGQueueable> item) {
uint32_t shard_index = (((item.first)->get_pgid().ps())% shard_list.size());
@@ -8321,13 +8305,15 @@ void OSD::ShardedOpWQ::_enqueue_front(pair<PGRef, OpRequestRef> item) {
item.second = sdata->pg_for_processing[&*(item.first)].back();
sdata->pg_for_processing[&*(item.first)].pop_back();
}
unsigned priority = item.second->get_req()->get_priority();
unsigned cost = item.second->get_req()->get_cost();
unsigned priority = item.second.get_priority();
unsigned cost = item.second.get_cost();
if (priority >= CEPH_MSG_PRIO_LOW)
sdata->pqueue.enqueue_strict_front(
item.second->get_req()->get_source_inst(),priority, item);
item.second.get_owner(),
priority, item);
else
sdata->pqueue.enqueue_front(item.second->get_req()->get_source_inst(),
sdata->pqueue.enqueue_front(
item.second.get_owner(),
priority, cost, item);
sdata->sdata_op_ordering_lock.Unlock();
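
Note how _enqueue and _enqueue_front above send items at or above CEPH_MSG_PRIO_LOW (64) through enqueue_strict() and everything else through the cost-weighted enqueue(); with the osd_*_priority options capped at 63, client, scrub, and snap-trim work all land in the shared tier. A toy illustration of the two tiers follows; this is not Ceph's PrioritizedQueue, and it omits the token accounting that yields the proportional sharing:

#include <iterator>
#include <map>
#include <queue>

template <typename T>
class TinyPrioQueue {
  std::map<unsigned, std::queue<T> > strict;   // priority -> FIFO; always wins
  std::map<unsigned, std::queue<T> > normal;   // would share by priority tokens

  // Pop from the highest-priority non-empty bucket (precondition: !m.empty()).
  static T pop_highest(std::map<unsigned, std::queue<T> > &m) {
    typename std::map<unsigned, std::queue<T> >::iterator it = std::prev(m.end());
    T v = it->second.front();
    it->second.pop();
    if (it->second.empty())
      m.erase(it);
    return v;
  }

public:
  void enqueue_strict(unsigned prio, const T &item) { strict[prio].push(item); }
  void enqueue(unsigned prio, const T &item) { normal[prio].push(item); }
  bool empty() const { return strict.empty() && normal.empty(); }
  T dequeue() {                                // strict tier drains first
    return strict.empty() ? pop_highest(normal) : pop_highest(strict);
  }
};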

View File: src/osd/OSD.h

@@ -316,6 +316,75 @@ public:
typedef ceph::shared_ptr<DeletingState> DeletingStateRef;
class OSD;
struct PGScrub {
epoch_t epoch_queued;
PGScrub(epoch_t e) : epoch_queued(e) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGScrub";
}
};
struct PGSnapTrim {
epoch_t epoch_queued;
PGSnapTrim(epoch_t e) : epoch_queued(e) {}
ostream &operator<<(ostream &rhs) {
return rhs << "PGSnapTrim";
}
};
class PGQueueable {
typedef boost::variant<
OpRequestRef,
PGSnapTrim,
PGScrub
> QVariant;
QVariant qvariant;
int cost;
unsigned priority;
utime_t start_time;
entity_inst_t owner;
struct RunVis : public boost::static_visitor<> {
OSD *osd;
PGRef &pg;
ThreadPool::TPHandle &handle;
RunVis(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle)
: osd(osd), pg(pg), handle(handle) {}
void operator()(OpRequestRef &op);
void operator()(PGSnapTrim &op);
void operator()(PGScrub &op);
};
public:
PGQueueable(OpRequestRef op)
: qvariant(op), cost(op->get_req()->get_cost()),
priority(op->get_req()->get_priority()),
start_time(op->get_req()->get_recv_stamp()),
owner(op->get_req()->get_source_inst())
{}
PGQueueable(
const PGSnapTrim &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner) {}
PGQueueable(
const PGScrub &op, int cost, unsigned priority, utime_t start_time,
const entity_inst_t &owner)
: qvariant(op), cost(cost), priority(priority), start_time(start_time),
owner(owner) {}
boost::optional<OpRequestRef> maybe_get_op() {
OpRequestRef *op = boost::get<OpRequestRef>(&qvariant);
return op ? *op : boost::optional<OpRequestRef>();
}
void run(OSD *osd, PGRef &pg, ThreadPool::TPHandle &handle) {
RunVis v(osd, pg, handle);
boost::apply_visitor(v, qvariant);
}
unsigned get_priority() const { return priority; }
int get_cost() const { return cost; }
utime_t get_start_time() const { return start_time; }
entity_inst_t get_owner() const { return owner; }
};
class OSDService {
public:
OSD *osd;
@@ -334,12 +403,9 @@ public:
PerfCounters *&logger;
PerfCounters *&recoverystate_perf;
MonClient *&monc;
ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > &op_wq;
ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > &op_wq;
ThreadPool::BatchWorkQueue<PG> &peering_wq;
ThreadPool::WorkQueue<PG> &recovery_wq;
ThreadPool::WorkQueue<PG> &snap_trim_wq;
ThreadPool::WorkQueue<PG> &scrub_wq;
ThreadPool::WorkQueue<MOSDRepScrub> &rep_scrub_wq;
GenContextWQ recovery_gen_wq;
GenContextWQ op_gen_wq;
ClassHandler *&class_handler;
@@ -720,11 +786,27 @@ public:
void queue_for_peering(PG *pg);
bool queue_for_recovery(PG *pg);
bool queue_for_snap_trim(PG *pg) {
return snap_trim_wq.queue(pg);
void queue_for_snap_trim(PG *pg) {
op_wq.queue(
make_pair(
pg,
PGQueueable(
PGSnapTrim(pg->get_osdmap()->get_epoch()),
cct->_conf->osd_snap_trim_cost,
cct->_conf->osd_snap_trim_priority,
ceph_clock_now(cct),
entity_inst_t())));
}
bool queue_for_scrub(PG *pg) {
return scrub_wq.queue(pg);
void queue_for_scrub(PG *pg) {
op_wq.queue(
make_pair(
pg,
PGQueueable(
PGScrub(pg->get_osdmap()->get_epoch()),
cct->_conf->osd_scrub_cost,
cct->_conf->osd_scrub_priority,
ceph_clock_now(cct),
entity_inst_t())));
}
// osd map cache (past osd maps)
@@ -1471,124 +1553,139 @@ private:
// -- op queue --
class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> > {
friend class PGQueueable;
class ShardedOpWQ: public ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> > {
struct ShardData {
Mutex sdata_lock;
Cond sdata_cond;
Mutex sdata_op_ordering_lock;
map<PG*, list<OpRequestRef> > pg_for_processing;
PrioritizedQueue< pair<PGRef, OpRequestRef>, entity_inst_t> pqueue;
ShardData(string lock_name, string ordering_lock, uint64_t max_tok_per_prio, uint64_t min_cost):
sdata_lock(lock_name.c_str()),
sdata_op_ordering_lock(ordering_lock.c_str()),
pqueue(max_tok_per_prio, min_cost) {}
map<PG*, list<PGQueueable> > pg_for_processing;
PrioritizedQueue< pair<PGRef, PGQueueable>, entity_inst_t> pqueue;
ShardData(
string lock_name, string ordering_lock,
uint64_t max_tok_per_prio, uint64_t min_cost)
: sdata_lock(lock_name.c_str()),
sdata_op_ordering_lock(ordering_lock.c_str()),
pqueue(max_tok_per_prio, min_cost) {}
};
vector<ShardData*> shard_list;
OSD *osd;
uint32_t num_shards;
public:
ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, si, tp),
osd(o), num_shards(pnum_shards) {
for(uint32_t i = 0; i < num_shards; i++) {
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
char order_lock[32] = {0};
snprintf(order_lock, sizeof(order_lock), "%s.%d", "OSD:ShardedOpWQ:order:", i);
ShardData* one_shard = new ShardData(lock_name, order_lock,
osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
osd->cct->_conf->osd_op_pq_min_cost);
shard_list.push_back(one_shard);
}
public:
ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
ShardedThreadPool::ShardedWQ < pair <PGRef, PGQueueable> >(ti, si, tp),
osd(o), num_shards(pnum_shards) {
for(uint32_t i = 0; i < num_shards; i++) {
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s.%d", "OSD:ShardedOpWQ:", i);
char order_lock[32] = {0};
snprintf(
order_lock, sizeof(order_lock), "%s.%d",
"OSD:ShardedOpWQ:order:", i);
ShardData* one_shard = new ShardData(
lock_name, order_lock,
osd->cct->_conf->osd_op_pq_max_tokens_per_priority,
osd->cct->_conf->osd_op_pq_min_cost);
shard_list.push_back(one_shard);
}
~ShardedOpWQ() {
while(!shard_list.empty()) {
delete shard_list.back();
shard_list.pop_back();
}
}
~ShardedOpWQ() {
while(!shard_list.empty()) {
delete shard_list.back();
shard_list.pop_back();
}
}
void _process(uint32_t thread_index, heartbeat_handle_d *hb);
void _enqueue(pair <PGRef, OpRequestRef> item);
void _enqueue_front(pair <PGRef, OpRequestRef> item);
void return_waiting_threads() {
for(uint32_t i = 0; i < num_shards; i++) {
ShardData* sdata = shard_list[i];
assert (NULL != sdata);
sdata->sdata_lock.Lock();
sdata->sdata_cond.Signal();
sdata->sdata_lock.Unlock();
}
void _process(uint32_t thread_index, heartbeat_handle_d *hb);
void _enqueue(pair <PGRef, PGQueueable> item);
void _enqueue_front(pair <PGRef, PGQueueable> item);
void return_waiting_threads() {
for(uint32_t i = 0; i < num_shards; i++) {
ShardData* sdata = shard_list[i];
assert (NULL != sdata);
sdata->sdata_lock.Lock();
sdata->sdata_cond.Signal();
sdata->sdata_lock.Unlock();
}
}
void dump(Formatter *f) {
for(uint32_t i = 0; i < num_shards; i++) {
ShardData* sdata = shard_list[i];
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
assert (NULL != sdata);
sdata->sdata_op_ordering_lock.Lock();
f->open_object_section(lock_name);
sdata->pqueue.dump(f);
f->close_section();
sdata->sdata_op_ordering_lock.Unlock();
}
void dump(Formatter *f) {
for(uint32_t i = 0; i < num_shards; i++) {
ShardData* sdata = shard_list[i];
char lock_name[32] = {0};
snprintf(lock_name, sizeof(lock_name), "%s%d", "OSD:ShardedOpWQ:", i);
assert (NULL != sdata);
sdata->sdata_op_ordering_lock.Lock();
f->open_object_section(lock_name);
sdata->pqueue.dump(f);
f->close_section();
sdata->sdata_op_ordering_lock.Unlock();
}
}
struct Pred {
PG *pg;
Pred(PG *pg) : pg(pg) {}
bool operator()(const pair<PGRef, OpRequestRef> &op) {
return op.first == pg;
}
};
void dequeue(PG *pg, list<OpRequestRef> *dequeued = 0) {
ShardData* sdata = NULL;
assert(pg != NULL);
uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
sdata = shard_list[shard_index];
assert(sdata != NULL);
if (!dequeued) {
sdata->sdata_op_ordering_lock.Lock();
sdata->pqueue.remove_by_filter(Pred(pg));
sdata->pg_for_processing.erase(pg);
sdata->sdata_op_ordering_lock.Unlock();
} else {
list<pair<PGRef, OpRequestRef> > _dequeued;
sdata->sdata_op_ordering_lock.Lock();
sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
for (list<pair<PGRef, OpRequestRef> >::iterator i = _dequeued.begin();
i != _dequeued.end(); ++i) {
dequeued->push_back(i->second);
}
if (sdata->pg_for_processing.count(pg)) {
dequeued->splice(
dequeued->begin(),
sdata->pg_for_processing[pg]);
sdata->pg_for_processing.erase(pg);
}
sdata->sdata_op_ordering_lock.Unlock();
}
struct Pred {
PG *pg;
Pred(PG *pg) : pg(pg) {}
bool operator()(const pair<PGRef, PGQueueable> &op) {
return op.first == pg;
}
};
void dequeue(PG *pg) {
ShardData* sdata = NULL;
assert(pg != NULL);
uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
sdata = shard_list[shard_index];
assert(sdata != NULL);
sdata->sdata_op_ordering_lock.Lock();
sdata->pqueue.remove_by_filter(Pred(pg));
sdata->pg_for_processing.erase(pg);
sdata->sdata_op_ordering_lock.Unlock();
}
void dequeue_and_get_ops(PG *pg, list<OpRequestRef> *dequeued) {
ShardData* sdata = NULL;
assert(pg != NULL);
uint32_t shard_index = pg->get_pgid().ps()% shard_list.size();
sdata = shard_list[shard_index];
assert(sdata != NULL);
assert(dequeued);
list<pair<PGRef, PGQueueable> > _dequeued;
sdata->sdata_op_ordering_lock.Lock();
sdata->pqueue.remove_by_filter(Pred(pg), &_dequeued);
for (list<pair<PGRef, PGQueueable> >::iterator i = _dequeued.begin();
i != _dequeued.end(); ++i) {
boost::optional<OpRequestRef> mop = i->second.maybe_get_op();
if (mop)
dequeued->push_back(*mop);
}
map<PG *, list<PGQueueable> >::iterator iter =
sdata->pg_for_processing.find(pg);
if (iter != sdata->pg_for_processing.end()) {
for (list<PGQueueable>::reverse_iterator i = iter->second.rbegin();
i != iter->second.rend();
++i) {
boost::optional<OpRequestRef> mop = i->maybe_get_op();
if (mop)
dequeued->push_front(*mop);
}
sdata->pg_for_processing.erase(iter);
}
sdata->sdata_op_ordering_lock.Unlock();
}
bool is_shard_empty(uint32_t thread_index) {
uint32_t shard_index = thread_index % num_shards;
ShardData* sdata = shard_list[shard_index];
assert(NULL != sdata);
Mutex::Locker l(sdata->sdata_op_ordering_lock);
return sdata->pqueue.empty();
}
bool is_shard_empty(uint32_t thread_index) {
uint32_t shard_index = thread_index % num_shards;
ShardData* sdata = shard_list[shard_index];
assert(NULL != sdata);
Mutex::Locker l(sdata->sdata_op_ordering_lock);
return sdata->pqueue.empty();
}
} op_shardedwq;
@@ -2089,153 +2186,12 @@ protected:
void check_replay_queue();
// -- snap trimming --
xlist<PG*> snap_trim_queue;
struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
SnapTrimWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, si, tp), osd(o) {}
bool _empty() {
return osd->snap_trim_queue.empty();
}
bool _enqueue(PG *pg) {
if (pg->snap_trim_item.is_on_list())
return false;
pg->get("SnapTrimWQ");
osd->snap_trim_queue.push_back(&pg->snap_trim_item);
return true;
}
void _dequeue(PG *pg) {
if (pg->snap_trim_item.remove_myself())
pg->put("SnapTrimWQ");
}
PG *_dequeue() {
if (osd->snap_trim_queue.empty())
return NULL;
PG *pg = osd->snap_trim_queue.front();
osd->snap_trim_queue.pop_front();
return pg;
}
void _process(PG *pg) {
pg->snap_trimmer();
pg->put("SnapTrimWQ");
}
void _clear() {
while (PG *pg = _dequeue()) {
pg->put("SnapTrimWQ");
}
}
} snap_trim_wq;
// -- scrubbing --
void sched_scrub();
bool scrub_random_backoff();
bool scrub_load_below_threshold();
bool scrub_time_permit(utime_t now);
xlist<PG*> scrub_queue;
struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
ScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, si, tp), osd(o) {}
bool _empty() {
return osd->scrub_queue.empty();
}
bool _enqueue(PG *pg) {
if (pg->scrub_item.is_on_list()) {
return false;
}
pg->get("ScrubWQ");
osd->scrub_queue.push_back(&pg->scrub_item);
return true;
}
void _dequeue(PG *pg) {
if (pg->scrub_item.remove_myself()) {
pg->put("ScrubWQ");
}
}
PG *_dequeue() {
if (osd->scrub_queue.empty())
return NULL;
PG *pg = osd->scrub_queue.front();
osd->scrub_queue.pop_front();
return pg;
}
void _process(
PG *pg,
ThreadPool::TPHandle &handle) {
pg->scrub(handle);
pg->put("ScrubWQ");
}
void _clear() {
while (!osd->scrub_queue.empty()) {
PG *pg = osd->scrub_queue.front();
osd->scrub_queue.pop_front();
pg->put("ScrubWQ");
}
}
} scrub_wq;
struct RepScrubWQ : public ThreadPool::WorkQueue<MOSDRepScrub> {
private:
OSD *osd;
list<MOSDRepScrub*> rep_scrub_queue;
public:
RepScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, si, tp), osd(o) {}
bool _empty() {
return rep_scrub_queue.empty();
}
bool _enqueue(MOSDRepScrub *msg) {
rep_scrub_queue.push_back(msg);
return true;
}
void _dequeue(MOSDRepScrub *msg) {
assert(0); // Not applicable for this wq
return;
}
MOSDRepScrub *_dequeue() {
if (rep_scrub_queue.empty())
return NULL;
MOSDRepScrub *msg = rep_scrub_queue.front();
rep_scrub_queue.pop_front();
return msg;
}
void _process(
MOSDRepScrub *msg,
ThreadPool::TPHandle &handle) {
PG *pg = NULL;
{
Mutex::Locker lock(osd->osd_lock);
if (osd->is_stopping() ||
!osd->_have_pg(msg->pgid)) {
msg->put();
return;
}
pg = osd->_lookup_lock_pg(msg->pgid);
}
assert(pg);
pg->replica_scrub(msg, handle);
msg->put();
pg->unlock();
}
void _clear() {
while (!rep_scrub_queue.empty()) {
MOSDRepScrub *msg = rep_scrub_queue.front();
rep_scrub_queue.pop_front();
msg->put();
}
}
} rep_scrub_wq;
// -- removing --
struct RemoveWQ :
public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
@@ -2288,6 +2244,7 @@ protected:
case MSG_OSD_EC_WRITE_REPLY:
case MSG_OSD_EC_READ:
case MSG_OSD_EC_READ_REPLY:
case MSG_OSD_REP_SCRUB:
return true;
default:
return false;
@@ -2341,7 +2298,6 @@ private:
static int write_meta(ObjectStore *store,
uuid_d& cluster_fsid, uuid_d& osd_fsid, int whoami);
void handle_rep_scrub(MOSDRepScrub *m);
void handle_scrub(struct MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
void handle_op(OpRequestRef& op, OSDMapRef& osdmap);

View File: src/osd/PG.cc

@@ -195,7 +195,9 @@ PG::PG(OSDService *o, OSDMapRef curmap,
coll(p), pg_log(cct),
pgmeta_oid(p.make_pgmeta_oid()),
missing_loc(this),
recovery_item(this), scrub_item(this), snap_trim_item(this), stat_queue_item(this),
recovery_item(this), stat_queue_item(this),
snap_trim_queued(false),
scrub_queued(false),
recovery_ops_active(0),
role(0),
state(0),
@@ -884,7 +886,6 @@ void PG::clear_primary_state()
scrub_after_recovery = false;
osd->recovery_wq.dequeue(this);
osd->snap_trim_wq.dequeue(this);
agent_clear();
}
@@ -1928,10 +1929,27 @@ void PG::all_activated_and_committed()
void PG::queue_snap_trim()
{
if (osd->queue_for_snap_trim(this))
if (snap_trim_queued) {
dout(10) << "queue_snap_trim -- already queued" << dendl;
} else {
dout(10) << "queue_snap_trim -- queuing" << dendl;
else
dout(10) << "queue_snap_trim -- already trimming" << dendl;
snap_trim_queued = true;
osd->queue_for_snap_trim(this);
}
}
bool PG::requeue_scrub()
{
assert(is_locked());
if (scrub_queued) {
dout(10) << __func__ << ": already queued" << dendl;
return false;
} else {
dout(10) << __func__ << ": queueing" << dendl;
scrub_queued = true;
osd->queue_for_scrub(this);
return true;
}
}
bool PG::queue_scrub()
@@ -1950,7 +1968,7 @@ bool PG::queue_scrub()
state_set(PG_STATE_REPAIR);
scrubber.must_repair = false;
}
osd->queue_for_scrub(this);
requeue_scrub();
return true;
}
@@ -2110,7 +2128,9 @@ void PG::split_ops(PG *child, unsigned split_bits) {
assert(waiting_for_active.empty());
split_replay_queue(&replay_queue, &(child->replay_queue), match, split_bits);
snap_trim_queued = false;
osd->dequeue_pg(this, &waiting_for_peered);
OSD::split_list(
&waiting_for_peered, &(child->waiting_for_peered), match, split_bits);
{
@@ -3367,7 +3387,7 @@ void PG::sub_op_scrub_map(OpRequestRef op)
scrubber.waiting_on_whom.erase(m->from);
if (scrubber.waiting_on == 0) {
osd->scrub_wq.queue(this);
requeue_scrub();
}
}
@@ -3384,6 +3404,8 @@ void PG::_request_scrub_map(
spg_t(info.pgid.pgid, replica.shard), version,
get_osdmap()->get_epoch(),
start, end, deep, seed);
// default priority, we want the rep scrub processed prior to any recovery
// or client io messages (we are holding a lock!)
osd->send_message_osd_cluster(
replica.osd, repscrubop, get_osdmap()->get_epoch());
}
@@ -3491,7 +3513,6 @@ void PG::schedule_backfill_full_retry()
void PG::clear_scrub_reserved()
{
osd->scrub_wq.dequeue(this);
scrubber.reserved_peers.clear();
scrubber.reserve_failed = false;
@@ -3722,9 +3743,10 @@ void PG::repair_object(
* scrubmap of objects that are in the range [msg->start, msg->end).
*/
void PG::replica_scrub(
MOSDRepScrub *msg,
OpRequestRef op,
ThreadPool::TPHandle &handle)
{
MOSDRepScrub *msg = static_cast<MOSDRepScrub *>(op->get_req());
assert(!scrubber.active_rep_scrub);
dout(7) << "replica_scrub" << dendl;
@@ -3740,15 +3762,14 @@ void PG::replica_scrub(
assert(msg->chunky);
if (last_update_applied < msg->scrub_to) {
dout(10) << "waiting for last_update_applied to catch up" << dendl;
scrubber.active_rep_scrub = msg;
scrubber.active_rep_scrub = op;
msg->get();
return;
}
if (active_pushes > 0) {
dout(10) << "waiting for active pushes to finish" << dendl;
scrubber.active_rep_scrub = msg;
msg->get();
scrubber.active_rep_scrub = op;
return;
}
@@ -3782,13 +3803,8 @@
* scrub will be chunky if all OSDs in PG support chunky scrub
* scrub will fail if OSDs are too old.
*/
void PG::scrub(ThreadPool::TPHandle &handle)
void PG::scrub(epoch_t queued, ThreadPool::TPHandle &handle)
{
lock();
if (deleting) {
unlock();
return;
}
if (g_conf->osd_scrub_sleep > 0 &&
(scrubber.state == PG::Scrubber::NEW_CHUNK ||
scrubber.state == PG::Scrubber::INACTIVE)) {
@@ -3800,6 +3816,11 @@ void PG::scrub(ThreadPool::TPHandle &handle)
lock();
dout(20) << __func__ << " slept for " << t << dendl;
}
if (deleting || pg_has_reset_since(queued)) {
return;
}
assert(scrub_queued);
scrub_queued = false;
if (!is_primary() || !is_active() || !is_clean() || !is_scrubbing()) {
dout(10) << "scrub -- not primary or active or not clean" << dendl;
@@ -3807,7 +3828,6 @@ void PG::scrub(ThreadPool::TPHandle &handle)
state_clear(PG_STATE_REPAIR);
state_clear(PG_STATE_DEEP_SCRUB);
publish_stats_to_osd();
unlock();
return;
}
@@ -3838,8 +3858,6 @@ void PG::scrub(ThreadPool::TPHandle &handle)
}
chunky_scrub(handle);
unlock();
}
/*
@@ -4135,7 +4153,7 @@ void PG::chunky_scrub(ThreadPool::TPHandle &handle)
if (scrubber.end < hobject_t::get_max()) {
scrubber.state = PG::Scrubber::NEW_CHUNK;
osd->scrub_wq.queue(this);
requeue_scrub();
done = true;
} else {
scrubber.state = PG::Scrubber::FINISH;
@@ -4815,6 +4833,8 @@ void PG::start_peering_interval(
peer_missing.clear();
peer_purged.clear();
actingbackfill.clear();
snap_trim_queued = false;
scrub_queued = false;
// reset primary state?
if (was_old_primary || is_primary()) {
@@ -5131,6 +5151,8 @@ bool PG::can_discard_request(OpRequestRef& op)
return can_discard_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
case MSG_OSD_EC_READ_REPLY:
return can_discard_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
case MSG_OSD_REP_SCRUB:
return can_discard_replica_op<MOSDRepScrub, MSG_OSD_REP_SCRUB>(op);
case MSG_OSD_PG_SCAN:
return can_discard_scan(op);
@@ -5212,6 +5234,11 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDECSubOpReadReply*>(op->get_req())->map_epoch);
case MSG_OSD_REP_SCRUB:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDRepScrub*>(op->get_req())->map_epoch);
}
assert(0);
return false;
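
The PG.cc hunks above, and the ReplicatedPG::snap_trimmer() change two files below, share one idiom: a queued scrub or snap-trim records the OSDMap epoch at queue time, and the worker drops the item if the PG was deleted or reset since, clearing the scrub_queued/snap_trim_queued dedup flag only when it actually runs. A distilled sketch of that guard, with hypothetical names:

// Hypothetical distillation of the epoch guard in PG::scrub(epoch_t, ...).
struct MiniPG {
  unsigned last_reset_epoch;
  bool deleting;
  bool scrub_queued;

  MiniPG() : last_reset_epoch(0), deleting(false), scrub_queued(false) {}

  bool has_reset_since(unsigned queued) const {
    return queued < last_reset_epoch;  // queued before the last interval change
  }
  void scrub(unsigned queued_epoch) {
    if (deleting || has_reset_since(queued_epoch))
      return;                          // stale item: drop it without running
    scrub_queued = false;              // consume the dedup flag
    // ... do one chunk of scrub work; requeue if more remains ...
  }
};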

View File: src/osd/PG.h

@@ -220,11 +220,13 @@ protected:
return osdmap_ref;
}
public:
OSDMapRef get_osdmap() const {
assert(is_locked());
assert(osdmap_ref);
return osdmap_ref;
}
protected:
/** locking and reference counting.
* I destroy myself when the reference count hits zero.
@@ -432,7 +434,10 @@ public:
/* You should not use these items without taking their respective queue locks
* (if they have one) */
xlist<PG*>::item recovery_item, scrub_item, snap_trim_item, stat_queue_item;
xlist<PG*>::item recovery_item, stat_queue_item;
bool snap_trim_queued;
bool scrub_queued;
int recovery_ops_active;
set<pg_shard_t> waiting_on_backfill;
#ifdef DEBUG_RECOVERY_OIDS
@@ -1035,7 +1040,6 @@ public:
epoch_start(0),
active(false), queue_snap_trim(false),
waiting_on(0), shallow_errors(0), deep_errors(0), fixed(0),
active_rep_scrub(0),
must_scrub(false), must_deep_scrub(false), must_repair(false),
num_digest_updates_pending(0),
state(INACTIVE),
@@ -1059,7 +1063,7 @@ public:
int fixed;
ScrubMap primary_scrubmap;
map<pg_shard_t, ScrubMap> received_maps;
MOSDRepScrub *active_rep_scrub;
OpRequestRef active_rep_scrub;
utime_t scrub_reg_stamp; // stamp we registered for
// flags to indicate explicitly requested scrubs (by admin)
@@ -1145,8 +1149,7 @@ public:
waiting_on = 0;
waiting_on_whom.clear();
if (active_rep_scrub) {
active_rep_scrub->put();
active_rep_scrub = NULL;
active_rep_scrub = OpRequestRef();
}
received_maps.clear();
@@ -1180,7 +1183,7 @@ public:
const hobject_t& soid, list<pair<ScrubMap::object, pg_shard_t> > *ok_peers,
pg_shard_t bad_peer);
void scrub(ThreadPool::TPHandle &handle);
void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
void chunky_scrub(ThreadPool::TPHandle &handle);
void scrub_compare_maps();
void scrub_process_inconsistent();
@@ -1230,7 +1233,7 @@ public:
void unreg_next_scrub();
void replica_scrub(
struct MOSDRepScrub *op,
OpRequestRef op,
ThreadPool::TPHandle &handle);
void sub_op_scrub_map(OpRequestRef op);
void sub_op_scrub_reserve(OpRequestRef op);
@@ -2154,6 +2157,7 @@ public:
void log_weirdness();
void queue_snap_trim();
bool requeue_scrub();
bool queue_scrub();
/// share pg info after a pg is active
@@ -2249,7 +2253,7 @@ public:
ThreadPool::TPHandle &handle
) = 0;
virtual void do_backfill(OpRequestRef op) = 0;
virtual void snap_trimmer() = 0;
virtual void snap_trimmer(epoch_t epoch_queued) = 0;
virtual int do_command(cmdmap_t cmdmap, ostream& ss,
bufferlist& idata, bufferlist& odata) = 0;

View File: src/osd/ReplicatedPG.cc

@@ -1315,6 +1315,10 @@ void ReplicatedPG::do_request(
do_backfill(op);
break;
case MSG_OSD_REP_SCRUB:
replica_scrub(op, handle);
break;
default:
assert(0 == "bad message type in do_request");
}
@@ -2905,13 +2909,8 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
return repop;
}
void ReplicatedPG::snap_trimmer()
void ReplicatedPG::snap_trimmer(epoch_t queued)
{
lock();
if (deleting) {
unlock();
return;
}
if (g_conf->osd_snap_trim_sleep > 0) {
unlock();
utime_t t;
@@ -2920,13 +2919,16 @@
lock();
dout(20) << __func__ << " slept for " << t << dendl;
}
if (deleting || pg_has_reset_since(queued)) {
return;
}
snap_trim_queued = false;
dout(10) << "snap_trimmer entry" << dendl;
if (is_primary()) {
entity_inst_t nobody;
if (scrubber.active) {
dout(10) << " scrubbing, will requeue snap_trimmer after" << dendl;
scrubber.queue_snap_trim = true;
unlock();
return;
}
@@ -2943,7 +2945,6 @@
// replica collection trimming
snap_trimmer_machine.process_event(SnapTrim());
}
unlock();
return;
}
@@ -7297,16 +7298,20 @@ void ReplicatedPG::op_applied(const eversion_t &applied_version)
if (is_primary()) {
if (scrubber.active) {
if (last_update_applied == scrubber.subset_last_update) {
osd->scrub_wq.queue(this);
requeue_scrub();
}
} else {
assert(scrubber.start == scrubber.end);
}
} else {
if (scrubber.active_rep_scrub) {
if (last_update_applied == scrubber.active_rep_scrub->scrub_to) {
osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
scrubber.active_rep_scrub = 0;
if (last_update_applied == static_cast<MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->scrub_to) {
osd->op_wq.queue(
make_pair(
this,
scrubber.active_rep_scrub));
scrubber.active_rep_scrub = OpRequestRef();
}
}
}
@@ -8157,7 +8162,7 @@ void ReplicatedPG::kick_object_context_blocked(ObjectContextRef obc)
waiting_for_blocked_object.erase(p);
if (obc->requeue_scrub_on_unblock)
osd->queue_for_scrub(this);
requeue_scrub();
}
SnapSetContext *ReplicatedPG::create_snapset_context(const hobject_t& oid)
@@ -8390,7 +8395,7 @@ void ReplicatedPG::_applied_recovered_object(ObjectContextRef obc)
// requeue an active chunky scrub waiting on recovery ops
if (!deleting && active_pushes == 0
&& scrubber.is_chunky_scrub_active()) {
osd->scrub_wq.queue(this);
requeue_scrub();
}
unlock();
@@ -8406,9 +8411,13 @@ void ReplicatedPG::_applied_recovered_object_replica()
// requeue an active chunky scrub waiting on recovery ops
if (!deleting && active_pushes == 0 &&
scrubber.active_rep_scrub && scrubber.active_rep_scrub->chunky) {
osd->rep_scrub_wq.queue(scrubber.active_rep_scrub);
scrubber.active_rep_scrub = 0;
scrubber.active_rep_scrub && static_cast<MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->chunky) {
osd->op_wq.queue(
make_pair(
this,
scrubber.active_rep_scrub));
scrubber.active_rep_scrub = OpRequestRef();
}
unlock();
@@ -8770,7 +8779,6 @@ void ReplicatedPG::on_shutdown()
// remove from queues
osd->recovery_wq.dequeue(this);
osd->snap_trim_wq.dequeue(this);
osd->pg_stat_queue_dequeue(this);
osd->dequeue_pg(this, 0);
osd->peering_wq.dequeue(this);
@@ -11219,7 +11227,7 @@ void ReplicatedPG::_scrub_digest_updated()
{
dout(20) << __func__ << dendl;
if (--scrubber.num_digest_updates_pending == 0) {
osd->scrub_wq.queue(this);
requeue_scrub();
}
}
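
A recurring mechanical change in this file: scrubber.active_rep_scrub was a raw MOSDRepScrub* kept alive with manual get()/put() calls, and is now an OpRequestRef whose reference drops on plain reassignment. A minimal sketch of the before/after, with Msg and MsgRef as hypothetical stand-ins for the real types:

#include <memory>

struct Msg { };                           // hypothetical message type
typedef std::shared_ptr<Msg> MsgRef;      // stand-in for OpRequestRef

struct MiniScrubber {
  MsgRef active_rep_scrub;                // was: MOSDRepScrub* + get()/put()
};

void stash(MiniScrubber &s, const MsgRef &m) {
  s.active_rep_scrub = m;                 // was: assign pointer, then m->get();
}

MsgRef take_for_requeue(MiniScrubber &s) {
  MsgRef out = s.active_rep_scrub;        // hand the reference to the queue
  s.active_rep_scrub = MsgRef();          // was: set to 0; put() was the queue's job
  return out;
}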

View File: src/osd/ReplicatedPG.h

@@ -1415,7 +1415,7 @@ public:
void do_backfill(OpRequestRef op);
RepGather *trim_object(const hobject_t &coid);
void snap_trimmer();
void snap_trimmer(epoch_t e);
int do_osd_ops(OpContext *ctx, vector<OSDOp>& ops);
int _get_tmap(OpContext *ctx, bufferlist *header, bufferlist *vals);