diff --git a/rados/ioctx.go b/rados/ioctx.go index a35826e..5fb8522 100644 --- a/rados/ioctx.go +++ b/rados/ioctx.go @@ -4,10 +4,20 @@ package rados // #include // #include // #include +// +// char* nextChunk(char **idx) { +// char *copy; +// copy = strdup(*idx); +// *idx += strlen(*idx) + 1; +// return copy; +// } import "C" -import "unsafe" -import "time" +import ( + "syscall" + "time" + "unsafe" +) // PoolStat represents Ceph pool statistics. type PoolStat struct { @@ -41,6 +51,16 @@ type ObjectStat struct { ModTime time.Time } +// LockInfo represents information on a current Ceph lock +type LockInfo struct { + NumLockers int + Exclusive bool + Tag string + Clients []string + Cookies []string + Addrs []string +} + // IOContext represents a context for performing I/O within a pool. type IOContext struct { ioctx C.rados_ioctx_t @@ -619,3 +639,223 @@ func (iter *Iter) Err() error { func (iter *Iter) Close() { C.rados_objects_list_close(iter.ctx) } + +// Take an exclusive lock on an object. +func (ioctx *IOContext) LockExclusive(oid, name, cookie, desc string, duration time.Duration, flags *byte) (int, error) { + c_oid := C.CString(oid) + c_name := C.CString(name) + c_cookie := C.CString(cookie) + c_desc := C.CString(desc) + + var c_duration C.struct_timeval + if duration != 0 { + tv := syscall.NsecToTimeval(time.Now().Add(duration).UnixNano()) + c_duration = C.struct_timeval{tv_sec: C.__time_t(tv.Sec), tv_usec: C.__suseconds_t(tv.Usec)} + } + + var c_flags C.uint8_t + if flags != nil { + c_flags = C.uint8_t(*flags) + } + + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_name)) + defer C.free(unsafe.Pointer(c_cookie)) + defer C.free(unsafe.Pointer(c_desc)) + + ret := C.rados_lock_exclusive( + ioctx.ioctx, + c_oid, + c_name, + c_cookie, + c_desc, + &c_duration, + c_flags) + + // 0 on success, negative error code on failure + // -EBUSY if the lock is already held by another (client, cookie) pair + // -EEXIST if the lock is already held by the same (client, cookie) pair + + switch ret { + case 0: + return int(ret), nil + case -16: // EBUSY + return int(ret), nil + case -17: // EEXIST + return int(ret), nil + default: + return int(ret), RadosError(int(ret)) + } +} + +// Take a shared lock on an object. +func (ioctx *IOContext) LockShared(oid, name, cookie, tag, desc string, duration time.Duration, flags *byte) (int, error) { + c_oid := C.CString(oid) + c_name := C.CString(name) + c_cookie := C.CString(cookie) + c_tag := C.CString(tag) + c_desc := C.CString(desc) + + var c_duration C.struct_timeval + if duration != 0 { + tv := syscall.NsecToTimeval(time.Now().Add(duration).UnixNano()) + c_duration = C.struct_timeval{tv_sec: C.__time_t(tv.Sec), tv_usec: C.__suseconds_t(tv.Usec)} + } + + var c_flags C.uint8_t + if flags != nil { + c_flags = C.uint8_t(*flags) + } + + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_name)) + defer C.free(unsafe.Pointer(c_cookie)) + defer C.free(unsafe.Pointer(c_tag)) + defer C.free(unsafe.Pointer(c_desc)) + + ret := C.rados_lock_shared( + ioctx.ioctx, + c_oid, + c_name, + c_cookie, + c_tag, + c_desc, + &c_duration, + c_flags) + + // 0 on success, negative error code on failure + // -EBUSY if the lock is already held by another (client, cookie) pair + // -EEXIST if the lock is already held by the same (client, cookie) pair + + switch ret { + case 0: + return int(ret), nil + case -16: // EBUSY + return int(ret), nil + case -17: // EEXIST + return int(ret), nil + default: + return int(ret), RadosError(int(ret)) + } +} + +// Release a shared or exclusive lock on an object. +func (ioctx *IOContext) Unlock(oid, name, cookie string) (int, error) { + c_oid := C.CString(oid) + c_name := C.CString(name) + c_cookie := C.CString(cookie) + + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_name)) + defer C.free(unsafe.Pointer(c_cookie)) + + // 0 on success, negative error code on failure + // -ENOENT if the lock is not held by the specified (client, cookie) pair + + ret := C.rados_unlock( + ioctx.ioctx, + c_oid, + c_name, + c_cookie) + + switch ret { + case 0: + return int(ret), nil + case -2: // -ENOENT + return int(ret), nil + default: + return int(ret), RadosError(int(ret)) + } +} + +// List clients that have locked the named object lock and information about the lock. +// The number of bytes required in each buffer is put in the corresponding size out parameter. +// If any of the provided buffers are too short, -ERANGE is returned after these sizes are filled in. +func (ioctx *IOContext) ListLockers(oid, name string) (*LockInfo, error) { + c_oid := C.CString(oid) + c_name := C.CString(name) + + c_tag := (*C.char)(C.malloc(C.size_t(1024))) + c_clients := (*C.char)(C.malloc(C.size_t(1024))) + c_cookies := (*C.char)(C.malloc(C.size_t(1024))) + c_addrs := (*C.char)(C.malloc(C.size_t(1024))) + + var c_exclusive C.int + c_tag_len := C.size_t(1024) + c_clients_len := C.size_t(1024) + c_cookies_len := C.size_t(1024) + c_addrs_len := C.size_t(1024) + + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_name)) + defer C.free(unsafe.Pointer(c_tag)) + defer C.free(unsafe.Pointer(c_clients)) + defer C.free(unsafe.Pointer(c_cookies)) + defer C.free(unsafe.Pointer(c_addrs)) + + ret := C.rados_list_lockers( + ioctx.ioctx, + c_oid, + c_name, + &c_exclusive, + c_tag, + &c_tag_len, + c_clients, + &c_clients_len, + c_cookies, + &c_cookies_len, + c_addrs, + &c_addrs_len) + + splitCString := func(items *C.char, itemsLen C.size_t) []string { + currLen := 0 + clients := []string{} + for currLen < int(itemsLen) { + client := C.GoString(C.nextChunk(&items)) + clients = append(clients, client) + currLen += len(client) + 1 + } + return clients + } + + if ret < 0 { + return nil, RadosError(int(ret)) + } else { + return &LockInfo{int(ret), c_exclusive == 1, C.GoString(c_tag), splitCString(c_clients, c_clients_len), splitCString(c_cookies, c_cookies_len), splitCString(c_addrs, c_addrs_len)}, nil + } +} + +// Releases a shared or exclusive lock on an object, which was taken by the specified client. +func (ioctx *IOContext) BreakLock(oid, name, client, cookie string) (int, error) { + c_oid := C.CString(oid) + c_name := C.CString(name) + c_client := C.CString(client) + c_cookie := C.CString(cookie) + + defer C.free(unsafe.Pointer(c_oid)) + defer C.free(unsafe.Pointer(c_name)) + defer C.free(unsafe.Pointer(c_client)) + defer C.free(unsafe.Pointer(c_cookie)) + + // 0 on success, negative error code on failure + // -ENOENT if the lock is not held by the specified (client, cookie) pair + // -EINVAL if the client cannot be parsed + + ret := C.rados_break_lock( + ioctx.ioctx, + c_oid, + c_name, + c_client, + c_cookie) + + switch ret { + case 0: + return int(ret), nil + case -2: // -ENOENT + return int(ret), nil + case -22: // -EINVAL + return int(ret), nil + default: + return int(ret), RadosError(int(ret)) + } +} diff --git a/rados/rados_test.go b/rados/rados_test.go index b0f8159..378114a 100644 --- a/rados/rados_test.go +++ b/rados/rados_test.go @@ -901,3 +901,81 @@ func TestSetNamespace(t *testing.T) { pool.Destroy() conn.Shutdown() } + +func TestLocking(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + // lock ex + res, err := pool.LockExclusive("obj", "myLock", "myCookie", "this is a test lock", 0, nil) + assert.NoError(t, err) + assert.Equal(t, 0, res) + + // verify lock ex + info, err := pool.ListLockers("obj", "myLock") + assert.NoError(t, err) + assert.Equal(t, 1, len(info.Clients)) + assert.Equal(t, true, info.Exclusive) + + // fail to lock ex again + res, err = pool.LockExclusive("obj", "myLock", "myCookie", "this is a description", 0, nil) + assert.NoError(t, err) + assert.Equal(t, -17, res) + + // fail to lock sh + res, err = pool.LockShared("obj", "myLock", "myCookie", "", "a description", 0, nil) + assert.NoError(t, err) + assert.Equal(t, -17, res) + + // unlock + res, err = pool.Unlock("obj", "myLock", "myCookie") + assert.NoError(t, err) + assert.Equal(t, 0, res) + + // verify unlock + info, err = pool.ListLockers("obj", "myLock") + assert.NoError(t, err) + assert.Equal(t, 0, len(info.Clients)) + + // lock sh + res, err = pool.LockShared("obj", "myLock", "myCookie", "", "a description", 0, nil) + assert.NoError(t, err) + assert.Equal(t, 0, res) + + // verify lock sh + info, err = pool.ListLockers("obj", "myLock") + assert.NoError(t, err) + assert.Equal(t, 1, len(info.Clients)) + assert.Equal(t, false, info.Exclusive) + + // fail to lock sh again + res, err = pool.LockExclusive("obj", "myLock", "myCookie", "a description", 0, nil) + assert.NoError(t, err) + assert.Equal(t, -17, res) + + // fail to lock ex + res, err = pool.LockExclusive("obj", "myLock", "myCookie", "this is a test lock", 0, nil) + assert.NoError(t, err) + assert.Equal(t, res, -17) + + // break the lock + res, err = pool.BreakLock("obj", "myLock", info.Clients[0], "myCookie") + assert.NoError(t, err) + assert.Equal(t, 0, res) + + // verify lock broken + info, err = pool.ListLockers("obj", "myLock") + assert.NoError(t, err) + assert.Equal(t, 0, len(info.Clients)) + + pool.Destroy() + conn.Shutdown() +}