From ef183f14c61a0a40dca38387ffb142fe6e99b582 Mon Sep 17 00:00:00 2001 From: Hector Martin Date: Fri, 26 Feb 2016 20:24:16 +0900 Subject: [PATCH 1/2] pybind/rbd: add aio function bindings Signed-off-by: Hector Martin --- src/pybind/rbd/rbd.pyx | 303 +++++++++++++++++++++++++++++++++++- src/test/pybind/test_rbd.py | 60 +++++++ 2 files changed, 360 insertions(+), 3 deletions(-) diff --git a/src/pybind/rbd/rbd.pyx b/src/pybind/rbd/rbd.pyx index 779578b4eec..519dee47172 100644 --- a/src/pybind/rbd/rbd.pyx +++ b/src/pybind/rbd/rbd.pyx @@ -14,6 +14,9 @@ method. # Copyright 2011 Josh Durgin # Copyright 2015 Hector Martin +import cython +import sys + from cpython cimport PyObject, ref, exc from libc cimport errno from libc.stdint cimport * @@ -38,8 +41,6 @@ cdef extern from "Python.h": cdef extern from "time.h": ctypedef long int time_t -ctypedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void* ptr) - cdef extern from "rbd/librbd.h" nogil: enum: _RBD_FEATURE_LAYERING "RBD_FEATURE_LAYERING" @@ -71,6 +72,7 @@ cdef extern from "rbd/librbd.h" nogil: ctypedef void* rados_ioctx_t ctypedef void* rbd_image_t ctypedef void* rbd_image_options_t + ctypedef void *rbd_completion_t ctypedef struct rbd_image_info_t: uint64_t size @@ -123,6 +125,9 @@ cdef extern from "rbd/librbd.h" nogil: time_t last_update bint up + ctypedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg) + ctypedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void* ptr) + void rbd_version(int *major, int *minor, int *extra) void rbd_image_options_create(rbd_image_options_t* opts) @@ -258,6 +263,21 @@ cdef extern from "rbd/librbd.h" nogil: rbd_mirror_image_status_t *mirror_image_status, size_t status_size) + int rbd_aio_write2(rbd_image_t image, uint64_t off, size_t len, + const char *buf, rbd_completion_t c, int op_flags) + int rbd_aio_read2(rbd_image_t image, uint64_t off, size_t len, + char *buf, rbd_completion_t c, int op_flags) + int rbd_aio_discard(rbd_image_t image, 
uint64_t off, uint64_t len, + rbd_completion_t c) + + int rbd_aio_create_completion(void *cb_arg, rbd_callback_t complete_cb, + rbd_completion_t *c) + int rbd_aio_is_complete(rbd_completion_t c) + int rbd_aio_wait_for_complete(rbd_completion_t c) + ssize_t rbd_aio_get_return_value(rbd_completion_t c) + void rbd_aio_release(rbd_completion_t c) + int rbd_aio_flush(rbd_image_t image, rbd_completion_t c) + RBD_FEATURE_LAYERING = _RBD_FEATURE_LAYERING RBD_FEATURE_STRIPINGV2 = _RBD_FEATURE_STRIPINGV2 RBD_FEATURE_EXCLUSIVE_LOCK = _RBD_FEATURE_EXCLUSIVE_LOCK @@ -397,7 +417,7 @@ cdef make_ex(ret, msg): cdef rados_ioctx_t convert_ioctx(rados.Ioctx ioctx) except? NULL: return ioctx.io -cdef int no_op_progress_callback(uint64_t offset, uint64_t total, void* ptr): +cdef int no_op_progress_callback(uint64_t offset, uint64_t total, void* ptr) nogil: return 0 def cstr(val, name, encoding="utf-8", opt=False): @@ -444,6 +464,112 @@ cdef void* realloc_chk(void* ptr, size_t size) except NULL: raise MemoryError("realloc failed") return ret +cdef class Completion + +cdef void __aio_complete_cb(rbd_completion_t completion, void *args) with gil: + """ + Callback to oncomplete() for asynchronous operations + """ + cdef Completion cb = args + cb._complete() + + +@cython.no_gc_clear +cdef class Completion(object): + """completion object""" + + cdef public: + object image + object oncomplete + object ref + object exc_info + + cdef: + rbd_completion_t rbd_comp + PyObject* buf + + def __cinit__(self, image, object oncomplete): + self.oncomplete = oncomplete + self.image = image + + def is_complete(self): + """ + Has an asynchronous operation completed? + + This does not imply that the callback has finished. 
+ + :returns: True if the operation is completed + """ + with nogil: + ret = rbd_aio_is_complete(self.rbd_comp) + return ret == 1 + + def wait_for_complete_and_cb(self): + """ + Wait for an asynchronous operation to complete + + This method waits for the callback to execute, if one was provided. + It will also re-raise any exceptions raised by the callback. You + should call this to "reap" asynchronous completions and ensure that + any exceptions in the callbacks are handled, as an exception internal + to this module may have occurred. + """ + with nogil: + rbd_aio_wait_for_complete(self.rbd_comp) + + if self.exc_info: + raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + + def get_return_value(self): + """ + Get the return value of an asynchronous operation + + The return value is set when the operation is complete. + + :returns: int - return value of the operation + """ + with nogil: + ret = rbd_aio_get_return_value(self.rbd_comp) + return ret + + def __dealloc__(self): + """ + Release a completion + + This is automatically called when the completion object is freed. + """ + ref.Py_XDECREF(self.buf) + self.buf = NULL + if self.rbd_comp != NULL: + with nogil: + rbd_aio_release(self.rbd_comp) + self.rbd_comp = NULL + + cdef void _complete(self): + try: + self.__unpersist() + if self.oncomplete: + self.oncomplete(self) + # In the event that something raises an exception during the next 2 + # lines of code, we will not be able to catch it, and this may result + # in the app not noticing a failed callback. However, this should only + # happen in extreme circumstances (OOM, etc.). KeyboardInterrupt + # should not be a problem because the callback thread from librbd + # ought to have SIGINT blocked. + except: + self.exc_info = sys.exc_info() + + cdef __persist(self): + if self.oncomplete is not None: + # Keep around a reference to ourselves to make sure the completion + # is not freed until the callback is called. 
The completion is + # allowed to be freed if there is no callback. + self.ref = self + + cdef __unpersist(self): + self.ref = None + + class RBD(object): """ This class wraps librbd CRUD functions. @@ -1052,6 +1178,31 @@ cdef class Image(object): self.close() return False + def __get_completion(self, oncomplete): + """ + Constructs a completion to use with asynchronous operations + + :param oncomplete: callback for the completion + + :raises: :class:`Error` + :returns: completion object + """ + + completion_obj = Completion(self, oncomplete) + + cdef: + rbd_completion_t completion + PyObject* p_completion_obj= completion_obj + + with nogil: + ret = rbd_aio_create_completion(p_completion_obj, __aio_complete_cb, + &completion) + if ret < 0: + raise make_ex(ret, "error getting a completion") + + completion_obj.rbd_comp = completion + return completion_obj + def close(self): """ Release the resources used by this image object. @@ -1929,6 +2080,152 @@ written." % (self.name, ret, length)) free(c_status.description) return status + def aio_read(self, offset, length, oncomplete, fadvise_flags=0): + """ + Asynchronously read data from the image + + Raises :class:`InvalidArgument` if part of the range specified is + outside the image. 
+ + oncomplete will be called with the returned read value as + well as the completion: + + oncomplete(completion, data_read) + + :param offset: the offset to start reading at + :type offset: int + :param length: how many bytes to read + :type length: int + :param oncomplete: what to do when the read is complete + :type oncomplete: completion + :param fadvise_flags: fadvise flags for this read + :type fadvise_flags: int + :returns: :class:`Completion` - the completion object + :raises: :class:`InvalidArgument`, :class:`IOError` + """ + + cdef: + char *ret_buf + uint64_t _offset = offset + size_t _length = length + int _fadvise_flags = fadvise_flags + Completion completion + + def oncomplete_(completion_v): + cdef Completion _completion_v = completion_v + return_value = _completion_v.get_return_value() + if return_value > 0 and return_value != length: + _PyBytes_Resize(&_completion_v.buf, return_value) + return oncomplete(_completion_v, _completion_v.buf if return_value >= 0 else None) + + completion = self.__get_completion(oncomplete_) + completion.buf = PyBytes_FromStringAndSize(NULL, length) + ret_buf = PyBytes_AsString(completion.buf) + try: + completion.__persist() + with nogil: + ret = rbd_aio_read2(self.image, _offset, _length, ret_buf, + completion.rbd_comp, _fadvise_flags) + if ret < 0: + raise make_ex(ret, 'error reading %s %ld~%ld' % + (self.name, offset, length)) + except: + completion.__unpersist() + raise + + return completion + + def aio_write(self, data, offset, oncomplete, fadvise_flags=0): + """ + Asynchronously write data to the image + + Raises :class:`InvalidArgument` if part of the write would fall outside + the image. 
+ + oncomplete will be called with the completion as its only + argument: + + oncomplete(completion) + + :param data: the data to be written + :type data: bytes + :param offset: the offset to start writing at + :type offset: int + :param oncomplete: what to do when the write is complete + :type oncomplete: completion + :param fadvise_flags: fadvise flags for this write + :type fadvise_flags: int + :returns: :class:`Completion` - the completion object + :raises: :class:`InvalidArgument`, :class:`IOError` + """ + + cdef: + uint64_t _offset = offset + char *_data = data + size_t _length = len(data) + int _fadvise_flags = fadvise_flags + Completion completion + + completion = self.__get_completion(oncomplete) + try: + completion.__persist() + with nogil: + ret = rbd_aio_write2(self.image, _offset, _length, _data, + completion.rbd_comp, _fadvise_flags) + if ret < 0: + raise make_ex(ret, 'error writing %s %ld~%ld' % + (self.name, offset, _length)) + except: + completion.__unpersist() + raise + + return completion + + def aio_discard(self, offset, length, oncomplete): + """ + Asynchronously trim the range from the image. It will be logically + filled with zeroes. + """ + + cdef: + uint64_t _offset = offset + size_t _length = length + Completion completion + + completion = self.__get_completion(oncomplete) + try: + completion.__persist() + with nogil: + ret = rbd_aio_discard(self.image, _offset, _length, + completion.rbd_comp) + if ret < 0: + raise make_ex(ret, 'error discarding %s %ld~%ld' % + (self.name, offset, _length)) + except: + completion.__unpersist() + raise + + return completion + + def aio_flush(self, oncomplete): + """ + Asynchronously wait until all writes are fully flushed if caching is + enabled. 
+ """ + + cdef Completion completion = self.__get_completion(oncomplete) + try: + completion.__persist() + with nogil: + ret = rbd_aio_flush(self.image, completion.rbd_comp) + if ret < 0: + raise make_ex(ret, 'error flushing') + except: + completion.__unpersist() + raise + + return completion + cdef class SnapIterator(object): """ Iterator over snapshot info for an image. diff --git a/src/test/pybind/test_rbd.py b/src/test/pybind/test_rbd.py index f7204e69cd1..e4ffaab9c80 100644 --- a/src/test/pybind/test_rbd.py +++ b/src/test/pybind/test_rbd.py @@ -3,6 +3,7 @@ import functools import socket import os import time +import sys from nose import with_setup, SkipTest from nose.tools import eq_ as eq, assert_raises @@ -718,6 +719,65 @@ class TestImage(object): self.image.remove_snap('snap1') self.image.remove_snap('snap2') + def test_aio_read(self): + # this is a list so that the local cb() can modify it + retval = [None] + def cb(_, buf): + retval[0] = buf + + # test1: success case + comp = self.image.aio_read(0, 20, cb) + comp.wait_for_complete_and_cb() + eq(retval[0], b'\0' * 20) + eq(comp.get_return_value(), 20) + eq(sys.getrefcount(comp), 2) + + # test2: error case + retval[0] = 1 + comp = self.image.aio_read(IMG_SIZE, 20, cb) + comp.wait_for_complete_and_cb() + eq(None, retval[0]) + assert(comp.get_return_value() < 0) + eq(sys.getrefcount(comp), 2) + + def test_aio_write(self): + retval = [None] + def cb(comp): + retval[0] = comp.get_return_value() + + data = rand_data(256) + comp = self.image.aio_write(data, 256, cb) + comp.wait_for_complete_and_cb() + eq(retval[0], 0) + eq(comp.get_return_value(), 0) + eq(sys.getrefcount(comp), 2) + eq(self.image.read(256, 256), data) + + def test_aio_discard(self): + retval = [None] + def cb(comp): + retval[0] = comp.get_return_value() + + data = rand_data(256) + self.image.write(data, 0) + comp = self.image.aio_discard(0, 256, cb) + comp.wait_for_complete_and_cb() + eq(retval[0], 0) + eq(comp.get_return_value(), 0) + 
eq(sys.getrefcount(comp), 2) + eq(self.image.read(256, 256), b'\0' * 256) + + def test_aio_flush(self): + retval = [None] + def cb(comp): + retval[0] = comp.get_return_value() + + comp = self.image.aio_flush(cb) + comp.wait_for_complete_and_cb() + eq(retval[0], 0) + eq(sys.getrefcount(comp), 2) + + def check_diff(image, offset, length, from_snapshot, expected): extents = [] From 07ff00232e629ebae61518880855f563a6292187 Mon Sep 17 00:00:00 2001 From: Hector Martin Date: Mon, 13 Jun 2016 18:02:27 +0900 Subject: [PATCH 2/2] pybind/rbd: Make compatible with older Cython We need to support versions that don't have no_gc_clear, so use bare INCREF/DECREF instead. Signed-off-by: Hector Martin --- src/pybind/rbd/rbd.pyx | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/pybind/rbd/rbd.pyx b/src/pybind/rbd/rbd.pyx index 519dee47172..5b63c41d881 100644 --- a/src/pybind/rbd/rbd.pyx +++ b/src/pybind/rbd/rbd.pyx @@ -474,23 +474,21 @@ cdef void __aio_complete_cb(rbd_completion_t completion, void *args) with gil: cb._complete() -@cython.no_gc_clear cdef class Completion(object): """completion object""" - cdef public: - object image - object oncomplete - object ref - object exc_info - cdef: - rbd_completion_t rbd_comp - PyObject* buf + object image + object oncomplete + rbd_completion_t rbd_comp + PyObject* buf + bint persisted + object exc_info def __cinit__(self, image, object oncomplete): self.oncomplete = oncomplete self.image = image + self.persisted = False def is_complete(self): """ @@ -560,14 +558,17 @@ cdef class Completion(object): self.exc_info = sys.exc_info() cdef __persist(self): - if self.oncomplete is not None: - # Keep around a reference to ourselves to make sure the completion + if self.oncomplete is not None and not self.persisted: + # Increment our own reference count to make sure the completion # is not freed until the callback is called. The completion is # allowed to be freed if there is no callback. 
- self.ref = self + ref.Py_INCREF(self) + self.persisted = True cdef __unpersist(self): - self.ref = None + if self.persisted: + ref.Py_DECREF(self) + self.persisted = False class RBD(object):