// Patch summary (unified diff against src/os/bluestore/NVMEDevice.cc).
//
// What the hunks below do:
//  * struct Task: adds a `primary` pointer and an `ref` counter. A Task
//    constructed with a non-null primary increments primary->ref and copies
//    primary->return_code; its destructor decrements primary->ref.
//    io_request is now value-initialised with `= {}`.
//  * io_complete: in the branch shown, a task that has a primary is deleted
//    and, once primary->ref has dropped to zero, primary->return_code is set
//    to 0. A task without a primary has its own return_code set to 0 as
//    before.
//  * ioc_append_task (new): links a Task onto the IOContext's
//    nvme_task_first / nvme_task_last list and increments num_pending.
//    write_split now calls it instead of open-coding the list append.
//  * make_read_tasks (new): walks [aligned_off, aligned_off + aligned_len) in
//    steps of split_size (131072 bytes). If a primary is supplied and
//    aligned_len <= split_size the primary itself is queued; otherwise a new
//    READ_COMMAND Task is allocated per step with `primary` as its parent.
//    Each queued task gets a fill_cb that calls
//    copy_to_buf(buf, tmp_off, tmp_len); buf is advanced by tmp_len and
//    tmp_off is reset to 0 after the first step.
//  * NVMEDevice::read, aio_read and read_random: replace the inline
//    single-task setup with a call to make_read_tasks. read() additionally
//    asserts that the IOContext task list is empty on entry.
//
// Review notes (assumptions to confirm, not established by this diff):
//  * NOTE(review): Task::ref is a plain int modified from the constructor,
//    the destructor and io_complete. Presumably all subtasks of one primary
//    are created and completed on a single thread -- confirm.
//  * NOTE(review): when primary is non-null and aligned_len > split_size,
//    only the newly allocated subtasks are passed to ioc_append_task; the
//    primary is not queued. Verify the callers' wait/cleanup paths expect
//    that.
//  * NOTE(review): `read_size - tmp_off` is unsigned arithmetic; this assumes
//    orig_off - aligned_off is smaller than the first chunk -- TODO confirm.
//  * NOTE(review): aio_read passes NULL for the primary argument while the
//    rest of the patch uses nullptr.
//  * NOTE(review): `static_cast(...)` and `std::function fill_cb` appear
//    without template arguments and the patch's line breaks are collapsed;
//    this looks like an extraction artifact -- compare against the upstream
//    commit before applying.
diff --git a/src/os/bluestore/NVMEDevice.cc b/src/os/bluestore/NVMEDevice.cc index bef0c635760..39446ff7a71 100644 --- a/src/os/bluestore/NVMEDevice.cc +++ b/src/os/bluestore/NVMEDevice.cc @@ -221,16 +221,27 @@ struct Task { std::function fill_cb; Task *next = nullptr; int64_t return_code; + Task *primary = nullptr; ceph::coarse_real_clock::time_point start; - IORequest io_request; + IORequest io_request = {}; ceph::mutex lock = ceph::make_mutex("Task::lock"); ceph::condition_variable cond; SharedDriverQueueData *queue = nullptr; - Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0) + // reference count by subtasks. + int ref = 0; + Task(NVMEDevice *dev, IOCommand c, uint64_t off, uint64_t l, int64_t rc = 0, + Task *p = nullptr) : device(dev), command(c), offset(off), len(l), - return_code(rc), - start(ceph::coarse_real_clock::now()) {} + return_code(rc), primary(p), + start(ceph::coarse_real_clock::now()) { + if (primary) { + primary->ref++; + return_code = primary->return_code; + } + } ~Task() { + if (primary) + primary->ref--; ceph_assert(!io_request.nseg); } void release_segs(SharedDriverQueueData *queue_data) { @@ -679,7 +690,13 @@ void io_complete(void *t, const struct spdk_nvme_cpl *completion) } delete task; } else { - task->return_code = 0; + if (Task* primary = task->primary; primary != nullptr) { + delete task; + if (!primary->ref) + primary->return_code = 0; + } else { + task->return_code = 0; + } ctx->try_aio_wake(); } } else { @@ -791,6 +808,20 @@ void NVMEDevice::aio_submit(IOContext *ioc) } } +static void ioc_append_task(IOContext *ioc, Task *t) +{ + Task *first, *last; + + first = static_cast(ioc->nvme_task_first); + last = static_cast(ioc->nvme_task_last); + if (last) + last->next = t; + if (!first) + ioc->nvme_task_first = t; + ioc->nvme_task_last = t; + ++ioc->num_pending; +} + static void write_split( NVMEDevice *dev, uint64_t off, @@ -798,7 +829,7 @@ static void write_split( IOContext *ioc) { uint64_t remain_len = 
bl.length(), begin = 0, write_size; - Task *t, *first, *last; + Task *t; // This value may need to be got from configuration later. uint64_t split_size = 131072; // 128KB. @@ -810,18 +841,49 @@ static void write_split( bl.splice(0, write_size, &t->bl); remain_len -= write_size; t->ctx = ioc; - first = static_cast(ioc->nvme_task_first); - last = static_cast(ioc->nvme_task_last); - if (last) - last->next = t; - if (!first) - ioc->nvme_task_first = t; - ioc->nvme_task_last = t; - ++ioc->num_pending; + ioc_append_task(ioc, t); begin += write_size; } } +static void make_read_tasks( + NVMEDevice *dev, + uint64_t aligned_off, + IOContext *ioc, char *buf, uint64_t aligned_len, Task *primary, + uint64_t orig_off, uint64_t orig_len) +{ + // This value may need to be got from configuration later. + uint64_t split_size = 131072; // 128KB. + uint64_t tmp_off = orig_off - aligned_off, remain_orig_len = orig_len; + auto begin = aligned_off; + const auto aligned_end = begin + aligned_len; + + for (; begin < aligned_end; begin += split_size) { + auto read_size = std::min(aligned_end - begin, split_size); + auto tmp_len = std::min(remain_orig_len, read_size - tmp_off); + Task *t = nullptr; + + if (primary && (aligned_len <= split_size)) { + t = primary; + } else { + t = new Task(dev, IOCommand::READ_COMMAND, begin, read_size, 0, primary); + } + + t->ctx = ioc; + + // TODO: if upper layer alloc memory with known physical address, + // we can reduce this copy + t->fill_cb = [buf, t, tmp_off, tmp_len] { + t->copy_to_buf(buf, tmp_off, tmp_len); + }; + + ioc_append_task(ioc, t); + remain_orig_len -= tmp_len; + buf += tmp_len; + tmp_off = 0; + } +} + int NVMEDevice::aio_write( uint64_t off, bufferlist &bl, @@ -869,14 +931,12 @@ int NVMEDevice::read(uint64_t off, uint64_t len, bufferlist *pbl, Task *t = new Task(this, IOCommand::READ_COMMAND, off, len, 1); bufferptr p = buffer::create_small_page_aligned(len); int r = 0; - t->ctx = ioc; char *buf = p.c_str(); - t->fill_cb = [buf, t]() { - 
t->copy_to_buf(buf, 0, t->len); - }; - ++ioc->num_pending; - ioc->nvme_task_first = t; + ceph_assert(ioc->nvme_task_first == nullptr); + ceph_assert(ioc->nvme_task_last == nullptr); + make_read_tasks(this, off, ioc, buf, len, t, off, len); + dout(5) << __func__ << " " << off << "~" << len << dendl; aio_submit(ioc); ioc->aio_wait(); @@ -894,26 +954,12 @@ int NVMEDevice::aio_read( { dout(20) << __func__ << " " << off << "~" << len << " ioc " << ioc << dendl; ceph_assert(is_valid_io(off, len)); - - Task *t = new Task(this, IOCommand::READ_COMMAND, off, len); - bufferptr p = buffer::create_small_page_aligned(len); pbl->append(p); - t->ctx = ioc; char* buf = p.c_str(); - t->fill_cb = [buf, t]() { - t->copy_to_buf(buf, 0, t->len); - }; - - Task *first = static_cast(ioc->nvme_task_first); - Task *last = static_cast(ioc->nvme_task_last); - if (last) - last->next = t; - if (!first) - ioc->nvme_task_first = t; - ioc->nvme_task_last = t; - ++ioc->num_pending; + make_read_tasks(this, off, ioc, buf, len, NULL, off, len); + dout(5) << __func__ << " " << off << "~" << len << dendl; return 0; } @@ -930,13 +976,8 @@ int NVMEDevice::read_random(uint64_t off, uint64_t len, char *buf, bool buffered IOContext ioc(g_ceph_context, nullptr); Task *t = new Task(this, IOCommand::READ_COMMAND, aligned_off, aligned_len, 1); int r = 0; - t->ctx = &ioc; - t->fill_cb = [buf, t, off, len]() { - t->copy_to_buf(buf, off-t->offset, len); - }; - ++ioc.num_pending; - ioc.nvme_task_first = t; + make_read_tasks(this, aligned_off, &ioc, buf, aligned_len, t, off, len); aio_submit(&ioc); ioc.aio_wait();