Objecter: implement mon and osd operation timeouts

This captures almost all operations from librados other than mon_commands().

Pass the timeout values in through the Objecter constructor, so that
only librados uses them.

Add C_Cancel_*_Op, finish_*_op(), and *_op_cancel() for each type of
operation, to mirror those for Op. When a timeout is specified, create
a cancellation callback and schedule it on the existing timer thread.

Fixes: #6507
Signed-off-by: Josh Durgin <josh.durgin@inktank.com>
Josh Durgin 2014-02-03 17:59:21 -08:00
parent 1829d2c9fd
commit 3e1f7bbb42
10 changed files with 254 additions and 39 deletions
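
For context, here is a minimal sketch (not part of this commit) of how a librados application would opt in to the new behavior. The option names are the ones RadosClient::connect() reads in the hunk below; the pool and object names are placeholders, and error handling is trimmed.

#include <rados/librados.hpp>
#include <iostream>

int main()
{
  librados::Rados cluster;
  if (cluster.init(NULL) < 0)                      // default client id
    return 1;
  cluster.conf_read_file(NULL);                    // pick up the usual ceph.conf
  cluster.conf_set("rados_mon_op_timeout", "30");  // mon ops: pool ops, pool/fs stats
  cluster.conf_set("rados_osd_op_timeout", "30");  // osd ops: reads, writes, commands
  if (cluster.connect() < 0)
    return 1;

  librados::IoCtx io;
  cluster.ioctx_create("rbd", io);                 // placeholder pool name
  librados::bufferlist bl;
  int r = io.read("some-object", bl, 4096, 0);     // placeholder object name
  // With the timeouts set, an op against an unresponsive OSD or monitor now
  // completes with -ETIMEDOUT instead of blocking indefinitely; leaving the
  // options at their default of 0 keeps the old behavior.
  std::cout << "read returned " << r << std::endl;
  cluster.shutdown();
  return 0;
}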

View File

@ -185,7 +185,8 @@ Client::Client(Messenger *m, MonClient *mc)
// osd interfaces
osdmap = new OSDMap; // initially blank.. see mount()
mdsmap = new MDSMap;
objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer);
objecter = new Objecter(cct, messenger, monclient, osdmap, client_lock, timer,
0, 0);
objecter->set_client_incarnation(0); // client always 0, for now.
writeback_handler = new ObjecterWriteback(objecter);
objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock,

View File

@ -220,7 +220,9 @@ int librados::RadosClient::connect()
ldout(cct, 1) << "starting objecter" << dendl;
err = -ENOMEM;
objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer);
objecter = new Objecter(cct, messenger, &monclient, &osdmap, lock, timer,
cct->_conf->rados_mon_op_timeout,
cct->_conf->rados_osd_op_timeout);
if (!objecter)
goto out;
objecter->set_balanced_budget();

View File

@ -51,7 +51,8 @@ void Dumper::init(int rank)
inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
unsigned pg_pool = MDS_METADATA_POOL;
osdmap = new OSDMap();
objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer);
objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer,
0, 0);
journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC,
objecter, 0, 0, &timer);

View File

@ -117,7 +117,8 @@ MDS::MDS(const std::string &n, Messenger *m, MonClient *mc) :
mdsmap = new MDSMap;
osdmap = new OSDMap;
objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer);
objecter = new Objecter(m->cct, messenger, monc, osdmap, mds_lock, timer,
0, 0);
objecter->unset_honor_osdmap_full();
filer = new Filer(objecter);

View File

@ -59,7 +59,8 @@ void Resetter::init(int rank)
inodeno_t ino = MDS_INO_LOG_OFFSET + rank;
unsigned pg_pool = MDS_METADATA_POOL;
osdmap = new OSDMap();
objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer);
objecter = new Objecter(g_ceph_context, messenger, monc, osdmap, lock, timer,
0, 0);
journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC,
objecter, 0, 0, &timer);

View File

@ -194,7 +194,7 @@ OSDService::OSDService(OSD *osd) :
objecter_lock("OSD::objecter_lock"),
objecter_timer(osd->client_messenger->cct, objecter_lock),
objecter(new Objecter(osd->client_messenger->cct, osd->objecter_messenger, osd->monc, &objecter_osdmap,
objecter_lock, objecter_timer)),
objecter_lock, objecter_timer, 0, 0)),
objecter_finisher(osd->client_messenger->cct),
objecter_dispatcher(this),
watch_lock("OSD::watch_lock"),

View File

@ -5605,9 +5605,9 @@ void ReplicatedPG::cancel_copy(CopyOpRef cop, bool requeue)
// cancel objecter op, if we can
if (cop->objecter_tid) {
Mutex::Locker l(osd->objecter_lock);
osd->objecter->op_cancel(cop->objecter_tid);
osd->objecter->op_cancel(cop->objecter_tid, -ECANCELED);
if (cop->objecter_tid2) {
osd->objecter->op_cancel(cop->objecter_tid2);
osd->objecter->op_cancel(cop->objecter_tid2, -ECANCELED);
}
}
@ -5918,7 +5918,7 @@ void ReplicatedPG::cancel_flush(FlushOpRef fop, bool requeue)
<< fop->objecter_tid << dendl;
if (fop->objecter_tid) {
Mutex::Locker l(osd->objecter_lock);
osd->objecter->op_cancel(fop->objecter_tid);
osd->objecter->op_cancel(fop->objecter_tid, -ECANCELED);
}
if (fop->ctx->op && requeue) {
requeue_op(fop->ctx->op);

View File

@ -1175,7 +1175,7 @@ void Objecter::resend_mon_ops()
}
for (map<tid_t,PoolOp*>::iterator p = pool_ops.begin(); p!=pool_ops.end(); ++p) {
pool_op_submit(p->second);
_pool_op_submit(p->second);
logger->inc(l_osdc_poolop_resend);
}
@ -1205,6 +1205,19 @@ void Objecter::resend_mon_ops()
// read | write ---------------------------
class C_CancelOp : public Context
{
Objecter::Op *op;
Objecter *objecter;
public:
C_CancelOp(Objecter::Op *op, Objecter *objecter) : op(op),
objecter(objecter) {}
void finish(int r) {
// note that objecter lock == timer lock, and is already held
objecter->op_cancel(op->tid, -ETIMEDOUT);
}
};
tid_t Objecter::op_submit(Op *op)
{
assert(client_lock.is_locked());
@ -1214,6 +1227,11 @@ tid_t Objecter::op_submit(Op *op)
assert(op->ops.size() == op->out_rval.size());
assert(op->ops.size() == op->out_handler.size());
if (osd_timeout > 0) {
op->ontimeout = new C_CancelOp(op, this);
timer.add_event_after(osd_timeout, op->ontimeout);
}
// throttle. before we look at any state, because
// take_op_budget() may drop our lock while it blocks.
take_op_budget(op);
@ -1330,7 +1348,7 @@ tid_t Objecter::_op_submit(Op *op)
return op->tid;
}
int Objecter::op_cancel(tid_t tid)
int Objecter::op_cancel(tid_t tid, int r)
{
assert(client_lock.is_locked());
assert(initialized);
@ -1344,11 +1362,11 @@ int Objecter::op_cancel(tid_t tid)
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
Op *op = p->second;
if (op->onack) {
op->onack->complete(-ECANCELED);
op->onack->complete(r);
op->onack = NULL;
}
if (op->oncommit) {
op->oncommit->complete(-ECANCELED);
op->oncommit->complete(r);
op->oncommit = NULL;
}
op_cancel_map_check(op);
@ -1563,6 +1581,9 @@ void Objecter::finish_op(Op *op)
logger->set(l_osdc_op_active, ops.size());
assert(check_latest_map_ops.find(op->tid) == check_latest_map_ops.end());
if (op->ontimeout)
timer.cancel_event(op->ontimeout);
delete op;
}
@ -2108,7 +2129,29 @@ int Objecter::change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid)
return 0;
}
class C_CancelPoolOp : public Context
{
tid_t tid;
Objecter *objecter;
public:
C_CancelPoolOp(tid_t tid, Objecter *objecter) : tid(tid),
objecter(objecter) {}
void finish(int r) {
// note that objecter lock == timer lock, and is already held
objecter->pool_op_cancel(tid, -ETIMEDOUT);
}
};
void Objecter::pool_op_submit(PoolOp *op)
{
if (mon_timeout > 0) {
op->ontimeout = new C_CancelPoolOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
}
_pool_op_submit(op);
}
void Objecter::_pool_op_submit(PoolOp *op)
{
ldout(cct, 10) << "pool_op_submit " << op->tid << dendl;
MPoolOp *m = new MPoolOp(monc->get_fsid(), op->tid, op->pool,
@ -2150,11 +2193,7 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
op->onfinish->complete(m->replyCode);
}
op->onfinish = NULL;
delete op;
pool_ops.erase(tid);
logger->set(l_osdc_poolop_active, pool_ops.size());
finish_pool_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
@ -2162,9 +2201,52 @@ void Objecter::handle_pool_op_reply(MPoolOpReply *m)
m->put();
}
int Objecter::pool_op_cancel(tid_t tid, int r)
{
assert(client_lock.is_locked());
assert(initialized);
map<tid_t, PoolOp*>::iterator it = pool_ops.find(tid);
if (it == pool_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
}
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
PoolOp *op = it->second;
if (op->onfinish)
op->onfinish->complete(r);
finish_pool_op(op);
return 0;
}
void Objecter::finish_pool_op(PoolOp *op)
{
pool_ops.erase(op->tid);
logger->set(l_osdc_poolop_active, pool_ops.size());
if (op->ontimeout)
timer.cancel_event(op->ontimeout);
delete op;
}
// pool stats
class C_CancelPoolStatOp : public Context
{
tid_t tid;
Objecter *objecter;
public:
C_CancelPoolStatOp(tid_t tid, Objecter *objecter) : tid(tid),
objecter(objecter) {}
void finish(int r) {
// note that objecter lock == timer lock, and is already held
objecter->pool_stat_op_cancel(tid, -ETIMEDOUT);
}
};
void Objecter::get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
Context *onfinish)
{
@ -2175,6 +2257,11 @@ void Objecter::get_pool_stats(list<string>& pools, map<string,pool_stat_t> *resu
op->pools = pools;
op->pool_stats = result;
op->onfinish = onfinish;
op->ontimeout = NULL;
if (mon_timeout > 0) {
op->ontimeout = new C_CancelPoolStatOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
}
poolstat_ops[op->tid] = op;
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
@ -2205,11 +2292,7 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
if (m->version > last_seen_pgmap_version)
last_seen_pgmap_version = m->version;
op->onfinish->complete(0);
poolstat_ops.erase(tid);
delete op;
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
finish_pool_stat_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
@ -2217,6 +2300,49 @@ void Objecter::handle_get_pool_stats_reply(MGetPoolStatsReply *m)
m->put();
}
int Objecter::pool_stat_op_cancel(tid_t tid, int r)
{
assert(client_lock.is_locked());
assert(initialized);
map<tid_t, PoolStatOp*>::iterator it = poolstat_ops.find(tid);
if (it == poolstat_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
}
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
PoolStatOp *op = it->second;
if (op->onfinish)
op->onfinish->complete(r);
finish_pool_stat_op(op);
return 0;
}
void Objecter::finish_pool_stat_op(PoolStatOp *op)
{
poolstat_ops.erase(op->tid);
logger->set(l_osdc_poolstat_active, poolstat_ops.size());
if (op->ontimeout)
timer.cancel_event(op->ontimeout);
delete op;
}
class C_CancelStatfsOp : public Context
{
tid_t tid;
Objecter *objecter;
public:
C_CancelStatfsOp(tid_t tid, Objecter *objecter) : tid(tid),
objecter(objecter) {}
void finish(int r) {
// note that objecter lock == timer lock, and is already held
objecter->statfs_op_cancel(tid, -ETIMEDOUT);
}
};
void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
{
@ -2226,6 +2352,11 @@ void Objecter::get_fs_stats(ceph_statfs& result, Context *onfinish)
op->tid = ++last_tid;
op->stats = &result;
op->onfinish = onfinish;
op->ontimeout = NULL;
if (mon_timeout > 0) {
op->ontimeout = new C_CancelStatfsOp(op->tid, this);
timer.add_event_after(mon_timeout, op->ontimeout);
}
statfs_ops[op->tid] = op;
logger->set(l_osdc_statfs_active, statfs_ops.size());
@ -2256,11 +2387,7 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
if (m->h.version > last_seen_pgmap_version)
last_seen_pgmap_version = m->h.version;
op->onfinish->complete(0);
statfs_ops.erase(tid);
delete op;
logger->set(l_osdc_statfs_active, statfs_ops.size());
finish_statfs_op(op);
} else {
ldout(cct, 10) << "unknown request " << tid << dendl;
}
@ -2268,6 +2395,36 @@ void Objecter::handle_fs_stats_reply(MStatfsReply *m)
m->put();
}
int Objecter::statfs_op_cancel(tid_t tid, int r)
{
assert(client_lock.is_locked());
assert(initialized);
map<tid_t, StatfsOp*>::iterator it = statfs_ops.find(tid);
if (it == statfs_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
}
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
StatfsOp *op = it->second;
if (op->onfinish)
op->onfinish->complete(r);
finish_statfs_op(op);
return 0;
}
void Objecter::finish_statfs_op(StatfsOp *op)
{
statfs_ops.erase(op->tid);
logger->set(l_osdc_statfs_active, statfs_ops.size());
if (op->ontimeout)
timer.cancel_event(op->ontimeout);
delete op;
}
// scatter/gather
@ -2560,11 +2717,28 @@ void Objecter::handle_command_reply(MCommandReply *m)
m->put();
}
class C_CancelCommandOp : public Context
{
tid_t tid;
Objecter *objecter;
public:
C_CancelCommandOp(tid_t tid, Objecter *objecter) : tid(tid),
objecter(objecter) {}
void finish(int r) {
// note that objecter lock == timer lock, and is already held
objecter->command_op_cancel(tid, -ETIMEDOUT);
}
};
int Objecter::_submit_command(CommandOp *c, tid_t *ptid)
{
tid_t tid = ++last_tid;
ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
c->tid = tid;
if (osd_timeout > 0) {
c->ontimeout = new C_CancelCommandOp(tid, this);
timer.add_event_after(osd_timeout, c->ontimeout);
}
command_ops[tid] = c;
num_homeless_ops++;
(void)recalc_command_target(c);
@ -2638,6 +2812,25 @@ void Objecter::_send_command(CommandOp *c)
logger->inc(l_osdc_command_send);
}
int Objecter::command_op_cancel(tid_t tid, int r)
{
assert(client_lock.is_locked());
assert(initialized);
map<tid_t, CommandOp*>::iterator it = command_ops.find(tid);
if (it == command_ops.end()) {
ldout(cct, 10) << __func__ << " tid " << tid << " dne" << dendl;
return -ENOENT;
}
ldout(cct, 10) << __func__ << " tid " << tid << dendl;
CommandOp *op = it->second;
command_cancel_map_check(op);
_finish_command(op, -ETIMEDOUT, "");
return 0;
}
void Objecter::_finish_command(CommandOp *c, int r, string rs)
{
ldout(cct, 10) << "_finish_command " << c->tid << " = " << r << " " << rs << dendl;
@ -2647,6 +2840,8 @@ void Objecter::_finish_command(CommandOp *c, int r, string rs)
if (c->onfinish)
c->onfinish->complete(r);
command_ops.erase(c->tid);
if (c->ontimeout)
timer.cancel_event(c->ontimeout);
c->put();
logger->set(l_osdc_command_active, command_ops.size());
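
Taken together, the Objecter.cc hunks above give each operation type the same lifecycle: schedule a C_Cancel*Op on the timer at submit time when a timeout is configured, cancel that event again in the matching finish_*_op()/_finish_command() path, and if the event fires first, cancel the op with -ETIMEDOUT while holding the shared objecter/timer lock. The following is a self-contained model of that flow, with std::function and a toy timer standing in for Ceph's Context and SafeTimer; none of it is code from the tree.

#include <cerrno>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>

// Toy single-threaded stand-in for SafeTimer; in the real code the timer
// shares the objecter lock, which is why C_Cancel*Op::finish() may call
// *_op_cancel() directly ("objecter lock == timer lock").
struct ToyTimer {
  std::map<int, std::function<void()>> events;
  int next_id = 0;
  int add_event_after(double /*seconds*/, std::function<void()> cb) {
    events[next_id] = std::move(cb);
    return next_id++;
  }
  void cancel_event(int id) { events.erase(id); }
  void fire_all() {                          // pretend every deadline passed
    auto pending = events;
    events.clear();
    for (auto &e : pending) e.second();
  }
};

struct MiniObjecter {
  double osd_timeout;                        // 0 disables the feature
  ToyTimer timer;
  struct Op {
    std::function<void(int)> onfinish;       // like onack/oncommit/onfinish
    int timeout_event = -1;                  // like the new ontimeout member
  };
  std::map<uint64_t, Op> ops;
  uint64_t last_tid = 0;

  uint64_t op_submit(std::function<void(int)> onfinish) {
    uint64_t tid = ++last_tid;
    Op op{std::move(onfinish)};
    if (osd_timeout > 0)                     // mirrors op_submit()/_submit_command()
      op.timeout_event = timer.add_event_after(
          osd_timeout, [this, tid] { op_cancel(tid, -ETIMEDOUT); });
    ops[tid] = std::move(op);
    return tid;
  }

  // The timeout path passes -ETIMEDOUT; internal callers (e.g. the OSD's
  // cancel_copy/cancel_flush above) pass -ECANCELED.
  int op_cancel(uint64_t tid, int r) {
    auto it = ops.find(tid);
    if (it == ops.end())
      return -ENOENT;                        // "tid ... dne"
    it->second.onfinish(r);
    finish_op(it);
    return 0;
  }

  void handle_reply(uint64_t tid) {          // normal completion
    auto it = ops.find(tid);
    if (it == ops.end())
      return;
    it->second.onfinish(0);
    finish_op(it);
  }

  void finish_op(std::map<uint64_t, Op>::iterator it) {
    if (it->second.timeout_event >= 0)       // mirrors finish_*_op(): drop the
      timer.cancel_event(it->second.timeout_event);  // pending cancel event
    ops.erase(it);
  }
};

int main() {
  MiniObjecter o{/*osd_timeout=*/30.0};
  o.op_submit([](int r) { std::cout << "op completed with r=" << r << std::endl; });
  o.timer.fire_all();                        // no reply arrived: completes with -ETIMEDOUT
  return 0;
}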

View File

@ -1041,7 +1041,7 @@ public:
vector<int*> out_rval;
int flags, priority;
Context *onack, *oncommit;
Context *onack, *oncommit, *ontimeout;
tid_t tid;
eversion_t replay_version; // for op replay
@ -1070,6 +1070,7 @@ public:
snapid(CEPH_NOSNAP),
outbl(NULL),
flags(f), priority(0), onack(ac), oncommit(co),
ontimeout(NULL),
tid(0), attempts(0),
paused(false), objver(ov), reply_epoch(NULL),
map_dne_bound(0),
@ -1213,7 +1214,7 @@ public:
list<string> pools;
map<string,pool_stat_t> *pool_stats;
Context *onfinish;
Context *onfinish, *ontimeout;
utime_t last_submit;
};
@ -1221,7 +1222,7 @@ public:
struct StatfsOp {
tid_t tid;
struct ceph_statfs *stats;
Context *onfinish;
Context *onfinish, *ontimeout;
utime_t last_submit;
};
@ -1230,7 +1231,7 @@ public:
tid_t tid;
int64_t pool;
string name;
Context *onfinish;
Context *onfinish, *ontimeout;
int pool_op;
uint64_t auid;
__u8 crush_rule;
@ -1238,7 +1239,7 @@ public:
bufferlist *blp;
utime_t last_submit;
PoolOp() : tid(0), pool(0), onfinish(0), pool_op(0),
PoolOp() : tid(0), pool(0), onfinish(NULL), ontimeout(NULL), pool_op(0),
auid(0), crush_rule(0), snapid(0), blp(NULL) {}
};
@ -1256,7 +1257,7 @@ public:
epoch_t map_dne_bound;
int map_check_error; // error to return if map check fails
const char *map_check_error_str;
Context *onfinish;
Context *onfinish, *ontimeout;
utime_t last_submit;
CommandOp()
@ -1265,12 +1266,13 @@ public:
map_dne_bound(0),
map_check_error(0),
map_check_error_str(NULL),
onfinish(NULL) {}
onfinish(NULL), ontimeout(NULL) {}
};
int _submit_command(CommandOp *c, tid_t *ptid);
int recalc_command_target(CommandOp *c);
void _send_command(CommandOp *c);
int command_op_cancel(tid_t tid, int r);
void _finish_command(CommandOp *c, int r, string rs);
void handle_command_reply(MCommandReply *m);
@ -1388,6 +1390,8 @@ public:
map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
double mon_timeout, osd_timeout;
void send_op(Op *op);
void cancel_linger_op(Op *op);
void finish_op(Op *op);
@ -1456,7 +1460,8 @@ public:
public:
Objecter(CephContext *cct_, Messenger *m, MonClient *mc,
OSDMap *om, Mutex& l, SafeTimer& t) :
OSDMap *om, Mutex& l, SafeTimer& t, double mon_timeout,
double osd_timeout) :
messenger(m), monc(mc), osdmap(om), cct(cct_),
initialized(false),
last_tid(0), client_inc(-1), max_linger_id(0),
@ -1469,6 +1474,8 @@ public:
logger(NULL), tick_event(NULL),
m_request_state_hook(NULL),
num_homeless_ops(0),
mon_timeout(mon_timeout),
osd_timeout(osd_timeout),
op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
op_throttle_ops(cct, "objecter_ops", cct->_conf->objecter_inflight_ops)
{ }
@ -1550,8 +1557,8 @@ public:
/** Clear the passed flags from the global op flag set */
void clear_global_op_flag(int flags) { global_op_flags &= ~flags; }
/// cancel an in-progress request
int op_cancel(tid_t tid);
/// cancel an in-progress request with the given return code
int op_cancel(tid_t tid, int r);
// commands
int osd_command(int osd, vector<string>& cmd,
@ -1985,6 +1992,7 @@ public:
// pool ops
private:
void pool_op_submit(PoolOp *op);
void _pool_op_submit(PoolOp *op);
public:
int create_pool_snap(int64_t pool, string& snapName, Context *onfinish);
int allocate_selfmanaged_snap(int64_t pool, snapid_t *psnapid, Context *onfinish);
@ -1997,6 +2005,8 @@ public:
int change_pool_auid(int64_t pool, Context *onfinish, uint64_t auid);
void handle_pool_op_reply(MPoolOpReply *m);
int pool_op_cancel(tid_t tid, int r);
void finish_pool_op(PoolOp *op);
// --------------------------
// pool stats
@ -2006,6 +2016,8 @@ public:
void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
void get_pool_stats(list<string>& pools, map<string,pool_stat_t> *result,
Context *onfinish);
int pool_stat_op_cancel(tid_t tid, int r);
void finish_pool_stat_op(PoolStatOp *op);
// ---------------------------
// df stats
@ -2014,6 +2026,8 @@ private:
public:
void handle_fs_stats_reply(MStatfsReply *m);
void get_fs_stats(struct ceph_statfs& result, Context *onfinish);
int statfs_op_cancel(tid_t tid, int r);
void finish_statfs_op(StatfsOp *op);
// ---------------------------
// some scatter/gather hackery

View File

@ -257,7 +257,7 @@ class ClientStub : public TestStub
<< messenger->get_myaddr() << dendl;
objecter.reset(new Objecter(cct, messenger.get(), &monc, &osdmap,
lock, timer));
lock, timer, 0, 0));
assert(objecter.get() != NULL);
objecter->set_balanced_budget();