*** empty log message ***

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@572 29311d96-e01e-0410-9327-a35deaab8ce9
This commit is contained in:
sage 2006-01-03 01:17:10 +00:00
parent e776ee5dad
commit 1119d468f5
6 changed files with 165 additions and 121 deletions

View File

@ -1,7 +1,7 @@
client
- some heuristic behavior to consolidate caps to inode auth
- client will re-tx anything it needed to say upon rx of new mds notification
- client will re-tx anything it needed to say upon rx of new mds notification (?)
ebofs
@ -12,9 +12,6 @@ ebofs
osd
- thread queue + object locks... need to be ordered!
- rep ops trickyness?
- ardos validation
- what happens if entire pg set changes, recovery doesn't finish, modifications are made, and pg set changes again? is anybody "complete" anymore?

View File

@ -65,6 +65,15 @@ class Cond
int r = pthread_cond_broadcast(&C);
return r;
}
// Wake at most one thread blocked in Wait() on this condition.
// Returns pthread_cond_signal()'s result (0 on success, errno code otherwise).
// NOTE(review): caller is presumably expected to hold the associated mutex — confirm against Wait()'s contract.
int SignalOne() {
int r = pthread_cond_signal(&C);
return r;
}
// Wake every thread blocked in Wait() on this condition.
// Returns pthread_cond_broadcast()'s result (0 on success, errno code otherwise).
int SignalAll() {
// intentionally broadcast, not signal: all waiters must be released
//int r = pthread_cond_signal(&C);
int r = pthread_cond_broadcast(&C);
return r;
}
};
#endif // !_Cond_Posix_

View File

@ -129,6 +129,7 @@ md_config_t g_conf = {
ebofs_abp_max_alloc: 4096*32,
// --- block device ---
bdev_iothreads: 2,
bdev_el_fw_max_ms: 1000, // restart elevator at least once every 1000 ms
bdev_el_bw_max_ms: 300,      // restart elevator at least once every 300 ms
bdev_el_bidir: true, // bidirectional elevator?

View File

@ -103,6 +103,7 @@ struct md_config_t {
size_t ebofs_abp_max_alloc;
// block device
int bdev_iothreads;
int bdev_el_fw_max_ms;
int bdev_el_bw_max_ms;
bool bdev_el_bidir;

View File

@ -50,121 +50,130 @@ block_t BlockDevice::get_num_blocks()
int BlockDevice::io_thread_entry()
{
dout(10) << "io_thread start" << endl;
// elevator nonsense!
bool dir_forward = true;
block_t pos = 0;
lock.Lock();
int whoami = io_threads_started++;
dout(10) << "io_thread" << whoami << " start" << endl;
if (whoami == 0) { // i'm the first one starting...
el_dir_forward = true;
el_pos = 0;
el_stop = g_clock.now();
utime_t max(0, 1000*g_conf.bdev_el_fw_max_ms); // (s,us), convert ms -> us!
el_stop += max;
}
int fd = open_fd();
assert(fd > 0);
while (!io_stop) {
// queue?
if (!io_queue.empty()) {
utime_t stop = g_clock.now();
if (dir_forward) {
// forward sweep
dout(20) << "io_thread forward sweep" << endl;
pos = 0;
// go until (we) reverse
dout(20) << "io_thread" << whoami << " going" << endl;
utime_t max(0, 1000*g_conf.bdev_el_fw_max_ms); // (s,us), convert ms -> us!
stop += max;
while (1) {
// find i >= pos
multimap<block_t,biovec*>::iterator i = io_queue.lower_bound(pos);
while (1) {
// find i >= pos
multimap<block_t,biovec*>::iterator i;
if (el_dir_forward) {
i = io_queue.lower_bound(el_pos);
if (i == io_queue.end()) break;
// merge contiguous ops
list<biovec*> biols;
int n = 0;
char type = i->second->type;
pos = i->first;
while (pos == i->first &&
type == i->second->type &&
++n <= g_conf.bdev_iov_max) {
dout(20) << "io_thread dequeue io at " << pos << " " << *i->second << endl;
biovec *bio = i->second;
biols.push_back(bio);
pos += bio->length;
multimap<block_t,biovec*>::iterator prev = i;
i++;
io_queue_map.erase(bio);
io_queue.erase(prev);
if (i == io_queue.end()) break;
}
lock.Unlock();
do_io(biols);
lock.Lock();
utime_t now = g_clock.now();
if (now > stop) break;
}
} else {
// reverse sweep
dout(20) << "io_thread reverse sweep" << endl;
pos = get_num_blocks();
utime_t max(0, 1000*g_conf.bdev_el_bw_max_ms); // (s,us), convert ms -> us!
stop += max;
while (1) {
// find i > pos
multimap<block_t,biovec*>::iterator i = io_queue.upper_bound(pos);
} else {
i = io_queue.upper_bound(el_pos);
if (i == io_queue.begin()) break;
i--; // and back down one (to get i <= pos)
// merge contiguous ops
list<biovec*> biols;
char type = i->second->type;
int n = 0;
pos = i->first;
while (pos == i->first && type == i->second->type &&
++n <= g_conf.bdev_iov_max) {
dout(20) << "io_thread dequeue io at " << pos << " " << *i->second << endl;
biovec *bio = i->second;
biols.push_back(bio);
pos += bio->length;
multimap<block_t,biovec*>::iterator prev = i;
bool begin = (i == io_queue.begin());
if (!begin) i--;
io_queue_map.erase(bio);
io_queue.erase(prev);
if (begin) break;
}
lock.Unlock();
do_io(biols);
lock.Lock();
utime_t now = g_clock.now();
if (now > stop) break;
i--; // and back down one (to get i <= pos)
}
// merge contiguous ops
list<biovec*> biols;
char type = i->second->type;
int n = 0;
el_pos = i->first;
while (el_pos == i->first && type == i->second->type && // while (contiguous)
++n <= g_conf.bdev_iov_max) { // and not too big
biovec *bio = i->second;
if (el_dir_forward) {
dout(20) << "io_thread" << whoami << " fw dequeue io at " << el_pos << " " << *i->second << endl;
biols.push_back(bio); // at back
} else {
dout(20) << "io_thread" << whoami << " bw dequeue io at " << el_pos << " " << *i->second << endl;
biols.push_front(bio); // at front
}
multimap<block_t,biovec*>::iterator prev;
bool stop = false;
if (el_dir_forward) {
el_pos += bio->length; // cont. next would start right after us
prev = i;
i++;
if (i == io_queue.end()) stop = true;
} else {
prev = i;
if (i == io_queue.begin()) {
stop = true;
} else {
i--; // cont. would start before us...
if (i->first + i->second->length == el_pos)
el_pos = i->first; // yep, it does!
}
}
// dequeue
io_queue_map.erase(bio);
io_queue.erase(prev);
if (stop) break;
}
// drop lock to do the io
lock.Unlock();
do_io(fd, biols);
lock.Lock();
utime_t now = g_clock.now();
if (now > el_stop) break;
}
// reverse?
if (g_conf.bdev_el_bidir) {
dout(20) << "io_thread" << whoami << " reversing" << endl;
el_dir_forward = !el_dir_forward;
}
if (g_conf.bdev_el_bidir)
dir_forward = !dir_forward;
// reset disk pointers, timers
el_stop = g_clock.now();
if (el_dir_forward) {
el_pos = 0;
utime_t max(0, 1000*g_conf.bdev_el_fw_max_ms); // (s,us), convert ms -> us!
el_stop += max;
dout(20) << "io_thread" << whoami << " forward sweep for " << max << endl;
} else {
el_pos = num_blocks;
utime_t max(0, 1000*g_conf.bdev_el_bw_max_ms); // (s,us), convert ms -> us!
el_stop += max;
dout(20) << "io_thread" << whoami << " reverse sweep for " << max << endl;
}
} else {
// sleep
dout(20) << "io_thread sleeping" << endl;
dout(20) << "io_thread" << whoami << " sleeping" << endl;
io_wakeup.Wait(lock);
dout(20) << "io_thread woke up" << endl;
dout(20) << "io_thread" << whoami << " woke up" << endl;
}
}
::close(fd);
lock.Unlock();
dout(10) << "io_thread finish" << endl;
dout(10) << "io_thread" << whoami << " finish" << endl;
return 0;
}
void BlockDevice::do_io(list<biovec*>& biols)
void BlockDevice::do_io(int fd, list<biovec*>& biols)
{
int r;
assert(!biols.empty());
@ -177,18 +186,21 @@ void BlockDevice::do_io(list<biovec*>& biols)
char type = biols.front()->type;
list<biovec*>::iterator p = biols.begin();
int n = 1;
for (p++; p != biols.end(); p++) {
length += (*p)->length;
bl.claim_append((*p)->bl);
n++;
}
// do it
dout(20) << "do_io start " << (type==biovec::IO_WRITE?"write":"read")
<< " " << start << "~" << length << endl;
<< " " << start << "~" << length
<< " " << n << " bits" << endl;
if (type == biovec::IO_WRITE) {
r = _write(start, length, bl);
r = _write(fd, start, length, bl);
} else if (type == biovec::IO_READ) {
r = _read(start, length, bl);
r = _read(fd, start, length, bl);
} else assert(0);
dout(20) << "do_io finish " << (type==biovec::IO_WRITE?"write":"read")
<< " " << start << "~" << length << endl;
@ -268,8 +280,11 @@ void BlockDevice::_submit_io(biovec *b)
// NOTE: lock must be held
dout(15) << "_submit_io " << *b << endl;
// wake up thread?
if (io_queue.empty()) io_wakeup.Signal();
// wake up io_thread(s)?
if (io_queue.empty())
io_wakeup.SignalOne();
else
io_wakeup.SignalAll();
// check for overlapping ios
{
@ -327,7 +342,7 @@ int BlockDevice::count_io(block_t start, block_t len)
// low level io
int BlockDevice::_read(block_t bno, unsigned num, bufferlist& bl)
int BlockDevice::_read(int fd, block_t bno, unsigned num, bufferlist& bl)
{
dout(10) << "_read " << bno << "~" << num << endl;
@ -362,7 +377,7 @@ int BlockDevice::_read(block_t bno, unsigned num, bufferlist& bl)
return 0;
}
int BlockDevice::_write(unsigned bno, unsigned num, bufferlist& bl)
int BlockDevice::_write(int fd, unsigned bno, unsigned num, bufferlist& bl)
{
dout(10) << "_write " << bno << "~" << num << endl;
@ -415,11 +430,16 @@ int BlockDevice::_write(unsigned bno, unsigned num, bufferlist& bl)
// open/close
// Open a fresh file descriptor on the device path so each io_thread can
// issue I/O independently (O_DIRECT + O_SYNC for unbuffered, synchronous access).
// Returns the fd from ::open(), or a negative value on failure — callers
// (e.g. io_thread_entry, open) check the result.
// NOTE(review): mode 0 with O_CREAT means a newly created file has no
// permission bits — presumably the device/file normally already exists; verify.
int BlockDevice::open_fd()
{
return ::open(dev, O_CREAT|O_RDWR|O_SYNC|O_DIRECT, 0);
}
int BlockDevice::open()
{
assert(fd == 0);
fd = ::open(dev, O_CREAT|O_RDWR|O_SYNC|O_DIRECT, 0);
fd = open_fd();
if (fd < 0) {
dout(1) << "open " << dev << " failed, r = " << fd << " " << strerror(errno) << endl;
fd = 0;
@ -427,12 +447,12 @@ int BlockDevice::open()
}
// lock
int r = ::flock(fd, LOCK_EX);
/*int r = ::flock(fd, LOCK_EX);
if (r < 0) {
dout(1) << "open " << dev << " failed to get LOCK_EX" << endl;
assert(0);
return -1;
}
}*/
// figure size
__uint64_t bsize = 0;
@ -448,7 +468,11 @@ int BlockDevice::open()
dout(1) << "open " << dev << " is " << bsize << " bytes, " << num_blocks << " blocks" << endl;
// start thread
io_thread.create();
io_threads_started = 0;
for (int i=0; i<g_conf.bdev_iothreads; i++) {
io_threads.push_back(IOThread(this));
io_threads.back().create();
}
complete_thread.create();
return fd;
@ -464,19 +488,22 @@ int BlockDevice::close()
lock.Lock();
complete_lock.Lock();
io_stop = true;
io_wakeup.Signal();
complete_wakeup.Signal();
io_wakeup.SignalAll();
complete_wakeup.SignalAll();
complete_lock.Unlock();
lock.Unlock();
io_thread.join();
for (int i=0; i<g_conf.bdev_iothreads; i++)
io_threads[i].join();
complete_thread.join();
io_stop = false; // in case we start again
dout(1) << "close " << dev << endl;
::flock(fd, LOCK_UN);
//::flock(fd, LOCK_UN);
::close(fd);
fd = 0;

View File

@ -54,11 +54,18 @@ class BlockDevice {
map<biovec*, block_t> io_queue_map;
Cond io_wakeup;
bool io_stop;
int io_threads_started;
void _submit_io(biovec *b);
int _cancel_io(biovec *bio);
void do_io(list<biovec*>& biols);
void do_io(int fd, list<biovec*>& biols);
// elevator scheduler
bool el_dir_forward;
block_t el_pos;
utime_t el_stop;
// io thread
int io_thread_entry();
class IOThread : public Thread {
@ -66,11 +73,12 @@ class BlockDevice {
public:
IOThread(BlockDevice *d) : dev(d) {}
void *entry() { return (void*)dev->io_thread_entry(); }
} io_thread;
} ;
vector<IOThread> io_threads;
// low level io
int _read(block_t bno, unsigned num, bufferlist& bl);
int _write(unsigned bno, unsigned num, bufferlist& bl);
int _read(int fd, block_t bno, unsigned num, bufferlist& bl);
int _write(int fd, unsigned bno, unsigned num, bufferlist& bl);
// complete queue
@ -90,13 +98,14 @@ class BlockDevice {
} complete_thread;
int open_fd(); // get an fd
public:
BlockDevice(char *d) :
dev(d), fd(0), num_blocks(0),
io_stop(false),
io_thread(this), complete_thread(this)
io_stop(false), io_threads_started(0),
el_dir_forward(true), el_pos(0),
complete_thread(this)
{ };
~BlockDevice() {
if (fd > 0) close();