diff --git a/cephfs/admin/workflow_test.go b/cephfs/admin/workflow_test.go new file mode 100644 index 0000000..fccab72 --- /dev/null +++ b/cephfs/admin/workflow_test.go @@ -0,0 +1,261 @@ +// +build !luminous,!mimic + +package admin + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "path" + "testing" + "time" + + "github.com/ceph/go-ceph/cephfs" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var snapDir = ".snapshots" + +func fsConnect(t *testing.T) *cephfs.MountInfo { + mount, err := cephfs.CreateMount() + require.NoError(t, err) + require.NotNil(t, mount) + + err = mount.ReadDefaultConfigFile() + require.NoError(t, err) + err = mount.SetConfigOption("client_snapdir", snapDir) + require.NoError(t, err) + + timeout := time.After(time.Second * 5) + ch := make(chan error) + go func(mount *cephfs.MountInfo) { + ch <- mount.Mount() + }(mount) + select { + case err = <-ch: + case <-timeout: + err = fmt.Errorf("timed out waiting for connect") + } + require.NoError(t, err) + return mount +} + +func writeFile(t *testing.T, mount *cephfs.MountInfo, path string, content []byte) { + f1, err := mount.Open(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0700) + require.NoError(t, err) + defer f1.Close() + f1.WriteAt(content, 0) +} + +func readFile(t *testing.T, mount *cephfs.MountInfo, path string) []byte { + f1, err := mount.Open(path, os.O_RDONLY, 0) + require.NoError(t, err) + defer f1.Close() + b, err := ioutil.ReadAll(f1) + require.NoError(t, err) + return b +} + +func getSnapPath(t *testing.T, mount *cephfs.MountInfo, subvol, snapname string) string { + // I wish there was a nicer way to do this + snapPath := path.Join(subvol, snapDir, snapname) + _, err := mount.Statx(snapPath, cephfs.StatxBasicStats, 0) + if err == nil { + return snapPath + } + snapPath = path.Join(path.Dir(subvol), snapDir, snapname) + _, err = mount.Statx(snapPath, cephfs.StatxBasicStats, 0) + if err == nil { + return snapPath + } + t.Fatalf("did not find a snap path for %s", snapname) + return "" +} + +// TestWorkflow aims to do more than just exercise the API calls, but rather to +// also check that they do what they say on the tin. This means importing the +// cephfs lib in addition to the admin lib and reading and writing to the +// subvolume, snapshot, and clone as appropriate. +func TestWorkflow(t *testing.T) { + fsa := getFSAdmin(t) + volume := "cephfs" + group := "workflow1" + + // verify the volume we want to use + l, err := fsa.ListVolumes() + require.NoError(t, err) + require.Contains(t, l, volume) + + err = fsa.CreateSubVolumeGroup(volume, group, nil) + require.NoError(t, err) + defer func() { + err := fsa.RemoveSubVolumeGroup(volume, group) + assert.NoError(t, err) + }() + + subname := "files1" + svopts := &SubVolumeOptions{ + Mode: 0777, + Size: 2 * gibiByte, + } + err = fsa.CreateSubVolume(volume, group, subname, svopts) + require.NoError(t, err) + defer func() { + err := fsa.RemoveSubVolume(volume, group, subname) + assert.NoError(t, err) + }() + + // getpath + subPath, err := fsa.SubVolumePath(volume, group, subname) + require.NoError(t, err) + require.NotEqual(t, "", subPath) + + // connect to volume, cd to path (?) + mount := fsConnect(t) + defer func(mount *cephfs.MountInfo) { + assert.NoError(t, mount.Unmount()) + assert.NoError(t, mount.Release()) + }(mount) + + err = mount.ChangeDir(subPath) + require.NoError(t, err) + + // write some dirs & files + err = mount.MakeDir("content1", 0770) + require.NoError(t, err) + + writeFile(t, mount, "content1/robots.txt", + []byte("robbie\nr2\nbender\nclaptrap\n")) + writeFile(t, mount, "content1/songs.txt", + []byte("none of them knew they were robots\n")) + assert.NoError(t, mount.MakeDir("content1/emptyDir1", 0770)) + + err = mount.MakeDir("content2", 0770) + require.NoError(t, err) + + writeFile(t, mount, "content2/androids.txt", + []byte("data\nmarvin\n")) + assert.NoError(t, mount.MakeDir("content2/docs", 0770)) + writeFile(t, mount, "content2/docs/lore.txt", + []byte("Compendium\nLegend\nLore\nDeadweight\nSpirit at Aphelion\n")) + + assert.NoError(t, mount.SyncFs()) + + // take a snapshot + + snapname1 := "hotSpans1" + err = fsa.CreateSubVolumeSnapshot(volume, group, subname, snapname1) + require.NoError(t, err) + defer func() { + err := fsa.RemoveSubVolumeSnapshot(volume, group, subname, snapname1) + assert.NoError(t, err) + }() + + sinfo, err := fsa.SubVolumeSnapshotInfo(volume, group, subname, snapname1) + require.NoError(t, err) + require.NotNil(t, sinfo) + + // examine the snapshot + snapPath := getSnapPath(t, mount, subPath, snapname1) + require.NotEqual(t, "", snapPath) + + tempPath := path.Join(snapPath, "content1/robots.txt") + txt := readFile(t, mount, tempPath) + assert.Contains(t, string(txt), "robbie") + + // original subvol can be manipulated + err = mount.Rename("content2/docs/lore.txt", "content1/lore.txt") + assert.NoError(t, err) + writeFile(t, mount, "content1/songs.txt", + []byte("none of them knew they were robots\nars moriendi\n")) + + // snapshot may not be modified + err = mount.Rename( + path.Join(snapPath, "content2/docs/lore.txt"), + path.Join(snapPath, "content1/lore.txt")) + assert.Error(t, err) + txt = readFile(t, mount, path.Join(snapPath, "content2/docs/lore.txt")) + assert.Contains(t, string(txt), "Spirit") + + // make a clone + + clonename := "files2" + err = fsa.CloneSubVolumeSnapshot( + volume, group, subname, snapname1, clonename, + &CloneOptions{TargetGroup: group}) + var x NotProtectedError + if errors.As(err, &x) { + err = fsa.ProtectSubVolumeSnapshot(volume, group, subname, snapname1) + assert.NoError(t, err) + defer func() { + err := fsa.UnprotectSubVolumeSnapshot(volume, group, subname, snapname1) + assert.NoError(t, err) + }() + + err = fsa.CloneSubVolumeSnapshot( + volume, group, subname, snapname1, clonename, + &CloneOptions{TargetGroup: group}) + } + require.NoError(t, err) + defer func() { + err := fsa.ForceRemoveSubVolume(volume, group, clonename) + assert.NoError(t, err) + }() + + // wait for cloning to complete + for done := false; !done; { + status, err := fsa.CloneStatus(volume, group, clonename) + assert.NoError(t, err) + assert.NotNil(t, status) + switch status.State { + case ClonePending, CloneInProgress: + time.Sleep(5 * time.Millisecond) + case CloneComplete: + done = true + case CloneFailed: + t.Fatal("clone failed") + default: + t.Fatalf("invalid status.State: %q", status.State) + } + } + + // examine the clone + clonePath, err := fsa.SubVolumePath(volume, group, clonename) + require.NoError(t, err) + require.NotEqual(t, "", clonePath) + + txt = readFile(t, mount, path.Join(clonePath, "content1/robots.txt")) + assert.Contains(t, string(txt), "robbie") + + // clones are r/w + err = mount.Rename( + path.Join(clonePath, "content2/docs/lore.txt"), + path.Join(clonePath, "content1/lore.txt")) + assert.NoError(t, err) + txt = readFile(t, mount, path.Join(clonePath, "content1/lore.txt")) + assert.Contains(t, string(txt), "Spirit") + + // it reflects what was in the snapshot + txt = readFile(t, mount, path.Join(clonePath, "content1/songs.txt")) + assert.Contains(t, string(txt), "robots") + assert.NotContains(t, string(txt), "moriendi") + + // ... with it's own independent data + writeFile(t, mount, path.Join(clonePath, "content1/songs.txt"), + []byte("none of them knew they were robots\nsweet charity\n")) + + // (orig) + txt = readFile(t, mount, "content1/songs.txt") + assert.Contains(t, string(txt), "robots") + assert.Contains(t, string(txt), "moriendi") + assert.NotContains(t, string(txt), "charity") + + // (clone) + txt = readFile(t, mount, path.Join(clonePath, "content1/songs.txt")) + assert.Contains(t, string(txt), "robots") + assert.NotContains(t, string(txt), "moriendi") + assert.Contains(t, string(txt), "charity") +}