ReplicatedPG: add OpContext::user_at_version

Set this up alongside the existing at_version member, but only increase
it for user_modify ops. Use it when logging the PG's user_version. In
order to maintain compatibility with old clients on classic pools, we
force user_version to be at least at_version whenever it's updated.

Now that we maintain this PG user version, use it as the user version
in replies to ops that get ENOENT back, when short-circuiting replies
as part of reply_op_error()[1], and when replying to repops in
eval_repop. Also use it for the cls_current_version() function. This is
a small semantic change for that function: previously it would generally
return the same value the user would get sent back via MOSDOpReply --
but I don't think that was something you could count on. We now define
it as the user version of the PG at the start of the op, and as a bonus
it is defined even for read ops (at_version is only filled in on write
operations).

[1]: We tweak PGLog to make it easier to retrieve both user and PG versions.

Signed-off-by: Greg Farnum <greg@inktank.com>
This commit is contained in:
Greg Farnum 2013-08-27 17:24:24 -07:00
parent 7db71fc270
commit c119afa075
6 changed files with 39 additions and 19 deletions

View File

@ -582,7 +582,7 @@ uint64_t cls_current_version(cls_method_context_t hctx)
{
ReplicatedPG::OpContext *ctx = *(ReplicatedPG::OpContext **)hctx;
return ctx->at_version.version;
return ctx->user_at_version;
}

View File

@ -6798,10 +6798,11 @@ void OSD::finish_recovery_op(PG *pg, const hobject_t& soid, bool dequeue)
void OSDService::reply_op_error(OpRequestRef op, int err)
{
reply_op_error(op, err, eversion_t());
reply_op_error(op, err, eversion_t(), 0);
}
void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
version_t uv)
{
MOSDOp *m = static_cast<MOSDOp*>(op->request);
assert(m->get_header().type == CEPH_MSG_OSD_OP);
@ -6810,7 +6811,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v)
MOSDOpReply *reply = new MOSDOpReply(m, err, osdmap->get_epoch(), flags);
Messenger *msgr = client_messenger;
reply->set_reply_versions(v, v.version);
reply->set_reply_versions(v, uv);
if (m->get_source().is_osd())
msgr = cluster_messenger;
msgr->send_message(reply, m->get_connection());

View File

@ -414,7 +414,7 @@ public:
void dec_scrubs_active();
void reply_op_error(OpRequestRef op, int err);
void reply_op_error(OpRequestRef op, int err, eversion_t v);
void reply_op_error(OpRequestRef op, int err, eversion_t v, version_t uv);
void handle_misdirected_op(PG *pg, OpRequestRef op);
// -- Watch --

View File

@ -84,11 +84,11 @@ struct PGLog {
bool logged_req(const osd_reqid_t &r) const {
return caller_ops.count(r);
}
eversion_t get_request_version(const osd_reqid_t &r) const {
const pg_log_entry_t *get_request(const osd_reqid_t &r) const {
hash_map<osd_reqid_t,pg_log_entry_t*>::const_iterator p = caller_ops.find(r);
if (p == caller_ops.end())
return eversion_t();
return p->second->version;
return NULL;
return p->second;
}
void index() {

View File

@ -908,14 +908,15 @@ void ReplicatedPG::do_op(OpRequestRef op)
return;
}
eversion_t oldv = pg_log.get_log().get_request_version(ctx->reqid);
if (oldv != eversion_t()) {
const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid);
if (entry) {
const eversion_t& oldv = entry->version;
dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
delete ctx;
put_object_context(obc);
put_object_contexts(src_obc);
if (already_complete(oldv)) {
osd->reply_op_error(op, 0, oldv);
osd->reply_op_error(op, 0, oldv, entry->user_version);
} else {
if (m->wants_ack()) {
if (already_ack(oldv)) {
@ -957,6 +958,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
<< dendl;
}
ctx->user_at_version = info.last_user_version;
// note my stats
utime_t now = ceph_clock_now(g_ceph_context);
@ -1028,9 +1031,9 @@ void ReplicatedPG::do_op(OpRequestRef op)
ctx->reply->set_result(result);
if (result >= 0) {
ctx->reply->set_reply_versions(ctx->at_version, ctx->at_version.version);
ctx->reply->set_reply_versions(ctx->at_version, ctx->user_at_version);
} else if (result == -ENOENT) {
ctx->reply->set_enoent_reply_versions(info.last_update, info.last_update.version);
ctx->reply->set_enoent_reply_versions(info.last_update, ctx->user_at_version);
}
// read or error?
@ -3854,7 +3857,14 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
// finish and log the op.
if (ctx->user_modify) {
/* update the user_version for any modify ops, except for the watch op */
ctx->new_obs.oi.user_version = ctx->at_version.version;
++ctx->user_at_version;
assert(ctx->user_at_version > ctx->new_obs.oi.user_version);
/* In order for new clients and old clients to interoperate properly
* when exchanging versions, we need to lower bound the user_version
* (which our new clients pay proper attention to)
* by the at_version (which is all the old clients can ever see). */
ctx->user_at_version = MAX(ctx->at_version.version, ctx->user_at_version);
ctx->new_obs.oi.user_version = ctx->user_at_version;
}
ctx->bytes_written = ctx->op_t.get_encoded_bytes();
@ -3884,7 +3894,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
if (!ctx->new_obs.exists)
logopcode = pg_log_entry_t::DELETE;
ctx->log.push_back(pg_log_entry_t(logopcode, soid, ctx->at_version, old_version,
ctx->at_version.version, ctx->reqid, ctx->mtime));
ctx->user_at_version, ctx->reqid, ctx->mtime));
// apply new object state.
ctx->obc->obs = ctx->new_obs;
@ -4099,7 +4109,7 @@ void ReplicatedPG::eval_repop(RepGather *repop)
for (list<OpRequestRef>::iterator i = waiting_for_ondisk[repop->v].begin();
i != waiting_for_ondisk[repop->v].end();
++i) {
osd->reply_op_error(*i, 0, repop->v);
osd->reply_op_error(*i, 0, repop->v, 0);
}
waiting_for_ondisk.erase(repop->v);
}
@ -4115,8 +4125,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
else
else {
reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->set_reply_versions(repop->ctx->at_version,
repop->ctx->user_at_version);
}
reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
dout(10) << " sending commit on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);
@ -4137,6 +4150,8 @@ void ReplicatedPG::eval_repop(RepGather *repop)
++i) {
MOSDOp *m = (MOSDOp*)(*i)->request;
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->set_reply_versions(repop->ctx->at_version,
repop->ctx->user_at_version);
reply->add_flags(CEPH_OSD_FLAG_ACK);
osd->send_message_osd_client(reply, m->get_connection());
}
@ -4148,8 +4163,11 @@ void ReplicatedPG::eval_repop(RepGather *repop)
MOSDOpReply *reply = repop->ctx->reply;
if (reply)
repop->ctx->reply = NULL;
else
else {
reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0);
reply->set_reply_versions(repop->ctx->at_version,
repop->ctx->user_at_version);
}
reply->add_flags(CEPH_OSD_FLAG_ACK);
dout(10) << " sending ack on " << *repop << " " << reply << dendl;
assert(entity_name_t::TYPE_OSD != m->get_connection()->peer_type);

View File

@ -116,6 +116,7 @@ public:
utime_t mtime;
SnapContext snapc; // writer snap context
eversion_t at_version; // pg's current version pointer
version_t user_at_version; // pg's current user version pointer
int current_osd_subop_num;
@ -147,7 +148,7 @@ public:
op(_op), reqid(_reqid), ops(_ops), obs(_obs), snapset(0),
new_obs(_obs->oi, _obs->exists),
modify(false), user_modify(false),
bytes_written(0), bytes_read(0),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
obc(0), clone_obc(0), snapset_obc(0), data_off(0), reply(NULL), pg(_pg),
num_read(0),