*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@327 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2005-06-16 20:26:49 +00:00
parent 478b50d7a3
commit a7fdc272c0
12 changed files with 98 additions and 78 deletions

View File

@ -20,7 +20,8 @@
// single global instance
Timer g_timer;
Messenger *messenger_to_kick = 0;
Context *messenger_kicker = 0;
ostream& operator<<(ostream& out, timepair_t& t)
{
@ -64,10 +65,13 @@ void Timer::timer_thread()
scheduled.erase(t);
}
if (messenger_to_kick) {
if (messenger_kicker) {
dout(DBL) << "kicking messenger" << endl;
messenger_to_kick->trigger_timer(this);
messenger_kicker->finish(0);
} else {
dout(DBL) << "no messenger ot kick!" << endl;
}
}
else {
@ -92,15 +96,19 @@ void Timer::timer_thread()
*/
void Timer::set_messenger(Messenger *m)
void Timer::set_messenger_kicker(Context *c)
{
dout(10) << "messenger to kick is " << m << endl;
messenger_to_kick = m;
dout(10) << "messenger kicker is " << c << endl;
messenger_kicker = c;
}
void Timer::unset_messenger()
void Timer::unset_messenger_kicker()
{
dout(10) << "unset messenger" << endl;
messenger_to_kick = 0;
if (messenger_kicker) {
delete messenger_kicker;
messenger_kicker = 0;
}
cancel_timer();
}
@ -118,8 +126,6 @@ void Timer::register_timer()
void Timer::cancel_timer()
{
// clear my callback pointers
messenger_to_kick = 0;
if (thread_id) {
dout(10) << "setting thread_stop flag" << endl;
lock.Lock();

View File

@ -19,6 +19,7 @@ using namespace std;
class Messenger;
class Timer {
private:
map< timepair_t, set<Context*> > scheduled; // time -> (context ...)
@ -96,8 +97,8 @@ class Timer {
cancel_timer();
}
void set_messenger(Messenger *m);
void unset_messenger();
void set_messenger_kicker(Context *c);
void unset_messenger_kicker();
// schedule events
void add_event_after(float seconds,

View File

@ -185,6 +185,7 @@ int main(int oargc, char **oargv) {
free(argv);
delete[] nargv;
cout << "fakesyn done" << endl;
return 0;
}

View File

@ -14,7 +14,7 @@ using namespace std;
#define BUFFER_MODE_NOFREE 0
#define BUFFER_MODE_FREE 2
#define BUFFER_MODE_DEFAULT (BUFFER_MODE_COPY|BUFFER_MODE_FREE)
#define BUFFER_MODE_DEFAULT 3//(BUFFER_MODE_COPY|BUFFER_MODE_FREE)
/*
* buffer - the underlying buffer container. with a reference count.
@ -34,34 +34,46 @@ class buffer {
int _ref;
int _get() { return ++_ref; }
int _put() { return --_ref; }
int _get() {
cout << "buffer.get " << *this << " get " << _ref+1 << endl;
return ++_ref;
}
int _put() {
cout << "buffer.put " << *this << " put " << _ref-1 << endl;
return --_ref;
}
friend class bufferptr;
public:
// constructors
buffer() : _dataptr(0), _len(0), _alloc_len(0), _ref(0), _myptr(true) {
//cout << "buffer() " << *this << endl;
cout << "buffer.cons " << *this << endl;
}
buffer(int a) : _len(0), _alloc_len(a), _ref(0), _myptr(true) {
//cout << "buffer(empty) " << *this << endl;
buffer(int a) : _dataptr(0), _len(0), _alloc_len(a), _ref(0), _myptr(true) {
cout << "buffer.cons " << *this << endl;
_dataptr = new char[a];
cout << "buffer.malloc " << (void*)_dataptr << endl;
}
~buffer() {
//cout << "~buffer " << *this << endl;
if (_dataptr && _myptr)
cout << "buffer.des " << *this << endl;
if (_dataptr && _myptr) {
cout << "buffer.free " << (void*)_dataptr << endl;
delete[] _dataptr;
}
}
buffer(const char *p, int l, int mode=BUFFER_MODE_DEFAULT) :
_len(l),
_alloc_len(l),
_ref(0),
_myptr(mode & BUFFER_MODE_FREE ? true:false) {
//cout << "buffer cons mode = " << _myptr << endl;
_dataptr(0),
_len(l),
_alloc_len(l),
_ref(0),
_myptr(0) {
_myptr = mode & BUFFER_MODE_FREE ? true:false;
cout << "buffer.cons " << *this << " mode = " << mode << ", myptr=" << _myptr << endl;
if (mode & BUFFER_MODE_COPY) {
_dataptr = new char[l];
cout << "buffer.malloc " << (void*)_dataptr << endl;
memcpy(_dataptr, p, l);
//cout << "buffer(copy) " << *this << endl;
} else {
@ -94,7 +106,7 @@ class buffer {
};
inline ostream& operator<<(ostream& out, buffer& b) {
return out << "buffer(len=" << b._len << ", alloc=" << b._alloc_len << ", " << (void*)b._dataptr << ")";
return out << "buffer(this=" << &b << " len=" << b._len << ", alloc=" << b._alloc_len << ", data=" << (void*)b._dataptr << " ref=" << b._ref << ")";
}

View File

@ -47,9 +47,10 @@ class CheesySerializer : public Messenger,
Message *sendrecv(Message *m, msg_addr_t dest,
int port=0); // blocks for matching reply
/*
void trigger_timer(class Timer *t) {
messenger->trigger_timer(t);
}
}*/
};
#endif

View File

@ -38,6 +38,8 @@ LogType fakemsg_logtype;
Mutex lock;
Cond cond;
bool pending_timer = false;
bool awake = false;
bool shutdown = false;
pthread_t thread_id;
@ -83,10 +85,15 @@ void fakemessenger_wait()
cout << "fakemessenger_wait waiting" << endl;
void *ptr;
pthread_join(thread_id, &ptr);
g_timer.unset_messenger_kicker();
}
Timer *pending_timer = 0;
// lame main looper
@ -112,11 +119,8 @@ int fakemessenger_do_loop_2()
// timer?
if (pending_timer) {
Timer *t = pending_timer;
pending_timer = 0;
dout(5) << "pending timer" << endl;
t->execute_pending();
g_timer.execute_pending();
}
// messages
@ -168,13 +172,20 @@ int fakemessenger_do_loop_2()
// class
class C_FakeKicker : public Context {
void finish(int r) {
dout(18) << "timer kick" << endl;
pending_timer = true;
cond.Signal(); // why not
}
};
FakeMessenger::FakeMessenger(long me) : Messenger(me)
{
whoami = me;
directory[ whoami ] = this;
g_timer.set_messenger(this);
pending_timer = 0;
g_timer.set_messenger_kicker(new C_FakeKicker());
cout << "fakemessenger " << whoami << " messenger is " << this << endl;
@ -205,14 +216,13 @@ FakeMessenger::~FakeMessenger()
int FakeMessenger::shutdown()
{
g_timer.unset_messenger();
//cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
directory.erase(whoami);
if (directory.empty())
::shutdown = true;
}
/*
void FakeMessenger::trigger_timer(Timer *t)
{
// note timer to call
@ -221,7 +231,7 @@ void FakeMessenger::trigger_timer(Timer *t)
// wake up thread?
cond.Signal(); // why not
}
*/
int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
{

View File

@ -31,7 +31,7 @@ class FakeMessenger : public Messenger {
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); };
// events
virtual void trigger_timer(Timer *t);
//virtual void trigger_timer(Timer *t);
// -- incoming queue --

View File

@ -46,7 +46,7 @@ pthread_t thread_id = 0; // thread id of the event loop. init value == nobody
Mutex sender_lock;
Mutex out_queue_lock;
Timer *pending_timer = 0;
bool pending_timer;
// our lock for any common data; it's okay to have only the one global mutex
@ -348,18 +348,15 @@ void* mpimessenger_loop(void*)
// timer events?
if (pending_timer) {
Timer *t = pending_timer;
pending_timer = 0;
dout(DBLVL) << "pending timer" << endl;
t->execute_pending();
g_timer.execute_pending();
}
// done?
if (mpi_done &&
incoming.empty() &&
outgoing.empty() &&
pending_timer == 0) break;
!pending_timer) break;
// incoming
@ -485,6 +482,13 @@ void mpimessenger_wait()
* MPIMessenger class implementation
*/
class C_MPIKicker : public Context {
void finish(int r) {
dout(DBLVL) << "timer kick" << endl;
mpimessenger_kick_loop();
}
};
MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr)
{
// my address
@ -494,7 +498,7 @@ MPIMessenger::MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr)
directory[myaddr] = this;
// register to execute timer events
g_timer.set_messenger(this);
g_timer.set_messenger_kicker(new C_MPIKicker());
// logger
/*
@ -524,7 +528,7 @@ int MPIMessenger::shutdown()
directory.erase(myaddr);
// no more timer events
g_timer.unset_messenger();
g_timer.unset_messenger_kicker();
// last one?
if (directory.empty()) {
@ -548,15 +552,6 @@ int MPIMessenger::shutdown()
/*** events
*/
void MPIMessenger::trigger_timer(Timer *t)
{
pending_timer = t;
mpimessenger_kick_loop();
}
/***
* public messaging interface

View File

@ -29,7 +29,7 @@ class MPIMessenger : public Messenger {
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
// events
virtual void trigger_timer(Timer *t);
//virtual void trigger_timer(Timer *t);
};
/**

View File

@ -39,7 +39,7 @@ class Messenger {
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) = 0;
// events
virtual void trigger_timer(Timer *t) = 0;
//virtual void trigger_timer(Timer *t) = 0;
};

View File

@ -65,7 +65,7 @@ pthread_t out_thread_id = 0; // thread id of the event loop. init value == no
pthread_t listen_thread_id = 0;
Mutex sender_lock;
Timer *pending_timer = 0;
bool pending_timer = false;
@ -491,17 +491,14 @@ void* tcpmessenger_loop(void*)
// timer events?
if (pending_timer) {
Timer *t = pending_timer;
pending_timer = 0;
dout(DBL) << "pending timer" << endl;
t->execute_pending();
g_timer.execute_pending();
}
// done?
if (tcp_done &&
incoming.empty() &&
pending_timer == 0) break;
!pending_timer) break;
// incoming
dout(12) << "loop waiting for incoming messages" << endl;
@ -612,6 +609,13 @@ void tcpmessenger_wait()
* Tcpmessenger class implementation
*/
class C_TCPKicker : public Context {
void finish(int r) {
dout(DBL) << "timer kick" << endl;
incoming_cond.Signal();
}
};
TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr)
{
// my address
@ -621,7 +625,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr)
directory[myaddr] = this;
// register to execute timer events
g_timer.set_messenger(this);
g_timer.set_messenger_kicker(new C_TCPKicker());
// logger
/*
@ -651,7 +655,7 @@ int TCPMessenger::shutdown()
directory.erase(myaddr);
// no more timer events
g_timer.unset_messenger();
g_timer.unset_messenger_kicker();
// last one?
if (directory.empty()) {
@ -691,15 +695,6 @@ int TCPMessenger::shutdown()
/*** events
*/
void TCPMessenger::trigger_timer(Timer *t)
{
pending_timer = t;
tcpmessenger_kick_loop();
}
/***
* public messaging interface
@ -714,8 +709,10 @@ int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo
m->set_dest(dest, port);
if (0) {
// der
tcp_send(m);
} else {
// good way
outgoing_lock.Lock();
outgoing.push_back(m);
outgoing_lock.Unlock();

View File

@ -28,9 +28,6 @@ class TCPMessenger : public Messenger {
// message interface
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0) { assert(0); }
// events
virtual void trigger_timer(Timer *t);
};
/**