osd: include client ticket in MOSDOp

One step closer to stabilizing the client <-> osd protocol.
This commit is contained in:
Sage Weil 2009-05-29 17:28:07 -07:00
parent b9bca9c43e
commit b9dd08ac08
8 changed files with 31 additions and 9 deletions

View File

@ -2259,6 +2259,8 @@ int Client::mount()
signed_ticket = monclient->get_signed_ticket();
ticket = monclient->get_ticket();
objecter->signed_ticket = signed_ticket;
mounted = true;
dout(2) << "mounted: have osdmap " << osdmap->get_epoch()

View File

@ -26,7 +26,7 @@
#define CEPH_OSD_PROTOCOL 5 /* cluster internal */
#define CEPH_MDS_PROTOCOL 9 /* cluster internal */
#define CEPH_MON_PROTOCOL 4 /* cluster internal */
#define CEPH_OSDC_PROTOCOL 13 /* public/client */
#define CEPH_OSDC_PROTOCOL 14 /* public/client */
#define CEPH_MDSC_PROTOCOL 21 /* public/client */
#define CEPH_MONC_PROTOCOL 12 /* public/client */

View File

@ -361,13 +361,15 @@ struct ceph_osd_request_head {
struct ceph_timespec mtime;
struct ceph_eversion reassert_version;
__le32 ticket_len;
__le64 snapid;
__le64 snap_seq; /* writer's snap context */
__le32 num_snaps;
__le16 num_ops;
__u16 object_type;
struct ceph_osd_op ops[]; /* followed by snaps */
struct ceph_osd_op ops[]; /* followed by ticket, snaps */
} __attribute__ ((packed));
struct ceph_osd_reply_head {

View File

@ -93,6 +93,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
struct ceph_osd_request_head *head;
struct ceph_osd_op *op;
__le64 *snaps;
void *ticketp;
int do_trunc = truncate_seq && (off + *plen > truncate_size);
int num_op = 1 + do_sync + do_trunc;
size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
@ -114,6 +115,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
/* create message */
msg_size += osdc->client->signed_ticket_len;
if (snapc)
msg_size += sizeof(u64) * snapc->num_snaps;
msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
@ -124,7 +126,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
memset(msg->front.iov_base, 0, msg->front.iov_len);
head = msg->front.iov_base;
op = (void *)(head + 1);
snaps = (void *)(op + num_op);
ticketp = (void *)(op + num_op);
snaps = ticketp + osdc->client->signed_ticket_len;
head->ticket_len = cpu_to_le32(osdc->client->signed_ticket_len);
memcpy(ticketp, osdc->client->signed_ticket,
osdc->client->signed_ticket_len);
head->client_inc = cpu_to_le32(1); /* always, for now. */
head->flags = cpu_to_le32(flags);

View File

@ -113,6 +113,7 @@ bool RadosClient::init()
lock.Lock();
objecter->signed_ticket = mc->get_signed_ticket();
objecter->set_client_incarnation(0);
objecter->init();

View File

@ -32,6 +32,7 @@ private:
ceph_osd_request_head head;
public:
vector<ceph_osd_op> ops;
bufferlist ticket;
vector<snapid_t> snaps;
osd_peer_stat_t peer_stat;
@ -72,15 +73,18 @@ public:
return peer_stat;
}
bufferlist& get_ticket() { return ticket; }
//void inc_shed_count() { head.shed_count = get_shed_count() + 1; }
//int get_shed_count() { return head.shed_count; }
MOSDOp(int inc, long tid,
MOSDOp(const bufferlist& tkt, int inc, long tid,
object_t oid, ceph_object_layout ol, epoch_t mapepoch,
int flags) :
Message(CEPH_MSG_OSD_OP) {
Message(CEPH_MSG_OSD_OP),
ticket(tkt) {
memset(&head, 0, sizeof(head));
head.tid = tid;
head.client_inc = inc;
@ -153,8 +157,10 @@ public:
virtual void encode_payload() {
head.num_snaps = snaps.size();
head.num_ops = ops.size();
head.ticket_len = ticket.length();
::encode(head, payload);
::encode_nohead(ops, payload);
::encode_nohead(ticket, payload);
::encode_nohead(snaps, payload);
if (head.flags & CEPH_OSD_FLAG_PEERSTAT)
::encode(peer_stat, payload);
@ -164,6 +170,7 @@ public:
bufferlist::iterator p = payload.begin();
::decode(head, p);
decode_nohead(head.num_ops, ops, p);
decode_nohead(head.ticket_len, ticket, p);
decode_nohead(head.num_snaps, snaps, p);
if (head.flags & CEPH_OSD_FLAG_PEERSTAT)
::decode(peer_stat, p);

View File

@ -388,7 +388,7 @@ tid_t Objecter::read_submit(ReadOp *rd)
int flags = rd->flags;
if (rd->onfinish)
flags |= CEPH_OSD_FLAG_ACK;
MOSDOp *m = new MOSDOp(client_inc, last_tid,
MOSDOp *m = new MOSDOp(signed_ticket, client_inc, last_tid,
rd->oid, rd->layout, osdmap->get_epoch(),
flags | CEPH_OSD_FLAG_READ);
m->set_snapid(rd->snap);
@ -506,7 +506,7 @@ tid_t Objecter::modify_submit(ModifyOp *wr)
wr->paused = true;
maybe_request_map();
} else if (pg.primary() >= 0) {
MOSDOp *m = new MOSDOp(client_inc, wr->tid,
MOSDOp *m = new MOSDOp(signed_ticket, client_inc, wr->tid,
wr->oid, wr->layout, osdmap->get_epoch(),
flags | CEPH_OSD_FLAG_WRITE);
m->set_snapid(CEPH_NOSNAP);

View File

@ -158,7 +158,10 @@ class Objecter {
Messenger *messenger;
MonMap *monmap;
OSDMap *osdmap;
bufferlist signed_ticket;
private:
tid_t last_tid;
int client_inc;
@ -306,7 +309,7 @@ class Objecter {
public:
Objecter(Messenger *m, MonMap *mm, OSDMap *om, Mutex& l) :
messenger(m), monmap(mm), osdmap(om),
messenger(m), monmap(mm), osdmap(om),
last_tid(0), client_inc(-1),
num_unacked(0), num_uncommitted(0),
last_epoch_requested(0),