From 59cdfdc5a408e6ed085d73ec8405cd8896a056da Mon Sep 17 00:00:00 2001 From: Michael Andersen Date: Sat, 3 Feb 2018 10:59:54 -0800 Subject: [PATCH] rados: improve support for namespaces --- rados/ioctx.go | 23 +++++++-- rados/rados.go | 2 + rados/rados_test.go | 123 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 5 deletions(-) diff --git a/rados/ioctx.go b/rados/ioctx.go index cf2cab0..7115b1d 100644 --- a/rados/ioctx.go +++ b/rados/ioctx.go @@ -231,7 +231,9 @@ type ObjectListFunc func(oid string) // ListObjects lists all of the objects in the pool associated with the I/O // context, and called the provided listFn function for each object, passing -// to the function the name of the object. +// to the function the name of the object. Call SetNamespace with +// RadosAllNamespaces before calling this function to return objects from all +// namespaces func (ioctx *IOContext) ListObjects(listFn ObjectListFunc) error { var ctx C.rados_list_ctx_t ret := C.rados_nobjects_list_open(ioctx.ioctx, &ctx) @@ -580,9 +582,10 @@ func (ioctx *IOContext) CleanOmap(oid string) error { } type Iter struct { - ctx C.rados_list_ctx_t - err error - entry string + ctx C.rados_list_ctx_t + err error + entry string + namespace string } type IterToken uint32 @@ -621,11 +624,13 @@ func (iter *Iter) Seek(token IterToken) { // func (iter *Iter) Next() bool { var c_entry *C.char - if cerr := C.rados_nobjects_list_next(iter.ctx, &c_entry, nil, nil); cerr < 0 { + var c_namespace *C.char + if cerr := C.rados_nobjects_list_next(iter.ctx, &c_entry, nil, &c_namespace); cerr < 0 { iter.err = GetRadosError(int(cerr)) return false } iter.entry = C.GoString(c_entry) + iter.namespace = C.GoString(c_namespace) return true } @@ -637,6 +642,14 @@ func (iter *Iter) Value() string { return iter.entry } +// Returns the namespace associated with the current value of the iterator (object name), after a successful call to Next. +func (iter *Iter) Namespace() string { + if iter.err != nil { + return "" + } + return iter.namespace +} + // Checks whether the iterator has encountered an error. func (iter *Iter) Err() error { if iter.err == RadosErrorNotFound { diff --git a/rados/rados.go b/rados/rados.go index 944e642..4538628 100644 --- a/rados/rados.go +++ b/rados/rados.go @@ -17,6 +17,8 @@ func (e RadosError) Error() string { return fmt.Sprintf("rados: %s", C.GoString(C.strerror(C.int(-e)))) } +var RadosAllNamespaces = "\x01" + var RadosErrorNotFound = RadosError(-C.ENOENT) var RadosErrorPermissionDenied = RadosError(-C.EPERM) diff --git a/rados/rados_test.go b/rados/rados_test.go index dcd636e..9c606f8 100644 --- a/rados/rados_test.go +++ b/rados/rados_test.go @@ -591,6 +591,15 @@ func TestObjectIterator(t *testing.T) { assert.NoError(t, iter.Err()) assert.True(t, len(objectList) == 0) + //create an object in a different namespace to verify that + //iteration within a namespace does not return it + ioctx.SetNamespace("ns1") + bytes_in := []byte("input data") + err = ioctx.Write(GetUUID(), bytes_in, 0) + assert.NoError(t, err) + + ioctx.SetNamespace("") + createdList := []string{} for i := 0; i < 200; i++ { oid := GetUUID() @@ -616,6 +625,78 @@ func TestObjectIterator(t *testing.T) { assert.Equal(t, objectList, createdList) } +func TestObjectIteratorAcrossNamespaces(t *testing.T) { + const perNamespace = 100 + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + poolname := GetUUID() + err := conn.MakePool(poolname) + assert.NoError(t, err) + + ioctx, err := conn.OpenIOContext(poolname) + assert.NoError(t, err) + + objectListNS1 := []string{} + objectListNS2 := []string{} + + iter, err := ioctx.Iter() + assert.NoError(t, err) + preexisting := 0 + for iter.Next() { + preexisting++ + } + iter.Close() + assert.NoError(t, iter.Err()) + assert.EqualValues(t, 0, preexisting) + + createdList := []string{} + ioctx.SetNamespace("ns1") + for i := 0; i < 90; i++ { + oid := GetUUID() + bytes_in := []byte("input data") + err = ioctx.Write(oid, bytes_in, 0) + assert.NoError(t, err) + createdList = append(createdList, oid) + } + ioctx.SetNamespace("ns2") + for i := 0; i < 100; i++ { + oid := GetUUID() + bytes_in := []byte("input data") + err = ioctx.Write(oid, bytes_in, 0) + assert.NoError(t, err) + createdList = append(createdList, oid) + } + assert.True(t, len(createdList) == 190) + + ioctx.SetNamespace(rados.RadosAllNamespaces) + iter, err = ioctx.Iter() + assert.NoError(t, err) + rogue := 0 + for iter.Next() { + if iter.Namespace() == "ns1" { + objectListNS1 = append(objectListNS1, iter.Value()) + } else if iter.Namespace() == "ns2" { + objectListNS2 = append(objectListNS2, iter.Value()) + } else { + rogue++ + } + } + iter.Close() + assert.NoError(t, iter.Err()) + assert.EqualValues(t, 0, rogue) + assert.Equal(t, len(objectListNS1), 90) + assert.Equal(t, len(objectListNS2), 100) + objectList := []string{} + objectList = append(objectList, objectListNS1...) + objectList = append(objectList, objectListNS2...) + sort.Strings(objectList) + sort.Strings(createdList) + + assert.Equal(t, objectList, createdList) +} + func TestNewConnWithUser(t *testing.T) { _, err := rados.NewConnWithUser("admin") assert.Equal(t, err, nil) @@ -902,6 +983,48 @@ func TestSetNamespace(t *testing.T) { conn.Shutdown() } +func TestListAcrossNamespaces(t *testing.T) { + conn, _ := rados.NewConn() + conn.ReadDefaultConfigFile() + conn.Connect() + + pool_name := GetUUID() + err := conn.MakePool(pool_name) + assert.NoError(t, err) + + pool, err := conn.OpenIOContext(pool_name) + assert.NoError(t, err) + + bytes_in := []byte("input data") + err = pool.Write("obj", bytes_in, 0) + assert.NoError(t, err) + + pool.SetNamespace("space1") + + bytes_in = []byte("input data") + err = pool.Write("obj2", bytes_in, 0) + assert.NoError(t, err) + + foundObjects := 0 + err = pool.ListObjects(func(oid string) { + foundObjects++ + }) + assert.NoError(t, err) + assert.EqualValues(t, 1, foundObjects) + + pool.SetNamespace(rados.RadosAllNamespaces) + + foundObjects = 0 + err = pool.ListObjects(func(oid string) { + foundObjects++ + }) + assert.NoError(t, err) + assert.EqualValues(t, 2, foundObjects) + + pool.Destroy() + conn.Shutdown() +} + func TestLocking(t *testing.T) { conn, _ := rados.NewConn() conn.ReadDefaultConfigFile()