mirror of
https://github.com/ceph/ceph
synced 2025-03-25 11:48:05 +00:00
*** empty log message ***
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@193 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
parent
ebedbaf652
commit
60488cc19a
@ -20,7 +20,7 @@ md_config_t g_conf = {
|
||||
|
||||
client_cache_size: 400,
|
||||
client_cache_mid: .5,
|
||||
client_requests: 100,
|
||||
client_requests: 10,
|
||||
client_deterministic: false,
|
||||
|
||||
log_messages: true,
|
||||
|
@ -173,8 +173,16 @@ void MDS::handle_shutdown_finish(Message *m)
|
||||
dout(2) << " shut down so far: " << did_shut_down << endl;
|
||||
|
||||
if (did_shut_down.size() == mdcluster->get_num_mds()) {
|
||||
// MDS's all shut down!
|
||||
|
||||
// shut down osd's
|
||||
for (int i=0; i<g_conf.num_osd; i++) {
|
||||
messenger->send_message(new MGenericMessage(MSG_SHUTDOWN),
|
||||
MSG_ADDR_OSD(i), 0, 0);
|
||||
}
|
||||
|
||||
// shut myself down.
|
||||
shutting_down = false;
|
||||
messenger->shutdown();
|
||||
}
|
||||
|
||||
// done
|
||||
@ -312,6 +320,7 @@ void MDS::dispatch(Message *m)
|
||||
if (mdcache->shutdown_pass()) {
|
||||
shutting_down = false;
|
||||
shut_down = true;
|
||||
shutdown_final();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -63,13 +63,18 @@ int mpimessenger_init(int& argc, char**& argv)
|
||||
gethostname(hostname,100);
|
||||
int pid = getpid();
|
||||
|
||||
dout(1) << "init: i am " << hostname << " pid " << pid << endl;
|
||||
dout(12) << "init: i am " << hostname << " pid " << pid << endl;
|
||||
|
||||
assert(mpi_world > g_conf.num_osd+g_conf.num_mds);
|
||||
|
||||
return mpi_rank;
|
||||
}
|
||||
|
||||
int mpimessenger_shutdown()
|
||||
{
|
||||
MPI_Finalize();
|
||||
}
|
||||
|
||||
int mpimessenger_world()
|
||||
{
|
||||
return mpi_world;
|
||||
@ -117,7 +122,7 @@ Message *mpi_recv(int tag)
|
||||
int mpi_send(Message *m, int rank, int tag)
|
||||
{
|
||||
if (rank == mpi_rank) {
|
||||
dout(3) << "local delivery not implemented" << endl;
|
||||
dout(1) << "local delivery not implemented" << endl;
|
||||
assert(0);
|
||||
}
|
||||
|
||||
@ -168,9 +173,11 @@ static int get_thread_tag()
|
||||
|
||||
void* mpimessenger_loop(void*)
|
||||
{
|
||||
dout(1) << "mpimessenger_loop start" << endl;
|
||||
|
||||
while (!mpi_done) {
|
||||
// check mpi
|
||||
dout(12) << "waiting for (unsolicited) messages" << endl;
|
||||
dout(12) << "mpimessenger_loop waiting for (unsolicited) messages" << endl;
|
||||
|
||||
// get message
|
||||
Message *m = mpi_recv(TAG_UNSOLICITED);
|
||||
@ -192,8 +199,9 @@ void* mpimessenger_loop(void*)
|
||||
}
|
||||
}
|
||||
|
||||
dout(1) << "waiting for all to finish" << endl;
|
||||
dout(5) << "mpimessenger_loop finish, waiting for all to finish" << endl;
|
||||
MPI_Barrier (MPI_COMM_WORLD);
|
||||
dout(5) << "mpimessenger_loop everybody done, exiting loop" << endl;
|
||||
}
|
||||
|
||||
|
||||
@ -201,7 +209,7 @@ void* mpimessenger_loop(void*)
|
||||
|
||||
int mpimessenger_start()
|
||||
{
|
||||
dout(1) << "mpimessenger_start starting thread" << endl;
|
||||
dout(5) << "mpimessenger_start starting thread" << endl;
|
||||
|
||||
// start a thread
|
||||
pthread_create(&thread_id,
|
||||
@ -212,7 +220,12 @@ int mpimessenger_start()
|
||||
|
||||
void mpimessenger_stop()
|
||||
{
|
||||
dout(1) << "mpimessenger_stop stopping thread" << endl;
|
||||
dout(5) << "mpimessenger_stop stopping thread" << endl;
|
||||
|
||||
if (mpi_done) {
|
||||
dout(1) << "mpimessenger_stop called, but already done!" << endl;
|
||||
assert(!mpi_done);
|
||||
}
|
||||
|
||||
// set finish flag
|
||||
mpi_done = true;
|
||||
@ -228,14 +241,13 @@ void mpimessenger_stop()
|
||||
|
||||
// wait for thread to stop
|
||||
mpimessenger_wait();
|
||||
|
||||
dout(1) << "mpimessenger_stop stopped" << endl;
|
||||
}
|
||||
|
||||
void mpimessenger_wait()
|
||||
{
|
||||
void *returnval;
|
||||
pthread_join(thread_id, &returnval);
|
||||
dout(10) << "mpimessenger_wait thread finished." << endl;
|
||||
}
|
||||
|
||||
|
||||
@ -281,12 +293,17 @@ int MPIMessenger::shutdown()
|
||||
|
||||
// last one?
|
||||
if (directory.empty()) {
|
||||
dout(10) << "last mpimessenger on rank " << mpi_rank << " shut down" << endl;
|
||||
pthread_t whoami = pthread_self();
|
||||
|
||||
dout(15) << "whoami = " << whoami << ", thread = " << thread_id << endl;
|
||||
if (whoami == thread_id) {
|
||||
// i am the event loop thread, just set flag!
|
||||
dout(15) << " set mpi_done=true" << endl;
|
||||
mpi_done = true;
|
||||
} else {
|
||||
// i am a different thread, tell the event loop to stop.
|
||||
dout(15) << " calling mpimessenger_stop()" << endl;
|
||||
mpimessenger_stop();
|
||||
}
|
||||
}
|
||||
@ -309,8 +326,10 @@ int MPIMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo
|
||||
mpi_send(m, rank, 0); // tag 0 for regular messages
|
||||
}
|
||||
|
||||
Message *MPIMessenger::sendrecv(Message *m, msg_addr_t dest, int port, int fromport)
|
||||
Message *MPIMessenger::sendrecv(Message *m, msg_addr_t dest, int port)
|
||||
{
|
||||
int fromport = 0;
|
||||
|
||||
// set envelope
|
||||
m->set_source(myaddr, fromport);
|
||||
m->set_dest(dest, port);
|
||||
|
@ -25,7 +25,7 @@ class MPIMessenger : public Messenger {
|
||||
|
||||
// message interface
|
||||
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
|
||||
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0, int fromport=0);
|
||||
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
|
||||
};
|
||||
|
||||
/**
|
||||
@ -36,6 +36,7 @@ extern int mpimessenger_init(int& argc, char**& argv); // init mpi
|
||||
extern int mpimessenger_start(); // start thread
|
||||
extern void mpimessenger_stop(); // stop thread.
|
||||
extern void mpimessenger_wait(); // wait for thread to finish.
|
||||
extern int mpimessenger_shutdown(); // finalize MPI
|
||||
|
||||
|
||||
#endif
|
||||
|
@ -4,7 +4,7 @@
|
||||
|
||||
#define MSG_PING 1
|
||||
|
||||
#define MSG_FINISH 0
|
||||
#define MSG_SHUTDOWN 2
|
||||
|
||||
#define MSG_OSD_READ 10
|
||||
#define MSG_OSD_READREPLY 11
|
||||
|
@ -188,6 +188,7 @@ decode_message(crope& ser)
|
||||
|
||||
case MSG_MDS_SHUTDOWNSTART:
|
||||
case MSG_MDS_SHUTDOWNFINISH:
|
||||
case MSG_SHUTDOWN:
|
||||
case MSG_CLIENT_DONE:
|
||||
m = new MGenericMessage(type);
|
||||
break;
|
||||
|
@ -60,6 +60,11 @@ int OSD::shutdown()
|
||||
void OSD::dispatch(Message *m)
|
||||
{
|
||||
switch (m->get_type()) {
|
||||
|
||||
case MSG_SHUTDOWN:
|
||||
shutdown();
|
||||
break;
|
||||
|
||||
case MSG_OSD_READ:
|
||||
read((MOSDRead*)m);
|
||||
break;
|
||||
|
@ -68,51 +68,36 @@ int main(int argc, char **argv) {
|
||||
client[i]->issue_request();
|
||||
}
|
||||
|
||||
// start message loop
|
||||
mpimessenger_start();
|
||||
|
||||
// wait for it to finish
|
||||
mpimessenger_wait();
|
||||
mpimessenger_start(); // start message loop
|
||||
mpimessenger_wait(); // wait for thread to finish
|
||||
mpimessenger_shutdown(); // shutdown MPI
|
||||
|
||||
//
|
||||
/*
|
||||
cout << "---- check ----" << endl;
|
||||
for (int i=0; i<NUMMDS; i++) {
|
||||
if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
|
||||
mds[i]->mdcache->shutdown_pass();
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
// cleanup
|
||||
cout << "cleanup" << endl;
|
||||
//cout << "cleanup" << endl;
|
||||
for (int i=0; i<NUMMDS; i++) {
|
||||
if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_MDS(i),world)) continue;
|
||||
if (mds[i]->shutdown_final() == 0) {
|
||||
//cout << "clean shutdown of mds " << i << endl;
|
||||
delete mds[i];
|
||||
} else {
|
||||
cout << "problems shutting down mds " << i << endl;
|
||||
}
|
||||
delete mds[i];
|
||||
}
|
||||
for (int i=0; i<NUMOSD; i++) {
|
||||
if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_OSD(i),world)) continue;
|
||||
if (osd[i]->shutdown() == 0) {
|
||||
//cout << "clean shutdown of osd " << i << endl;
|
||||
delete osd[i];
|
||||
} else {
|
||||
cout << "problems shutting down osd " << i << endl;
|
||||
}
|
||||
delete osd[i];
|
||||
}
|
||||
for (int i=0; i<NUMCLIENT; i++) {
|
||||
if (myrank != MPI_DEST_TO_RANK(MSG_ADDR_CLIENT(i),world)) continue;
|
||||
if (client[i]->shutdown() == 0) {
|
||||
//cout << "clean shutdown of client " << i << endl;
|
||||
delete client[i];
|
||||
} else {
|
||||
cout << "problems shutting down client " << i << endl;
|
||||
}
|
||||
delete client[i];
|
||||
}
|
||||
delete mdc;
|
||||
|
||||
cout << "done." << endl;
|
||||
//cout << "done." << endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user