Merge pull request #13545 from liewegas/wip-18933

osd: constify OpRequest::get_req(); fix a few cases of operator<< vs mutated message races

Reviewed-by: Josh Durgin <jdurgin@redhat.com>
This commit is contained in:
Sage Weil 2017-02-24 12:01:54 -06:00 committed by GitHub
commit b0a3a2cb69
33 changed files with 315 additions and 271 deletions

View File

@ -14,10 +14,10 @@
#include "common/RefCountedObj.h"
void intrusive_ptr_add_ref(RefCountedObject *p) {
void intrusive_ptr_add_ref(const RefCountedObject *p) {
p->get();
}
void intrusive_ptr_release(RefCountedObject *p) {
void intrusive_ptr_release(const RefCountedObject *p) {
p->put();
}

View File

@ -23,7 +23,7 @@
struct RefCountedObject {
private:
atomic_t nref;
mutable atomic_t nref;
CephContext *cct;
public:
RefCountedObject(CephContext *c = NULL, int n=1) : nref(n), cct(c) {}
@ -31,6 +31,14 @@ public:
assert(nref.read() == 0);
}
const RefCountedObject *get() const {
int v = nref.inc();
if (cct)
lsubdout(cct, refs, 1) << "RefCountedObject::get " << this << " "
<< (v - 1) << " -> " << v
<< dendl;
return this;
}
RefCountedObject *get() {
int v = nref.inc();
if (cct)
@ -39,7 +47,7 @@ public:
<< dendl;
return this;
}
void put() {
void put() const {
CephContext *local_cct = cct;
int v = nref.dec();
if (v == 0) {
@ -151,7 +159,7 @@ struct RefCountedWaitObject {
}
};
void intrusive_ptr_add_ref(RefCountedObject *p);
void intrusive_ptr_release(RefCountedObject *p);
void intrusive_ptr_add_ref(const RefCountedObject *p);
void intrusive_ptr_release(const RefCountedObject *p);
#endif

View File

@ -59,7 +59,7 @@ public:
bool is_immediate() const {
return flags & FLAG_IMMEDIATE;
}
epoch_t get_epoch() { return epoch; }
epoch_t get_epoch() const { return epoch; }
void decode_payload() {
bufferlist::iterator p = payload.begin();

View File

@ -40,8 +40,8 @@ class MOSDMarkMeDown : public PaxosServiceMessage {
~MOSDMarkMeDown() {}
public:
entity_inst_t get_target() { return target_osd; }
epoch_t get_epoch() { return epoch; }
entity_inst_t get_target() const { return target_osd; }
epoch_t get_epoch() const { return epoch; }
void decode_payload() {
bufferlist::iterator p = payload.begin();

View File

@ -130,14 +130,14 @@ public:
else
return object_locator_t(hobj);
}
object_t& get_oid() {
const object_t& get_oid() const {
assert(!final_decode_needed);
return hobj.oid;
}
const hobject_t &get_hobj() const {
return hobj;
}
snapid_t get_snapid() {
snapid_t get_snapid() const {
assert(!final_decode_needed);
return hobj.snap;
}
@ -231,7 +231,7 @@ public:
add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
}
bool has_flag(__u32 flag) { return flags & flag; };
bool has_flag(__u32 flag) const { return flags & flag; };
bool wants_ack() const { return flags & CEPH_OSD_FLAG_ACK; }
bool wants_ondisk() const { return flags & CEPH_OSD_FLAG_ONDISK; }
bool wants_onnvram() const { return flags & CEPH_OSD_FLAG_ONNVRAM; }

View File

@ -128,7 +128,8 @@ public:
: Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION) {
do_redirect = false;
}
MOSDOpReply(MOSDOp *req, int r, epoch_t e, int acktype, bool ignore_out_data)
MOSDOpReply(const MOSDOp *req, int r, epoch_t e, int acktype,
bool ignore_out_data)
: Message(CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION),
oid(req->hobj.oid), pgid(req->pgid.pgid), ops(req->ops) {

View File

@ -28,7 +28,7 @@ class MOSDPGInfo : public Message {
public:
vector<pair<pg_notify_t,pg_interval_map_t> > pg_list;
epoch_t get_epoch() { return epoch; }
epoch_t get_epoch() const { return epoch; }
MOSDPGInfo()
: Message(MSG_OSD_PG_INFO, HEAD_VERSION, COMPAT_VERSION) {

View File

@ -38,9 +38,9 @@ public:
pg_missing_t missing;
pg_interval_map_t past_intervals;
epoch_t get_epoch() { return epoch; }
spg_t get_pgid() { return spg_t(info.pgid.pgid, to); }
epoch_t get_query_epoch() { return query_epoch; }
epoch_t get_epoch() const { return epoch; }
spg_t get_pgid() const { return spg_t(info.pgid.pgid, to); }
epoch_t get_query_epoch() const { return query_epoch; }
MOSDPGLog() : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION) {
set_priority(CEPH_MSG_PRIO_HIGH);
@ -67,6 +67,8 @@ private:
public:
const char *get_type_name() const { return "PGlog"; }
void print(ostream& out) const {
// NOTE: log is not const, but operator<< doesn't touch fields
// swapped out by OSD code.
out << "pg_log(" << info.pgid << " epoch " << epoch
<< " log " << log
<< " query_epoch " << query_epoch << ")";

View File

@ -36,12 +36,14 @@ class MOSDPGNotify : public Message {
vector<pair<pg_notify_t,pg_interval_map_t> > pg_list; // pgid -> version
public:
version_t get_epoch() { return epoch; }
vector<pair<pg_notify_t,pg_interval_map_t> >& get_pg_list() { return pg_list; }
version_t get_epoch() const { return epoch; }
const vector<pair<pg_notify_t,pg_interval_map_t> >& get_pg_list() const {
return pg_list;
}
MOSDPGNotify()
: Message(MSG_OSD_PG_NOTIFY, HEAD_VERSION, COMPAT_VERSION) {
set_priority(CEPH_MSG_PRIO_HIGH);
set_priority(CEPH_MSG_PRIO_HIGH);
}
MOSDPGNotify(epoch_t e, vector<pair<pg_notify_t,pg_interval_map_t> >& l)
: Message(MSG_OSD_PG_NOTIFY, HEAD_VERSION, COMPAT_VERSION),

View File

@ -21,12 +21,12 @@ class MOSDPGPull : public MOSDFastDispatchOp {
static const int HEAD_VERSION = 2;
static const int COMPAT_VERSION = 1;
vector<PullOp> pulls;
public:
pg_shard_t from;
spg_t pgid;
epoch_t map_epoch;
vector<PullOp> pulls;
uint64_t cost;
epoch_t get_map_epoch() const override {
@ -36,6 +36,13 @@ public:
return pgid;
}
void take_pulls(vector<PullOp> *outpulls) {
outpulls->swap(pulls);
}
void set_pulls(vector<PullOp> *inpulls) {
inpulls->swap(pulls);
}
MOSDPGPull()
: MOSDFastDispatchOp(MSG_OSD_PG_PULL, HEAD_VERSION, COMPAT_VERSION),
cost(0)
@ -82,9 +89,9 @@ public:
void print(ostream& out) const {
out << "MOSDPGPull(" << pgid
<< " " << map_epoch
<< " " << pulls;
out << ")";
<< " e" << map_epoch
<< " cost " << cost
<< ")";
}
};

View File

@ -29,7 +29,7 @@ class MOSDPGQuery : public Message {
version_t epoch;
public:
version_t get_epoch() { return epoch; }
version_t get_epoch() const { return epoch; }
map<spg_t, pg_query_t> pg_list;
MOSDPGQuery() : Message(MSG_OSD_PG_QUERY,

View File

@ -30,7 +30,7 @@ class MOSDPGRemove : public Message {
public:
vector<spg_t> pg_list;
epoch_t get_epoch() { return epoch; }
epoch_t get_epoch() const { return epoch; }
MOSDPGRemove() :
Message(MSG_OSD_PG_REMOVE, HEAD_VERSION, COMPAT_VERSION) {}

View File

@ -27,7 +27,7 @@ public:
spg_t pgid;
eversion_t trim_to;
epoch_t get_epoch() { return epoch; }
epoch_t get_epoch() const { return epoch; }
MOSDPGTrim() : Message(MSG_OSD_PG_TRIM, HEAD_VERSION, COMPAT_VERSION) {}
MOSDPGTrim(version_t mv, spg_t p, eversion_t tt) :

View File

@ -95,11 +95,11 @@ public:
int get_result() { return result; }
void set_last_complete_ondisk(eversion_t v) { last_complete_ondisk = v; }
eversion_t get_last_complete_ondisk() { return last_complete_ondisk; }
eversion_t get_last_complete_ondisk() const { return last_complete_ondisk; }
public:
MOSDRepOpReply(
MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
const MOSDRepOp *req, pg_shard_t from, int result_, epoch_t e, int at) :
MOSDFastDispatchOp(MSG_OSD_REPOPREPLY, HEAD_VERSION, COMPAT_VERSION),
map_epoch(e),
reqid(req->reqid),

View File

@ -116,8 +116,8 @@ public:
epoch_t get_map_epoch() { return map_epoch; }
spg_t get_pg() { return pgid; }
hobject_t get_poid() { return poid; }
spg_t get_pg() const { return pgid; }
const hobject_t& get_poid() const { return poid; }
int get_ack_type() { return ack_type; }
bool is_ondisk() { return ack_type & CEPH_OSD_FLAG_ONDISK; }
@ -136,7 +136,7 @@ public:
public:
MOSDSubOpReply(
MOSDSubOp *req, pg_shard_t from, int result_, epoch_t e, int at)
const MOSDSubOp *req, pg_shard_t from, int result_, epoch_t e, int at)
: MOSDFastDispatchOp(MSG_OSD_SUBOPREPLY, HEAD_VERSION, COMPAT_VERSION),
map_epoch(e),
reqid(req->reqid),

View File

@ -376,6 +376,7 @@ public:
byte_throttler->take(data.length());
}
const bufferlist& get_data() const { return data; }
bufferlist& get_data() { return data; }
void claim_data(bufferlist& bl,
unsigned int flags = buffer::list::CLAIM_DEFAULT) {

View File

@ -1114,7 +1114,7 @@ public:
data.ops++;
}
/// Set multiple xattrs of an object
void setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferptr>& attrset) {
void setattrs(const coll_t& cid, const ghobject_t& oid, const map<string,bufferptr>& attrset) {
Op* _op = _get_next_op();
_op->op = OP_SETATTRS;
_op->cid = _get_coll_id(cid);
@ -1123,7 +1123,7 @@ public:
data.ops++;
}
/// Set multiple xattrs of an object
void setattrs(const coll_t& cid, const ghobject_t& oid, map<string,bufferlist>& attrset) {
void setattrs(const coll_t& cid, const ghobject_t& oid, const map<string,bufferlist>& attrset) {
Op* _op = _get_next_op();
_op->op = OP_SETATTRS;
_op->cid = _get_coll_id(cid);

View File

@ -280,7 +280,7 @@ struct RecoveryMessages {
};
void ECBackend::handle_recovery_push(
PushOp &op,
const PushOp &op,
RecoveryMessages *m)
{
@ -362,7 +362,7 @@ void ECBackend::handle_recovery_push(
}
void ECBackend::handle_recovery_push_reply(
PushReplyOp &op,
const PushReplyOp &op,
pg_shard_t from,
RecoveryMessages *m)
{
@ -726,40 +726,44 @@ bool ECBackend::handle_message(
int priority = _op->get_req()->get_priority();
switch (_op->get_req()->get_type()) {
case MSG_OSD_EC_WRITE: {
MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(_op->get_req());
// NOTE: this is non-const because handle_sub_write modifies the embedded
// ObjectStore::Transaction in place (and then std::move's it). It does
// not conflict with ECSubWrite's operator<<.
MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(
_op->get_nonconst_req());
handle_sub_write(op->op.from, _op, op->op);
return true;
}
case MSG_OSD_EC_WRITE_REPLY: {
MOSDECSubOpWriteReply *op = static_cast<MOSDECSubOpWriteReply*>(
const MOSDECSubOpWriteReply *op = static_cast<const MOSDECSubOpWriteReply*>(
_op->get_req());
op->set_priority(priority);
handle_sub_write_reply(op->op.from, op->op);
return true;
}
case MSG_OSD_EC_READ: {
MOSDECSubOpRead *op = static_cast<MOSDECSubOpRead*>(_op->get_req());
const MOSDECSubOpRead *op = static_cast<const MOSDECSubOpRead*>(_op->get_req());
MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
reply->pgid = get_parent()->primary_spg_t();
reply->map_epoch = get_parent()->get_epoch();
handle_sub_read(op->op.from, op->op, &(reply->op));
op->set_priority(priority);
get_parent()->send_message_osd_cluster(
op->op.from.osd, reply, get_parent()->get_epoch());
return true;
}
case MSG_OSD_EC_READ_REPLY: {
// NOTE: this is non-const because handle_sub_read_reply steals resulting
// buffers. It does not conflict with ECSubReadReply operator<<.
MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
_op->get_req());
_op->get_nonconst_req());
RecoveryMessages rm;
handle_sub_read_reply(op->op.from, op->op, &rm);
dispatch_recovery_messages(rm, priority);
return true;
}
case MSG_OSD_PG_PUSH: {
MOSDPGPush *op = static_cast<MOSDPGPush *>(_op->get_req());
const MOSDPGPush *op = static_cast<const MOSDPGPush *>(_op->get_req());
RecoveryMessages rm;
for (vector<PushOp>::iterator i = op->pushes.begin();
for (vector<PushOp>::const_iterator i = op->pushes.begin();
i != op->pushes.end();
++i) {
handle_recovery_push(*i, &rm);
@ -768,9 +772,10 @@ bool ECBackend::handle_message(
return true;
}
case MSG_OSD_PG_PUSH_REPLY: {
MOSDPGPushReply *op = static_cast<MOSDPGPushReply *>(_op->get_req());
const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
_op->get_req());
RecoveryMessages rm;
for (vector<PushReplyOp>::iterator i = op->replies.begin();
for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
i != op->replies.end();
++i) {
handle_recovery_push_reply(*i, op->from, &rm);
@ -936,12 +941,11 @@ void ECBackend::handle_sub_write(
void ECBackend::handle_sub_read(
pg_shard_t from,
ECSubRead &op,
const ECSubRead &op,
ECSubReadReply *reply)
{
shard_id_t shard = get_parent()->whoami_shard().shard;
for(map<hobject_t, list<boost::tuple<uint64_t, uint64_t, uint32_t> >>::iterator i =
op.to_read.begin();
for(auto i = op.to_read.begin();
i != op.to_read.end();
++i) {
int r = 0;
@ -955,8 +959,7 @@ void ECBackend::handle_sub_read(
goto error;
}
}
for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::iterator j =
i->second.begin(); j != i->second.end(); ++j) {
for (auto j = i->second.begin(); j != i->second.end(); ++j) {
bufferlist bl;
r = store->read(
ch,
@ -1034,7 +1037,7 @@ error:
void ECBackend::handle_sub_write_reply(
pg_shard_t from,
ECSubWriteReply &op)
const ECSubWriteReply &op)
{
map<ceph_tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
assert(i != tid_to_op_map.end());
@ -1076,8 +1079,7 @@ void ECBackend::handle_sub_read_reply(
return;
}
ReadOp &rop = iter->second;
for (map<hobject_t, list<pair<uint64_t, bufferlist> >>::iterator i =
op.buffers_read.begin();
for (auto i = op.buffers_read.begin();
i != op.buffers_read.end();
++i) {
assert(!op.errors.count(i->first)); // If attribute error we better not have sent a buffer
@ -1104,7 +1106,7 @@ void ECBackend::handle_sub_read_reply(
riter->get<2>()[from].claim(j->second);
}
}
for (map<hobject_t, map<string, bufferlist>>::iterator i = op.attrs_read.begin();
for (auto i = op.attrs_read.begin();
i != op.attrs_read.end();
++i) {
assert(!op.errors.count(i->first)); // if read error better not have sent an attribute
@ -1116,7 +1118,7 @@ void ECBackend::handle_sub_read_reply(
rop.complete[i->first].attrs = map<string, bufferlist>();
(*(rop.complete[i->first].attrs)).swap(i->second);
}
for (map<hobject_t, int>::iterator i = op.errors.begin();
for (auto i = op.errors.begin();
i != op.errors.end();
++i) {
rop.complete[i->first].errors.insert(

View File

@ -69,12 +69,12 @@ public:
);
void handle_sub_read(
pg_shard_t from,
ECSubRead &op,
const ECSubRead &op,
ECSubReadReply *reply
);
void handle_sub_write_reply(
pg_shard_t from,
ECSubWriteReply &op
const ECSubWriteReply &op
);
void handle_sub_read_reply(
pg_shard_t from,
@ -305,10 +305,10 @@ private:
boost::optional<map<string, bufferlist> > attrs,
RecoveryMessages *m);
void handle_recovery_push(
PushOp &op,
const PushOp &op,
RecoveryMessages *m);
void handle_recovery_push_reply(
PushReplyOp &op,
const PushReplyOp &op,
pg_shard_t from,
RecoveryMessages *m);

View File

@ -1381,7 +1381,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err)
void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
version_t uv)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
assert(m->get_type() == CEPH_MSG_OSD_OP);
int flags;
flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
@ -1394,7 +1394,7 @@ void OSDService::reply_op_error(OpRequestRef op, int err, eversion_t v,
void OSDService::handle_misdirected_op(PG *pg, OpRequestRef op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
assert(m->get_type() == CEPH_MSG_OSD_OP);
assert(m->get_map_epoch() >= pg->info.history.same_primary_since);
@ -3281,7 +3281,7 @@ PG *OSD::_create_lock_pg(
vector<int>& up, int up_primary,
vector<int>& acting, int acting_primary,
pg_history_t history,
pg_interval_map_t& pi,
const pg_interval_map_t& pi,
ObjectStore::Transaction& t)
{
assert(osd_lock.is_locked());
@ -3669,7 +3669,7 @@ void OSD::build_past_intervals_parallel()
void OSD::handle_pg_peering_evt(
spg_t pgid,
const pg_history_t& orig_history,
pg_interval_map_t& pi,
const pg_interval_map_t& pi,
epoch_t epoch,
PG::CephPeeringEvtRef evt)
{
@ -5374,6 +5374,7 @@ void OSD::handle_pg_stats_ack(MPGStatsAck *ack)
dout(10) << "handle_pg_stats_ack " << dendl;
if (!require_mon_peer(ack)) {
ack->put();
return;
}
@ -5457,8 +5458,10 @@ void OSD::flush_pg_stats()
void OSD::handle_command(MMonCommand *m)
{
if (!require_mon_peer(m))
if (!require_mon_peer(m)) {
m->put();
return;
}
Command *c = new Command(m->cmd, m->get_tid(), m->get_data(), NULL);
command_wq.queue(c);
@ -6308,7 +6311,7 @@ void OSD::do_waiters()
template<typename T, int MSGTYPE>
epoch_t replica_op_required_epoch(OpRequestRef op)
{
T *m = static_cast<T *>(op->get_req());
const T *m = static_cast<const T *>(op->get_req());
assert(m->get_type() == MSGTYPE);
return m->map_epoch;
}
@ -6317,11 +6320,11 @@ epoch_t op_required_epoch(OpRequestRef op)
{
switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP: {
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
return m->get_map_epoch();
}
case CEPH_MSG_OSD_BACKOFF: {
MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
return m->map_epoch;
}
case MSG_OSD_SUBOP:
@ -6576,8 +6579,10 @@ void OSD::handle_pg_scrub(MOSDScrub *m, PG *pg)
void OSD::handle_scrub(MOSDScrub *m)
{
dout(10) << "handle_scrub " << *m << dendl;
if (!require_mon_peer(m))
if (!require_mon_peer(m)) {
m->put();
return;
}
if (m->fsid != monc->get_fsid()) {
dout(0) << "handle_scrub fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
m->put();
@ -7617,19 +7622,18 @@ void OSD::activate_map()
take_waiters(waiting_for_osdmap);
}
bool OSD::require_mon_peer(Message *m)
bool OSD::require_mon_peer(const Message *m)
{
if (!m->get_connection()->peer_is_mon()) {
dout(0) << "require_mon_peer received from non-mon "
<< m->get_connection()->get_peer_addr()
<< " " << *m << dendl;
m->put();
return false;
}
return true;
}
bool OSD::require_osd_peer(Message *m)
bool OSD::require_osd_peer(const Message *m)
{
if (!m->get_connection()->peer_is_osd()) {
dout(0) << "require_osd_peer received from non-osd "
@ -7640,7 +7644,7 @@ bool OSD::require_osd_peer(Message *m)
return true;
}
bool OSD::require_self_aliveness(Message *m, epoch_t epoch)
bool OSD::require_self_aliveness(const Message *m, epoch_t epoch)
{
epoch_t up_epoch = service.get_up_epoch();
if (epoch < up_epoch) {
@ -7656,7 +7660,7 @@ bool OSD::require_self_aliveness(Message *m, epoch_t epoch)
return true;
}
bool OSD::require_same_peer_instance(Message *m, OSDMapRef& map,
bool OSD::require_same_peer_instance(const Message *m, OSDMapRef& map,
bool is_fast_dispatch)
{
int from = m->get_source().num();
@ -7693,7 +7697,7 @@ bool OSD::require_same_peer_instance(Message *m, OSDMapRef& map,
bool OSD::require_same_or_newer_map(OpRequestRef& op, epoch_t epoch,
bool is_fast_dispatch)
{
Message *m = op->get_req();
const Message *m = op->get_req();
dout(15) << "require_same_or_newer_map " << epoch
<< " (i am " << osdmap->get_epoch() << ") " << m << dendl;
@ -7785,29 +7789,22 @@ void OSD::split_pgs(
*/
void OSD::handle_pg_create(OpRequestRef op)
{
MOSDPGCreate *m = (MOSDPGCreate*)op->get_req();
const MOSDPGCreate *m = static_cast<const MOSDPGCreate*>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_CREATE);
dout(10) << "handle_pg_create " << *m << dendl;
/* we have to hack around require_mon_peer's interface limits, so
* grab an extra reference before going in. If the peer isn't
* a Monitor, the reference is put for us (and then cleared
* up automatically by our OpTracker infrastructure). Otherwise,
* we put the extra ref ourself.
*/
if (!require_mon_peer(op->get_req()->get())) {
if (!require_mon_peer(op->get_req())) {
return;
}
op->get_req()->put();
if (!require_same_or_newer_map(op, m->epoch, false))
return;
op->mark_started();
map<pg_t,utime_t>::iterator ci = m->ctimes.begin();
for (map<pg_t,pg_create_t>::iterator p = m->mkpg.begin();
map<pg_t,utime_t>::const_iterator ci = m->ctimes.begin();
for (map<pg_t,pg_create_t>::const_iterator p = m->mkpg.begin();
p != m->mkpg.end();
++p, ++ci) {
assert(ci != m->ctimes.end() && ci->first == p->first);
@ -8094,7 +8091,7 @@ void OSD::do_infos(map<int,
*/
void OSD::handle_pg_notify(OpRequestRef op)
{
MOSDPGNotify *m = (MOSDPGNotify*)op->get_req();
const MOSDPGNotify *m = static_cast<const MOSDPGNotify*>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_NOTIFY);
dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
@ -8108,10 +8105,9 @@ void OSD::handle_pg_notify(OpRequestRef op)
op->mark_started();
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator it = m->get_pg_list().begin();
for (auto it = m->get_pg_list().begin();
it != m->get_pg_list().end();
++it) {
if (it->first.info.pgid.preferred() >= 0) {
dout(20) << "ignoring localized pg " << it->first.info.pgid << dendl;
continue;
@ -8132,7 +8128,7 @@ void OSD::handle_pg_notify(OpRequestRef op)
void OSD::handle_pg_log(OpRequestRef op)
{
MOSDPGLog *m = (MOSDPGLog*) op->get_req();
MOSDPGLog *m = static_cast<MOSDPGLog*>(op->get_nonconst_req());
assert(m->get_type() == MSG_OSD_PG_LOG);
dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
@ -8161,7 +8157,7 @@ void OSD::handle_pg_log(OpRequestRef op)
void OSD::handle_pg_info(OpRequestRef op)
{
MOSDPGInfo *m = static_cast<MOSDPGInfo *>(op->get_req());
const MOSDPGInfo *m = static_cast<const MOSDPGInfo *>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_INFO);
dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
@ -8174,7 +8170,7 @@ void OSD::handle_pg_info(OpRequestRef op)
op->mark_started();
for (vector<pair<pg_notify_t,pg_interval_map_t> >::iterator p = m->pg_list.begin();
for (auto p = m->pg_list.begin();
p != m->pg_list.end();
++p) {
if (p->first.info.pgid.preferred() >= 0) {
@ -8197,7 +8193,7 @@ void OSD::handle_pg_info(OpRequestRef op)
void OSD::handle_pg_trim(OpRequestRef op)
{
MOSDPGTrim *m = (MOSDPGTrim *)op->get_req();
const MOSDPGTrim *m = static_cast<const MOSDPGTrim*>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_TRIM);
dout(7) << "handle_pg_trim " << *m << " from " << m->get_source() << dendl;
@ -8251,7 +8247,7 @@ void OSD::handle_pg_trim(OpRequestRef op)
void OSD::handle_pg_backfill_reserve(OpRequestRef op)
{
MBackfillReserve *m = static_cast<MBackfillReserve*>(op->get_req());
const MBackfillReserve *m = static_cast<const MBackfillReserve*>(op->get_req());
assert(m->get_type() == MSG_OSD_BACKFILL_RESERVE);
if (!require_osd_peer(op->get_req()))
@ -8299,7 +8295,7 @@ void OSD::handle_pg_backfill_reserve(OpRequestRef op)
void OSD::handle_pg_recovery_reserve(OpRequestRef op)
{
MRecoveryReserve *m = static_cast<MRecoveryReserve*>(op->get_req());
const MRecoveryReserve *m = static_cast<const MRecoveryReserve*>(op->get_req());
assert(m->get_type() == MSG_OSD_RECOVERY_RESERVE);
if (!require_osd_peer(op->get_req()))
@ -8354,7 +8350,7 @@ void OSD::handle_pg_query(OpRequestRef op)
{
assert(osd_lock.is_locked());
MOSDPGQuery *m = (MOSDPGQuery*)op->get_req();
const MOSDPGQuery *m = static_cast<const MOSDPGQuery*>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_QUERY);
if (!require_osd_peer(op->get_req()))
@ -8370,7 +8366,7 @@ void OSD::handle_pg_query(OpRequestRef op)
map< int, vector<pair<pg_notify_t, pg_interval_map_t> > > notify_list;
for (map<spg_t,pg_query_t>::iterator it = m->pg_list.begin();
for (auto it = m->pg_list.begin();
it != m->pg_list.end();
++it) {
spg_t pgid = it->first;
@ -8461,7 +8457,7 @@ void OSD::handle_pg_query(OpRequestRef op)
void OSD::handle_pg_remove(OpRequestRef op)
{
MOSDPGRemove *m = (MOSDPGRemove *)op->get_req();
const MOSDPGRemove *m = static_cast<const MOSDPGRemove *>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_REMOVE);
assert(osd_lock.is_locked());
@ -8476,7 +8472,7 @@ void OSD::handle_pg_remove(OpRequestRef op)
op->mark_started();
for (vector<spg_t>::iterator it = m->pg_list.begin();
for (auto it = m->pg_list.begin();
it != m->pg_list.end();
++it) {
spg_t pgid = *it;
@ -8756,7 +8752,7 @@ struct send_map_on_destruct {
OSDMapRef osdmap;
epoch_t map_epoch;
bool should_send;
send_map_on_destruct(OSD *osd, Message *m,
send_map_on_destruct(OSD *osd, const Message *m,
OSDMapRef& osdmap, epoch_t map_epoch)
: osd(osd), name(m->get_source()), con(m->get_connection()),
osdmap(osdmap), map_epoch(map_epoch),
@ -8771,7 +8767,7 @@ struct send_map_on_destruct {
void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
assert(m->get_type() == CEPH_MSG_OSD_OP);
if (op_is_discardable(m)) {
dout(10) << " discardable " << *m << dendl;
@ -8869,7 +8865,7 @@ void OSD::handle_op(OpRequestRef& op, OSDMapRef& osdmap)
void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap)
{
MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
Session *s = static_cast<Session*>(m->get_connection()->get_priv());
dout(10) << __func__ << " " << *m << " session " << s << dendl;
assert(s);
@ -8890,7 +8886,7 @@ void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap)
template<typename T, int MSGTYPE>
void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap)
{
T *m = static_cast<T *>(op->get_req());
const T *m = static_cast<const T *>(op->get_req());
assert(m->get_type() == MSGTYPE);
dout(10) << __func__ << " " << *m << " epoch " << m->map_epoch << dendl;
@ -8936,7 +8932,7 @@ void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap)
}
}
bool OSD::op_is_discardable(MOSDOp *op)
bool OSD::op_is_discardable(const MOSDOp *op)
{
// drop client request if they are not connected and can't get the
// reply anyway.
@ -9114,7 +9110,7 @@ void OSD::dequeue_op(
// share our map with sender, if they're old
if (op->send_map_update) {
Message *m = op->get_req();
const Message *m = op->get_req();
Session *session = static_cast<Session *>(m->get_connection()->get_priv());
epoch_t last_sent_epoch;
if (session) {
@ -9415,8 +9411,8 @@ void OSD::get_latest_osdmap()
int OSD::init_op_flags(OpRequestRef& op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
vector<OSDOp>::iterator iter;
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
vector<OSDOp>::const_iterator iter;
// client flags have no bearing on whether an op is a read, write, etc.
op->rmw_flags = 0;
@ -9490,7 +9486,7 @@ int OSD::init_op_flags(OpRequestRef& op)
switch (iter->op.op) {
case CEPH_OSD_OP_CALL:
{
bufferlist::iterator bp = iter->indata.begin();
bufferlist::iterator bp = const_cast<bufferlist&>(iter->indata).begin();
int is_write, is_read;
string cname, mname;
bp.copy(iter->op.cls.class_len, cname);

View File

@ -1404,7 +1404,7 @@ public:
unsigned mask = ~((~0)<<bits);
switch (op->get_req()->get_type()) {
case CEPH_MSG_OSD_OP:
return (static_cast<MOSDOp*>(
return (static_cast<const MOSDOp*>(
op->get_req())->get_raw_pg().m_seed & mask) == match;
}
return false;
@ -2063,7 +2063,7 @@ protected:
vector<int>& up, int up_primary,
vector<int>& acting, int acting_primary,
pg_history_t history,
pg_interval_map_t& pi,
const pg_interval_map_t& pi,
ObjectStore::Transaction& t);
PG *_lookup_qlock_pg(spg_t pgid);
@ -2074,7 +2074,7 @@ protected:
void handle_pg_peering_evt(
spg_t pgid,
const pg_history_t& orig_history,
pg_interval_map_t& pi,
const pg_interval_map_t& pi,
epoch_t epoch,
PG::CephPeeringEvtRef evt);
@ -2230,19 +2230,19 @@ protected:
vector<pair<pg_notify_t, pg_interval_map_t> > >& info_map,
OSDMapRef map);
bool require_mon_peer(Message *m);
bool require_osd_peer(Message *m);
bool require_mon_peer(const Message *m);
bool require_osd_peer(const Message *m);
/***
* Verifies that we were alive in the given epoch, and that
* still are.
*/
bool require_self_aliveness(Message *m, epoch_t alive_since);
bool require_self_aliveness(const Message *m, epoch_t alive_since);
/**
* Verifies that the OSD who sent the given op has the same
* address as in the given map.
* @pre op was sent by an OSD using the cluster messenger
*/
bool require_same_peer_instance(Message *m, OSDMapRef& map,
bool require_same_peer_instance(const Message *m, OSDMapRef& map,
bool is_fast_dispatch);
bool require_same_or_newer_map(OpRequestRef& op, epoch_t e,
@ -2493,7 +2493,7 @@ public:
void handle_signal(int signum);
/// check if we can throw out op from a disconnected client
static bool op_is_discardable(MOSDOp *m);
static bool op_is_discardable(const MOSDOp *m);
public:
OSDService service;

View File

@ -110,7 +110,8 @@ public:
bool send_map_update;
epoch_t sent_epoch;
bool hitset_inserted;
Message *get_req() const { return request; }
const Message *get_req() const { return request; }
Message *get_nonconst_req() { return request; }
const char *state_string() const {
switch(latest_flag_point) {

View File

@ -341,7 +341,9 @@ void PG::proc_master_log(
void PG::proc_replica_log(
ObjectStore::Transaction& t,
pg_info_t &oinfo, pg_log_t &olog, pg_missing_t& omissing,
pg_info_t &oinfo,
const pg_log_t &olog,
pg_missing_t& omissing,
pg_shard_t from)
{
dout(10) << "proc_replica_log for osd." << from << ": "
@ -1877,7 +1879,7 @@ bool PG::op_has_sufficient_caps(OpRequestRef& op)
if (op->get_req()->get_type() != CEPH_MSG_OSD_OP)
return true;
MOSDOp *req = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *req = static_cast<const MOSDOp*>(op->get_req());
Session *session = (Session *)req->get_connection()->get_priv();
if (!session) {
@ -2821,7 +2823,7 @@ void PG::init(
const vector<int>& newup, int new_up_primary,
const vector<int>& newacting, int new_acting_primary,
const pg_history_t& history,
pg_interval_map_t& pi,
const pg_interval_map_t& pi,
bool backfill,
ObjectStore::Transaction *t)
{
@ -2840,7 +2842,7 @@ void PG::init(
new_acting_primary);
info.history = history;
past_intervals.swap(pi);
past_intervals = pi;
info.stats.up = up;
info.stats.up_primary = new_up_primary;
@ -3646,7 +3648,7 @@ void PG::unreg_next_scrub()
void PG::sub_op_scrub_map(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
const MOSDSubOp *m = static_cast<const MOSDSubOp *>(op->get_req());
assert(m->get_type() == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_map" << dendl;
@ -3664,7 +3666,7 @@ void PG::sub_op_scrub_map(OpRequestRef op)
op->mark_started();
dout(10) << " got " << m->from << " scrub map" << dendl;
bufferlist::iterator p = m->get_data().begin();
bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
scrubber.received_maps[m->from].decode(p, info.pgid.pool());
dout(10) << "map version is "
@ -3700,7 +3702,7 @@ void PG::_request_scrub_map(
void PG::sub_op_scrub_reserve(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
assert(m->get_type() == MSG_OSD_SUBOP);
dout(7) << "sub_op_scrub_reserve" << dendl;
@ -3721,7 +3723,7 @@ void PG::sub_op_scrub_reserve(OpRequestRef op)
void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
const MOSDSubOpReply *reply = static_cast<const MOSDSubOpReply*>(op->get_req());
assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
dout(7) << "sub_op_scrub_reserve_reply" << dendl;
@ -3733,7 +3735,7 @@ void PG::sub_op_scrub_reserve_reply(OpRequestRef op)
op->mark_started();
pg_shard_t from = reply->from;
bufferlist::iterator p = reply->get_data().begin();
bufferlist::iterator p = const_cast<bufferlist&>(reply->get_data()).begin();
bool reserved;
::decode(reserved, p);
@ -4045,7 +4047,7 @@ void PG::replica_scrub(
OpRequestRef op,
ThreadPool::TPHandle &handle)
{
MOSDRepScrub *msg = static_cast<MOSDRepScrub *>(op->get_req());
const MOSDRepScrub *msg = static_cast<const MOSDRepScrub *>(op->get_req());
assert(!scrubber.active_rep_scrub);
dout(7) << "replica_scrub" << dendl;
@ -5392,7 +5394,7 @@ ostream& operator<<(ostream& out, const PG& pg)
bool PG::can_discard_op(OpRequestRef& op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
if (cct->_conf->osd_discard_disconnected_ops && OSD::op_is_discardable(m)) {
dout(20) << " discard " << *m << dendl;
return true;
@ -5430,7 +5432,7 @@ bool PG::can_discard_op(OpRequestRef& op)
template<typename T, int MSGTYPE>
bool PG::can_discard_replica_op(OpRequestRef& op)
{
T *m = static_cast<T *>(op->get_req());
const T *m = static_cast<const T *>(op->get_req());
assert(m->get_type() == MSGTYPE);
/* Mostly, this overlaps with the old_peering_msg
@ -5455,7 +5457,7 @@ bool PG::can_discard_replica_op(OpRequestRef& op)
bool PG::can_discard_scan(OpRequestRef op)
{
MOSDPGScan *m = static_cast<MOSDPGScan *>(op->get_req());
const MOSDPGScan *m = static_cast<const MOSDPGScan *>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_SCAN);
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
@ -5467,7 +5469,7 @@ bool PG::can_discard_scan(OpRequestRef op)
bool PG::can_discard_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = static_cast<MOSDPGBackfill *>(op->get_req());
const MOSDPGBackfill *m = static_cast<const MOSDPGBackfill *>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_BACKFILL);
if (old_peering_msg(m->map_epoch, m->query_epoch)) {
@ -5532,7 +5534,7 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
case CEPH_MSG_OSD_OP:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDOp*>(op->get_req())->get_map_epoch());
static_cast<const MOSDOp*>(op->get_req())->get_map_epoch());
case CEPH_MSG_OSD_BACKOFF:
return false; // we don't care about maps
@ -5540,82 +5542,82 @@ bool PG::op_must_wait_for_map(epoch_t cur_epoch, OpRequestRef& op)
case MSG_OSD_SUBOP:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDSubOp*>(op->get_req())->map_epoch);
static_cast<const MOSDSubOp*>(op->get_req())->map_epoch);
case MSG_OSD_REPOP:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDRepOp*>(op->get_req())->map_epoch);
static_cast<const MOSDRepOp*>(op->get_req())->map_epoch);
case MSG_OSD_SUBOPREPLY:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDSubOpReply*>(op->get_req())->map_epoch);
static_cast<const MOSDSubOpReply*>(op->get_req())->map_epoch);
case MSG_OSD_REPOPREPLY:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDRepOpReply*>(op->get_req())->map_epoch);
static_cast<const MOSDRepOpReply*>(op->get_req())->map_epoch);
case MSG_OSD_PG_SCAN:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDPGScan*>(op->get_req())->map_epoch);
static_cast<const MOSDPGScan*>(op->get_req())->map_epoch);
case MSG_OSD_PG_BACKFILL:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDPGBackfill*>(op->get_req())->map_epoch);
static_cast<const MOSDPGBackfill*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PUSH:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDPGPush*>(op->get_req())->map_epoch);
static_cast<const MOSDPGPush*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PULL:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDPGPull*>(op->get_req())->map_epoch);
static_cast<const MOSDPGPull*>(op->get_req())->map_epoch);
case MSG_OSD_PG_PUSH_REPLY:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDPGPushReply*>(op->get_req())->map_epoch);
static_cast<const MOSDPGPushReply*>(op->get_req())->map_epoch);
case MSG_OSD_EC_WRITE:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDECSubOpWrite*>(op->get_req())->map_epoch);
static_cast<const MOSDECSubOpWrite*>(op->get_req())->map_epoch);
case MSG_OSD_EC_WRITE_REPLY:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDECSubOpWriteReply*>(op->get_req())->map_epoch);
static_cast<const MOSDECSubOpWriteReply*>(op->get_req())->map_epoch);
case MSG_OSD_EC_READ:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDECSubOpRead*>(op->get_req())->map_epoch);
static_cast<const MOSDECSubOpRead*>(op->get_req())->map_epoch);
case MSG_OSD_EC_READ_REPLY:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDECSubOpReadReply*>(op->get_req())->map_epoch);
static_cast<const MOSDECSubOpReadReply*>(op->get_req())->map_epoch);
case MSG_OSD_REP_SCRUB:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDRepScrub*>(op->get_req())->map_epoch);
static_cast<const MOSDRepScrub*>(op->get_req())->map_epoch);
case MSG_OSD_PG_UPDATE_LOG_MISSING:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDPGUpdateLogMissing*>(op->get_req())->map_epoch);
static_cast<const MOSDPGUpdateLogMissing*>(op->get_req())->map_epoch);
case MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY:
return !have_same_or_newer_map(
cur_epoch,
static_cast<MOSDPGUpdateLogMissingReply*>(op->get_req())->map_epoch);
static_cast<const MOSDPGUpdateLogMissingReply*>(op->get_req())->map_epoch);
}
ceph_abort();
return false;
@ -7137,7 +7139,7 @@ boost::statechart::result PG::RecoveryState::ReplicaActive::react(const MLogRec&
PG *pg = context< RecoveryMachine >().pg;
ldout(pg->cct, 10) << "received log from " << logevt.from << dendl;
ObjectStore::Transaction* t = context<RecoveryMachine>().get_cur_transaction();
pg->merge_log(*t,logevt.msg->info, logevt.msg->log, logevt.from);
pg->merge_log(*t, logevt.msg->info, logevt.msg->log, logevt.from);
assert(pg->pg_log.get_head() == pg->info.last_update);
return discard_event();

View File

@ -924,7 +924,8 @@ public:
virtual void calc_trim_to() = 0;
void proc_replica_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
void proc_replica_log(ObjectStore::Transaction& t,
pg_info_t &oinfo, const pg_log_t &olog,
pg_missing_t& omissing, pg_shard_t from);
void proc_master_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
pg_missing_t& omissing, pg_shard_t from);
@ -1356,7 +1357,7 @@ public:
pg_shard_t from;
pg_info_t info;
epoch_t msg_epoch;
MInfoRec(pg_shard_t from, pg_info_t &info, epoch_t msg_epoch) :
MInfoRec(pg_shard_t from, const pg_info_t &info, epoch_t msg_epoch) :
from(from), info(info), msg_epoch(msg_epoch) {}
void print(std::ostream *out) const {
*out << "MInfoRec from " << from << " info: " << info;
@ -1377,7 +1378,7 @@ public:
pg_shard_t from;
pg_notify_t notify;
uint64_t features;
MNotifyRec(pg_shard_t from, pg_notify_t &notify, uint64_t f) :
MNotifyRec(pg_shard_t from, const pg_notify_t &notify, uint64_t f) :
from(from), notify(notify), features(f) {}
void print(std::ostream *out) const {
*out << "MNotifyRec from " << from << " notify: " << notify
@ -2183,7 +2184,7 @@ public:
const vector<int>& acting,
int acting_primary,
const pg_history_t& history,
pg_interval_map_t& pim,
const pg_interval_map_t& pim,
bool backfill,
ObjectStore::Transaction *t);

View File

@ -185,7 +185,7 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
virtual ObjectContextRef get_obc(
const hobject_t &hoid,
map<string, bufferlist> &attrs) = 0;
const map<string, bufferlist> &attrs) = 0;
virtual bool try_lock_for_read(
const hobject_t &hoid,
@ -202,7 +202,7 @@ typedef ceph::shared_ptr<const OSDMap> OSDMapRef;
virtual void log_operation(
const vector<pg_log_entry_t> &logv,
boost::optional<pg_hit_set_history_t> &hset_history,
const boost::optional<pg_hit_set_history_t> &hset_history,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
bool transaction_applied,

View File

@ -131,7 +131,9 @@ void PGLog::trim(
void PGLog::proc_replica_log(
ObjectStore::Transaction& t,
pg_info_t &oinfo, const pg_log_t &olog, pg_missing_t& omissing,
pg_info_t &oinfo,
const pg_log_t &olog,
pg_missing_t& omissing,
pg_shard_t from) const
{
dout(10) << "proc_replica_log for osd." << from << ": "

View File

@ -703,7 +703,9 @@ public:
log.last_requested = 0;
}
void proc_replica_log(ObjectStore::Transaction& t, pg_info_t &oinfo, const pg_log_t &olog,
void proc_replica_log(ObjectStore::Transaction& t,
pg_info_t &oinfo,
const pg_log_t &olog,
pg_missing_t& omissing, pg_shard_t from) const;
protected:
@ -972,7 +974,9 @@ public:
pg_info_t &info, LogEntryHandler *rollbacker,
bool &dirty_info, bool &dirty_big_info);
void merge_log(ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
void merge_log(ObjectStore::Transaction& t,
pg_info_t &oinfo,
pg_log_t &olog,
pg_shard_t from,
pg_info_t &info, LogEntryHandler *rollbacker,
bool &dirty_info, bool &dirty_big_info);

View File

@ -1022,7 +1022,9 @@ int PrimaryLogPG::do_command(
void PrimaryLogPG::do_pg_op(OpRequestRef op)
{
MOSDOp *m = static_cast<MOSDOp *>(op->get_req());
// NOTE: this is non-const because we modify the OSDOp.outdata in
// place
MOSDOp *m = static_cast<MOSDOp *>(op->get_nonconst_req());
assert(m->get_type() == CEPH_MSG_OSD_OP);
dout(10) << "do_pg_op " << *m << dendl;
@ -1528,7 +1530,7 @@ void PrimaryLogPG::get_src_oloc(const object_t& oid, const object_locator_t& olo
void PrimaryLogPG::handle_backoff(OpRequestRef& op)
{
MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
const MOSDBackoff *m = static_cast<const MOSDBackoff*>(op->get_req());
SessionRef session((Session *)m->get_connection()->get_priv());
if (!session)
return; // drop it.
@ -1556,7 +1558,7 @@ void PrimaryLogPG::do_request(
}
// pg-wide backoffs
Message *m = op->get_req();
const Message *m = op->get_req();
if (m->get_connection()->has_feature(CEPH_FEATURE_RADOS_BACKOFF)) {
SessionRef session((Session *)m->get_connection()->get_priv());
if (!session)
@ -1585,7 +1587,7 @@ void PrimaryLogPG::do_request(
}
// pg backoff acks at pg-level
if (op->get_req()->get_type() == CEPH_MSG_OSD_BACKOFF) {
MOSDBackoff *ba = static_cast<MOSDBackoff*>(m);
const MOSDBackoff *ba = static_cast<const MOSDBackoff*>(m);
if (ba->begin != ba->end) {
handle_backoff(op);
return;
@ -1700,13 +1702,14 @@ hobject_t PrimaryLogPG::earliest_backfill() const
void PrimaryLogPG::do_op(OpRequestRef& op)
{
FUNCTRACE();
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
// NOTE: take a non-const pointer here; we must be careful not to
// change anything that will break other reads on m (operator<<).
MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
assert(m->get_type() == CEPH_MSG_OSD_OP);
if (m->finish_decode()) {
op->reset_desc(); // for TrackedOp
m->clear_payload();
}
m->clear_payload();
dout(20) << __func__ << ": op " << *m << dendl;
@ -2193,7 +2196,7 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
{
dout(20) << __func__ << " r=" << r << dendl;
assert(op->may_write());
const osd_reqid_t &reqid = static_cast<MOSDOp*>(op->get_req())->get_reqid();
const osd_reqid_t &reqid = static_cast<const MOSDOp*>(op->get_req())->get_reqid();
ObjectContextRef obc;
mempool::osd::list<pg_log_entry_t> entries;
entries.push_back(pg_log_entry_t(pg_log_entry_t::ERROR, soid,
@ -2215,7 +2218,7 @@ void PrimaryLogPG::record_write_error(OpRequestRef op, const hobject_t &soid,
{}
void operator()() {
ldpp_dout(pg, 20) << "finished " << __func__ << " r=" << r << dendl;
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
MOSDOpReply *reply = orig_reply.detach();
if (reply == nullptr) {
@ -2249,7 +2252,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
if (op &&
op->get_req() &&
op->get_req()->get_type() == CEPH_MSG_OSD_OP &&
(static_cast<MOSDOp *>(op->get_req())->get_flags() &
(static_cast<const MOSDOp *>(op->get_req())->get_flags() &
CEPH_OSD_FLAG_IGNORE_CACHE)) {
dout(20) << __func__ << ": ignoring cache due to flag" << dendl;
return cache_result_t::NOOP;
@ -2295,7 +2298,7 @@ PrimaryLogPG::cache_result_t PrimaryLogPG::maybe_handle_cache_detail(
missing_oid = obc->obs.oi.soid;
}
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
const object_locator_t oloc = m->get_object_locator();
if (op->need_skip_handle_cache()) {
@ -2508,7 +2511,7 @@ bool PrimaryLogPG::maybe_promote(ObjectContextRef obc,
void PrimaryLogPG::do_cache_redirect(OpRequestRef op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
int flags = m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK);
MOSDOpReply *reply = new MOSDOpReply(m, -ENOENT,
get_osdmap()->get_epoch(), flags, false);
@ -2550,7 +2553,9 @@ struct C_ProxyRead : public Context {
void PrimaryLogPG::do_proxy_read(OpRequestRef op)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
// NOTE: non-const here because the ProxyReadOp needs mutable refs to
// stash the result in the request's OSDOp vector
MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
object_locator_t oloc(m->get_object_locator());
oloc.pool = pool.info.tier_of;
@ -2643,7 +2648,7 @@ void PrimaryLogPG::finish_proxy_read(hobject_t oid, ceph_tid_t tid, int r)
osd->logger->inc(l_osd_tier_proxy_read);
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
OpContext *ctx = new OpContext(op, m->get_reqid(), prdop->ops, this);
ctx->reply = new MOSDOpReply(m, 0, get_osdmap()->get_epoch(), 0, false);
ctx->user_at_version = prdop->user_version;
@ -2739,7 +2744,8 @@ struct C_ProxyWrite_Commit : public Context {
void PrimaryLogPG::do_proxy_write(OpRequestRef op, const hobject_t& missing_oid)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
// NOTE: non-const because ProxyWriteOp takes a mutable ref
MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
object_locator_t oloc(m->get_object_locator());
oloc.pool = pool.info.tier_of;
SnapContext snapc(m->get_snap_seq(), m->get_snaps());
@ -2804,7 +2810,7 @@ void PrimaryLogPG::finish_proxy_write(hobject_t oid, ceph_tid_t tid, int r)
osd->logger->inc(l_osd_tier_proxy_write);
MOSDOp *m = static_cast<MOSDOp*>(pwop->op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(pwop->op->get_req());
assert(m != NULL);
if (m->wants_ondisk() && !pwop->sent_disk) {
@ -2938,7 +2944,7 @@ void PrimaryLogPG::execute_ctx(OpContext *ctx)
ctx->reset_obs(ctx->obc);
ctx->update_log_only = false; // reset in case finish_copyfrom() is re-running execute_ctx
OpRequestRef op = ctx->op;
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
ObjectContextRef obc = ctx->obc;
const hobject_t& soid = obc->obs.oi.soid;
@ -3171,7 +3177,7 @@ void PrimaryLogPG::reply_ctx(OpContext *ctx, int r, eversion_t v, version_t uv)
void PrimaryLogPG::log_op_stats(OpContext *ctx)
{
OpRequestRef op = ctx->op;
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(op->get_req());
utime_t now = ceph_clock_now();
utime_t latency = now;
@ -3220,7 +3226,7 @@ void PrimaryLogPG::log_op_stats(OpContext *ctx)
void PrimaryLogPG::do_sub_op(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
assert(have_same_or_newer_map(m->map_epoch));
assert(m->get_type() == MSG_OSD_SUBOP);
dout(15) << "do_sub_op " << *op->get_req() << dendl;
@ -3231,7 +3237,7 @@ void PrimaryLogPG::do_sub_op(OpRequestRef op)
return;
}
OSDOp *first = NULL;
const OSDOp *first = NULL;
if (m->ops.size() >= 1) {
first = &m->ops[0];
}
@ -3256,10 +3262,10 @@ void PrimaryLogPG::do_sub_op(OpRequestRef op)
void PrimaryLogPG::do_sub_op_reply(OpRequestRef op)
{
MOSDSubOpReply *r = static_cast<MOSDSubOpReply *>(op->get_req());
const MOSDSubOpReply *r = static_cast<const MOSDSubOpReply *>(op->get_req());
assert(r->get_type() == MSG_OSD_SUBOPREPLY);
if (r->ops.size() >= 1) {
OSDOp& first = r->ops[0];
const OSDOp& first = r->ops[0];
switch (first.op.op) {
case CEPH_OSD_OP_SCRUB_RESERVE:
sub_op_scrub_reserve_reply(op);
@ -3272,7 +3278,7 @@ void PrimaryLogPG::do_scan(
OpRequestRef op,
ThreadPool::TPHandle &handle)
{
MOSDPGScan *m = static_cast<MOSDPGScan*>(op->get_req());
const MOSDPGScan *m = static_cast<const MOSDPGScan*>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_SCAN);
dout(10) << "do_scan " << *m << dendl;
@ -3323,7 +3329,7 @@ void PrimaryLogPG::do_scan(
BackfillInterval& bi = peer_backfill_info[from];
bi.begin = m->begin;
bi.end = m->end;
bufferlist::iterator p = m->get_data().begin();
bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
// take care to preserve ordering!
bi.clear_objects();
@ -3345,7 +3351,7 @@ void PrimaryLogPG::do_scan(
void PrimaryLogPG::do_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = static_cast<MOSDPGBackfill*>(op->get_req());
const MOSDPGBackfill *m = static_cast<const MOSDPGBackfill*>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_BACKFILL);
dout(10) << "do_backfill " << *m << dendl;
@ -6595,7 +6601,7 @@ int PrimaryLogPG::prepare_transaction(OpContext *ctx)
ctx->delta_stats.num_objects > 0) && // FIXME: keys?
(pool.info.has_flag(pg_pool_t::FLAG_FULL) ||
get_osdmap()->test_flag(CEPH_OSDMAP_FULL))) {
MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
if (ctx->reqid.name.is_mds() || // FIXME: ignore MDS for now
m->has_flag(CEPH_OSD_FLAG_FULL_FORCE)) {
dout(20) << __func__ << " full, but proceeding due to FULL_FORCE or MDS"
@ -6810,7 +6816,7 @@ void PrimaryLogPG::apply_stats(
void PrimaryLogPG::complete_read_ctx(int result, OpContext *ctx)
{
MOSDOp *m = static_cast<MOSDOp*>(ctx->op->get_req());
const MOSDOp *m = static_cast<const MOSDOp*>(ctx->op->get_req());
assert(ctx->async_reads_complete());
for (vector<OSDOp>::iterator p = ctx->ops.begin();
@ -6918,7 +6924,7 @@ int PrimaryLogPG::fill_in_copy_get(
return result;
}
MOSDOp *op = reinterpret_cast<MOSDOp*>(ctx->op->get_req());
const MOSDOp *op = reinterpret_cast<const MOSDOp*>(ctx->op->get_req());
uint64_t features = op->get_features();
bool async_read_started = false;
@ -7067,7 +7073,10 @@ int PrimaryLogPG::fill_in_copy_get(
void PrimaryLogPG::fill_in_copy_get_noent(OpRequestRef& op, hobject_t oid,
OSDOp& osd_op, bool classic)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
// NOTE: we take non-const ref here for claim_op_out_data below; we must
// be careful not to modify anything else that will upset a racing
// operator<<
MOSDOp *m = static_cast<MOSDOp*>(op->get_nonconst_req());
uint64_t features = m->get_features();
object_copy_data_t reply_obj;
@ -8287,7 +8296,7 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
}
} else {
if (scrubber.active_rep_scrub) {
if (last_update_applied == static_cast<MOSDRepScrub*>(
if (last_update_applied == static_cast<const MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->scrub_to) {
osd->op_wq.queue(
make_pair(
@ -8301,9 +8310,9 @@ void PrimaryLogPG::op_applied(const eversion_t &applied_version)
void PrimaryLogPG::eval_repop(RepGather *repop)
{
MOSDOp *m = NULL;
const MOSDOp *m = NULL;
if (repop->op)
m = static_cast<MOSDOp *>(repop->op->get_req());
m = static_cast<const MOSDOp *>(repop->op->get_req());
if (m)
dout(10) << "eval_repop " << *repop
@ -8861,9 +8870,10 @@ ObjectContextRef PrimaryLogPG::create_object_context(const object_info_t& oi,
return obc;
}
ObjectContextRef PrimaryLogPG::get_object_context(const hobject_t& soid,
bool can_create,
map<string, bufferlist> *attrs)
ObjectContextRef PrimaryLogPG::get_object_context(
const hobject_t& soid,
bool can_create,
const map<string, bufferlist> *attrs)
{
assert(
attrs || !pg_log.get_missing().is_missing(soid) ||
@ -9280,7 +9290,7 @@ void PrimaryLogPG::kick_object_context_blocked(ObjectContextRef obc)
SnapSetContext *PrimaryLogPG::get_snapset_context(
const hobject_t& oid,
bool can_create,
map<string, bufferlist> *attrs,
const map<string, bufferlist> *attrs,
bool oid_existed)
{
Mutex::Locker l(snapset_contexts_lock);
@ -9518,7 +9528,7 @@ void PrimaryLogPG::_applied_recovered_object_replica()
// requeue an active chunky scrub waiting on recovery ops
if (!deleting && active_pushes == 0 &&
scrubber.active_rep_scrub && static_cast<MOSDRepScrub*>(
scrubber.active_rep_scrub && static_cast<const MOSDRepScrub*>(
scrubber.active_rep_scrub->get_req())->chunky) {
osd->op_wq.queue(
make_pair(
@ -9568,7 +9578,7 @@ void PrimaryLogPG::failed_push(const list<pg_shard_t> &from, const hobject_t &so
void PrimaryLogPG::sub_op_remove(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
assert(m->get_type() == MSG_OSD_SUBOP);
dout(7) << "sub_op_remove " << m->poid << dendl;
@ -9611,7 +9621,7 @@ eversion_t PrimaryLogPG::pick_newest_available(const hobject_t& oid)
void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
{
MOSDPGUpdateLogMissing *m = static_cast<MOSDPGUpdateLogMissing*>(
const MOSDPGUpdateLogMissing *m = static_cast<const MOSDPGUpdateLogMissing*>(
op->get_req());
assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
@ -9619,7 +9629,7 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
Context *complete = new FunctionContext(
[=](int) {
MOSDPGUpdateLogMissing *msg = static_cast<MOSDPGUpdateLogMissing*>(
const MOSDPGUpdateLogMissing *msg = static_cast<const MOSDPGUpdateLogMissing*>(
op->get_req());
lock();
if (!pg_has_reset_since(msg->get_epoch())) {
@ -9660,8 +9670,8 @@ void PrimaryLogPG::do_update_log_missing(OpRequestRef &op)
void PrimaryLogPG::do_update_log_missing_reply(OpRequestRef &op)
{
MOSDPGUpdateLogMissingReply *m =
static_cast<MOSDPGUpdateLogMissingReply*>(
const MOSDPGUpdateLogMissingReply *m =
static_cast<const MOSDPGUpdateLogMissingReply*>(
op->get_req());
dout(20) << __func__ << " got reply from "
<< m->get_from() << dendl;

View File

@ -337,7 +337,7 @@ public:
ObjectContextRef get_obc(
const hobject_t &hoid,
map<string, bufferlist> &attrs) override {
const map<string, bufferlist> &attrs) override {
return get_object_context(hoid, true, &attrs);
}
@ -370,7 +370,7 @@ public:
void log_operation(
const vector<pg_log_entry_t> &logv,
boost::optional<pg_hit_set_history_t> &hset_history,
const boost::optional<pg_hit_set_history_t> &hset_history,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
bool transaction_applied,
@ -952,7 +952,7 @@ protected:
ObjectContextRef get_object_context(
const hobject_t& soid,
bool can_create,
map<string, bufferlist> *attrs = 0
const map<string, bufferlist> *attrs = 0
);
void context_registry_on_change();
@ -972,7 +972,7 @@ protected:
SnapSetContext *get_snapset_context(
const hobject_t& oid,
bool can_create,
map<string, bufferlist> *attrs = 0,
const map<string, bufferlist> *attrs = 0,
bool oid_existed = true //indicate this oid whether exsited in backend
);
void register_snapset_context(SnapSetContext *ssc) {

View File

@ -184,9 +184,9 @@ bool ReplicatedBackend::can_handle_while_inactive(OpRequestRef op)
case MSG_OSD_PG_PULL:
return true;
case MSG_OSD_SUBOP: {
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
if (m->ops.size() >= 1) {
OSDOp *first = &m->ops[0];
const OSDOp *first = &m->ops[0];
switch (first->op.op) {
case CEPH_OSD_OP_PULL:
return true;
@ -221,9 +221,9 @@ bool ReplicatedBackend::handle_message(
return true;
case MSG_OSD_SUBOP: {
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
if (m->ops.size() >= 1) {
OSDOp *first = &m->ops[0];
const OSDOp *first = &m->ops[0];
switch (first->op.op) {
case CEPH_OSD_OP_PULL:
sub_op_pull(op);
@ -247,9 +247,9 @@ bool ReplicatedBackend::handle_message(
}
case MSG_OSD_SUBOPREPLY: {
MOSDSubOpReply *r = static_cast<MOSDSubOpReply*>(op->get_req());
const MOSDSubOpReply *r = static_cast<const MOSDSubOpReply*>(op->get_req());
if (r->ops.size() >= 1) {
OSDOp &first = r->ops[0];
const OSDOp &first = r->ops[0];
switch (first.op.op) {
case CEPH_OSD_OP_PUSH:
// continue peer recovery
@ -664,8 +664,8 @@ void ReplicatedBackend::op_commit(
void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
{
MOSDRepOpReply *r = static_cast<MOSDRepOpReply *>(op->get_req());
r->finish_decode();
static_cast<MOSDRepOpReply*>(op->get_nonconst_req())->finish_decode();
const MOSDRepOpReply *r = static_cast<const MOSDRepOpReply *>(op->get_req());
assert(r->get_header().type == MSG_OSD_REPOPREPLY);
op->mark_started();
@ -678,9 +678,9 @@ void ReplicatedBackend::sub_op_modify_reply(OpRequestRef op)
map<ceph_tid_t, InProgressOp>::iterator iter =
in_progress_ops.find(rep_tid);
InProgressOp &ip_op = iter->second;
MOSDOp *m = NULL;
const MOSDOp *m = NULL;
if (ip_op.op)
m = static_cast<MOSDOp *>(ip_op.op->get_req());
m = static_cast<const MOSDOp *>(ip_op.op->get_req());
if (m)
dout(7) << __func__ << ": tid " << ip_op.tid << " op " //<< *m
@ -840,7 +840,7 @@ void ReplicatedBackend::be_deep_scrub(
void ReplicatedBackend::_do_push(OpRequestRef op)
{
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_PUSH);
pg_shard_t from = m->from;
@ -848,7 +848,7 @@ void ReplicatedBackend::_do_push(OpRequestRef op)
vector<PushReplyOp> replies;
ObjectStore::Transaction t;
for (vector<PushOp>::iterator i = m->pushes.begin();
for (vector<PushOp>::const_iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
replies.push_back(PushReplyOp());
@ -896,7 +896,7 @@ struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->get_req());
const MOSDPGPush *m = static_cast<const MOSDPGPush *>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_PUSH);
pg_shard_t from = m->from;
@ -905,7 +905,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
vector<PullOp> replies(1);
ObjectStore::Transaction t;
list<pull_complete_info> to_continue;
for (vector<PushOp>::iterator i = m->pushes.begin();
for (vector<PushOp>::const_iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, &t);
@ -931,7 +931,7 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
reply->set_priority(m->get_priority());
reply->pgid = get_info().pgid;
reply->map_epoch = m->map_epoch;
reply->pulls.swap(replies);
reply->set_pulls(&replies);
reply->compute_cost(cct);
t.register_on_complete(
@ -944,28 +944,28 @@ void ReplicatedBackend::_do_pull_response(OpRequestRef op)
void ReplicatedBackend::do_pull(OpRequestRef op)
{
MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_req());
MOSDPGPull *m = static_cast<MOSDPGPull *>(op->get_nonconst_req());
assert(m->get_type() == MSG_OSD_PG_PULL);
pg_shard_t from = m->from;
map<pg_shard_t, vector<PushOp> > replies;
for (vector<PullOp>::iterator i = m->pulls.begin();
i != m->pulls.end();
++i) {
vector<PullOp> pulls;
m->take_pulls(&pulls);
for (auto& i : pulls) {
replies[from].push_back(PushOp());
handle_pull(from, *i, &(replies[from].back()));
handle_pull(from, i, &(replies[from].back()));
}
send_pushes(m->get_priority(), replies);
}
void ReplicatedBackend::do_push_reply(OpRequestRef op)
{
MOSDPGPushReply *m = static_cast<MOSDPGPushReply *>(op->get_req());
const MOSDPGPushReply *m = static_cast<const MOSDPGPushReply *>(op->get_req());
assert(m->get_type() == MSG_OSD_PG_PUSH_REPLY);
pg_shard_t from = m->from;
vector<PushOp> replies(1);
for (vector<PushReplyOp>::iterator i = m->replies.begin();
for (vector<PushReplyOp>::const_iterator i = m->replies.begin();
i != m->replies.end();
++i) {
bool more = handle_push_reply(from, *i, &(replies.back()));
@ -1088,8 +1088,8 @@ void ReplicatedBackend::issue_op(
// sub op modify
void ReplicatedBackend::sub_op_modify(OpRequestRef op)
{
MOSDRepOp *m = static_cast<MOSDRepOp *>(op->get_req());
m->finish_decode();
static_cast<MOSDRepOp*>(op->get_nonconst_req())->finish_decode();
const MOSDRepOp *m = static_cast<const MOSDRepOp *>(op->get_req());
int msg_type = m->get_type();
assert(MSG_OSD_REPOP == msg_type);
@ -1122,7 +1122,7 @@ void ReplicatedBackend::sub_op_modify(OpRequestRef op)
// shipped transaction and log entries
vector<pg_log_entry_t> log;
bufferlist::iterator p = m->get_data().begin();
bufferlist::iterator p = const_cast<bufferlist&>(m->get_data()).begin();
::decode(rm->opt, p);
if (m->new_temp_oid != hobject_t()) {
@ -1139,7 +1139,7 @@ void ReplicatedBackend::sub_op_modify(OpRequestRef op)
clear_temp_obj(m->discard_temp_oid);
}
p = m->logbl.begin();
p = const_cast<bufferlist&>(m->logbl).begin();
::decode(log, p);
rm->opt.set_fadvise_flag(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
@ -1181,25 +1181,25 @@ void ReplicatedBackend::sub_op_modify_applied(RepModifyRef rm)
dout(10) << "sub_op_modify_applied on " << rm << " op "
<< *rm->op->get_req() << dendl;
Message *m = rm->op->get_req();
const Message *m = rm->op->get_req();
Message *ack = NULL;
eversion_t version;
if (m->get_type() == MSG_OSD_SUBOP) {
// doesn't have CLIENT SUBOP feature ,use Subop
MOSDSubOp *req = static_cast<MOSDSubOp*>(m);
const MOSDSubOp *req = static_cast<const MOSDSubOp*>(m);
version = req->version;
if (!rm->committed)
ack = new MOSDSubOpReply(
req, parent->whoami_shard(),
0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
} else if (m->get_type() == MSG_OSD_REPOP) {
MOSDRepOp *req = static_cast<MOSDRepOp*>(m);
const MOSDRepOp *req = static_cast<const MOSDRepOp*>(m);
version = req->version;
if (!rm->committed)
ack = new MOSDRepOpReply(
static_cast<MOSDRepOp*>(m), parent->whoami_shard(),
static_cast<const MOSDRepOp*>(m), parent->whoami_shard(),
0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
} else {
ceph_abort();
@ -1228,19 +1228,19 @@ void ReplicatedBackend::sub_op_modify_commit(RepModifyRef rm)
assert(get_osdmap()->is_up(rm->ackerosd));
get_parent()->update_last_complete_ondisk(rm->last_complete);
Message *m = rm->op->get_req();
const Message *m = rm->op->get_req();
Message *commit = NULL;
if (m->get_type() == MSG_OSD_SUBOP) {
// doesn't have CLIENT SUBOP feature ,use Subop
MOSDSubOpReply *reply = new MOSDSubOpReply(
static_cast<MOSDSubOp*>(m),
static_cast<const MOSDSubOp*>(m),
get_parent()->whoami_shard(),
0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(rm->last_complete);
commit = reply;
} else if (m->get_type() == MSG_OSD_REPOP) {
MOSDRepOpReply *reply = new MOSDRepOpReply(
static_cast<MOSDRepOp*>(m),
static_cast<const MOSDRepOp*>(m),
get_parent()->whoami_shard(),
0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ONDISK);
reply->set_last_complete_ondisk(rm->last_complete);
@ -1679,15 +1679,15 @@ int ReplicatedBackend::send_pull_legacy(int prio, pg_shard_t peer,
}
void ReplicatedBackend::submit_push_data(
ObjectRecoveryInfo &recovery_info,
const ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
bool cache_dont_need,
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
bufferlist omap_header,
map<string, bufferlist> &attrs,
map<string, bufferlist> &omap_entries,
const map<string, bufferlist> &attrs,
const map<string, bufferlist> &omap_entries,
ObjectStore::Transaction *t)
{
hobject_t target_oid;
@ -1710,7 +1710,7 @@ void ReplicatedBackend::submit_push_data(
if (omap_header.length())
t->omap_setheader(coll, ghobject_t(target_oid), omap_header);
bufferlist bv = attrs[OI_ATTR];
bufferlist bv = attrs.at(OI_ATTR);
object_info_t oi(bv);
t->set_alloc_hint(coll, ghobject_t(target_oid),
oi.expected_object_size,
@ -1750,8 +1750,9 @@ void ReplicatedBackend::submit_push_data(
}
}
void ReplicatedBackend::submit_push_complete(ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t)
void ReplicatedBackend::submit_push_complete(
const ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t)
{
for (map<hobject_t, interval_set<uint64_t>>::const_iterator p =
recovery_info.clone_subset.begin();
@ -1789,13 +1790,13 @@ ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
}
bool ReplicatedBackend::handle_pull_response(
pg_shard_t from, PushOp &pop, PullOp *response,
pg_shard_t from, const PushOp &pop, PullOp *response,
list<pull_complete_info> *to_continue,
ObjectStore::Transaction *t)
{
interval_set<uint64_t> data_included = pop.data_included;
bufferlist data;
data.claim(pop.data);
data = pop.data;
dout(10) << "handle_pull_response "
<< pop.recovery_info
<< pop.after_progress
@ -1808,7 +1809,7 @@ bool ReplicatedBackend::handle_pull_response(
return false;
}
hobject_t &hoid = pop.soid;
const hobject_t &hoid = pop.soid;
assert((data_included.empty() && data.length() == 0) ||
(!data_included.empty() && data.length() > 0));
@ -1825,16 +1826,17 @@ bool ReplicatedBackend::handle_pull_response(
bool first = pi.recovery_progress.first;
if (first) {
// attrs only reference the origin bufferlist (decode from MOSDPGPush message)
// whose size is much greater than attrs in recovery. If obc cache it (get_obc maybe
// cache the attr), this causes the whole origin bufferlist would not be free until
// obc is evicted from obc cache. So rebuild the bufferlist before cache it.
for (map<string, bufferlist>::iterator it = pop.attrset.begin();
it != pop.attrset.end();
++it) {
it->second.rebuild();
// attrs only reference the origin bufferlist (decode from
// MOSDPGPush message) whose size is much greater than attrs in
// recovery. If obc cache it (get_obc maybe cache the attr), this
// causes the whole origin bufferlist would not be free until obc
// is evicted from obc cache. So rebuild the bufferlists before
// cache it.
auto attrset = pop.attrset;
for (auto& a : attrset) {
a.second.rebuild();
}
pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
pi.obc = get_parent()->get_obc(pi.recovery_info.soid, attrset);
pi.recovery_info.oi = pi.obc->obs.oi;
pi.recovery_info = recalc_subsets(
pi.recovery_info,
@ -1888,7 +1890,7 @@ bool ReplicatedBackend::handle_pull_response(
}
void ReplicatedBackend::handle_push(
pg_shard_t from, PushOp &pop, PushReplyOp *response,
pg_shard_t from, const PushOp &pop, PushReplyOp *response,
ObjectStore::Transaction *t)
{
dout(10) << "handle_push "
@ -1896,7 +1898,7 @@ void ReplicatedBackend::handle_push(
<< pop.after_progress
<< dendl;
bufferlist data;
data.claim(pop.data);
data = pop.data;
bool first = pop.before_progress.first;
bool complete = pop.after_progress.data_complete &&
pop.after_progress.omap_complete;
@ -1974,7 +1976,7 @@ void ReplicatedBackend::send_pulls(int prio, map<pg_shard_t, vector<PullOp> > &p
msg->set_priority(prio);
msg->pgid = get_parent()->primary_spg_t();
msg->map_epoch = get_osdmap()->get_epoch();
msg->pulls.swap(i->second);
msg->set_pulls(&i->second);
msg->compute_cost(cct);
get_parent()->send_message_osd_cluster(msg, con);
}
@ -2170,7 +2172,7 @@ void ReplicatedBackend::prep_push_op_blank(const hobject_t& soid, PushOp *op)
void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
{
MOSDSubOpReply *reply = static_cast<MOSDSubOpReply*>(op->get_req());
const MOSDSubOpReply *reply = static_cast<const MOSDSubOpReply*>(op->get_req());
const hobject_t& soid = reply->get_poid();
assert(reply->get_type() == MSG_OSD_SUBOPREPLY);
dout(10) << "sub_op_push_reply from " << reply->get_source() << " " << *reply << dendl;
@ -2186,7 +2188,8 @@ void ReplicatedBackend::sub_op_push_reply(OpRequestRef op)
send_push_op_legacy(op->get_req()->get_priority(), peer, pop);
}
bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply)
bool ReplicatedBackend::handle_push_reply(
pg_shard_t peer, const PushReplyOp &op, PushOp *reply)
{
const hobject_t &soid = op.soid;
if (pushing.count(soid) == 0) {
@ -2241,7 +2244,7 @@ bool ReplicatedBackend::handle_push_reply(pg_shard_t peer, PushReplyOp &op, Push
*/
void ReplicatedBackend::sub_op_pull(OpRequestRef op)
{
MOSDSubOp *m = static_cast<MOSDSubOp*>(op->get_req());
const MOSDSubOp *m = static_cast<const MOSDSubOp*>(op->get_req());
assert(m->get_type() == MSG_OSD_SUBOP);
op->mark_started();
@ -2347,7 +2350,9 @@ void ReplicatedBackend::trim_pushed_data(
void ReplicatedBackend::sub_op_push(OpRequestRef op)
{
op->mark_started();
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_req());
// don't bother with const-ness here; we're about to kill MOSDSubOp
// anyway.
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->get_nonconst_req());
PushOp pop;
pop.soid = m->poid;

View File

@ -238,7 +238,7 @@ private:
void do_pull(OpRequestRef op);
void do_push_reply(OpRequestRef op);
bool handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply);
bool handle_push_reply(pg_shard_t peer, const PushReplyOp &op, PushOp *reply);
void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);
struct pull_complete_info {
@ -246,10 +246,10 @@ private:
object_stat_sum_t stat;
};
bool handle_pull_response(
pg_shard_t from, PushOp &op, PullOp *response,
pg_shard_t from, const PushOp &op, PullOp *response,
list<pull_complete_info> *to_continue,
ObjectStore::Transaction *t);
void handle_push(pg_shard_t from, PushOp &op, PushReplyOp *response,
void handle_push(pg_shard_t from, const PushOp &op, PushReplyOp *response,
ObjectStore::Transaction *t);
static void trim_pushed_data(const interval_set<uint64_t> &copy_subset,
@ -276,17 +276,17 @@ private:
PushOp *out_op,
object_stat_sum_t *stat = 0,
bool cache_dont_need = true);
void submit_push_data(ObjectRecoveryInfo &recovery_info,
void submit_push_data(const ObjectRecoveryInfo &recovery_info,
bool first,
bool complete,
bool cache_dont_need,
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
bufferlist omap_header,
map<string, bufferlist> &attrs,
map<string, bufferlist> &omap_entries,
const map<string, bufferlist> &attrs,
const map<string, bufferlist> &omap_entries,
ObjectStore::Transaction *t);
void submit_push_complete(ObjectRecoveryInfo &recovery_info,
void submit_push_complete(const ObjectRecoveryInfo &recovery_info,
ObjectStore::Transaction *t);
void calc_clone_subsets(

View File

@ -83,7 +83,7 @@ void Session::ack_backoff(
}
bool Session::check_backoff(
CephContext *cct, spg_t pgid, const hobject_t& oid, Message *m)
CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m)
{
BackoffRef b(have_backoff(pgid, oid));
if (b) {

View File

@ -195,7 +195,7 @@ struct Session : public RefCountedObject {
}
bool check_backoff(
CephContext *cct, spg_t pgid, const hobject_t& oid, Message *m);
CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m);
void add_backoff(BackoffRef b) {
Mutex::Locker l(backoff_lock);