diff --git a/rbd/callback_shims_mimic.go b/rbd/callback_shims_mimic.go new file mode 100644 index 0000000..fdf0303 --- /dev/null +++ b/rbd/callback_shims_mimic.go @@ -0,0 +1,15 @@ +// +build !luminous + +package rbd + +/* + +#include + +extern void imageWatchCallback(int index); + +void callWatchCallback(int index) { + imageWatchCallback(index); +} +*/ +import "C" diff --git a/rbd/watchers_mimic.go b/rbd/watchers_mimic.go index 198292a..54d5dfd 100644 --- a/rbd/watchers_mimic.go +++ b/rbd/watchers_mimic.go @@ -4,12 +4,29 @@ package rbd -// #cgo LDFLAGS: -lrbd -// #include -// #include +/* +#cgo LDFLAGS: -lrbd +#include + +extern int callWatchCallback(int index); + +// cgo has trouble converting the types of the callback and data arg defined in +// librbd header. It wants the callback function to be a byte pointer and +// the arg to be a pointer, which is pretty much the opposite of what we +// actually want. This shim exists to help coerce the auto-type-conversion +// to do the right thing for us. +static inline int wrap_rbd_update_watch( + rbd_image_t image, + uint64_t *handle, + void *watch_cb, + uintptr_t arg) { + return rbd_update_watch(image, handle, watch_cb, (void*)arg); +} +*/ import "C" import ( + "github.com/ceph/go-ceph/internal/callbacks" "github.com/ceph/go-ceph/internal/retry" ) @@ -59,3 +76,77 @@ func (image *Image) ListWatchers() ([]ImageWatcher, error) { } return imageWatchers, nil } + +// watchCallbacks tracks the active callbacks for rbd watches +var watchCallbacks = callbacks.New() + +// WatchCallback defines the function signature needed for the UpdateWatch +// callback. +type WatchCallback func(interface{}) + +type watchInstance struct { + callback WatchCallback + data interface{} +} + +// Watch represents an ongoing image metadata watch. +type Watch struct { + image *Image + wi watchInstance + handle C.uint64_t + cbIndex int +} + +// UpdateWatch updates the image object to watch metadata changes to the +// image, returning a Watch object. +// +// Implements: +// int rbd_update_watch(rbd_image_t image, uint64_t *handle, +// rbd_update_callback_t watch_cb, void *arg); +func (image *Image) UpdateWatch(cb WatchCallback, data interface{}) (*Watch, error) { + if err := image.validate(imageIsOpen); err != nil { + return nil, err + } + wi := watchInstance{ + callback: cb, + data: data, + } + w := &Watch{ + image: image, + wi: wi, + cbIndex: watchCallbacks.Add(wi), + } + + ret := C.wrap_rbd_update_watch( + image.image, + &w.handle, + C.callWatchCallback, + C.uintptr_t(w.cbIndex)) + if ret != 0 { + return nil, getError(ret) + } + return w, nil +} + +// Unwatch un-registers the image watch. +// +// Implements: +// int rbd_update_unwatch(rbd_image_t image, uint64_t handle); +func (w *Watch) Unwatch() error { + if w.image == nil { + return ErrImageNotOpen + } + if err := w.image.validate(imageIsOpen); err != nil { + return err + } + ret := C.rbd_update_unwatch(w.image.image, w.handle) + watchCallbacks.Remove(w.cbIndex) + return getError(ret) +} + +//export imageWatchCallback +func imageWatchCallback(index C.int) { + v := watchCallbacks.Lookup(int(index)) + wi := v.(watchInstance) + wi.callback(wi.data) +} diff --git a/rbd/watchers_mimic_test.go b/rbd/watchers_mimic_test.go index 451cfef..0601af5 100644 --- a/rbd/watchers_mimic_test.go +++ b/rbd/watchers_mimic_test.go @@ -6,6 +6,7 @@ package rbd import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -115,3 +116,87 @@ func TestListWatchers(t *testing.T) { assert.Equal(t, 0, len(watchers)) }) } + +func TestWatch(t *testing.T) { + conn := radosConnect(t) + require.NotNil(t, conn) + defer conn.Shutdown() + + poolname := GetUUID() + err := conn.MakePool(poolname) + require.NoError(t, err) + defer conn.DeletePool(poolname) + + ioctx, err := conn.OpenIOContext(poolname) + require.NoError(t, err) + defer ioctx.Destroy() + + startSize := uint64(1 << 21) + name := GetUUID() + options := NewRbdImageOptions() + err = CreateImage(ioctx, name, startSize, options) + require.NoError(t, err) + defer func() { assert.NoError(t, RemoveImage(ioctx, name)) }() + + t.Run("imageNotOpen", func(t *testing.T) { + image, err := OpenImageReadOnly(ioctx, name, NoSnapshot) + require.NoError(t, err) + require.NotNil(t, image) + + err = image.Close() + require.NoError(t, err) + + _, err = image.UpdateWatch(func(d interface{}) { + }, nil) + assert.Equal(t, ErrImageNotOpen, err) + }) + + t.Run("simpleWatch", func(t *testing.T) { + image, err := OpenImage(ioctx, name, NoSnapshot) + require.NoError(t, err) + require.NotNil(t, image) + + defer func() { + assert.NoError(t, image.Close()) + }() + + cc := 0 + w, err := image.UpdateWatch(func(d interface{}) { + cc++ + }, nil) + assert.NoError(t, err) + defer func() { + assert.NoError(t, w.Unwatch()) + }() + + x := make(chan int) + defer close(x) + go func() { + for i := 0; i < 5; i++ { + i1, err := OpenImage(ioctx, name, NoSnapshot) + err = i1.Resize(startSize * uint64(1+i)) + assert.NoError(t, err) + err = i1.Close() + assert.NoError(t, err) + time.Sleep(5 * time.Millisecond) + } + x <- 0 + }() + <-x + + assert.Equal(t, 5, cc) + }) + + t.Run("badWatch", func(t *testing.T) { + w := &Watch{} + err := w.Unwatch() + assert.Error(t, err) + + i1, err := OpenImage(ioctx, name, NoSnapshot) + assert.NoError(t, err) + assert.NoError(t, i1.Close()) + w.image = i1 + err = w.Unwatch() + assert.Error(t, err) + }) +}