diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index b3f545f3627..cb3eb5abc3a 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -1224,6 +1224,35 @@ void ReplicatedPG::do_op(OpRequestRef op) return; } + // dup/replay? + if (op->may_write() || op->may_cache()) { + const pg_log_entry_t *entry = pg_log.get_log().get_request(m->get_reqid()); + if (entry) { + const eversion_t& oldv = entry->version; + dout(3) << __func__ << " dup " << m->get_reqid() + << " was " << oldv << dendl; + if (already_complete(oldv)) { + osd->reply_op_error(op, 0, oldv, entry->user_version); + } else { + if (m->wants_ack()) { + if (already_ack(oldv)) { + MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); + reply->add_flags(CEPH_OSD_FLAG_ACK); + reply->set_reply_versions(oldv, entry->user_version); + osd->send_message_osd_client(reply, m->get_connection()); + } else { + dout(10) << " waiting for " << oldv << " to ack" << dendl; + waiting_for_ack[oldv].push_back(op); + } + } + dout(10) << " waiting for " << oldv << " to commit" << dendl; + waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed + op->mark_delayed("waiting for ondisk"); + } + return; + } + } + ObjectContextRef obc; bool can_create = op->may_write() || op->may_cache(); hobject_t missing_oid; @@ -1622,34 +1651,6 @@ void ReplicatedPG::execute_ctx(OpContext *ctx) ctx->op_t = pgbackend->get_transaction(); if (op->may_write() || op->may_cache()) { - // dup/replay? - const pg_log_entry_t *entry = pg_log.get_log().get_request(ctx->reqid); - if (entry) { - const eversion_t& oldv = entry->version; - dout(3) << "do_op dup " << ctx->reqid << " was " << oldv << dendl; - if (already_complete(oldv)) { - reply_ctx(ctx, 0, oldv, entry->user_version); - } else { - close_op_ctx(ctx, -EBUSY); - - if (m->wants_ack()) { - if (already_ack(oldv)) { - MOSDOpReply *reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false); - reply->add_flags(CEPH_OSD_FLAG_ACK); - reply->set_reply_versions(oldv, entry->user_version); - osd->send_message_osd_client(reply, m->get_connection()); - } else { - dout(10) << " waiting for " << oldv << " to ack" << dendl; - waiting_for_ack[oldv].push_back(op); - } - } - dout(10) << " waiting for " << oldv << " to commit" << dendl; - waiting_for_ondisk[oldv].push_back(op); // always queue ondisk waiters, so that we can requeue if needed - op->mark_delayed("waiting for ondisk"); - } - return; - } - op->mark_started(); // snap