go-ceph/cephfs/admin/workflow_test.go
guoguangwu 448e4b7f5c cephfs: remove refs to deprecated io/ioutil
Signed-off-by: guoguangwu <guoguangwu@magic-shield.com>
2023-07-08 14:26:48 +00:00

271 lines
7.4 KiB
Go

package admin
import (
"errors"
"fmt"
"io"
"os"
pathpkg "path"
"testing"
"time"
"github.com/ceph/go-ceph/cephfs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// snapDir is the directory name this file passes as the "client_snapdir"
// config option when connecting (see fsConnect) and uses as a path component
// when building snapshot paths (see getSnapPath).
var snapDir = ".snapshots"
// fsConnect creates a cephfs mount, loads its configuration, and mounts it.
//
// When configFile is empty the default config file is read; otherwise
// configFile is read. The "client_snapdir" option is set to snapDir before
// mounting. The test fails immediately if any step returns an error or if the
// mount call has not returned within five seconds.
func fsConnect(t *testing.T, configFile string) *cephfs.MountInfo {
	mount, err := cephfs.CreateMount()
	require.NoError(t, err)
	require.NotNil(t, mount)

	if configFile == "" {
		err = mount.ReadDefaultConfigFile()
	} else {
		err = mount.ReadConfigFile(configFile)
	}
	require.NoError(t, err)

	err = mount.SetConfigOption("client_snapdir", snapDir)
	require.NoError(t, err)

	timeout := time.After(time.Second * 5)
	// Buffered with capacity 1 so the goroutine can always deliver its result
	// and exit, even when the select below has already taken the timeout
	// branch and nobody will ever receive from the channel.
	ch := make(chan error, 1)
	go func(mount *cephfs.MountInfo) {
		ch <- mount.Mount()
	}(mount)

	select {
	case err = <-ch:
	case <-timeout:
		err = fmt.Errorf("timed out waiting for connect")
	}
	require.NoError(t, err)
	return mount
}
// writeFile creates (or truncates) the file at path on mount and writes
// content at offset 0. The test fails immediately if the file cannot be
// opened, if the write returns an error, or if fewer bytes than len(content)
// were written.
func writeFile(t *testing.T, mount *cephfs.MountInfo, path string, content []byte) {
	f1, err := mount.Open(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0700)
	require.NoError(t, err)
	defer f1.Close()

	// Check the write result: a silently failed or short write would only
	// surface later as a confusing content assertion in the caller.
	n, err := f1.WriteAt(content, 0)
	require.NoError(t, err)
	require.Equal(t, len(content), n)
}
// readFile opens the file at path on mount read-only and returns its entire
// contents. The test fails immediately if the open or the read returns an
// error.
func readFile(t *testing.T, mount *cephfs.MountInfo, path string) []byte {
	src, openErr := mount.Open(path, os.O_RDONLY, 0)
	require.NoError(t, openErr)
	defer src.Close()

	data, readErr := io.ReadAll(src)
	require.NoError(t, readErr)
	return data
}
// getSnapPath returns the first path at which the snapshot snapname of subvol
// can be stat'ed through mount. Two layouts are probed, in order:
//
//	<subvol>/<snapDir>/<snapname>
//	<dir(subvol)>/<snapDir>/<snapname>/<base(subvol)>
//
// If neither exists the test is failed via t.Fatalf. There is no nicer way to
// discover the location than probing.
func getSnapPath(t *testing.T, mount *cephfs.MountInfo, subvol, snapname string) string {
	candidates := []string{
		pathpkg.Join(subvol, snapDir, snapname),
		pathpkg.Join(pathpkg.Dir(subvol), snapDir, snapname, pathpkg.Base(subvol)),
	}
	for _, candidate := range candidates {
		if _, statErr := mount.Statx(candidate, cephfs.StatxBasicStats, 0); statErr == nil {
			return candidate
		}
	}
	t.Fatalf("did not find a snap path for %s", snapname)
	return ""
}
// TestWorkflow aims to do more than just exercise the API calls, but rather to
// also check that they do what they say on the tin. This means importing the
// cephfs lib in addition to the admin lib and reading and writing to the
// subvolume, snapshot, and clone as appropriate.
//
// The steps, in order: create a subvolume group and subvolume, populate the
// subvolume with files, snapshot it, verify the snapshot is readable but not
// modifiable, clone the snapshot, wait for the clone to finish, and verify
// the clone is writable and independent of the original subvolume. Cleanup is
// registered with defer as each resource is created.
func TestWorkflow(t *testing.T) {
	fsa := getFSAdmin(t)
	volume := "cephfs"
	group := "workflow1"

	// verify the volume we want to use
	l, err := fsa.ListVolumes()
	require.NoError(t, err)
	require.Contains(t, l, volume)

	err = fsa.CreateSubVolumeGroup(volume, group, nil)
	require.NoError(t, err)
	defer func() {
		err := fsa.RemoveSubVolumeGroup(volume, group)
		assert.NoError(t, err)
	}()

	subname := "files1"
	svopts := &SubVolumeOptions{
		Mode: 0777,
		Size: 2 * gibiByte,
	}
	err = fsa.CreateSubVolume(volume, group, subname, svopts)
	require.NoError(t, err)
	defer func() {
		err := fsa.RemoveSubVolume(volume, group, subname)
		assert.NoError(t, err)
	}()

	// getpath
	subPath, err := fsa.SubVolumePath(volume, group, subname)
	require.NoError(t, err)
	require.NotEqual(t, "", subPath)

	// connect to volume, cd to path (?)
	mount := fsConnect(t, "")
	defer func(mount *cephfs.MountInfo) {
		assert.NoError(t, mount.Unmount())
		assert.NoError(t, mount.Release())
	}(mount)

	err = mount.ChangeDir(subPath)
	require.NoError(t, err)

	// write some dirs & files
	err = mount.MakeDir("content1", 0770)
	require.NoError(t, err)

	writeFile(t, mount, "content1/robots.txt",
		[]byte("robbie\nr2\nbender\nclaptrap\n"))
	writeFile(t, mount, "content1/songs.txt",
		[]byte("none of them knew they were robots\n"))
	assert.NoError(t, mount.MakeDir("content1/emptyDir1", 0770))

	err = mount.MakeDir("content2", 0770)
	require.NoError(t, err)

	writeFile(t, mount, "content2/androids.txt",
		[]byte("data\nmarvin\n"))
	assert.NoError(t, mount.MakeDir("content2/docs", 0770))
	writeFile(t, mount, "content2/docs/lore.txt",
		[]byte("Compendium\nLegend\nLore\nDeadweight\nSpirit at Aphelion\n"))

	assert.NoError(t, mount.SyncFs())

	// take a snapshot
	snapname1 := "hotSpans1"
	err = fsa.CreateSubVolumeSnapshot(volume, group, subname, snapname1)
	require.NoError(t, err)
	defer func() {
		err := fsa.RemoveSubVolumeSnapshot(volume, group, subname, snapname1)
		assert.NoError(t, err)
	}()

	sinfo, err := fsa.SubVolumeSnapshotInfo(volume, group, subname, snapname1)
	require.NoError(t, err)
	require.NotNil(t, sinfo)

	time.Sleep(500 * time.Millisecond) // is there a race?

	// examine the snapshot
	snapPath := getSnapPath(t, mount, subPath, snapname1)
	require.NotEqual(t, "", snapPath)

	tempPath := pathpkg.Join(snapPath, "content1/robots.txt")
	txt := readFile(t, mount, tempPath)
	assert.Contains(t, string(txt), "robbie")

	// original subvol can be manipulated
	err = mount.Rename("content2/docs/lore.txt", "content1/lore.txt")
	assert.NoError(t, err)
	writeFile(t, mount, "content1/songs.txt",
		[]byte("none of them knew they were robots\nars moriendi\n"))

	// snapshot may not be modified
	err = mount.Rename(
		pathpkg.Join(snapPath, "content2/docs/lore.txt"),
		pathpkg.Join(snapPath, "content1/lore.txt"))
	assert.Error(t, err)
	txt = readFile(t, mount, pathpkg.Join(snapPath, "content2/docs/lore.txt"))
	assert.Contains(t, string(txt), "Spirit")

	// make a clone
	clonename := "files2"
	err = fsa.CloneSubVolumeSnapshot(
		volume, group, subname, snapname1, clonename,
		&CloneOptions{TargetGroup: group})
	// If the first attempt reports the snapshot as not protected, protect it
	// (unprotecting again on cleanup) and retry the clone once.
	var x NotProtectedError
	if errors.As(err, &x) {
		err = fsa.ProtectSubVolumeSnapshot(volume, group, subname, snapname1)
		assert.NoError(t, err)
		defer func() {
			err := fsa.UnprotectSubVolumeSnapshot(volume, group, subname, snapname1)
			assert.NoError(t, err)
		}()

		err = fsa.CloneSubVolumeSnapshot(
			volume, group, subname, snapname1, clonename,
			&CloneOptions{TargetGroup: group})
	}
	require.NoError(t, err)
	defer func() {
		err := fsa.ForceRemoveSubVolume(volume, group, clonename)
		assert.NoError(t, err)
	}()

	// wait for cloning to complete
	for done := false; !done; {
		status, err := fsa.CloneStatus(volume, group, clonename)
		// These must be fatal: status is dereferenced right below, so
		// continuing after a failed call would panic on a nil status
		// instead of reporting a clean test failure.
		require.NoError(t, err)
		require.NotNil(t, status)
		switch status.State {
		case ClonePending, CloneInProgress:
			time.Sleep(5 * time.Millisecond)
		case CloneComplete:
			done = true
		case CloneFailed:
			t.Fatal("clone failed")
		default:
			t.Fatalf("invalid status.State: %q", status.State)
		}
	}

	// examine the clone
	clonePath, err := fsa.SubVolumePath(volume, group, clonename)
	require.NoError(t, err)
	require.NotEqual(t, "", clonePath)

	txt = readFile(t, mount, pathpkg.Join(clonePath, "content1/robots.txt"))
	assert.Contains(t, string(txt), "robbie")

	// clones are r/w
	err = mount.Rename(
		pathpkg.Join(clonePath, "content2/docs/lore.txt"),
		pathpkg.Join(clonePath, "content1/lore.txt"))
	assert.NoError(t, err)
	txt = readFile(t, mount, pathpkg.Join(clonePath, "content1/lore.txt"))
	assert.Contains(t, string(txt), "Spirit")

	// it reflects what was in the snapshot
	txt = readFile(t, mount, pathpkg.Join(clonePath, "content1/songs.txt"))
	assert.Contains(t, string(txt), "robots")
	assert.NotContains(t, string(txt), "moriendi")

	// ... with its own independent data
	writeFile(t, mount, pathpkg.Join(clonePath, "content1/songs.txt"),
		[]byte("none of them knew they were robots\nsweet charity\n"))

	// (orig)
	txt = readFile(t, mount, "content1/songs.txt")
	assert.Contains(t, string(txt), "robots")
	assert.Contains(t, string(txt), "moriendi")
	assert.NotContains(t, string(txt), "charity")

	// (clone)
	txt = readFile(t, mount, pathpkg.Join(clonePath, "content1/songs.txt"))
	assert.Contains(t, string(txt), "robots")
	assert.NotContains(t, string(txt), "moriendi")
	assert.Contains(t, string(txt), "charity")
}