osd: make MOSDOp[Reply] encoding backwards compatible

Move away from struct-based encoding of MOSDOpReply while we're at it.

Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
Sage Weil 2011-08-25 13:59:42 -07:00 committed by Sage Weil
parent 7cedf7b8d1
commit fbeafdf9c3
2 changed files with 98 additions and 39 deletions

View File

@ -235,7 +235,7 @@ struct ceph_osd_request_head {
if (flags & CEPH_OSD_FLAG_PEERSTAT)
::encode(peer_stat, payload);
} else {
header.version = 2;
header.version = 3;
::encode(client_inc, payload);
::encode(osdmap_epoch, payload);
::encode(flags, payload);
@ -267,8 +267,11 @@ struct ceph_osd_request_head {
// old decode
::decode(client_inc, p);
old_pg_t opgid;
::decode_raw(opgid, p);
pgid = opgid;
__u32 su;
::decode(pgid, p);
::decode(su, p);
oloc.pool = pgid.pool();
@ -296,6 +299,11 @@ struct ceph_osd_request_head {
if (flags & CEPH_OSD_FLAG_PEERSTAT)
::decode(peer_stat, p);
// recalculate pgid hash value
pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
oid.name.c_str(),
oid.name.length()));
} else {
// new decode
::decode(client_inc, p);
@ -305,7 +313,15 @@ struct ceph_osd_request_head {
::decode(reassert_version, p);
::decode(oloc, p);
::decode(pgid, p);
if (header.version < 3) {
old_pg_t opgid;
::decode_raw(opgid, p);
pgid = opgid;
} else {
::decode(pgid, p);
}
::decode(oid, p);
//::decode(ops, p);

View File

@ -30,31 +30,35 @@
*/
class MOSDOpReply : public Message {
ceph_osd_reply_head head;
public:
object_t oid;
pg_t pgid;
vector<OSDOp> ops;
int64_t flags;
int32_t result;
eversion_t reassert_version;
epoch_t osdmap_epoch;
public:
object_t get_oid() { return oid; }
pg_t get_pg() { return pg_t(head.layout.ol_pgid); }
int get_flags() { return head.flags; }
pg_t get_pg() { return pgid; }
int get_flags() { return flags; }
bool is_ondisk() { return get_flags() & CEPH_OSD_FLAG_ONDISK; }
bool is_onnvram() { return get_flags() & CEPH_OSD_FLAG_ONNVRAM; }
__s32 get_result() { return head.result; }
eversion_t get_version() { return head.reassert_version; }
int get_result() { return result; }
eversion_t get_version() { return reassert_version; }
bool may_read() { return head.flags & CEPH_OSD_FLAG_READ; }
bool may_write() { return head.flags & CEPH_OSD_FLAG_WRITE; }
bool may_read() { return flags & CEPH_OSD_FLAG_READ; }
bool may_write() { return flags & CEPH_OSD_FLAG_WRITE; }
void set_result(int r) { head.result = r; }
void set_version(eversion_t v) { head.reassert_version = v; }
void set_result(int r) { result = r; }
void set_version(eversion_t v) { reassert_version = v; }
void add_flags(int f) { head.flags = (int)head.flags | f; }
void add_flags(int f) { flags |= f; }
// osdmap
epoch_t get_map_epoch() { return head.osdmap_epoch; }
epoch_t get_map_epoch() { return osdmap_epoch; }
/*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
head.client_inc,
@ -64,41 +68,80 @@ class MOSDOpReply : public Message {
public:
MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, int acktype) :
Message(CEPH_MSG_OSD_OPREPLY) {
memset(&head, 0, sizeof(head));
set_tid(req->get_tid());
head.client_inc = req->client_inc;
ops = req->ops;
head.result = result;
head.flags =
result = result;
flags =
(req->flags & ~(CEPH_OSD_FLAG_ONDISK|CEPH_OSD_FLAG_ONNVRAM|CEPH_OSD_FLAG_ACK)) | acktype;
oid = req->oid;
head.layout.ol_pgid = req->pgid.get_old_pg();
head.osdmap_epoch = e;
head.reassert_version = req->reassert_version;
pgid = req->pgid;
osdmap_epoch = e;
reassert_version = req->reassert_version;
}
MOSDOpReply() {}
private:
~MOSDOpReply() {}
public:
// marshalling
virtual void encode_payload(CephContext *cct) {
if (!connection->has_feature(CEPH_FEATURE_PGID64)) {
ceph_osd_reply_head head;
memset(&head, 0, sizeof(head));
head.layout.ol_pgid = pgid.get_old_pg().v;
head.flags = flags;
head.osdmap_epoch = osdmap_epoch;
head.result = result;
head.num_ops = ops.size();
head.object_len = oid.name.length();
::encode(head, payload);
for (unsigned i = 0; i < head.num_ops; i++) {
::encode(ops[i].op, payload);
}
::encode_nohead(oid.name, payload);
} else {
header.version = 2;
::encode(oid, payload);
::encode(pgid, payload);
::encode(flags, payload);
::encode(result, payload);
::encode(reassert_version, payload);
::encode(osdmap_epoch, payload);
__u32 num_ops = ops.size();
::encode(num_ops, payload);
for (unsigned i = 0; i < num_ops; i++)
::encode(ops[i].op, payload);
}
}
virtual void decode_payload(CephContext *cct) {
bufferlist::iterator p = payload.begin();
::decode(head, p);
ops.resize(head.num_ops);
for (unsigned i = 0; i < head.num_ops; i++) {
::decode(ops[i].op, p);
if (header.version < 2) {
ceph_osd_reply_head head;
::decode(head, p);
ops.resize(head.num_ops);
for (unsigned i = 0; i < head.num_ops; i++) {
::decode(ops[i].op, p);
}
::decode_nohead(head.object_len, oid.name, p);
pgid = pg_t(head.layout.ol_pgid);
result = head.result;
flags = head.flags;
reassert_version = head.reassert_version;
osdmap_epoch = head.osdmap_epoch;
} else {
::decode(oid, p);
::decode(pgid, p);
::decode(flags, p);
::decode(result, p);
::decode(reassert_version, p);
::decode(osdmap_epoch, p);
__u32 num_ops = ops.size();
::decode(num_ops, p);
ops.resize(num_ops);
for (unsigned i = 0; i < num_ops; i++)
::decode(ops[i].op, p);
}
::decode_nohead(head.object_len, oid.name, p);
}
virtual void encode_payload(CephContext *cct) {
head.num_ops = ops.size();
head.object_len = oid.name.length();
::encode(head, payload);
for (unsigned i = 0; i < head.num_ops; i++) {
::encode(ops[i].op, payload);
}
::encode_nohead(oid.name, payload);
}
const char *get_type_name() { return "osd_op_reply"; }