mirror of
https://github.com/ceph/ceph
synced 2025-03-25 11:48:05 +00:00
*** empty log message ***
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@724 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
f090a97fe3
commit
b2150a945a
@ -45,6 +45,7 @@ OSD_OBJS= \
|
||||
osd/PG.o\
|
||||
osd/ObjectStore.o\
|
||||
osd/FakeStore.o\
|
||||
ebofs.o\
|
||||
osd/OSD.o
|
||||
|
||||
COMMON_OBJS= \
|
||||
@ -57,7 +58,6 @@ COMMON_OBJS= \
|
||||
common/Logger.o\
|
||||
common/Clock.o\
|
||||
common/Timer.o\
|
||||
ebofs.o\
|
||||
config.o
|
||||
|
||||
CLIENT_OBJS= \
|
||||
|
191
ceph/TODO
191
ceph/TODO
@ -1,15 +1,163 @@
|
||||
|
||||
|
||||
make graphs from dat runs:
|
||||
|
||||
/makedirs
|
||||
/ossh.lib
|
||||
/ossh.include
|
||||
|
||||
makedirs tput vs lat
|
||||
|
||||
openshared
|
||||
writefiles
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
llnl direct deposit fax 925 424 2663
|
||||
|
||||
|
||||
mpiexec -l -n 28 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 1048576 --nummds 1 --numclient 112 --numosd 7 --kill_after 120 --osd_object_layout hashino --osd_pg_layout hash --osd_pg_bits 12 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --log_name osd/striping.cperbig/cper=16,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576 > log/osd/striping.cperbig/cper=16,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/o && touch log/osd/striping.cperbig/cper=16,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/.done &
|
||||
|
||||
mpiexec -l -n 28 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 1048576 --nummds 1 --numclient 896 --numosd 7 --kill_after 120 --osd_object_layout hashino --osd_pg_layout crush --osd_pg_bits 12 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --log_name osd/striping.cperbig/cper=128,osd_object_layout=hashino,osd_pg_layout=crush,writefile_size=1048576 > log/osd/striping.cperbig/cper=128,osd_object_layout=hashino,osd_pg_layout=crush,writefile_size=1048576/o && touch log/osd/striping.cperbig/cper=128,osd_object_layout=hashino,osd_pg_layout=crush,writefile_size=1048576/.done &
|
||||
|
||||
mpiexec -l -n 28 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 1048576 --nummds 1 --numclient 448 --numosd 7 --kill_after 120 --osd_object_layout hashino --osd_pg_layout hash --osd_pg_bits 12 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --log_name osd/striping.cperbig/cper=64,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576 > log/osd/striping.cperbig/cper=64,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/o && touch log/osd/striping.cperbig/cper=64,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/.done &
|
||||
|
||||
|
||||
|
||||
|
||||
541 488 0496 warren
|
||||
|
||||
|
||||
TONIGHT TODO
|
||||
|
||||
/make comb calc latency
|
||||
|
||||
/finish openshared run
|
||||
|
||||
vary cper on makedirs mds=96, make a nice tput/lat curve
|
||||
do big makedirs run w/ smaller set of mds sizes
|
||||
|
||||
fill out ossh.lib (ready to go, just exclude cper 50 .. or adjust bc of above)
|
||||
|
||||
|
||||
FINAL data sets...
|
||||
|
||||
makedirs
|
||||
n1 good.
|
||||
n3 is skewed down.. not clear why?
|
||||
big has low cper values! doh. **rerun if possible?**
|
||||
|
||||
makedirs.tput
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
MAR 1
|
||||
|
||||
makedirs
|
||||
final --> 12
|
||||
fixed lb -> m2
|
||||
|
||||
-> do it again with cper 50,100 , max 2,3 .. ready! -> all about the same.
|
||||
plot "log/alcdat/makedirs.n1/c" u 1:2 t "cper=100,mds_bal_max=2", "log/alcdat/makedirs.n1/c" u 1:3 t "cper=100,mds_bal_max=3", "log/alcdat/makedirs.n1/c" u 1:4 t "cper=50,mds_bal_max=2", "log/alcdat/makedirs.n1/c" u 1:5 t "cper=50,mds_bal_max=3";
|
||||
|
||||
/**-> bal_max=1, cper=100 (n2)
|
||||
plot "log/alcdat/makedirs.n2/c" u 1:2 t "cper=100,mds_bal_max=1";
|
||||
.. same same
|
||||
|
||||
**-> bal_max=2 cper=15,25 (n3) ready Q/
|
||||
|
||||
makedirs.big
|
||||
|
||||
**-> nbig1 .. flop! overlay no worky
|
||||
**-> nbig4 .. Q/
|
||||
|
||||
|
||||
makedirs.tput
|
||||
finalish -> 1
|
||||
|
||||
**-> ready.. run (last!) to fill in gaps .. (2) Q
|
||||
|
||||
** running subset of points tho, as (2)! be careful merging results, not all data points for all runs!!
|
||||
|
||||
|
||||
ossh.lib
|
||||
cper 50 looks good when nummds=48..
|
||||
|
||||
f14 is ok...
|
||||
repeat w/ range of cper (besides just 50)
|
||||
|
||||
-> run with cper range .. ready
|
||||
plot "log/alcdat/ossh.lib.n1/c" u 1:2 t "cper=100", "log/alcdat/ossh.lib.n1/c" u 1:3 t "cper=25", "log/alcdat/ossh.lib.n1/c" u 1:4 t "cper=50";
|
||||
.. only 25 is good at scale!
|
||||
|
||||
**-> cper=10,16 (n2) ready Q
|
||||
|
||||
|
||||
ossh.lib.big
|
||||
|
||||
**-> ready
|
||||
|
||||
|
||||
|
||||
ossh.include
|
||||
max 2 @ 80, cper 75
|
||||
|
||||
best so far is d12.
|
||||
no, f13 is better.
|
||||
|
||||
d9 80 was good too? cper=50 : log/alcdat/ossh.include.d9/cper=50,nummds=128/o ... srun --wait=120 --exclude=jobs/ltest.ignore -l -t 7 -N 385 -p ltest ./tcpsyn --mkfs --ebofs --syn until 300 --nummds 128 --numclient 6400 --numosd 128 --kill_after 400 --mds_bal_rep 1700 --mds_bal_interval 45 --mds_bal_max 2 --mds_decay_halflife 30 --mds_bal_hash_rd 100000 --tcp_skip_rank0 --mds_shutdown_check 60 --syn only 0 --syn trace traces/openssh/untar.include 1 --syn sleep 30 --syn trace traces/openssh/make.include 1000 --log_name alcdat/ossh.include.d9/cper=50,nummds=128 > log/alcdat/ossh.include.d9/cper=50,nummds=128/o && touch log/alcdat/ossh.include.d9/cper=50,nummds=128/.done &
|
||||
|
||||
-> run with cper range ... ready (n1 good!)
|
||||
plot "log/alcdat/ossh.include.n1/c" u 1:2 t "cper=100", "log/alcdat/ossh.include.n1/c" u 1:3 t "cper=25", "log/alcdat/ossh.include.n1/c" u 1:4 t "cper=50";
|
||||
|
||||
|
||||
**-> extend w/ cper 15,20 (n2) ..only if time..
|
||||
|
||||
|
||||
ossh.include.big
|
||||
|
||||
**-> nbig2 ready
|
||||
|
||||
|
||||
|
||||
makefiles
|
||||
cper + nummds ... ok, 4 ... go back and fill in data points if time!
|
||||
150 > 100. check 200 if time
|
||||
|
||||
|
||||
openshared
|
||||
f3 .. has various cper
|
||||
|
||||
|
||||
|
||||
mdtest
|
||||
|
||||
|
||||
|
||||
|
||||
striping
|
||||
light vs saturated vs supersaturated?
|
||||
stripe size
|
||||
|
||||
|
||||
|
||||
gotchas
|
||||
- watch out for cper too large.. bogs down mds0, fucks load balancer
|
||||
|
||||
|
||||
|
||||
opensshinclude
|
||||
choose osdfac -> 1
|
||||
test overlay_clients .. scale nummds
|
||||
or condense clients into fewer nodes?
|
||||
fix up ossh.include
|
||||
|
||||
|
||||
|
||||
|
||||
- aged object stores on googoo
|
||||
@ -23,9 +171,18 @@ MAR 1
|
||||
- fakestore crapping out.. missing timer events?
|
||||
mpiexec -l -n 30 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 65536 --nummds 1 --numclient 100 --numosd 6 --kill_after 300 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --mds_shutdown_check 60 --log_name osd/write_sizes.sdb2.ebo.file2/fs=ebofs,writefile_size=65536
|
||||
|
||||
2: tcpsyn: mds/MDCache.cc:2388: void MDCache::handle_cache_expire(MCacheExpire*): Assertion `in->state_test((1<<6))' failed.
|
||||
2: tcpsyn: mds/MDCache.cc:2388: void MDCache::handle_cache_expire(MCacheExpire*): Assertion `in->state_test((1<<6))' failed.
|
||||
2: mds1 on tcprank 3 googoo-27.2013
|
||||
2: mds1 on tcprank 2 googoo-27.1693
|
||||
|
||||
1: tcpsyn: mds/MDCache.cc:2382: void MDCache::handle_cache_expire(MCacheExpire*): Assertion `in' failed.
|
||||
0 on tcprank 24 googoo-17.21123
|
||||
|
||||
|
||||
|
||||
- tcpmessenger inq count
|
||||
|
||||
|
||||
- mds
|
||||
- vary log stripe size, count (on ebofs and fakestore)
|
||||
@ -61,10 +218,13 @@ mpiexec -l -n 30 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 65
|
||||
- shared compile (/lib, /include)
|
||||
need something with shared files.. so not a linux kernel :(
|
||||
|
||||
* get rid of randomsleep?
|
||||
|
||||
|
||||
- data scaling .. aggregate tput
|
||||
- writes to local files
|
||||
- strided write to shared file ??? meaningless?
|
||||
- strided write with O_LAZY ???
|
||||
- scale_wr - writes to local files
|
||||
- strided write to shared file ??? meaningless? (later!)
|
||||
- strided write with O_LAZY ??? later!
|
||||
|
||||
- tput per client
|
||||
- local file
|
||||
@ -74,6 +234,7 @@ mpiexec -l -n 30 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 65
|
||||
- crush vs linear
|
||||
- at a large scale!
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@ -82,6 +243,8 @@ OSDI
|
||||
|
||||
- tcp recv throttling
|
||||
|
||||
- fix object ov/nv thing? (tcpmessenger locking bug?)
|
||||
|
||||
- tune ebofs
|
||||
|
||||
- vary osd_maxthreads, [ ebofs, fakestore ], write_size
|
||||
@ -99,6 +262,26 @@ OSDI
|
||||
- ld_preload?
|
||||
|
||||
|
||||
- crush creeping performance degradation due to dead nodes -- richard g.
|
||||
|
||||
|
||||
|
||||
need post osdi:
|
||||
|
||||
mds
|
||||
statlite
|
||||
stat single writer
|
||||
|
||||
client
|
||||
flesh out posix layer
|
||||
statlite
|
||||
readdir + stat
|
||||
readdirplus
|
||||
lazy_*
|
||||
|
||||
osd
|
||||
new rados mech
|
||||
|
||||
|
||||
|
||||
client
|
||||
@ -121,6 +304,7 @@ filer
|
||||
- ...do a short return in those cases? ..maybe a 'bool atomic' flag..
|
||||
|
||||
mds
|
||||
- delayed replica caps release... we need to set a timer event! (and cancel it when appropriate)
|
||||
- implement/test truncate()
|
||||
- chdir
|
||||
- client handles for directories!
|
||||
@ -383,3 +567,4 @@ IMPLEMENT
|
||||
|
||||
- anchors
|
||||
- hard links
|
||||
|
||||
|
@ -324,8 +324,9 @@ void Client::insert_trace(const vector<c_inode_info*>& trace)
|
||||
root->inode = in_info->inode;
|
||||
inode_map[root->inode.ino] = root;
|
||||
}
|
||||
|
||||
root->last_updated = now;
|
||||
|
||||
if (g_conf.client_cache_stat_ttl)
|
||||
root->valid_until = now + g_conf.client_cache_stat_ttl;
|
||||
|
||||
root->dir_auth = in_info->dir_auth;
|
||||
assert(root->dir_auth == 0);
|
||||
@ -339,7 +340,9 @@ void Client::insert_trace(const vector<c_inode_info*>& trace)
|
||||
dout(12) << "insert_trace trace " << i << endl;
|
||||
Dir *dir = cur->open_dir();
|
||||
cur = this->insert_inode_info(dir, trace[i]);
|
||||
cur->last_updated = now;
|
||||
|
||||
if (g_conf.client_cache_stat_ttl)
|
||||
cur->valid_until = now + g_conf.client_cache_stat_ttl;
|
||||
|
||||
// move to top of lru!
|
||||
if (cur->dn) lru.lru_touch(cur->dn);
|
||||
@ -367,7 +370,7 @@ Dentry *Client::lookup(filepath& path)
|
||||
Dir *dir = cur->dir;
|
||||
if (dir->dentries.count(path[i])) {
|
||||
dn = dir->dentries[path[i]];
|
||||
dout(14) << " hit dentry " << path[i] << " inode is " << dn->inode << " last_updated " << dn->inode->last_updated<< endl;
|
||||
dout(14) << " hit dentry " << path[i] << " inode is " << dn->inode << " valid_until " << dn->inode->valid_until << endl;
|
||||
} else {
|
||||
dout(14) << " dentry " << path[i] << " dne" << endl;
|
||||
return NULL;
|
||||
@ -380,7 +383,7 @@ Dentry *Client::lookup(filepath& path)
|
||||
}
|
||||
|
||||
if (dn) {
|
||||
dout(11) << "lookup '" << path << "' found " << dn->name << " inode " << hex << dn->inode->inode.ino << dec << " last_updated " << dn->inode->last_updated<< endl;
|
||||
dout(11) << "lookup '" << path << "' found " << dn->name << " inode " << hex << dn->inode->inode.ino << dec << " valid_until " << dn->inode->valid_until<< endl;
|
||||
}
|
||||
|
||||
return dn;
|
||||
@ -464,7 +467,7 @@ MClientReply *Client::make_request(MClientRequest *req,
|
||||
if (req->get_op() == MDS_OP_STAT ||
|
||||
req->get_op() == MDS_OP_LSTAT ||
|
||||
req->get_op() == MDS_OP_READDIR ||
|
||||
req->get_op() == MDS_OP_OPEN || // not quite true! a lie actually!
|
||||
req->get_op() == MDS_OP_OPEN ||
|
||||
req->get_op() == MDS_OP_RELEASE)
|
||||
nojournal = true;
|
||||
|
||||
@ -1373,15 +1376,19 @@ int Client::lstat(const char *relpath, struct stat *stbuf)
|
||||
Dentry *dn = lookup(req->get_filepath());
|
||||
inode_t inode;
|
||||
time_t now = time(NULL);
|
||||
if (dn &&
|
||||
((now - dn->inode->last_updated) < g_conf.client_cache_stat_ttl)) {
|
||||
if (dn && now <= dn->inode->valid_until) {
|
||||
inode = dn->inode->inode;
|
||||
dout(10) << "lstat cache hit, age is " << (now - dn->inode->last_updated) << endl;
|
||||
dout(10) << "lstat cache hit, valid until " << dn->inode->valid_until << endl;
|
||||
|
||||
if (g_conf.client_cache_stat_ttl == 0)
|
||||
dn->inode->valid_until = 0; // only one stat allowed after each readdir
|
||||
|
||||
delete req; // don't need this
|
||||
} else {
|
||||
// FIXME where does FUSE maintain user information
|
||||
req->set_caller_uid(getuid());
|
||||
req->set_caller_gid(getgid());
|
||||
//struct fuse_context *fc = fuse_get_context();
|
||||
//req->set_caller_uid(fc->uid);
|
||||
//req->set_caller_gid(fc->gid);
|
||||
|
||||
MClientReply *reply = make_request(req);
|
||||
res = reply->get_result();
|
||||
@ -1627,14 +1634,18 @@ int Client::getdir(const char *relpath, map<string,inode_t*>& contents)
|
||||
it++) {
|
||||
// put in cache
|
||||
Inode *in = this->insert_inode_info(dir, *it);
|
||||
in->last_updated = now;
|
||||
|
||||
if (g_conf.client_cache_stat_ttl)
|
||||
in->valid_until = now + g_conf.client_cache_stat_ttl;
|
||||
else if (g_conf.client_cache_readdir_ttl)
|
||||
in->valid_until = now + g_conf.client_cache_readdir_ttl;
|
||||
|
||||
// contents to caller too!
|
||||
contents[(*it)->ref_dn] = &in->inode;
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME: remove items in cache that weren't in my readdir
|
||||
// FIXME: remove items in cache that weren't in my readdir?
|
||||
// ***
|
||||
}
|
||||
|
||||
@ -2196,8 +2207,10 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
|
||||
// time
|
||||
utime_t lat = g_clock.now();
|
||||
lat -= start;
|
||||
client_logger->finc("wrlsum",(double)lat);
|
||||
client_logger->inc("wrlnum");
|
||||
if (client_logger) {
|
||||
client_logger->finc("wrlsum",(double)lat);
|
||||
client_logger->inc("wrlnum");
|
||||
}
|
||||
|
||||
dout(20) << " sync write done " << onfinish << endl;
|
||||
}
|
||||
|
@ -114,7 +114,7 @@ class InodeCap {
|
||||
class Inode {
|
||||
public:
|
||||
inode_t inode; // the actual inode
|
||||
time_t last_updated;
|
||||
time_t valid_until;
|
||||
|
||||
// about the dir (if this is one!)
|
||||
int dir_auth;
|
||||
@ -149,7 +149,7 @@ class Inode {
|
||||
}
|
||||
|
||||
Inode() :
|
||||
last_updated(0),
|
||||
valid_until(0),
|
||||
dir_auth(-1), dir_hashed(false), dir_replicated(false),
|
||||
file_wr_mtime(0), file_wr_size(0), num_rd(0), num_wr(0),
|
||||
ref(0), dir(0), dn(0), symlink(0) { }
|
||||
|
@ -75,6 +75,11 @@ void parse_syn_options(vector<char*>& args)
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
} else if (strcmp(args[i],"statdirs") == 0) {
|
||||
syn_modes.push_back( SYNCLIENT_MODE_STATDIRS );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
} else if (strcmp(args[i],"makefiles") == 0) {
|
||||
syn_modes.push_back( SYNCLIENT_MODE_MAKEFILES );
|
||||
syn_iargs.push_back( atoi(args[++i]) );
|
||||
@ -256,6 +261,18 @@ int SyntheticClient::run()
|
||||
}
|
||||
}
|
||||
break;
|
||||
case SYNCLIENT_MODE_STATDIRS:
|
||||
{
|
||||
string sarg1 = get_sarg(0);
|
||||
int iarg1 = iargs.front(); iargs.pop_front();
|
||||
int iarg2 = iargs.front(); iargs.pop_front();
|
||||
int iarg3 = iargs.front(); iargs.pop_front();
|
||||
if (run_me()) {
|
||||
dout(2) << "statdirs " << sarg1 << " " << iarg1 << " " << iarg2 << " " << iarg3 << endl;
|
||||
stat_dirs(sarg1.c_str(), iarg1, iarg2, iarg3);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case SYNCLIENT_MODE_MAKEFILES:
|
||||
{
|
||||
int num = iargs.front(); iargs.pop_front();
|
||||
@ -689,6 +706,36 @@ int SyntheticClient::make_dirs(const char *basedir, int dirs, int files, int dep
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Recursively lstat() a synthetic directory tree rooted at basedir.
//
// Stats basedir itself, then "file.0" .. "file.<files-1>" inside it, then
// recurses into "dir.0" .. "dir.<dirs-1>" with depth-1, stopping when depth
// reaches 0.  (Presumably this walks the tree laid out by make_dirs, which
// takes the same arguments -- confirm the naming matches.)
//
// Returns 0 if time_to_stop() is already true or the walk completes, and -1
// if basedir itself cannot be lstat'ed.  Results of the per-file lstat calls
// and of the recursive calls are ignored, so failures below basedir are not
// reported to the caller.
int SyntheticClient::stat_dirs(const char *basedir, int dirs, int files, int depth)
|
||||
{
|
||||
if (time_to_stop()) return 0;
|
||||
|
||||
// make sure base dir exists
|
||||
struct stat st;
|
||||
int r = client->lstat(basedir, &st);
|
||||
if (r != 0) {
|
||||
// NOTE(review): the message says "make" but this path only stats; it looks
// copied from make_dirs -- confirm and reword if so.
dout(1) << "can't make base dir? " << basedir << endl;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// children
|
||||
// Scratch buffer for child paths, reused by both loops below.
// NOTE(review): sprintf() into this fixed 500-byte buffer is unbounded; a long
// basedir would overflow it.  snprintf(d, sizeof(d), ...) would be safer.
char d[500];
|
||||
dout(3) << "stat_dirs " << basedir << " dirs " << dirs << " files " << files << " depth " << depth << endl;
|
||||
// stat every file in this directory; return values are ignored
for (int i=0; i<files; i++) {
|
||||
sprintf(d,"%s/file.%d", basedir, i);
|
||||
client->lstat(d, &st);
|
||||
}
|
||||
|
||||
// depth 0 means leaf level: files only, no subdirectories to descend into
if (depth == 0) return 0;
|
||||
|
||||
// descend into each subdirectory with one less level remaining
for (int i=0; i<dirs; i++) {
|
||||
sprintf(d, "%s/dir.%d", basedir, i);
|
||||
stat_dirs(d, dirs, files, depth-1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int SyntheticClient::make_files(int num, int count, int priv, bool more)
|
||||
{
|
||||
|
@ -26,7 +26,8 @@
|
||||
#define SYNCLIENT_MODE_FULLWALK 2
|
||||
#define SYNCLIENT_MODE_REPEATWALK 7
|
||||
|
||||
#define SYNCLIENT_MODE_MAKEDIRS 10 // dirs files depth
|
||||
#define SYNCLIENT_MODE_MAKEDIRS 9 // dirs files depth
|
||||
#define SYNCLIENT_MODE_STATDIRS 10 // dirs files depth
|
||||
#define SYNCLIENT_MODE_MAKEFILES 11 // num count private
|
||||
#define SYNCLIENT_MODE_MAKEFILES2 12 // num count private
|
||||
#define SYNCLIENT_MODE_CREATESHARED 13 // num
|
||||
@ -167,6 +168,7 @@ class SyntheticClient {
|
||||
int random_walk(int n);
|
||||
|
||||
int make_dirs(const char *basedir, int dirs, int files, int depth);
|
||||
int stat_dirs(const char *basedir, int dirs, int files, int depth);
|
||||
int make_files(int num, int count, int priv, bool more);
|
||||
|
||||
int create_shared(int num);
|
||||
|
@ -41,14 +41,7 @@ Messenger *messenger = 0;
|
||||
|
||||
/**** thread solution *****/
|
||||
|
||||
void *timer_thread_entrypoint(void *arg)
|
||||
{
|
||||
Timer *t = (Timer*)arg;
|
||||
t->timer_thread();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void Timer::timer_thread()
|
||||
void Timer::timer_entry()
|
||||
{
|
||||
lock.Lock();
|
||||
|
||||
@ -65,7 +58,7 @@ void Timer::timer_thread()
|
||||
|
||||
if (event && now >= next) {
|
||||
// move to pending list
|
||||
map< utime_t, set<Context*> >::iterator it = scheduled.begin();
|
||||
map< utime_t, multiset<Context*> >::iterator it = scheduled.begin();
|
||||
while (it != scheduled.end()) {
|
||||
if (it->first > now) break;
|
||||
|
||||
@ -73,7 +66,7 @@ void Timer::timer_thread()
|
||||
dout(DBL) << "queueing event(s) scheduled at " << t << endl;
|
||||
|
||||
if (messenger) {
|
||||
for (set<Context*>::iterator cit = it->second.begin();
|
||||
for (multiset<Context*>::iterator cit = it->second.begin();
|
||||
cit != it->second.end();
|
||||
cit++) {
|
||||
pending.push_back(*cit);
|
||||
@ -83,21 +76,27 @@ void Timer::timer_thread()
|
||||
}
|
||||
|
||||
//pending[t] = it->second;
|
||||
map< utime_t, set<Context*> >::iterator previt = it;
|
||||
map< utime_t, multiset<Context*> >::iterator previt = it;
|
||||
it++;
|
||||
scheduled.erase(previt);
|
||||
}
|
||||
|
||||
if (!pending.empty()) {
|
||||
sleeping = false;
|
||||
lock.Unlock();
|
||||
{ // make sure we're not holding any locks while we talk to the messenger
|
||||
for (list<Context*>::iterator cit = pending.begin();
|
||||
cit != pending.end();
|
||||
cit++) {
|
||||
dout(DBL) << "queue callback " << *cit << endl;
|
||||
messenger->queue_callback(*cit);
|
||||
{ // make sure we're not holding any locks while we do callbacks (or talk to the messenger)
|
||||
if (1) {
|
||||
// make the callbacks myself.
|
||||
for (list<Context*>::iterator cit = pending.begin();
|
||||
cit != pending.end();
|
||||
cit++)
|
||||
(*cit)->finish(0);
|
||||
pending.clear();
|
||||
} else {
|
||||
// give them to the messenger
|
||||
messenger->queue_callbacks(pending);
|
||||
}
|
||||
pending.clear();
|
||||
assert(pending.empty());
|
||||
}
|
||||
lock.Lock();
|
||||
}
|
||||
@ -109,12 +108,14 @@ void Timer::timer_thread()
|
||||
if (event) {
|
||||
dout(DBL) << "sleeping until " << next << endl;
|
||||
timed_sleep = true;
|
||||
sleeping = true;
|
||||
timeout_cond.WaitUntil(lock, next); // wait for waker or time
|
||||
utime_t now = g_clock.now();
|
||||
dout(DBL) << "kicked or timed out at " << now << endl;
|
||||
} else {
|
||||
dout(DBL) << "sleeping" << endl;
|
||||
timed_sleep = false;
|
||||
sleeping = true;
|
||||
sleep_cond.Wait(lock); // wait for waker
|
||||
utime_t now = g_clock.now();
|
||||
dout(DBL) << "kicked at " << now << endl;
|
||||
@ -144,22 +145,28 @@ void Timer::unset_messenger()
|
||||
|
||||
void Timer::register_timer()
|
||||
{
|
||||
if (thread_id) {
|
||||
dout(DBL) << "register_timer kicking thread" << endl;
|
||||
if (timed_sleep)
|
||||
timeout_cond.SignalAll();
|
||||
else
|
||||
sleep_cond.SignalAll();
|
||||
if (timer_thread.is_started()) {
|
||||
if (sleeping) {
|
||||
dout(DBL) << "register_timer kicking thread" << endl;
|
||||
if (timed_sleep)
|
||||
timeout_cond.SignalAll();
|
||||
else
|
||||
sleep_cond.SignalAll();
|
||||
} else {
|
||||
dout(DBL) << "register_timer doing nothing; thread is alive but not sleeping" << endl;
|
||||
// it's probably delivering callbacks to the messenger loop
|
||||
}
|
||||
} else {
|
||||
dout(DBL) << "register_timer starting thread" << endl;
|
||||
pthread_create(&thread_id, NULL, timer_thread_entrypoint, (void*)this);
|
||||
timer_thread.create();
|
||||
//pthread_create(&thread_id, NULL, timer_thread_entrypoint, (void*)this);
|
||||
}
|
||||
}
|
||||
|
||||
void Timer::cancel_timer()
|
||||
{
|
||||
// clear my callback pointers
|
||||
if (thread_id) {
|
||||
if (timer_thread.is_started()) {
|
||||
dout(10) << "setting thread_stop flag" << endl;
|
||||
lock.Lock();
|
||||
thread_stop = true;
|
||||
@ -171,7 +178,7 @@ void Timer::cancel_timer()
|
||||
|
||||
dout(10) << "waiting for thread to finish" << endl;
|
||||
void *ptr;
|
||||
pthread_join(thread_id, &ptr);
|
||||
timer_thread.join(&ptr);//pthread_join(thread_id, &ptr);
|
||||
|
||||
dout(10) << "thread finished, exit code " << ptr << endl;
|
||||
}
|
||||
@ -199,6 +206,7 @@ void Timer::add_event_at(utime_t when,
|
||||
|
||||
lock.Lock();
|
||||
scheduled[ when ].insert(callback);
|
||||
assert(event_times.count(callback) == 0); // err.. there can be only one (for now!)
|
||||
event_times[callback] = when;
|
||||
|
||||
num_event++;
|
||||
@ -224,7 +232,10 @@ bool Timer::cancel_event(Context *callback)
|
||||
|
||||
utime_t tp = event_times[callback];
|
||||
assert(scheduled.count(tp));
|
||||
scheduled[tp].erase(callback);
|
||||
|
||||
multiset<Context*>::iterator p = scheduled[tp].find(callback); // there may be more than one?
|
||||
assert(p != scheduled[tp].end());
|
||||
scheduled[tp].erase(p);
|
||||
|
||||
event_times.erase(callback);
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
|
||||
#include "Mutex.h"
|
||||
#include "Cond.h"
|
||||
#include "Thread.h"
|
||||
|
||||
#include <map>
|
||||
#include <set>
|
||||
@ -49,47 +50,62 @@ namespace __gnu_cxx {
|
||||
|
||||
class Timer {
|
||||
private:
|
||||
map< utime_t, set<Context*> > scheduled; // time -> (context ...)
|
||||
map< utime_t, multiset<Context*> > scheduled; // time -> (context ...)
|
||||
hash_map< Context*, utime_t > event_times; // event -> time
|
||||
|
||||
// get time of the next event
|
||||
Context* get_next_scheduled(utime_t& when) {
|
||||
if (scheduled.empty()) return 0;
|
||||
map< utime_t, set<Context*> >::iterator it = scheduled.begin();
|
||||
map< utime_t, multiset<Context*> >::iterator it = scheduled.begin();
|
||||
when = it->first;
|
||||
set<Context*>::iterator sit = it->second.begin();
|
||||
multiset<Context*>::iterator sit = it->second.begin();
|
||||
return *sit;
|
||||
}
|
||||
|
||||
void register_timer(); // make sure i get a callback
|
||||
void cancel_timer(); // if the timer thread is started: set thread_stop and join it
|
||||
|
||||
|
||||
pthread_t thread_id;
|
||||
//pthread_t thread_id;
|
||||
bool thread_stop;
|
||||
Mutex lock;
|
||||
bool timed_sleep;
|
||||
bool sleeping;
|
||||
Cond sleep_cond;
|
||||
Cond timeout_cond;
|
||||
|
||||
public:
|
||||
void timer_thread(); // waiter thread (that wakes us up)
|
||||
void timer_entry(); // waiter thread (that wakes us up)
|
||||
|
||||
// Thread subclass that runs the owning Timer's event loop: entry() calls
// t->timer_entry() and returns 0 when that call returns.  The single
// instance, timer_thread, is a member of Timer and is constructed with a
// pointer back to it.
class TimerThread : public Thread {
|
||||
// the Timer whose timer_entry() this thread runs
Timer *t;
|
||||
public:
|
||||
// thread body: run the timer loop, then exit with a null result
void *entry() {
|
||||
t->timer_entry();
|
||||
return 0;
|
||||
}
|
||||
TimerThread(Timer *_t) : t(_t) {}
|
||||
} timer_thread;
|
||||
|
||||
|
||||
int num_event;
|
||||
|
||||
|
||||
public:
|
||||
Timer() {
|
||||
thread_id = 0;
|
||||
thread_stop = false;
|
||||
num_event = 0;
|
||||
Timer() :
|
||||
//thread_id0),
|
||||
thread_stop(false),
|
||||
timed_sleep(false),
|
||||
sleeping(false),
|
||||
timer_thread(this),
|
||||
num_event(0)
|
||||
{
|
||||
}
|
||||
~Timer() {
|
||||
// scheduled
|
||||
for (map< utime_t, set<Context*> >::iterator it = scheduled.begin();
|
||||
for (map< utime_t, multiset<Context*> >::iterator it = scheduled.begin();
|
||||
it != scheduled.end();
|
||||
it++) {
|
||||
for (set<Context*>::iterator sit = it->second.begin();
|
||||
for (multiset<Context*>::iterator sit = it->second.begin();
|
||||
sit != it->second.end();
|
||||
sit++)
|
||||
delete *sit;
|
||||
|
@ -83,6 +83,7 @@ md_config_t g_conf = {
|
||||
client_cache_size: 300,
|
||||
client_cache_mid: .5,
|
||||
client_cache_stat_ttl: 0, // seconds until cached stat results become invalid
|
||||
client_cache_readdir_ttl: 1, // 1 second only
|
||||
client_use_random_mds: false,
|
||||
|
||||
client_sync_writes: 0,
|
||||
@ -138,6 +139,8 @@ md_config_t g_conf = {
|
||||
|
||||
mds_verify_export_dirauth: true,
|
||||
|
||||
mds_local_osd: false,
|
||||
|
||||
|
||||
// --- osd ---
|
||||
osd_pg_bits: 8,
|
||||
@ -192,6 +195,7 @@ md_config_t g_conf = {
|
||||
bdev_el_bidir: true, // bidirectional elevator?
|
||||
bdev_iov_max: 512, // max # iov's to collect into a single readv()/writev() call
|
||||
bdev_debug_check_io_overlap: true, // [DEBUG] check for any pending io overlaps
|
||||
bdev_fake_max_mb: 0,
|
||||
|
||||
// --- fakeclient (mds regression testing) (ancient history) ---
|
||||
num_fakeclient: 100,
|
||||
@ -428,11 +432,15 @@ void parse_config_options(vector<char*>& args)
|
||||
else if (strcmp(args[i], "--mds_bal_minchunk") == 0)
|
||||
g_conf.mds_bal_minchunk = atoi(args[++i]);
|
||||
|
||||
else if (strcmp(args[i], "--mds_local_osd") == 0)
|
||||
g_conf.mds_local_osd = atoi(args[++i]);
|
||||
|
||||
else if (strcmp(args[i], "--client_cache_size") == 0)
|
||||
g_conf.client_cache_size = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--client_cache_stat_ttl") == 0)
|
||||
g_conf.client_cache_stat_ttl = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--client_cache_readdir_ttl") == 0)
|
||||
g_conf.client_cache_readdir_ttl = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--client_trace") == 0)
|
||||
g_conf.client_trace = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--fuse_direct_io") == 0)
|
||||
@ -497,6 +505,8 @@ void parse_config_options(vector<char*>& args)
|
||||
g_conf.bdev_iothreads = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--bdev_idle_kick_after_ms") == 0)
|
||||
g_conf.bdev_idle_kick_after_ms = atoi(args[++i]);
|
||||
else if (strcmp(args[i], "--bdev_fake_max_mb") == 0)
|
||||
g_conf.bdev_fake_max_mb = atoi(args[++i]);
|
||||
|
||||
else if (strcmp(args[i], "--osd_object_layout") == 0) {
|
||||
i++;
|
||||
|
@ -55,6 +55,7 @@ struct md_config_t {
|
||||
int client_cache_size;
|
||||
float client_cache_mid;
|
||||
int client_cache_stat_ttl;
|
||||
int client_cache_readdir_ttl;
|
||||
bool client_use_random_mds; // debug flag
|
||||
|
||||
bool client_sync_writes;
|
||||
@ -107,6 +108,8 @@ struct md_config_t {
|
||||
int mds_shutdown_check;
|
||||
bool mds_verify_export_dirauth; // debug flag
|
||||
|
||||
bool mds_local_osd;
|
||||
|
||||
|
||||
// osd
|
||||
int osd_pg_bits;
|
||||
@ -161,6 +164,7 @@ struct md_config_t {
|
||||
bool bdev_el_bidir;
|
||||
int bdev_iov_max;
|
||||
bool bdev_debug_check_io_overlap;
|
||||
int bdev_fake_max_mb;
|
||||
|
||||
// fake client
|
||||
int num_fakeclient;
|
||||
|
@ -88,7 +88,7 @@ namespace crush {
|
||||
float item_weight;
|
||||
|
||||
// primes
|
||||
vector<int> primes;
|
||||
vector<unsigned> primes;
|
||||
|
||||
int get_prime(int j) const {
|
||||
return primes[ j % primes.size() ];
|
||||
@ -101,12 +101,12 @@ namespace crush {
|
||||
primes.clear();
|
||||
|
||||
// start with odd number > num_items
|
||||
int x = items.size() + 1; // this is the minimum!
|
||||
unsigned x = items.size() + 1; // this is the minimum!
|
||||
x += h(items.size()) % (3*items.size()); // bump it up some
|
||||
x |= 1; // make it odd
|
||||
|
||||
while (primes.size() < items.size()) {
|
||||
int j;
|
||||
unsigned j;
|
||||
for (j=2; j*j<=x; j++)
|
||||
if (x % j == 0) break;
|
||||
if (j*j > x) {
|
||||
@ -180,9 +180,9 @@ namespace crush {
|
||||
//cout << "uniformbucket.choose_r(" << x << ", " << r << ")" << endl;
|
||||
//if (r >= get_size()) cout << "warning: r " << r << " >= " << get_size() << " uniformbucket.size" << endl;
|
||||
|
||||
int v = hash(x, get_id());// % get_size();
|
||||
int p = get_prime( hash(get_id(), x) ); // choose a prime based on hash(x, get_id(), 2)
|
||||
int s = (x + v + (r+1)*p) % get_size();
|
||||
unsigned v = hash(x, get_id());// % get_size();
|
||||
unsigned p = get_prime( hash(get_id(), x) ); // choose a prime based on hash(x, get_id(), 2)
|
||||
unsigned s = (x + v + (r+1)*p) % get_size();
|
||||
return items[s];
|
||||
}
|
||||
|
||||
|
@ -65,6 +65,8 @@ float testmovement(int n, float f, int buckettype)
|
||||
b = new ListBucket(1);
|
||||
else if (buckettype == 3)
|
||||
b = new StrawBucket(1);
|
||||
else if (buckettype == 4)
|
||||
b = new UniformBucket(0,0);
|
||||
|
||||
for (int i=0; i<n; i++)
|
||||
b->add_item(ndisks++,1);
|
||||
@ -149,14 +151,14 @@ float testmovement(int n, float f, int buckettype)
|
||||
int main()
|
||||
{
|
||||
//cout << "// " << depth << ", modifydepth " << modifydepth << ", branching " << branching << ", disks " << n << endl;
|
||||
cout << "n\ttree\tlhead\tltail\tstraw" << endl;
|
||||
cout << "n\ttree\tlhead\tltail\tstraw\tuniform" << endl;
|
||||
|
||||
//for (int s=2; s<=64; s+= (s<4?1:(s<16?2:4))) {
|
||||
for (int s=2; s<=64; s+= (s<4?1:4)) {
|
||||
float f = 1.0 / (float)s;
|
||||
//cout << f << "\t" << s;
|
||||
cout << s;
|
||||
for (int buckettype=0; buckettype<4; buckettype++)
|
||||
for (int buckettype=0; buckettype<5; buckettype++)
|
||||
testmovement(s, f, buckettype);
|
||||
cout << endl;
|
||||
}
|
||||
|
@ -33,6 +33,8 @@ int main(int argc, char **argv)
|
||||
{
|
||||
cerr << "fakesyn start" << endl;
|
||||
|
||||
//cerr << "inode_t " << sizeof(inode_t) << endl;
|
||||
|
||||
vector<char*> args;
|
||||
argv_to_vec(argc, argv, args);
|
||||
|
||||
@ -61,9 +63,12 @@ int main(int argc, char **argv)
|
||||
|
||||
// create mds
|
||||
MDS *mds[NUMMDS];
|
||||
OSD *mdsosd[NUMMDS];
|
||||
for (int i=0; i<NUMMDS; i++) {
|
||||
//cerr << "mds" << i << " on rank " << myrank << " " << hostname << "." << pid << endl;
|
||||
mds[i] = new MDS(mdc, i, new FakeMessenger(MSG_ADDR_MDS(i)));
|
||||
if (g_conf.mds_local_osd)
|
||||
mdsosd[i] = new OSD(i+10000, new FakeMessenger(MSG_ADDR_OSD(i+10000)));
|
||||
start++;
|
||||
}
|
||||
|
||||
@ -91,6 +96,8 @@ int main(int argc, char **argv)
|
||||
// init
|
||||
for (int i=0; i<NUMMDS; i++) {
|
||||
mds[i]->init();
|
||||
if (g_conf.mds_local_osd)
|
||||
mdsosd[i]->init();
|
||||
}
|
||||
|
||||
for (int i=0; i<NUMOSD; i++) {
|
||||
|
@ -119,9 +119,11 @@ struct ltstr
|
||||
/** object layout
|
||||
* how objects are mapped into PGs
|
||||
*/
|
||||
#define OBJECT_LAYOUT_HASH 1
|
||||
#define OBJECT_LAYOUT_LINEAR 2
|
||||
#define OBJECT_LAYOUT_HASHINO 3
|
||||
#define OBJECT_LAYOUT_DEFAULT 0 // see g_conf
|
||||
#define OBJECT_LAYOUT_HASH 1
|
||||
#define OBJECT_LAYOUT_LINEAR 2
|
||||
#define OBJECT_LAYOUT_HASHINO 3
|
||||
#define OBJECT_LAYOUT_STARTOSD 4
|
||||
|
||||
/** pg layout
|
||||
* how PGs are mapped into (sets of) OSDs
|
||||
@ -135,12 +137,12 @@ struct ltstr
|
||||
* specifies a striping and replication strategy
|
||||
*/
|
||||
|
||||
#define FILE_LAYOUT_CRUSH 0 // stripe via crush
|
||||
#define FILE_LAYOUT_LINEAR 1 // stripe linearly across cluster
|
||||
//#define FILE_LAYOUT_CRUSH 0 // stripe via crush
|
||||
//#define FILE_LAYOUT_LINEAR 1 // stripe linearly across cluster
|
||||
|
||||
struct FileLayout {
|
||||
// layout
|
||||
int policy; // FILE_LAYOUT_*
|
||||
int object_layout;
|
||||
|
||||
// FIXME: make this a union?
|
||||
// rushstripe
|
||||
@ -154,15 +156,12 @@ struct FileLayout {
|
||||
int num_rep; // replication
|
||||
|
||||
FileLayout() { }
|
||||
FileLayout(int ss, int sc, int os, int nr=2) :
|
||||
policy(FILE_LAYOUT_CRUSH),
|
||||
stripe_size(ss), stripe_count(sc), object_size(os),
|
||||
num_rep(nr) { }
|
||||
/*FileLayout(int o) :
|
||||
policy(FILE_LAYOUT_OSDLOCAL),
|
||||
osd(o),
|
||||
num_rep(1) { }
|
||||
*/
|
||||
FileLayout(int ss, int sc, int os, int nr=2, int o=-1) :
|
||||
object_layout(o < 0 ? OBJECT_LAYOUT_DEFAULT:OBJECT_LAYOUT_STARTOSD),
|
||||
stripe_size(ss), stripe_count(sc), object_size(os),
|
||||
osd(o),
|
||||
num_rep(nr) { }
|
||||
|
||||
};
|
||||
|
||||
|
||||
|
@ -35,6 +35,30 @@ using namespace std;
|
||||
|
||||
|
||||
|
||||
// Construct a log stream for inode number log_ino.
// Stores the mds and filer pointers, builds a zeroed log inode that uses the
// global g_OSD_MDLogLayout, and resets all write and read positions to 0.
LogStream::LogStream(MDS *mds, Filer *filer, inodeno_t log_ino)
|
||||
{
|
||||
this->mds = mds;
|
||||
this->filer = filer;
|
||||
|
||||
// inode: start from all-zero, then set only the ino and the layout
|
||||
memset(&log_inode, 0, sizeof(log_inode));
|
||||
log_inode.ino = log_ino;
|
||||
log_inode.layout = g_OSD_MDLogLayout;
|
||||
|
||||
// With mds_local_osd set, override the default layout so the log uses the
// STARTOSD object layout with osd = this MDS's node id + 10000. The same
// +10000 offset is used where the per-MDS OSDs are created and where they
// are inserted into the osdmap.
|
||||
if (g_conf.mds_local_osd) {
|
||||
log_inode.layout.object_layout = OBJECT_LAYOUT_STARTOSD;
|
||||
log_inode.layout.osd = mds->get_nodeid() + 10000; // hack
|
||||
}
|
||||
|
||||
// wr: all write-side positions start at offset 0; autoflush is on by default
|
||||
sync_pos = flush_pos = append_pos = 0;
|
||||
autoflush = true;
|
||||
|
||||
// rd: read position starts at 0 and no read is in progress
|
||||
read_pos = 0;
|
||||
reading = false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// ----------------------------
|
||||
|
@ -57,23 +57,7 @@ class LogStream {
|
||||
bool autoflush;
|
||||
|
||||
public:
|
||||
LogStream(MDS *mds, Filer *filer, inodeno_t log_ino) {
|
||||
this->mds = mds;
|
||||
this->filer = filer;
|
||||
|
||||
// inode
|
||||
memset(&log_inode, 0, sizeof(log_inode));
|
||||
log_inode.ino = log_ino;
|
||||
log_inode.layout = g_OSD_MDLogLayout;
|
||||
|
||||
// wr
|
||||
sync_pos = flush_pos = append_pos = 0;
|
||||
autoflush = true;
|
||||
|
||||
// rd
|
||||
read_pos = 0;
|
||||
reading = false;
|
||||
}
|
||||
LogStream(MDS *mds, Filer *filer, inodeno_t log_ino);
|
||||
|
||||
off_t get_read_pos() { return read_pos; }
|
||||
off_t get_append_pos() { return append_pos; }
|
||||
|
@ -603,10 +603,13 @@ public:
|
||||
|
||||
class C_MDC_ShutdownCheck : public Context {
|
||||
MDCache *mdc;
|
||||
Mutex *lock;
|
||||
public:
|
||||
C_MDC_ShutdownCheck(MDCache *m) : mdc(m) {}
|
||||
C_MDC_ShutdownCheck(MDCache *m, Mutex *l) : mdc(m), lock(l) {}
|
||||
void finish(int) {
|
||||
lock->Lock();
|
||||
mdc->shutdown_check();
|
||||
lock->Unlock();
|
||||
}
|
||||
};
|
||||
|
||||
@ -619,7 +622,7 @@ void MDCache::shutdown_check()
|
||||
g_conf.debug_mds = 10;
|
||||
show_cache();
|
||||
g_conf.debug_mds = o;
|
||||
g_timer.add_event_after(g_conf.mds_shutdown_check, new C_MDC_ShutdownCheck(this));
|
||||
g_timer.add_event_after(g_conf.mds_shutdown_check, new C_MDC_ShutdownCheck(this, &mds->mds_lock));
|
||||
|
||||
// this
|
||||
dout(0) << "lru size now " << lru.lru_get_size() << endl;
|
||||
@ -638,7 +641,7 @@ void MDCache::shutdown_start()
|
||||
dout(1) << "shutdown_start" << endl;
|
||||
|
||||
if (g_conf.mds_shutdown_check)
|
||||
g_timer.add_event_after(g_conf.mds_shutdown_check, new C_MDC_ShutdownCheck(this));
|
||||
g_timer.add_event_after(g_conf.mds_shutdown_check, new C_MDC_ShutdownCheck(this, &mds->mds_lock));
|
||||
}
|
||||
|
||||
|
||||
|
@ -136,6 +136,12 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
|
||||
osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
|
||||
}
|
||||
|
||||
if (g_conf.mds_local_osd) {
|
||||
// add mds osds, but don't put them in the crush mapping func
|
||||
for (int i=0; i<g_conf.num_mds; i++)
|
||||
osdmap->osds.insert(i+10000);
|
||||
}
|
||||
|
||||
// </HACK>
|
||||
|
||||
filer = new Filer(messenger, osdmap);
|
||||
@ -298,13 +304,20 @@ void MDS::handle_shutdown_finish(Message *m)
|
||||
dout(10) << "sending shutdown to mds" << i << endl;
|
||||
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
|
||||
MSG_ADDR_MDS(i), 0, 0);
|
||||
if (g_conf.mds_local_osd)
|
||||
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
|
||||
MSG_ADDR_OSD(i+10000), 0, 0);
|
||||
}
|
||||
|
||||
// shut down osd's
|
||||
for (int i=0; i<g_conf.num_osd; i++) {
|
||||
dout(10) << "sending shutdown to osd" << i << endl;
|
||||
set<int> osds;
|
||||
osdmap->get_all_osds(osds);
|
||||
for (set<int>::iterator it = osds.begin();
|
||||
it != osds.end();
|
||||
it++) {
|
||||
dout(10) << "sending shutdown to osd" << *it << endl;
|
||||
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
|
||||
MSG_ADDR_OSD(i), 0, 0);
|
||||
MSG_ADDR_OSD(*it), 0, 0);
|
||||
}
|
||||
|
||||
// shut myself down.
|
||||
|
@ -101,11 +101,12 @@ void split_path(string& path,
|
||||
|
||||
|
||||
class MDS : public Dispatcher {
|
||||
public:
|
||||
Mutex mds_lock;
|
||||
|
||||
protected:
|
||||
int whoami;
|
||||
|
||||
Mutex mds_lock;
|
||||
|
||||
MDCluster *mdcluster;
|
||||
public:
|
||||
OSDMap *osdmap;
|
||||
|
@ -69,11 +69,19 @@ class C_FakeKicker : public Context {
|
||||
}
|
||||
};
|
||||
|
||||
// FakeMessenger's implementation of Messenger::callback_kick().
// Sets the pending_timer flag, then signals cond while holding lock.
// NOTE(review): pending_timer is written before lock is taken; presumably the
// fake messenger thread waits on cond and re-checks the flag - confirm against
// the thread loop, which is not fully visible here.
void FakeMessenger::callback_kick()
|
||||
{
|
||||
pending_timer = true;
|
||||
lock.Lock();
|
||||
cond.Signal(); // why not
|
||||
lock.Unlock();
|
||||
}
|
||||
|
||||
void *fakemessenger_thread(void *ptr)
|
||||
{
|
||||
//dout(1) << "thread start, setting timer kicker" << endl;
|
||||
//g_timer.set_messenger_kicker(new C_FakeKicker());
|
||||
msgr_callback_kicker = new C_FakeKicker();
|
||||
//msgr_callback_kicker = new C_FakeKicker();
|
||||
|
||||
lock.Lock();
|
||||
while (1) {
|
||||
@ -94,7 +102,7 @@ void *fakemessenger_thread(void *ptr)
|
||||
cout << "unsetting messenger" << endl;
|
||||
//g_timer.unset_messenger_kicker();
|
||||
g_timer.unset_messenger();
|
||||
msgr_callback_kicker = 0;
|
||||
//msgr_callback_kicker = 0;
|
||||
|
||||
dout(1) << "thread finish (i woke up but no messages, bye)" << endl;
|
||||
return 0;
|
||||
@ -158,7 +166,7 @@ int fakemessenger_do_loop_2()
|
||||
|
||||
// callbacks
|
||||
lock.Unlock();
|
||||
messenger_do_callbacks();
|
||||
Messenger::do_callbacks();
|
||||
lock.Lock();
|
||||
|
||||
// messages
|
||||
|
@ -47,6 +47,8 @@ class FakeMessenger : public Messenger {
|
||||
|
||||
int get_dispatch_queue_len() { return qlen; }
|
||||
|
||||
void callback_kick();
|
||||
|
||||
// -- incoming queue --
|
||||
// (that nothing uses)
|
||||
Message *get_message() {
|
||||
|
@ -134,17 +134,24 @@ ostream& operator<<(ostream& out, Message& m)
|
||||
|
||||
Mutex msgr_callback_lock;
|
||||
list<Context*> msgr_callback_queue;
|
||||
Context* msgr_callback_kicker = 0;
|
||||
//Context* msgr_callback_kicker = 0;
|
||||
|
||||
void Messenger::queue_callback(Context *c) {
|
||||
msgr_callback_lock.Lock();
|
||||
msgr_callback_queue.push_back(c);
|
||||
msgr_callback_lock.Unlock();
|
||||
|
||||
msgr_callback_kicker->finish(0);
|
||||
callback_kick();
|
||||
}
|
||||
// Queue a whole list of callbacks at once.
// The contents of ls are spliced onto the tail of msgr_callback_queue under
// msgr_callback_lock, so ls is left empty on return (std::list::splice moves
// the elements rather than copying them).
// After the lock is released, callback_kick() is called; it is the
// messenger-specific pure virtual hook declared in class Messenger.
void Messenger::queue_callbacks(list<Context*>& ls) {
|
||||
msgr_callback_lock.Lock();
|
||||
msgr_callback_queue.splice(msgr_callback_queue.end(), ls);
|
||||
msgr_callback_lock.Unlock();
|
||||
|
||||
callback_kick();
|
||||
}
|
||||
|
||||
void messenger_do_callbacks() {
|
||||
void Messenger::do_callbacks() {
|
||||
// take list
|
||||
msgr_callback_lock.Lock();
|
||||
list<Context*> ls;
|
||||
|
@ -63,7 +63,14 @@ class Messenger {
|
||||
|
||||
virtual int shutdown() = 0;
|
||||
|
||||
// callbacks
|
||||
//static Mutex callback_lock;
|
||||
//static list<Context*> callback_queue;
|
||||
static void do_callbacks();
|
||||
|
||||
void queue_callback(Context *c);
|
||||
void queue_callbacks(list<Context*>& ls);
|
||||
virtual void callback_kick() = 0;
|
||||
|
||||
virtual int get_dispatch_queue_len() { return 0; };
|
||||
|
||||
@ -80,11 +87,9 @@ class Messenger {
|
||||
|
||||
// make a procedure call
|
||||
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
|
||||
};
|
||||
|
||||
// callbacks
|
||||
void messenger_do_callbacks();
|
||||
extern Context *msgr_callback_kicker;
|
||||
|
||||
};
|
||||
|
||||
|
||||
extern Message *decode_message(msg_envelope_t &env, bufferlist& bl);
|
||||
|
@ -318,6 +318,11 @@ class C_TCPKicker : public Context {
|
||||
}
|
||||
};
|
||||
|
||||
// TCPMessenger's implementation of Messenger::callback_kick().
// Simply forwards to tcpmessenger_kick_dispatch_loop().
// NOTE(review): presumably this wakes tcp_dispatchthread so that it runs
// Messenger::do_callbacks() - confirm against the dispatch loop.
void TCPMessenger::callback_kick()
|
||||
{
|
||||
tcpmessenger_kick_dispatch_loop();
|
||||
}
|
||||
|
||||
|
||||
extern int tcpmessenger_lookup(char *str, tcpaddr_t& ta)
|
||||
{
|
||||
@ -417,7 +422,6 @@ int tcpmessenger_init()
|
||||
|
||||
// register to execute timer events
|
||||
//g_timer.set_messenger_kicker(new C_TCPKicker());
|
||||
msgr_callback_kicker = new C_TCPKicker();
|
||||
|
||||
|
||||
dout(DBL) << "init done" << endl;
|
||||
@ -871,7 +875,7 @@ void* tcp_dispatchthread(void*)
|
||||
|
||||
while (1) {
|
||||
// any pending callbacks?
|
||||
messenger_do_callbacks();
|
||||
Messenger::do_callbacks();
|
||||
|
||||
// inq?
|
||||
incoming_lock.Lock();
|
||||
@ -1198,7 +1202,6 @@ int TCPMessenger::shutdown()
|
||||
|
||||
// no more timer events
|
||||
g_timer.unset_messenger();
|
||||
msgr_callback_kicker = 0;
|
||||
|
||||
// close incoming sockets
|
||||
//void *r;
|
||||
|
@ -45,6 +45,8 @@ class TCPMessenger : public Messenger {
|
||||
|
||||
int get_dispatch_queue_len();
|
||||
|
||||
void callback_kick();
|
||||
|
||||
// init, shutdown MPI and associated event loop thread.
|
||||
virtual int shutdown();
|
||||
|
||||
|
@ -590,11 +590,12 @@ void OSD::update_map(bufferlist& state, bool mkfs)
|
||||
if (mkfs) {
|
||||
assert(osdmap->get_version() == 1);
|
||||
|
||||
ps_t maxps = 1LL << osdmap->get_pg_bits();
|
||||
|
||||
// create PGs
|
||||
for (int nrep = 1;
|
||||
nrep <= MIN(g_conf.num_osd, g_conf.osd_max_rep); // for low osd counts.. hackish bleh
|
||||
nrep++) {
|
||||
ps_t maxps = 1LL << osdmap->get_pg_bits();
|
||||
for (pg_t ps = 0; ps < maxps; ps++) {
|
||||
pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
|
||||
vector<int> acting;
|
||||
@ -613,7 +614,26 @@ void OSD::update_map(bufferlist& state, bool mkfs)
|
||||
pg_list.push_back(pgid);
|
||||
}
|
||||
}
|
||||
|
||||
// local PG too
|
||||
pg_t pgid = osdmap->osd_nrep_to_pg(whoami, nrep);
|
||||
vector<int> acting;
|
||||
osdmap->pg_to_acting_osds(pgid, acting);
|
||||
|
||||
if (acting[0] == whoami) {
|
||||
PG *pg = create_pg(pgid);
|
||||
pg->acting = acting;
|
||||
pg->set_role(0);
|
||||
pg->set_primary_since(osdmap->get_version());
|
||||
pg->mark_complete( osdmap->get_version() );
|
||||
|
||||
dout(7) << "created " << *pg << endl;
|
||||
pg_list.push_back(pgid);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
} else {
|
||||
// get pg list
|
||||
get_pg_list(pg_list);
|
||||
|
@ -42,8 +42,11 @@ using namespace std;
|
||||
// pg_t bit fields, from LSB to MSB: ps, then replica count, then PG type
|
||||
#define PG_PS_BITS 32 // max bits for placement seed/group portion of PG
|
||||
#define PG_REP_BITS 10 // bits for the replica count (nrep), stored above the ps field
|
||||
#define PG_TYPE_BITS 2 // bits for the PG_TYPE_* tag, stored above the nrep field
|
||||
#define PG_PS_MASK ((1LL<<PG_PS_BITS)-1)
|
||||
|
||||
#define PG_TYPE_RAND 1 // default: distribute randomly
|
||||
#define PG_TYPE_STARTOSD 2 // place primary on a specific OSD (the OSD id is carried in the ps field)
|
||||
|
||||
|
||||
/** OSDMap
|
||||
@ -101,102 +104,155 @@ class OSDMap {
|
||||
/**** mapping facilities ****/
|
||||
|
||||
// oid -> ps
|
||||
ps_t object_to_ps(object_t oid) {
|
||||
ps_t object_to_pg(object_t oid, FileLayout& layout) {
|
||||
static crush::Hash H(777);
|
||||
|
||||
switch (g_conf.osd_object_layout) {
|
||||
int policy = layout.object_layout;
|
||||
if (policy == 0)
|
||||
policy = g_conf.osd_object_layout;
|
||||
|
||||
int type = PG_TYPE_RAND;
|
||||
pg_t ps;
|
||||
|
||||
switch (policy) {
|
||||
case OBJECT_LAYOUT_LINEAR:
|
||||
{
|
||||
const object_t ono = oid & ((1ULL << OID_ONO_BITS)-1ULL);
|
||||
const inodeno_t ino = oid >> OID_ONO_BITS;
|
||||
return (ono + ino) & PG_PS_MASK;
|
||||
ps = (ono + ino) & PG_PS_MASK;
|
||||
ps &= ((1ULL<<pg_bits)-1ULL);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case OBJECT_LAYOUT_HASHINO:
|
||||
{
|
||||
const object_t ono = oid & ((1ULL << OID_ONO_BITS)-1ULL);
|
||||
const inodeno_t ino = oid >> OID_ONO_BITS;
|
||||
return (ono + H(ino)) & PG_PS_MASK;
|
||||
ps = (ono + H(ino)) & PG_PS_MASK;
|
||||
ps &= ((1ULL<<pg_bits)-1ULL);
|
||||
}
|
||||
break;
|
||||
|
||||
case OBJECT_LAYOUT_HASH:
|
||||
{
|
||||
return H( oid ^ (oid >> 32) ) & PG_PS_MASK;
|
||||
ps = H( oid ^ (oid >> 32) ) & PG_PS_MASK;
|
||||
ps &= ((1ULL<<pg_bits)-1ULL);
|
||||
}
|
||||
break;
|
||||
|
||||
case OBJECT_LAYOUT_STARTOSD:
|
||||
{
|
||||
ps = layout.osd;
|
||||
type = PG_TYPE_STARTOSD;
|
||||
}
|
||||
break;
|
||||
}
|
||||
assert(0);
|
||||
return 0;
|
||||
|
||||
// construct final PG
|
||||
pg_t pg = type;
|
||||
pg = (pg << PG_REP_BITS) | (pg_t)layout.num_rep;
|
||||
pg = (pg << PG_PS_BITS) | ps;
|
||||
//cout << "pg " << hex << pg << dec << endl;
|
||||
return pg;
|
||||
}
|
||||
|
||||
// (ps, nrep) -> pg
|
||||
pg_t ps_nrep_to_pg(ps_t ps, int nrep) {
|
||||
return ((pg_t)ps & ((1ULL<<pg_bits)-1ULL))
|
||||
| ((pg_t)nrep << PG_PS_BITS);
|
||||
| ((pg_t)nrep << PG_PS_BITS)
|
||||
| ((pg_t)PG_TYPE_RAND << (PG_PS_BITS+PG_REP_BITS));
|
||||
}
|
||||
// (osd, nrep) -> pg of type PG_TYPE_STARTOSD.
// The OSD id occupies the low (ps) field, nrep is shifted above PG_PS_BITS,
// and the type tag is shifted above PG_PS_BITS+PG_REP_BITS.
// Neither osd nor nrep is masked to its field width here.
pg_t osd_nrep_to_pg(int osd, int nrep) {
|
||||
return ((pg_t)osd)
|
||||
| ((pg_t)nrep << PG_PS_BITS)
|
||||
| ((pg_t)PG_TYPE_STARTOSD << (PG_PS_BITS+PG_REP_BITS));
|
||||
|
||||
}
|
||||
|
||||
// pg -> nrep
|
||||
int pg_to_nrep(pg_t pg) {
|
||||
return pg >> PG_PS_BITS;
|
||||
return (pg >> PG_PS_BITS) & ((1ULL << PG_REP_BITS)-1);
|
||||
}
|
||||
|
||||
// pg -> ps: extract the low PG_PS_BITS bits of the pg id.
// NOTE(review): PG_PS_MASK is 32 bits wide but the return type is int, so a
// ps with bit 31 set would not fit a 32-bit signed int - confirm callers only
// use small ps values.
|
||||
int pg_to_ps(pg_t pg) {
|
||||
return pg & PG_PS_MASK;
|
||||
}
|
||||
|
||||
// pg -> pg_type: returns every bit above the ps and nrep fields, which is
// where the PG_TYPE_* tag is stored. The result is not masked with
// PG_TYPE_BITS.
|
||||
int pg_to_type(pg_t pg) {
|
||||
return pg >> (PG_PS_BITS + PG_REP_BITS);
|
||||
}
|
||||
|
||||
|
||||
// pg -> (osd list)
|
||||
int pg_to_osds(pg_t pg,
|
||||
vector<int>& osds) { // list of osd addr's
|
||||
int num_rep = pg_to_nrep(pg);
|
||||
pg_t ps = pg_to_ps(pg);
|
||||
|
||||
int num_rep = pg_to_nrep(pg);
|
||||
assert(num_rep > 0);
|
||||
int type = pg_to_type(pg);
|
||||
|
||||
//cout << hex << "pg " << pg << " ps " << ps << " num_rep " << num_rep << " type " << type << dec << endl;
|
||||
if (type == PG_TYPE_STARTOSD) {
|
||||
//cout << "type startosd " << num_rep << " osd" << ps << endl;
|
||||
num_rep--;
|
||||
}
|
||||
|
||||
// spread "on" ps bits around a bit (usually only low bits are set bc of pg_bits)
|
||||
int hps = ((ps >> 32) ^ ps);
|
||||
hps = hps ^ (hps >> 16);
|
||||
|
||||
switch(g_conf.osd_pg_layout) {
|
||||
case PG_LAYOUT_CRUSH:
|
||||
crush.do_rule(crush.rules[num_rep],
|
||||
hps,
|
||||
osds);
|
||||
break;
|
||||
|
||||
case PG_LAYOUT_LINEAR:
|
||||
for (int i=0; i<num_rep; i++)
|
||||
osds.push_back( (i + ps*num_rep) % g_conf.num_osd );
|
||||
break;
|
||||
|
||||
case PG_LAYOUT_HYBRID:
|
||||
{
|
||||
static crush::Hash H(777);
|
||||
int h = H(hps);
|
||||
if (num_rep > 0) {
|
||||
switch(g_conf.osd_pg_layout) {
|
||||
case PG_LAYOUT_CRUSH:
|
||||
crush.do_rule(crush.rules[num_rep],
|
||||
hps,
|
||||
osds);
|
||||
break;
|
||||
|
||||
case PG_LAYOUT_LINEAR:
|
||||
for (int i=0; i<num_rep; i++)
|
||||
osds.push_back( (h+i) % g_conf.num_osd );
|
||||
osds.push_back( (i + ps*num_rep) % g_conf.num_osd );
|
||||
break;
|
||||
|
||||
case PG_LAYOUT_HYBRID:
|
||||
{
|
||||
static crush::Hash H(777);
|
||||
int h = H(hps);
|
||||
for (int i=0; i<num_rep; i++)
|
||||
osds.push_back( (h+i) % g_conf.num_osd );
|
||||
}
|
||||
break;
|
||||
|
||||
case PG_LAYOUT_HASH:
|
||||
{
|
||||
static crush::Hash H(777);
|
||||
for (int i=0; i<num_rep; i++) {
|
||||
int t = 1;
|
||||
int osd = 0;
|
||||
while (t++) {
|
||||
osd = H(i, hps, t) % g_conf.num_osd;
|
||||
int j = 0;
|
||||
for (; j<i; j++)
|
||||
if (osds[j] == osd) break;
|
||||
if (j == i) break;
|
||||
}
|
||||
osds.push_back(osd);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
break;
|
||||
|
||||
case PG_LAYOUT_HASH:
|
||||
{
|
||||
static crush::Hash H(777);
|
||||
for (int i=0; i<num_rep; i++) {
|
||||
int t = 1;
|
||||
int osd = 0;
|
||||
while (t++) {
|
||||
osd = H(i, hps, t) % g_conf.num_osd;
|
||||
int j = 0;
|
||||
for (; j<i; j++)
|
||||
if (osds[j] == osd) break;
|
||||
if (j == i) break;
|
||||
}
|
||||
osds.push_back(osd);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
if (type == PG_TYPE_STARTOSD) {
|
||||
//cout << "putting in osd" << ps << endl;
|
||||
osds.insert(osds.begin(), (int)ps);
|
||||
}
|
||||
|
||||
return osds.size();
|
||||
}
|
||||
|
||||
|
@ -172,10 +172,12 @@ class Filer : public Dispatcher {
|
||||
return ono + (ino << OID_ONO_BITS);
|
||||
}
|
||||
|
||||
/*
|
||||
pg_t file_to_pg(inode_t& inode, size_t ono) {
|
||||
return osdmap->ps_nrep_to_pg( osdmap->object_to_ps( file_to_object(inode.ino, ono) ),
|
||||
inode.layout.num_rep );
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
/* map (ino, offset, len) to a (list of) OSDExtents
|
||||
@ -191,7 +193,7 @@ class Filer : public Dispatcher {
|
||||
map< object_t, OSDExtent > object_extents;
|
||||
|
||||
// RUSHSTRIPE?
|
||||
if (inode.layout.policy == FILE_LAYOUT_CRUSH) {
|
||||
//if (inode.layout.policy == FILE_LAYOUT_CRUSH) {
|
||||
// layout constant
|
||||
size_t stripes_per_object = inode.layout.object_size / inode.layout.stripe_size;
|
||||
|
||||
@ -213,7 +215,7 @@ class Filer : public Dispatcher {
|
||||
else {
|
||||
ex = &object_extents[oid];
|
||||
ex->oid = oid;
|
||||
ex->pg = file_to_pg( inode, objectno );
|
||||
ex->pg = osdmap->object_to_pg( oid, inode.layout );
|
||||
ex->osd = osdmap->get_pg_acting_primary( ex->pg );
|
||||
}
|
||||
|
||||
@ -253,7 +255,7 @@ class Filer : public Dispatcher {
|
||||
it++) {
|
||||
extents.push_back(it->second);
|
||||
}
|
||||
}
|
||||
//}
|
||||
/*else if (inode.layout.policy == FILE_LAYOUT_OSDLOCAL) {
|
||||
// all in one object, on a specific OSD.
|
||||
OSDExtent ex;
|
||||
@ -265,10 +267,11 @@ class Filer : public Dispatcher {
|
||||
ex.buffer_extents[0] = len;
|
||||
|
||||
extents.push_back(ex);
|
||||
}*/
|
||||
else {
|
||||
}
|
||||
else {
|
||||
assert(0);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
};
|
||||
|
@ -86,6 +86,12 @@ for my $f (@filt) {
|
||||
push( @{$res{$x}}, $s->{'avgval'}->{$field} );
|
||||
push( @key, "$f.$field" ) unless $didkey{"$f.$field"};
|
||||
$didkey{"$f.$field"} = 1;
|
||||
|
||||
if (exists $s->{'avgvaldevt'}) {
|
||||
push( @{$res{$x}}, $s->{'avgvaldevt'}->{$field} );
|
||||
push( @key, "$f.$field.dev" ) unless $didkey{"$f.$field.dev"};
|
||||
$didkey{"$f.$field.dev"} = 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -89,6 +89,24 @@ for my $k (sort {$a <=> $b} keys %sum) {
|
||||
|
||||
my $rows = $n || 1;
|
||||
my $files = $tcount{$starttime};
|
||||
my %avgval;
|
||||
|
||||
## devt
# For every column, compute:
#   $avgval{$k}     - mean value per (row, file) sample
#   $avgvalvart{$k} - variance over time of the per-file average; its square
#                     root is what gets printed as '#avgvaldevt'
my %avgvalvart;   # variance (not std dev) of each col avg, over time
for my $k (keys %avg) {
    # $files is 0 or undef when no file contributed to the start time.
    # Guard the divisions the same way the earlier '#avgval' line did,
    # instead of dying with "Illegal division by zero".
    my $nfiles   = $files || 0;
    my $nsamples = $rows * $nfiles;

    my $av = $avgval{$k} = $nsamples ? $avg{$k} / $nsamples : 0;

    my $var = 0.0;
    for my $t (sort { $a <=> $b } keys %sum) {
        # Deliberately not named $a: a lexical $a would hide sort's special
        # $a/$b inside any comparator written in this scope.
        my $t_avg = $nfiles ? $sum{$t}->{$k} / $nfiles : 0;
        $var += ($t_avg - $av) * ($t_avg - $av);
    }

    $avgvalvart{$k} = $var / $rows;   # $rows is at least 1 ($n || 1)
}
|
||||
|
||||
|
||||
|
||||
|
||||
print "\n";
|
||||
print join("\t",'#', map { $col{$_} } @c) . "\n";
|
||||
@ -97,6 +115,13 @@ print join("\t", '#maxval', map { $max{$col{$_}} } @c ) . "\n";
|
||||
print join("\t", '#rows', map { $rows } @c) . "\n";
|
||||
print join("\t", '#files', map { $files } @c) . "\n";
|
||||
print join("\t", '#avgval', #map int,
|
||||
map { ($rows*$files) ? ($_ / ($rows*$files)):0 } map { $avg{$col{$_}} } @c ) . "\n";
|
||||
map { $avgval{$col{$_}} } @c ) . "\n";
|
||||
# map { ($rows*$files) ? ($_ / ($rows*$files)):0 } map { $avg{$col{$_}} } @c ) . "\n";
|
||||
|
||||
print join("\t", '#avgvalvart',
|
||||
map { $avgvalvart{$col{$_}} } @c ) . "\n";
|
||||
print join("\t", '#avgvaldevt',
|
||||
map { sqrt($_) } map { $avgvalvart{$col{$_}} } @c ) . "\n";
|
||||
|
||||
print join("\t", '#avgsum', #map int,
|
||||
map { $_ / $rows } map { $avg{$col{$_}} } @c ) . "\n";
|
||||
|
@ -128,6 +128,7 @@ int main(int argc, char **argv)
|
||||
|
||||
// create mds
|
||||
MDS *mds[NUMMDS];
|
||||
OSD *mdsosd[NUMMDS];
|
||||
for (int i=0; i<NUMMDS; i++) {
|
||||
if (myrank != g_conf.tcp_skip_rank0+i) continue;
|
||||
TCPMessenger *m = new TCPMessenger(MSG_ADDR_MDS(i));
|
||||
@ -135,6 +136,11 @@ int main(int argc, char **argv)
|
||||
mds[i] = new MDS(mdc, i, m);
|
||||
mds[i]->init();
|
||||
started++;
|
||||
|
||||
if (g_conf.mds_local_osd) {
|
||||
mdsosd[i] = new OSD(i+10000, new TCPMessenger(MSG_ADDR_OSD(i+10000)));
|
||||
mdsosd[i]->init();
|
||||
}
|
||||
}
|
||||
|
||||
// create osd
|
||||
|
Loading…
Reference in New Issue
Block a user