mirror of
https://github.com/ceph/ceph
synced 2025-01-20 10:01:45 +00:00
merged 1863:1933 from trunk into branches/sage/mds
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1934 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
5e5416e605
commit
82b1142075
@ -139,6 +139,8 @@ void parse_syn_options(vector<char*>& args)
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
|
||||
} else if (strcmp(args[i],"walk") == 0) {
|
||||
syn_modes.push_back( SYNCLIENT_MODE_FULLWALK );
|
||||
|
@ -450,9 +450,9 @@ bool parse_ip_port(const char *s, entity_addr_t& a)
|
||||
s++; off++;
|
||||
|
||||
if (count <= 3)
|
||||
a.ipq[count] = val;
|
||||
a.v.ipq[count] = val;
|
||||
else
|
||||
a.port = val;
|
||||
a.v.port = val;
|
||||
|
||||
count++;
|
||||
if (count == 4 && *s != ':') break;
|
||||
|
@ -428,7 +428,19 @@ int ObjectCache::map_write(block_t start, block_t len,
|
||||
map<block_t, BufferHead*>& hits,
|
||||
version_t super_epoch)
|
||||
{
|
||||
map<block_t, BufferHead*>::iterator p = data.lower_bound(start);
|
||||
map<block_t, BufferHead*>::iterator p;
|
||||
|
||||
// hack speed up common cases
|
||||
if (start == 0) {
|
||||
p = data.begin();
|
||||
} else if (start + len == on->object_blocks && len == 1 && !data.empty()) {
|
||||
// append hack.
|
||||
p = data.end();
|
||||
p--;
|
||||
if (p->first < start) p++;
|
||||
} else {
|
||||
p = data.lower_bound(start);
|
||||
}
|
||||
|
||||
dout(10) << "map_write " << *on << " " << start << "~" << len << dendl;
|
||||
// p->first >= start
|
||||
|
@ -67,7 +67,8 @@ public:
|
||||
public:
|
||||
Onode(object_t oid) : ref(0), object_id(oid), version(0),
|
||||
readonly(false),
|
||||
object_size(0), object_blocks(0), oc(0),
|
||||
object_size(0), object_blocks(0),
|
||||
oc(0),
|
||||
dirty(false), dangling(false), deleted(false) {
|
||||
onode_loc.length = 0;
|
||||
}
|
||||
@ -239,7 +240,23 @@ public:
|
||||
|
||||
//assert(start+len <= object_blocks);
|
||||
|
||||
map<block_t,Extent>::iterator p = extent_map.lower_bound(start);
|
||||
map<block_t,Extent>::iterator p;
|
||||
|
||||
// hack hack speed up common cases!
|
||||
if (start == 0) {
|
||||
p = extent_map.begin();
|
||||
} else if (start+len == object_blocks && len == 1 && !extent_map.empty()) {
|
||||
// append hack.
|
||||
p = extent_map.end();
|
||||
p--;
|
||||
if (p->first < start) p++;
|
||||
//while (p->first >= start) p--;
|
||||
//p++;
|
||||
} else {
|
||||
// normal
|
||||
p = extent_map.lower_bound(start);
|
||||
}
|
||||
|
||||
if (p != extent_map.begin() &&
|
||||
(p == extent_map.end() || p->first > start && p->first)) {
|
||||
p--;
|
||||
|
@ -88,9 +88,9 @@ int main(int argc, char **argv) {
|
||||
|
||||
MonMap *monmap = new MonMap(g_conf.num_mon);
|
||||
entity_addr_t a;
|
||||
a.nonce = getpid();
|
||||
a.v.nonce = getpid();
|
||||
for (int i=0; i<g_conf.num_mon; i++) {
|
||||
a.port = i;
|
||||
a.v.port = i;
|
||||
monmap->mon_inst[i] = entity_inst_t(entity_name_t::MON(i), a); // hack ; see FakeMessenger.cc
|
||||
}
|
||||
|
||||
|
@ -86,9 +86,9 @@ int main(int argc, char **argv)
|
||||
|
||||
MonMap *monmap = new MonMap(g_conf.num_mon);
|
||||
entity_addr_t a;
|
||||
a.nonce = getpid();
|
||||
a.v.nonce = getpid();
|
||||
for (int i=0; i<g_conf.num_mon; i++) {
|
||||
a.port = i;
|
||||
a.v.port = i;
|
||||
monmap->mon_inst[i] = entity_inst_t(entity_name_t::MON(i), a); // hack ; see FakeMessenger.cc
|
||||
}
|
||||
|
||||
|
8
branches/sage/mds/include/ceph_inttypes.h
Normal file
8
branches/sage/mds/include/ceph_inttypes.h
Normal file
@ -0,0 +1,8 @@
|
||||
#ifndef __CEPH_INTTYPES_H
#define __CEPH_INTTYPES_H

/*
 * Kernel-style fixed-width integer names (__u32, __u16, __u8) for code
 * that is built outside the kernel.
 *
 * The aliases are defined in terms of the <stdint.h> types, so pull that
 * header in here rather than relying on every includer to have done so.
 */
#include <stdint.h>

typedef uint32_t __u32;
typedef uint16_t __u16;
typedef uint8_t __u8;

#endif
|
7
branches/sage/mds/kernel/Makefile
Normal file
7
branches/sage/mds/kernel/Makefile
Normal file
@ -0,0 +1,7 @@
|
||||
#
|
||||
# Makefile for CEPH filesystem.
|
||||
#
|
||||
|
||||
obj-$(CONFIG_CEPH_FS) += ceph.o
|
||||
|
||||
ceph-objs := inode.o
|
75
branches/sage/mds/kernel/ceph_fs.h
Normal file
75
branches/sage/mds/kernel/ceph_fs.h
Normal file
@ -0,0 +1,75 @@
|
||||
/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*-
|
||||
* vim: ts=8 sw=8 smarttab
|
||||
*/
|
||||
|
||||
#ifndef _FS_CEPH_CEPH_H
|
||||
#define _FS_CEPH_CEPH_H
|
||||
|
||||
/* #include <linux/ceph_fs.h> */
|
||||
|
||||
#include "kmsg.h"
|
||||
|
||||
/* do these later
|
||||
#include "osdmap.h"
|
||||
#include "mdsmap.h"
|
||||
#include "monmap.h"
|
||||
*/
|
||||
struct ceph_monmap;
|
||||
struct ceph_osdmap;
|
||||
struct ceph_mdsmap;
|
||||
|
||||
|
||||
|
||||
/*
|
||||
* state associated with an individual MDS<->client session
|
||||
*/
|
||||
struct ceph_mds_session {
|
||||
__u64 s_push_seq;
|
||||
/* wait queue? */
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* CEPH file system in-core superblock info
|
||||
*/
|
||||
struct ceph_sb_info {
|
||||
__u32 s_whoami; /* client number */
|
||||
struct ceph_kmsg *s_kmsg; /* messenger instance */
|
||||
|
||||
struct ceph_monmap *s_monmap; /* monitor map */
|
||||
struct ceph_mdsmap *s_mdsmap; /* mds map */
|
||||
struct ceph_osdmap *s_osdmap; /* osd map */
|
||||
|
||||
/* mds sessions */
|
||||
struct ceph_mds_session **s_mds_sessions; /* sparse array; elements NULL if no session */
|
||||
int s_max_mds_sessions; /* size of s_mds_sessions array */
|
||||
|
||||
/* current requests */
|
||||
/* ... */
|
||||
__u64 last_tid;
|
||||
};
|
||||
|
||||
/*
|
||||
* CEPH file system in-core inode info
|
||||
*/
|
||||
struct ceph_inode_info {
|
||||
unsigned long val; /* inode from types.h is uint64_t */
|
||||
struct inode vfs_inode;
|
||||
};
|
||||
|
||||
static inline struct ceph_inode_info *CEPH_I(struct inode *inode)
|
||||
{
|
||||
return list_entry(inode, struct ceph_inode_info, vfs_inode);
|
||||
}
|
||||
|
||||
|
||||
/* file.c */
|
||||
extern const struct inode_operations ceph_file_inops;
|
||||
extern const struct file_operations ceph_file_operations;
|
||||
extern const struct address_space_operations ceph_aops;
|
||||
|
||||
/* dir.c */
|
||||
extern const struct inode_operations ceph_dir_inops;
|
||||
extern const struct file_operations ceph_dir_operations;
|
||||
|
||||
#endif /* _FS_CEPH_CEPH_H */
|
140
branches/sage/mds/kernel/inode.c
Normal file
140
branches/sage/mds/kernel/inode.c
Normal file
@ -0,0 +1,140 @@
|
||||
/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*-
|
||||
* vim: ts=8 sw=8 smarttab
|
||||
*/
|
||||
|
||||
#include <linux/module.h>
|
||||
#include <linux/fs.h>
|
||||
#include <linux/smp_lock.h>
|
||||
#include <linux/slab.h>
|
||||
#include "ceph_fs.h"
|
||||
|
||||
MODULE_AUTHOR("Patience Warnick <patience@newdream.net>");
|
||||
MODULE_DESCRIPTION("Ceph filesystem for Linux");
|
||||
MODULE_LICENSE("GPL");
|
||||
|
||||
|
||||
static void ceph_read_inode(struct inode * inode)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
static int ceph_write_inode(struct inode * inode, int unused)
|
||||
{
|
||||
lock_kernel();
|
||||
unlock_kernel();
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void ceph_delete_inode(struct inode * inode)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
static void ceph_put_super(struct super_block *s)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
static int ceph_statfs(struct dentry *dentry, struct kstatfs *buf)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void ceph_write_super(struct super_block *s)
|
||||
{
|
||||
lock_kernel();
|
||||
unlock_kernel();
|
||||
return;
|
||||
}
|
||||
|
||||
static struct kmem_cache *ceph_inode_cachep;
|
||||
|
||||
static struct inode *ceph_alloc_inode(struct super_block *sb)
|
||||
{
|
||||
struct ceph_inode_info *ci;
|
||||
ci = kmem_cache_alloc(ceph_inode_cachep, GFP_KERNEL);
|
||||
if (!ci)
|
||||
return NULL;
|
||||
return &ci->vfs_inode;
|
||||
}
|
||||
|
||||
static void ceph_destroy_inode(struct inode *inode)
|
||||
{
|
||||
kmem_cache_free(ceph_inode_cachep, CEPH_I(inode));
|
||||
}
|
||||
|
||||
static void init_once(void *foo, struct kmem_cache *cachep, unsigned long flags)
|
||||
{
|
||||
struct ceph_inode_info *ci = (struct ceph_inode_info *) foo;
|
||||
|
||||
if ((flags & (SLAB_CTOR_VERIFY|SLAB_CTOR_CONSTRUCTOR)) ==
|
||||
SLAB_CTOR_CONSTRUCTOR)
|
||||
inode_init_once(&ci->vfs_inode);
|
||||
}
|
||||
|
||||
static int init_inodecache(void)
|
||||
{
|
||||
ceph_inode_cachep = kmem_cache_create("ceph_inode_cache",
|
||||
sizeof(struct ceph_inode_info),
|
||||
0, (SLAB_RECLAIM_ACCOUNT|
|
||||
SLAB_MEM_SPREAD),
|
||||
init_once, NULL);
|
||||
if (ceph_inode_cachep == NULL)
|
||||
return -ENOMEM;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void destroy_inodecache(void)
|
||||
{
|
||||
kmem_cache_destroy(ceph_inode_cachep);
|
||||
}
|
||||
|
||||
static const struct super_operations ceph_sops = {
|
||||
.alloc_inode = ceph_alloc_inode,
|
||||
.destroy_inode = ceph_destroy_inode,
|
||||
.read_inode = ceph_read_inode,
|
||||
.write_inode = ceph_write_inode,
|
||||
.delete_inode = ceph_delete_inode,
|
||||
.put_super = ceph_put_super,
|
||||
.write_super = ceph_write_super,
|
||||
.statfs = ceph_statfs,
|
||||
};
|
||||
|
||||
static int ceph_get_sb(struct file_system_type *fs_type,
|
||||
int flags, const char *dev_name, void *data, struct vfsmount *mnt)
|
||||
{
|
||||
printk(KERN_INFO "entered ceph_get_sb\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct file_system_type ceph_fs_type = {
|
||||
.owner = THIS_MODULE,
|
||||
.name = "ceph",
|
||||
.get_sb = ceph_get_sb,
|
||||
.kill_sb = kill_block_super,
|
||||
/* .fs_flags = */
|
||||
};
|
||||
|
||||
/*
 * Module entry point: create the inode cache, then register the
 * filesystem type.
 *
 * If registration fails, the inode cache is destroyed again before
 * returning.  Returns 0 on success or the error code from the step
 * that failed.
 */
static int __init init_ceph(void)
{
	int err;

	printk(KERN_INFO "ceph init\n");

	err = init_inodecache();
	if (err)
		return err;

	err = register_filesystem(&ceph_fs_type);
	if (err)
		destroy_inodecache();

	return err;
}
|
||||
|
||||
/*
 * Module exit point: undo init_ceph() in reverse order.
 *
 * The filesystem type is unregistered first, then the inode slab cache
 * that init_ceph() created is destroyed so it is not leaked across
 * module unload.
 */
static void __exit exit_ceph(void)
{
	printk(KERN_INFO "ceph exit\n");

	unregister_filesystem(&ceph_fs_type);
	destroy_inodecache();
}
|
||||
|
||||
|
||||
module_init(init_ceph);
|
||||
module_exit(exit_ceph);
|
46
branches/sage/mds/kernel/mdsmap.h
Normal file
46
branches/sage/mds/kernel/mdsmap.h
Normal file
@ -0,0 +1,46 @@
|
||||
/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*-
|
||||
* vim: ts=8 sw=8 smarttab
|
||||
*/
|
||||
|
||||
#ifndef _FS_CEPH_MDSMAP_H
|
||||
#define _FS_CEPH_MDSMAP_H
|
||||
|
||||
/* see mds/MDSMap.h */
|
||||
#define CEPH_MDS_STATE_DNE 0 /* down, never existed. */
|
||||
#define CEPH_MDS_STATE_STOPPED -1 /* down, once existed, but no subtrees. empty log. */
|
||||
#define CEPH_MDS_STATE_FAILED 2 /* down, active subtrees needs to be recovered. */
|
||||
|
||||
#define CEPH_MDS_STATE_BOOT -3 /* up, boot announcement. destiny unknown. */
|
||||
#define CEPH_MDS_STATE_STANDBY -4 /* up, idle. waiting for assignment by monitor. */
|
||||
#define CEPH_MDS_STATE_CREATING -5 /* up, creating MDS instance (new journal, idalloc..). */
|
||||
#define CEPH_MDS_STATE_STARTING -6 /* up, starting prior stopped MDS instance. */
|
||||
|
||||
#define CEPH_MDS_STATE_REPLAY 7 /* up, starting prior failed instance. scanning journal. */
|
||||
#define CEPH_MDS_STATE_RESOLVE 8 /* up, disambiguating distributed operations (import, rename, etc.) */
|
||||
#define CEPH_MDS_STATE_RECONNECT 9 /* up, reconnect to clients */
|
||||
#define CEPH_MDS_STATE_REJOIN 10 /* up, replayed journal, rejoining distributed cache */
|
||||
#define CEPH_MDS_STATE_ACTIVE 11 /* up, active */
|
||||
#define CEPH_MDS_STATE_STOPPING 12 /* up, exporting metadata (-> standby or out) */
|
||||
|
||||
/*
|
||||
* mds map
|
||||
*
|
||||
* fields limited to those the client cares about
|
||||
*/
|
||||
struct ceph_mdsmap {
|
||||
__u64 m_epoch;
|
||||
__u64 m_same_in_set_since;
|
||||
struct timeval m_created;
|
||||
__u32 m_anchortable;
|
||||
__u32 m_root;
|
||||
struct ceph_entity_addr *m_addr; /* array of addresses */
|
||||
__u8 *m_state; /* array of states */
|
||||
__u32 m_max_mds; /* size of m_addr, m_state arrays */
|
||||
};
|
||||
|
||||
/*
 * mds map accessors and decoder.
 *
 * The struct tags are spelled out in full: this header is meant for C,
 * where a bare "ceph_mdsmap" or "iovec" is not a type name.  struct iovec
 * is forward-declared so the prototypes do not depend on include order.
 */
struct iovec;

extern int ceph_mdsmap_get_random_mds(struct ceph_mdsmap *m);
extern int ceph_mdsmap_get_state(struct ceph_mdsmap *m, int w);
extern struct ceph_entity_addr *ceph_mdsmap_get_addr(struct ceph_mdsmap *m, int w);
extern int ceph_mdsmap_decode(struct ceph_mdsmap *m, struct iovec *v);
|
||||
|
||||
#endif
|
21
branches/sage/mds/kernel/monmap.h
Normal file
21
branches/sage/mds/kernel/monmap.h
Normal file
@ -0,0 +1,21 @@
|
||||
/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*-
|
||||
* vim: ts=8 sw=8 smarttab
|
||||
*/
|
||||
|
||||
#ifndef _FS_CEPH_MONMAP_H
|
||||
#define _FS_CEPH_MONMAP_H
|
||||
|
||||
/*
|
||||
* monitor map
|
||||
*/
|
||||
struct ceph_monmap {
|
||||
__u64 m_epoch;
|
||||
__u32 m_num_mon;
|
||||
__u32 m_last_mon;
|
||||
struct ceph_entity_inst m_mon_inst;
|
||||
};
|
||||
|
||||
/*
 * monitor map helpers.
 *
 * The struct tags are spelled out in full: this header is meant for C,
 * where a bare "ceph_monmap" or "iovec" is not a type name.  struct iovec
 * is forward-declared so the prototypes do not depend on include order.
 */
struct iovec;

extern int ceph_monmap_pick_mon(struct ceph_monmap *m);
extern int ceph_monmap_decode(struct ceph_monmap *m, struct iovec *v);
|
||||
|
||||
#endif
|
@ -480,12 +480,16 @@ public:
|
||||
// -- reference counting --
|
||||
void bad_put(int by) {
|
||||
generic_dout(7) << " bad put " << *this << " by " << by << " " << pin_name(by) << " was " << ref << " (" << ref_set << ")" << dendl;
|
||||
#ifdef MDS_REF_SET
|
||||
assert(ref_set.count(by) == 1);
|
||||
#endif
|
||||
assert(ref > 0);
|
||||
}
|
||||
void bad_get(int by) {
|
||||
generic_dout(7) << " bad get " << *this << " by " << by << " " << pin_name(by) << " was " << ref << " (" << ref_set << ")" << dendl;
|
||||
#ifdef MDS_REF_SET
|
||||
assert(ref_set.count(by) == 0);
|
||||
#endif
|
||||
}
|
||||
void first_get();
|
||||
void last_put();
|
||||
|
@ -19,6 +19,7 @@ using namespace std;
|
||||
#include "include/frag.h"
|
||||
#include "include/xlist.h"
|
||||
|
||||
#define MDS_REF_SET // define me for improved debug output, sanity checking
|
||||
|
||||
#define MDS_PORT_MAIN 0
|
||||
#define MDS_PORT_SERVER 1
|
||||
@ -36,12 +37,12 @@ using namespace std;
|
||||
#define MDS_INO_ROOT 1
|
||||
#define MDS_INO_PGTABLE 2
|
||||
#define MDS_INO_ANCHORTABLE 3
|
||||
#define MDS_INO_PG 4 // this should match osd/osd_types.h PG_INO
|
||||
#define MDS_INO_LOG_OFFSET 0x100
|
||||
#define MDS_INO_IDS_OFFSET 0x200
|
||||
#define MDS_INO_CLIENTMAP_OFFSET 0x300
|
||||
#define MDS_INO_STRAY_OFFSET 0x400
|
||||
#define MDS_INO_BASE 0x1000
|
||||
#define MDS_INO_PG 4 // *** WARNING: this should match osd/osd_types.h PG_INO ***
|
||||
#define MDS_INO_LOG_OFFSET (1*MAX_MDS)
|
||||
#define MDS_INO_IDS_OFFSET (2*MAX_MDS)
|
||||
#define MDS_INO_CLIENTMAP_OFFSET (3*MAX_MDS)
|
||||
#define MDS_INO_STRAY_OFFSET (4*MAX_MDS)
|
||||
#define MDS_INO_BASE (5*MAX_MDS)
|
||||
|
||||
// Stray inode number for x: MDS_INO_STRAY_OFFSET plus x converted to unsigned.
// The argument is parenthesized so the cast applies to the whole expression.
#define MDS_INO_STRAY(x) (MDS_INO_STRAY_OFFSET+((unsigned)(x)))
|
||||
#define MDS_INO_IS_STRAY(i) ((i) >= MDS_INO_STRAY_OFFSET && (i) < MDS_INO_STRAY_OFFSET+MAX_MDS)
|
||||
@ -461,26 +462,36 @@ class MDSCacheObject {
|
||||
// pins
|
||||
protected:
|
||||
int ref; // reference count
|
||||
#ifdef MDS_REF_SET
|
||||
multiset<int> ref_set;
|
||||
#endif
|
||||
|
||||
public:
|
||||
int get_num_ref() { return ref; }
|
||||
bool is_pinned_by(int by) { return ref_set.count(by); }
|
||||
multiset<int>& get_ref_set() { return ref_set; }
|
||||
virtual const char *pin_name(int by) = 0;
|
||||
//bool is_pinned_by(int by) { return ref_set.count(by); }
|
||||
//multiset<int>& get_ref_set() { return ref_set; }
|
||||
|
||||
virtual void last_put() {}
|
||||
virtual void bad_put(int by) {
|
||||
#ifdef MDS_REF_SET
|
||||
assert(ref_set.count(by) > 0);
|
||||
#endif
|
||||
assert(ref > 0);
|
||||
}
|
||||
void put(int by) {
|
||||
#ifdef MDS_REF_SET
|
||||
if (ref == 0 || ref_set.count(by) == 0) {
|
||||
#else
|
||||
if (ref == 0) {
|
||||
#endif
|
||||
bad_put(by);
|
||||
} else {
|
||||
ref--;
|
||||
#ifdef MDS_REF_SET
|
||||
ref_set.erase(ref_set.find(by));
|
||||
assert(ref == (int)ref_set.size());
|
||||
#endif
|
||||
if (ref == 0)
|
||||
last_put();
|
||||
}
|
||||
@ -488,22 +499,29 @@ protected:
|
||||
|
||||
virtual void first_get() {}
|
||||
virtual void bad_get(int by) {
|
||||
#ifdef MDS_REF_SET
|
||||
assert(by < 0 || ref_set.count(by) == 0);
|
||||
#endif
|
||||
assert(0);
|
||||
}
|
||||
void get(int by) {
|
||||
#ifdef MDS_REF_SET
|
||||
if (by >= 0 && ref_set.count(by)) {
|
||||
bad_get(by);
|
||||
} else {
|
||||
#endif
|
||||
if (ref == 0)
|
||||
first_get();
|
||||
ref++;
|
||||
#ifdef MDS_REF_SET
|
||||
ref_set.insert(by);
|
||||
assert(ref == (int)ref_set.size());
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void print_pin_set(ostream& out) {
|
||||
#ifdef MDS_REF_SET
|
||||
multiset<int>::iterator it = ref_set.begin();
|
||||
while (it != ref_set.end()) {
|
||||
out << " " << pin_name(*it);
|
||||
@ -516,6 +534,7 @@ protected:
|
||||
if (c > 1)
|
||||
out << "*" << c;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
@ -218,7 +218,7 @@ int fakemessenger_do_loop_2()
|
||||
// encode
|
||||
if (m->empty_payload())
|
||||
m->encode_payload();
|
||||
msg_envelope_t env = m->get_envelope();
|
||||
ceph_message_header env = m->get_envelope();
|
||||
bufferlist bl;
|
||||
bl.claim( m->get_payload() );
|
||||
//bl.c_str(); // condense into 1 buffer
|
||||
@ -273,9 +273,9 @@ FakeMessenger::FakeMessenger(entity_name_t me) : Messenger(me)
|
||||
{
|
||||
// assign rank
|
||||
_myinst.name = me;
|
||||
_myinst.addr.port = nranks++;
|
||||
_myinst.addr.v.port = nranks++;
|
||||
//if (!me.is_mon())
|
||||
_myinst.addr.nonce = getpid();
|
||||
_myinst.addr.v.nonce = getpid();
|
||||
|
||||
// add to directory
|
||||
directory[ _myinst.addr ] = this;
|
||||
|
@ -105,7 +105,7 @@ using namespace std;
|
||||
|
||||
|
||||
Message *
|
||||
decode_message(msg_envelope_t& env, bufferlist& payload)
|
||||
decode_message(ceph_message_header& env, bufferlist& payload)
|
||||
{
|
||||
// make message
|
||||
Message *m = 0;
|
||||
|
@ -159,22 +159,11 @@ using std::list;
|
||||
// abstract Message class
|
||||
|
||||
|
||||
|
||||
typedef struct {
|
||||
int32_t type;
|
||||
entity_inst_t src, dst;
|
||||
int32_t source_port, dest_port;
|
||||
int32_t nchunks;
|
||||
} msg_envelope_t;
|
||||
|
||||
#define MSG_ENVELOPE_LEN sizeof(msg_envelope_t)
|
||||
|
||||
|
||||
class Message {
|
||||
private:
|
||||
|
||||
protected:
|
||||
msg_envelope_t env; // envelope
|
||||
ceph_message_header env; // envelope
|
||||
bufferlist payload; // payload
|
||||
list<int> chunk_payload_at;
|
||||
|
||||
@ -209,10 +198,11 @@ public:
|
||||
payload = bl;
|
||||
}
|
||||
const list<int>& get_chunk_payload_at() const { return chunk_payload_at; }
|
||||
msg_envelope_t& get_envelope() {
|
||||
void set_chunk_payload_at(list<int>& o) { chunk_payload_at.swap(o); }
|
||||
ceph_message_header& get_envelope() {
|
||||
return env;
|
||||
}
|
||||
void set_envelope(msg_envelope_t& env) {
|
||||
void set_envelope(ceph_message_header& env) {
|
||||
this->env = env;
|
||||
}
|
||||
|
||||
@ -228,23 +218,23 @@ public:
|
||||
virtual char *get_type_name() = 0;
|
||||
|
||||
// source/dest
|
||||
entity_inst_t& get_dest_inst() { return env.dst; }
|
||||
void set_dest_inst(entity_inst_t& inst) { env.dst = inst; }
|
||||
entity_inst_t& get_dest_inst() { return *(entity_inst_t*)&env.dst; }
|
||||
void set_dest_inst(entity_inst_t& inst) { env.dst = *(ceph_entity_inst*)&inst; }
|
||||
|
||||
entity_inst_t& get_source_inst() { return env.src; }
|
||||
void set_source_inst(entity_inst_t& inst) { env.src = inst; }
|
||||
entity_inst_t& get_source_inst() { return *(entity_inst_t*)&env.src; }
|
||||
void set_source_inst(entity_inst_t& inst) { env.src = *(ceph_entity_inst*)&inst; }
|
||||
|
||||
entity_name_t& get_dest() { return env.dst.name; }
|
||||
void set_dest(entity_name_t a, int p) { env.dst.name = a; env.dest_port = p; }
|
||||
entity_name_t& get_dest() { return *(entity_name_t*)&env.dst.name; }
|
||||
void set_dest(entity_name_t a, int p) { env.dst.name = *(ceph_entity_name*)&a; env.dest_port = p; }
|
||||
int get_dest_port() { return env.dest_port; }
|
||||
void set_dest_port(int p) { env.dest_port = p; }
|
||||
|
||||
entity_name_t& get_source() { return env.src.name; }
|
||||
void set_source(entity_name_t a, int p) { env.src.name = a; env.source_port = p; }
|
||||
entity_name_t& get_source() { return *(entity_name_t*)&env.src.name; }
|
||||
void set_source(entity_name_t a, int p) { env.src.name = *(ceph_entity_name*)&a; env.source_port = p; }
|
||||
int get_source_port() { return env.source_port; }
|
||||
|
||||
entity_addr_t& get_source_addr() { return env.src.addr; }
|
||||
void set_source_addr(const entity_addr_t &i) { env.src.addr = i; }
|
||||
entity_addr_t& get_source_addr() { return *(entity_addr_t*)&env.src.addr; }
|
||||
void set_source_addr(const entity_addr_t &i) { env.src.addr = *(ceph_entity_addr*)&i; }
|
||||
|
||||
// PAYLOAD ----
|
||||
void reset_payload() {
|
||||
@ -260,7 +250,7 @@ public:
|
||||
|
||||
};
|
||||
|
||||
extern Message *decode_message(msg_envelope_t &env, bufferlist& bl);
|
||||
extern Message *decode_message(ceph_message_header &env, bufferlist& bl);
|
||||
inline ostream& operator<<(ostream& out, Message& m) {
|
||||
m.print(out);
|
||||
return out;
|
||||
|
@ -107,7 +107,7 @@ int Rank::Accepter::start()
|
||||
dout(15) << ".ceph_hosts: host '" << host << "' -> '" << addr << "'" << dendl;
|
||||
if (host == hostname) {
|
||||
parse_ip_port(addr.c_str(), g_my_addr);
|
||||
dout(0) << ".ceph_hosts: my addr is " << g_my_addr << dendl;
|
||||
dout(1) << ".ceph_hosts: my addr is " << g_my_addr << dendl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -153,13 +153,13 @@ int Rank::Accepter::start()
|
||||
myhostname->h_addr_list[0],
|
||||
myhostname->h_length);
|
||||
rank.my_addr.set_addr(listen_addr);
|
||||
rank.my_addr.port = 0; // see below
|
||||
rank.my_addr.v.port = 0; // see below
|
||||
}
|
||||
if (rank.my_addr.port == 0) {
|
||||
if (rank.my_addr.v.port == 0) {
|
||||
entity_addr_t tmp;
|
||||
tmp.set_addr(listen_addr);
|
||||
rank.my_addr.port = tmp.port;
|
||||
rank.my_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
|
||||
rank.my_addr.v.port = tmp.v.port;
|
||||
rank.my_addr.v.nonce = getpid(); // FIXME: pid might not be best choice here.
|
||||
}
|
||||
|
||||
dout(1) << "accepter.start my_addr is " << rank.my_addr << dendl;
|
||||
@ -605,7 +605,7 @@ Message *Rank::Pipe::read_message()
|
||||
// envelope
|
||||
//dout(10) << "receiver.read_message from sd " << sd << dendl;
|
||||
|
||||
msg_envelope_t env;
|
||||
ceph_message_header env;
|
||||
if (!tcp_read( sd, (char*)&env, sizeof(env) )) {
|
||||
need_to_send_close = false;
|
||||
return 0;
|
||||
@ -618,7 +618,9 @@ Message *Rank::Pipe::read_message()
|
||||
|
||||
// payload
|
||||
bufferlist blist;
|
||||
for (int i=0; i<env.nchunks; i++) {
|
||||
int32_t pos = 0;
|
||||
list<int> chunk_at;
|
||||
for (unsigned i=0; i<env.nchunks; i++) {
|
||||
int32_t size;
|
||||
if (!tcp_read( sd, (char*)&size, sizeof(size) )) {
|
||||
need_to_send_close = false;
|
||||
@ -627,6 +629,9 @@ Message *Rank::Pipe::read_message()
|
||||
|
||||
dout(30) << "decode chunk " << i << "/" << env.nchunks << " size " << size << dendl;
|
||||
|
||||
if (pos) chunk_at.push_back(pos);
|
||||
pos += size;
|
||||
|
||||
bufferptr bp;
|
||||
if (size % 4096 == 0) {
|
||||
dout(30) << "decoding page-aligned chunk of " << size << dendl;
|
||||
@ -649,6 +654,8 @@ Message *Rank::Pipe::read_message()
|
||||
// unmarshall message
|
||||
size_t s = blist.length();
|
||||
Message *m = decode_message(env, blist);
|
||||
|
||||
m->set_chunk_payload_at(chunk_at);
|
||||
|
||||
dout(20) << "pipe(" << peer_addr << ' ' << this << ").reader got " << s << " byte message from "
|
||||
<< m->get_source() << dendl;
|
||||
@ -708,7 +715,7 @@ int Rank::Pipe::do_sendmsg(Message *m, struct msghdr *msg, int len)
|
||||
int Rank::Pipe::write_message(Message *m)
|
||||
{
|
||||
// get envelope, buffers
|
||||
msg_envelope_t *env = &m->get_envelope();
|
||||
ceph_message_header *env = &m->get_envelope();
|
||||
bufferlist blist;
|
||||
blist.claim( m->get_payload() );
|
||||
|
||||
|
49
branches/sage/mds/msg/ceph_msg_types.h
Normal file
49
branches/sage/mds/msg/ceph_msg_types.h
Normal file
@ -0,0 +1,49 @@
|
||||
/* -*- mode:C++; tab-width:8; c-basic-offset:8; indent-tabs-mode:t -*-
|
||||
* vim: ts=8 sw=8 smarttab
|
||||
*/
|
||||
#ifndef __CEPH_MSG_TYPES_H
|
||||
#define __CEPH_MSG_TYPES_H
|
||||
|
||||
/*
|
||||
* entity_name
|
||||
*/
|
||||
struct ceph_entity_name {
|
||||
__u32 type;
|
||||
__u32 num;
|
||||
};
|
||||
|
||||
#define CEPH_ENTITY_TYPE_MON 1
|
||||
#define CEPH_ENTITY_TYPE_MDS 2
|
||||
#define CEPH_ENTITY_TYPE_OSD 3
|
||||
#define CEPH_ENTITY_TYPE_CLIENT 4
|
||||
#define CEPH_ENTITY_TYPE_ADMIN 5
|
||||
|
||||
|
||||
/*
|
||||
* entity_addr
|
||||
* ipv4 only for now
|
||||
*/
|
||||
struct ceph_entity_addr {
|
||||
__u8 ipq[4];
|
||||
__u32 port;
|
||||
__u32 nonce;
|
||||
};
|
||||
|
||||
|
||||
struct ceph_entity_inst {
|
||||
struct ceph_entity_name name;
|
||||
struct ceph_entity_addr addr;
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
* message header
|
||||
*/
|
||||
struct ceph_message_header {
|
||||
__u32 type;
|
||||
struct ceph_entity_inst src, dst;
|
||||
__u32 source_port, dest_port;
|
||||
__u32 nchunks;
|
||||
};
|
||||
|
||||
#endif
|
@ -15,27 +15,29 @@
|
||||
#ifndef __MSG_TYPES_H
|
||||
#define __MSG_TYPES_H
|
||||
|
||||
// raw C structs
|
||||
#include "include/ceph_inttypes.h"
|
||||
#include "ceph_msg_types.h"
|
||||
|
||||
#include "include/types.h"
|
||||
#include "include/blobhash.h"
|
||||
#include "tcp.h"
|
||||
|
||||
// new typed msg_addr_t way!
|
||||
class entity_name_t {
|
||||
int32_t _type;
|
||||
int32_t _num;
|
||||
struct ceph_entity_name v;
|
||||
|
||||
public:
|
||||
static const int TYPE_MON = 1;
|
||||
static const int TYPE_MDS = 2;
|
||||
static const int TYPE_OSD = 3;
|
||||
static const int TYPE_CLIENT = 4;
|
||||
static const int TYPE_ADMIN = 5;
|
||||
static const int TYPE_MON = CEPH_ENTITY_TYPE_MON;
|
||||
static const int TYPE_MDS = CEPH_ENTITY_TYPE_MDS;
|
||||
static const int TYPE_OSD = CEPH_ENTITY_TYPE_OSD;
|
||||
static const int TYPE_CLIENT = CEPH_ENTITY_TYPE_CLIENT;
|
||||
static const int TYPE_ADMIN = CEPH_ENTITY_TYPE_ADMIN;
|
||||
|
||||
static const int NEW = -1;
|
||||
|
||||
// cons
|
||||
entity_name_t() : _type(0), _num(0) {}
|
||||
entity_name_t(int t, int n=NEW) : _type(t), _num(n) {}
|
||||
entity_name_t() { v.type = v.num = 0; }
|
||||
entity_name_t(int t, int n=NEW) { v.type = t; v.num = n; }
|
||||
|
||||
// static cons
|
||||
static entity_name_t MON(int i=NEW) { return entity_name_t(TYPE_MON, i); }
|
||||
@ -44,8 +46,8 @@ public:
|
||||
static entity_name_t CLIENT(int i=NEW) { return entity_name_t(TYPE_CLIENT, i); }
|
||||
static entity_name_t ADMIN(int i=NEW) { return entity_name_t(TYPE_ADMIN, i); }
|
||||
|
||||
int num() const { return _num; }
|
||||
int type() const { return _type; }
|
||||
int num() const { return v.num; }
|
||||
int type() const { return v.type; }
|
||||
const char *type_str() const {
|
||||
switch (type()) {
|
||||
case TYPE_MDS: return "mds";
|
||||
@ -80,6 +82,9 @@ inline std::ostream& operator<<(std::ostream& out, const entity_name_t& addr) {
|
||||
else
|
||||
return out << addr.type_str() << addr.num();
|
||||
}
|
||||
inline std::ostream& operator<<(std::ostream& out, const ceph_entity_name& addr) {
|
||||
return out << *(const entity_name_t*)&addr;
|
||||
}
|
||||
|
||||
namespace __gnu_cxx {
|
||||
template<> struct hash< entity_name_t >
|
||||
@ -105,35 +110,34 @@ namespace __gnu_cxx {
|
||||
* ipv4 for now.
|
||||
*/
|
||||
struct entity_addr_t {
|
||||
uint8_t ipq[4];
|
||||
uint32_t port;
|
||||
uint32_t nonce; // bind time, or pid, or something unique!
|
||||
struct ceph_entity_addr v;
|
||||
uint32_t _pad;
|
||||
|
||||
entity_addr_t() : port(0), nonce(0), _pad(0) {
|
||||
ipq[0] = ipq[1] = ipq[2] = ipq[3] = 0;
|
||||
entity_addr_t() : _pad(0) {
|
||||
v.port = v.nonce = 0;
|
||||
v.ipq[0] = v.ipq[1] = v.ipq[2] = v.ipq[3] = 0;
|
||||
}
|
||||
|
||||
void set_addr(tcpaddr_t a) {
|
||||
memcpy((char*)ipq, (char*)&a.sin_addr.s_addr, 4);
|
||||
port = ntohs(a.sin_port);
|
||||
memcpy((char*)v.ipq, (char*)&a.sin_addr.s_addr, 4);
|
||||
v.port = ntohs(a.sin_port);
|
||||
}
|
||||
void make_addr(tcpaddr_t& a) const {
|
||||
memset(&a, 0, sizeof(a));
|
||||
a.sin_family = AF_INET;
|
||||
memcpy((char*)&a.sin_addr.s_addr, (char*)ipq, 4);
|
||||
a.sin_port = htons(port);
|
||||
memcpy((char*)&a.sin_addr.s_addr, (char*)v.ipq, 4);
|
||||
a.sin_port = htons(v.port);
|
||||
}
|
||||
};
|
||||
|
||||
inline ostream& operator<<(ostream& out, const entity_addr_t &addr)
|
||||
{
|
||||
return out << (int)addr.ipq[0]
|
||||
<< '.' << (int)addr.ipq[1]
|
||||
<< '.' << (int)addr.ipq[2]
|
||||
<< '.' << (int)addr.ipq[3]
|
||||
<< ':' << addr.port
|
||||
<< '.' << addr.nonce;
|
||||
return out << (int)addr.v.ipq[0]
|
||||
<< '.' << (int)addr.v.ipq[1]
|
||||
<< '.' << (int)addr.v.ipq[2]
|
||||
<< '.' << (int)addr.v.ipq[3]
|
||||
<< ':' << addr.v.port
|
||||
<< '.' << addr.v.nonce;
|
||||
}
|
||||
|
||||
inline bool operator==(const entity_addr_t& a, const entity_addr_t& b) { return memcmp(&a, &b, sizeof(a)) == 0; }
|
||||
@ -188,6 +192,11 @@ inline ostream& operator<<(ostream& out, const entity_inst_t &i)
|
||||
{
|
||||
return out << i.name << " " << i.addr;
|
||||
}
|
||||
inline ostream& operator<<(ostream& out, const ceph_entity_inst &i)
|
||||
{
|
||||
return out << *(const entity_inst_t*)&i;
|
||||
}
|
||||
|
||||
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user