NVMEDevice: let aio_write submit requests in FIFO order

Signed-off-by: Haomai Wang <haomai@xsky.com>
This commit is contained in:
Haomai Wang 2016-01-15 19:29:58 +08:00
parent 23fd140d6a
commit 47335108e1
4 changed files with 68 additions and 44 deletions

View File

@ -750,7 +750,7 @@ AM_CONDITIONAL(WITH_SPDK, [test "x$with_spdk" != "xno"])
if test "x$with_spdk" != x"no"; then
CPPFLAGS="$CPPFLAGS -I$with_spdk/include"
LDFLAGS="$LDFLAGS -L$with_spdk/lib"
LDFLAGS="$LDFLAGS -L$with_spdk/lib/nvme -L$with_spdk/lib/memory -L$with_spdk/lib/util"
AC_CHECK_HEADER([spdk/nvme.h], [], AC_MSG_ERROR([Cannot find header 'spdk/nvme.h'.]))
#AC_CHECK_LIB([spdk_nvme], [nvme_attach], [], AC_MSG_FAILURE([SPDK nvme_attach not found]))

View File

@ -24,7 +24,8 @@
struct IOContext {
void *priv;
#ifdef HAVE_SPDK
void *backend_priv = nullptr;
void *nvme_task_first = nullptr;
void *nvme_task_last = nullptr;
#endif
Mutex lock;

View File

@ -69,13 +69,14 @@ static void io_complete(void *t, const struct nvme_completion *completion) {
// destroy this ioc).
dout(20) << __func__ << " write op successfully, left " << left << dendl;
if (!left) {
ctx->backend_priv = nullptr;
if (ctx->priv)
task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
bool exist_priv = ctx->priv != nullptr;
if (ctx->num_waiting.read()) {
Mutex::Locker l(ctx->lock);
ctx->cond.Signal();
}
if (exist_priv) {
task->device->aio_callback(task->device->aio_callback_priv, ctx->priv);
}
}
utime_t lat = ceph_clock_now(g_ceph_context);
lat -= task->start;
@ -168,9 +169,10 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **
char serial_number[128];
while ((pci_dev = pci_device_next(iter)) != NULL) {
dout(10) << __func__ << " found device at "<< pci_dev->bus << ":" << pci_dev->dev << ":"
<< pci_dev->func << " vendor:0x" << pci_dev->vendor_id << " device:0x" << pci_dev->device_id
<< " name:" << pci_device_get_device_name(pci_dev) << dendl;
dout(0) << __func__ << " found device at name: " << pci_device_get_device_name(pci_dev)
<< " bus: " << pci_dev->bus << ":" << pci_dev->dev << ":"
<< pci_dev->func << " vendor:0x" << pci_dev->vendor_id << " device:0x" << pci_dev->device_id
<< dendl;
r = pci_device_get_serial_number(pci_dev, serial_number, 128);
if (r < 0) {
dout(10) << __func__ << " failed to get serial number from " << pci_device_get_device_name(pci_dev) << dendl;
@ -178,7 +180,7 @@ int SharedDriverData::_scan_nvme_device(const string &sn_tag, nvme_controller **
}
if (sn_tag.compare(string(serial_number, 16))) {
dout(10) << __func__ << " device serial number not match " << serial_number << dendl;
dout(0) << __func__ << " device serial number not match " << serial_number << dendl;
continue;
}
break;
@ -347,7 +349,7 @@ int NVMEDevice::open(string p)
}
block_size = nvme_ns_get_sector_size(ns);
size = block_size * nvme_ns_get_num_sectors(ns);
aio_thread.create();
aio_thread.create("nvme_aio_thread");
dout(1) << __func__ << " size " << size << " (" << pretty_si_t(size) << "B)"
<< " block_size " << block_size << " (" << pretty_si_t(block_size)
@ -424,14 +426,14 @@ void NVMEDevice::_aio_thread()
dout(20) << __func__ << " write command issued " << lba_off << "~" << lba_count << dendl;
r = nvme_ns_cmd_write(ns, t->buf, lba_off, lba_count, io_complete, t);
if (r < 0) {
t->ctx->backend_priv = nullptr;
t->ctx->nvme_task_first = t->ctx->nvme_task_last = nullptr;
rte_free(t->buf);
rte_mempool_put(task_pool, t);
derr << __func__ << " failed to do write command" << dendl;
assert(0);
}
inflight_ops.inc();
t = t->prev;
t = t->next;
}
break;
}
@ -477,6 +479,7 @@ void NVMEDevice::_aio_thread()
}
nvme_ctrlr_process_io_completions(ctrlr, max);
reap_ioc();
}
nvme_unregister_io_thread();
dout(10) << __func__ << " end" << dendl;
@ -505,7 +508,7 @@ int NVMEDevice::flush()
t->len = 0;
t->device = this;
t->return_code = 1;
t->next = t->prev = nullptr;
t->next = nullptr;
{
Mutex::Locker l(queue_lock);
task_queue.push(t);
@ -530,17 +533,20 @@ void NVMEDevice::aio_submit(IOContext *ioc)
dout(20) << __func__ << " ioc " << ioc << " pending "
<< ioc->num_pending.read() << " running "
<< ioc->num_running.read() << dendl;
Task *t = static_cast<Task*>(ioc->backend_priv);
int pending = ioc->num_pending.read();
ioc->num_running.add(pending);
ioc->num_pending.sub(pending);
assert(ioc->num_pending.read() == 0); // we should be only thread doing this
Mutex::Locker l(queue_lock);
// Only need to push the first entry
task_queue.push(t);
if (queue_empty.read()) {
queue_empty.dec();
queue_cond.Signal();
Task *t = static_cast<Task*>(ioc->nvme_task_first);
if (pending && t) {
ioc->num_running.add(pending);
ioc->num_pending.sub(pending);
assert(ioc->num_pending.read() == 0); // we should be only thread doing this
Mutex::Locker l(queue_lock);
// Only need to push the first entry
task_queue.push(t);
if (queue_empty.read()) {
queue_empty.dec();
queue_cond.Signal();
}
ioc->nvme_task_first = ioc->nvme_task_last = nullptr;
}
}
@ -567,11 +573,11 @@ int NVMEDevice::aio_write(
t->start = ceph_clock_now(g_ceph_context);
t->buf = rte_malloc(NULL, len, block_size);
if (t->buf == NULL) {
derr << __func__ << " task->buf rte_malloc failed" << dendl;
if (t->buf == NULL) {
derr << __func__ << " task->buf rte_malloc failed" << dendl;
rte_mempool_put(task_pool, t);
return -ENOMEM;
}
}
bl.copy(0, len, static_cast<char*>(t->buf));
t->ctx = ioc;
@ -580,13 +586,27 @@ int NVMEDevice::aio_write(
t->len = len;
t->device = this;
t->return_code = 0;
Task *prev = static_cast<Task*>(ioc->backend_priv);
t->prev = prev;
if (prev)
prev->next = t;
ioc->backend_priv = t;
t->next = nullptr;
ioc->num_pending.inc();
if (0 && buffered) {
Mutex::Locker l(queue_lock);
task_queue.push(t);
if (queue_empty.read()) {
queue_empty.dec();
queue_cond.Signal();
}
ioc->num_running.inc();
t->next = nullptr;
} else {
Task *first = static_cast<Task*>(ioc->nvme_task_first);
Task *last = static_cast<Task*>(ioc->nvme_task_last);
if (last)
last->next = t;
t->next = nullptr;
if (!first)
ioc->nvme_task_first = t;
ioc->nvme_task_last = t;
ioc->num_pending.inc();
}
dout(5) << __func__ << " " << off << "~" << len << dendl;
@ -650,7 +670,7 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
t->len = len;
t->device = this;
t->return_code = 1;
t->next = t->prev = nullptr;
t->next = nullptr;
ioc->num_reading.inc();;
{
Mutex::Locker l(queue_lock);
@ -684,13 +704,14 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf)
{
dout(5) << __func__ << " " << off << "~" << len << dendl;
assert(len > 0);
assert(off < size);
assert(off + len <= size);
uint64_t aligned_off = align_down(off, block_size);
uint64_t aligned_len = align_up(len, block_size);
uint64_t aligned_len = align_up(off+len, block_size) - aligned_off;
dout(5) << __func__ << " " << off << "~" << len
<< " aligned " << aligned_off << "~" << aligned_len << dendl;
IOContext ioc(nullptr);
Task *t;
int r = rte_mempool_get(task_pool, (void **)&t);
@ -703,16 +724,17 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf)
if (t->buf == NULL) {
derr << __func__ << " task->buf rte_malloc failed" << dendl;
r = -ENOMEM;
goto out;
rte_mempool_put(task_pool, t);
return r;
}
t->ctx = ioc;
t->ctx = &ioc;
t->command = IOCommand::READ_COMMAND;
t->offset = aligned_off;
t->len = aligned_len;
t->device = this;
t->return_code = 1;
t->next = t->prev = nullptr;
ioc->num_reading.inc();;
t->next = nullptr;
ioc.num_reading.inc();;
{
Mutex::Locker l(queue_lock);
task_queue.push(t);
@ -723,13 +745,14 @@ int NVMEDevice::read_buffered(uint64_t off, uint64_t len, char *buf)
}
{
Mutex::Locker l(ioc->lock);
Mutex::Locker l(ioc.lock);
while (t->return_code > 0)
ioc->cond.Wait(ioc->lock);
ioc.cond.Wait(ioc.lock);
}
memcpy(buf, t->buf+off-aligned_off, len);
memcpy(buf, (char*)t->buf+off-aligned_off, len);
r = t->return_code;
rte_free(t->buf);
rte_mempool_put(task_pool, t);
return r;
}

View File

@ -53,7 +53,7 @@ struct Task {
IOCommand command;
uint64_t offset, len;
void *buf;
Task *next, *prev;
Task *next;
int64_t return_code;
utime_t start;
};