OSD,PG: Move Op,SubOp queueing into PG

PG now handles delaying/discarding messages since pg map epoch may not
be the same as the OSD map.

Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2012-04-23 17:40:58 -07:00 committed by Samuel Just
parent 33bcbb33c9
commit ddef446dc1
4 changed files with 171 additions and 156 deletions

View File

@ -4567,20 +4567,6 @@ void OSD::handle_pg_scan(OpRequestRef op)
pg->put();
}
bool OSD::scan_is_queueable(PG *pg, OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan *)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
assert(pg->is_locked());
if (m->query_epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old scan, ignoring" << dendl;
return false;
}
return true;
}
void OSD::handle_pg_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
@ -4612,19 +4598,6 @@ void OSD::handle_pg_backfill(OpRequestRef op)
pg->put();
}
bool OSD::backfill_is_queueable(PG *pg, OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
assert(pg->is_locked());
if (m->query_epoch < pg->info.history.same_interval_since) {
dout(10) << *pg << " got old backfill, ignoring" << dendl;
return false;
}
return true;
}
/** PGQuery
* from primary to replica | stray
@ -5247,8 +5220,23 @@ void OSD::handle_op(OpRequestRef op)
handle_misdirected_op(NULL, op);
}
return;
} else if (m->may_write() &&
(!pg->is_primary() ||
!pg->same_for_modify_since(m->get_map_epoch()))) {
handle_misdirected_op(pg, op);
pg->unlock();
return;
} else if (m->may_read() &&
!pg->same_for_read_since(m->get_map_epoch())) {
handle_misdirected_op(pg, op);
pg->unlock();
return;
} else if (!op_has_sufficient_caps(pg, m)) {
pg->unlock();
return;
}
pg->get();
enqueue_op(pg, op);
pg->unlock();
@ -5363,94 +5351,11 @@ bool OSD::op_is_discardable(MOSDOp *op)
// want to do what we can to apply it.
if (!op->get_connection()->is_connected() &&
op->get_version().version == 0) {
dout(10) << " sender " << op->get_connection()->get_peer_addr()
<< " not connected, dropping " << *op << dendl;
return true;
}
return false;
}
/*
* Determine if we can queue the op right now; if not this deals with it.
* If it's not queueable, we deal with it in one of a few ways:
* dropping the request, putting it into a wait list for later, or
* telling the sender that the request was misdirected.
*
* @return true if the op is queueable; false otherwise.
*/
bool OSD::op_is_queueable(PG *pg, OpRequestRef op)
{
assert(pg->is_locked());
MOSDOp *m = (MOSDOp*)op->request;
assert(m->get_header().type == CEPH_MSG_OSD_OP);
if (!op_has_sufficient_caps(pg, m)) {
reply_op_error(op, -EPERM);
return false;
}
if (op_is_discardable(m)) {
return false;
}
// misdirected?
if (m->may_write()) {
if (!pg->is_primary() ||
!pg->same_for_modify_since(m->get_map_epoch())) {
handle_misdirected_op(pg, op);
return false;
}
} else {
if (!pg->same_for_read_since(m->get_map_epoch())) {
handle_misdirected_op(pg, op);
return false;
}
}
if (!pg->is_active()) {
dout(7) << *pg << " not active (yet)" << dendl;
pg->waiting_for_active.push_back(op);
op->mark_delayed();
return false;
}
if (pg->is_replay()) {
if (m->get_version().version > 0) {
dout(7) << *pg << " queueing replay at " << m->get_version()
<< " for " << *m << dendl;
pg->replay_queue[m->get_version()] = op;
op->mark_delayed();
} else {
dout(7) << *pg << " waiting until after replay for " << *m << dendl;
pg->waiting_for_active.push_back(op);
}
return false;
}
return true;
}
/*
* discard operation, or return true. no side-effects.
*/
bool OSD::subop_is_queueable(PG *pg, OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
assert(pg->is_locked());
// same pg?
// if pg changes _at all_, we reset and repeer!
if (m->map_epoch < pg->info.history.same_interval_since) {
dout(10) << "handle_sub_op pg changed " << pg->info.history
<< " after " << m->map_epoch
<< ", dropping" << dendl;
return false;
}
return true;
}
/*
* enqueue called with osd_lock held
*/
@ -5459,42 +5364,7 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op)
dout(15) << *pg << " enqueue_op " << op->request << " "
<< *(op->request) << dendl;
assert(pg->is_locked());
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
if (!op_is_queueable(pg, op))
return;
break;
case MSG_OSD_SUBOP:
if (!subop_is_queueable(pg, op))
return;
break;
case MSG_OSD_SUBOPREPLY:
// don't care.
break;
case MSG_OSD_PG_SCAN:
if (!scan_is_queueable(pg, op))
return;
break;
case MSG_OSD_PG_BACKFILL:
if (!backfill_is_queueable(pg, op))
return;
break;
default:
assert(0 == "enqueued an illegal message type");
}
// add to pg's op_queue
pg->op_queue.push_back(op);
op_wq.queue(pg);
op->mark_queued_for_pg();
pg->queue_op(op);
}
bool OSD::OpWQ::_enqueue(PG *pg)

View File

@ -677,10 +677,8 @@ protected:
void handle_pg_trim(OpRequestRef op);
void handle_pg_scan(OpRequestRef op);
bool scan_is_queueable(PG *pg, OpRequestRef op);
void handle_pg_backfill(OpRequestRef op);
bool backfill_is_queueable(PG *pg, OpRequestRef op);
void handle_pg_remove(OpRequestRef op);
void queue_pg_for_deletion(PG *pg);
@ -1136,16 +1134,11 @@ public:
void handle_sub_op(OpRequestRef op);
void handle_sub_op_reply(OpRequestRef op);
private:
static bool op_is_discardable(class MOSDOp *m);
/// check if we can throw out op from a disconnected client
bool op_is_discardable(class MOSDOp *m);
/// check if op has sufficient caps
bool op_has_sufficient_caps(PG *pg, class MOSDOp *m);
/// check if op should be (re)queued for processing
bool op_is_queueable(PG *pg, OpRequestRef op);
/// check if subop should be (re)queued for processing
bool subop_is_queueable(PG *pg, OpRequestRef op);
public:
void force_remount();

View File

@ -25,6 +25,8 @@
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGScan.h"
#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
@ -1461,11 +1463,22 @@ void PG::do_request(OpRequestRef op)
{
// do any pending flush
do_pending_flush();
if (can_discard_request(op)) {
return;
} else if (must_delay_request(op)) {
op_waiters.push_back(op);
return;
} else if (!is_active()) {
waiting_for_active.push_back(op);
return;
} else if (is_replay()) {
waiting_for_active.push_back(op);
return;
}
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
if (!osd->op_is_discardable((MOSDOp*)op->request))
do_op(op); // do it now
do_op(op); // do it now
break;
case MSG_OSD_SUBOP:
@ -3882,6 +3895,132 @@ ostream& operator<<(ostream& out, const PG& pg)
return out;
}
bool PG::can_discard_op(OpRequestRef op)
{
MOSDOp *m = (MOSDOp*)op->request;
if (OSD::op_is_discardable(m)) {
return true;
} else if (m->may_write() &&
(!is_primary() ||
!same_for_modify_since(m->get_map_epoch()))) {
return true;
} else if (m->may_read() &&
!same_for_read_since(m->get_map_epoch())) {
return true;
} else if (is_replay()) {
if (m->get_version().version > 0) {
dout(7) << " queueing replay at " << m->get_version()
<< " for " << *m << dendl;
replay_queue[m->get_version()] = op;
op->mark_delayed();
return true;
}
}
return false;
}
bool PG::can_discard_subop(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp *)op->request;
assert(m->get_header().type == MSG_OSD_SUBOP);
// same pg?
// if pg changes _at all_, we reset and repeer!
if (old_peering_msg(m->map_epoch, m->map_epoch)) {
dout(10) << "handle_sub_op pg changed " << info.history
<< " after " << m->map_epoch
<< ", dropping" << dendl;
return true;
}
return false;
}
bool PG::can_discard_scan(OpRequestRef op)
{
MOSDPGScan *m = (MOSDPGScan *)op->request;
assert(m->get_header().type == MSG_OSD_PG_SCAN);
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
dout(10) << " got old scan, ignoring" << dendl;
return true;
}
return false;
}
bool PG::can_discard_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
dout(10) << " got old backfill, ignoring" << dendl;
return true;
}
return false;
}
bool PG::can_discard_request(OpRequestRef op)
{
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
return can_discard_op(op);
case MSG_OSD_SUBOP:
return can_discard_subop(op);
case MSG_OSD_SUBOPREPLY:
return false;
case MSG_OSD_PG_SCAN:
return can_discard_scan(op);
case MSG_OSD_PG_BACKFILL:
return can_discard_backfill(op);
}
return true;
}
bool PG::must_delay_request(OpRequestRef op)
{
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
return !require_same_or_newer_map(
static_cast<MOSDOp*>(op->request)->get_map_epoch());
case MSG_OSD_SUBOP:
return !require_same_or_newer_map(
static_cast<MOSDSubOp*>(op->request)->map_epoch);
case MSG_OSD_SUBOPREPLY:
return !require_same_or_newer_map(
static_cast<MOSDSubOpReply*>(op->request)->map_epoch);
case MSG_OSD_PG_SCAN:
return !require_same_or_newer_map(
static_cast<MOSDPGScan*>(op->request)->map_epoch);
case MSG_OSD_PG_BACKFILL:
return !require_same_or_newer_map(
static_cast<MOSDPGBackfill*>(op->request)->map_epoch);
}
assert(0);
return false;
}
void PG::queue_op(OpRequestRef op)
{
if (can_discard_op(op))
return;
// TODO: deal with osd queueing
op_queue.push_back(op);
}
void PG::take_waiters()
{
list<OpRequestRef> ls;
ls.swap(op_waiters);
ls.splice(ls.end(), op_queue);
}
void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx)
{
if (old_peering_evt(evt))
@ -4120,6 +4259,7 @@ boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&)
}
pg->update_heartbeat_peers();
pg->take_waiters();
return transit< Started >();
}

View File

@ -1363,12 +1363,24 @@ public:
pair<int, pg_info_t> &notify_info);
void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch);
bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting);
// OpRequest queueing
bool can_discard_op(OpRequestRef op);
bool can_discard_scan(OpRequestRef op);
bool can_discard_subop(OpRequestRef op);
bool can_discard_backfill(OpRequestRef op);
bool can_discard_request(OpRequestRef op);
bool must_delay_request(OpRequestRef op);
void queue_op(OpRequestRef op);
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
bool old_peering_evt(CephPeeringEvtRef evt) {
return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
}
// recovery bits
void take_waiters();
void queue_peering_event(CephPeeringEvtRef evt);
void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx);
void queue_notify(epoch_t msg_epoch, epoch_t query_epoch,