diff --git a/rbd/mirror.go b/rbd/mirror.go index 2a98dcf..c5a9b0d 100644 --- a/rbd/mirror.go +++ b/rbd/mirror.go @@ -389,6 +389,13 @@ func (image *Image) GetGlobalMirrorStatus() (GlobalMirrorImageStatus, error) { } defer C.rbd_mirror_image_global_status_cleanup(&s) + status := newGlobalMirrorImageStatus(&s) + return status, nil +} + +func newGlobalMirrorImageStatus( + s *C.rbd_mirror_image_global_status_t) GlobalMirrorImageStatus { + status := GlobalMirrorImageStatus{ Name: C.GoString(s.name), Info: convertMirrorImageInfo(&s.info), @@ -406,7 +413,7 @@ func (image *Image) GetGlobalMirrorStatus() (GlobalMirrorImageStatus, error) { Up: bool(ss.up), } } - return status, nil + return status } // CreateMirrorSnapshot creates a snapshot for image propagation to mirrors. @@ -583,3 +590,107 @@ func ImportMirrorPeerBootstrapToken( cToken) return getError(ret) } + +// GlobalMirrorImageIDAndStatus values contain an ID string for a RBD image +// and that image's GlobalMirrorImageStatus. +type GlobalMirrorImageIDAndStatus struct { + ID string + Status GlobalMirrorImageStatus +} + +func mirrorImageGlobalStatusList( + ioctx *rados.IOContext, start string, + results []GlobalMirrorImageIDAndStatus) (int, error) { + // this C function is treated like a "batch" iterator. Based on it's + // design it appears expected to call it multiple times to get + // the entire result. + cStart := C.CString(start) + defer C.free(unsafe.Pointer(cStart)) + + var ( + max = C.size_t(len(results)) + length = C.size_t(0) + ids = make([]*C.char, len(results)) + images = make([]C.rbd_mirror_image_global_status_t, len(results)) + ) + ret := C.rbd_mirror_image_global_status_list( + cephIoctx(ioctx), + cStart, + max, + (**C.char)(unsafe.Pointer(&ids[0])), + (*C.rbd_mirror_image_global_status_t)(unsafe.Pointer(&images[0])), + &length) + + for i := 0; i < int(length); i++ { + results[i].ID = C.GoString(ids[i]) + results[i].Status = newGlobalMirrorImageStatus(&images[0]) + } + C.rbd_mirror_image_global_status_list_cleanup( + (**C.char)(unsafe.Pointer(&ids[0])), + (*C.rbd_mirror_image_global_status_t)(unsafe.Pointer(&images[0])), + length) + return int(length), getError(ret) +} + +// statusIterBufSize is intentionally not a constant. The unit tests alter +// this value in order to get more code coverage w/o needing to create +// very many images. +var statusIterBufSize = 64 + +// MirrorImageGlobalStatusIter provide methods for iterating over all +// the GlobalMirrorImageIdAndStatus values in a pool. +type MirrorImageGlobalStatusIter struct { + ioctx *rados.IOContext + + buf []GlobalMirrorImageIDAndStatus + lastID string +} + +// NewMirrorImageGlobalStatusIter creates a new iterator type ready for use. +func NewMirrorImageGlobalStatusIter(ioctx *rados.IOContext) *MirrorImageGlobalStatusIter { + return &MirrorImageGlobalStatusIter{ + ioctx: ioctx, + } +} + +// Next fetches one GlobalMirrorImageIDAndStatus value or a nil value if +// iteration is exhausted. The error return will be non-nil if an underlying +// error fetching more values occurred. +func (iter *MirrorImageGlobalStatusIter) Next() (*GlobalMirrorImageIDAndStatus, error) { + if len(iter.buf) == 0 { + if err := iter.fetch(); err != nil { + return nil, err + } + } + if len(iter.buf) == 0 { + return nil, nil + } + item := iter.buf[0] + iter.lastID = item.ID + iter.buf = iter.buf[1:] + return &item, nil +} + +// Close terminates iteration regardless if iteration was completed and +// frees any associated resources. +func (iter *MirrorImageGlobalStatusIter) Close() error { + iter.buf = nil + iter.lastID = "" + return nil +} + +func (iter *MirrorImageGlobalStatusIter) fetch() error { + iter.buf = nil + items := make([]GlobalMirrorImageIDAndStatus, statusIterBufSize) + n, err := mirrorImageGlobalStatusList( + iter.ioctx, + iter.lastID, + items) + if err != nil { + return err + } + if n > 0 { + iter.buf = items[:n] + } + return nil +} diff --git a/rbd/mirror_test.go b/rbd/mirror_test.go index 7080412..5362be5 100644 --- a/rbd/mirror_test.go +++ b/rbd/mirror_test.go @@ -734,3 +734,85 @@ func TestMirrorBootstrapToken(t *testing.T) { assert.NoError(t, err) }) } + +func TestMirrorImageGlobalStatusIter(t *testing.T) { + defer func(x int) { + statusIterBufSize = x + }(statusIterBufSize) + // shrink the buffer size in order to trigger more of the + // retry logic in the iter type + statusIterBufSize = 4 + + conn := radosConnect(t) + poolName := GetUUID() + err := conn.MakePool(poolName) + require.NoError(t, err) + defer func() { + assert.NoError(t, conn.DeletePool(poolName)) + conn.Shutdown() + }() + + ioctx, err := conn.OpenIOContext(poolName) + assert.NoError(t, err) + defer func() { + ioctx.Destroy() + }() + + // enable per-image mirroring for this pool + err = SetMirrorMode(ioctx, MirrorModeImage) + require.NoError(t, err) + + imgName := GetUUID() + options := NewRbdImageOptions() + assert.NoError(t, options.SetUint64(ImageOptionOrder, uint64(testImageOrder))) + + for i := 0; i < 7; i++ { + name := fmt.Sprintf("%s%d", imgName, i) + err = CreateImage(ioctx, name, testImageSize, options) + require.NoError(t, err) + img, err := OpenImage(ioctx, name, NoSnapshot) + assert.NoError(t, err) + err = img.MirrorEnable(ImageMirrorModeSnapshot) + assert.NoError(t, err) + require.NoError(t, img.Close()) + } + + t.Run("ioctxNil", func(t *testing.T) { + iter := NewMirrorImageGlobalStatusIter(nil) + defer iter.Close() + assert.Panics(t, func() { + iter.Next() + }) + }) + + t.Run("getStatus", func(t *testing.T) { + lst := []*GlobalMirrorImageIDAndStatus{} + iter := NewMirrorImageGlobalStatusIter(ioctx) + for { + istatus, err := iter.Next() + assert.NoError(t, err) + if istatus == nil { + break + } + lst = append(lst, istatus) + } + assert.Len(t, lst, 7) + gms := lst[0].Status + assert.NoError(t, err) + assert.NotEqual(t, "", gms.Name) + assert.NotEqual(t, "", gms.Info.GlobalID) + assert.Equal(t, gms.Info.State, MirrorImageEnabled) + assert.Equal(t, gms.Info.Primary, false) + if assert.Len(t, gms.SiteStatuses, 1) { + ss := gms.SiteStatuses[0] + assert.Equal(t, "", ss.MirrorUUID) + assert.Equal(t, MirrorImageStatusStateUnknown, ss.State, ss.State) + assert.Equal(t, "status not found", ss.Description) + assert.Equal(t, int64(0), ss.LastUpdate) + assert.False(t, ss.Up) + ls, err := gms.LocalStatus() + assert.NoError(t, err) + assert.Equal(t, ss, ls) + } + }) +}