rbd admin: add rbd task [list|cancel|remove|trash remove|flatten]` apis

These functions let one add various rbd tasks that are done
asynchronously in the background. Also added tests for the same.

Signed-off-by: Rakshith R <rar@redhat.com>
This commit is contained in:
Rakshith R 2021-09-16 15:37:41 +05:30 committed by mergify[bot]
parent 890b3619db
commit 126d6f5747
2 changed files with 565 additions and 0 deletions

142
rbd/admin/task.go Normal file
View File

@ -0,0 +1,142 @@
// +build !nautilus,ceph_preview
package admin
import (
ccom "github.com/ceph/go-ceph/common/commands"
"github.com/ceph/go-ceph/internal/commands"
)
// TaskAdmin encapsulates management functions for
// ceph rbd task operations.
// PREVIEW
type TaskAdmin struct {
conn ccom.MgrCommander
}
// Task returns a TaskAdmin type for
// managing ceph rbd task operations.
// PREVIEW
func (ra *RBDAdmin) Task() *TaskAdmin {
return &TaskAdmin{conn: ra.conn}
}
// TaskRefs contains the action name and information about the image.
// PREVIEW
type TaskRefs struct {
Action string `json:"action"`
PoolName string `json:"pool_name"`
PoolNamespace string `json:"pool_namespace"`
ImageName string `json:"image_name"`
ImageID string `json:"image_id"`
}
// TaskResponse contains the information about the task added on an image.
// PREVIEW
type TaskResponse struct {
Sequence int `json:"sequence"`
ID string `json:"id"`
Message string `json:"message"`
Refs TaskRefs `json:"refs"`
InProgress bool `json:"in_progress"`
Progress float64 `json:"progress"`
RetryAttempts int `json:"retry_attempts"`
RetryTime string `json:"retry_time"`
RetryMessage string `json:"retry_message"`
}
func parseTaskResponse(res commands.Response) (TaskResponse, error) {
var taskResponse TaskResponse
err := res.NoStatus().Unmarshal(&taskResponse).End()
return taskResponse, err
}
func parseTaskResponseList(res commands.Response) ([]TaskResponse, error) {
var taskResponseList []TaskResponse
err := res.NoStatus().Unmarshal(&taskResponseList).End()
return taskResponseList, err
}
// AddFlatten adds a background task to flatten a cloned image based on the supplied image spec.
//
// Similar To:
// rbd task add flatten <image_spec>
// PREVIEW
func (ta *TaskAdmin) AddFlatten(img ImageSpec) (TaskResponse, error) {
m := map[string]string{
"prefix": "rbd task add flatten",
"image_spec": img.spec,
"format": "json",
}
return parseTaskResponse(commands.MarshalMgrCommand(ta.conn, m))
}
// AddRemove adds a background task to remove an image based on the supplied image spec.
//
// Similar To:
// rbd task add remove <image_spec>
// PREVIEW
func (ta *TaskAdmin) AddRemove(img ImageSpec) (TaskResponse, error) {
m := map[string]string{
"prefix": "rbd task add remove",
"image_spec": img.spec,
"format": "json",
}
return parseTaskResponse(commands.MarshalMgrCommand(ta.conn, m))
}
// AddTrashRemove adds a background task to remove an image from the trash based on the
// supplied image id spec.
//
// Similar To:
// rbd task add trash remove <image_id_spec>
// PREVIEW
func (ta *TaskAdmin) AddTrashRemove(img ImageSpec) (TaskResponse, error) {
m := map[string]string{
"prefix": "rbd task add trash remove",
"image_id_spec": img.spec,
"format": "json",
}
return parseTaskResponse(commands.MarshalMgrCommand(ta.conn, m))
}
// List pending or running asynchronous tasks.
//
// Similar To:
// rbd task list
// PREVIEW
func (ta *TaskAdmin) List() ([]TaskResponse, error) {
m := map[string]string{
"prefix": "rbd task list",
"format": "json",
}
return parseTaskResponseList(commands.MarshalMgrCommand(ta.conn, m))
}
// GetTaskByID returns pending or running asynchronous task using id.
//
// Similar To:
// rbd task list <task_id>
// PREVIEW
func (ta *TaskAdmin) GetTaskByID(taskID string) (TaskResponse, error) {
m := map[string]string{
"prefix": "rbd task list",
"task_id": taskID,
"format": "json",
}
return parseTaskResponse(commands.MarshalMgrCommand(ta.conn, m))
}
// Cancel a pending or running asynchronous task.
//
// Similar To:
// rbd task cancel <task_id>
// PREVIEW
func (ta *TaskAdmin) Cancel(taskID string) (TaskResponse, error) {
m := map[string]string{
"prefix": "rbd task cancel",
"task_id": taskID,
"format": "json",
}
return parseTaskResponse(commands.MarshalMgrCommand(ta.conn, m))
}

423
rbd/admin/task_test.go Normal file
View File

@ -0,0 +1,423 @@
// +build !nautilus,ceph_preview
package admin
import (
"errors"
"testing"
"time"
"github.com/ceph/go-ceph/internal/commands"
"github.com/ceph/go-ceph/rbd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var tr1 = `
{
"sequence":1,
"id":"id-1",
"message":"Removing image pool/test from trash",
"refs":{
"action":"trash remove",
"pool_name":"pool",
"pool_namespace":"",
"image_id":"12345"
},
"retry_attempts":1,
"retry_time":"2021-09-13T05:58:24.826408"
}
`
var tr2 = `
{
"sequence":2,
"id":"id-2",
"message":"Removing image pool/test",
"refs":{
"action":"remove",
"pool_name":"pool",
"pool_namespace":"",
"image_name":"test",
"image_id":"123456"
},
"in_progress":true,
"progress":0.70
}
`
var trList = `[
{
"sequence":1,
"id":"id-1",
"message":"Removing image pool/test from trash",
"refs":{
"action":"trash remove",
"pool_name":"pool",
"pool_namespace":"",
"image_id":"12345"
},
"retry_attempts":1,
"retry_time":"2021-09-13T05:58:24.826408"
},
{
"sequence":2,
"id":"id-2",
"message":"Removing image pool/test",
"refs":{
"action":"remove",
"pool_name":"pool",
"pool_namespace":"",
"image_name":"test",
"image_id":"123456"
},
"in_progress":true,
"progress":0.70
}
]`
func TestParseTaskResponse(t *testing.T) {
type args struct {
res commands.Response
}
tests := []struct {
name string
args args
want TaskResponse
wantErr bool
}{
{
name: "",
args: args{
res: commands.NewResponse([]byte(tr1), "", nil),
},
want: TaskResponse{
Sequence: 1,
ID: "id-1",
Message: "Removing image pool/test from trash",
Refs: TaskRefs{
Action: "trash remove",
PoolName: "pool",
PoolNamespace: "",
ImageName: "",
ImageID: "12345",
},
InProgress: false,
Progress: 0,
RetryAttempts: 1,
RetryTime: "2021-09-13T05:58:24.826408",
RetryMessage: "",
},
wantErr: false,
},
{
name: "",
args: args{
res: commands.NewResponse([]byte(tr2), "", nil),
},
want: TaskResponse{
Sequence: 2,
ID: "id-2",
Message: "Removing image pool/test",
Refs: TaskRefs{
Action: "remove",
PoolName: "pool",
PoolNamespace: "",
ImageName: "test",
ImageID: "123456",
},
InProgress: true,
Progress: 0.70,
RetryAttempts: 0,
RetryTime: "",
RetryMessage: "",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseTaskResponse(tt.args.res)
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}
func TesParseTaskResponseList(t *testing.T) {
type args struct {
res commands.Response
}
res1, err := parseTaskResponse(commands.NewResponse([]byte(tr1), "", nil))
assert.NoError(t, err)
res2, err := parseTaskResponse(commands.NewResponse([]byte(tr2), "", nil))
assert.NoError(t, err)
tests := []struct {
name string
args args
want []TaskResponse
wantErr bool
}{
{
name: "emptyList",
args: args{
res: commands.NewResponse([]byte(`[]`), "", nil),
},
want: []TaskResponse{},
wantErr: false,
},
{
name: "twoItemList",
args: args{
res: commands.NewResponse([]byte(trList), "", nil),
},
want: []TaskResponse{res1, res2},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseTaskResponseList(tt.args.res)
assert.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}
var (
imageName = "img"
testNS = "test"
)
type args struct {
pool string
namespace string
imageName string
}
var tests = []struct {
name string
args args
}{
{
name: "onlyImageName",
args: args{
pool: "",
namespace: "",
imageName: imageName,
},
},
{
name: "Image&PoolName",
args: args{
pool: defaultPoolName,
namespace: "",
imageName: imageName,
},
},
{
name: "AllArgs",
args: args{
pool: defaultPoolName,
namespace: testNS,
imageName: imageName,
},
},
}
func TestTaskAdminAddRemove(t *testing.T) {
ensureDefaultPool(t)
conn := getConn(t)
ioctx, err := conn.OpenIOContext(defaultPoolName)
require.NoError(t, err)
defer ioctx.Destroy()
assert.NoError(t, rbd.NamespaceCreate(ioctx, testNS))
defer func() {
assert.NoError(t, rbd.NamespaceRemove(ioctx, testNS))
}()
ta := getAdmin(t).Task()
options := rbd.NewRbdImageOptions()
assert.NoError(t,
options.SetUint64(rbd.ImageOptionOrder, uint64(testImageOrder)))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ioctx.SetNamespace(tt.args.namespace)
err = rbd.CreateImage(ioctx, tt.args.imageName, testImageSize, options)
assert.NoError(t, err)
tr, err := ta.AddRemove(NewImageSpec(tt.args.pool, tt.args.namespace, tt.args.imageName))
assert.NoError(t, err)
assert.Equal(t, tt.args.imageName, tr.Refs.ImageName)
assert.Equal(t, defaultPoolName, tr.Refs.PoolName)
assert.Equal(t, tt.args.namespace, tr.Refs.PoolNamespace)
assert.Equal(t, "remove", tr.Refs.Action)
found := false
// wait for the image to be deleted
for i := 0; i < 35; i++ {
imgList, err := rbd.GetImageNames(ioctx)
assert.NoError(t, err)
found = false
for _, img := range imgList {
if img == imageName {
found = true
break
}
}
if !found {
break
}
time.Sleep(time.Second)
}
assert.Equal(t, false, found)
})
}
}
func TestTaskAdminAddTrashRemove(t *testing.T) {
ensureDefaultPool(t)
conn := getConn(t)
ioctx, err := conn.OpenIOContext(defaultPoolName)
require.NoError(t, err)
defer ioctx.Destroy()
assert.NoError(t, rbd.NamespaceCreate(ioctx, testNS))
defer func() {
assert.NoError(t, rbd.NamespaceRemove(ioctx, testNS))
}()
ta := getAdmin(t).Task()
options := rbd.NewRbdImageOptions()
assert.NoError(t,
options.SetUint64(rbd.ImageOptionOrder, uint64(testImageOrder)))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ioctx.SetNamespace(tt.args.namespace)
err = rbd.CreateImage(ioctx, tt.args.imageName, testImageSize, options)
assert.NoError(t, err)
img, err := rbd.OpenImage(ioctx, tt.args.imageName, rbd.NoSnapshot)
assert.NoError(t, err)
imageID, err := img.GetId()
assert.NoError(t, err)
assert.NoError(t, img.Trash(0))
assert.NoError(t, img.Close())
tr, err := ta.AddTrashRemove(NewImageSpec(tt.args.pool, tt.args.namespace, imageID))
assert.NoError(t, err)
assert.Equal(t, imageID, tr.Refs.ImageID)
assert.Equal(t, defaultPoolName, tr.Refs.PoolName)
assert.Equal(t, tt.args.namespace, tr.Refs.PoolNamespace)
assert.Equal(t, "trash remove", tr.Refs.Action)
trashList := []rbd.TrashInfo{}
// wait for the image to be deleted
for i := 0; i < 35; i++ {
trashList, err = rbd.GetTrashList(ioctx)
assert.NoError(t, err)
if len(trashList) == 0 {
break
}
time.Sleep(time.Second)
}
assert.Equal(t, 0, len(trashList))
})
}
}
func TestTaskAdminAddFlatten(t *testing.T) {
parentImageName := "parent"
ensureDefaultPool(t)
conn := getConn(t)
ioctx, err := conn.OpenIOContext(defaultPoolName)
require.NoError(t, err)
defer ioctx.Destroy()
assert.NoError(t, rbd.NamespaceCreate(ioctx, testNS))
defer func() {
assert.NoError(t, rbd.NamespaceRemove(ioctx, testNS))
}()
ta := getAdmin(t).Task()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ioctx.SetNamespace(tt.args.namespace)
options := rbd.NewRbdImageOptions()
assert.NoError(t,
options.SetUint64(rbd.ImageOptionOrder, uint64(testImageOrder)))
err = rbd.CreateImage(ioctx, parentImageName, testImageSize, options)
assert.NoError(t, err)
parentImage, err := rbd.OpenImage(ioctx, parentImageName, rbd.NoSnapshot)
assert.NoError(t, err)
defer func() {
assert.NoError(t, parentImage.Close())
assert.NoError(t, parentImage.Remove())
}()
snap, err := parentImage.CreateSnapshot(tt.args.imageName)
assert.NoError(t, err)
err = snap.Protect()
assert.NoError(t, err)
defer func() {
assert.NoError(t, snap.Unprotect())
assert.NoError(t, snap.Remove())
}()
assert.NoError(t, options.SetUint64(rbd.ImageOptionFormat, uint64(2)))
assert.NoError(t, rbd.CloneImage(ioctx, parentImageName, tt.args.imageName, ioctx, tt.args.imageName, options))
childImage, err := rbd.OpenImage(ioctx, tt.args.imageName, rbd.NoSnapshot)
assert.NoError(t, err)
defer func() {
assert.NoError(t, childImage.Close())
assert.NoError(t, childImage.Remove())
}()
parentInfo, err := childImage.GetParent()
assert.NoError(t, err)
assert.Equal(t, parentImageName, parentInfo.Image.ImageName)
assert.Equal(t, tt.args.imageName, parentInfo.Snap.SnapName)
tr, err := ta.AddFlatten(NewImageSpec(tt.args.pool, tt.args.namespace, tt.args.imageName))
assert.NoError(t, err)
assert.Equal(t, tt.args.imageName, tr.Refs.ImageName)
assert.Equal(t, defaultPoolName, tr.Refs.PoolName)
assert.Equal(t, tt.args.namespace, tr.Refs.PoolNamespace)
assert.Equal(t, "flatten", tr.Refs.Action)
// wait for the image to be flattened
for i := 0; i < 35; i++ {
_, err = childImage.GetParent()
if errors.Is(err, rbd.RbdErrorNotFound) {
break
}
assert.NoError(t, err)
time.Sleep(time.Second)
}
assert.Error(t, err)
})
}
}