ReplicatedBackend: excise OSDService*

This should eventually make it easier to mock out a PGBackend::Listener.
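
As a rough illustration of the payoff (a sketch, not part of this commit): once every OSD-facing call goes through the Listener interface, a unit test can hand the backend a hand-rolled stub instead of a full OSDService. The types below are simplified stand-ins, not the real Ceph signatures.

// Sketch only: a stub Listener for unit tests, with stand-in types in
// place of Message/ConnectionRef; real code would derive from
// PGBackend::Listener.
#include <cassert>
#include <utility>
#include <vector>

struct Message { int type; };

struct StubListener {
  std::vector<std::pair<int, Message*> > sent;  // records (peer, message)
  int id;
  StubListener() : id(0) {}

  int whoami() const { return id; }
  void send_message_osd_cluster(int peer, Message *m, unsigned from_epoch) {
    (void)from_epoch;
    sent.push_back(std::make_pair(peer, m));  // capture instead of sending
  }
};

int main() {
  StubListener l;
  Message m = { 42 };
  l.send_message_osd_cluster(3, &m, 100);
  assert(l.sent.size() == 1 && l.sent[0].first == 3);  // observable in-test
  return 0;
}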

Signed-off-by: Samuel Just <sam.just@inktank.com>
Samuel Just 2014-02-15 17:43:19 -08:00
parent 31b7937bae
commit fa980644bd
5 changed files with 177 additions and 69 deletions


@@ -148,6 +148,26 @@
     virtual void update_stats(
       const pg_stat_t &stat) = 0;
 
+    virtual void schedule_work(
+      GenContext<ThreadPool::TPHandle&> *c) = 0;
+
+    virtual int whoami() const = 0;
+
+    virtual void send_message_osd_cluster(
+      int peer, Message *m, epoch_t from_epoch) = 0;
+    virtual void send_message_osd_cluster(
+      Message *m, Connection *con) = 0;
+    virtual void send_message_osd_cluster(
+      Message *m, const ConnectionRef& con) = 0;
+    virtual ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch) = 0;
+    virtual entity_name_t get_cluster_msgr_name() = 0;
+
+    virtual PerfCounters *get_logger() = 0;
+
+    virtual tid_t get_tid() = 0;
+
+    virtual LogClientTemp clog_error() = 0;
+
     virtual ~Listener() {}
   };
   Listener *parent;
@@ -499,4 +519,28 @@
     ostream &errorstream) { assert(0); }
 };
 
+struct PG_SendMessageOnConn: public Context {
+  PGBackend::Listener *pg;
+  Message *reply;
+  ConnectionRef conn;
+  PG_SendMessageOnConn(
+    PGBackend::Listener *pg,
+    Message *reply,
+    ConnectionRef conn) : pg(pg), reply(reply), conn(conn) {}
+  void finish(int) {
+    pg->send_message_osd_cluster(reply, conn.get());
+  }
+};
+
+struct PG_QueueAsync : public Context {
+  PGBackend::Listener *pg;
+  GenContext<ThreadPool::TPHandle&> *c;
+  PG_QueueAsync(
+    PGBackend::Listener *pg,
+    GenContext<ThreadPool::TPHandle&> *c) : pg(pg), c(c) {}
+  void finish(int) {
+    pg->schedule_work(c);
+  }
+};
+
 #endif
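
The two helpers above follow the usual Context completion idiom: wrap deferred work in an object whose finish() runs once the transaction it was registered on completes. A self-contained sketch of that idiom, with simplified stand-ins rather than the real ObjectStore::Transaction:

// Sketch of the completion-callback idiom (stand-in types, not Ceph's).
#include <iostream>
#include <vector>

struct Context {
  virtual ~Context() {}
  virtual void finish(int r) = 0;
  void complete(int r) { finish(r); delete this; }
};

struct FakeTransaction {
  std::vector<Context*> on_complete;
  void register_on_complete(Context *c) { on_complete.push_back(c); }
  void commit() {  // simulate the transaction completing
    for (size_t i = 0; i < on_complete.size(); ++i)
      on_complete[i]->complete(0);
    on_complete.clear();
  }
};

struct SendReply : public Context {  // analogous to PG_SendMessageOnConn
  void finish(int) { std::cout << "reply sent after commit\n"; }
};

int main() {
  FakeTransaction t;
  t.register_on_complete(new SendReply);  // deferred, like the reply above
  t.commit();                             // only now does the send happen
  return 0;
}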


@@ -29,10 +29,11 @@ static ostream& _prefix(std::ostream *_dout, ReplicatedBackend *pgb) {
 }
 
 ReplicatedBackend::ReplicatedBackend(
-  PGBackend::Listener *pg, coll_t coll, OSDService *osd) :
-  PGBackend(pg), temp_created(false),
-  temp_coll(coll_t::make_temp_coll(pg->get_info().pgid)),
-  coll(coll), osd(osd), cct(osd->cct) {}
+  PGBackend::Listener *pg, coll_t coll, ObjectStore *store,
+  CephContext *cct) :
+  PGBackend(pg, store,
+            coll, coll_t::make_temp_coll(pg->get_info().pgid)),
+  cct(cct) {}
 
 void ReplicatedBackend::run_recovery_op(
   PGBackend::RecoveryHandle *_h,
@@ -204,9 +205,9 @@ void ReplicatedBackend::_on_change(ObjectStore::Transaction *t)
 void ReplicatedBackend::on_flushed()
 {
   if (have_temp_coll() &&
-      !osd->store->collection_empty(get_temp_coll())) {
+      !store->collection_empty(get_temp_coll())) {
     vector<hobject_t> objects;
-    osd->store->collection_list(get_temp_coll(), objects);
+    store->collection_list(get_temp_coll(), objects);
     derr << __func__ << ": found objects in the temp collection: "
          << objects << ", crashing now"
          << dendl;
@@ -220,7 +221,7 @@ int ReplicatedBackend::objects_read_sync(
   uint64_t len,
   bufferlist *bl)
 {
-  return osd->store->read(coll, hoid, off, len, *bl);
+  return store->read(coll, hoid, off, len, *bl);
 }
 
 struct AsyncReadCallback : public GenContext<ThreadPool::TPHandle&> {
@@ -247,17 +248,17 @@ void ReplicatedBackend::objects_read_async(
          to_read.begin();
        i != to_read.end() && r >= 0;
        ++i) {
-    int _r = osd->store->read(coll, hoid, i->first.first,
-                              i->first.second, *(i->second.first));
+    int _r = store->read(coll, hoid, i->first.first,
+                         i->first.second, *(i->second.first));
     if (i->second.second) {
-      osd->gen_wq.queue(
+      get_parent()->schedule_work(
         get_parent()->bless_gencontext(
           new AsyncReadCallback(_r, i->second.second)));
     }
     if (_r < 0)
       r = _r;
   }
-  osd->gen_wq.queue(
+  get_parent()->schedule_work(
     get_parent()->bless_gencontext(
       new AsyncReadCallback(r, on_complete)));
 }
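
The hunk above swaps a direct osd->gen_wq.queue() for get_parent()->schedule_work(), so the Listener — not the backend — decides which queue runs the completion. A minimal sketch of that GenContext scheduling shape, with stand-ins for the ThreadPool types:

// Sketch of schedule_work() dispatch (stand-in types, not Ceph's ThreadPool).
#include <deque>
#include <iostream>

struct TPHandle {};  // stand-in for ThreadPool::TPHandle

template <typename T>
struct GenContext {
  virtual ~GenContext() {}
  virtual void finish(T t) = 0;
  void complete(T t) { finish(t); delete this; }
};

struct FakeListener {
  std::deque<GenContext<TPHandle&>*> q;  // pretend work queue
  void schedule_work(GenContext<TPHandle&> *c) { q.push_back(c); }
  void drain() {  // what the thread pool would eventually do
    TPHandle h;
    while (!q.empty()) {
      GenContext<TPHandle&> *c = q.front();
      q.pop_front();
      c->complete(h);
    }
  }
};

struct AsyncReadDone : public GenContext<TPHandle&> {
  int r;
  AsyncReadDone(int r) : r(r) {}
  void finish(TPHandle&) { std::cout << "read completed, r=" << r << "\n"; }
};

int main() {
  FakeListener l;
  l.schedule_work(new AsyncReadDone(0));  // backend never names the queue
  l.drain();
  return 0;
}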
@@ -552,7 +553,7 @@ void ReplicatedBackend::op_applied(
   if (op->op)
     op->op->mark_event("op_applied");
 
-  op->waiting_for_applied.erase(osd->whoami);
+  op->waiting_for_applied.erase(get_parent()->whoami());
   parent->op_applied(op->v);
 
   if (op->waiting_for_applied.empty()) {
@@ -572,7 +573,7 @@ void ReplicatedBackend::op_commit(
   if (op->op)
     op->op->mark_event("op_commit");
 
-  op->waiting_for_commit.erase(osd->whoami);
+  op->waiting_for_commit.erase(get_parent()->whoami());
 
   if (op->waiting_for_commit.empty()) {
     op->on_commit->complete(0);
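
In both hunks the local OSD removes itself from the waiting set via get_parent()->whoami() instead of reading osd->whoami. A plain-C++ sketch of that bookkeeping (a stand-in, not the real InProgressOp):

// Sketch of the waiting_for_commit bookkeeping (stand-in types).
#include <cassert>
#include <set>

struct InFlightOp {
  std::set<int> waiting_for_commit;  // OSD ids still outstanding
  bool committed;
  InFlightOp() : committed(false) {}
  void ack_from(int osd_id) {
    waiting_for_commit.erase(osd_id);
    if (waiting_for_commit.empty())
      committed = true;  // stands in for on_commit->complete(0)
  }
};

int main() {
  InFlightOp op;
  op.waiting_for_commit.insert(0);  // ourselves, i.e. whoami()
  op.waiting_for_commit.insert(1);
  op.ack_from(1);  // replica ack arrives
  op.ack_from(0);  // local commit: erase(get_parent()->whoami())
  assert(op.committed);
  return 0;
}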


@@ -28,10 +28,11 @@ class ReplicatedBackend : public PGBackend {
   };
   friend struct C_ReplicatedBackend_OnPullComplete;
 public:
-  OSDService *osd;
   CephContext *cct;
 
-  ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
+  ReplicatedBackend(
+    PGBackend::Listener *pg, coll_t coll, ObjectStore *store,
+    CephContext *cct);
 
   /// @see PGBackend::open_recovery_op
   RPGHandle *_open_recovery_op() {
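
With OSDService* gone from this header, the backend's real dependencies are passed in explicitly. A sketch of what that constructor injection looks like from a caller's side (stand-in types; only the parameter list mirrors the new declaration above):

// Sketch of constructor injection (stand-in types mirror the new signature).
struct Listener {};     // stands in for PGBackend::Listener
struct ObjectStore {};  // stands in for Ceph's ObjectStore
struct CephContext {};
struct coll_t { int pgid; };

struct Backend {
  Listener *parent;
  coll_t coll;
  ObjectStore *store;
  CephContext *cct;
  Backend(Listener *pg, coll_t c, ObjectStore *s, CephContext *ctx)
    : parent(pg), coll(c), store(s), cct(ctx) {}
};

int main() {
  Listener l;
  ObjectStore mem_store;  // a test could pass any store here
  CephContext ctx;
  coll_t c = { 1 };
  Backend b(&l, c, &mem_store, &ctx);  // no OSDService anywhere in sight
  (void)b;
  return 0;
}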


@@ -82,7 +82,7 @@ PGLSFilter::~PGLSFilter()
 }
 
 static void log_subop_stats(
-  OSDService *osd,
+  PerfCounters *logger,
   OpRequestRef op, int tag_inb, int tag_lat)
 {
   utime_t now = ceph_clock_now(g_ceph_context);
@@ -91,14 +91,14 @@
   uint64_t inb = op->get_req()->get_data().length();
 
-  osd->logger->inc(l_osd_sop);
+  logger->inc(l_osd_sop);
 
-  osd->logger->inc(l_osd_sop_inb, inb);
-  osd->logger->tinc(l_osd_sop_lat, latency);
+  logger->inc(l_osd_sop_inb, inb);
+  logger->tinc(l_osd_sop_lat, latency);
 
   if (tag_inb)
-    osd->logger->inc(tag_inb, inb);
-  osd->logger->tinc(tag_lat, latency);
+    logger->inc(tag_inb, inb);
+  logger->tinc(tag_lat, latency);
 }
 
 struct OnReadComplete : public Context {
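
The hunk above narrows log_subop_stats to take just the PerfCounters* it actually uses. A sketch of why the narrower parameter matters for testing (the counter type here is a stand-in, not Ceph's PerfCounters API):

// Sketch: a function that takes only the counters it touches is trivially
// testable with a stand-in.
#include <cassert>
#include <map>

enum { l_sop = 1, l_sop_inb = 2 };  // stand-ins for l_osd_sop et al.

struct FakeCounters {
  std::map<int, long> vals;
  void inc(int idx, long v) { vals[idx] += v; }
};

static void log_subop_stats_sketch(FakeCounters *logger, long inb) {
  logger->inc(l_sop, 1);        // mirrors logger->inc(l_osd_sop)
  logger->inc(l_sop_inb, inb);  // mirrors logger->inc(l_osd_sop_inb, inb)
}

int main() {
  FakeCounters c;
  log_subop_stats_sketch(&c, 4096);
  assert(c.vals[l_sop] == 1 && c.vals[l_sop_inb] == 4096);
  return 0;
}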
@@ -323,6 +323,41 @@ void ReplicatedPG::begin_peer_recover(
   peer_missing[peer].revise_have(soid, eversion_t());
 }
 
+void ReplicatedPG::schedule_work(
+  GenContext<ThreadPool::TPHandle&> *c)
+{
+  osd->gen_wq.queue(c);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+  int peer, Message *m, epoch_t from_epoch)
+{
+  osd->send_message_osd_cluster(peer, m, from_epoch);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+  Message *m, Connection *con)
+{
+  osd->send_message_osd_cluster(m, con);
+}
+
+void ReplicatedPG::send_message_osd_cluster(
+  Message *m, const ConnectionRef& con)
+{
+  osd->send_message_osd_cluster(m, con);
+}
+
+ConnectionRef ReplicatedPG::get_con_osd_cluster(
+  int peer, epoch_t from_epoch)
+{
+  return osd->get_con_osd_cluster(peer, from_epoch);
+}
+
+PerfCounters *ReplicatedPG::get_logger()
+{
+  return osd->logger;
+}
+
 // =======================
 // pg changes
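
These one-line forwarders make ReplicatedPG an adapter: it satisfies PGBackend::Listener by delegating to the OSDService it already holds, so only the PG layer ever touches the service. The shape of that pattern in miniature (stand-in types, not the real class hierarchy):

// Sketch of the adapter shape.
#include <iostream>

struct FakeService {  // stands in for OSDService
  void send(int peer) { std::cout << "send to osd." << peer << "\n"; }
};

struct Listener {  // stands in for PGBackend::Listener
  virtual ~Listener() {}
  virtual void send_message_osd_cluster(int peer) = 0;
};

struct PG : public Listener {  // stands in for ReplicatedPG
  FakeService *osd;
  PG(FakeService *s) : osd(s) {}
  void send_message_osd_cluster(int peer) { osd->send(peer); }  // forward
};

int main() {
  FakeService svc;
  PG pg(&svc);
  Listener *l = &pg;  // the backend sees only this interface
  l->send_message_osd_cluster(2);
  return 0;
}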
@@ -965,7 +1000,7 @@ ReplicatedPG::ReplicatedPG(OSDService *o, OSDMapRef curmap,
                            const PGPool &_pool, pg_t p, const hobject_t& oid,
                            const hobject_t& ioid) :
   PG(o, curmap, _pool, p, oid, ioid),
-  pgbackend(new ReplicatedBackend(this, coll_t(p), o)),
+  pgbackend(new ReplicatedBackend(this, coll_t(p), o->store, cct)),
   snapset_contexts_lock("ReplicatedPG::snapset_contexts"),
   temp_seq(0),
   snap_trimmer_machine(this)
@@ -1953,8 +1988,8 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
   reply->compute_cost(cct);
 
   t->register_on_complete(
-    new C_OSD_SendMessageOnConn(
-      osd, reply, m->get_connection()));
+    new PG_SendMessageOnConn(
+      get_parent(), reply, m->get_connection()));
 
   t->register_on_applied(
     new ObjectStore::C_DeleteTransaction(t));
@@ -2011,8 +2046,8 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
         m->get_priority());
       c->to_continue.swap(to_continue);
       t->register_on_complete(
-        new C_QueueInWQ(
-          &osd->push_wq,
+        new PG_QueueAsync(
+          get_parent(),
           get_parent()->bless_gencontext(c)));
     }
     replies.erase(replies.end() - 1);
@@ -2026,8 +2061,8 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
     reply->compute_cost(cct);
 
     t->register_on_complete(
-      new C_OSD_SendMessageOnConn(
-        osd, reply, m->get_connection()));
+      new PG_SendMessageOnConn(
+        get_parent(), reply, m->get_connection()));
   }
 
   t->register_on_applied(
@@ -6349,8 +6384,8 @@ void ReplicatedBackend::issue_op(
     wr->new_temp_oid = new_temp_oid;
     wr->discard_temp_oid = discard_temp_oid;
 
-    osd->send_message_osd_cluster(peer, wr, get_osdmap()->get_epoch());
+    get_parent()->send_message_osd_cluster(
+      peer, wr, get_osdmap()->get_epoch());
   }
 }
@@ -7112,7 +7147,8 @@ void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
     // send ack to acker only if we haven't sent a commit already
     MOSDSubOpReply *ack = new MOSDSubOpReply(m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
     ack->set_priority(CEPH_MSG_PRIO_HIGH); // this better match commit priority!
-    osd->send_message_osd_cluster(rm->ackerosd, ack, get_osdmap()->get_epoch());
+    get_parent()->send_message_osd_cluster(
+      rm->ackerosd, ack, get_osdmap()->get_epoch());
   }
 
   parent->op_applied(m->version);
@@ -7133,9 +7169,11 @@ void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
   MOSDSubOpReply *commit = new MOSDSubOpReply(static_cast<MOSDSubOp*>(rm->op->get_req()), 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
   commit->set_last_complete_ondisk(rm->last_complete);
   commit->set_priority(CEPH_MSG_PRIO_HIGH); // this better match ack priority!
-  osd->send_message_osd_cluster(rm->ackerosd, commit, get_osdmap()->get_epoch());
+  get_parent()->send_message_osd_cluster(
+    rm->ackerosd, commit, get_osdmap()->get_epoch());
 
-  log_subop_stats(osd, rm->op, l_osd_sop_w_inb, l_osd_sop_w_lat);
+  log_subop_stats(get_parent()->get_logger(), rm->op,
+                  l_osd_sop_w_inb, l_osd_sop_w_lat);
 }
@@ -7584,8 +7622,8 @@ int ReplicatedBackend::send_pull_legacy(int prio, int peer,
                                         ObjectRecoveryProgress progress)
 {
   // send op
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+  tid_t tid = get_parent()->get_tid();
+  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
 
   dout(10) << "send_pull_op " << recovery_info.soid << " "
            << recovery_info.version
@@ -7605,9 +7643,10 @@ int ReplicatedBackend::send_pull_legacy(int prio, int peer,
   subop->recovery_info = recovery_info;
   subop->recovery_progress = progress;
 
-  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
+  get_parent()->send_message_osd_cluster(
+    peer, subop, get_osdmap()->get_epoch());
 
-  osd->logger->inc(l_osd_pull);
+  get_parent()->get_logger()->inc(l_osd_pull);
   return 0;
 }
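
In send_pull_legacy above, the request id is now assembled from two Listener calls instead of two OSDService members. A sketch of that construction with stand-in types for osd_reqid_t and the Listener:

// Sketch of rid(name, 0, tid) built via the Listener (stand-in types).
#include <cassert>
#include <string>

struct FakeListener {
  unsigned long next_tid;
  FakeListener() : next_tid(1) {}
  unsigned long get_tid() { return next_tid++; }           // fresh op id
  std::string get_cluster_msgr_name() { return "osd.3"; }  // sample name
};

struct OsdReqid {  // stands in for osd_reqid_t
  std::string name;
  int inc;
  unsigned long tid;
  OsdReqid(const std::string &n, int i, unsigned long t)
    : name(n), inc(i), tid(t) {}
};

int main() {
  FakeListener l;
  unsigned long tid = l.get_tid();
  OsdReqid rid(l.get_cluster_msgr_name(), 0, tid);  // as in the hunk above
  assert(rid.tid == 1 && rid.name == "osd.3");
  return 0;
}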
@@ -7628,7 +7667,7 @@ void ReplicatedBackend::submit_push_data(
   } else {
     dout(10) << __func__ << ": Creating oid "
              << recovery_info.soid << " in the temp collection" << dendl;
-    temp_contents.insert(recovery_info.soid);
+    add_temp_obj(recovery_info.soid);
     target_coll = get_temp_coll(t);
   }
@@ -7656,10 +7695,9 @@ void ReplicatedBackend::submit_push_data(
   if (complete) {
     if (!first) {
-      assert(temp_contents.count(recovery_info.soid));
       dout(10) << __func__ << ": Removing oid "
                << recovery_info.soid << " from the temp collection" << dendl;
-      temp_contents.erase(recovery_info.soid);
+      clear_temp_obj(recovery_info.soid);
       t->collection_move(coll, target_coll, recovery_info.soid);
     }
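
add_temp_obj()/clear_temp_obj() replace direct manipulation of the temp_contents set; presumably, in keeping with this commit's direction, the set now sits behind small PGBackend helpers. A sketch of that bookkeeping (the helper names match the hunk above; the rest is illustrative):

// Sketch of temp-object bookkeeping behind small helpers (stand-in types).
#include <cassert>
#include <set>

struct hobject { int id; };
inline bool operator<(const hobject &a, const hobject &b) { return a.id < b.id; }

struct Backend {
  std::set<hobject> temp_contents;  // objects parked in the temp collection
  void add_temp_obj(const hobject &o) { temp_contents.insert(o); }
  void clear_temp_obj(const hobject &o) { temp_contents.erase(o); }
};

int main() {
  Backend b;
  hobject soid = { 7 };
  b.add_temp_obj(soid);    // push in flight: object lives in temp coll
  assert(b.temp_contents.count(soid) == 1);
  b.clear_temp_obj(soid);  // push complete: moved to the real collection
  assert(b.temp_contents.count(soid) == 0);
  return 0;
}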
@@ -7799,7 +7837,7 @@ struct C_OnPushCommit : public Context {
   C_OnPushCommit(ReplicatedPG *pg, OpRequestRef op) : pg(pg), op(op) {}
   void finish(int) {
     op->mark_event("committed");
-    log_subop_stats(pg->osd, op, l_osd_push_inb, l_osd_sop_push_lat);
+    log_subop_stats(pg->osd->logger, op, l_osd_push_inb, l_osd_sop_push_lat);
   }
 };
@@ -7842,7 +7880,7 @@ void ReplicatedBackend::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
   for (map<int, vector<PushOp> >::iterator i = pushes.begin();
        i != pushes.end();
        ++i) {
-    ConnectionRef con = osd->get_con_osd_cluster(
+    ConnectionRef con = get_parent()->get_con_osd_cluster(
       i->first,
       get_osdmap()->get_epoch());
     if (!con)
@@ -7876,7 +7914,7 @@ void ReplicatedBackend::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
         msg->pushes.push_back(*j);
       }
       msg->compute_cost(cct);
-      osd->send_message_osd_cluster(msg, con);
+      get_parent()->send_message_osd_cluster(msg, con);
     }
   }
 }
@@ -7887,7 +7925,7 @@ void ReplicatedBackend::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
   for (map<int, vector<PullOp> >::iterator i = pulls.begin();
        i != pulls.end();
        ++i) {
-    ConnectionRef con = osd->get_con_osd_cluster(
+    ConnectionRef con = get_parent()->get_con_osd_cluster(
       i->first,
       get_osdmap()->get_epoch());
     if (!con)
@@ -7913,7 +7951,7 @@ void ReplicatedBackend::send_pulls(int prio, map<int, vector<PullOp> > &pulls)
     msg->map_epoch = get_osdmap()->get_epoch();
     msg->pulls.swap(i->second);
     msg->compute_cost(cct);
-    osd->send_message_osd_cluster(msg, con);
+    get_parent()->send_message_osd_cluster(msg, con);
   }
 }
@@ -7937,8 +7975,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
            << dendl;
 
   if (progress.first) {
-    osd->store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
-    osd->store->getattrs(coll, recovery_info.soid, out_op->attrset);
+    store->omap_get_header(coll, recovery_info.soid, &out_op->omap_header);
+    store->getattrs(coll, recovery_info.soid, out_op->attrset);
 
     // Debug
     bufferlist bv;
@@ -7946,11 +7984,11 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
     object_info_t oi(bv);
 
     if (oi.version != recovery_info.version) {
-      osd->clog.error() << get_info().pgid << " push "
-                        << recovery_info.soid << " v "
-                        << recovery_info.version
-                        << " failed because local copy is "
-                        << oi.version << "\n";
+      get_parent()->clog_error() << get_info().pgid << " push "
-                                 << recovery_info.soid << " v "
+                                 << recovery_info.soid << " v "
+                                 << recovery_info.version
+                                 << " failed because local copy is "
+                                 << oi.version << "\n";
       return -EINVAL;
     }
@@ -7960,8 +7998,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
   uint64_t available = cct->_conf->osd_recovery_max_chunk;
   if (!progress.omap_complete) {
     ObjectMap::ObjectMapIterator iter =
-      osd->store->get_omap_iterator(coll,
-                                    recovery_info.soid);
+      store->get_omap_iterator(coll,
+                               recovery_info.soid);
     for (iter->lower_bound(progress.omap_recovered_to);
          iter->valid();
          iter->next()) {
@@ -7993,7 +8031,7 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
          p != out_op->data_included.end();
          ++p) {
       bufferlist bit;
-      osd->store->read(coll, recovery_info.soid,
+      store->read(coll, recovery_info.soid,
                        p.get_start(), p.get_len(), bit);
       if (p.get_len() != bit.length()) {
         dout(10) << " extent " << p.get_start() << "~" << p.get_len()
@@ -8019,8 +8057,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
     stat->num_bytes_recovered += out_op->data.length();
   }
 
-  osd->logger->inc(l_osd_push);
-  osd->logger->inc(l_osd_push_outb, out_op->data.length());
+  get_parent()->get_logger()->inc(l_osd_push);
+  get_parent()->get_logger()->inc(l_osd_push_outb, out_op->data.length());
 
   // send
   out_op->version = recovery_info.version;
@@ -8033,8 +8071,8 @@ int ReplicatedBackend::build_push_op(const ObjectRecoveryInfo &recovery_info,
 
 int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop)
 {
-  tid_t tid = osd->get_tid();
-  osd_reqid_t rid(osd->get_cluster_msgr_name(), 0, tid);
+  tid_t tid = get_parent()->get_tid();
+  osd_reqid_t rid(get_parent()->get_cluster_msgr_name(), 0, tid);
   MOSDSubOp *subop = new MOSDSubOp(rid, get_info().pgid, pop.soid,
                                    false, 0, get_osdmap()->get_epoch(),
                                    tid, pop.recovery_info.version);
@@ -8052,7 +8090,7 @@ int ReplicatedBackend::send_push_op_legacy(int prio, int peer, PushOp &pop)
   subop->current_progress = pop.before_progress;
   subop->recovery_progress = pop.after_progress;
 
-  osd->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
+  get_parent()->send_message_osd_cluster(peer, subop, get_osdmap()->get_epoch());
   return 0;
 }
@@ -8185,18 +8223,18 @@ void ReplicatedBackend::sub_op_pull(OpRequestRef op)
     m->get_source().num(),
     reply);
-  log_subop_stats(osd, op, 0, l_osd_sop_pull_lat);
+  log_subop_stats(get_parent()->get_logger(), op, 0, l_osd_sop_pull_lat);
 }
 
 void ReplicatedBackend::handle_pull(int peer, PullOp &op, PushOp *reply)
 {
   const hobject_t &soid = op.soid;
   struct stat st;
-  int r = osd->store->stat(coll, soid, &st);
+  int r = store->stat(coll, soid, &st);
   if (r != 0) {
-    osd->clog.error() << get_info().pgid << " "
-                      << peer << " tried to pull " << soid
-                      << " but got " << cpp_strerror(-r) << "\n";
+    get_parent()->clog_error() << get_info().pgid << " "
+                               << peer << " tried to pull " << soid
+                               << " but got " << cpp_strerror(-r) << "\n";
     prep_push_op_blank(soid, reply);
   } else {
     ObjectRecoveryInfo &recovery_info = op.recovery_info;
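
clog_error() hands back a LogClientTemp by value, which works because of the temporary-stream idiom: operator<< accumulates into the temporary, and its destructor flushes one complete line at the end of the full expression. A compact sketch of that idiom (needs C++17 for guaranteed copy elision; this is a stand-in, not Ceph's LogClient):

// Sketch of the flush-on-destruction stream idiom (C++17; stand-in types).
#include <iostream>
#include <sstream>

class TempLogLine {
  std::ostringstream ss;
public:
  ~TempLogLine() { std::cerr << "cluster log: " << ss.str(); }
  template <typename T>
  TempLogLine& operator<<(const T &v) { ss << v; return *this; }
};

struct FakeListener {
  TempLogLine clog_error() { return TempLogLine(); }  // like the Listener's
};

int main() {
  FakeListener l;
  // The temporary lives to the end of this statement, then flushes:
  l.clog_error() << "pg 1.0 push failed: local copy is stale" << "\n";
  return 0;
}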
@@ -8389,8 +8427,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
         op->get_req()->get_priority());
       c->to_continue.swap(to_continue);
       t->register_on_complete(
-        new C_QueueInWQ(
-          &osd->push_wq,
+        new PG_QueueAsync(
+          get_parent(),
           get_parent()->bless_gencontext(c)));
     }
     run_recovery_op(h, op->get_req()->get_priority());
@@ -8401,8 +8439,8 @@ void ReplicatedBackend::sub_op_push(OpRequestRef op)
     reply->set_priority(m->get_priority());
     assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
     handle_push(m->get_source().num(), pop, &resp, t);
-    t->register_on_complete(new C_OSD_SendMessageOnConn(
-                              osd, reply, m->get_connection()));
+    t->register_on_complete(new PG_SendMessageOnConn(
+                              get_parent(), reply, m->get_connection()));
   }
   t->register_on_applied(
     new ObjectStore::C_DeleteTransaction(t));


@@ -363,6 +363,30 @@ public:
     info.stats = stat;
   }
 
+  void schedule_work(
+    GenContext<ThreadPool::TPHandle&> *c);
+
+  int whoami() const {
+    return osd->whoami;
+  }
+
+  void send_message_osd_cluster(
+    int peer, Message *m, epoch_t from_epoch);
+  void send_message_osd_cluster(
+    Message *m, Connection *con);
+  void send_message_osd_cluster(
+    Message *m, const ConnectionRef& con);
+  ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
+  entity_name_t get_cluster_msgr_name() {
+    return osd->get_cluster_msgr_name();
+  }
+
+  PerfCounters *get_logger();
+
+  tid_t get_tid() { return osd->get_tid(); }
+
+  LogClientTemp clog_error() { return osd->clog.error(); }
+
   /*
    * Capture all object state associated with an in-progress read or write.
    */