From 63fab458f625f953f3f98d94b4cb56cf3293eab8 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 24 Nov 2010 10:49:12 -0800 Subject: [PATCH] rados_bencher.h: bench_write and bench_seq will now wait on any write/read rather than the one least recently started. bench_write adds its pid to the BENCH_DATA object bench_read uses the pid in BENCH_DATA to generate the object names to read. Signed-off-by: Samuel Just --- src/osdc/rados_bencher.h | 144 +++++++++++++++++++++++++++------------ 1 file changed, 102 insertions(+), 42 deletions(-) diff --git a/src/osdc/rados_bencher.h b/src/osdc/rados_bencher.h index 5f1e261658a..e68b3aa74cb 100644 --- a/src/osdc/rados_bencher.h +++ b/src/osdc/rados_bencher.h @@ -47,18 +47,23 @@ struct bench_data { char *object_contents; //pointer to the contents written to each object }; -void generate_object_name(char *s, int objnum) +void generate_object_name(char *s, int objnum, int pid = 0) { char hostname[30]; gethostname(hostname, sizeof(hostname)-1); hostname[sizeof(hostname)-1] = 0; - snprintf(s, sizeof(hostname), "%s_%d_object%d", hostname, getpid(), objnum); + if (pid) { + snprintf(s, sizeof(hostname), "%s_%d_object%d", hostname, pid, objnum); + } else { + snprintf(s, sizeof(hostname), "%s_%d_object%d", hostname, getpid(), objnum); + } } int write_bench(Rados& rados, rados_pool_t pool, int secondsToRun, int concurrentios, bench_data *data); int seq_read_bench(Rados& rados, rados_pool_t pool, - int secondsToRun, int concurrentios, bench_data *data); + int secondsToRun, int concurrentios, bench_data *data, + int writePid); void *status_printer(void * data_store); void sanitize_object_contents(bench_data *data, int length); @@ -68,11 +73,12 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation, int num_objects = 0; char* contentsChars = new char[op_size]; int r = 0; + int prevPid = 0; //get data from previous write run, if available if (operation != OP_WRITE) { bufferlist object_data; - r = rados.read(pool, BENCH_DATA, 0, object_data, sizeof(int)*2); + r = rados.read(pool, BENCH_DATA, 0, object_data, sizeof(int)*3); if (r <= 0) { delete[] contentsChars; if (r == -2) @@ -82,6 +88,7 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation, bufferlist::iterator p = object_data.begin(); ::decode(object_size, p); ::decode(num_objects, p); + ::decode(prevPid, p); } else { object_size = op_size; } @@ -108,7 +115,7 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation, if (r != 0) goto out; } else if (OP_SEQ_READ == operation) { - r = seq_read_bench(rados, pool, secondsToRun, concurrentios, data); + r = seq_read_bench(rados, pool, secondsToRun, concurrentios, data, prevPid); if (r != 0) goto out; } else if (OP_RAND_READ == operation) { @@ -122,6 +129,13 @@ int aio_bench(Rados& rados, rados_pool_t pool, int operation, return r; } +void _aio_cb(void *cb, void *arg) { + dataLock.Lock(); + Cond *cond = (Cond *) arg; + cond->Signal(); + dataLock.Unlock(); +} + int write_bench(Rados& rados, rados_pool_t pool, int secondsToRun, int concurrentios, bench_data *data) { cout << "Maintaining " << concurrentios << " concurrent writes of " @@ -135,6 +149,10 @@ int write_bench(Rados& rados, rados_pool_t pool, utime_t start_times[concurrentios]; utime_t stopTime; int r = 0; + bufferlist b_write; + Cond cond; + utime_t runtime; + utime_t timePassed; //set up writes so I can start them together for (int i = 0; iobject_size, completions[i]); if (r < 0) { //naughty, doesn't clean up heap - dataLock.Unlock(); - return -5; //EIO + goto ERR; } dataLock.Lock(); ++data->started; @@ -169,15 +187,25 @@ int write_bench(Rados& rados, rados_pool_t pool, int slot; bufferlist* newContents; char* newName; - utime_t runtime; - utime_t timePassed = g_clock.now() - data->start_time; //don't need locking for reads because other thread doesn't write runtime.set_from_double(secondsToRun); stopTime = data->start_time + runtime; while( g_clock.now() < stopTime ) { - slot = data->finished % concurrentios; + dataLock.Lock(); + while (1) { + for (slot = 0; slot < concurrentios; ++slot) { + if (completions[slot]->is_safe()) { + break; + } + } + if (slot < concurrentios) { + break; + } + cond.Wait(dataLock); + } + dataLock.Unlock(); //create new contents and name on the heap, and fill them newContents = new bufferlist(); newName = new char[128]; @@ -189,7 +217,7 @@ int write_bench(Rados& rados, rados_pool_t pool, r = completions[slot]->get_return_value(); if (r != 0) { dataLock.Unlock(); - return r; + goto ERR; } data->cur_latency = g_clock.now() - start_times[slot]; total_latency += data->cur_latency; @@ -206,10 +234,12 @@ int write_bench(Rados& rados, rados_pool_t pool, //write new stuff to rados, then delete old stuff //and save locations of new stuff for later deletion start_times[slot] = g_clock.now(); - completions[slot] = rados.aio_create_completion(); + completions[slot] = rados.aio_create_completion((void *) &cond, 0, + &_aio_cb); r = rados.aio_write(pool, newName, 0, *newContents, data->object_size, completions[slot]); - if (r < 0) //naughty; doesn't clean up heap space. - return r; + if (r < 0) {//naughty; doesn't clean up heap space. + goto ERR; + } dataLock.Lock(); ++data->started; ++data->in_flight; @@ -227,7 +257,7 @@ int write_bench(Rados& rados, rados_pool_t pool, r = completions[slot]->get_return_value(); if (r != 0) { dataLock.Unlock(); - return r; + goto ERR; } data->cur_latency = g_clock.now() - start_times[slot]; total_latency += data->cur_latency; @@ -268,15 +298,22 @@ int write_bench(Rados& rados, rados_pool_t pool, int written_objects[2]; written_objects[0] = data->object_size; written_objects[1] = data->finished; - bufferlist b_write; ::encode(data->object_size, b_write); ::encode(data->finished, b_write); - rados.write(pool, BENCH_DATA, 0, b_write, sizeof(int)*2); + ::encode(getpid(), b_write); + rados.write(pool, BENCH_DATA, 0, b_write, sizeof(int)*3); return 0; + + ERR: + dataLock.Lock(); + data->done = 1; + dataLock.Unlock(); + pthread_join(print_thread, NULL); + return -5; } int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, - int concurrentios, bench_data *write_data) { + int concurrentios, bench_data *write_data, int pid) { bench_data *data = new bench_data(); data->done = false; data->object_size = write_data->object_size; @@ -289,9 +326,11 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, data->avg_latency = 0; data->object_contents = write_data->object_contents; + Cond cond; Rados::AioCompletion* completions[concurrentios]; char* name[concurrentios]; bufferlist* contents[concurrentios]; + int index[concurrentios]; int errors = 0; utime_t start_time; utime_t start_times[concurrentios]; @@ -299,13 +338,14 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, time_to_run.set_from_double(seconds_to_run); double total_latency = 0; int r = 0; + utime_t runtime; sanitize_object_contents(data, 128); //clean it up once; subsequent //changes will be safe because string length monotonically increases //set up initial reads for (int i = 0; i < concurrentios; ++i) { name[i] = new char[128]; - generate_object_name(name[i], i); + generate_object_name(name[i], i, pid); contents[i] = new bufferlist(); } @@ -314,42 +354,55 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, dataLock.Lock(); data->start_time = g_clock.now(); + dataLock.Unlock(); + utime_t finish_time = data->start_time + time_to_run; //start initial reads for (int i = 0; i < concurrentios; ++i) { + index[i] = i; start_times[i] = g_clock.now(); - completions[i] = rados.aio_create_completion(); + completions[i] = rados.aio_create_completion((void *) &cond, + &_aio_cb, 0); r = rados.aio_read(pool, name[i], 0, contents[i], data->object_size, completions[i]); if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! cerr << "r = " << r << std::endl; - dataLock.Unlock(); - return -5; //EIO + goto ERR; } + dataLock.Lock(); ++data->started; ++data->in_flight; + dataLock.Unlock(); } - dataLock.Unlock(); //keep on adding new reads as old ones complete int slot; char* newName; - utime_t runtime; bufferlist *cur_contents; - utime_t finish_time = data->start_time + time_to_run; - bool not_overtime = true; - for (int i = 0; - i < write_data->finished - concurrentios && not_overtime; - ++i) { - slot = data->finished % concurrentios; + while (seconds_to_run && (g_clock.now() < finish_time)) { + dataLock.Lock(); + while (1) { + for (slot = 0; slot < concurrentios; ++slot) { + if (completions[slot]->is_complete()) { + break; + } + } + if (slot < concurrentios) { + break; + } + cond.Wait(dataLock); + } + dataLock.Unlock(); newName = new char[128]; - generate_object_name(newName, data->started); + generate_object_name(newName, data->started, pid); + int current_index = index[slot]; + index[slot] = data->started; completions[slot]->wait_for_complete(); dataLock.Lock(); r = completions[slot]->get_return_value(); if (r != 0) { cerr << "read got " << r << std::endl; dataLock.Unlock(); - return r; + goto ERR; } data->cur_latency = g_clock.now() - start_times[slot]; total_latency += data->cur_latency; @@ -366,24 +419,24 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, //start new read and check data if requested start_times[slot] = g_clock.now(); contents[slot] = new bufferlist(); - completions[slot] = rados.aio_create_completion(); + completions[slot] = rados.aio_create_completion((void *) &cond, + &_aio_cb, 0); r = rados.aio_read(pool, newName, 0, contents[slot], data->object_size, completions[slot]); - if (r < 0) - return r; + if (r < 0) { + goto ERR; + } dataLock.Lock(); ++data->started; ++data->in_flight; - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", i); + snprintf(data->object_contents, data->object_size, "I'm the %dth object!", current_index); dataLock.Unlock(); if (memcmp(data->object_contents, cur_contents->c_str(), data->object_size) != 0) { - cerr << name[slot] << " is not correct!"; + cerr << name[slot] << " is not correct!" << std::endl; ++errors; } delete name[slot]; name[slot] = newName; delete cur_contents; - if (seconds_to_run != 0) - not_overtime = !(g_clock.now() > finish_time); } //wait for final reads to complete @@ -395,7 +448,7 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, if (r != 0) { cerr << "read got " << r << std::endl; dataLock.Unlock(); - return r; + goto ERR; } data->cur_latency = g_clock.now() - start_times[slot]; total_latency += data->cur_latency; @@ -406,7 +459,7 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, --data->in_flight; completions[slot]-> release(); completions[slot] = 0; - snprintf(data->object_contents, data->object_size, "I'm the %dth object!", data->finished-1); + snprintf(data->object_contents, data->object_size, "I'm the %dth object!", index[slot]); dataLock.Unlock(); if (memcmp(data->object_contents, contents[slot]->c_str(), data->object_size) != 0) { cerr << name[slot] << " is not correct!" << std::endl; @@ -439,6 +492,13 @@ int seq_read_bench(Rados& rados, rados_pool_t pool, int seconds_to_run, delete data; return 0; + + ERR: + dataLock.Lock(); + data->done = 1; + dataLock.Unlock(); + pthread_join(print_thread, NULL); + return -5; }