Merge pull request #2071 from somnathr/wip-sd-cpu-optimized

Wip sd cpu optimized

Reviewed-by: Samuel Just <sam.just@inktank.com>
This commit is contained in:
Samuel Just 2014-07-11 14:42:48 -07:00
commit e1fc96f713
10 changed files with 68 additions and 69 deletions

View File

@ -58,15 +58,15 @@ public:
friend class MOSDOpReply;
// read
snapid_t get_snapid() { return snapid; }
void set_snapid(snapid_t s) { snapid = s; }
const snapid_t& get_snapid() { return snapid; }
void set_snapid(const snapid_t& s) { snapid = s; }
// writ
snapid_t get_snap_seq() const { return snap_seq; }
const snapid_t& get_snap_seq() const { return snap_seq; }
const vector<snapid_t> &get_snaps() const { return snaps; }
void set_snaps(const vector<snapid_t>& i) {
snaps = i;
}
void set_snap_seq(snapid_t s) { snap_seq = s; }
void set_snap_seq(const snapid_t& s) { snap_seq = s; }
osd_reqid_t get_reqid() const {
return osd_reqid_t(get_orig_source(),
@ -78,22 +78,22 @@ public:
object_t& get_oid() { return oid; }
pg_t get_pg() const { return pgid; }
const pg_t& get_pg() const { return pgid; }
object_locator_t get_object_locator() const {
const object_locator_t& get_object_locator() const {
return oloc;
}
epoch_t get_map_epoch() { return osdmap_epoch; }
eversion_t get_version() { return reassert_version; }
const eversion_t& get_version() { return reassert_version; }
utime_t get_mtime() { return mtime; }
MOSDOp()
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION) { }
MOSDOp(int inc, long tid,
object_t& _oid, object_locator_t& _oloc, pg_t _pgid, epoch_t _osdmap_epoch,
object_t& _oid, object_locator_t& _oloc, pg_t& _pgid, epoch_t _osdmap_epoch,
int _flags)
: Message(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
client_inc(inc),

View File

@ -48,16 +48,16 @@ class MOSDOpReply : public Message {
request_redirect_t redirect;
public:
object_t get_oid() const { return oid; }
pg_t get_pg() const { return pgid; }
const object_t& get_oid() const { return oid; }
const pg_t& get_pg() const { return pgid; }
int get_flags() const { return flags; }
bool is_ondisk() const { return get_flags() & CEPH_OSD_FLAG_ONDISK; }
bool is_onnvram() const { return get_flags() & CEPH_OSD_FLAG_ONNVRAM; }
int get_result() const { return result; }
eversion_t get_replay_version() const { return replay_version; }
version_t get_user_version() const { return user_version; }
const eversion_t& get_replay_version() const { return replay_version; }
const version_t& get_user_version() const { return user_version; }
void set_result(int r) { result = r; }
@ -84,7 +84,7 @@ public:
}
/* Don't fill in replay_version for non-write ops */
void set_enoent_reply_versions(eversion_t v, version_t uv) {
void set_enoent_reply_versions(const eversion_t& v, const version_t& uv) {
user_version = uv;
bad_replay_version = v;
}
@ -126,14 +126,13 @@ public:
MOSDOpReply()
: Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION) { }
MOSDOpReply(MOSDOp *req, int r, epoch_t e, int acktype, bool ignore_out_data)
: Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION) {
: Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION),
oid(req->oid), pgid(req->pgid), ops(req->ops) {
set_tid(req->get_tid());
ops = req->ops;
result = r;
flags =
(req->flags & ~(CEPH_OSD_FLAG_ONDISK|CEPH_OSD_FLAG_ONNVRAM|CEPH_OSD_FLAG_ACK)) | acktype;
oid = req->oid;
pgid = req->pgid;
osdmap_epoch = e;
user_version = 0;
retry_attempt = req->get_retry_attempt();

View File

@ -2053,7 +2053,7 @@ PG *OSD::_create_lock_pg(
return pg;
}
PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op)
PG *OSD::get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op)
{
{
RWLock::RLocker l(pg_map_lock);
@ -5222,7 +5222,7 @@ void OSD::dispatch_op(OpRequestRef op)
}
}
bool OSD::dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap) {
bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap) {
if (is_stopping()) {
// we're shutting down, so drop the op
return true;
@ -7826,7 +7826,7 @@ struct send_map_on_destruct {
}
};
void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
assert(m->get_header().type == CEPH_MSG_OSD_OP);
@ -7974,7 +7974,7 @@ void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
}
template<typename T, int MSGTYPE>
void OSD::handle_replica_op(OpRequestRef op, OSDMapRef osdmap)
void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap)
{
T *m = static_cast<T *>(op->get_req());
assert(m->get_header().type == MSGTYPE);
@ -8032,7 +8032,7 @@ bool OSD::op_is_discardable(MOSDOp *op)
return false;
}
void OSD::enqueue_op(PG *pg, OpRequestRef op)
void OSD::enqueue_op(PG *pg, OpRequestRef& op)
{
utime_t latency = ceph_clock_now(cct) - op->get_req()->get_recv_stamp();
dout(15) << "enqueue_op " << op << " prio " << op->get_req()->get_priority()
@ -8372,7 +8372,7 @@ void OSD::set_disk_tp_priority()
// --------------------------------
int OSD::init_op_flags(OpRequestRef op)
int OSD::init_op_flags(OpRequestRef& op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
vector<OSDOp>::iterator iter;

View File

@ -956,7 +956,7 @@ protected:
void tick();
void _dispatch(Message *m);
void dispatch_op(OpRequestRef op);
bool dispatch_op_fast(OpRequestRef op, OSDMapRef osdmap);
bool dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap);
void check_osdmap_features(ObjectStore *store);
@ -1458,7 +1458,7 @@ private:
} op_shardedwq;
void enqueue_op(PG *pg, OpRequestRef op);
void enqueue_op(PG *pg, OpRequestRef& op);
void dequeue_op(
PGRef pg, OpRequestRef op,
ThreadPool::TPHandle &handle);
@ -1585,7 +1585,7 @@ protected:
PGPool _get_pool(int id, OSDMapRef createmap);
PG *get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op);
PG *get_pg_or_queue_for_pg(const spg_t& pgid, OpRequestRef& op);
bool _have_pg(spg_t pgid);
PG *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
PG *_lookup_lock_pg(spg_t pgid);
@ -2250,10 +2250,10 @@ public:
void handle_rep_scrub(MOSDRepScrub *m);
void handle_scrub(struct MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
void handle_op(OpRequestRef op, OSDMapRef osdmap);
void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
template <typename T, int MSGTYPE>
void handle_replica_op(OpRequestRef op, OSDMapRef osdmap);
void handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap);
/// check if we can throw out op from a disconnected client
static bool op_is_discardable(class MOSDOp *m);
@ -2261,7 +2261,7 @@ public:
public:
void force_remount();
int init_op_flags(OpRequestRef op);
int init_op_flags(OpRequestRef& op);
OSDService service;
friend class OSDService;

View File

@ -1578,7 +1578,7 @@ void OSDMap::pg_to_raw_up(pg_t pg, vector<int> *up, int *primary) const
_apply_primary_affinity(pps, *pool, up, primary);
}
void OSDMap::_pg_to_up_acting_osds(pg_t pg, vector<int> *up, int *up_primary,
void OSDMap::_pg_to_up_acting_osds(const pg_t& pg, vector<int> *up, int *up_primary,
vector<int> *acting, int *acting_primary) const
{
const pg_pool_t *pool = get_pg_pool(pg.pool());

View File

@ -617,7 +617,7 @@ private:
/**
* map to up and acting. Fills in whatever fields are non-NULL.
*/
void _pg_to_up_acting_osds(pg_t pg, vector<int> *up, int *up_primary,
void _pg_to_up_acting_osds(const pg_t& pg, vector<int> *up, int *up_primary,
vector<int> *acting, int *acting_primary) const;
public:
@ -629,7 +629,7 @@ public:
*/
int pg_to_osds(pg_t pg, vector<int> *raw, int *primary) const;
/// map a pg to its acting set. @return acting set size
int pg_to_acting_osds(pg_t pg, vector<int> *acting,
int pg_to_acting_osds(const pg_t& pg, vector<int> *acting,
int *acting_primary) const {
_pg_to_up_acting_osds(pg, NULL, NULL, acting, acting_primary);
return acting->size();
@ -664,7 +664,7 @@ public:
assert(i != pools.end());
return i->second.ec_pool();
}
bool get_primary_shard(pg_t pgid, spg_t *out) const {
bool get_primary_shard(const pg_t& pgid, spg_t *out) const {
map<int64_t, pg_pool_t>::const_iterator i = get_pools().find(pgid.pool());
if (i == get_pools().end()) {
return false;
@ -766,7 +766,7 @@ public:
return calc_pg_rank(osd, group, nrep);
}
/* role is -1 (stray), 0 (primary), 1 (replica) */
int get_pg_acting_role(pg_t pg, int osd) const {
int get_pg_acting_role(const pg_t& pg, int osd) const {
vector<int> group;
int nrep = pg_to_acting_osds(pg, group);
return calc_pg_role(osd, group, nrep);

View File

@ -1651,7 +1651,7 @@ void PG::activate(ObjectStore::Transaction& t,
}
}
bool PG::op_has_sufficient_caps(OpRequestRef op)
bool PG::op_has_sufficient_caps(OpRequestRef& op)
{
// only check MOSDOp
if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
@ -1696,7 +1696,7 @@ void PG::take_op_map_waiters()
for (list<OpRequestRef>::iterator i = waiting_for_map.begin();
i != waiting_for_map.end();
) {
if (op_must_wait_for_map(get_osdmap_with_maplock(), *i)) {
if (op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), *i)) {
break;
} else {
osd->op_wq.queue(make_pair(PGRef(this), *i));
@ -1705,7 +1705,7 @@ void PG::take_op_map_waiters()
}
}
void PG::queue_op(OpRequestRef op)
void PG::queue_op(OpRequestRef& op)
{
Mutex::Locker l(map_lock);
if (!waiting_for_map.empty()) {
@ -1713,7 +1713,7 @@ void PG::queue_op(OpRequestRef op)
waiting_for_map.push_back(op);
return;
}
if (op_must_wait_for_map(get_osdmap_with_maplock(), op)) {
if (op_must_wait_for_map(get_osdmap_with_maplock()->get_epoch(), op)) {
waiting_for_map.push_back(op);
return;
}
@ -4809,7 +4809,7 @@ ostream& operator<<(ostream& out, const PG& pg)
return out;
}
bool PG::can_discard_op(OpRequestRef op)
bool PG::can_discard_op(OpRequestRef& op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
if (OSD::op_is_discardable(m)) {
@ -4859,7 +4859,7 @@ bool PG::can_discard_op(OpRequestRef op)
}
template<typename T, int MSGTYPE>
bool PG::can_discard_replica_op(OpRequestRef op)
bool PG::can_discard_replica_op(OpRequestRef& op)
{
T *m = static_cast<T *>(op->get_req());
assert(m->get_header().type == MSGTYPE);
@ -4910,7 +4910,7 @@ bool PG::can_discard_backfill(OpRequestRef op)
}
bool PG::can_discard_request(OpRequestRef op)
bool PG::can_discard_request(OpRequestRef& op)
{
switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
@ -4943,67 +4943,67 @@ bool PG::can_discard_request(OpRequestRef op)
return true;
}
bool PG::op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op)
bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
{
switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDOp*>(op->get_req())->get_map_epoch());
case MSG_OSD_SUBOP:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDSubOp*>(op->get_req())->map_epoch);
case MSG_OSD_SUBOPREPLY:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDSubOpReply*>(op->get_req())->map_epoch);
case MSG_OSD_PG_SCAN:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDPGScan*>(op->get_req())->map_epoch);
case MSG_OSD_PG_BACKFILL:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDPGBackfill*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PUSH:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDPGPush*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PULL:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDPGPull*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PUSH_REPLY:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDPGPushReply*>(op->get_req())->map_epoch);
case MSG_OSD_EC_WRITE:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDECSubOpWrite*>(op->get_req())->map_epoch);
case MSG_OSD_EC_WRITE_REPLY:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDECSubOpWriteReply*>(op->get_req())->map_epoch);
case MSG_OSD_EC_READ:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDECSubOpRead*>(op->get_req())->map_epoch);
case MSG_OSD_EC_READ_REPLY:
return !have_same_or_newer_map(
curmap,
cur_epoch,
static_cast<MOSDECSubOpReadReply*>(op->get_req())->map_epoch);
}
assert(0);

View File

@ -207,7 +207,7 @@ protected:
OSDMapRef last_persisted_osdmap_ref;
PGPool pool;
void queue_op(OpRequestRef op);
void queue_op(OpRequestRef& op);
void take_op_map_waiters();
void update_osdmap_ref(OSDMapRef newmap) {
@ -2154,28 +2154,28 @@ public:
OSDMapRef osdmap);
// OpRequest queueing
bool can_discard_op(OpRequestRef op);
bool can_discard_op(OpRequestRef& op);
bool can_discard_scan(OpRequestRef op);
bool can_discard_backfill(OpRequestRef op);
bool can_discard_request(OpRequestRef op);
bool can_discard_request(OpRequestRef& op);
template<typename T, int MSGTYPE>
bool can_discard_replica_op(OpRequestRef op);
bool can_discard_replica_op(OpRequestRef& op);
static bool op_must_wait_for_map(OSDMapRef curmap, OpRequestRef op);
static bool op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op);
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
bool old_peering_evt(CephPeeringEvtRef evt) {
return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
}
static bool have_same_or_newer_map(OSDMapRef osdmap, epoch_t e) {
return e <= osdmap->get_epoch();
static bool have_same_or_newer_map(epoch_t cur_epoch, epoch_t e) {
return e <= cur_epoch;
}
bool have_same_or_newer_map(epoch_t e) {
return e <= get_osdmap()->get_epoch();
}
bool op_has_sufficient_caps(OpRequestRef op);
bool op_has_sufficient_caps(OpRequestRef& op);
// recovery bits
@ -2207,11 +2207,11 @@ public:
// abstract bits
virtual void do_request(
OpRequestRef op,
OpRequestRef& op,
ThreadPool::TPHandle &handle
) = 0;
virtual void do_op(OpRequestRef op) = 0;
virtual void do_op(OpRequestRef& op) = 0;
virtual void do_sub_op(OpRequestRef op) = 0;
virtual void do_sub_op_reply(OpRequestRef op) = 0;
virtual void do_scan(

View File

@ -1074,14 +1074,14 @@ void ReplicatedPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
}
void ReplicatedPG::do_request(
OpRequestRef op,
OpRequestRef& op,
ThreadPool::TPHandle &handle)
{
if (!op_has_sufficient_caps(op)) {
osd->reply_op_error(op, -EPERM);
return;
}
assert(!op_must_wait_for_map(get_osdmap(), op));
assert(!op_must_wait_for_map(get_osdmap()->get_epoch(), op));
if (can_discard_request(op)) {
return;
}
@ -1184,7 +1184,7 @@ bool ReplicatedPG::check_src_targ(const hobject_t& soid, const hobject_t& toid)
* pg lock will be held (if multithreaded)
* osd_lock NOT held.
*/
void ReplicatedPG::do_op(OpRequestRef op)
void ReplicatedPG::do_op(OpRequestRef& op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
assert(m->get_header().type == CEPH_MSG_OSD_OP);

View File

@ -1271,9 +1271,9 @@ public:
bufferlist& odata);
void do_request(
OpRequestRef op,
OpRequestRef& op,
ThreadPool::TPHandle &handle);
void do_op(OpRequestRef op);
void do_op(OpRequestRef& op);
bool pg_op_must_wait(MOSDOp *op);
void do_pg_op(OpRequestRef op);
void do_sub_op(OpRequestRef op);