osd/: move client op handling into ReplicatedBackend

Signed-off-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2013-11-20 16:17:36 -08:00
parent c58c8990d9
commit 53c26bc335
5 changed files with 683 additions and 551 deletions

View File

@ -114,13 +114,29 @@
const hobject_t &hoid,
map<string, bufferptr> &attrs) = 0;
virtual void op_applied_replica(
virtual void op_applied(
const eversion_t &applied_version) = 0;
virtual bool should_send_op(
int peer,
const hobject_t &hoid) = 0;
virtual void log_operation(
vector<pg_log_entry_t> &logv,
const eversion_t &trim_to,
bool update_snaps,
ObjectStore::Transaction *t) = 0;
virtual void update_peer_last_complete_ondisk(
int fromosd,
eversion_t lcod) = 0;
virtual void update_last_complete_ondisk(
eversion_t lcod) = 0;
virtual void update_stats(
const pg_stat_t &stat) = 0;
virtual ~Listener() {}
};
Listener *parent;

View File

@ -12,6 +12,7 @@
*
*/
#include "ReplicatedBackend.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDPGPush.h"
@ -148,6 +149,8 @@ bool ReplicatedBackend::handle_message(
default:
break;
}
} else {
sub_op_modify(op);
}
break;
}
@ -162,6 +165,8 @@ bool ReplicatedBackend::handle_message(
sub_op_push_reply(op);
return true;
}
} else {
sub_op_modify_reply(op);
}
break;
}
@ -192,6 +197,14 @@ void ReplicatedBackend::on_change(ObjectStore::Transaction *t)
t->remove(get_temp_coll(t), *i);
}
temp_contents.clear();
for (map<tid_t, InProgressOp>::iterator i = in_progress_ops.begin();
i != in_progress_ops.end();
in_progress_ops.erase(i++)) {
if (i->second.on_commit)
delete i->second.on_commit;
if (i->second.on_applied)
delete i->second.on_applied;
}
clear_state();
}
@ -480,6 +493,9 @@ public:
bool empty() const {
return t->empty();
}
// Report the size of this transaction as given by the wrapped
// transaction's get_encoded_bytes().
uint64_t get_bytes_written() const {
return t->get_encoded_bytes();
}
~RPGTransaction() { delete t; }
};
@ -488,13 +504,202 @@ PGBackend::PGTransaction *ReplicatedBackend::get_transaction()
return new RPGTransaction(coll, get_temp_coll());
}
class C_OSD_OnOpCommit : public Context {
ReplicatedBackend *pg;
ReplicatedBackend::InProgressOp *op;
public:
C_OSD_OnOpCommit(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
: pg(pg), op(op) {}
void finish(int) {
pg->op_commit(op);
}
};
class C_OSD_OnOpApplied : public Context {
ReplicatedBackend *pg;
ReplicatedBackend::InProgressOp *op;
public:
C_OSD_OnOpApplied(ReplicatedBackend *pg, ReplicatedBackend::InProgressOp *op)
: pg(pg), op(op) {}
void finish(int) {
pg->op_applied(op);
}
};
/**
 * Submit a client write.
 *
 * Registers an InProgressOp under tid, hands the op to issue_op(), then
 * builds and queues the local transaction with applied/commit callbacks
 * that feed back into op_applied()/op_commit().
 *
 * @param soid                  object the write targets (passed to issue_op)
 * @param at_version            version of this write; stored in the InProgressOp
 * @param _t                    transaction; must actually be an RPGTransaction,
 *                              and is deleted before returning
 * @param trim_to               log trim bound passed to issue_op and log_operation
 * @param log_entries           log entries passed to issue_op and log_operation
 * @param on_local_applied_sync registered via register_on_applied_sync()
 * @param on_all_acked          stored as the InProgressOp's on_applied callback
 * @param on_all_commit         stored as the InProgressOp's on_commit callback
 * @param tid                   key for in_progress_ops; must not already be present
 * @param reqid                 passed through to issue_op
 * @param orig_op               originating request; stored in the InProgressOp
 */
void ReplicatedBackend::submit_transaction(
const hobject_t &soid,
const eversion_t &at_version,
PGTransaction *_t,
const eversion_t &trim_to,
vector<pg_log_entry_t> &log_entries,
Context *on_local_applied_sync,
Context *on_all_acked,
Context *on_all_commit,
// NOTE(review): the next line looks like the removed side of this diff hunk
// (old end of the parameter list); the lines after it are the new tail.
tid_t tid)
tid_t tid,
osd_reqid_t reqid,
OpRequestRef orig_op)
{
// NOTE(review): the next two lines look like the removed side of this diff
// hunk (the previous stub body) -- confirm against the real file.
//RPGTransaction *t = dynamic_cast<RPGTransaction*>(_t);
return;
// NOTE(review): the dynamic_cast result is dereferenced without a null check.
RPGTransaction *t = dynamic_cast<RPGTransaction*>(_t);
ObjectStore::Transaction *op_t = t->get_transaction();
// At most one temp object may be added and at most one cleared per op.
assert(t->get_temp_added().size() <= 1);
assert(t->get_temp_cleared().size() <= 1);
assert(!in_progress_ops.count(tid));
// Note the argument order: on_all_commit becomes on_commit and on_all_acked
// becomes on_applied in the InProgressOp.
InProgressOp &op = in_progress_ops.insert(
make_pair(
tid,
InProgressOp(
tid, on_all_commit, on_all_acked,
orig_op, at_version)
)
).first->second;
// An empty hobject_t() stands in when no temp object was added/cleared.
issue_op(
soid,
at_version,
tid,
reqid,
trim_to,
t->get_temp_added().size() ? *(t->get_temp_added().begin()) : hobject_t(),
t->get_temp_cleared().size() ?
*(t->get_temp_cleared().begin()) :hobject_t(),
log_entries,
&op,
op_t);
// add myself to gather set
op.waiting_for_applied.insert(osd->whoami);
op.waiting_for_commit.insert(osd->whoami);
ObjectStore::Transaction local_t;
if (t->get_temp_added().size()) {
// presumably makes sure the temp collection exists via local_t -- confirm
get_temp_coll(&local_t);
temp_contents.insert(t->get_temp_added().begin(), t->get_temp_added().end());
}
for (set<hobject_t>::const_iterator i = t->get_temp_cleared().begin();
i != t->get_temp_cleared().end();
++i) {
temp_contents.erase(*i);
}
parent->log_operation(log_entries, trim_to, true, &local_t);
// Append the op's contents after what local_t already holds, then swap so
// that op_t carries the combined transaction.
local_t.append(*op_t);
local_t.swap(*op_t);
op_t->register_on_applied_sync(on_local_applied_sync);
op_t->register_on_applied(
parent->bless_context(
new C_OSD_OnOpApplied(this, &op)));
// presumably frees op_t once it has been applied -- confirm
op_t->register_on_applied(
new ObjectStore::C_DeleteTransaction(op_t));
op_t->register_on_commit(
parent->bless_context(
new C_OSD_OnOpCommit(this, &op)));
parent->queue_transaction(op_t, op.op);
// NOTE(review): ~RPGTransaction deletes its inner transaction; this assumes
// get_transaction() released ownership of op_t -- confirm.
delete t;
}
/**
 * Local "applied" notification for an in-progress client write.
 *
 * Drops this OSD from the op's waiting_for_applied set, reports the
 * applied version to the parent, fires on_applied once nobody is left
 * to wait for, and removes the op from in_progress_ops when it is done.
 */
void ReplicatedBackend::op_applied(
  InProgressOp *op)
{
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_applied");
  }

  op->waiting_for_applied.erase(osd->whoami);
  parent->op_applied(op->v);

  const bool everyone_applied = op->waiting_for_applied.empty();
  if (everyone_applied) {
    op->on_applied->complete(0);
    op->on_applied = 0;
  }

  if (!op->done())
    return;

  // Both callbacks must have fired by the time the op is finished.
  assert(!op->on_commit && !op->on_applied);
  in_progress_ops.erase(op->tid);
}
/**
 * Local "commit" notification for an in-progress client write.
 *
 * Drops this OSD from the op's waiting_for_commit set, fires on_commit
 * once nobody is left to wait for, and removes the op from
 * in_progress_ops when it is done.
 */
void ReplicatedBackend::op_commit(
  InProgressOp *op)
{
  dout(10) << __func__ << ": " << op->tid << dendl;
  if (op->op) {
    op->op->mark_event("op_commit");
  }

  op->waiting_for_commit.erase(osd->whoami);

  const bool everyone_committed = op->waiting_for_commit.empty();
  if (everyone_committed) {
    op->on_commit->complete(0);
    op->on_commit = 0;
  }

  if (!op->done())
    return;

  // Both callbacks must have fired by the time the op is finished.
  assert(!op->on_commit && !op->on_applied);
  in_progress_ops.erase(op->tid);
}
/**
 * Handle a MOSDSubOpReply for a replicated write.
 *
 * Looks up the in-progress op by the reply's tid; replies for unknown
 * tids are ignored.  A reply carrying CEPH_OSD_FLAG_ONDISK removes the
 * sender from waiting_for_commit; any reply removes the sender from
 * waiting_for_applied.  on_applied / on_commit are completed (once) when
 * their respective sets drain, and the op is erased when both are empty.
 *
 * @param op  request wrapping the MOSDSubOpReply message
 */
void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
{
  MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
  assert(r->get_header().type == MSG_OSD_SUBOPREPLY);

  op->mark_started();

  // must be replication.
  tid_t rep_tid = r->get_tid();
  int fromosd = r->get_source().num();

  // Single lookup instead of count() followed by find().
  map<tid_t, InProgressOp>::iterator iter = in_progress_ops.find(rep_tid);
  if (iter == in_progress_ops.end())
    return;

  InProgressOp &ip_op = iter->second;

  // Must start out NULL: it is only assigned when the in-progress op has
  // an originating request, yet it is tested unconditionally below.
  MOSDOp *m = NULL;
  if (ip_op.op)
    m = static_cast<MOSDOp *>(ip_op.op->get_req());

  if (m) {
    dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
	    << " ack_type " << r->ack_type
	    << " from osd." << fromosd
	    << dendl;
  } else {
    dout(7) << __func__ << ": tid " << ip_op.tid << " (no op) "
	    << " ack_type " << r->ack_type
	    << " from osd." << fromosd
	    << dendl;
  }

  // oh, good.
  if (r->ack_type & CEPH_OSD_FLAG_ONDISK) {
    assert(ip_op.waiting_for_commit.count(fromosd));
    ip_op.waiting_for_commit.erase(fromosd);
    if (ip_op.op)
      ip_op.op->mark_event("sub_op_commit_rec");
  } else {
    assert(ip_op.waiting_for_applied.count(fromosd));
    if (ip_op.op)
      ip_op.op->mark_event("sub_op_applied_rec");
  }
  // Either kind of reply stops us waiting for this peer's apply.
  ip_op.waiting_for_applied.erase(fromosd);

  parent->update_peer_last_complete_ondisk(
    fromosd,
    r->get_last_complete_ondisk());

  // The pointer checks keep each callback from being completed twice when
  // a later reply arrives after its set has already drained.
  if (ip_op.waiting_for_applied.empty() &&
      ip_op.on_applied) {
    ip_op.on_applied->complete(0);
    ip_op.on_applied = 0;
  }
  if (ip_op.waiting_for_commit.empty() &&
      ip_op.on_commit) {
    ip_op.on_commit->complete(0);
    ip_op.on_commit = 0;
  }
  if (ip_op.done()) {
    assert(!ip_op.on_commit && !ip_op.on_applied);
    in_progress_ops.erase(iter);
  }
}

View File

@ -172,18 +172,6 @@ public:
const string &attr,
bufferlist *out);
/**
* Client IO
*/
PGTransaction *get_transaction();
void submit_transaction(
PGTransaction *t,
vector<pg_log_entry_t> &log_entries,
Context *on_all_acked,
Context *on_all_commit,
tid_t tid
);
private:
// push
struct PushInfo {
@ -340,6 +328,99 @@ private:
const ObjectRecoveryInfo& recovery_info,
SnapSetContext *ssc
);
/**
* Client IO
*/
struct InProgressOp {
tid_t tid;
set<int> waiting_for_commit;
set<int> waiting_for_applied;
Context *on_commit;
Context *on_applied;
OpRequestRef op;
eversion_t v;
InProgressOp(
tid_t tid, Context *on_commit, Context *on_applied,
OpRequestRef op, eversion_t v)
: tid(tid), on_commit(on_commit), on_applied(on_applied),
op(op), v(v) {}
bool done() const {
return waiting_for_commit.empty() &&
waiting_for_applied.empty();
}
};
map<tid_t, InProgressOp> in_progress_ops;
public:
PGTransaction *get_transaction();
friend class C_OSD_OnOpCommit;
friend class C_OSD_OnOpApplied;
void submit_transaction(
const hobject_t &hoid,
const eversion_t &at_version,
PGTransaction *t,
const eversion_t &trim_to,
vector<pg_log_entry_t> &log_entries,
Context *on_local_applied_sync,
Context *on_all_applied,
Context *on_all_commit,
tid_t tid,
osd_reqid_t reqid,
OpRequestRef op
);
private:
void issue_op(
const hobject_t &soid,
const eversion_t &at_version,
tid_t tid,
osd_reqid_t reqid,
eversion_t pg_trim_to,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
vector<pg_log_entry_t> &log_entries,
InProgressOp *op,
ObjectStore::Transaction *op_t);
void op_applied(InProgressOp *op);
void op_commit(InProgressOp *op);
void sub_op_modify_reply(OpRequestRef op);
void sub_op_modify(OpRequestRef op);
/**
 * State carried through sub_op_modify_applied() / sub_op_modify_commit()
 * for one sub-op modify.
 */
struct RepModify {
  OpRequestRef op;
  bool applied, committed;
  int ackerosd;
  eversion_t last_complete;
  epoch_t epoch_started;
  uint64_t bytes_written;

  ObjectStore::Transaction opt, localt;

  RepModify()
    : applied(false),
      committed(false),
      ackerosd(-1),
      epoch_started(0),
      bytes_written(0) {}
};
typedef std::tr1::shared_ptr<RepModify> RepModifyRef;
/**
 * Completion that forwards to ReplicatedBackend::sub_op_modify_applied(),
 * holding a shared reference to the RepModify state.
 */
struct C_OSD_RepModifyApply : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;

  C_OSD_RepModifyApply(ReplicatedBackend *pg, RepModifyRef rmref)
    : pg(pg),
      rm(rmref) {}

  // The completion code is not used.
  void finish(int r) {
    pg->sub_op_modify_applied(rm);
  }
};
/**
 * Completion that forwards to ReplicatedBackend::sub_op_modify_commit(),
 * holding a shared reference to the RepModify state.
 */
struct C_OSD_RepModifyCommit : public Context {
  ReplicatedBackend *pg;
  RepModifyRef rm;

  C_OSD_RepModifyCommit(ReplicatedBackend *pg, RepModifyRef rmref)
    : pg(pg),
      rm(rmref) {}

  // The completion code is not used.
  void finish(int r) {
    pg->sub_op_modify_commit(rm);
  }
};
void sub_op_modify_applied(RepModifyRef rm);
void sub_op_modify_commit(RepModifyRef rm);
};
#endif

File diff suppressed because it is too large Load Diff

View File

@ -112,7 +112,7 @@ public:
* Final transaction; if non-empty the callback must execute it before any
* other accesses to the object (in order to complete the copy).
*/
ObjectStore::Transaction final_tx;
PGBackend::PGTransaction *final_tx;
string category; ///< The copy source's category
version_t user_version; ///< The copy source's user version
bool should_requeue; ///< op should be requeued on cancel
@ -324,8 +324,15 @@ public:
map<string, bufferptr> &attrs) {
return get_object_context(hoid, true, &attrs);
}
// Forwards to append_log().  Note the argument order differs: the
// transaction (dereferenced) is passed before update_snaps.
void log_operation(
vector<pg_log_entry_t> &logv,
const eversion_t &trim_to,
bool update_snaps,
ObjectStore::Transaction *t) {
append_log(logv, trim_to, *t, update_snaps);
}
void op_applied_replica(
void op_applied(
const eversion_t &applied_version);
bool should_send_op(
@ -338,6 +345,22 @@ public:
assert(is_backfill_targets(peer));
return should_send;
}
// Store lcod as the peer_last_complete_ondisk entry for osd `fromosd`,
// creating or overwriting that entry.
void update_peer_last_complete_ondisk(
int fromosd,
eversion_t lcod) {
peer_last_complete_ondisk[fromosd] = lcod;
}
// Overwrite last_complete_ondisk with lcod.
void update_last_complete_ondisk(
eversion_t lcod) {
last_complete_ondisk = lcod;
}
// Replace info.stats wholesale with the supplied stat.
void update_stats(
const pg_stat_t &stat) {
info.stats = stat;
}
/*
* Capture all object state associated with an in-progress read or write.
@ -381,7 +404,7 @@ public:
int current_osd_subop_num;
ObjectStore::Transaction op_t, local_t;
PGBackend::PGTransaction *op_t;
vector<pg_log_entry_t> log;
interval_set<uint64_t> modified_ranges;
@ -417,6 +440,7 @@ public:
modify(false), user_modify(false), undirty(false),
bytes_written(0), bytes_read(0), user_at_version(0),
current_osd_subop_num(0),
op_t(NULL),
data_off(0), reply(NULL), pg(_pg),
num_read(0),
num_write(0),
@ -435,6 +459,7 @@ public:
}
}
~OpContext() {
assert(!op_t);
assert(lock_to_release == NONE);
if (reply)
reply->put();
@ -445,7 +470,6 @@ public:
* State on the PG primary associated with the replicated mutation
*/
class RepGather {
bool is_done;
public:
xlist<RepGather*>::item queue_item;
int nref;
@ -458,11 +482,10 @@ public:
tid_t rep_tid;
bool applying, applied, aborted;
bool rep_aborted, rep_done;
set<int> waitfor_ack;
//set<int> waitfor_nvram;
set<int> waitfor_disk;
bool all_applied;
bool all_committed;
bool sent_ack;
//bool sent_nvram;
bool sent_disk;
@ -471,18 +494,16 @@ public:
eversion_t pg_local_last_complete;
list<ObjectStore::Transaction*> tls;
bool queue_snap_trimmer;
RepGather(OpContext *c, ObjectContextRef pi, tid_t rt,
eversion_t lc) :
is_done(false),
queue_item(this),
nref(1),
ctx(c), obc(pi),
rep_tid(rt),
applying(false), applied(false), aborted(false),
sent_ack(false),
rep_aborted(false), rep_done(false),
all_applied(false), all_committed(false), sent_ack(false),
//sent_nvram(false),
sent_disk(false),
pg_local_last_complete(lc),
@ -500,16 +521,9 @@ public:
//generic_dout(0) << "deleting " << this << dendl;
}
}
void mark_done() {
is_done = true;
}
bool done() {
return is_done;
}
};
protected:
/**
@ -544,6 +558,8 @@ protected:
*/
// Release the context's locks, free its pending transaction, then destroy
// the context itself.
void close_op_ctx(OpContext *ctx) {
release_op_ctx_locks(ctx);
delete ctx->op_t;
// op_t must be NULL before the delete below: ~OpContext asserts !op_t.
ctx->op_t = NULL;
delete ctx;
}
@ -578,16 +594,14 @@ protected:
xlist<RepGather*> repop_queue;
map<tid_t, RepGather*> repop_map;
void apply_repop(RepGather *repop);
void op_applied(RepGather *repop);
void op_commit(RepGather *repop);
friend class C_OSD_RepopApplied;
friend class C_OSD_RepopCommit;
void repop_all_applied(RepGather *repop);
void repop_all_committed(RepGather *repop);
void eval_repop(RepGather*);
void issue_repop(RepGather *repop, utime_t now);
RepGather *new_repop(OpContext *ctx, ObjectContextRef obc, tid_t rep_tid);
void remove_repop(RepGather *repop);
void repop_ack(RepGather *repop,
int result, int ack_type,
int fromosd, eversion_t pg_complete_thru=eversion_t(0,0));
RepGather *simple_repop_create(ObjectContextRef obc);
void simple_repop_submit(RepGather *repop);
@ -613,7 +627,7 @@ protected:
++i) {
if ((*i)->v > v)
break;
if (!(*i)->waitfor_disk.empty())
if (!(*i)->all_committed)
return false;
}
return true;
@ -625,14 +639,12 @@ protected:
++i) {
if ((*i)->v > v)
break;
if (!(*i)->waitfor_ack.empty())
if (!(*i)->all_applied)
return false;
}
return true;
}
friend class C_OSD_OpCommit;
friend class C_OSD_OpApplied;
friend struct C_OnPushCommit;
// projected object info
@ -792,7 +804,7 @@ protected:
// low level ops
void _make_clone(ObjectStore::Transaction& t,
void _make_clone(PGBackend::PGTransaction* t,
const hobject_t& head, const hobject_t& coid,
object_info_t *poi);
void execute_ctx(OpContext *ctx);
@ -878,38 +890,6 @@ protected:
void send_remove_op(const hobject_t& oid, eversion_t v, int peer);
struct RepModify {
ReplicatedPG *pg;
OpRequestRef op;
OpContext *ctx;
bool applied, committed;
int ackerosd;
eversion_t last_complete;
epoch_t epoch_started;
uint64_t bytes_written;
ObjectStore::Transaction opt, localt;
list<ObjectStore::Transaction*> tls;
RepModify() : pg(NULL), ctx(NULL), applied(false), committed(false), ackerosd(-1),
epoch_started(0), bytes_written(0) {}
};
struct C_OSD_RepModifyApply : public Context {
RepModify *rm;
C_OSD_RepModifyApply(RepModify *r) : rm(r) { }
void finish(int r) {
rm->pg->sub_op_modify_applied(rm);
}
};
struct C_OSD_RepModifyCommit : public Context {
RepModify *rm;
C_OSD_RepModifyCommit(RepModify *r) : rm(r) { }
void finish(int r) {
rm->pg->sub_op_modify_commit(rm);
}
};
struct C_OSD_OndiskWriteUnlock : public Context {
ObjectContextRef obc, obc2, obc3;
C_OSD_OndiskWriteUnlock(
@ -964,11 +944,6 @@ protected:
void sub_op_remove(OpRequestRef op);
void sub_op_modify(OpRequestRef op);
void sub_op_modify_applied(RepModify *rm);
void sub_op_modify_commit(RepModify *rm);
void sub_op_modify_reply(OpRequestRef op);
void _applied_recovered_object(ObjectContextRef obc);
void _applied_recovered_object_replica();
void _committed_pushed_object(epoch_t epoch, eversion_t lc);
@ -993,10 +968,10 @@ protected:
object_locator_t oloc, version_t version, unsigned flags,
bool mirror_snapset);
void process_copy_chunk(hobject_t oid, tid_t tid, int r);
void _write_copy_chunk(CopyOpRef cop, ObjectStore::Transaction *t);
void _write_copy_chunk(CopyOpRef cop, PGBackend::PGTransaction *t);
void _copy_some(ObjectContextRef obc, CopyOpRef cop);
void _build_finish_copy_transaction(CopyOpRef cop,
ObjectStore::Transaction& t);
PGBackend::PGTransaction *t);
void finish_copyfrom(OpContext *ctx);
void finish_promote(int r, OpRequestRef op,
CopyResults *results, ObjectContextRef obc);
@ -1177,13 +1152,10 @@ public:
inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
{
out << "repgather(" << &repop
<< (repop.applying ? " applying" : "")
<< (repop.applied ? " applied" : "")
<< " " << repop.v
<< " rep_tid=" << repop.rep_tid
<< " wfack=" << repop.waitfor_ack
//<< " wfnvram=" << repop.waitfor_nvram
<< " wfdisk=" << repop.waitfor_disk;
<< " committed?=" << repop.all_committed
<< " applied?=" << repop.all_applied;
if (repop.ctx->lock_to_release != ReplicatedPG::OpContext::NONE)
out << " lock=" << (int)repop.ctx->lock_to_release;
if (repop.ctx->op)
@ -1192,4 +1164,8 @@ inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
return out;
}
void intrusive_ptr_add_ref(ReplicatedPG::RepGather *repop);
void intrusive_ptr_release(ReplicatedPG::RepGather *repop);
#endif