Merge pull request #3738 from athanatos/wip-10830

osd/: include version_t in extra_reqids with promote

Reviewed-by: Sage Weil <sage@redhat.com>
This commit is contained in:
Sage Weil 2015-02-16 09:00:20 -08:00
commit 6a1bd776f0
7 changed files with 103 additions and 55 deletions

View File

@ -780,7 +780,10 @@ protected:
waiting_for_blocked_object;
// Callbacks should assume pg (and nothing else) is locked
map<hobject_t, list<Context*> > callbacks_for_degraded_object;
map<eversion_t,list<OpRequestRef> > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,
list<pair<OpRequestRef, version_t> > > waiting_for_ack, waiting_for_ondisk;
map<eversion_t,OpRequestRef> replay_queue;
void split_ops(PG *child, unsigned split_bits);

View File

@ -126,22 +126,42 @@ struct PGLog {
bool logged_req(const osd_reqid_t &r) const {
return caller_ops.count(r) || extra_caller_ops.count(r);
}
const pg_log_entry_t *get_request(const osd_reqid_t &r) const {
bool get_request(
const osd_reqid_t &r,
eversion_t *replay_version,
version_t *user_version) const {
assert(replay_version);
assert(user_version);
ceph::unordered_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p;
p = caller_ops.find(r);
if (p != caller_ops.end())
return p->second;
if (p != caller_ops.end()) {
*replay_version = p->second->version;
*user_version = p->second->user_version;
return true;
}
// warning: we will return *a* request for this reqid, but not
// necessarily the most recent.
p = extra_caller_ops.find(r);
if (p != extra_caller_ops.end())
return p->second;
return NULL;
if (p != extra_caller_ops.end()) {
for (vector<pair<osd_reqid_t, version_t> >::const_iterator i =
p->second->extra_reqids.begin();
i != p->second->extra_reqids.end();
++i) {
if (i->first == r) {
*replay_version = p->second->version;
*user_version = i->second;
return true;
}
}
assert(0 == "in extra_caller_ops but not extra_reqids");
}
return false;
}
/// get a (bounded) list of recent reqids for the given object
void get_object_reqids(const hobject_t& oid, unsigned max,
vector<osd_reqid_t> *pls) const {
vector<pair<osd_reqid_t, version_t> > *pls) const {
// make sure object is present at least once before we do an
// O(n) search.
if (objects.count(oid) == 0)
@ -151,7 +171,7 @@ struct PGLog {
++i) {
if (i->soid == oid) {
if (i->reqid_is_indexed())
pls->push_back(i->reqid);
pls->push_back(make_pair(i->reqid, i->user_version));
pls->insert(pls->end(), i->extra_reqids.begin(), i->extra_reqids.end());
if (pls->size() >= max) {
if (pls->size() > max) {
@ -175,10 +195,11 @@ struct PGLog {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[i->reqid] = &(*i);
}
for (vector<osd_reqid_t>::const_iterator j = i->extra_reqids.begin();
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
i->extra_reqids.begin();
j != i->extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(*j, &(*i)));
extra_caller_ops.insert(make_pair(j->first, &(*i)));
}
}
@ -196,10 +217,11 @@ struct PGLog {
//assert(caller_ops.count(i->reqid) == 0); // divergent merge_log indexes new before unindexing old
caller_ops[e.reqid] = &e;
}
for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(*j, &e));
extra_caller_ops.insert(make_pair(j->first, &e));
}
}
void unindex() {
@ -216,12 +238,13 @@ struct PGLog {
caller_ops[e.reqid] == &e)
caller_ops.erase(e.reqid);
}
for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
for (ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*>::iterator k =
extra_caller_ops.find(*j);
k != extra_caller_ops.end() && k->first == *j;
extra_caller_ops.find(j->first);
k != extra_caller_ops.end() && k->first == j->first;
++k) {
if (k->second == &e) {
extra_caller_ops.erase(k);
@ -255,10 +278,11 @@ struct PGLog {
if (e.reqid_is_indexed()) {
caller_ops[e.reqid] = &(log.back());
}
for (vector<osd_reqid_t>::const_iterator j = e.extra_reqids.begin();
for (vector<pair<osd_reqid_t, version_t> >::const_iterator j =
e.extra_reqids.begin();
j != e.extra_reqids.end();
++j) {
extra_caller_ops.insert(make_pair(*j, &(log.back())));
extra_caller_ops.insert(make_pair(j->first, &(log.back())));
}
}

View File

@ -1489,27 +1489,30 @@ void ReplicatedPG::do_op(OpRequestRef& op)
// promote ops, but we can't possible have both in our log where
// the original request is still not stable on disk, so for our
// purposes here it doesn't matter which one we get.
const pg_log_entry_t *entry = pg_log.get_log().get_request(m->get_reqid());
if (entry) {
const eversion_t& oldv = entry->version;
eversion_t replay_version;
version_t user_version;
bool got = pg_log.get_log().get_request(
m->get_reqid(), &replay_version, &user_version);
if (got) {
dout(3) << __func__ << " dup " << m->get_reqid()
<< " was " << oldv << dendl;
if (already_complete(oldv)) {
osd->reply_op_error(op, 0, oldv, entry->user_version);
<< " was " << replay_version << dendl;
if (already_complete(replay_version)) {
osd->reply_op_error(op, 0, replay_version, user_version);
} else {
if (m->wants_ack()) {
if (already_ack(oldv)) {
if (already_ack(replay_version)) {
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
reply->add_flags(CEPH_OSD_FLAG_ACK);
reply->set_reply_versions(oldv, entry->user_version);
reply->set_reply_versions(replay_version, user_version);
osd->send_message_osd_client(reply, m->get_connection());
} else {
dout(10) << " waiting for " << oldv << " to ack" << dendl;
waiting_for_ack[oldv].push_back(op);
dout(10) << " waiting for " << replay_version << " to ack" << dendl;
waiting_for_ack[replay_version].push_back(make_pair(op, user_version));
}
}
dout(10) << " waiting for " << oldv << " to commit" << dendl;
waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed
dout(10) << " waiting for " << replay_version << " to commit" << dendl;
// always queue ondisk waiters, so that we can requeue if needed
waiting_for_ondisk[replay_version].push_back(make_pair(op, user_version));
op->mark_delayed("waiting for ondisk");
}
return;
@ -7425,11 +7428,12 @@ void ReplicatedPG::eval_repop(RepGather *repop)
// send dup commits, in order
if (waiting_for_ondisk.count(repop->v)) {
assert(waiting_for_ondisk.begin()->first == repop->v);
for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
for (list<pair<OpRequestRef, version_t> >::iterator i =
waiting_for_ondisk[repop->v].begin();
i != waiting_for_ondisk[repop->v].end();
++i) {
osd->reply_op_error(*i, 0, repop->ctx->at_version,
repop->ctx->user_at_version);
osd->reply_op_error(i->first, 0, repop->ctx->at_version,
i->second);
}
waiting_for_ondisk.erase(repop->v);
}
@ -7464,13 +7468,14 @@ void ReplicatedPG::eval_repop(RepGather *repop)
// send dup acks, in order
if (waiting_for_ack.count(repop->v)) {
assert(waiting_for_ack.begin()->first == repop->v);
for (list<OpRequestRef>::iterator i = waiting_for_ack[repop->v].begin();
for (list<pair<OpRequestRef, version_t> >::iterator i =
waiting_for_ack[repop->v].begin();
i != waiting_for_ack[repop->v].end();
++i) {
MOSDOp *m = (MOSDOp*)(*i)->get_req();
MOSDOp *m = (MOSDOp*)i->first->get_req();
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, true);
reply->set_reply_versions(repop->ctx->at_version,
repop->ctx->user_at_version);
i->second);
reply->add_flags(CEPH_OSD_FLAG_ACK);
osd->send_message_osd_client(reply, m->get_connection());
}
@ -10233,10 +10238,16 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
}
// also requeue any dups, interleaved into position
map<eversion_t, list<OpRequestRef> >::iterator p = waiting_for_ondisk.find(repop->v);
map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator p =
waiting_for_ondisk.find(repop->v);
if (p != waiting_for_ondisk.end()) {
dout(10) << " also requeuing ondisk waiters " << p->second << dendl;
rq.splice(rq.end(), p->second);
for (list<pair<OpRequestRef, version_t> >::iterator i =
p->second.begin();
i != p->second.end();
++i) {
rq.push_back(i->first);
}
waiting_for_ondisk.erase(p);
}
}
@ -10247,14 +10258,15 @@ void ReplicatedPG::apply_and_flush_repops(bool requeue)
if (requeue) {
requeue_ops(rq);
if (!waiting_for_ondisk.empty()) {
for (map<eversion_t, list<OpRequestRef> >::iterator i =
for (map<eversion_t, list<pair<OpRequestRef, version_t> > >::iterator i =
waiting_for_ondisk.begin();
i != waiting_for_ondisk.end();
++i) {
for (list<OpRequestRef>::iterator j = i->second.begin();
for (list<pair<OpRequestRef, version_t> >::iterator j =
i->second.begin();
j != i->second.end();
++j) {
derr << __func__ << ": op " << *((*j)->get_req()) << " waiting on "
derr << __func__ << ": op " << *(j->first->get_req()) << " waiting on "
<< i->first << dendl;
}
}

View File

@ -126,7 +126,7 @@ public:
uint32_t flags; // object_copy_data_t::FLAG_*
uint32_t source_data_digest, source_omap_digest;
uint32_t data_digest, omap_digest;
vector<osd_reqid_t> reqids;
vector<pair<osd_reqid_t, version_t> > reqids; // [(reqid, user_version)]
bool is_data_digest() {
return flags & object_copy_data_t::FLAG_DATA_DIGEST;
}
@ -555,7 +555,7 @@ public:
int num_read; ///< count read ops
int num_write; ///< count update ops
vector<osd_reqid_t> extra_reqids;
vector<pair<osd_reqid_t, version_t> > extra_reqids;
CopyFromCallback *copy_cb;

View File

@ -3029,10 +3029,15 @@ void pg_log_entry_t::dump(Formatter *f) const
f->dump_stream("prior_version") << prior_version;
f->dump_stream("reqid") << reqid;
f->open_array_section("extra_reqids");
for (vector<osd_reqid_t>::const_iterator p = extra_reqids.begin();
for (vector<pair<osd_reqid_t, version_t> >::const_iterator p =
extra_reqids.begin();
p != extra_reqids.end();
++p)
f->dump_stream("reqid") << *p;
++p) {
f->open_object_section("extra_reqid");
f->dump_stream("reqid") << p->first;
f->dump_stream("user_version") << p->second;
f->close_section();
}
f->close_section();
f->dump_stream("mtime") << mtime;
if (snaps.length() > 0) {
@ -3651,7 +3656,7 @@ void object_copy_data_t::generate_test_instances(list<object_copy_data_t*>& o)
o.back()->data.push_back(databp);
o.back()->omap_header.append("this is an omap header");
o.back()->snaps.push_back(123);
o.back()->reqids.push_back(osd_reqid_t());
o.back()->reqids.push_back(make_pair(osd_reqid_t(), version_t()));
}
void object_copy_data_t::dump(Formatter *f) const
@ -3676,10 +3681,14 @@ void object_copy_data_t::dump(Formatter *f) const
f->dump_unsigned("snap", *p);
f->close_section();
f->open_array_section("reqids");
for (vector<osd_reqid_t>::const_iterator p = reqids.begin();
for (vector<pair<osd_reqid_t, version_t> >::const_iterator p = reqids.begin();
p != reqids.end();
++p)
f->dump_stream("reqid") << *p;
++p) {
f->open_object_section("extra_reqid");
f->dump_stream("reqid") << p->first;
f->dump_stream("user_version") << p->second;
f->close_section();
}
f->close_section();
}

View File

@ -2155,7 +2155,7 @@ struct pg_log_entry_t {
/// describes state for a locally-rollbackable entry
ObjectModDesc mod_desc;
vector<osd_reqid_t> extra_reqids;
vector<pair<osd_reqid_t, version_t> > extra_reqids;
pg_log_entry_t()
: op(0), user_version(0),
@ -2576,7 +2576,7 @@ struct object_copy_data_t {
snapid_t snap_seq;
///< recent reqids on this object
vector<osd_reqid_t> reqids;
vector<pair<osd_reqid_t, version_t> > reqids;
public:
object_copy_data_t() : size((uint64_t)-1), data_digest(-1),

View File

@ -625,7 +625,7 @@ struct ObjectOperation {
uint32_t *out_flags;
uint32_t *out_data_digest;
uint32_t *out_omap_digest;
vector<osd_reqid_t> *out_reqids;
vector<pair<osd_reqid_t, version_t> > *out_reqids;
int *prval;
C_ObjectOperation_copyget(object_copy_cursor_t *c,
uint64_t *s,
@ -638,7 +638,7 @@ struct ObjectOperation {
uint32_t *flags,
uint32_t *dd,
uint32_t *od,
vector<osd_reqid_t> *oreqids,
vector<pair<osd_reqid_t, version_t> > *oreqids,
int *r)
: cursor(c),
out_size(s), out_mtime(m),
@ -698,7 +698,7 @@ struct ObjectOperation {
uint32_t *out_flags,
uint32_t *out_data_digest,
uint32_t *out_omap_digest,
vector<osd_reqid_t> *out_reqids,
vector<pair<osd_reqid_t, version_t> > *out_reqids,
int *prval) {
OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
osd_op.op.copy_get.max = max;