rbd: add SetMirrorSiteName and GetMirrorSiteName functions

* Add SetMirrorSiteName implementing rbd_mirror_site_name_set
* Add GetMirrorSiteName implementing rbd_mirror_site_name_get

These functions are used to set the name of the site (for an entire
cluster) for rbd mirroring purposes. It's a bit strange that it's
part of rdb and not rados, but such is life.

Tests included.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
John Mulligan 2021-04-29 17:05:43 -04:00 committed by mergify[bot]
parent 1c719b199e
commit 97eefba287
2 changed files with 100 additions and 0 deletions

View File

@ -467,3 +467,50 @@ func MirrorImageStatusSummary(
}
return m, nil
}
// SetMirrorSiteName sets the site name, used for rbd mirroring, for the ceph
// cluster associated with the provided rados connection.
//
// Implements:
// int rbd_mirror_site_name_set(rados_t cluster,
// const char *name);
func SetMirrorSiteName(conn *rados.Conn, name string) error {
cName := C.CString(name)
defer C.free(unsafe.Pointer(cName))
ret := C.rbd_mirror_site_name_set(
C.rados_t(conn.Cluster()),
cName)
return getError(ret)
}
// GetMirrorSiteName gets the site name, used for rbd mirroring, for the ceph
// cluster associated with the provided rados connection.
//
// Implements:
// int rbd_mirror_site_name_get(rados_t cluster,
// char *name, size_t *max_len);
func GetMirrorSiteName(conn *rados.Conn) (string, error) {
var (
cluster = C.rados_t(conn.Cluster())
err error
buf []byte
cSize C.size_t
)
retry.WithSizes(1024, 1<<16, func(size int) retry.Hint {
cSize = C.size_t(size)
buf = make([]byte, cSize)
ret := C.rbd_mirror_site_name_get(
cluster,
(*C.char)(unsafe.Pointer(&buf[0])),
&cSize)
err = getErrorIfNegative(ret)
return retry.Size(int(cSize)).If(err == errRange)
})
if err != nil {
return "", err
}
// the C code sets the size including null byte
return string(buf[:cSize-1]), nil
}

View File

@ -587,3 +587,56 @@ func testMirrorImageStatusSummaryMirroredPool(t *testing.T) {
assert.GreaterOrEqual(t, ssum[MirrorImageStatusStateUnknown], uint(1))
}
}
func TestMirrorSiteName(t *testing.T) {
t.Run("connNilGet", func(t *testing.T) {
assert.Panics(t, func() {
GetMirrorSiteName(nil)
})
})
t.Run("connNilSet", func(t *testing.T) {
assert.Panics(t, func() {
SetMirrorSiteName(nil, "foo")
})
})
t.Run("simple", func(t *testing.T) {
conn := radosConnect(t)
defer conn.Shutdown()
err := SetMirrorSiteName(conn, "rbd4eva")
assert.NoError(t, err)
n, err := GetMirrorSiteName(conn)
assert.NoError(t, err)
assert.Equal(t, "rbd4eva", n)
err = SetMirrorSiteName(conn, "ceph_a")
assert.NoError(t, err)
n, err = GetMirrorSiteName(conn)
assert.NoError(t, err)
assert.Equal(t, "ceph_a", n)
})
t.Run("twoCluster", func(t *testing.T) {
mconfig := mirrorConfig()
if mconfig == "" {
t.Skip("no mirror config env var set")
}
conn1 := radosConnect(t)
defer conn1.Shutdown()
conn2 := radosConnectConfig(t, mconfig)
defer conn2.Shutdown()
err := SetMirrorSiteName(conn1, "cluster_a")
assert.NoError(t, err)
err = SetMirrorSiteName(conn2, "cluster_b")
assert.NoError(t, err)
// verify the two conns are properly separate
n1, err := GetMirrorSiteName(conn1)
assert.NoError(t, err)
assert.Equal(t, "cluster_a", n1)
n2, err := GetMirrorSiteName(conn2)
assert.NoError(t, err)
assert.Equal(t, "cluster_b", n2)
})
}