test/fio: add options to insert attributes/omaps into write transaction.

Signed-off-by: Igor Fedotov <ifedotov@suse.com>
Author: Igor Fedotov <ifedotov@suse.com>
Date: 2017-11-20 21:59:04 +03:00
Commit: 120ef367e8 (parent: b1378b343a)
2 changed files with 242 additions and 8 deletions


@@ -5,6 +5,28 @@ ioengine=libfio_ceph_objectstore.so # must be found in your LD_LIBRARY_PATH
conf=ceph-bluestore.conf # must point to a valid ceph configuration file
directory=/mnt/fio-bluestore # directory for osd_data
#oi_attr_len=350-4000 # specifies the OI (aka '_') attribute length range to
# couple writes with. Default: 0 (disabled)
#snapset_attr_len=35 # specifies the snapset attribute length range to couple
# writes with. Default: 0 (disabled)
#_fastinfo_omap_len=186 # specifies the _fastinfo omap entry length range to
# couple writes with. Default: 0 (disabled)
#pglog_simulation=1 # couples writes with omap generation in the manner of
# the OSD PG log. Ceph's osd_min_pg_log_entries,
# osd_pg_log_trim_min and osd_pg_log_dups_tracked
# settings control the cyclic creation/removal of
# omap keys.
# The following additional pglog_* settings apply as well:
#pglog_omap_len=173 # specifies the PG log entry length range to couple
# writes with. Default: 0 (disabled)
#pglog_dup_omap_len=57 # specifies the duplicate PG log entry length range
# to couple writes with. Default: 0 (disabled)
rw=randwrite
iodepth=16
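For reference, here is a minimal sketch of a job section with all of the new knobs enabled, using the example values from the comments above (a single value should yield a fixed length, while a low-high pair selects a random length per write; treat it as an illustration rather than a tuned configuration):

  oi_attr_len=350-4000
  snapset_attr_len=35
  _fastinfo_omap_len=186
  pglog_simulation=1
  pglog_omap_len=173
  pglog_dup_omap_len=57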


@@ -17,12 +17,14 @@
#include "common/errno.h"
#include "include/intarith.h"
#include "include/stringify.h"
#include "include/random.h"
#include "common/perf_counters.h"
#include <fio.h>
#include <optgroup.h>
#include "include/assert.h" // fio.h clobbers our assert.h
#include <algorithm>
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_
@@ -33,6 +35,18 @@ namespace {
struct Options {
thread_data* td;
char* conf;
unsigned long long
oi_attr_len_low,
oi_attr_len_high,
snapset_attr_len_low,
snapset_attr_len_high,
pglog_omap_len_low,
pglog_omap_len_high,
pglog_dup_omap_len_low,
pglog_dup_omap_len_high,
_fastinfo_omap_len_low,
_fastinfo_omap_len_high;
bool simulate_pglog;
};
template <class Func> // void Func(fio_option&)
@@ -54,10 +68,67 @@ static std::vector<fio_option> ceph_options{
o.help = "Path to a ceph configuration file";
o.off1 = offsetof(Options, conf);
}),
make_option([] (fio_option& o) {
o.name = "oi_attr_len";
o.lname = "OI Attr length";
o.type = FIO_OPT_STR_VAL;
o.help = "Set OI(aka '_') attribute to specified length";
o.off1 = offsetof(Options, oi_attr_len_low);
o.off2 = offsetof(Options, oi_attr_len_high);
o.def = 0;
o.minval = 0;
}),
make_option([] (fio_option& o) {
o.name = "snapset_attr_len";
o.lname = "Attr 'snapset' length";
o.type = FIO_OPT_STR_VAL;
o.help = "Set 'snapset' attribute to specified length";
o.off1 = offsetof(Options, snapset_attr_len_low);
o.off2 = offsetof(Options, snapset_attr_len_high);
o.def = 0;
o.minval = 0;
}),
make_option([] (fio_option& o) {
o.name = "_fastinfo_omap_len";
o.lname = "'_fastinfo' omap entry length";
o.type = FIO_OPT_STR_VAL;
o.help = "Set '_fastinfo' OMAP attribute to specified length";
o.off1 = offsetof(Options, _fastinfo_omap_len_low);
o.off2 = offsetof(Options, _fastinfo_omap_len_high);
o.def = 0;
o.minval = 0;
}),
make_option([] (fio_option& o) {
o.name = "pglog_simulation";
o.lname = "pglog behavior simulation";
o.type = FIO_OPT_BOOL;
o.help = "Enables PG Log simulation behavior";
o.off1 = offsetof(Options, simulate_pglog);
o.def = "0";
}),
make_option([] (fio_option& o) {
o.name = "pglog_omap_len";
o.lname = "pglog omap entry length";
o.type = FIO_OPT_STR_VAL;
o.help = "Set pglog omap entry to specified length";
o.off1 = offsetof(Options, pglog_omap_len_low);
o.off2 = offsetof(Options, pglog_omap_len_high);
o.def = 0;
o.minval = 0;
}),
make_option([] (fio_option& o) {
o.name = "pglog_dup_omap_len";
o.lname = "uplicate pglog omap entry length";
o.type = FIO_OPT_STR_VAL;
o.help = "Set duplicate pglog omap entry to specified length";
o.off1 = offsetof(Options, pglog_dup_omap_len_low);
o.off2 = offsetof(Options, pglog_dup_omap_len_high);
o.def = 0;
o.minval = 0;
}),
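// note: each of the *_len options above accepts either a single value or a
// "low-high" range in the job file (see the comments in the example job file)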
{} // fio expects a 'null'-terminated list
};
/// global engine state shared between all jobs within the process. this
/// includes g_ceph_context and the ObjectStore instance
struct Engine {
@@ -68,7 +139,7 @@ struct Engine {
std::mutex lock;
int ref_count;
-Engine(const thread_data* td);
+Engine(thread_data* td);
~Engine();
static Engine* get_instance(thread_data* td) {
@@ -99,18 +170,19 @@ struct Engine {
ostr << "Generate db histogram: ";
os->generate_db_histogram(f);
f->flush(ostr);
delete f;
os->umount();
dout(0) << ostr.str() << dendl;
}
}
};
-Engine::Engine(const thread_data* td) : ref_count(0)
+Engine::Engine(thread_data* td)
+  : ref_count(0)
{
// add the ceph command line arguments
-auto o = static_cast<const Options*>(td->eo);
+auto o = static_cast<Options*>(td->eo);
if (!o->conf) {
throw std::runtime_error("missing conf option for ceph configuration file");
}
@@ -145,6 +217,17 @@ Engine::Engine(const thread_data* td) : ref_count(0)
num_shards = g_conf->osd_op_num_shards_ssd;
os->set_cache_shards(num_shards);
//normalize options
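// make sure every high bound is >= its low bound so that the
// generate_random_number(low, high) calls below always see a valid range
// (e.g. when only a single value rather than a "low-high" range was given)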
o->oi_attr_len_high = max(o->oi_attr_len_low, o->oi_attr_len_high);
o->snapset_attr_len_high = max(o->snapset_attr_len_low,
o->snapset_attr_len_high);
o->pglog_omap_len_high = max(o->pglog_omap_len_low,
o->pglog_omap_len_high);
o->pglog_dup_omap_len_high = max(o->pglog_dup_omap_len_low,
o->pglog_dup_omap_len_high);
o->_fastinfo_omap_len_high = max(o->_fastinfo_omap_len_low,
o->_fastinfo_omap_len_high);
int r = os->mkfs();
if (r < 0)
throw std::system_error(-r, std::system_category(), "mkfs failed");
@@ -152,6 +235,7 @@ Engine::Engine(const thread_data* td) : ref_count(0)
r = os->mount();
if (r < 0)
throw std::system_error(-r, std::system_category(), "mount failed");
}
Engine::~Engine()
@@ -165,12 +249,19 @@ struct Collection {
coll_t cid;
ObjectStore::Sequencer sequencer;
// Can't hold a mutex directly in a vector (it is neither copyable nor
// movable), hence the dynamic allocation
ceph::unique_ptr<std::mutex> lock;
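// per-collection counters for the simulated PG log window (guarded by 'lock')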
uint64_t pglog_ver_head = 1;
uint64_t pglog_ver_tail = 1;
uint64_t pglog_dup_ver_tail = 1;
// use big pool ids to avoid clashing with existing collections
static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;
Collection(const spg_t& pg)
-: pg(pg), cid(pg), sequencer(stringify(pg)) {
+: pg(pg), cid(pg), sequencer(stringify(pg)),
+  lock(new std::mutex) {
sequencer.shard_hint = pg;
}
};
@@ -191,6 +282,9 @@ struct Job {
std::vector<io_u*> events; //< completions for fio_ceph_os_event()
const bool unlink; //< unlink objects on destruction
bufferptr one_for_all_data; //< preallocated buffer long enough
//< to use for various operations
Job(Engine* engine, const thread_data* td);
~Job();
};
@@ -224,6 +318,14 @@ Job::Job(Engine* engine, const thread_data* td)
t.create_collection(cid, split_bits);
}
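// size the shared scratch buffer to the largest configured attr/omap length;
// later writes simply call set_length() on it rather than allocating per I/O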
auto o = static_cast<Options*>(td->eo);
unsigned long long max_data = max(o->oi_attr_len_high,
o->snapset_attr_len_high);
max_data = max(max_data, o->pglog_omap_len_high);
max_data = max(max_data, o->pglog_dup_omap_len_high);
max_data = max(max_data, o->_fastinfo_omap_len_high);
one_for_all_data = buffer::create(max_data);
const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
// create an object for each file in the job
@@ -246,7 +348,7 @@ Job::Job(Engine* engine, const thread_data* td)
ObjectStore::Sequencer sequencer("job init");
int r = engine->os->apply_transaction(&sequencer, std::move(t));
if (r) {
engine->deref();
throw std::system_error(r, std::system_category(), "job init");
}
}
@@ -349,6 +451,9 @@ int fio_ceph_os_queue(thread_data* td, io_u* u)
{
fio_ro_check(td, u);
auto o = static_cast<const Options*>(td->eo);
auto job = static_cast<Job*>(td->io_ops_data);
auto& object = job->objects[u->file->engine_pos];
auto& coll = object.coll;
@@ -362,9 +467,116 @@ int fio_ceph_os_queue(thread_data* td, io_u* u)
bl.push_back(buffer::copy(reinterpret_cast<char*>(u->xfer_buf),
u->xfer_buflen ) );
map<string,bufferptr> attrset;
map<string, bufferlist> omaps;
// enqueue a write transaction on the collection's sequencer
ObjectStore::Transaction t;
char ver_key[64];
// fill attrs if any
if (o->oi_attr_len_high) {
assert(o->oi_attr_len_high >= o->oi_attr_len_low);
// fill with garbage, as we do not care about the actual content...
job->one_for_all_data.set_length(
ceph::util::generate_random_number(
o->oi_attr_len_low, o->oi_attr_len_high));
attrset["_"] = job->one_for_all_data;
}
if (o->snapset_attr_len_high) {
assert(o->snapset_attr_len_high >= o->snapset_attr_len_low);
job->one_for_all_data.set_length(
ceph::util::generate_random_number
(o->snapset_attr_len_low, o->snapset_attr_len_high));
attrset["snapset"] = job->one_for_all_data;
}
if (o->_fastinfo_omap_len_high) {
assert(o->_fastinfo_omap_len_high >= o->_fastinfo_omap_len_low);
// fill with garbage, as we do not care about the actual content...
job->one_for_all_data.set_length(
ceph::util::generate_random_number(
o->_fastinfo_omap_len_low, o->_fastinfo_omap_len_high));
omaps["_fastinfo"].append(job->one_for_all_data);
}
uint64_t pglog_trim_head = 0, pglog_trim_tail = 0;
uint64_t pglog_dup_trim_head = 0, pglog_dup_trim_tail = 0;
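// PG-log simulation: each collection tracks a simulated log window through
// its pglog_ver_head/tail counters. Every write appends one log entry; once
// the window exceeds osd_min_pg_log_entries + osd_pg_log_trim_min entries,
// the oldest osd_pg_log_trim_min entries are trimmed (their keys are removed
// further below and re-added as "dup_" entries), and the dup window is
// trimmed likewise once it exceeds osd_pg_log_dups_tracked + osd_pg_log_trim_min.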
if (o->simulate_pglog) {
uint64_t pglog_ver_cnt = 0;
{
std::lock_guard<std::mutex> l(*coll.lock);
pglog_ver_cnt = coll.pglog_ver_head++;
if (o->pglog_omap_len_high &&
pglog_ver_cnt >=
coll.pglog_ver_tail +
g_conf->osd_min_pg_log_entries + g_conf->osd_pg_log_trim_min) {
pglog_trim_tail = coll.pglog_ver_tail;
coll.pglog_ver_tail = pglog_trim_head =
pglog_trim_tail + g_conf->osd_pg_log_trim_min;
if (o->pglog_dup_omap_len_high &&
pglog_ver_cnt >=
coll.pglog_dup_ver_tail + g_conf->osd_pg_log_dups_tracked +
g_conf->osd_pg_log_trim_min) {
pglog_dup_trim_tail = coll.pglog_dup_ver_tail;
coll.pglog_dup_ver_tail = pglog_dup_trim_head =
pglog_dup_trim_tail + g_conf->osd_pg_log_trim_min;
}
}
}
if (o->pglog_omap_len_high) {
assert(o->pglog_omap_len_high >= o->pglog_omap_len_low);
snprintf(ver_key, sizeof(ver_key),
"0000000011.%020llu", (unsigned long long)pglog_ver_cnt);
// fill with garbage, as we do not care about the actual content...
job->one_for_all_data.set_length(
ceph::util::generate_random_number(
o->pglog_omap_len_low, o->pglog_omap_len_high));
omaps[ver_key].append(job->one_for_all_data);
}
if (o->pglog_dup_omap_len_high) {
// insert dup entries for the log entries trimmed above
assert(o->pglog_dup_omap_len_high >= o->pglog_dup_omap_len_low);
for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
snprintf(ver_key, sizeof(ver_key),
"dup_0000000011.%020llu", (unsigned long long)i);
// fill with garbage, as we do not care about the actual content...
job->one_for_all_data.set_length(
ceph::util::generate_random_number(
o->pglog_dup_omap_len_low, o->pglog_dup_omap_len_high));
omaps[ver_key].append(job->one_for_all_data);
}
}
}
if (attrset.size()) {
t.setattrs(coll.cid, object.oid, attrset);
}
t.write(coll.cid, object.oid, u->offset, u->xfer_buflen, bl, flags);
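// drop the omap keys of any log entries (and dup entries) trimmed above,
// mirroring how the OSD trims its PG log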
set<string> rmkeys;
for( auto i = pglog_trim_tail; i < pglog_trim_head; ++i) {
snprintf(ver_key, sizeof(ver_key),
"0000000011.%020llu", (unsigned long long)i);
rmkeys.emplace(ver_key);
}
for( auto i = pglog_dup_trim_tail; i < pglog_dup_trim_head; ++i) {
snprintf(ver_key, sizeof(ver_key),
"dup_0000000011.%020llu", (unsigned long long)i);
rmkeys.emplace(ver_key);
}
if (rmkeys.size()) {
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.omap_rmkeys(coll.cid, pgmeta_oid, rmkeys);
}
if (omaps.size()) {
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.omap_setkeys(coll.cid, pgmeta_oid, omaps);
}
os->queue_transaction(&coll.sequencer,
std::move(t),
nullptr,