rados_bencher.h:

write_bench and seq_read_bench now wait for whichever outstanding
	write/read completes first, rather than the one least recently started.

	write_bench appends its pid to the BENCH_DATA object

	seq_read_bench uses the pid read from BENCH_DATA to generate the
	names of the objects to read.

Signed-off-by: Samuel Just <samuelj@hq.newdream.net>
This commit is contained in:
Samuel Just 2010-11-24 10:49:12 -08:00
parent fe9fad7bea
commit 63fab458f6

View File

@ -47,18 +47,23 @@ struct bench_data {
char *object_contents; //pointer to the contents written to each object
};
// Writes an object name of the form "<hostname>_<pid>_object<objnum>" into s.
//   s      - destination buffer for the generated name.
//   objnum - object number placed after "object" in the name.
//   pid    - process id placed in the name; when it is 0 (the default) the
//            calling process's getpid() is used instead.
// NOTE(review): this span comes from a rendered diff, so the pre-change
// signature and snprintf line appear next to their replacements.
// NOTE(review): snprintf is bounded by sizeof(hostname) (30 bytes), not by the
// capacity of s, so the generated name is cut off at 29 characters plus the
// terminator -- confirm that this limit is intended.
void generate_object_name(char *s, int objnum)
void generate_object_name(char *s, int objnum, int pid = 0)
{
char hostname[30];
// Leave the final byte untouched by gethostname and set it to 0 explicitly so
// the buffer is always terminated.
gethostname(hostname, sizeof(hostname)-1);
hostname[sizeof(hostname)-1] = 0;
snprintf(s, sizeof(hostname), "%s_%d_object%d", hostname, getpid(), objnum);
// A nonzero pid selects the caller-supplied process id for the name.
if (pid) {
snprintf(s, sizeof(hostname), "%s_%d_object%d", hostname, pid, objnum);
} else {
// No pid supplied: fall back to this process's own id.
snprintf(s, sizeof(hostname), "%s_%d_object%d", hostname, getpid(), objnum);
}
}
int write_bench(Rados& rados, rados_pool_t pool,
int secondsToRun, int concurrentios, bench_data *data);
int seq_read_bench(Rados& rados, rados_pool_t pool,
int secondsToRun, int concurrentios, bench_data *data);
int secondsToRun, int concurrentios, bench_data *data,
int writePid);
void *status_printer(void * data_store);
void sanitize_object_contents(bench_data *data, int length);
@ -68,11 +73,12 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation,
int num_objects = 0;
char* contentsChars = new char[op_size];
int r = 0;
int prevPid = 0;
//get data from previous write run, if available
if (operation != OP_WRITE) {
bufferlist object_data;
r = rados.read(pool, BENCH_DATA, 0, object_data, sizeof(int)*2);
r = rados.read(pool, BENCH_DATA, 0, object_data, sizeof(int)*3);
if (r <= 0) {
delete[] contentsChars;
if (r == -2)
@ -82,6 +88,7 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation,
bufferlist::iterator p = object_data.begin();
::decode(object_size, p);
::decode(num_objects, p);
::decode(prevPid, p);
} else {
object_size = op_size;
}
@ -108,7 +115,7 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation,
if (r != 0) goto out;
}
else if (OP_SEQ_READ == operation) {
r = seq_read_bench(rados, pool, secondsToRun, concurrentios, data);
r = seq_read_bench(rados, pool, secondsToRun, concurrentios, data, prevPid);
if (r != 0) goto out;
}
else if (OP_RAND_READ == operation) {
@ -122,6 +129,13 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation,
return r;
}
void _aio_cb(void *cb, void *arg) {
dataLock.Lock();
Cond *cond = (Cond *) arg;
cond->Signal();
dataLock.Unlock();
}
int write_bench(Rados& rados, rados_pool_t pool,
int secondsToRun, int concurrentios, bench_data *data) {
cout << "Maintaining " << concurrentios << " concurrent writes of "
@ -135,6 +149,10 @@ int write_bench(Rados& rados, rados_pool_t pool,
utime_t start_times[concurrentios];
utime_t stopTime;
int r = 0;
bufferlist b_write;
Cond cond;
utime_t runtime;
utime_t timePassed;
//set up writes so I can start them together
for (int i = 0; i<concurrentios; ++i) {
@ -153,11 +171,11 @@ int write_bench(Rados& rados, rados_pool_t pool,
dataLock.Unlock();
for (int i = 0; i<concurrentios; ++i) {
start_times[i] = g_clock.now();
completions[i] = rados.aio_create_completion();
completions[i] = rados.aio_create_completion((void *) &cond, 0,
&_aio_cb);
r = rados.aio_write(pool, name[i], 0, *contents[i], data->object_size, completions[i]);
if (r < 0) { //naughty, doesn't clean up heap
dataLock.Unlock();
return -5; //EIO
goto ERR;
}
dataLock.Lock();
++data->started;
@ -169,15 +187,25 @@ int write_bench(Rados& rados, rados_pool_t pool,
int slot;
bufferlist* newContents;
char* newName;
utime_t runtime;
utime_t timePassed = g_clock.now() - data->start_time;
//don't need locking for reads because other thread doesn't write
runtime.set_from_double(secondsToRun);
stopTime = data->start_time + runtime;
while( g_clock.now() < stopTime ) {
slot = data->finished % concurrentios;
dataLock.Lock();
while (1) {
for (slot = 0; slot < concurrentios; ++slot) {
if (completions[slot]->is_safe()) {
break;
}
}
if (slot < concurrentios) {
break;
}
cond.Wait(dataLock);
}
dataLock.Unlock();
//create new contents and name on the heap, and fill them
newContents = new bufferlist();
newName = new char[128];
@ -189,7 +217,7 @@ int write_bench(Rados& rados, rados_pool_t pool,
r = completions[slot]->get_return_value();
if (r != 0) {
dataLock.Unlock();
return r;
goto ERR;
}
data->cur_latency = g_clock.now() - start_times[slot];
total_latency += data->cur_latency;
@ -206,10 +234,12 @@ int write_bench(Rados& rados, rados_pool_t pool,
//write new stuff to rados, then delete old stuff
//and save locations of new stuff for later deletion
start_times[slot] = g_clock.now();
completions[slot] = rados.aio_create_completion();
completions[slot] = rados.aio_create_completion((void *) &cond, 0,
&_aio_cb);
r = rados.aio_write(pool, newName, 0, *newContents, data->object_size, completions[slot]);
if (r < 0) //naughty; doesn't clean up heap space.
return r;
if (r < 0) {//naughty; doesn't clean up heap space.
goto ERR;
}
dataLock.Lock();
++data->started;
++data->in_flight;
@ -227,7 +257,7 @@ int write_bench(Rados& rados, rados_pool_t pool,
r = completions[slot]->get_return_value();
if (r != 0) {
dataLock.Unlock();
return r;
goto ERR;
}
data->cur_latency = g_clock.now() - start_times[slot];
total_latency += data->cur_latency;
@ -268,15 +298,22 @@ int write_bench(Rados& rados, rados_pool_t pool,
int written_objects[2];
written_objects[0] = data->object_size;
written_objects[1] = data->finished;
bufferlist b_write;
::encode(data->object_size, b_write);
::encode(data->finished, b_write);
rados.write(pool, BENCH_DATA, 0, b_write, sizeof(int)*2);
::encode(getpid(), b_write);
rados.write(pool, BENCH_DATA, 0, b_write, sizeof(int)*3);
return 0;
ERR:
dataLock.Lock();
data->done = 1;
dataLock.Unlock();
pthread_join(print_thread, NULL);
return -5;
}
int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
int concurrentios, bench_data *write_data) {
int concurrentios, bench_data *write_data, int pid) {
bench_data *data = new bench_data();
data->done = false;
data->object_size = write_data->object_size;
@ -289,9 +326,11 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
data->avg_latency = 0;
data->object_contents = write_data->object_contents;
Cond cond;
Rados::AioCompletion* completions[concurrentios];
char* name[concurrentios];
bufferlist* contents[concurrentios];
int index[concurrentios];
int errors = 0;
utime_t start_time;
utime_t start_times[concurrentios];
@ -299,13 +338,14 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
time_to_run.set_from_double(seconds_to_run);
double total_latency = 0;
int r = 0;
utime_t runtime;
sanitize_object_contents(data, 128); //clean it up once; subsequent
//changes will be safe because string length monotonically increases
//set up initial reads
for (int i = 0; i < concurrentios; ++i) {
name[i] = new char[128];
generate_object_name(name[i], i);
generate_object_name(name[i], i, pid);
contents[i] = new bufferlist();
}
@ -314,42 +354,55 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
dataLock.Lock();
data->start_time = g_clock.now();
dataLock.Unlock();
utime_t finish_time = data->start_time + time_to_run;
//start initial reads
for (int i = 0; i < concurrentios; ++i) {
index[i] = i;
start_times[i] = g_clock.now();
completions[i] = rados.aio_create_completion();
completions[i] = rados.aio_create_completion((void *) &cond,
&_aio_cb, 0);
r = rados.aio_read(pool, name[i], 0, contents[i], data->object_size, completions[i]);
if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
cerr << "r = " << r << std::endl;
dataLock.Unlock();
return -5; //EIO
goto ERR;
}
dataLock.Lock();
++data->started;
++data->in_flight;
dataLock.Unlock();
}
dataLock.Unlock();
//keep on adding new reads as old ones complete
int slot;
char* newName;
utime_t runtime;
bufferlist *cur_contents;
utime_t finish_time = data->start_time + time_to_run;
bool not_overtime = true;
for (int i = 0;
i < write_data->finished - concurrentios && not_overtime;
++i) {
slot = data->finished % concurrentios;
while (seconds_to_run && (g_clock.now() < finish_time)) {
dataLock.Lock();
while (1) {
for (slot = 0; slot < concurrentios; ++slot) {
if (completions[slot]->is_complete()) {
break;
}
}
if (slot < concurrentios) {
break;
}
cond.Wait(dataLock);
}
dataLock.Unlock();
newName = new char[128];
generate_object_name(newName, data->started);
generate_object_name(newName, data->started, pid);
int current_index = index[slot];
index[slot] = data->started;
completions[slot]->wait_for_complete();
dataLock.Lock();
r = completions[slot]->get_return_value();
if (r != 0) {
cerr << "read got " << r << std::endl;
dataLock.Unlock();
return r;
goto ERR;
}
data->cur_latency = g_clock.now() - start_times[slot];
total_latency += data->cur_latency;
@ -366,24 +419,24 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
//start new read and check data if requested
start_times[slot] = g_clock.now();
contents[slot] = new bufferlist();
completions[slot] = rados.aio_create_completion();
completions[slot] = rados.aio_create_completion((void *) &cond,
&_aio_cb, 0);
r = rados.aio_read(pool, newName, 0, contents[slot], data->object_size, completions[slot]);
if (r < 0)
return r;
if (r < 0) {
goto ERR;
}
dataLock.Lock();
++data->started;
++data->in_flight;
snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i);
snprintf(data->object_contents, data->object_size, "I'm the %dth object!", current_index);
dataLock.Unlock();
if (memcmp(data->object_contents, cur_contents->c_str(), data->object_size) != 0) {
cerr << name[slot] << " is not correct!";
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
delete name[slot];
name[slot] = newName;
delete cur_contents;
if (seconds_to_run != 0)
not_overtime = !(g_clock.now() > finish_time);
}
//wait for final reads to complete
@ -395,7 +448,7 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
if (r != 0) {
cerr << "read got " << r << std::endl;
dataLock.Unlock();
return r;
goto ERR;
}
data->cur_latency = g_clock.now() - start_times[slot];
total_latency += data->cur_latency;
@ -406,7 +459,7 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
--data->in_flight;
completions[slot]-> release();
completions[slot] = 0;
snprintf(data->object_contents, data->object_size, "I'm the %dth object!", data->finished-1);
snprintf(data->object_contents, data->object_size, "I'm the %dth object!", index[slot]);
dataLock.Unlock();
if (memcmp(data->object_contents, contents[slot]->c_str(), data->object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
@ -439,6 +492,13 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run,
delete data;
return 0;
ERR:
dataLock.Lock();
data->done = 1;
dataLock.Unlock();
pthread_join(print_thread, NULL);
return -5;
}