rbd: add Watch type and watch callbacks support

Add UpdateWatch function implementing rbd_update_watch.
Add Unwatch function implementing rbd_update_unwatch.
Add a higher level Watch type to encapsulate watching
the image and the watch handle.
Add basic tests to validate the callback behavior.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
John Mulligan 2020-06-30 15:36:23 -04:00 committed by John Mulligan
parent d3f8f0cf70
commit f3f2180e66
3 changed files with 194 additions and 3 deletions

View File

@ -0,0 +1,15 @@
// +build !luminous
package rbd
/*
#include <rbd/librbd.h>
extern void imageWatchCallback(int index);
void callWatchCallback(int index) {
imageWatchCallback(index);
}
*/
import "C"

View File

@ -4,12 +4,29 @@
package rbd
// #cgo LDFLAGS: -lrbd
// #include <errno.h>
// #include <rbd/librbd.h>
/*
#cgo LDFLAGS: -lrbd
#include <rbd/librbd.h>
extern int callWatchCallback(int index);
// cgo has trouble converting the types of the callback and data arg defined in
// librbd header. It wants the callback function to be a byte pointer and
// the arg to be a pointer, which is pretty much the opposite of what we
// actually want. This shim exists to help coerce the auto-type-conversion
// to do the right thing for us.
static inline int wrap_rbd_update_watch(
rbd_image_t image,
uint64_t *handle,
void *watch_cb,
uintptr_t arg) {
return rbd_update_watch(image, handle, watch_cb, (void*)arg);
}
*/
import "C"
import (
"github.com/ceph/go-ceph/internal/callbacks"
"github.com/ceph/go-ceph/internal/retry"
)
@ -59,3 +76,77 @@ func (image *Image) ListWatchers() ([]ImageWatcher, error) {
}
return imageWatchers, nil
}
// watchCallbacks tracks the active callbacks for rbd watches
var watchCallbacks = callbacks.New()
// WatchCallback defines the function signature needed for the UpdateWatch
// callback.
type WatchCallback func(interface{})
type watchInstance struct {
callback WatchCallback
data interface{}
}
// Watch represents an ongoing image metadata watch.
type Watch struct {
image *Image
wi watchInstance
handle C.uint64_t
cbIndex int
}
// UpdateWatch updates the image object to watch metadata changes to the
// image, returning a Watch object.
//
// Implements:
// int rbd_update_watch(rbd_image_t image, uint64_t *handle,
// rbd_update_callback_t watch_cb, void *arg);
func (image *Image) UpdateWatch(cb WatchCallback, data interface{}) (*Watch, error) {
if err := image.validate(imageIsOpen); err != nil {
return nil, err
}
wi := watchInstance{
callback: cb,
data: data,
}
w := &Watch{
image: image,
wi: wi,
cbIndex: watchCallbacks.Add(wi),
}
ret := C.wrap_rbd_update_watch(
image.image,
&w.handle,
C.callWatchCallback,
C.uintptr_t(w.cbIndex))
if ret != 0 {
return nil, getError(ret)
}
return w, nil
}
// Unwatch un-registers the image watch.
//
// Implements:
// int rbd_update_unwatch(rbd_image_t image, uint64_t handle);
func (w *Watch) Unwatch() error {
if w.image == nil {
return ErrImageNotOpen
}
if err := w.image.validate(imageIsOpen); err != nil {
return err
}
ret := C.rbd_update_unwatch(w.image.image, w.handle)
watchCallbacks.Remove(w.cbIndex)
return getError(ret)
}
//export imageWatchCallback
func imageWatchCallback(index C.int) {
v := watchCallbacks.Lookup(int(index))
wi := v.(watchInstance)
wi.callback(wi.data)
}

View File

@ -6,6 +6,7 @@ package rbd
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -115,3 +116,87 @@ func TestListWatchers(t *testing.T) {
assert.Equal(t, 0, len(watchers))
})
}
func TestWatch(t *testing.T) {
conn := radosConnect(t)
require.NotNil(t, conn)
defer conn.Shutdown()
poolname := GetUUID()
err := conn.MakePool(poolname)
require.NoError(t, err)
defer conn.DeletePool(poolname)
ioctx, err := conn.OpenIOContext(poolname)
require.NoError(t, err)
defer ioctx.Destroy()
startSize := uint64(1 << 21)
name := GetUUID()
options := NewRbdImageOptions()
err = CreateImage(ioctx, name, startSize, options)
require.NoError(t, err)
defer func() { assert.NoError(t, RemoveImage(ioctx, name)) }()
t.Run("imageNotOpen", func(t *testing.T) {
image, err := OpenImageReadOnly(ioctx, name, NoSnapshot)
require.NoError(t, err)
require.NotNil(t, image)
err = image.Close()
require.NoError(t, err)
_, err = image.UpdateWatch(func(d interface{}) {
}, nil)
assert.Equal(t, ErrImageNotOpen, err)
})
t.Run("simpleWatch", func(t *testing.T) {
image, err := OpenImage(ioctx, name, NoSnapshot)
require.NoError(t, err)
require.NotNil(t, image)
defer func() {
assert.NoError(t, image.Close())
}()
cc := 0
w, err := image.UpdateWatch(func(d interface{}) {
cc++
}, nil)
assert.NoError(t, err)
defer func() {
assert.NoError(t, w.Unwatch())
}()
x := make(chan int)
defer close(x)
go func() {
for i := 0; i < 5; i++ {
i1, err := OpenImage(ioctx, name, NoSnapshot)
err = i1.Resize(startSize * uint64(1+i))
assert.NoError(t, err)
err = i1.Close()
assert.NoError(t, err)
time.Sleep(5 * time.Millisecond)
}
x <- 0
}()
<-x
assert.Equal(t, 5, cc)
})
t.Run("badWatch", func(t *testing.T) {
w := &Watch{}
err := w.Unwatch()
assert.Error(t, err)
i1, err := OpenImage(ioctx, name, NoSnapshot)
assert.NoError(t, err)
assert.NoError(t, i1.Close())
w.image = i1
err = w.Unwatch()
assert.Error(t, err)
})
}