cephfs admin: add core functionality and volume functions

Adds the central admin type `FSAdmin` and some helper functions.
Adds basic volume list and info functions.
Adds tests for the volume functions and common fsadmin functions.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
John Mulligan 2020-08-04 17:00:16 -04:00 committed by John Mulligan
parent ea1073dc1d
commit 198541cccd
4 changed files with 309 additions and 0 deletions

122
cephfs/admin/fsadmin.go Normal file
View File

@ -0,0 +1,122 @@
package admin
import (
"encoding/json"
"fmt"
"strconv"
"github.com/ceph/go-ceph/rados"
)
// RadosCommander provides an interface to execute JSON-formatted commands that
// allow the cephfs administrative functions to interact with the Ceph cluster.
type RadosCommander interface {
MgrCommand(buf [][]byte) ([]byte, string, error)
}
// FSAdmin is used to administrate CephFS within a ceph cluster.
type FSAdmin struct {
conn RadosCommander
}
// New creates an FSAdmin automatically based on the default ceph
// configuration file. If more customization is needed, create a
// *rados.Conn as you see fit and use NewFromConn to use that
// connection with these administrative functions.
func New() (*FSAdmin, error) {
conn, err := rados.NewConn()
if err != nil {
return nil, err
}
err = conn.ReadDefaultConfigFile()
if err != nil {
return nil, err
}
err = conn.Connect()
if err != nil {
return nil, err
}
return NewFromConn(conn), nil
}
// NewFromConn creates an FSAdmin management object from a preexisting
// rados connection. The existing connection can be rados.Conn or any
// type implementing the RadosCommander interface. This may be useful
// if the calling layer needs to inject additional logging, error handling,
// fault injection, etc.
func NewFromConn(conn RadosCommander) *FSAdmin {
return &FSAdmin{conn}
}
func (fsa *FSAdmin) validate() error {
if fsa.conn == nil {
return rados.ErrNotConnected
}
return nil
}
// rawMgrCommand takes a byte buffer and sends it to the MGR as a command.
// The buffer is expected to contain preformatted JSON.
func (fsa *FSAdmin) rawMgrCommand(buf []byte) ([]byte, string, error) {
if err := fsa.validate(); err != nil {
return nil, "", err
}
return fsa.conn.MgrCommand([][]byte{buf})
}
// marshalMgrCommand takes an generic interface{} value, converts it to JSON and
// sends the json to the MGR as a command.
func (fsa *FSAdmin) marshalMgrCommand(v interface{}) ([]byte, string, error) {
b, err := json.Marshal(v)
if err != nil {
return nil, "", err
}
return fsa.rawMgrCommand(b)
}
type listNamedResult struct {
Name string `json:"name"`
}
func parseListNames(res []byte, status string, err error) ([]string, error) {
if err != nil {
return nil, err
}
if status != "" {
return nil, fmt.Errorf("error status: %s", status)
}
var r []listNamedResult
if err := json.Unmarshal(res, &r); err != nil {
return nil, err
}
vl := make([]string, len(r))
for i := range r {
vl[i] = r[i].Name
}
return vl, nil
}
// checkEmptyResponseExpected returns an error if the result or status
// are non-empty.
func checkEmptyResponseExpected(res []byte, status string, err error) error {
if err != nil {
return err
}
if len(res) != 0 {
return fmt.Errorf("unexpected response: %s", string(res))
}
if status != "" {
return fmt.Errorf("error status: %s", status)
}
return nil
}
// modeString converts a unix-style mode value to a string-ified version in an
// octal representation (e.g. "777", "700", etc). This format is expected by
// some of the ceph JSON command inputs.
func modeString(m int, force bool) string {
if force || m != 0 {
return strconv.FormatInt(int64(m), 8)
}
return ""
}

View File

@ -0,0 +1,85 @@
package admin
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var cachedFSAdmin *FSAdmin
func getFSAdmin(t *testing.T) *FSAdmin {
if cachedFSAdmin != nil {
return cachedFSAdmin
}
var err error
cachedFSAdmin, err := New()
require.NoError(t, err)
require.NotNil(t, cachedFSAdmin)
return cachedFSAdmin
}
func TestInvalidFSAdmin(t *testing.T) {
fsa := &FSAdmin{}
_, _, err := fsa.rawMgrCommand([]byte("FOOBAR!"))
assert.Error(t, err)
}
type badMarshalType bool
func (badMarshalType) MarshalJSON() ([]byte, error) {
return nil, errors.New("Zowie! wow")
}
func TestBadMarshal(t *testing.T) {
fsa := getFSAdmin(t)
var bad badMarshalType
_, _, err := fsa.marshalMgrCommand(bad)
assert.Error(t, err)
}
func TestParseListNames(t *testing.T) {
t.Run("error", func(t *testing.T) {
_, err := parseListNames(nil, "", errors.New("bonk"))
assert.Error(t, err)
assert.Equal(t, "bonk", err.Error())
})
t.Run("statusSet", func(t *testing.T) {
_, err := parseListNames(nil, "unexpected!", nil)
assert.Error(t, err)
})
t.Run("badJSON", func(t *testing.T) {
_, err := parseListNames([]byte("Foo[[["), "", nil)
assert.Error(t, err)
})
t.Run("ok", func(t *testing.T) {
l, err := parseListNames([]byte(`[{"name":"bob"}]`), "", nil)
assert.NoError(t, err)
if assert.Len(t, l, 1) {
assert.Equal(t, "bob", l[0])
}
})
}
func TestCheckEmptyResponseExpected(t *testing.T) {
t.Run("error", func(t *testing.T) {
err := checkEmptyResponseExpected(nil, "", errors.New("bonk"))
assert.Error(t, err)
assert.Equal(t, "bonk", err.Error())
})
t.Run("statusSet", func(t *testing.T) {
err := checkEmptyResponseExpected(nil, "unexpected!", nil)
assert.Error(t, err)
})
t.Run("someJSON", func(t *testing.T) {
err := checkEmptyResponseExpected([]byte(`{"trouble": true}`), "", nil)
assert.Error(t, err)
})
t.Run("ok", func(t *testing.T) {
err := checkEmptyResponseExpected([]byte{}, "", nil)
assert.NoError(t, err)
})
}

44
cephfs/admin/volume.go Normal file
View File

@ -0,0 +1,44 @@
package admin
import (
"encoding/json"
"fmt"
)
var listVolumesCmd = []byte(`{"prefix":"fs volume ls"}`)
// ListVolumes return a list of volumes in this Ceph cluster.
func (fsa *FSAdmin) ListVolumes() ([]string, error) {
r, s, err := fsa.rawMgrCommand(listVolumesCmd)
return parseListNames(r, s, err)
}
// VolumeStatus reports various properties of a CephFS volume.
// TODO: Fill in.
type VolumeStatus struct {
MDSVersion string `json:"mds_version"`
}
func parseVolumeStatus(res []byte, status string, err error) (*VolumeStatus, error) {
if err != nil {
return nil, err
}
if status != "" {
return nil, fmt.Errorf("error status: %s", status)
}
var vs VolumeStatus
if err := json.Unmarshal(res, &vs); err != nil {
return nil, err
}
return &vs, nil
}
// VolumeStatus returns a VolumeStatus object for the given volume name.
func (fsa *FSAdmin) VolumeStatus(name string) (*VolumeStatus, error) {
r, s, err := fsa.marshalMgrCommand(map[string]string{
"fs": name,
"prefix": "fs status",
"format": "json",
})
return parseVolumeStatus(r, s, err)
}

View File

@ -0,0 +1,58 @@
package admin
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
func TestListVolumes(t *testing.T) {
fsa := getFSAdmin(t)
vl, err := fsa.ListVolumes()
assert.NoError(t, err)
assert.Len(t, vl, 1)
assert.Equal(t, "cephfs", vl[0])
}
func TestVolumeStatus(t *testing.T) {
fsa := getFSAdmin(t)
vs, err := fsa.VolumeStatus("cephfs")
assert.NoError(t, err)
assert.Contains(t, vs.MDSVersion, "version")
}
var sampleVolumeStatus1 = []byte(`
{
"clients": [{"clients": 1, "fs": "cephfs"}],
"mds_version": "ceph version 15.2.4 (7447c15c6ff58d7fce91843b705a268a1917325c) octopus (stable)",
"mdsmap": [{"dns": 76, "inos": 19, "name": "Z", "rank": 0, "rate": 0.0, "state": "active"}],
"pools": [{"avail": 1017799872, "id": 2, "name": "cephfs_metadata", "type": "metadata", "used": 2204126}, {"avail": 1017799872, "id": 1, "name": "cephfs_data", "type": "data", "used": 0}]
}
`)
func TestParseVolumeStatus(t *testing.T) {
t.Run("error", func(t *testing.T) {
_, err := parseVolumeStatus(nil, "", errors.New("bonk"))
assert.Error(t, err)
assert.Equal(t, "bonk", err.Error())
})
t.Run("statusSet", func(t *testing.T) {
_, err := parseVolumeStatus(nil, "unexpected!", nil)
assert.Error(t, err)
})
t.Run("badJSON", func(t *testing.T) {
_, err := parseVolumeStatus([]byte("_XxXxX"), "", nil)
assert.Error(t, err)
})
t.Run("ok", func(t *testing.T) {
s, err := parseVolumeStatus(sampleVolumeStatus1, "", nil)
assert.NoError(t, err)
if assert.NotNil(t, s) {
assert.Contains(t, s.MDSVersion, "ceph version 15.2.4")
assert.Contains(t, s.MDSVersion, "octopus")
}
})
}