Merge pull request #14420 from optimistyzy/329

os/bluestore/NVMEDevice: Add multiple thread support for SPDK I/O thread

Reviewed-by: Haomai Wang <haomai@xsky.com>
Sage Weil, 2017-04-13 12:34:51 -05:00 (committed by GitHub)
commit 9967fceb62

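In short, the commit replaces the single shared SPDK I/O thread with one SharedDriverQueueData per DPDK slave lcore, each owning its own qpair, task queue, data-buffer mempool, and perf counters. Callers bind to a queue lazily through a thread-local id, as the hunks below show. A minimal sketch of that binding pattern, assuming plain std::thread and std::hash in place of DPDK lcores and ceph_gettid(), with Driver, QueueData, and submit as illustrative names rather than Ceph's:

#include <atomic>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

struct QueueData {                         // stand-in for SharedDriverQueueData
  std::atomic<uint64_t> queue_op_seq{0};
  void queue_task() { ++queue_op_seq; }    // the real code pushes a Task and signals a Cond
};

struct Driver {                            // stand-in for SharedDriverData
  std::vector<QueueData> queues;
  explicit Driver(uint32_t n) : queues(n) {}
  QueueData *get_queue(uint32_t i) { return &queues[i % queues.size()]; }
};

static thread_local int queue_id = -1;     // same lazy-binding pattern as NVMEDevice.cc

void submit(Driver &d) {
  if (queue_id == -1) {
    // NVMEDevice.cc uses ceph_gettid(); hashing the thread id is a portable stand-in
    queue_id = static_cast<int>(
        std::hash<std::thread::id>()(std::this_thread::get_id()) & 0x7fffffff);
  }
  d.get_queue(static_cast<uint32_t>(queue_id))->queue_task();
}

int main() {
  Driver d(4);                             // e.g. one queue per slave lcore
  std::vector<std::thread> ts;
  for (int i = 0; i < 8; ++i)
    ts.emplace_back([&d] { submit(d); });
  for (auto &t : ts) t.join();
}

Once bound, a given caller thread always reaches the same queue, so per-queue state such as the mempool and the op-sequence counters needs no cross-queue locking, and flush() only has to drain the caller's own queue.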

@@ -43,6 +43,7 @@
#include "common/errno.h"
#include "common/debug.h"
#include "common/perf_counters.h"
#include "common/io_priority.h"
#include "NVMEDevice.h"
@@ -51,14 +52,14 @@
#undef dout_prefix
#define dout_prefix *_dout << "bdev(" << sn << ") "
std::vector<void*> data_buf_mempool;
static constexpr uint16_t data_buffer_default_num = 2048;
static constexpr uint32_t data_buffer_size = 8192;
static constexpr uint16_t inline_segment_num = 32;
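// per-thread queue binding: -1 until the first I/O, then set from ceph_gettid()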
static thread_local int queue_id = -1;
enum {
l_bluestore_nvmedevice_first = 632430,
l_bluestore_nvmedevice_aio_write_lat,
@@ -94,6 +95,205 @@ struct IORequest {
void **extra_segs = nullptr;
};
class SharedDriverQueueData {
SharedDriverData *driver;
spdk_nvme_ctrlr *ctrlr;
spdk_nvme_ns *ns;
std::string sn;
uint64_t block_size;
uint32_t sector_size;
uint32_t core_id;
uint32_t queueid;
struct spdk_nvme_qpair *qpair;
std::function<void ()> run_func;
friend class AioCompletionThread;
bool aio_stop = false;
void _aio_thread();
int alloc_buf_from_pool(Task *t, bool write);
std::atomic_bool queue_empty;
Mutex queue_lock;
Cond queue_cond;
std::queue<Task*> task_queue;
Mutex flush_lock;
Cond flush_cond;
std::atomic_int flush_waiters;
std::set<uint64_t> flush_waiter_seqs;
public:
std::atomic_ulong completed_op_seq, queue_op_seq;
std::vector<void*> data_buf_mempool;
PerfCounters *logger = nullptr;
SharedDriverQueueData(SharedDriverData *driver, spdk_nvme_ctrlr *c, spdk_nvme_ns *ns, uint64_t block_size,
const std::string &sn_tag, uint32_t sector_size, uint32_t core, uint32_t queue_id)
: driver(driver),
ctrlr(c),
ns(ns),
sn(sn_tag),
block_size(block_size),
sector_size(sector_size),
core_id(core),
queueid(queue_id),
run_func([this]() { _aio_thread(); }),
queue_empty(false),
queue_lock("NVMEDevice::queue_lock"),
flush_lock("NVMEDevice::flush_lock"),
flush_waiters(0),
completed_op_seq(0), queue_op_seq(0) {
qpair = spdk_nvme_ctrlr_alloc_io_qpair(ctrlr, SPDK_NVME_QPRIO_URGENT);
PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completion latency");
b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completion latency");
b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completion latency");
b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Failed data buffer allocation count");
logger = b.create_perf_counters();
g_ceph_context->get_perfcounters_collection()->add(logger);
}
void queue_task(Task *t, uint64_t ops = 1) {
queue_op_seq += ops;
Mutex::Locker l(queue_lock);
task_queue.push(t);
if (queue_empty.load()) {
queue_empty = false;
queue_cond.Signal();
}
}
void flush_wait() {
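// snapshot the op sequence at entry; we only need to wait for ops that
// were queued on this queue before the flush was requested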
uint64_t cur_seq = queue_op_seq.load();
uint64_t left = cur_seq - completed_op_seq.load();
if (cur_seq > completed_op_seq) {
// TODO: this may contain read ops
dout(10) << __func__ << " inflight ops " << left << dendl;
Mutex::Locker l(flush_lock);
++flush_waiters;
flush_waiter_seqs.insert(cur_seq);
while (cur_seq > completed_op_seq.load()) {
flush_cond.Wait(flush_lock);
}
flush_waiter_seqs.erase(cur_seq);
--flush_waiters;
}
}
void start() {
int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
core_id);
assert(r == 0);
}
void stop() {
{
Mutex::Locker l(queue_lock);
aio_stop = true;
queue_cond.Signal();
}
int r = rte_eal_wait_lcore(core_id);
assert(r == 0);
aio_stop = false;
}
~SharedDriverQueueData() {
g_ceph_context->get_perfcounters_collection()->remove(logger);
if (qpair) {
spdk_nvme_ctrlr_free_io_qpair(qpair);
}
delete logger;
}
};
class SharedDriverData {
unsigned id;
uint32_t core_id;
std::string sn;
spdk_nvme_ctrlr *ctrlr;
spdk_nvme_ns *ns;
uint64_t block_size = 0;
uint32_t sector_size = 0;
uint64_t size = 0;
uint32_t queue_number = 0;
std::vector<SharedDriverQueueData*> queues;
void _aio_start() {
for (auto &&it : queues)
it->start();
}
void _aio_stop() {
for (auto &&it : queues)
it->stop();
}
public:
std::vector<NVMEDevice*> registered_devices;
SharedDriverData(unsigned _id, const std::string &sn_tag,
spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
: id(_id),
sn(sn_tag),
ctrlr(c),
ns(ns) {
int i;
sector_size = spdk_nvme_ns_get_sector_size(ns);
block_size = std::max(CEPH_PAGE_SIZE, sector_size);
size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
RTE_LCORE_FOREACH_SLAVE(i) {
queues.push_back(new SharedDriverQueueData(this, ctrlr, ns, block_size, sn, sector_size, i, queue_number++));
}
_aio_start();
}
bool is_equal(const string &tag) const { return sn == tag; }
~SharedDriverData() {
for (auto p : queues) {
delete p;
}
}
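// map the caller's thread-derived index onto one of the per-lcore queues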
SharedDriverQueueData *get_queue(uint32_t i) {
return queues.at(i%queue_number);
}
void register_device(NVMEDevice *device) {
// stop the queue threads while registered_devices is being modified;
// device registration/release is rare, so the brief stall is acceptable
_aio_stop();
registered_devices.push_back(device);
_aio_start();
}
void remove_device(NVMEDevice *device) {
_aio_stop();
std::vector<NVMEDevice*> new_devices;
for (auto &&it : registered_devices) {
if (it != device)
new_devices.push_back(it);
}
registered_devices.swap(new_devices);
_aio_start();
}
uint64_t get_block_size() {
return block_size;
}
uint64_t get_size() {
return size;
}
};
struct Task {
NVMEDevice *device;
IOContext *ctx = nullptr;
@@ -108,6 +308,7 @@ struct Task {
IORequest io_request;
std::mutex lock;
std::condition_variable cond;
SharedDriverQueueData *queue;
Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
: device(dev), command(c), offset(off), len(l),
return_code(rc),
@@ -115,14 +316,14 @@ struct Task {
~Task() {
assert(!io_request.nseg);
}
void release_segs() {
void release_segs(SharedDriverQueueData *queue_data) {
if (io_request.extra_segs) {
for (uint16_t i = 0; i < io_request.nseg; i++)
data_buf_mempool.push_back(io_request.extra_segs[i]);
queue_data->data_buf_mempool.push_back(io_request.extra_segs[i]);
delete[] io_request.extra_segs;
} else if (io_request.nseg) {
for (uint16_t i = 0; i < io_request.nseg; i++)
data_buf_mempool.push_back(io_request.inline_segs[i]);
queue_data->data_buf_mempool.push_back(io_request.inline_segs[i]);
}
io_request.nseg = 0;
}
@@ -153,147 +354,6 @@ struct Task {
}
};
class SharedDriverData {
unsigned id;
uint32_t core_id;
std::string sn;
spdk_nvme_ctrlr *ctrlr;
spdk_nvme_ns *ns;
struct spdk_nvme_qpair *qpair;
std::function<void ()> run_func;
uint64_t block_size = 0;
uint32_t sector_size = 0;
uint64_t size = 0;
std::vector<NVMEDevice*> registered_devices;
friend class AioCompletionThread;
bool aio_stop = false;
void _aio_thread();
void _aio_start() {
int r = rte_eal_remote_launch(dpdk_thread_adaptor, static_cast<void*>(&run_func),
core_id);
assert(r == 0);
}
void _aio_stop() {
{
Mutex::Locker l(queue_lock);
aio_stop = true;
queue_cond.Signal();
}
int r = rte_eal_wait_lcore(core_id);
assert(r == 0);
aio_stop = false;
}
std::atomic_bool queue_empty;
Mutex queue_lock;
Cond queue_cond;
std::queue<Task*> task_queue;
Mutex flush_lock;
Cond flush_cond;
std::atomic_int flush_waiters;
std::set<uint64_t> flush_waiter_seqs;
public:
std::atomic_ulong completed_op_seq, queue_op_seq;
PerfCounters *logger = nullptr;
SharedDriverData(unsigned i, uint32_t core, const std::string &sn_tag,
spdk_nvme_ctrlr *c, spdk_nvme_ns *ns)
: id(i),
core_id(core),
sn(sn_tag),
ctrlr(c),
ns(ns),
run_func([this]() { _aio_thread(); }),
queue_empty(false),
queue_lock("NVMEDevice::queue_lock"),
flush_lock("NVMEDevice::flush_lock"),
flush_waiters(0),
completed_op_seq(0), queue_op_seq(0) {
sector_size = spdk_nvme_ns_get_sector_size(ns);
block_size = std::max(CEPH_PAGE_SIZE, sector_size);
size = ((uint64_t)sector_size) * spdk_nvme_ns_get_num_sectors(ns);
qpair = spdk_nvme_ctrlr_alloc_io_qpair(c, SPDK_NVME_QPRIO_URGENT);
PerfCountersBuilder b(g_ceph_context, string("NVMEDevice-AIOThread-"+stringify(this)),
l_bluestore_nvmedevice_first, l_bluestore_nvmedevice_last);
b.add_time_avg(l_bluestore_nvmedevice_aio_write_lat, "aio_write_lat", "Average write completion latency");
b.add_time_avg(l_bluestore_nvmedevice_read_lat, "read_lat", "Average read completion latency");
b.add_time_avg(l_bluestore_nvmedevice_flush_lat, "flush_lat", "Average flush completion latency");
b.add_u64(l_bluestore_nvmedevice_queue_ops, "queue_ops", "Operations in nvme queue");
b.add_time_avg(l_bluestore_nvmedevice_polling_lat, "polling_lat", "Average polling latency");
b.add_time_avg(l_bluestore_nvmedevice_aio_write_queue_lat, "aio_write_queue_lat", "Average queue write request latency");
b.add_time_avg(l_bluestore_nvmedevice_read_queue_lat, "read_queue_lat", "Average queue read request latency");
b.add_time_avg(l_bluestore_nvmedevice_flush_queue_lat, "flush_queue_lat", "Average queue flush request latency");
b.add_u64_counter(l_bluestore_nvmedevice_buffer_alloc_failed, "buffer_alloc_failed", "Failed data buffer allocation count");
logger = b.create_perf_counters();
g_ceph_context->get_perfcounters_collection()->add(logger);
_aio_start();
}
~SharedDriverData() {
g_ceph_context->get_perfcounters_collection()->remove(logger);
if (qpair) {
spdk_nvme_ctrlr_free_io_qpair(qpair);
}
delete logger;
}
bool is_equal(const string &tag) const { return sn == tag; }
void register_device(NVMEDevice *device) {
// stop the aio thread while registered_devices is being modified;
// device registration/release is rare, so the brief stall is acceptable
_aio_stop();
registered_devices.push_back(device);
_aio_start();
}
void remove_device(NVMEDevice *device) {
_aio_stop();
std::vector<NVMEDevice*> new_devices;
for (auto &&it : registered_devices) {
if (it != device)
new_devices.push_back(it);
}
registered_devices.swap(new_devices);
_aio_start();
}
uint64_t get_block_size() {
return block_size;
}
uint64_t get_size() {
return size;
}
void queue_task(Task *t, uint64_t ops = 1) {
queue_op_seq += ops;
Mutex::Locker l(queue_lock);
task_queue.push(t);
if (queue_empty.load()) {
queue_empty = false;
queue_cond.Signal();
}
}
void flush_wait() {
uint64_t cur_seq = queue_op_seq.load();
uint64_t left = cur_seq - completed_op_seq.load();
if (cur_seq > completed_op_seq) {
// TODO: this may contain read ops
dout(10) << __func__ << " inflight ops " << left << dendl;
Mutex::Locker l(flush_lock);
++flush_waiters;
flush_waiter_seqs.insert(cur_seq);
while (cur_seq > completed_op_seq.load()) {
flush_cond.Wait(flush_lock);
}
flush_waiter_seqs.erase(cur_seq);
--flush_waiters;
}
}
};
static void data_buf_reset_sgl(void *cb_arg, uint32_t sgl_offset)
{
Task *t = static_cast<Task*>(cb_arg);
@@ -349,7 +409,7 @@ static int data_buf_next_sge(void *cb_arg, void **address, uint32_t *length)
return 0;
}
static int alloc_buf_from_pool(Task *t, bool write)
int SharedDriverQueueData::alloc_buf_from_pool(Task *t, bool write)
{
uint64_t count = t->len / data_buffer_size;
if (t->len % data_buffer_size)
@@ -382,7 +442,7 @@ static int alloc_buf_from_pool(Task *t, bool write)
return 0;
}
void SharedDriverData::_aio_thread()
void SharedDriverQueueData::_aio_thread()
{
dout(1) << __func__ << " start" << dendl;
@@ -400,6 +460,7 @@ void SharedDriverData::_aio_thread()
Task *t = nullptr;
int r = 0;
uint64_t lba_off, lba_count;
ceph::coarse_real_clock::time_point cur, start
= ceph::coarse_real_clock::now();
while (true) {
@@ -414,6 +475,7 @@
}
for (; t; t = t->next) {
t->queue = this;
lba_off = t->offset / sector_size;
lba_count = t->len / sector_size;
switch (t->command) {
@@ -432,7 +494,7 @@
if (r < 0) {
derr << __func__ << " failed to do write command" << dendl;
t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
t->release_segs();
t->release_segs(this);
delete t;
ceph_abort();
}
@@ -455,7 +517,7 @@
data_buf_reset_sgl, data_buf_next_sge);
if (r < 0) {
derr << __func__ << " failed to read" << dendl;
t->release_segs();
t->release_segs(this);
delete t;
ceph_abort();
} else {
@@ -471,7 +533,7 @@
r = spdk_nvme_ns_cmd_flush(ns, qpair, io_complete, t);
if (r < 0) {
derr << __func__ << " failed to flush" << dendl;
t->release_segs();
t->release_segs(this);
delete t;
ceph_abort();
} else {
@@ -500,10 +562,13 @@
flush_cond.Signal();
}
if (!inflight) {
for (auto &&it : registered_devices)
it->reap_ioc();
// be careful: ideally each thread would reap its own IOContexts; for now
// this is done by a single dedicated dpdk thread (queue 0)
if(!queueid) {
for (auto &&it : driver->registered_devices)
it->reap_ioc();
}
Mutex::Locker l(queue_lock);
if (queue_empty.load()) {
@@ -568,7 +633,7 @@ class NVMEManager {
// only support one device per osd now!
assert(shared_driver_datas.empty());
// index 0 is occupied by the master thread
shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, rte_get_next_lcore(-1, 1, 0), sn_tag, c, ns));
shared_driver_datas.push_back(new SharedDriverData(shared_driver_datas.size()+1, sn_tag, c, ns));
*driver = shared_driver_datas.back();
}
};
@@ -755,17 +820,18 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
{
Task *task = static_cast<Task*>(t);
IOContext *ctx = task->ctx;
SharedDriverData *driver = task->device->get_driver();
SharedDriverQueueData *queue = task->queue;
assert(queue != NULL);
assert(ctx != NULL);
++driver->completed_op_seq;
++queue->completed_op_seq;
auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
ceph::coarse_real_clock::now() - task->start);
if (task->command == IOCommand::WRITE_COMMAND) {
driver->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
queue->logger->tinc(l_bluestore_nvmedevice_aio_write_lat, dur);
assert(!spdk_nvme_cpl_is_error(completion));
dout(20) << __func__ << " write/zero op successfully, left "
<< driver->queue_op_seq - driver->completed_op_seq << dendl;
<< queue->queue_op_seq - queue->completed_op_seq << dendl;
// check waiting count before doing callback (which may
// destroy this ioc).
if (!--ctx->num_running) {
@@ -774,14 +840,14 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
}
}
task->release_segs();
task->release_segs(queue);
delete task;
} else if (task->command == IOCommand::READ_COMMAND) {
driver->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
queue->logger->tinc(l_bluestore_nvmedevice_read_lat, dur);
assert(!spdk_nvme_cpl_is_error(completion));
dout(20) << __func__ << " read op successfully" << dendl;
task->fill_cb();
task->release_segs();
task->release_segs(queue);
// read submitted by AIO
if(!task->return_code) {
if (!--ctx->num_running) {
@@ -800,7 +866,7 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
} else {
assert(task->command == IOCommand::FLUSH_COMMAND);
assert(!spdk_nvme_cpl_is_error(completion));
driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
dout(20) << __func__ << " flush op successfully" << dendl;
task->return_code = 0;
ctx->aio_wake();
@@ -907,10 +973,15 @@ int NVMEDevice::flush()
{
dout(10) << __func__ << " start" << dendl;
auto start = ceph::coarse_real_clock::now();
driver->flush_wait();
if(queue_id == -1)
queue_id = ceph_gettid();
SharedDriverQueueData *queue = driver->get_queue(queue_id);
assert(queue != NULL);
queue->flush_wait();
auto dur = std::chrono::duration_cast<std::chrono::nanoseconds>(
ceph::coarse_real_clock::now() - start);
driver->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
queue->logger->tinc(l_bluestore_nvmedevice_flush_lat, dur);
return 0;
}
@@ -926,7 +997,9 @@ void NVMEDevice::aio_submit(IOContext *ioc)
ioc->num_pending -= pending;
assert(ioc->num_pending.load() == 0); // we should be only thread doing this
// Only need to push the first entry
driver->queue_task(t, pending);
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t, pending);
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
}
}
@@ -954,7 +1027,9 @@ int NVMEDevice::aio_write(
if (buffered) {
// Only need to push the first entry
driver->queue_task(t);
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t);
} else {
t->ctx = ioc;
Task *first = static_cast<Task*>(ioc->nvme_task_first);
@@ -992,7 +1067,9 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
t->copy_to_buf(buf, 0, t->len);
};
++ioc->num_reading;
driver->queue_task(t);
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t);
while(t->return_code > 0) {
t->io_wait();
@@ -1038,8 +1115,6 @@ int NVMEDevice::aio_read(
return 0;
}
int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered)
{
assert(len > 0);
@@ -1058,7 +1133,9 @@ int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered
t->copy_to_buf(buf, off-t->offset, len);
};
++ioc.num_reading;
driver->queue_task(t);
if(queue_id == -1)
queue_id = ceph_gettid();
driver->get_queue(queue_id)->queue_task(t);
while(t->return_code > 0) {
t->io_wait();