objecter: create a new op for resending lingering requests

This commit is contained in:
Yehuda Sadeh 2010-12-08 16:02:52 -08:00
parent 5cba1e6333
commit cc78bbf16e
3 changed files with 140 additions and 133 deletions

View File

@ -1568,7 +1568,6 @@ void RadosClient::watch_notify(MWatchNotify *m)
int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::Rados::WatchCtx *ctx) int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_t *cookie, librados::Rados::WatchCtx *ctx)
{ {
utime_t ut = g_clock.now(); utime_t ut = g_clock.now();
bufferlist inbl, outbl;
ObjectOperation *rd = new ObjectOperation(); ObjectOperation *rd = new ObjectOperation();
if (!rd) if (!rd)
@ -1593,8 +1592,8 @@ int RadosClient::watch(PoolCtx& pool, const object_t& oid, uint64_t ver, uint64_
pool.assert_ver = 0; pool.assert_ver = 0;
} }
rd->watch(*cookie, ver, 1); rd->watch(*cookie, ver, 1);
rd->set_linger(true); uint64_t linger_id;
objecter->read(oid, oloc, *rd, pool.snap_seq, &outbl, 0, onack, &objver); objecter->linger(oid, oloc, *rd, pool.snap_seq, NULL, 0, onack, NULL, &objver, &linger_id);
lock.Unlock(); lock.Unlock();
mylock.Lock(); mylock.Lock();

View File

@ -59,6 +59,55 @@ void Objecter::shutdown()
{ {
} }
tid_t Objecter::resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver)
{
Op *o = new Op(info.oid, info.oloc, info.ops, info.flags | CEPH_OSD_FLAG_READ, onack, onfinish, objver, true);
o->snapid = info.snap;
return op_submit(o, info.linger_id);
}
tid_t Objecter::resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver)
{
map<uint64_t, LingerOpInfo>::iterator iter = op_linger_info.find(linger_id);
if (iter != op_linger_info.end()) {
return resend_linger(iter->second, onack, onfinish, objver);
} else {
dout(0) << "WARNING: resend_linger(): could not find linger_id" << linger_id << dendl; // should that happen?
}
return -1;
}
uint64_t Objecter::register_linger(LingerOpInfo& info, uint64_t linger_id)
{
if (!linger_id)
linger_id = ++max_linger_id;
info.linger_id = linger_id;
op_linger_info[linger_id] = info;
return linger_id;
}
tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snap, bufferlist *pbl, int flags,
Context *onack, Context *onfinish,
eversion_t *objver,
uint64_t *linger_id)
{
uint64_t lid;
LingerOpInfo info;
info.oid = oid;
info.oloc = oloc;
info.snap = snap;
info.flags = flags;
info.ops = op.ops;
lid = register_linger(info, 0);
if (linger_id)
*linger_id = lid;
return resend_linger(info, onack, onfinish, objver);
}
void Objecter::dispatch(Message *m) void Objecter::dispatch(Message *m)
{ {
@ -166,15 +215,6 @@ void Objecter::handle_osd_map(MOSDMap *m)
} }
dout(0) << "handle_osd_map" << dendl; dout(0) << "handle_osd_map" << dendl;
dump_active(); dump_active();
#if 0
hash_map<tid_t, Op*> old_map;
old_map.swap(op_osd_linger);
for (hash_map<tid_t,Op*>::iterator p = old_map.begin();
p != op_osd_linger.end(); p++) {
dout(0) << "handle_osd_map: op_submit" << dendl;
op_submit(p->second);
}
#endif
assert(e == osdmap->get_epoch()); assert(e == osdmap->get_epoch());
} }
@ -292,8 +332,6 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs)
if (other == pg.acting) if (other == pg.acting)
continue; // no change. continue; // no change.
pg.epoch = osdmap->get_epoch();
dout(10) << "scan_pgs " << pgid << " " << pg.acting << " -> " << other << dendl; dout(10) << "scan_pgs " << pgid << " " << pg.acting << " -> " << other << dendl;
other.swap(pg.acting); other.swap(pg.acting);
@ -325,14 +363,13 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
// resubmit ops! // resubmit ops!
set<tid_t> tids; set<tid_t> tids;
tids.swap( pg.active_tids ); tids.swap( pg.active_tids );
set<tid_t>::iterator liter; map<tid_t, bool>::iterator liter;
for (liter = pg.linger_tids.begin(); liter != pg.linger_tids.end(); ++liter) { for (liter = pg.linger_ops.begin(); liter != pg.linger_ops.end(); ++liter) {
tids.insert(*liter); resend_linger(liter->first, NULL, NULL, NULL);
dout(0) << "adding lingering tid=" << *liter << " to set" << dendl;
} }
dout(0) << "pg.linger_tids.empty()=" << pg.linger_tids.empty() << " pg=" << &pg << " pg.epoch=" << pg.epoch << dendl; dout(0) << "pg.linger_tids.empty()=" << pg.linger_tids.empty() << " pg=" << &pg << dendl;
if (pg.linger_tids.empty()) if (pg.linger_ops.empty())
close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing
dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl; dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl;
@ -365,14 +402,8 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
op_submit(op, false); op_submit(op, false);
} }
} }
} else { } else
hash_map<tid_t, Op*>::iterator p = op_osd_linger.find(tid);
if (p != op_osd_linger.end()) {
dout(0) << "kick_requests lingering " << tid << dendl;
op_submit(p->second, true);
} else
assert(0); assert(0);
}
} }
} }
dout(0) << "*** kick_requests done" << dendl; dout(0) << "*** kick_requests done" << dendl;
@ -438,13 +469,8 @@ void Objecter::resend_mon_ops()
// read | write --------------------------- // read | write ---------------------------
tid_t Objecter::op_submit(Op *op, bool register_linger) tid_t Objecter::op_submit(Op *op, uint64_t linger_id)
{ {
bool handle_linger = (op->linger && register_linger);
bool new_linger = (op->linger && !op->tid);
dout(0) << "Objecter::op_submit register_linger=" << register_linger << " new_linger=" << new_linger << dendl;
// throttle. before we look at any state, because // throttle. before we look at any state, because
// take_op_budget() may drop our lock while it blocks. // take_op_budget() may drop our lock while it blocks.
take_op_budget(op); take_op_budget(op);
@ -454,15 +480,12 @@ dout(0) << "Objecter::op_submit register_linger=" << register_linger << " new_li
// find // find
PG &pg = get_pg(op->pgid); PG &pg = get_pg(op->pgid);
if (linger_id)
pg.linger_ops[linger_id] = true;
// pick tid // pick tid
if (!op->tid) { op->tid = ++last_tid;
op->tid = ++last_tid;
if (new_linger)
op_osd_linger[op->tid] = op;
} else if (handle_linger) {
op->tid = ++last_tid;
}
assert(client_inc >= 0); assert(client_inc >= 0);
// add to gather set(s) // add to gather set(s)
@ -481,17 +504,9 @@ dout(0) << "Objecter::op_submit register_linger=" << register_linger << " new_li
} }
op_osd[op->tid] = op; op_osd[op->tid] = op;
if (handle_linger) {
dout(0) << "reset lingering request" << dendl;
op->attempts = 0;
}
BackTrace bt(0); BackTrace bt(0);
bt.print(*_dout); bt.print(*_dout);
pg.active_tids.insert(op->tid); pg.active_tids.insert(op->tid);
if (new_linger) {
pg.linger_tids.insert(op->tid);
dout(0) << "inserting tid=" << op->tid << " to linger_tids pg=" << (void *)&pg << dendl;
}
pg.last = g_clock.now(); pg.last = g_clock.now();
// send? // send?
@ -645,7 +660,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
num_unacked--; num_unacked--;
if (op->oncommit) if (op->oncommit)
num_uncommitted--; num_uncommitted--;
op_submit(op, false); op_submit(op);
m->put(); m->put();
return; return;
} }
@ -683,15 +698,13 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
pg.active_tids.erase(tid); pg.active_tids.erase(tid);
dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg() dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg()
<< " still has " << pg.active_tids << dendl; << " still has " << pg.active_tids << dendl;
if (pg.active_tids.empty() && pg.linger_tids.empty()) if (pg.active_tids.empty() && pg.linger_ops.empty())
close_pg( m->get_pg() ); close_pg( m->get_pg() );
put_op_budget(op); put_op_budget(op);
op_osd.erase( tid ); op_osd.erase( tid );
if (!op->linger) { if (op->con)
if (op->con) op->con->put();
op->con->put(); delete op;
delete op;
}
} }
dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl; dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
@ -1251,8 +1264,5 @@ void Objecter::dump_active()
dout(0) << "dump_active" << dendl; dout(0) << "dump_active" << dendl;
for (hash_map<tid_t,Op*>::iterator p = op_osd.begin(); p != op_osd.end(); p++) for (hash_map<tid_t,Op*>::iterator p = op_osd.begin(); p != op_osd.end(); p++)
dout(0) << " " << p->first << "\t" << p->second->oid << "\t" << p->second->ops << dendl; dout(0) << " " << p->first << "\t" << p->second->oid << "\t" << p->second->ops << dendl;
dout(0) << "lingering" << dendl;
for (hash_map<tid_t,Op*>::iterator p = op_osd_linger.begin(); p != op_osd_linger.end(); p++)
dout(0) << " " << p->first << "\t" << p->second->oid << "\t" << p->second->ops << dendl;
} }

View File

@ -48,9 +48,8 @@ struct ObjectOperation {
vector<OSDOp> ops; vector<OSDOp> ops;
int flags; int flags;
int priority; int priority;
bool linger;
ObjectOperation() : flags(0), priority(0), linger(false) {} ObjectOperation() : flags(0), priority(0) {}
void add_op(int op) { void add_op(int op) {
int s = ops.size(); int s = ops.size();
@ -101,9 +100,6 @@ struct ObjectOperation {
ops[s].op.pgls.count = count; ops[s].op.pgls.count = count;
ops[s].op.pgls.cookie = cookie; ops[s].op.pgls.cookie = cookie;
} }
void set_linger(bool l) {
linger = l;
}
// ------ // ------
@ -221,6 +217,7 @@ class Objecter {
private: private:
tid_t last_tid; tid_t last_tid;
int client_inc; int client_inc;
uint64_t max_linger_id;
int num_unacked; int num_unacked;
int num_uncommitted; int num_uncommitted;
bool keep_balanced_budget; bool keep_balanced_budget;
@ -278,7 +275,7 @@ public:
bool linger; bool linger;
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op, Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, eversion_t *ov, bool ln) : int f, Context *ac, Context *co, eversion_t *ov, bool ln = false) :
session_item(this), session_item(this),
oid(o), oloc(ol), oid(o), oloc(ol),
con(NULL), con(NULL),
@ -397,8 +394,6 @@ public:
auid(0), crush_rule(0), snapid(0), blp(NULL) {} auid(0), crush_rule(0), snapid(0), blp(NULL) {}
}; };
// -- osd sessions -- // -- osd sessions --
struct Session { struct Session {
xlist<Op*> ops; xlist<Op*> ops;
@ -411,7 +406,6 @@ public:
private: private:
// pending ops // pending ops
hash_map<tid_t,Op*> op_osd; hash_map<tid_t,Op*> op_osd;
hash_map<tid_t,Op*> op_osd_linger;
map<tid_t,PoolStatOp*> op_poolstat; map<tid_t,PoolStatOp*> op_poolstat;
map<tid_t,StatfsOp*> op_statfs; map<tid_t,StatfsOp*> op_statfs;
map<tid_t,PoolOp*> op_pool; map<tid_t,PoolOp*> op_pool;
@ -428,7 +422,7 @@ public:
set<tid_t> active_tids; // active ops set<tid_t> active_tids; // active ops
set<tid_t> linger_tids; // active ops set<tid_t> linger_tids; // active ops
utime_t last; utime_t last;
epoch_t epoch; // generation epoch map<uint64_t, bool> linger_ops;
PG() {} PG() {}
@ -445,7 +439,22 @@ public:
}; };
hash_map<pg_t,PG> pg_map; hash_map<pg_t,PG> pg_map;
struct LingerOpInfo {
uint64_t linger_id;
object_t oid;
object_locator_t oloc;
uint64_t off;
uint64_t len;
snapid_t snap;
int flags;
vector<OSDOp> ops;
PG *pg;
LingerOpInfo() : linger_id(0), off(0), len(0), flags(0), pg(NULL) {}
};
map<uint64_t, LingerOpInfo> op_linger_info;
PG &get_pg(pg_t pgid); PG &get_pg(pg_t pgid);
void close_pg(pg_t pgid) { void close_pg(pg_t pgid) {
@ -485,7 +494,7 @@ public:
public: public:
Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l, SafeTimer& t) : Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l, SafeTimer& t) :
messenger(m), monc(mc), osdmap(om), messenger(m), monc(mc), osdmap(om),
last_tid(0), client_inc(-1), last_tid(0), client_inc(-1), max_linger_id(0),
num_unacked(0), num_uncommitted(0), num_unacked(0), num_uncommitted(0),
keep_balanced_budget(false), honor_osdmap_full(true), keep_balanced_budget(false), honor_osdmap_full(true),
last_seen_osdmap_version(0), last_seen_osdmap_version(0),
@ -520,7 +529,7 @@ public:
private: private:
// low-level // low-level
tid_t op_submit(Op *op, bool register_linger = true); tid_t op_submit(Op *op, uint64_t linger_id = 0);
// public interface // public interface
public: public:
@ -542,7 +551,7 @@ private:
ObjectOperation& op, ObjectOperation& op,
const SnapContext& snapc, utime_t mtime, int flags, const SnapContext& snapc, utime_t mtime, int flags,
Context *onack, Context *oncommit, eversion_t *objver = NULL) { Context *onack, Context *oncommit, eversion_t *objver = NULL) {
Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, op.linger); Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->priority = op.priority; o->priority = op.priority;
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
@ -552,16 +561,26 @@ private:
ObjectOperation& op, ObjectOperation& op,
snapid_t snapid, bufferlist *pbl, int flags, snapid_t snapid, bufferlist *pbl, int flags,
Context *onack, eversion_t *objver = NULL) { Context *onack, eversion_t *objver = NULL) {
Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL, objver, op.linger); Op *o = new Op(oid, oloc, op.ops, flags | CEPH_OSD_FLAG_READ, onack, NULL, objver);
o->priority = op.priority; o->priority = op.priority;
o->snapid = snapid; o->snapid = snapid;
o->outbl = pbl; o->outbl = pbl;
return op_submit(o); return op_submit(o);
} }
int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops, bool *plinger) { tid_t resend_linger(LingerOpInfo& info, Context *onack, Context *onfinish, eversion_t *objver);
tid_t resend_linger(uint64_t linger_id, Context *onack, Context *onfinish, eversion_t *objver);
uint64_t register_linger(LingerOpInfo& info, uint64_t linger_id = 0);
tid_t linger(const object_t& oid, const object_locator_t& oloc,
ObjectOperation& op,
snapid_t snap, bufferlist *pbl, int flags,
Context *onack, Context *onfinish,
eversion_t *objver, uint64_t *linger_id);
int init_ops(vector<OSDOp>& ops, int ops_count, ObjectOperation *extra_ops) {
int i; int i;
bool linger = false;
if (extra_ops) if (extra_ops)
ops_count += extra_ops->ops.size(); ops_count += extra_ops->ops.size();
@ -570,12 +589,8 @@ private:
for (i=0; i<ops_count - 1; i++) { for (i=0; i<ops_count - 1; i++) {
ops[i] = extra_ops->ops[i]; ops[i] = extra_ops->ops[i];
linger |= extra_ops->linger;
} }
if (plinger)
*plinger = linger;
return i; return i;
} }
@ -586,11 +601,10 @@ private:
Context *onfinish, Context *onfinish,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_STAT; ops[i].op.op = CEPH_OSD_OP_STAT;
C_Stat *fin = new C_Stat(psize, pmtime, onfinish); C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver);
o->snapid = snap; o->snapid = snap;
o->outbl = &fin->bl; o->outbl = &fin->bl;
return op_submit(o); return op_submit(o);
@ -601,32 +615,31 @@ private:
Context *onfinish, Context *onfinish,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_READ; ops[i].op.op = CEPH_OSD_OP_READ;
ops[i].op.extent.offset = off; ops[i].op.extent.offset = off;
ops[i].op.extent.length = len; ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0; ops[i].op.extent.truncate_seq = 0;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap; o->snapid = snap;
o->outbl = pbl; o->outbl = pbl;
return op_submit(o); return op_submit(o);
} }
tid_t read_trunc(const object_t& oid, const object_locator_t& oloc, tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags, uint64_t off, uint64_t len, snapid_t snap, bufferlist *pbl, int flags,
uint64_t trunc_size, __u32 trunc_seq, uint64_t trunc_size, __u32 trunc_seq,
Context *onfinish, Context *onfinish,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_READ; ops[i].op.op = CEPH_OSD_OP_READ;
ops[i].op.extent.offset = off; ops[i].op.extent.offset = off;
ops[i].op.extent.length = len; ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq; ops[i].op.extent.truncate_seq = trunc_seq;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap; o->snapid = snap;
o->outbl = pbl; o->outbl = pbl;
return op_submit(o); return op_submit(o);
@ -636,14 +649,13 @@ private:
Context *onfinish, Context *onfinish,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_MAPEXT; ops[i].op.op = CEPH_OSD_OP_MAPEXT;
ops[i].op.extent.offset = off; ops[i].op.extent.offset = off;
ops[i].op.extent.length = len; ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0; ops[i].op.extent.truncate_seq = 0;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap; o->snapid = snap;
o->outbl = pbl; o->outbl = pbl;
return op_submit(o); return op_submit(o);
@ -653,14 +665,13 @@ private:
Context *onfinish, Context *onfinish,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_SPARSE_READ; ops[i].op.op = CEPH_OSD_OP_SPARSE_READ;
ops[i].op.extent.offset = off; ops[i].op.extent.offset = off;
ops[i].op.extent.length = len; ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0; ops[i].op.extent.truncate_seq = 0;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap; o->snapid = snap;
o->outbl = pbl; o->outbl = pbl;
return op_submit(o); return op_submit(o);
@ -671,14 +682,13 @@ private:
Context *onfinish, Context *onfinish,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_GETXATTR; ops[i].op.op = CEPH_OSD_OP_GETXATTR;
ops[i].op.xattr.name_len = (name ? strlen(name) : 0); ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
ops[i].op.xattr.value_len = 0; ops[i].op.xattr.value_len = 0;
if (name) if (name)
ops[i].data.append(name); ops[i].data.append(name);
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, onfinish, 0, objver);
o->snapid = snap; o->snapid = snap;
o->outbl = pbl; o->outbl = pbl;
return op_submit(o); return op_submit(o);
@ -689,11 +699,10 @@ private:
int flags, Context *onfinish, int flags, Context *onfinish,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_GETXATTRS; ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish); C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_READ, fin, 0, objver);
o->snapid = snap; o->snapid = snap;
o->outbl = &fin->bl; o->outbl = &fin->bl;
return op_submit(o); return op_submit(o);
@ -723,15 +732,14 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_WRITE; ops[i].op.op = CEPH_OSD_OP_WRITE;
ops[i].op.extent.offset = off; ops[i].op.extent.offset = off;
ops[i].op.extent.length = len; ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0; ops[i].op.extent.truncate_seq = 0;
ops[i].data = bl; ops[i].data = bl;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -743,15 +751,14 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_WRITE; ops[i].op.op = CEPH_OSD_OP_WRITE;
ops[i].op.extent.offset = off; ops[i].op.extent.offset = off;
ops[i].op.extent.length = len; ops[i].op.extent.length = len;
ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq; ops[i].op.extent.truncate_seq = trunc_seq;
ops[i].data = bl; ops[i].data = bl;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -761,13 +768,12 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_WRITEFULL; ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
ops[i].op.extent.offset = 0; ops[i].op.extent.offset = 0;
ops[i].op.extent.length = bl.length(); ops[i].op.extent.length = bl.length();
ops[i].data = bl; ops[i].data = bl;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -779,13 +785,12 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_TRUNCATE; ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
ops[i].op.extent.offset = trunc_size; ops[i].op.extent.offset = trunc_size;
ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq; ops[i].op.extent.truncate_seq = trunc_seq;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -795,12 +800,11 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_ZERO; ops[i].op.op = CEPH_OSD_OP_ZERO;
ops[i].op.extent.offset = off; ops[i].op.extent.offset = off;
ops[i].op.extent.length = len; ops[i].op.extent.length = len;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -810,11 +814,10 @@ private:
utime_t mtime, Context *onack, Context *oncommit, utime_t mtime, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_ROLLBACK; ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
ops[i].op.snap.snapid = snapid; ops[i].op.snap.snapid = snapid;
Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -825,11 +828,10 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_CREATE; ops[i].op.op = CEPH_OSD_OP_CREATE;
ops[i].op.flags = create_flags; ops[i].op.flags = create_flags;
Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, global_flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -839,10 +841,9 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_DELETE; ops[i].op.op = CEPH_OSD_OP_DELETE;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -852,10 +853,9 @@ private:
Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { Context *onack, Context *oncommit, eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
SnapContext snapc; // no snapc for lock ops SnapContext snapc; // no snapc for lock ops
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = op; ops[i].op.op = op;
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
} }
@ -865,15 +865,14 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_SETXATTR; ops[i].op.op = CEPH_OSD_OP_SETXATTR;
ops[i].op.xattr.name_len = (name ? strlen(name) : 0); ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
ops[i].op.xattr.value_len = bl.length(); ops[i].op.xattr.value_len = bl.length();
if (name) if (name)
ops[i].data.append(name); ops[i].data.append(name);
ops[i].data.append(bl); ops[i].data.append(bl);
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);
@ -884,14 +883,13 @@ private:
Context *onack, Context *oncommit, Context *onack, Context *oncommit,
eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) { eversion_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
vector<OSDOp> ops; vector<OSDOp> ops;
bool linger; int i = init_ops(ops, 1, extra_ops);
int i = init_ops(ops, 1, extra_ops, &linger);
ops[i].op.op = CEPH_OSD_OP_RMXATTR; ops[i].op.op = CEPH_OSD_OP_RMXATTR;
ops[i].op.xattr.name_len = (name ? strlen(name) : 0); ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
ops[i].op.xattr.value_len = 0; ops[i].op.xattr.value_len = 0;
if (name) if (name)
ops[i].data.append(name); ops[i].data.append(name);
Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, linger); Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime; o->mtime = mtime;
o->snapc = snapc; o->snapc = snapc;
return op_submit(o); return op_submit(o);