cephfs admin: add EnumerateVolumes function

The EnumerateVolumes function returns a list of Name & ID pairs,
to identify the file systems in the cluster. The enumeration is based
on `ceph fs dump` like JSON, but for implementation only. The dump
feature itself is not currently exported in a raw form.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
John Mulligan 2020-09-01 11:12:45 -04:00 committed by John Mulligan
parent 3afac0b0b8
commit c4e1292da1
2 changed files with 215 additions and 1 deletions

View File

@ -2,7 +2,10 @@
package admin
var listVolumesCmd = []byte(`{"prefix":"fs volume ls"}`)
var (
listVolumesCmd = []byte(`{"prefix":"fs volume ls"}`)
dumpVolumesCmd = []byte(`{"prefix":"fs dump","format":"json"}`)
)
// ListVolumes return a list of volumes in this Ceph cluster.
//
@ -12,3 +15,55 @@ func (fsa *FSAdmin) ListVolumes() ([]string, error) {
r, s, err := fsa.rawMgrCommand(listVolumesCmd)
return parseListNames(r, s, err)
}
// VolumeIdent contains a pair of file system identifying values: the volume
// name and the volume ID.
type VolumeIdent struct {
Name string
ID int64
}
type cephFileSystem struct {
ID int64 `json:"id"`
MDSMap struct {
FSName string `json:"fs_name"`
} `json:"mdsmap"`
}
type fsDump struct {
FileSystems []cephFileSystem `json:"filesystems"`
}
const (
dumpOkPrefix = "dumped fsmap epoch"
dumpOkLen = len(dumpOkPrefix)
)
func parseDumpToIdents(r []byte, s string, err error) ([]VolumeIdent, error) {
if len(s) >= dumpOkLen && s[:dumpOkLen] == dumpOkPrefix {
// Unhelpfully, ceph drops a status string on success responses for this
// call. this hacks around that by ignoring its typical prefix
s = ""
}
var dump fsDump
if err := unmarshalResponseJSON(r, s, err, &dump); err != nil {
return nil, err
}
// copy the dump json into the simpler enumeration list
idents := make([]VolumeIdent, len(dump.FileSystems))
for i := range dump.FileSystems {
idents[i].ID = dump.FileSystems[i].ID
idents[i].Name = dump.FileSystems[i].MDSMap.FSName
}
return idents, nil
}
// EnumerateVolumes returns a list of volume-name volume-id pairs.
func (fsa *FSAdmin) EnumerateVolumes() ([]VolumeIdent, error) {
// We base our enumeration on the ceph fs dump json. This may not be the
// only way to do it, but it's the only one I know of currently. Because of
// this and to keep our initial implementation simple, we expose our own
// simplified type only, rather do a partial implementation of dump.
r, s, err := fsa.rawMonCommand(dumpVolumesCmd)
return parseDumpToIdents(r, s, err)
}

View File

@ -3,6 +3,7 @@
package admin
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
@ -16,3 +17,161 @@ func TestListVolumes(t *testing.T) {
assert.Len(t, vl, 1)
assert.Equal(t, "cephfs", vl[0])
}
func TestEnumerateVolumes(t *testing.T) {
fsa := getFSAdmin(t)
ve, err := fsa.EnumerateVolumes()
assert.NoError(t, err)
if assert.Len(t, ve, 1) {
assert.Equal(t, "cephfs", ve[0].Name)
assert.Equal(t, int64(1), ve[0].ID)
}
}
// note: some of these dumps are simplified for testing purposes if we add
// general dump support these samples may need to be expanded upon.
var sampleDump1 = []byte(`
{
"epoch": 5,
"default_fscid": 1,
"filesystems": [
{
"mdsmap": {
"epoch": 5,
"flags": 18,
"ever_allowed_features": 0,
"explicitly_allowed_features": 0,
"created": "2020-08-31T18:37:34.657633+0000",
"modified": "2020-08-31T18:37:36.700989+0000",
"tableserver": 0,
"root": 0,
"session_timeout": 60,
"session_autoclose": 300,
"min_compat_client": "0 (unknown)",
"max_file_size": 1099511627776,
"last_failure": 0,
"last_failure_osd_epoch": 0,
"compat": {
"compat": {},
"ro_compat": {},
"incompat": {
"feature_1": "base v0.20",
"feature_2": "client writeable ranges",
"feature_3": "default file layouts on dirs",
"feature_4": "dir inode in separate object",
"feature_5": "mds uses versioned encoding",
"feature_6": "dirfrag is stored in omap",
"feature_8": "no anchor table",
"feature_9": "file layout v2",
"feature_10": "snaprealm v2"
}
},
"max_mds": 1,
"in": [
0
],
"up": {
"mds_0": 4115
},
"failed": [],
"damaged": [],
"stopped": [],
"info": {
"gid_4115": {
"gid": 4115,
"name": "Z",
"rank": 0,
"incarnation": 4,
"state": "up:active",
"state_seq": 2,
"addr": "127.0.0.1:6809/2568111595",
"addrs": {
"addrvec": [
{
"type": "v1",
"addr": "127.0.0.1:6809",
"nonce": 2568111595
}
]
},
"join_fscid": -1,
"export_targets": [],
"features": 4540138292836696000,
"flags": 0
}
},
"data_pools": [
1
],
"metadata_pool": 2,
"enabled": true,
"fs_name": "cephfs",
"balancer": "",
"standby_count_wanted": 0
},
"id": 1
}
]
}
`)
var sampleDump2 = []byte(`
{
"epoch": 5,
"default_fscid": 1,
"filesystems": [
{
"mdsmap": {
"fs_name": "wiffleball",
"standby_count_wanted": 0
},
"id": 1
},
{
"mdsmap": {
"fs_name": "beanbag",
"standby_count_wanted": 0
},
"id": 2
}
]
}
`)
func TestParseDumpToIdents(t *testing.T) {
fakePrefix := dumpOkPrefix + " 5"
t.Run("error", func(t *testing.T) {
idents, err := parseDumpToIdents(nil, "", errors.New("boop"))
assert.Error(t, err)
assert.Equal(t, "boop", err.Error())
assert.Nil(t, idents)
})
t.Run("badStatus", func(t *testing.T) {
_, err := parseDumpToIdents(sampleDump1, "unexpected!", nil)
assert.Error(t, err)
})
t.Run("oneVolOk", func(t *testing.T) {
idents, err := parseDumpToIdents(sampleDump1, fakePrefix, nil)
assert.NoError(t, err)
if assert.Len(t, idents, 1) {
assert.Equal(t, "cephfs", idents[0].Name)
assert.Equal(t, int64(1), idents[0].ID)
}
})
t.Run("twoVolOk", func(t *testing.T) {
idents, err := parseDumpToIdents(sampleDump2, fakePrefix, nil)
assert.NoError(t, err)
if assert.Len(t, idents, 2) {
assert.Equal(t, "wiffleball", idents[0].Name)
assert.Equal(t, int64(1), idents[0].ID)
assert.Equal(t, "beanbag", idents[1].Name)
assert.Equal(t, int64(2), idents[1].ID)
}
})
t.Run("unexpectedStatus", func(t *testing.T) {
idents, err := parseDumpToIdents(sampleDump1, "slip-up", nil)
assert.Error(t, err)
assert.Nil(t, idents)
})
}