OSD: use the async workqueue to send OSDMap updates on dropped ops

Check whether we actually want to send a map in-line, and if we do, create
a GenContext which does so and put that in the op_gen_wq.

Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Greg Farnum 2014-03-24 20:42:19 -07:00
parent 6c98e36f89
commit ebdc097047

View File

@ -7476,6 +7476,37 @@ void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
reply_op_error(op, -ENXIO);
}
class C_SendMap : public GenContext<ThreadPool::TPHandle&> {
OSD *osd;
Message *m;
ConnectionRef con;
OSDMapRef osdmap;
epoch_t map_epoch;
public:
C_SendMap(OSD *osd, Message *m, const ConnectionRef& con,
OSDMapRef& osdmap, epoch_t map_epoch) :
osd(osd), m(m), con(con), osdmap(osdmap), map_epoch(map_epoch) {}
void finish(ThreadPool::TPHandle& tp) {
OSD::Session *session = static_cast<OSD::Session *>(
con->get_priv());
if (session) {
session->sent_epoch_lock.Lock();
}
osd->service.share_map_incoming(
m->get_source(),
con.get(),
map_epoch,
osdmap,
session ? &session->last_sent_epoch : NULL);
if (session) {
session->sent_epoch_lock.Unlock();
session->put();
}
}
};
struct send_map_on_destruct {
OSD *osd;
Message *m;
@ -7491,21 +7522,7 @@ struct send_map_on_destruct {
~send_map_on_destruct() {
if (!should_send)
return;
OSD::Session *client_session = static_cast<OSD::Session *>(
con->get_priv());
if (client_session) {
client_session->sent_epoch_lock.Lock();
}
osd->service.share_map_incoming(
m->get_source(),
con.get(),
map_epoch,
osdmap,
client_session ? &client_session->last_sent_epoch : NULL);
if (client_session) {
client_session->sent_epoch_lock.Unlock();
client_session->put();
}
osd->service.op_gen_wq.queue(new C_SendMap(osd, m, con, osdmap, map_epoch));
}
};
@ -7538,6 +7555,18 @@ void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
// set up a map send if the Op gets blocked for some reason
send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch());
Session *client_session =
static_cast<Session*>(m->get_connection()->get_priv());
if (client_session) {
client_session->sent_epoch_lock.Lock();
}
share_map.should_send = service.should_share_map(
m->get_source(), m->get_connection().get(), m->get_map_epoch(),
osdmap, &client_session->last_sent_epoch);
if (client_session) {
client_session->sent_epoch_lock.Unlock();
client_session->put();
}
if (op->rmw_flags == 0) {
int r = init_op_flags(op);
@ -7629,7 +7658,7 @@ void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
PG *pg = get_pg_or_queue_for_pg(pgid, op);
if (pg) {
op->send_map_update = true;
op->send_map_update = share_map.should_send;
op->sent_epoch = m->get_map_epoch();
enqueue_op(pg, op);
share_map.should_send = false;