Merge pull request #7778 from marcan/pybind-fixes

pybind/rados: fix object lifetime issues and other bugs in aio

Reviewed-by: Mehdi Abaakouk <sileht@redhat.com>
Reviewed-by: Josh Durgin <jdurgin@redhat.com>
This commit is contained in:
Josh Durgin 2016-02-29 22:48:53 -08:00
commit 4f98ba25d0
2 changed files with 53 additions and 34 deletions

View File

@ -40,7 +40,7 @@ else:
cdef extern from "Python.h":
# These are in cpython/string.pxd, but use "object" types instead of
# PyObject*, which invokes assumptions in cpython that we need to
# legitimately break to implement zero-copy string buffers in Image.read().
# legitimately break to implement zero-copy string buffers in Ioctx.read().
# This is valid use of the Python API and documented as a special case.
PyObject *PyBytes_FromStringAndSize(char *v, Py_ssize_t len) except NULL
char* PyBytes_AsString(PyObject *string) except NULL
@ -1556,6 +1556,7 @@ cdef class Completion(object):
rados_callback_t complete_cb
rados_callback_t safe_cb
rados_completion_t rados_comp
PyObject* buf
def __cinit__(self, Ioctx ioctx, object oncomplete, object onsafe):
self.oncomplete = oncomplete
@ -1643,8 +1644,12 @@ cdef class Completion(object):
Call this when you no longer need the completion. It may not be
freed immediately if the operation is not acked and committed.
"""
with nogil:
rados_aio_release(self.rados_comp)
ref.Py_XDECREF(self.buf)
self.buf = NULL
if self.rados_comp != NULL:
with nogil:
rados_aio_release(self.rados_comp)
self.rados_comp = NULL
def _complete(self):
self.oncomplete(self)
@ -1713,7 +1718,7 @@ cdef int __aio_safe_cb(rados_completion_t completion, void *args) with gil:
Callback to onsafe() for asynchronous operations
"""
cdef object cb = <object>args
cb.onsafe(cb)
cb._safe()
return 0
@ -1722,7 +1727,7 @@ cdef int __aio_complete_cb(rados_completion_t completion, void *args) with gil:
Callback to oncomplete() for asynchronous operations
"""
cdef object cb = <object>args
cb.oncomplete(cb)
cb._complete()
return 0
@ -1829,13 +1834,13 @@ cdef class Ioctx(object):
uint64_t _offset = offset
completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
with nogil:
ret = rados_aio_write(self.io, _object_name, completion.rados_comp,
_to_write, size, _offset)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error writing object %s" % object_name)
self.__track_completion(completion)
return completion
def aio_write_full(self, object_name, to_write,
@ -1871,13 +1876,14 @@ cdef class Ioctx(object):
size_t size = len(to_write)
completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
with nogil:
ret = rados_aio_write_full(self.io, _object_name,
completion.rados_comp,
_to_write, size)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error writing object %s" % object_name)
self.__track_completion(completion)
return completion
def aio_append(self, object_name, to_append, oncomplete=None, onsafe=None):
@ -1911,13 +1917,14 @@ cdef class Ioctx(object):
size_t size = len(to_append)
completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
with nogil:
ret = rados_aio_append(self.io, _object_name,
completion.rados_comp,
_to_append, size)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error appending object %s" % object_name)
self.__track_completion(completion)
return completion
def aio_flush(self):
@ -1962,34 +1969,25 @@ cdef class Ioctx(object):
char *ref_buf
size_t _length = length
PyObject* ret_s = NULL
def oncomplete_(completion_v):
try:
return_value = completion_v.get_return_value()
if return_value != length:
_PyBytes_Resize(&ret_s, return_value)
return oncomplete(completion_v, <object>ret_s if return_value >= 0 else None)
finally:
# We DECREF unconditionally: the cast to object above will have
# INCREFed if necessary. This also takes care of exceptions,
# including if _PyString_Resize fails (that will free the string
# itself and set ret_s to NULL, hence XDECREF).
ref.Py_XDECREF(ret_s)
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
if return_value > 0 and return_value != length:
_PyBytes_Resize(&_completion_v.buf, return_value)
return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
completion = self.__get_completion(oncomplete_, None)
ret_s = PyBytes_FromStringAndSize(NULL, length)
try:
ret_buf = PyBytes_AsString(ret_s)
with nogil:
ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
ret_buf, _length, _offset)
if ret < 0:
raise make_ex(ret, "error reading %s" % object_name)
self.__track_completion(completion)
return completion
except Exception:
ref.Py_XDECREF(ret_s)
completion.buf = PyBytes_FromStringAndSize(NULL, length)
ret_buf = PyBytes_AsString(completion.buf)
self.__track_completion(completion)
with nogil:
ret = rados_aio_read(self.io, _object_name, completion.rados_comp,
ret_buf, _length, _offset)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error reading %s" % object_name)
return completion
def aio_remove(self, object_name, oncomplete=None, onsafe=None):
"""
@ -2014,12 +2012,13 @@ cdef class Ioctx(object):
char* _object_name = object_name
completion = self.__get_completion(oncomplete, onsafe)
self.__track_completion(completion)
with nogil:
ret = rados_aio_remove(self.io, _object_name,
completion.rados_comp)
if ret < 0:
completion._cleanup()
raise make_ex(ret, "error removing %s" % object_name)
self.__track_completion(completion)
return completion
def require_ioctx_open(self):

View File

@ -649,6 +649,7 @@ class TestIoctx(object):
assert(loops <= 10)
eq(retval[0], payload)
eq(sys.getrefcount(comp), 2)
# test2: use wait_for_complete_and_cb(), verify retval[0] is
# set by the time we regain control
@ -666,6 +667,25 @@ class TestIoctx(object):
comp.wait_for_complete_and_cb()
assert(retval[0] is not None)
eq(retval[0], payload)
eq(sys.getrefcount(comp), 2)
# test3: error case, use wait_for_complete_and_cb(), verify retval[0] is
# set by the time we regain control
retval[0] = 1
self._take_down_acting_set('test_pool', 'bar')
comp = self.ioctx.aio_read("bar", len(payload), 0, cb)
eq(False, comp.is_complete())
time.sleep(3)
eq(False, comp.is_complete())
with lock:
eq(1, retval[0])
self._let_osds_back_up()
comp.wait_for_complete_and_cb()
eq(None, retval[0])
assert(comp.get_return_value() < 0)
eq(sys.getrefcount(comp), 2)
[i.remove() for i in self.ioctx.list_objects()]