Merge pull request #1667 from ceph/wip-8089

osd: fix dup request ahndling for ENOENT and cache ops

Reviewed-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2014-04-14 16:11:47 -07:00
commit 895b6d4d58

View File

@ -1224,6 +1224,35 @@ void ReplicatedPG::do_op(OpRequestRef op)
return;
}
// dup/replay?
if (op->may_write() || op->may_cache()) {
const pg_log_entry_t *entry = pg_log.get_log().get_request(m->get_reqid());
if (entry) {
const eversion_t& oldv = entry->version;
dout(3) << __func__ << " dup " << m->get_reqid()
<< " was " << oldv << dendl;
if (already_complete(oldv)) {
osd->reply_op_error(op, 0, oldv, entry->user_version);
} else {
if (m->wants_ack()) {
if (already_ack(oldv)) {
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
reply->add_flags(CEPH_OSD_FLAG_ACK);
reply->set_reply_versions(oldv, entry->user_version);
osd->send_message_osd_client(reply, m->get_connection());
} else {
dout(10) << " waiting for " << oldv << " to ack" << dendl;
waiting_for_ack[oldv].push_back(op);
}
}
dout(10) << " waiting for " << oldv << " to commit" << dendl;
waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed
op->mark_delayed("waiting for ondisk");
}
return;
}
}
ObjectContextRef obc;
bool can_create = op->may_write() || op->may_cache();
hobject_t missing_oid;
@ -1622,34 +1651,6 @@ void ReplicatedPG::execute_ctx(OpContext *ctx)
ctx->op_t = pgbackend->get_transaction();
if (op->may_write() || op->may_cache()) {
// dup/replay?
const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid);
if (entry) {
const eversion_t& oldv = entry->version;
dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl;
if (already_complete(oldv)) {
reply_ctx(ctx, 0, oldv, entry->user_version);
} else {
close_op_ctx(ctx, -EBUSY);
if (m->wants_ack()) {
if (already_ack(oldv)) {
MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
reply->add_flags(CEPH_OSD_FLAG_ACK);
reply->set_reply_versions(oldv, entry->user_version);
osd->send_message_osd_client(reply, m->get_connection());
} else {
dout(10) << " waiting for " << oldv << " to ack" << dendl;
waiting_for_ack[oldv].push_back(op);
}
}
dout(10) << " waiting for " << oldv << " to commit" << dendl;
waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed
op->mark_delayed("waiting for ondisk");
}
return;
}
op->mark_started();
// snap