From 3f4ffe0703e8482d2784102ea3a3cb57b8364673 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Tue, 2 Jul 2024 12:56:51 +0200 Subject: [PATCH 1/7] test/allocsim: osd op scraper replayer For now this is a simple single threaded replayer with a 64 iodepth where we load all ops from a file and try to push them in order as soon as we can. Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 230 ++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 src/test/objectstore/allocsim/ops_replayer.cc diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc new file mode 100644 index 00000000000..687d27a0d76 --- /dev/null +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -0,0 +1,230 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +using namespace std; +using namespace ceph; + + +static map> string_cache; +static std::atomic in_flight_ops(0); +static std::condition_variable cv; +static std::mutex in_flight_mutex; + +enum op_type { + Write, + Read +}; + +struct Op { + time_t at; + op_type type; + uint64_t offset; + uint64_t length; + shared_ptr object; + shared_ptr collection; + shared_ptr who; + librados::AioCompletion *completion; + bufferlist read_bl; + + Op( + time_t at, + op_type type, + uint64_t offset, + uint64_t length, + shared_ptr object, + shared_ptr collection, + shared_ptr who + ) : at(at), type(type), offset(offset), length(length), object(object), collection(collection), who(who), completion(nullptr) {} + +}; + +void gen_buffer(bufferlist& bl, uint64_t size) { + std::unique_ptr buffer = std::make_unique(size); + std::independent_bits_engine e; + std::generate(buffer.get(), buffer.get()+size, std::ref(e)); + bl.append(buffer.get(), size); +} + +void completion_cb(librados::completion_t cb, void *arg) { + Op *op = static_cast(arg); + // Process the completed operation here + std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl; + + delete op->completion; + op->completion = nullptr; + if (op->type == Read) { + op->read_bl.clear(); + } + + { + std::lock_guard lock(in_flight_mutex); + in_flight_ops--; + } + cv.notify_one(); +} + +int main(int argc, char** argv) { + vector ops; + librados::Rados cluster; + librados::IoCtx io; + uint64_t max_buffer_size = 0; + uint64_t io_depth = 64; + string file; + std::filesystem::path ceph_conf_path; + + if (argc < 3) { + cout << fmt::format("usage: ops_replayer file ceph.conf") << endl; + } + file = argv[1]; + ceph_conf_path = argv[2]; + cout << file << endl; + + + + string date, time, who, type, range, object, collection; + ifstream fstream(file, ifstream::in); + const char* date_format_first_column = "%Y-%m-%d"; + // we expect this input: + // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b + while (fstream >> date){ + cout << date << endl; + tm t; + char* res = strptime(date.c_str(), date_format_first_column, &t); + if (res == nullptr) { + fstream.ignore(std::numeric_limits::max(), '\n'); + continue; + } + fstream >> time >> who >> type >> range >> object >> collection; + + date += " " + time; + cout << date << endl; + // FIXME: this is wrong but it returns a reasonable bad timestamp :P + const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; + res = strptime(date.c_str(), date_format_full, &t); + time_t at = mktime(&t); + + cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; + + shared_ptr who_ptr = make_shared(who); + auto who_it = string_cache.find(who); + if (who_it == string_cache.end()) { + string_cache.insert({ who, who_ptr }); + } else { + who_ptr = who_it->second; + } + + shared_ptr object_ptr = make_shared(object); + auto object_it = string_cache.find(object); + if (object_it == string_cache.end()) { + string_cache.insert({ object, object_ptr }); + } else { + object_ptr = object_it->second; + } + + shared_ptr collection_ptr = make_shared(collection); + auto collection_it = string_cache.find(collection); + if (collection_it == string_cache.end()) { + string_cache.insert({ collection, collection_ptr }); + } else { + collection_ptr = collection_it->second; + } + + uint64_t offset = 0, length = 0; + stringstream range_stream(range); + string offset_str, length_str; + getline(range_stream, offset_str, '~'); + getline(range_stream, length_str, '~'); + offset = stoll(offset_str); + length = stoll(length_str); + + max_buffer_size = max(length, max_buffer_size); + + op_type ot = type == "write" ? Write : Read; + ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); + } + + int ret = cluster.init2("client.admin", "ceph", 0); + if (ret < 0) { + std::cerr << "Couldn't init ceph! error " << ret << std::endl; + return EXIT_FAILURE; + } + std::cout << "cluster init ready" << std::endl; + + ret = cluster.conf_read_file(ceph_conf_path.c_str()); + if (ret < 0) { + std::cerr << "Couldn't read the Ceph configuration file! error " << ret << std::endl; + return EXIT_FAILURE; + } + std::cout << "cluster config ready" << std::endl; + ret = cluster.connect(); + if (ret < 0) { + std::cerr << "Couldn't connect to cluster! error " << ret << std::endl; + return EXIT_FAILURE; + } + std::cout << "cluster connect ready" << std::endl; + cluster.ioctx_create("test_pool", io); + if (ret < 0) { + std::cerr << "Couldn't set up ioctx! error " << ret << std::endl; + exit(EXIT_FAILURE); + } + std::cout << "test-pool ready" << std::endl; + + + // process ops + // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation + bufferlist bl; + gen_buffer(bl, max_buffer_size); + + for (auto &op : ops) { + { + std::unique_lock lock(in_flight_mutex); + cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); + + } + cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; + op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); + switch (op.type) { + case Write: { + int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error writing ecode={}", ret) << endl;; + } + break; + } + case Read: { + bufferlist read; + int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error reading ecode={}", ret) << endl;; + } + break; + } + } + in_flight_ops++; + } + while (in_flight_ops > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + // io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off) + + cout << ops.size() << endl; + return 0; +} From 9efaa18302d3e4d183aebf79bd0c782781434ddf Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Jul 2024 10:26:23 +0200 Subject: [PATCH 2/7] test/allocsim: truncate,zero,writefull replay operations Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 105 +++++++++++++----- 1 file changed, 77 insertions(+), 28 deletions(-) diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index 687d27a0d76..a91c6b4e72a 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -30,7 +30,10 @@ static std::mutex in_flight_mutex; enum op_type { Write, - Read + WriteFull, + Read, + Truncate, + Zero }; struct Op { @@ -43,7 +46,7 @@ struct Op { shared_ptr who; librados::AioCompletion *completion; bufferlist read_bl; - + Op( time_t at, op_type type, @@ -51,9 +54,9 @@ struct Op { uint64_t length, shared_ptr object, shared_ptr collection, - shared_ptr who + shared_ptr who ) : at(at), type(type), offset(offset), length(length), object(object), collection(collection), who(who), completion(nullptr) {} - + }; void gen_buffer(bufferlist& bl, uint64_t size) { @@ -67,11 +70,11 @@ void completion_cb(librados::completion_t cb, void *arg) { Op *op = static_cast(arg); // Process the completed operation here std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl; - + delete op->completion; op->completion = nullptr; if (op->type == Read) { - op->read_bl.clear(); + op->read_bl.clear(); } { @@ -89,16 +92,16 @@ int main(int argc, char** argv) { uint64_t io_depth = 64; string file; std::filesystem::path ceph_conf_path; - + if (argc < 3) { cout << fmt::format("usage: ops_replayer file ceph.conf") << endl; } file = argv[1]; ceph_conf_path = argv[2]; cout << file << endl; - - - + + + string date, time, who, type, range, object, collection; ifstream fstream(file, ifstream::in); const char* date_format_first_column = "%Y-%m-%d"; @@ -113,16 +116,16 @@ int main(int argc, char** argv) { continue; } fstream >> time >> who >> type >> range >> object >> collection; - + date += " " + time; cout << date << endl; // FIXME: this is wrong but it returns a reasonable bad timestamp :P const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; res = strptime(date.c_str(), date_format_full, &t); time_t at = mktime(&t); - + cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; - + shared_ptr who_ptr = make_shared(who); auto who_it = string_cache.find(who); if (who_it == string_cache.end()) { @@ -130,7 +133,7 @@ int main(int argc, char** argv) { } else { who_ptr = who_it->second; } - + shared_ptr object_ptr = make_shared(object); auto object_it = string_cache.find(object); if (object_it == string_cache.end()) { @@ -138,7 +141,25 @@ int main(int argc, char** argv) { } else { object_ptr = object_it->second; } - + + op_type ot; + if (type == "write") { + ot = Write; + } else if (type == "writefull") { + ot = WriteFull; + } else if (type == "read") { + ot = Read; + } else if (type == "sparse-read") { + ot = Read; + } else if (type == "truncate") { + ot = Truncate; + } else if (type == "zero") { + ot = Zero; + } else { + cout << "invalid type " << type << endl; + exit(1); + } + shared_ptr collection_ptr = make_shared(collection); auto collection_it = string_cache.find(collection); if (collection_it == string_cache.end()) { @@ -146,28 +167,31 @@ int main(int argc, char** argv) { } else { collection_ptr = collection_it->second; } - + uint64_t offset = 0, length = 0; stringstream range_stream(range); string offset_str, length_str; getline(range_stream, offset_str, '~'); - getline(range_stream, length_str, '~'); offset = stoll(offset_str); - length = stoll(length_str); - + + if (ot != Truncate) { + // Truncate doesn't only has one number + getline(range_stream, length_str, '~'); + length = stoll(length_str); + } + max_buffer_size = max(length, max_buffer_size); - - op_type ot = type == "write" ? Write : Read; + ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); } - + int ret = cluster.init2("client.admin", "ceph", 0); if (ret < 0) { std::cerr << "Couldn't init ceph! error " << ret << std::endl; return EXIT_FAILURE; } std::cout << "cluster init ready" << std::endl; - + ret = cluster.conf_read_file(ceph_conf_path.c_str()); if (ret < 0) { std::cerr << "Couldn't read the Ceph configuration file! error " << ret << std::endl; @@ -186,18 +210,18 @@ int main(int argc, char** argv) { exit(EXIT_FAILURE); } std::cout << "test-pool ready" << std::endl; - - + + // process ops // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation bufferlist bl; gen_buffer(bl, max_buffer_size); - + for (auto &op : ops) { { std::unique_lock lock(in_flight_mutex); cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); - + } cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); @@ -209,6 +233,13 @@ int main(int argc, char** argv) { } break; } + case WriteFull: { + int ret = io.aio_write_full(*op.object, op.completion, bl); + if (ret != 0) { + cout << fmt::format("Error writing full ecode={}", ret) << endl;; + } + break; + } case Read: { bufferlist read; int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); @@ -217,6 +248,24 @@ int main(int argc, char** argv) { } break; } + case Truncate: { + librados::ObjectWriteOperation write_operation; + write_operation.truncate(op.offset); + int ret = io.aio_operate(*op.object, op.completion, &write_operation); + if (ret != 0) { + cout << fmt::format("Error truncating ecode={}", ret) << endl;; + } + break; + } + case Zero: { + librados::ObjectWriteOperation write_operation; + write_operation.zero(op.offset, op.length); + int ret = io.aio_operate(*op.object, op.completion, &write_operation); + if (ret != 0) { + cout << fmt::format("Error zeroing ecode={}", ret) << endl;; + } + break; + } } in_flight_ops++; } @@ -224,7 +273,7 @@ int main(int argc, char** argv) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } // io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off) - + cout << ops.size() << endl; return 0; } From 0b6af743bd0192b0d745a8b8d43479e469175b75 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Jul 2024 10:36:05 +0200 Subject: [PATCH 3/7] test/allocsim: comment parssing cout Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index a91c6b4e72a..74f61cf92ac 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -108,7 +108,7 @@ int main(int argc, char** argv) { // we expect this input: // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b while (fstream >> date){ - cout << date << endl; + // cout << date << endl; tm t; char* res = strptime(date.c_str(), date_format_first_column, &t); if (res == nullptr) { @@ -118,13 +118,13 @@ int main(int argc, char** argv) { fstream >> time >> who >> type >> range >> object >> collection; date += " " + time; - cout << date << endl; + // cout << date << endl; // FIXME: this is wrong but it returns a reasonable bad timestamp :P const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; res = strptime(date.c_str(), date_format_full, &t); time_t at = mktime(&t); - cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; + // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; shared_ptr who_ptr = make_shared(who); auto who_it = string_cache.find(who); From 2dadace1b873893acfacd72cb6deade1981cf40f Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Jul 2024 10:52:28 +0200 Subject: [PATCH 4/7] test/allocsim: comment running op cout Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index 74f61cf92ac..382ebdd948b 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -223,7 +223,7 @@ int main(int argc, char** argv) { cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); } - cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; + // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); switch (op.type) { case Write: { From 6bd4b29fa1ed7faf5791ef7055c56d124d2b6ab8 Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Jul 2024 12:21:46 +0200 Subject: [PATCH 5/7] test/allocsim: mmap threaded parser Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 240 ++++++++++++------ 1 file changed, 156 insertions(+), 84 deletions(-) diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index 382ebdd948b..9f8f56ccee5 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -1,5 +1,9 @@ #include #include +#include +#include +#include +#include #include #include #include @@ -59,6 +63,30 @@ struct Op { }; +struct ParserContext { + map> string_cache; + vector ops; + char *start; // starts and ends in new line or eof + char *end; + uint64_t max_buffer_size; +}; + +class MemoryStreamBuf : public std::streambuf { +public: + MemoryStreamBuf(const char* start, const char* end) { + this->setg(const_cast(start), const_cast(start), const_cast(end)); + } +}; + +class MemoryInputStream : public std::istream { + MemoryStreamBuf _buffer; +public: + MemoryInputStream(const char* start, const char* end) + : std::istream(&_buffer), _buffer(start, end) { + rdbuf(&_buffer); + } +}; + void gen_buffer(bufferlist& bl, uint64_t size) { std::unique_ptr buffer = std::make_unique(size); std::independent_bits_engine e; @@ -69,7 +97,7 @@ void gen_buffer(bufferlist& bl, uint64_t size) { void completion_cb(librados::completion_t cb, void *arg) { Op *op = static_cast(arg); // Process the completed operation here - std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl; + // std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl; delete op->completion; op->completion = nullptr; @@ -84,6 +112,91 @@ void completion_cb(librados::completion_t cb, void *arg) { cv.notify_one(); } +void parse_entry_point(shared_ptr context) { + string date, time, who, type, range, object, collection; + MemoryInputStream fstream(context->start, context->end); + const char* date_format_first_column = "%Y-%m-%d"; + // we expect this input: + // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b + while (fstream >> date){ + // cout << date << endl; + tm t; + char* res = strptime(date.c_str(), date_format_first_column, &t); + if (res == nullptr) { + fstream.ignore(std::numeric_limits::max(), '\n'); + continue; + } + fstream >> time >> who >> type >> range >> object >> collection; + + date += " " + time; + // cout << date << endl; + // FIXME: this is wrong but it returns a reasonable bad timestamp :P + const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; + res = strptime(date.c_str(), date_format_full, &t); + time_t at = mktime(&t); + + // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; + + shared_ptr who_ptr = make_shared(who); + auto who_it = string_cache.find(who); + if (who_it == string_cache.end()) { + string_cache.insert({ who, who_ptr }); + } else { + who_ptr = who_it->second; + } + + shared_ptr object_ptr = make_shared(object); + auto object_it = string_cache.find(object); + if (object_it == string_cache.end()) { + string_cache.insert({ object, object_ptr }); + } else { + object_ptr = object_it->second; + } + + op_type ot; + if (type == "write") { + ot = Write; + } else if (type == "writefull") { + ot = WriteFull; + } else if (type == "read") { + ot = Read; + } else if (type == "sparse-read") { + ot = Read; + } else if (type == "truncate") { + ot = Truncate; + } else if (type == "zero") { + ot = Zero; + } else { + cout << "invalid type " << type << endl; + exit(1); + } + + shared_ptr collection_ptr = make_shared(collection); + auto collection_it = string_cache.find(collection); + if (collection_it == string_cache.end()) { + string_cache.insert({ collection, collection_ptr }); + } else { + collection_ptr = collection_it->second; + } + + uint64_t offset = 0, length = 0; + stringstream range_stream(range); + string offset_str, length_str; + getline(range_stream, offset_str, '~'); + offset = stoll(offset_str); + + if (ot != Truncate) { + // Truncate doesn't only has one number + getline(range_stream, length_str, '~'); + length = stoll(length_str); + } + + context->max_buffer_size = max(length, context->max_buffer_size); + + context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); + } +} + int main(int argc, char** argv) { vector ops; librados::Rados cluster; @@ -100,90 +213,49 @@ int main(int argc, char** argv) { ceph_conf_path = argv[2]; cout << file << endl; - - - string date, time, who, type, range, object, collection; - ifstream fstream(file, ifstream::in); - const char* date_format_first_column = "%Y-%m-%d"; - // we expect this input: - // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b - while (fstream >> date){ - // cout << date << endl; - tm t; - char* res = strptime(date.c_str(), date_format_first_column, &t); - if (res == nullptr) { - fstream.ignore(std::numeric_limits::max(), '\n'); - continue; - } - fstream >> time >> who >> type >> range >> object >> collection; - - date += " " + time; - // cout << date << endl; - // FIXME: this is wrong but it returns a reasonable bad timestamp :P - const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z"; - res = strptime(date.c_str(), date_format_full, &t); - time_t at = mktime(&t); - - // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl; - - shared_ptr who_ptr = make_shared(who); - auto who_it = string_cache.find(who); - if (who_it == string_cache.end()) { - string_cache.insert({ who, who_ptr }); - } else { - who_ptr = who_it->second; - } - - shared_ptr object_ptr = make_shared(object); - auto object_it = string_cache.find(object); - if (object_it == string_cache.end()) { - string_cache.insert({ object, object_ptr }); - } else { - object_ptr = object_it->second; - } - - op_type ot; - if (type == "write") { - ot = Write; - } else if (type == "writefull") { - ot = WriteFull; - } else if (type == "read") { - ot = Read; - } else if (type == "sparse-read") { - ot = Read; - } else if (type == "truncate") { - ot = Truncate; - } else if (type == "zero") { - ot = Zero; - } else { - cout << "invalid type " << type << endl; - exit(1); - } - - shared_ptr collection_ptr = make_shared(collection); - auto collection_it = string_cache.find(collection); - if (collection_it == string_cache.end()) { - string_cache.insert({ collection, collection_ptr }); - } else { - collection_ptr = collection_it->second; - } - - uint64_t offset = 0, length = 0; - stringstream range_stream(range); - string offset_str, length_str; - getline(range_stream, offset_str, '~'); - offset = stoll(offset_str); - - if (ot != Truncate) { - // Truncate doesn't only has one number - getline(range_stream, length_str, '~'); - length = stoll(length_str); - } - - max_buffer_size = max(length, max_buffer_size); - - ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr)); + uint64_t nthreads = 16; + vector parser_threads; + vector> parser_contexts; + int fd = open(file.c_str(), O_RDONLY); + if (fd == -1) { + cout << "Error opening file" << endl; } + struct stat file_stat; + fstat(fd, &file_stat); + char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0); + if (mapped_buffer == nullptr) { + cout << "error mapping buffer" << endl; + } + uint64_t start_offset = 0; + uint64_t step_size = file_stat.st_size / nthreads; + for (int i = 0; i < nthreads; i++) { + char* end = mapped_buffer + start_offset + step_size; + while(*end != '\n') { + end--; + } + if (i == nthreads-1) { + end = mapped_buffer + file_stat.st_size; + } + shared_ptr context = make_shared(); + context->start = mapped_buffer + start_offset; + context->end = end; + context->max_buffer_size = 0; + parser_contexts.push_back(context); + parser_threads.push_back(std::thread(parse_entry_point, context)); + start_offset += (end - mapped_buffer - start_offset); + } + for (auto& t : parser_threads) { + t.join(); + } + for (auto context : parser_contexts) { + string_cache.insert(context->string_cache.begin(), context->string_cache.end()); + ops.insert(ops.end(), context->ops.begin(), context->ops.end()); + max_buffer_size = max(context->max_buffer_size, max_buffer_size); + context->string_cache.clear(); + context->ops.clear(); + } + + int ret = cluster.init2("client.admin", "ceph", 0); if (ret < 0) { From 100e44dbb1fb145fc1ada9044b93c5f525e6089f Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Jul 2024 12:59:20 +0200 Subject: [PATCH 6/7] test/allocsim: worker threads, op hashed by client Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 131 ++++++++++-------- 1 file changed, 74 insertions(+), 57 deletions(-) diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index 9f8f56ccee5..18a1739e42f 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -197,12 +198,77 @@ void parse_entry_point(shared_ptr context) { } } +void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) { + + bufferlist bl; + gen_buffer(bl, max_buffer_size); + hash hasher; + + cout << "starting thread " << io_depth << endl; + for (auto &op : ops) { + { + std::unique_lock lock(in_flight_mutex); + cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); + + } + size_t key = hasher(*op.who) % nworker_threads; + if (key != id) { + continue; + } + // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; + op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); + switch (op.type) { + case Write: { + int ret = io->aio_write(*op.object, op.completion, bl, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error writing ecode={}", ret) << endl;; + } + break; + } + case WriteFull: { + int ret = io->aio_write_full(*op.object, op.completion, bl); + if (ret != 0) { + cout << fmt::format("Error writing full ecode={}", ret) << endl;; + } + break; + } + case Read: { + bufferlist read; + int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); + if (ret != 0) { + cout << fmt::format("Error reading ecode={}", ret) << endl;; + } + break; + } + case Truncate: { + librados::ObjectWriteOperation write_operation; + write_operation.truncate(op.offset); + int ret = io->aio_operate(*op.object, op.completion, &write_operation); + if (ret != 0) { + cout << fmt::format("Error truncating ecode={}", ret) << endl;; + } + break; + } + case Zero: { + librados::ObjectWriteOperation write_operation; + write_operation.zero(op.offset, op.length); + int ret = io->aio_operate(*op.object, op.completion, &write_operation); + if (ret != 0) { + cout << fmt::format("Error zeroing ecode={}", ret) << endl;; + } + break; + } + } + in_flight_ops++; + } +} + int main(int argc, char** argv) { vector ops; librados::Rados cluster; librados::IoCtx io; uint64_t max_buffer_size = 0; - uint64_t io_depth = 64; + uint64_t io_depth = 8; string file; std::filesystem::path ceph_conf_path; @@ -255,8 +321,6 @@ int main(int argc, char** argv) { context->ops.clear(); } - - int ret = cluster.init2("client.admin", "ceph", 0); if (ret < 0) { std::cerr << "Couldn't init ceph! error " << ret << std::endl; @@ -286,60 +350,13 @@ int main(int argc, char** argv) { // process ops // Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation - bufferlist bl; - gen_buffer(bl, max_buffer_size); - - for (auto &op : ops) { - { - std::unique_lock lock(in_flight_mutex); - cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; }); - - } - // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl; - op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); - switch (op.type) { - case Write: { - int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset); - if (ret != 0) { - cout << fmt::format("Error writing ecode={}", ret) << endl;; - } - break; - } - case WriteFull: { - int ret = io.aio_write_full(*op.object, op.completion, bl); - if (ret != 0) { - cout << fmt::format("Error writing full ecode={}", ret) << endl;; - } - break; - } - case Read: { - bufferlist read; - int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset); - if (ret != 0) { - cout << fmt::format("Error reading ecode={}", ret) << endl;; - } - break; - } - case Truncate: { - librados::ObjectWriteOperation write_operation; - write_operation.truncate(op.offset); - int ret = io.aio_operate(*op.object, op.completion, &write_operation); - if (ret != 0) { - cout << fmt::format("Error truncating ecode={}", ret) << endl;; - } - break; - } - case Zero: { - librados::ObjectWriteOperation write_operation; - write_operation.zero(op.offset, op.length); - int ret = io.aio_operate(*op.object, op.completion, &write_operation); - if (ret != 0) { - cout << fmt::format("Error zeroing ecode={}", ret) << endl;; - } - break; - } - } - in_flight_ops++; + vector worker_threads; + uint64_t nworker_threads = 16; + for (int i = 0; i < nworker_threads; i++) { + worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io)); + } + for (auto& worker : worker_threads) { + worker.join(); } while (in_flight_ops > 0) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); From 7719091aef5671bc52739fd01b07f501046fb52d Mon Sep 17 00:00:00 2001 From: Pere Diaz Bou Date: Wed, 3 Jul 2024 15:55:36 +0200 Subject: [PATCH 7/7] test/allocsim: trim bufferlist before sending Signed-off-by: Pere Diaz Bou --- src/test/objectstore/allocsim/ops_replayer.cc | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/test/objectstore/allocsim/ops_replayer.cc b/src/test/objectstore/allocsim/ops_replayer.cc index 18a1739e42f..d9263076345 100644 --- a/src/test/objectstore/allocsim/ops_replayer.cc +++ b/src/test/objectstore/allocsim/ops_replayer.cc @@ -1,8 +1,6 @@ #include #include #include -#include -#include #include #include #include @@ -15,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -219,14 +216,18 @@ void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector &ops, op.completion = librados::Rados::aio_create_completion(static_cast(&op), completion_cb); switch (op.type) { case Write: { - int ret = io->aio_write(*op.object, op.completion, bl, op.length, op.offset); + bufferlist trimmed; + trimmed.substr_of(bl, 0, op.length); + int ret = io->aio_write(*op.object, op.completion, trimmed, op.length, op.offset); if (ret != 0) { cout << fmt::format("Error writing ecode={}", ret) << endl;; } break; } case WriteFull: { - int ret = io->aio_write_full(*op.object, op.completion, bl); + bufferlist trimmed; + trimmed.substr_of(bl, 0, op.length); + int ret = io->aio_write_full(*op.object, op.completion, trimmed); if (ret != 0) { cout << fmt::format("Error writing full ecode={}", ret) << endl;; }