go-ceph/cephfs/admin/mirror_workflow_test.go
John Mulligan 6f7879428f cephfs admin: add additional delay when module is enabled
There was continued flakiness when the tests that enable the cephfs
mirroring module were run. This change adds another 200 ms delay
to hopefully cover the time it may take for the mgr to restart after
a new module is enabled. Unfortunately, this is still a bit of
a wild guess.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
2022-12-08 15:12:38 +00:00

256 lines
6.3 KiB
Go

//go:build !nautilus && !octopus
// +build !nautilus,!octopus
package admin
import (
"errors"
"os"
pth "path"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/ceph/go-ceph/cephfs"
"github.com/ceph/go-ceph/common/admin/manager"
)
// mirrorConfig returns the value of the MIRROR_CONF environment variable.
// TestMirroring skips itself when this is empty and otherwise hands the value
// to newFSAdmin and fsConnect to reach the second cluster. The result is the
// empty string when the variable is unset.
func mirrorConfig() string {
	conf := os.Getenv("MIRROR_CONF")
	return conf
}
// noForce is the argument passed to EnableMirroringModule by the tests in
// this file; it names the otherwise bare boolean `false` at the call sites.
const noForce = false

// mirrorClient is the client name handed to CreatePeerBootstrapToken when the
// bootstrap token is created on the second cluster.
const mirrorClient = "client.mirror_remote"
// waitForMirroring blocks until the manager reached through fsa's connection
// lists "mirroring" among its enabled modules.
//
// The module list is fetched up to 30 times with a 100ms pause after each
// unsuccessful poll. Any error from ListModules fails the test immediately,
// and the test is failed with a timeout message if the module never shows up.
func waitForMirroring(t *testing.T, fsa *FSAdmin) {
	const (
		maxPolls     = 30
		pollInterval = 100 * time.Millisecond
		settleDelay  = 200 * time.Millisecond
	)

	mgr := manager.NewFromConn(fsa.conn)
	for poll := 0; poll < maxPolls; poll++ {
		mods, err := mgr.ListModules()
		require.NoError(t, err)

		enabled := false
		for _, name := range mods.EnabledModules {
			if name == "mirroring" {
				enabled = true
				break
			}
		}
		if enabled {
			// Extra pause after the module is seen as enabled. The
			// intent appears to be to let the mgr finish restarting;
			// the duration is a guess rather than a measured value.
			time.Sleep(settleDelay)
			return
		}
		time.Sleep(pollInterval)
	}
	t.Fatalf("timed out waiting for mirroring module")
}
// TestMirroring exercises the snapshot mirroring workflow between two
// clusters: the primary, reached through getFSAdmin and fsConnect(t, ""), and
// the peer, reached through the configuration named by MIRROR_CONF. The test
// is skipped when MIRROR_CONF is empty.
//
// The test enables the mirroring module and snapshot mirroring on both sides,
// creates a bootstrap token on the peer and imports it on the primary, and
// registers /wonderland for mirroring. It then checks that content captured
// by a first snapshot shows up on the peer, and that content removed before a
// second snapshot disappears from the peer.
func TestMirroring(t *testing.T) {
	if mirrorConfig() == "" {
		t.Skip("no mirror config available")
	}

	fsa1 := getFSAdmin(t)
	fsname := "cephfs"
	require.NotNil(t, fsa1.conn)

	err := fsa1.EnableMirroringModule(noForce)
	assert.NoError(t, err)
	// The disable is deferred before the fatal check below, so it is
	// registered even when enabling reported an error.
	defer func() {
		err := fsa1.DisableMirroringModule()
		assert.NoError(t, err)
	}()
	require.NoError(t, err)
	waitForMirroring(t, fsa1)

	smadmin1 := fsa1.SnapshotMirror()
	err = smadmin1.Enable(fsname)
	require.NoError(t, err)
	defer func() {
		err := smadmin1.Disable(fsname)
		require.NoError(t, err)
	}()

	fsa2 := newFSAdmin(t, mirrorConfig())
	err = fsa2.EnableMirroringModule(noForce)
	require.NoError(t, err)
	defer func() {
		err := fsa2.DisableMirroringModule()
		assert.NoError(t, err)
	}()
	waitForMirroring(t, fsa2)

	smadmin2 := fsa2.SnapshotMirror()
	err = smadmin2.Enable(fsname)
	require.NoError(t, err)
	defer func() {
		err := smadmin2.Disable(fsname)
		require.NoError(t, err)
	}()

	// from https://docs.ceph.com/en/pacific/dev/cephfs-mirroring/
	// "Peer bootstrap involves creating a bootstrap token on the peer cluster"
	// and "Import the bootstrap token in the primary cluster"
	token, err := smadmin2.CreatePeerBootstrapToken(fsname, mirrorClient, "ceph_b")
	require.NoError(t, err)
	err = smadmin1.ImportPeerBoostrapToken(fsname, token)
	require.NoError(t, err)

	// we need a path to mirror
	path := "/wonderland"

	mount1 := fsConnect(t, "")
	defer func(mount *cephfs.MountInfo) {
		assert.NoError(t, mount.Unmount())
		assert.NoError(t, mount.Release())
	}(mount1)

	mount2 := fsConnect(t, mirrorConfig())
	defer func(mount *cephfs.MountInfo) {
		assert.NoError(t, mount.Unmount())
		assert.NoError(t, mount.Release())
	}(mount2)

	err = mount1.MakeDir(path, 0770)
	require.NoError(t, err)
	defer func() {
		// Step mount1 (not mount2) out of path before removing it:
		// mount1 is the mount that enters path further below.
		err = mount1.ChangeDir("/")
		assert.NoError(t, err)
		err = mount1.RemoveDir(path)
		assert.NoError(t, err)
	}()

	err = mount2.MakeDir(path, 0770)
	require.NoError(t, err)
	defer func() {
		err = mount2.ChangeDir("/")
		assert.NoError(t, err)
		err = mount2.RemoveDir(path)
		assert.NoError(t, err)
	}()

	err = smadmin1.Add(fsname, path)
	require.NoError(t, err)

	// From here on, relative paths used with mount1 resolve inside path.
	err = mount1.ChangeDir(path)
	require.NoError(t, err)

	// write some dirs & files
	err = mount1.MakeDir("drink_me", 0770)
	require.NoError(t, err)
	err = mount1.MakeDir("eat_me", 0770)
	require.NoError(t, err)
	writeFile(t, mount1, "drink_me/bottle1.txt",
		[]byte("magic potions #1\n"))

	// First snapshot, taken by creating a directory under snapDir.
	snapname1 := "alice"
	err = mount1.MakeDir(pth.Join(snapDir, snapname1), 0700)
	require.NoError(t, err)
	defer func() {
		err := mount1.RemoveDir(pth.Join(snapDir, snapname1))
		assert.NoError(t, err)
		err = mount2.RemoveDir(pth.Join(snapDir, snapname1))
		assert.NoError(t, err)
	}()

	err = mount2.ChangeDir(path)
	require.NoError(t, err)

	// wait a bit for the snapshot to propagate and the dirs to be created on
	// the remote fs.
	for i := 0; i < 60; i++ {
		time.Sleep(500 * time.Millisecond)
		_, err1 := mount2.Statx("drink_me", cephfs.StatxBasicStats, 0)
		_, err2 := mount2.Statx("eat_me", cephfs.StatxBasicStats, 0)
		if err1 == nil && err2 == nil {
			break
		}
	}

	// Poll the daemon status until any file system reports at least one
	// peer. If the loop runs out, the assertions below report the failure.
waitforpeers:
	for i := 0; i < 60; i++ {
		time.Sleep(500 * time.Millisecond)
		dstatus, err := smadmin1.DaemonStatus(fsname)
		assert.NoError(t, err)
		for _, dsinfo := range dstatus {
			for _, fsinfo := range dsinfo.FileSystems {
				if len(fsinfo.Peers) > 0 {
					break waitforpeers
				}
			}
		}
	}

	p, err := smadmin1.PeerList(fsname)
	assert.NoError(t, err)
	assert.Len(t, p, 1)
	for _, peer := range p {
		assert.Equal(t, "cephfs", peer.FSName)
	}

	// The mirrored tree must now be visible through mount2.
	stx, err := mount2.Statx("drink_me", cephfs.StatxBasicStats, 0)
	if assert.NoError(t, err) {
		assert.Equal(t, uint16(0040000), stx.Mode&0040000) // is dir?
	}
	stx, err = mount2.Statx("eat_me", cephfs.StatxBasicStats, 0)
	if assert.NoError(t, err) {
		assert.Equal(t, uint16(0040000), stx.Mode&0040000) // is dir?
	}
	stx, err = mount2.Statx("drink_me/bottle1.txt", cephfs.StatxBasicStats, 0)
	if assert.NoError(t, err) {
		assert.Equal(t, uint16(0100000), stx.Mode&0100000) // is reg?
		assert.Equal(t, uint64(17), stx.Size)
	}
	data := readFile(t, mount2, "drink_me/bottle1.txt")
	assert.Equal(t, "magic potions #1\n", string(data))

	// Remove everything on the primary, then take a second snapshot so the
	// removal is propagated as well.
	err = mount1.Unlink("drink_me/bottle1.txt")
	require.NoError(t, err)
	err = mount1.RemoveDir("drink_me")
	require.NoError(t, err)
	err = mount1.RemoveDir("eat_me")
	require.NoError(t, err)

	snapname2 := "rabbit"
	err = mount1.MakeDir(pth.Join(snapDir, snapname2), 0700)
	require.NoError(t, err)
	defer func() {
		err := mount1.RemoveDir(pth.Join(snapDir, snapname2))
		assert.NoError(t, err)
		err = mount2.RemoveDir(pth.Join(snapDir, snapname2))
		assert.NoError(t, err)
	}()

	// wait a bit for the snapshot to propagate and the dirs to be removed on
	// the remote fs.
	for i := 0; i < 60; i++ {
		time.Sleep(500 * time.Millisecond)
		_, err1 := mount2.Statx("drink_me", cephfs.StatxBasicStats, 0)
		_, err2 := mount2.Statx("eat_me", cephfs.StatxBasicStats, 0)
		if err1 != nil && err2 != nil {
			break
		}
	}

	// Both directories must now be gone on the peer. The expected code -2
	// is presumably -ENOENT; confirm against the cephfs error mapping.
	_, err = mount2.Statx("drink_me", cephfs.StatxBasicStats, 0)
	if assert.Error(t, err) {
		var ec errorWithCode
		if assert.True(t, errors.As(err, &ec)) {
			assert.Equal(t, -2, ec.ErrorCode())
		}
	}
	_, err = mount2.Statx("eat_me", cephfs.StatxBasicStats, 0)
	if assert.Error(t, err) {
		var ec errorWithCode
		if assert.True(t, errors.As(err, &ec)) {
			assert.Equal(t, -2, ec.ErrorCode())
		}
	}

	err = smadmin1.Remove(fsname, path)
	assert.NoError(t, err)
}
// errorWithCode is satisfied by any value exposing a numeric error code via
// ErrorCode. TestMirroring uses it as an errors.As target to read the code
// carried by errors returned from Statx.
type errorWithCode interface {
	// ErrorCode returns the integer code associated with the error.
	ErrorCode() int
}