From fbeafdf9c385af6eb4858b7227862039f2ea5a4d Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Thu, 25 Aug 2011 13:59:42 -0700 Subject: [PATCH] osd: make MOSDOp[Reply] encoding backwards compatible Move away from struct-based encoding of MOSDOpReply while we're at it. Signed-off-by: Sage Weil --- src/messages/MOSDOp.h | 22 ++++++- src/messages/MOSDOpReply.h | 115 +++++++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 39 deletions(-) diff --git a/src/messages/MOSDOp.h b/src/messages/MOSDOp.h index 3576710469b..2a8c074cda0 100644 --- a/src/messages/MOSDOp.h +++ b/src/messages/MOSDOp.h @@ -235,7 +235,7 @@ struct ceph_osd_request_head { if (flags & CEPH_OSD_FLAG_PEERSTAT) ::encode(peer_stat, payload); } else { - header.version = 2; + header.version = 3; ::encode(client_inc, payload); ::encode(osdmap_epoch, payload); ::encode(flags, payload); @@ -267,8 +267,11 @@ struct ceph_osd_request_head { // old decode ::decode(client_inc, p); + old_pg_t opgid; + ::decode_raw(opgid, p); + pgid = opgid; + __u32 su; - ::decode(pgid, p); ::decode(su, p); oloc.pool = pgid.pool(); @@ -296,6 +299,11 @@ struct ceph_osd_request_head { if (flags & CEPH_OSD_FLAG_PEERSTAT) ::decode(peer_stat, p); + + // recalculate pgid hash value + pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS, + oid.name.c_str(), + oid.name.length())); } else { // new decode ::decode(client_inc, p); @@ -305,7 +313,15 @@ struct ceph_osd_request_head { ::decode(reassert_version, p); ::decode(oloc, p); - ::decode(pgid, p); + + if (header.version < 3) { + old_pg_t opgid; + ::decode_raw(opgid, p); + pgid = opgid; + } else { + ::decode(pgid, p); + } + ::decode(oid, p); //::decode(ops, p); diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h index 04243a9b7eb..3a7428a5e7e 100644 --- a/src/messages/MOSDOpReply.h +++ b/src/messages/MOSDOpReply.h @@ -30,31 +30,35 @@ */ class MOSDOpReply : public Message { - ceph_osd_reply_head head; - public: object_t oid; + pg_t pgid; vector ops; + int64_t flags; + int32_t result; + eversion_t reassert_version; + epoch_t osdmap_epoch; +public: object_t get_oid() { return oid; } - pg_t get_pg() { return pg_t(head.layout.ol_pgid); } - int get_flags() { return head.flags; } + pg_t get_pg() { return pgid; } + int get_flags() { return flags; } bool is_ondisk() { return get_flags() & CEPH_OSD_FLAG_ONDISK; } bool is_onnvram() { return get_flags() & CEPH_OSD_FLAG_ONNVRAM; } - __s32 get_result() { return head.result; } - eversion_t get_version() { return head.reassert_version; } + int get_result() { return result; } + eversion_t get_version() { return reassert_version; } + + bool may_read() { return flags & CEPH_OSD_FLAG_READ; } + bool may_write() { return flags & CEPH_OSD_FLAG_WRITE; } - bool may_read() { return head.flags & CEPH_OSD_FLAG_READ; } - bool may_write() { return head.flags & CEPH_OSD_FLAG_WRITE; } - - void set_result(int r) { head.result = r; } - void set_version(eversion_t v) { head.reassert_version = v; } - - void add_flags(int f) { head.flags = (int)head.flags | f; } + void set_result(int r) { result = r; } + void set_version(eversion_t v) { reassert_version = v; } + void add_flags(int f) { flags |= f; } + // osdmap - epoch_t get_map_epoch() { return head.osdmap_epoch; } + epoch_t get_map_epoch() { return osdmap_epoch; } /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(), head.client_inc, @@ -64,41 +68,80 @@ class MOSDOpReply : public Message { public: MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, int acktype) : Message(CEPH_MSG_OSD_OPREPLY) { - memset(&head, 0, sizeof(head)); set_tid(req->get_tid()); - head.client_inc = req->client_inc; ops = req->ops; - head.result = result; - head.flags = + result = result; + flags = (req->flags & ~(CEPH_OSD_FLAG_ONDISK|CEPH_OSD_FLAG_ONNVRAM|CEPH_OSD_FLAG_ACK)) | acktype; oid = req->oid; - head.layout.ol_pgid = req->pgid.get_old_pg(); - head.osdmap_epoch = e; - head.reassert_version = req->reassert_version; + pgid = req->pgid; + osdmap_epoch = e; + reassert_version = req->reassert_version; } MOSDOpReply() {} private: ~MOSDOpReply() {} public: - // marshalling + virtual void encode_payload(CephContext *cct) { + if (!connection->has_feature(CEPH_FEATURE_PGID64)) { + ceph_osd_reply_head head; + memset(&head, 0, sizeof(head)); + head.layout.ol_pgid = pgid.get_old_pg().v; + head.flags = flags; + head.osdmap_epoch = osdmap_epoch; + head.result = result; + head.num_ops = ops.size(); + head.object_len = oid.name.length(); + ::encode(head, payload); + for (unsigned i = 0; i < head.num_ops; i++) { + ::encode(ops[i].op, payload); + } + ::encode_nohead(oid.name, payload); + } else { + header.version = 2; + ::encode(oid, payload); + ::encode(pgid, payload); + ::encode(flags, payload); + ::encode(result, payload); + ::encode(reassert_version, payload); + ::encode(osdmap_epoch, payload); + + __u32 num_ops = ops.size(); + ::encode(num_ops, payload); + for (unsigned i = 0; i < num_ops; i++) + ::encode(ops[i].op, payload); + } + } virtual void decode_payload(CephContext *cct) { bufferlist::iterator p = payload.begin(); - ::decode(head, p); - ops.resize(head.num_ops); - for (unsigned i = 0; i < head.num_ops; i++) { - ::decode(ops[i].op, p); + if (header.version < 2) { + ceph_osd_reply_head head; + ::decode(head, p); + ops.resize(head.num_ops); + for (unsigned i = 0; i < head.num_ops; i++) { + ::decode(ops[i].op, p); + } + ::decode_nohead(head.object_len, oid.name, p); + pgid = pg_t(head.layout.ol_pgid); + result = head.result; + flags = head.flags; + reassert_version = head.reassert_version; + osdmap_epoch = head.osdmap_epoch; + } else { + ::decode(oid, p); + ::decode(pgid, p); + ::decode(flags, p); + ::decode(result, p); + ::decode(reassert_version, p); + ::decode(osdmap_epoch, p); + + __u32 num_ops = ops.size(); + ::decode(num_ops, p); + ops.resize(num_ops); + for (unsigned i = 0; i < num_ops; i++) + ::decode(ops[i].op, p); } - ::decode_nohead(head.object_len, oid.name, p); - } - virtual void encode_payload(CephContext *cct) { - head.num_ops = ops.size(); - head.object_len = oid.name.length(); - ::encode(head, payload); - for (unsigned i = 0; i < head.num_ops; i++) { - ::encode(ops[i].op, payload); - } - ::encode_nohead(oid.name, payload); } const char *get_type_name() { return "osd_op_reply"; }