diff --git a/cephfs/admin/fsadmin.go b/cephfs/admin/fsadmin.go new file mode 100644 index 0000000..688a808 --- /dev/null +++ b/cephfs/admin/fsadmin.go @@ -0,0 +1,122 @@ +package admin + +import ( + "encoding/json" + "fmt" + "strconv" + + "github.com/ceph/go-ceph/rados" +) + +// RadosCommander provides an interface to execute JSON-formatted commands that +// allow the cephfs administrative functions to interact with the Ceph cluster. +type RadosCommander interface { + MgrCommand(buf [][]byte) ([]byte, string, error) +} + +// FSAdmin is used to administrate CephFS within a ceph cluster. +type FSAdmin struct { + conn RadosCommander +} + +// New creates an FSAdmin automatically based on the default ceph +// configuration file. If more customization is needed, create a +// *rados.Conn as you see fit and use NewFromConn to use that +// connection with these administrative functions. +func New() (*FSAdmin, error) { + conn, err := rados.NewConn() + if err != nil { + return nil, err + } + err = conn.ReadDefaultConfigFile() + if err != nil { + return nil, err + } + err = conn.Connect() + if err != nil { + return nil, err + } + return NewFromConn(conn), nil +} + +// NewFromConn creates an FSAdmin management object from a preexisting +// rados connection. The existing connection can be rados.Conn or any +// type implementing the RadosCommander interface. This may be useful +// if the calling layer needs to inject additional logging, error handling, +// fault injection, etc. +func NewFromConn(conn RadosCommander) *FSAdmin { + return &FSAdmin{conn} +} + +func (fsa *FSAdmin) validate() error { + if fsa.conn == nil { + return rados.ErrNotConnected + } + return nil +} + +// rawMgrCommand takes a byte buffer and sends it to the MGR as a command. +// The buffer is expected to contain preformatted JSON. +func (fsa *FSAdmin) rawMgrCommand(buf []byte) ([]byte, string, error) { + if err := fsa.validate(); err != nil { + return nil, "", err + } + return fsa.conn.MgrCommand([][]byte{buf}) +} + +// marshalMgrCommand takes an generic interface{} value, converts it to JSON and +// sends the json to the MGR as a command. +func (fsa *FSAdmin) marshalMgrCommand(v interface{}) ([]byte, string, error) { + b, err := json.Marshal(v) + if err != nil { + return nil, "", err + } + return fsa.rawMgrCommand(b) +} + +type listNamedResult struct { + Name string `json:"name"` +} + +func parseListNames(res []byte, status string, err error) ([]string, error) { + if err != nil { + return nil, err + } + if status != "" { + return nil, fmt.Errorf("error status: %s", status) + } + var r []listNamedResult + if err := json.Unmarshal(res, &r); err != nil { + return nil, err + } + vl := make([]string, len(r)) + for i := range r { + vl[i] = r[i].Name + } + return vl, nil +} + +// checkEmptyResponseExpected returns an error if the result or status +// are non-empty. +func checkEmptyResponseExpected(res []byte, status string, err error) error { + if err != nil { + return err + } + if len(res) != 0 { + return fmt.Errorf("unexpected response: %s", string(res)) + } + if status != "" { + return fmt.Errorf("error status: %s", status) + } + return nil +} + +// modeString converts a unix-style mode value to a string-ified version in an +// octal representation (e.g. "777", "700", etc). This format is expected by +// some of the ceph JSON command inputs. +func modeString(m int, force bool) string { + if force || m != 0 { + return strconv.FormatInt(int64(m), 8) + } + return "" +} diff --git a/cephfs/admin/fsadmin_test.go b/cephfs/admin/fsadmin_test.go new file mode 100644 index 0000000..606e801 --- /dev/null +++ b/cephfs/admin/fsadmin_test.go @@ -0,0 +1,85 @@ +package admin + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var cachedFSAdmin *FSAdmin + +func getFSAdmin(t *testing.T) *FSAdmin { + if cachedFSAdmin != nil { + return cachedFSAdmin + } + var err error + cachedFSAdmin, err := New() + require.NoError(t, err) + require.NotNil(t, cachedFSAdmin) + return cachedFSAdmin +} + +func TestInvalidFSAdmin(t *testing.T) { + fsa := &FSAdmin{} + _, _, err := fsa.rawMgrCommand([]byte("FOOBAR!")) + assert.Error(t, err) +} + +type badMarshalType bool + +func (badMarshalType) MarshalJSON() ([]byte, error) { + return nil, errors.New("Zowie! wow") +} + +func TestBadMarshal(t *testing.T) { + fsa := getFSAdmin(t) + + var bad badMarshalType + _, _, err := fsa.marshalMgrCommand(bad) + assert.Error(t, err) +} + +func TestParseListNames(t *testing.T) { + t.Run("error", func(t *testing.T) { + _, err := parseListNames(nil, "", errors.New("bonk")) + assert.Error(t, err) + assert.Equal(t, "bonk", err.Error()) + }) + t.Run("statusSet", func(t *testing.T) { + _, err := parseListNames(nil, "unexpected!", nil) + assert.Error(t, err) + }) + t.Run("badJSON", func(t *testing.T) { + _, err := parseListNames([]byte("Foo[[["), "", nil) + assert.Error(t, err) + }) + t.Run("ok", func(t *testing.T) { + l, err := parseListNames([]byte(`[{"name":"bob"}]`), "", nil) + assert.NoError(t, err) + if assert.Len(t, l, 1) { + assert.Equal(t, "bob", l[0]) + } + }) +} + +func TestCheckEmptyResponseExpected(t *testing.T) { + t.Run("error", func(t *testing.T) { + err := checkEmptyResponseExpected(nil, "", errors.New("bonk")) + assert.Error(t, err) + assert.Equal(t, "bonk", err.Error()) + }) + t.Run("statusSet", func(t *testing.T) { + err := checkEmptyResponseExpected(nil, "unexpected!", nil) + assert.Error(t, err) + }) + t.Run("someJSON", func(t *testing.T) { + err := checkEmptyResponseExpected([]byte(`{"trouble": true}`), "", nil) + assert.Error(t, err) + }) + t.Run("ok", func(t *testing.T) { + err := checkEmptyResponseExpected([]byte{}, "", nil) + assert.NoError(t, err) + }) +} diff --git a/cephfs/admin/volume.go b/cephfs/admin/volume.go new file mode 100644 index 0000000..2403347 --- /dev/null +++ b/cephfs/admin/volume.go @@ -0,0 +1,44 @@ +package admin + +import ( + "encoding/json" + "fmt" +) + +var listVolumesCmd = []byte(`{"prefix":"fs volume ls"}`) + +// ListVolumes return a list of volumes in this Ceph cluster. +func (fsa *FSAdmin) ListVolumes() ([]string, error) { + r, s, err := fsa.rawMgrCommand(listVolumesCmd) + return parseListNames(r, s, err) +} + +// VolumeStatus reports various properties of a CephFS volume. +// TODO: Fill in. +type VolumeStatus struct { + MDSVersion string `json:"mds_version"` +} + +func parseVolumeStatus(res []byte, status string, err error) (*VolumeStatus, error) { + if err != nil { + return nil, err + } + if status != "" { + return nil, fmt.Errorf("error status: %s", status) + } + var vs VolumeStatus + if err := json.Unmarshal(res, &vs); err != nil { + return nil, err + } + return &vs, nil +} + +// VolumeStatus returns a VolumeStatus object for the given volume name. +func (fsa *FSAdmin) VolumeStatus(name string) (*VolumeStatus, error) { + r, s, err := fsa.marshalMgrCommand(map[string]string{ + "fs": name, + "prefix": "fs status", + "format": "json", + }) + return parseVolumeStatus(r, s, err) +} diff --git a/cephfs/admin/volume_test.go b/cephfs/admin/volume_test.go new file mode 100644 index 0000000..e9042a4 --- /dev/null +++ b/cephfs/admin/volume_test.go @@ -0,0 +1,58 @@ +package admin + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestListVolumes(t *testing.T) { + fsa := getFSAdmin(t) + + vl, err := fsa.ListVolumes() + assert.NoError(t, err) + assert.Len(t, vl, 1) + assert.Equal(t, "cephfs", vl[0]) +} + +func TestVolumeStatus(t *testing.T) { + fsa := getFSAdmin(t) + + vs, err := fsa.VolumeStatus("cephfs") + assert.NoError(t, err) + assert.Contains(t, vs.MDSVersion, "version") +} + +var sampleVolumeStatus1 = []byte(` +{ +"clients": [{"clients": 1, "fs": "cephfs"}], +"mds_version": "ceph version 15.2.4 (7447c15c6ff58d7fce91843b705a268a1917325c) octopus (stable)", +"mdsmap": [{"dns": 76, "inos": 19, "name": "Z", "rank": 0, "rate": 0.0, "state": "active"}], +"pools": [{"avail": 1017799872, "id": 2, "name": "cephfs_metadata", "type": "metadata", "used": 2204126}, {"avail": 1017799872, "id": 1, "name": "cephfs_data", "type": "data", "used": 0}] +} +`) + +func TestParseVolumeStatus(t *testing.T) { + t.Run("error", func(t *testing.T) { + _, err := parseVolumeStatus(nil, "", errors.New("bonk")) + assert.Error(t, err) + assert.Equal(t, "bonk", err.Error()) + }) + t.Run("statusSet", func(t *testing.T) { + _, err := parseVolumeStatus(nil, "unexpected!", nil) + assert.Error(t, err) + }) + t.Run("badJSON", func(t *testing.T) { + _, err := parseVolumeStatus([]byte("_XxXxX"), "", nil) + assert.Error(t, err) + }) + t.Run("ok", func(t *testing.T) { + s, err := parseVolumeStatus(sampleVolumeStatus1, "", nil) + assert.NoError(t, err) + if assert.NotNil(t, s) { + assert.Contains(t, s.MDSVersion, "ceph version 15.2.4") + assert.Contains(t, s.MDSVersion, "octopus") + } + }) +}