Merge pull request #9292 from marcan/pyrbd-aio

pybind: AIO bindings for RBD

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
This commit is contained in:
Jason Dillaman 2016-06-16 14:03:01 -04:00 committed by GitHub
commit 799633c72d
2 changed files with 361 additions and 3 deletions

View File

@ -14,6 +14,9 @@ method.
# Copyright 2011 Josh Durgin
# Copyright 2015 Hector Martin <marcan@marcan.st>
import cython
import sys
from cpython cimport PyObject, ref, exc
from libc cimport errno
from libc.stdint cimport *
@ -38,8 +41,6 @@ cdef extern from "Python.h":
cdef extern from "time.h":
ctypedef long int time_t
ctypedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void* ptr)
cdef extern from "rbd/librbd.h" nogil:
enum:
_RBD_FEATURE_LAYERING "RBD_FEATURE_LAYERING"
@ -71,6 +72,7 @@ cdef extern from "rbd/librbd.h" nogil:
ctypedef void* rados_ioctx_t
ctypedef void* rbd_image_t
ctypedef void* rbd_image_options_t
ctypedef void *rbd_completion_t
ctypedef struct rbd_image_info_t:
uint64_t size
@ -123,6 +125,9 @@ cdef extern from "rbd/librbd.h" nogil:
time_t last_update
bint up
ctypedef void (*rbd_callback_t)(rbd_completion_t cb, void *arg)
ctypedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void* ptr)
void rbd_version(int *major, int *minor, int *extra)
void rbd_image_options_create(rbd_image_options_t* opts)
@ -258,6 +263,21 @@ cdef extern from "rbd/librbd.h" nogil:
rbd_mirror_image_status_t *mirror_image_status,
size_t status_size)
int rbd_aio_write2(rbd_image_t image, uint64_t off, size_t len,
const char *buf, rbd_completion_t c, int op_flags)
int rbd_aio_read2(rbd_image_t image, uint64_t off, size_t len,
char *buf, rbd_completion_t c, int op_flags)
int rbd_aio_discard(rbd_image_t image, uint64_t off, uint64_t len,
rbd_completion_t c)
int rbd_aio_create_completion(void *cb_arg, rbd_callback_t complete_cb,
rbd_completion_t *c)
int rbd_aio_is_complete(rbd_completion_t c)
int rbd_aio_wait_for_complete(rbd_completion_t c)
ssize_t rbd_aio_get_return_value(rbd_completion_t c)
void rbd_aio_release(rbd_completion_t c)
int rbd_aio_flush(rbd_image_t image, rbd_completion_t c)
RBD_FEATURE_LAYERING = _RBD_FEATURE_LAYERING
RBD_FEATURE_STRIPINGV2 = _RBD_FEATURE_STRIPINGV2
RBD_FEATURE_EXCLUSIVE_LOCK = _RBD_FEATURE_EXCLUSIVE_LOCK
@ -397,7 +417,7 @@ cdef make_ex(ret, msg):
cdef rados_ioctx_t convert_ioctx(rados.Ioctx ioctx) except? NULL:
return <rados_ioctx_t>ioctx.io
cdef int no_op_progress_callback(uint64_t offset, uint64_t total, void* ptr):
cdef int no_op_progress_callback(uint64_t offset, uint64_t total, void* ptr) nogil:
return 0
def cstr(val, name, encoding="utf-8", opt=False):
@ -444,6 +464,113 @@ cdef void* realloc_chk(void* ptr, size_t size) except NULL:
raise MemoryError("realloc failed")
return ret
cdef class Completion
cdef void __aio_complete_cb(rbd_completion_t completion, void *args) with gil:
"""
Callback to oncomplete() for asynchronous operations
"""
cdef Completion cb = <Completion>args
cb._complete()
cdef class Completion(object):
"""completion object"""
cdef:
object image
object oncomplete
rbd_completion_t rbd_comp
PyObject* buf
bint persisted
object exc_info
def __cinit__(self, image, object oncomplete):
self.oncomplete = oncomplete
self.image = image
self.persisted = False
def is_complete(self):
"""
Has an asynchronous operation completed?
This does not imply that the callback has finished.
:returns: True if the operation is completed
"""
with nogil:
ret = rbd_aio_is_complete(self.rbd_comp)
return ret == 1
def wait_for_complete_and_cb(self):
"""
Wait for an asynchronous operation to complete
This method waits for the callback to execute, if one was provided.
It will also re-raise any exceptions raised by the callback. You
should call this to "reap" asynchronous completions and ensure that
any exceptions in the callbacks are handled, as an exception internal
to this module may have occurred.
"""
with nogil:
rbd_aio_wait_for_complete(self.rbd_comp)
if self.exc_info:
raise self.exc_info[0], self.exc_info[1], self.exc_info[2]
def get_return_value(self):
"""
Get the return value of an asychronous operation
The return value is set when the operation is complete.
:returns: int - return value of the operation
"""
with nogil:
ret = rbd_aio_get_return_value(self.rbd_comp)
return ret
def __dealloc__(self):
"""
Release a completion
This is automatically called when the completion object is freed.
"""
ref.Py_XDECREF(self.buf)
self.buf = NULL
if self.rbd_comp != NULL:
with nogil:
rbd_aio_release(self.rbd_comp)
self.rbd_comp = NULL
cdef void _complete(self):
try:
self.__unpersist()
if self.oncomplete:
self.oncomplete(self)
# In the event that something raises an exception during the next 2
# lines of code, we will not be able to catch it, and this may result
# in the app not noticing a failed callback. However, this should only
# happen in extreme circumstances (OOM, etc.). KeyboardInterrupt
# should not be a problem because the callback thread from librbd
# ought to have SIGINT blocked.
except:
self.exc_info = sys.exc_info()
cdef __persist(self):
if self.oncomplete is not None and not self.persisted:
# Increment our own reference count to make sure the completion
# is not freed until the callback is called. The completion is
# allowed to be freed if there is no callback.
ref.Py_INCREF(self)
self.persisted = True
cdef __unpersist(self):
if self.persisted:
ref.Py_DECREF(self)
self.persisted = False
class RBD(object):
"""
This class wraps librbd CRUD functions.
@ -1052,6 +1179,31 @@ cdef class Image(object):
self.close()
return False
def __get_completion(self, oncomplete):
"""
Constructs a completion to use with asynchronous operations
:param oncomplete: callback for the completion
:raises: :class:`Error`
:returns: completion object
"""
completion_obj = Completion(self, oncomplete)
cdef:
rbd_completion_t completion
PyObject* p_completion_obj= <PyObject*>completion_obj
with nogil:
ret = rbd_aio_create_completion(p_completion_obj, __aio_complete_cb,
&completion)
if ret < 0:
raise make_ex(ret, "error getting a completion")
completion_obj.rbd_comp = completion
return completion_obj
def close(self):
"""
Release the resources used by this image object.
@ -1929,6 +2081,152 @@ written." % (self.name, ret, length))
free(c_status.description)
return status
def aio_read(self, offset, length, oncomplete, fadvise_flags=0):
"""
Asynchronously read data from the image
Raises :class:`InvalidArgument` if part of the range specified is
outside the image.
oncomplete will be called with the returned read value as
well as the completion:
oncomplete(completion, data_read)
:param offset: the offset to start reading at
:type offset: int
:param length: how many bytes to read
:type length: int
:param oncomplete: what to do when the read is complete
:type oncomplete: completion
:param fadvise_flags: fadvise flags for this read
:type fadvise_flags: int
:returns: str - the data read
:raises: :class:`InvalidArgument`, :class:`IOError`
"""
cdef:
char *ret_buf
uint64_t _offset = offset
size_t _length = length
int _fadvise_flags = fadvise_flags
Completion completion
def oncomplete_(completion_v):
cdef Completion _completion_v = completion_v
return_value = _completion_v.get_return_value()
if return_value > 0 and return_value != length:
_PyBytes_Resize(&_completion_v.buf, return_value)
return oncomplete(_completion_v, <object>_completion_v.buf if return_value >= 0 else None)
completion = self.__get_completion(oncomplete_)
completion.buf = PyBytes_FromStringAndSize(NULL, length)
ret_buf = PyBytes_AsString(completion.buf)
try:
completion.__persist()
with nogil:
ret = rbd_aio_read2(self.image, _offset, _length, ret_buf,
completion.rbd_comp, _fadvise_flags)
if ret < 0:
raise make_ex(ret, 'error reading %s %ld~%ld' %
(self.name, offset, length))
except:
completion.__unpersist()
raise
return completion
def aio_write(self, data, offset, oncomplete, fadvise_flags=0):
"""
Asynchronously write data to the image
Raises :class:`InvalidArgument` if part of the write would fall outside
the image.
oncomplete will be called with the returned read value as
well as the completion:
oncomplete(completion, data_read)
:param offset: the offset to start reading at
:type offset: int
:param length: how many bytes to read
:type length: int
:param oncomplete: what to do when the read is complete
:type oncomplete: completion
:param fadvise_flags: fadvise flags for this read
:type fadvise_flags: int
:returns: str - the data read
:raises: :class:`InvalidArgument`, :class:`IOError`
"""
cdef:
uint64_t _offset = offset
char *_data = data
size_t _length = len(data)
int _fadvise_flags = fadvise_flags
Completion completion
completion = self.__get_completion(oncomplete)
try:
completion.__persist()
with nogil:
ret = rbd_aio_write2(self.image, _offset, _length, _data,
completion.rbd_comp, _fadvise_flags)
if ret < 0:
raise make_ex(ret, 'error writing %s %ld~%ld' %
(self.name, offset, _length))
except:
completion.__unpersist()
raise
return completion
def aio_discard(self, offset, length, oncomplete):
"""
Asynchronously trim the range from the image. It will be logically
filled with zeroes.
"""
cdef:
uint64_t _offset = offset
size_t _length = length
Completion completion
completion = self.__get_completion(oncomplete)
try:
completion.__persist()
with nogil:
ret = rbd_aio_discard(self.image, _offset, _length,
completion.rbd_comp)
if ret < 0:
raise make_ex(ret, 'error discarding %s %ld~%ld' %
(self.name, offset, _length))
except:
completion.__unpersist()
raise
return completion
def aio_flush(self, oncomplete):
"""
Asyncronously wait until all writes are fully flushed if caching is
enabled.
"""
cdef Completion completion = self.__get_completion(oncomplete)
try:
completion.__persist()
with nogil:
ret = rbd_aio_flush(self.image, completion.rbd_comp)
if ret < 0:
raise make_ex(ret, 'error flushing')
except:
completion.__unpersist()
raise
return completion
cdef class SnapIterator(object):
"""
Iterator over snapshot info for an image.

View File

@ -3,6 +3,7 @@ import functools
import socket
import os
import time
import sys
from nose import with_setup, SkipTest
from nose.tools import eq_ as eq, assert_raises
@ -718,6 +719,65 @@ class TestImage(object):
self.image.remove_snap('snap1')
self.image.remove_snap('snap2')
def test_aio_read(self):
# this is a list so that the local cb() can modify it
retval = [None]
def cb(_, buf):
retval[0] = buf
# test1: success case
comp = self.image.aio_read(0, 20, cb)
comp.wait_for_complete_and_cb()
eq(retval[0], b'\0' * 20)
eq(comp.get_return_value(), 20)
eq(sys.getrefcount(comp), 2)
# test2: error case
retval[0] = 1
comp = self.image.aio_read(IMG_SIZE, 20, cb)
comp.wait_for_complete_and_cb()
eq(None, retval[0])
assert(comp.get_return_value() < 0)
eq(sys.getrefcount(comp), 2)
def test_aio_write(self):
retval = [None]
def cb(comp):
retval[0] = comp.get_return_value()
data = rand_data(256)
comp = self.image.aio_write(data, 256, cb)
comp.wait_for_complete_and_cb()
eq(retval[0], 0)
eq(comp.get_return_value(), 0)
eq(sys.getrefcount(comp), 2)
eq(self.image.read(256, 256), data)
def test_aio_discard(self):
retval = [None]
def cb(comp):
retval[0] = comp.get_return_value()
data = rand_data(256)
self.image.write(data, 0)
comp = self.image.aio_discard(0, 256, cb)
comp.wait_for_complete_and_cb()
eq(retval[0], 0)
eq(comp.get_return_value(), 0)
eq(sys.getrefcount(comp), 2)
eq(self.image.read(256, 256), b'\0' * 256)
def test_aio_flush(self):
retval = [None]
def cb(comp):
retval[0] = comp.get_return_value()
comp = self.image.aio_flush(cb)
comp.wait_for_complete_and_cb()
eq(retval[0], 0)
eq(sys.getrefcount(comp), 2)
def check_diff(image, offset, length, from_snapshot, expected):
extents = []