Merge pull request #32647 from optimistyzy/115

NVMEDevice: Split the read I/O if the I/O size is large.

Reviewed-by: Kefu Chai <kchai@redhat.com>
commit 12da789dc9
Kefu Chai, 2020-02-13 15:35:19 +08:00, committed by GitHub
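
In outline, the patch carves one large aligned read into 128KB chunks and copies only the caller's bytes out of each chunk. The following standalone sketch reproduces that arithmetic outside the driver; all names here are local to the example, not part of the patch.

    // Standalone sketch of the chunking arithmetic introduced by this
    // patch (see make_read_tasks below); everything here is illustrative.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
      const uint64_t split_size = 131072;        // 128KB, as in the patch
      uint64_t aligned_off = 0, aligned_len = 4 * split_size;
      uint64_t orig_off = 4096;                  // caller's unaligned offset
      uint64_t orig_len = aligned_len - 2 * 4096;

      uint64_t tmp_off = orig_off - aligned_off; // skip in the first chunk only
      uint64_t remain = orig_len;
      const uint64_t aligned_end = aligned_off + aligned_len;
      for (uint64_t begin = aligned_off; begin < aligned_end;
           begin += split_size) {
        uint64_t read_size = std::min(aligned_end - begin, split_size);
        uint64_t tmp_len = std::min(remain, read_size - tmp_off);
        std::cout << "subtask reads " << begin << "~" << read_size
                  << ", copies " << tmp_len << " bytes from chunk offset "
                  << tmp_off << "\n";
        remain -= tmp_len;
        tmp_off = 0;
      }
      return 0;
    }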

@@ -221,16 +221,27 @@ struct Task {
   std::function<void()> fill_cb;
   Task *next = nullptr;
   int64_t return_code;
+  Task *primary = nullptr;
   ceph::coarse_real_clock::time_point start;
-  IORequest io_request;
+  IORequest io_request = {};
   ceph::mutex lock = ceph::make_mutex("Task::lock");
   ceph::condition_variable cond;
   SharedDriverQueueData *queue = nullptr;
-  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0)
+  // reference count by subtasks.
+  int ref = 0;
+  Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0,
+       Task *p = nullptr)
     : device(dev), command(c), offset(off), len(l),
-      return_code(rc),
-      start(ceph::coarse_real_clock::now()) {}
+      return_code(rc), primary(p),
+      start(ceph::coarse_real_clock::now()) {
+    if (primary) {
+      primary->ref++;
+      return_code = primary->return_code;
+    }
+  }
   ~Task() {
+    if (primary)
+      primary->ref--;
     ceph_assert(!io_request.nseg);
   }
   void release_segs(SharedDriverQueueData *queue_data) {
@@ -679,7 +690,13 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion)
       }
       delete task;
     } else {
-      task->return_code = 0;
+      if (Task* primary = task->primary; primary != nullptr) {
+        delete task;
+        if (!primary->ref)
+          primary->return_code = 0;
+      } else {
+        task->return_code = 0;
+      }
       ctx->try_aio_wake();
     }
   } else {
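
The primary/ref pair above implements a simple completion count: each subtask registers with its primary on construction and deregisters on destruction, and io_complete marks the primary done once the count drops to zero. Below is a minimal sketch of that lifetime; SubTask is a stand-in for the patched Task, not the real struct.

    // Minimal model of the primary/subtask reference counting.
    #include <cassert>
    #include <cstdint>

    struct SubTask {
      SubTask *primary = nullptr;
      int ref = 0;                    // outstanding subtasks (primary only)
      int64_t return_code = 1;        // nonzero while the read is in flight

      explicit SubTask(SubTask *p = nullptr) : primary(p) {
        if (primary) {
          primary->ref++;             // register with the primary
          return_code = primary->return_code;
        }
      }
      ~SubTask() {
        if (primary)
          primary->ref--;             // deregister when completed and deleted
      }
    };

    int main() {
      SubTask primary;                // tracks the whole logical read
      auto *a = new SubTask(&primary);
      auto *b = new SubTask(&primary);
      assert(primary.ref == 2);
      delete a;                       // as io_complete deletes finished subtasks
      if (primary.ref == 0)
        primary.return_code = 0;
      delete b;
      if (primary.ref == 0)
        primary.return_code = 0;      // last completion marks the read done
      assert(primary.return_code == 0);
      return 0;
    }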
@@ -791,6 +808,20 @@ void NVMEDevice::aio_submit(IOContext *ioc)
   }
 }
 
+static void ioc_append_task(IOContext *ioc, Task *t)
+{
+  Task *first, *last;
+
+  first = static_cast<Task*>(ioc->nvme_task_first);
+  last = static_cast<Task*>(ioc->nvme_task_last);
+  if (last)
+    last->next = t;
+  if (!first)
+    ioc->nvme_task_first = t;
+  ioc->nvme_task_last = t;
+  ++ioc->num_pending;
+}
+
 static void write_split(
   NVMEDevice *dev,
   uint64_t off,
@@ -798,7 +829,7 @@ static void write_split(
   IOContext *ioc)
 {
   uint64_t remain_len = bl.length(), begin = 0, write_size;
-  Task *t, *first, *last;
+  Task *t;
 
   // This value may need to be got from configuration later.
   uint64_t split_size = 131072; // 128KB.
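
ioc_append_task (added above) factors out the pending-list bookkeeping that write_split and the read paths previously duplicated, which is why write_split no longer needs its first/last locals: IOContext keeps head and tail pointers into an intrusive singly linked list threaded through Task::next. A minimal model of that append, with Node/Ctx as stand-ins for Task/IOContext:

    // Minimal model of the intrusive pending list behind ioc_append_task.
    #include <cassert>

    struct Node { Node *next = nullptr; };
    struct Ctx  { Node *first = nullptr, *last = nullptr; int num_pending = 0; };

    void append(Ctx *ctx, Node *n) {
      if (ctx->last)
        ctx->last->next = n;   // chain onto the current tail
      if (!ctx->first)
        ctx->first = n;        // first element sets the head
      ctx->last = n;
      ++ctx->num_pending;
    }

    int main() {
      Ctx ctx;
      Node a, b;
      append(&ctx, &a);
      append(&ctx, &b);
      assert(ctx.first == &a && ctx.last == &b && a.next == &b);
      assert(ctx.num_pending == 2);
      return 0;
    }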
@@ -810,18 +841,49 @@ static void write_split(
     bl.splice(0, write_size, &t->bl);
     remain_len -= write_size;
     t->ctx = ioc;
-    first = static_cast<Task*>(ioc->nvme_task_first);
-    last = static_cast<Task*>(ioc->nvme_task_last);
-    if (last)
-      last->next = t;
-    if (!first)
-      ioc->nvme_task_first = t;
-    ioc->nvme_task_last = t;
-    ++ioc->num_pending;
+    ioc_append_task(ioc, t);
     begin += write_size;
   }
 }
 
+static void make_read_tasks(
+  NVMEDevice *dev,
+  uint64_t aligned_off,
+  IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary,
+  uint64_t orig_off, uint64_t orig_len)
+{
+  // This value may need to be got from configuration later.
+  uint64_t split_size = 131072; // 128KB.
+  uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len;
+  auto begin = aligned_off;
+  const auto aligned_end = begin + aligned_len;
+
+  for (; begin < aligned_end; begin += split_size) {
+    auto read_size = std::min(aligned_end - begin, split_size);
+    auto tmp_len = std::min(remain_orig_len, read_size - tmp_off);
+    Task *t = nullptr;
+    if (primary && (aligned_len <= split_size)) {
+      t = primary;
+    } else {
+      t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary);
+    }
+    t->ctx = ioc;
+    // TODO: if upper layer alloc memory with known physical address,
+    // we can reduce this copy
+    t->fill_cb = [buf, t, tmp_off, tmp_len] {
+      t->copy_to_buf(buf, tmp_off, tmp_len);
+    };
+    ioc_append_task(ioc, t);
+    remain_orig_len -= tmp_len;
+    buf += tmp_len;
+    tmp_off = 0;
+  }
+}
+
 int NVMEDevice::aio_write(
   uint64_t off,
   bufferlist &bl,
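
One detail worth noting in make_read_tasks: when the caller supplies a primary task and the whole aligned read fits in a single 128KB chunk, the primary itself carries the I/O and no subtask is allocated; only multi-chunk reads pay for extra Task objects. A small sketch of that rule, where chunk_count is an illustrative local helper, not part of the patch:

    // Illustrates when subtasks are allocated, given a primary task.
    #include <cstdint>
    #include <iostream>

    constexpr uint64_t split_size = 131072;  // 128KB, matching the patch

    uint64_t chunk_count(uint64_t aligned_len) {
      return (aligned_len + split_size - 1) / split_size;
    }

    int main() {
      for (uint64_t len : {4096ULL, 131072ULL, 131073ULL, 1048576ULL}) {
        uint64_t n = chunk_count(len);
        std::cout << len << " bytes -> "
                  << (n == 1 ? "primary reused, no allocation"
                             : "subtasks allocated per chunk")
                  << " (" << n << " chunk" << (n > 1 ? "s" : "") << ")\n";
      }
      return 0;
    }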
@@ -869,14 +931,12 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
   Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1);
   bufferptr p = buffer::create_small_page_aligned(len);
   int r = 0;
-  t->ctx = ioc;
   char *buf = p.c_str();
-  t->fill_cb = [buf, t]() {
-    t->copy_to_buf(buf, 0, t->len);
-  };
 
-  ++ioc->num_pending;
-  ioc->nvme_task_first = t;
+  ceph_assert(ioc->nvme_task_first == nullptr);
+  ceph_assert(ioc->nvme_task_last == nullptr);
+  make_read_tasks(this, off, ioc, buf, len, t, off, len);
   dout(5) << __func__ << " " << off << "~" << len << dendl;
   aio_submit(ioc);
   ioc->aio_wait();
@@ -894,26 +954,12 @@ int NVMEDevice::aio_read(
 {
   dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl;
   ceph_assert(is_valid_io(off, len));
-  Task *t = new Task(this, IOCommand::READ_COMMAND, off, len);
 
   bufferptr p = buffer::create_small_page_aligned(len);
   pbl->append(p);
-  t->ctx = ioc;
   char* buf = p.c_str();
-  t->fill_cb = [buf, t]() {
-    t->copy_to_buf(buf, 0, t->len);
-  };
 
-  Task *first = static_cast<Task*>(ioc->nvme_task_first);
-  Task *last = static_cast<Task*>(ioc->nvme_task_last);
-  if (last)
-    last->next = t;
-  if (!first)
-    ioc->nvme_task_first = t;
-  ioc->nvme_task_last = t;
-  ++ioc->num_pending;
+  make_read_tasks(this, off, ioc, buf, len, NULL, off, len);
   dout(5) << __func__ << " " << off << "~" << len << dendl;
   return 0;
 }
@@ -930,13 +976,8 @@ int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered
   IOContext ioc(g_ceph_context, nullptr);
   Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1);
   int r = 0;
-  t->ctx = &ioc;
-  t->fill_cb = [buf, t, off, len]() {
-    t->copy_to_buf(buf, off-t->offset, len);
-  };
 
-  ++ioc.num_pending;
-  ioc.nvme_task_first = t;
+  make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, t, off, len);
   aio_submit(&ioc);
   ioc.aio_wait();
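
For context on the read_random path: off and len arrive unaligned, and the aligned_off/aligned_len passed to make_read_tasks cover them rounded out to device blocks, which is why the original off and len are forwarded separately for the copy-out. A sketch of that rounding, assuming a 4KB block size; align_down/align_up are local helpers, not the driver's code:

    // Sketch of the alignment feeding make_read_tasks in read_random above.
    #include <cstdint>
    #include <iostream>

    constexpr uint64_t block = 4096;
    uint64_t align_down(uint64_t v) { return v & ~(block - 1); }
    uint64_t align_up(uint64_t v)   { return (v + block - 1) & ~(block - 1); }

    int main() {
      uint64_t off = 5000, len = 3000;         // an unaligned random read
      uint64_t aligned_off = align_down(off);  // 4096
      uint64_t aligned_len = align_up(off + len) - aligned_off;  // 4096
      std::cout << "device reads " << aligned_off << "~" << aligned_len
                << "; caller receives " << off << "~" << len << "\n";
      return 0;
    }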