rbd: add an initial set of basic mirroring functions

Add functions:
* SetMirrorMode implementing rbd_mirror_mode_set
* GetMirrorMode implementing rbd_mirror_mode_get
* MirrorEnable implementing rbd_mirror_image_enable2
* MirrorDisable implementing rbd_mirror_image_disable
* MirrorPromote implementing rbd_mirror_image_promote
* MirrorDemote implementing rbd_mirror_image_demote
* MirrorResync implementing rbd_mirror_image_resync
* MirrorInstanceId implementing rbd_mirror_image_get_instance_id

For now, these mirroring related functions are only supported on octopus
builds. Right now the demand is for snapshot based mirroring, which is
new in octopus. We can always relax this in the future and add the
necessary support for nautilus, for journaling only based mirroring.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
John Mulligan 2020-11-03 14:44:23 -05:00 committed by John Mulligan
parent a40638e947
commit 1e3b9a2cd9
2 changed files with 351 additions and 0 deletions

167
rbd/mirror.go Normal file
View File

@ -0,0 +1,167 @@
// +build !nautilus
// Initially, we're only providing mirroring related functions for octopus as
// that version of ceph deprecated a number of the functions in nautilus. If
// you need mirroring on an earlier supported version of ceph please file an
// issue in our tracker.
package rbd
// #cgo LDFLAGS: -lrbd
// #include <stdlib.h>
// #include <rbd/librbd.h>
import "C"
import (
"unsafe"
"github.com/ceph/go-ceph/internal/retry"
"github.com/ceph/go-ceph/rados"
)
// MirrorMode is used to indicate an approach used for RBD mirroring.
type MirrorMode int64
const (
// MirrorModeDisabled disables mirroring.
MirrorModeDisabled = MirrorMode(C.RBD_MIRROR_MODE_DISABLED)
// MirrorModeImage enables mirroring on a per-image basis.
MirrorModeImage = MirrorMode(C.RBD_MIRROR_MODE_IMAGE)
// MirrorModePool enables mirroring on all journaled images.
MirrorModePool = MirrorMode(C.RBD_MIRROR_MODE_POOL)
)
// ImageMirrorMode is used to indicate the mirroring approach for an RBD image.
type ImageMirrorMode int64
const (
// ImageMirrorModeJournal uses journaling to propagate RBD images between
// ceph clusters.
ImageMirrorModeJournal = ImageMirrorMode(C.RBD_MIRROR_IMAGE_MODE_JOURNAL)
// ImageMirrorModeSnapshot uses snapshot RDB images to propagate images
// between ceph clusters.
ImageMirrorModeSnapshot = ImageMirrorMode(C.RBD_MIRROR_IMAGE_MODE_SNAPSHOT)
)
// SetMirrorMode is used to enable or disable pool level mirroring with either
// an automatic or per-image behavior.
//
// Implements:
// int rbd_mirror_mode_set(rados_ioctx_t io_ctx,
// rbd_mirror_mode_t mirror_mode);
func SetMirrorMode(ioctx *rados.IOContext, mode MirrorMode) error {
ret := C.rbd_mirror_mode_set(
cephIoctx(ioctx),
C.rbd_mirror_mode_t(mode))
return getError(ret)
}
// GetMirrorMode is used to fetch the current mirroring mode for a pool.
//
// Implements:
// int rbd_mirror_mode_get(rados_ioctx_t io_ctx,
// rbd_mirror_mode_t *mirror_mode);
func GetMirrorMode(ioctx *rados.IOContext) (MirrorMode, error) {
var mode C.rbd_mirror_mode_t
ret := C.rbd_mirror_mode_get(
cephIoctx(ioctx),
&mode)
if err := getError(ret); err != nil {
return MirrorModeDisabled, err
}
return MirrorMode(mode), nil
}
// MirrorEnable will enable mirroring for an image using the specified mode.
//
// Implements:
// int rbd_mirror_image_enable2(rbd_image_t image,
// rbd_mirror_image_mode_t mode);
func (image *Image) MirrorEnable(mode ImageMirrorMode) error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
ret := C.rbd_mirror_image_enable2(image.image, C.rbd_mirror_image_mode_t(mode))
return getError(ret)
}
// MirrorDisable will disable mirroring for the image.
//
// Implements:
// int rbd_mirror_image_disable(rbd_image_t image, bool force);
func (image *Image) MirrorDisable(force bool) error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
ret := C.rbd_mirror_image_disable(image.image, C.bool(force))
return getError(ret)
}
// MirrorPromote will promote the image to primary status.
//
// Implements:
// int rbd_mirror_image_promote(rbd_image_t image, bool force);
func (image *Image) MirrorPromote(force bool) error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
ret := C.rbd_mirror_image_promote(image.image, C.bool(force))
return getError(ret)
}
// MirrorDemote will demote the image to secondary status.
//
// Implements:
// int rbd_mirror_image_demote(rbd_image_t image);
func (image *Image) MirrorDemote() error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
ret := C.rbd_mirror_image_demote(image.image)
return getError(ret)
}
// MirrorResync is used to manually resolve split-brain status by triggering
// resynchronization.
//
// Implements:
// int rbd_mirror_image_resync(rbd_image_t image);
func (image *Image) MirrorResync() error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
ret := C.rbd_mirror_image_resync(image.image)
return getError(ret)
}
// MirrorInstanceID returns a string naming the instance id for the image.
//
// Implements:
// int rbd_mirror_image_get_instance_id(rbd_image_t image,
// char *instance_id,
// size_t *id_max_length);
func (image *Image) MirrorInstanceID() (string, error) {
if err := image.validate(imageIsOpen); err != nil {
return "", err
}
var (
err error
buf []byte
cSize C.size_t
)
retry.WithSizes(1024, 1<<16, func(size int) retry.Hint {
cSize = C.size_t(size)
buf = make([]byte, cSize)
ret := C.rbd_mirror_image_get_instance_id(
image.image,
(*C.char)(unsafe.Pointer(&buf[0])),
&cSize)
err = getErrorIfNegative(ret)
return retry.Size(int(cSize)).If(err == errRange)
})
if err != nil {
return "", err
}
return string(buf[:cSize]), nil
}

184
rbd/mirror_test.go Normal file
View File

@ -0,0 +1,184 @@
// +build !nautilus
// Initially, we're only providing mirroring related functions for octopus as
// that version of ceph deprecated a number of the functions in nautilus. If
// you need mirroring on an earlier supported version of ceph please file an
// issue in our tracker.
package rbd
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetMirrorMode(t *testing.T) {
conn := radosConnect(t)
poolName := GetUUID()
err := conn.MakePool(poolName)
require.NoError(t, err)
defer func() {
assert.NoError(t, conn.DeletePool(poolName))
conn.Shutdown()
}()
ioctx, err := conn.OpenIOContext(poolName)
assert.NoError(t, err)
defer func() {
ioctx.Destroy()
}()
t.Run("mirrorModeDisabled", func(t *testing.T) {
m, err := GetMirrorMode(ioctx)
assert.NoError(t, err)
assert.Equal(t, m, MirrorModeDisabled)
})
t.Run("mirrorModeEnabled", func(t *testing.T) {
err = SetMirrorMode(ioctx, MirrorModeImage)
require.NoError(t, err)
m, err := GetMirrorMode(ioctx)
assert.NoError(t, err)
assert.Equal(t, m, MirrorModeImage)
})
t.Run("ioctxNil", func(t *testing.T) {
assert.Panics(t, func() {
GetMirrorMode(nil)
})
})
}
func TestMirroring(t *testing.T) {
conn := radosConnect(t)
poolName := GetUUID()
err := conn.MakePool(poolName)
require.NoError(t, err)
defer func() {
assert.NoError(t, conn.DeletePool(poolName))
conn.Shutdown()
}()
ioctx, err := conn.OpenIOContext(poolName)
assert.NoError(t, err)
defer func() {
ioctx.Destroy()
}()
// verify that mirroring is not enabled on this new pool
m, err := GetMirrorMode(ioctx)
assert.NoError(t, err)
assert.Equal(t, m, MirrorModeDisabled)
// enable per-image mirroring for this pool
err = SetMirrorMode(ioctx, MirrorModeImage)
require.NoError(t, err)
name1 := GetUUID()
options := NewRbdImageOptions()
assert.NoError(t,
options.SetUint64(ImageOptionOrder, uint64(testImageOrder)))
err = CreateImage(ioctx, name1, testImageSize, options)
require.NoError(t, err)
t.Run("enableDisable", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
defer func() {
assert.NoError(t, img.Close())
}()
err = img.MirrorEnable(ImageMirrorModeSnapshot)
assert.NoError(t, err)
err = img.MirrorDisable(false)
assert.NoError(t, err)
})
t.Run("enableDisableInvalid", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
assert.NoError(t, img.Close())
err = img.MirrorEnable(ImageMirrorModeSnapshot)
assert.Error(t, err)
err = img.MirrorDisable(false)
assert.Error(t, err)
})
t.Run("promoteDemote", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
defer func() {
assert.NoError(t, img.Close())
}()
err = img.MirrorEnable(ImageMirrorModeSnapshot)
assert.NoError(t, err)
err = img.MirrorDemote()
assert.NoError(t, err)
err = img.MirrorPromote(false)
assert.NoError(t, err)
err = img.MirrorDisable(false)
assert.NoError(t, err)
})
t.Run("promoteDemoteInvalid", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
assert.NoError(t, img.Close())
err = img.MirrorDemote()
assert.Error(t, err)
err = img.MirrorPromote(false)
assert.Error(t, err)
})
t.Run("resync", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
defer func() {
assert.NoError(t, img.Close())
}()
err = img.MirrorEnable(ImageMirrorModeSnapshot)
assert.NoError(t, err)
err = img.MirrorDemote()
assert.NoError(t, err)
err = img.MirrorResync()
assert.NoError(t, err)
err = img.MirrorDisable(true)
assert.NoError(t, err)
})
t.Run("resyncInvalid", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
assert.NoError(t, img.Close())
err = img.MirrorResync()
assert.Error(t, err)
})
t.Run("instanceId", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
defer func() {
assert.NoError(t, img.Close())
}()
err = img.MirrorEnable(ImageMirrorModeSnapshot)
assert.NoError(t, err)
miid, err := img.MirrorInstanceID()
// this is not currently testable for the "success" case
// see also the ceph tree where nothing is asserted except
// that the error is raised.
// TODO(?): figure out how to test this
assert.Error(t, err)
assert.Equal(t, "", miid)
err = img.MirrorDisable(false)
assert.NoError(t, err)
})
t.Run("instanceIdInvalid", func(t *testing.T) {
img, err := OpenImage(ioctx, name1, NoSnapshot)
assert.NoError(t, err)
assert.NoError(t, img.Close())
_, err = img.MirrorInstanceID()
assert.Error(t, err)
})
}