From 21f192a484045d7e3358a4181d2a85be46a44853 Mon Sep 17 00:00:00 2001 From: Sven Anderson Date: Mon, 31 Jan 2022 02:44:43 +0100 Subject: [PATCH] rados: implement watch/notify APIs This change implements the bindings for the watch/notify APIs of librados. Instead of callbacks, the watcher is implemented with more Go-idiomatic channels. A watcher object exposes two read-only channels, one to receive the notify events and one to receive occuring errors. Signed-off-by: Sven Anderson --- rados/watcher.go | 379 ++++++++++++++++++++++++++++++++++++++++++ rados/watcher_test.go | 286 +++++++++++++++++++++++++++++++ 2 files changed, 665 insertions(+) create mode 100644 rados/watcher.go create mode 100644 rados/watcher_test.go diff --git a/rados/watcher.go b/rados/watcher.go new file mode 100644 index 0000000..4569c68 --- /dev/null +++ b/rados/watcher.go @@ -0,0 +1,379 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +/* +#cgo LDFLAGS: -lrados +#include +#include +extern void watchNotifyCb(void*, uint64_t, uint64_t, uint64_t, void*, size_t); +extern void watchErrorCb(void*, uint64_t, int); +*/ +import "C" + +import ( + "encoding/binary" + "fmt" + "math" + "sync" + "time" + "unsafe" +) + +type ( + // WatcherID is the unique id of a Watcher. + WatcherID uint64 + // NotifyID is the unique id of a NotifyEvent. + NotifyID uint64 + // NotifierID is the unique id of a notifying client. + NotifierID uint64 +) + +// NotifyEvent is received by a watcher for each notification. +type NotifyEvent struct { + ID NotifyID + WatcherID WatcherID + NotifierID NotifierID + Data []byte +} + +// NotifyAck represents an acknowleged notification. +type NotifyAck struct { + WatcherID WatcherID + NotifierID NotifierID + Response []byte +} + +// NotifyTimeout represents an unacknowleged notification. +type NotifyTimeout struct { + WatcherID WatcherID + NotifierID NotifierID +} + +// Watcher receives all notifications for certain object. +type Watcher struct { + id WatcherID + oid string + ioctx *IOContext + events chan NotifyEvent + errors chan error + done chan struct{} +} + +var ( + watchers = map[WatcherID]*Watcher{} + watchersMtx sync.RWMutex +) + +// Watch creates a Watcher for the specified object. +// PREVIEW +// +// A Watcher receives all notifications that are sent to the object on which it +// has been created. It exposes two read-only channels: Events() receives all +// the NotifyEvents and Errors() receives all occuring errors. A typical code +// creating a Watcher could look like this: +// +// watcher, err := ioctx.Watch(oid) +// go func() { // event handler +// for ne := range watcher.Events() { +// ... +// ne.Ack([]byte("response data...")) +// ... +// } +// }() +// go func() { // error handler +// for err := range watcher.Errors() { +// ... handle err ... +// } +// }() +// +// CAUTION: the Watcher references the IOContext in which it has been created. +// Therefore all watchers must be deleted with the Delete() method before the +// IOContext is being destroyed. +// +// Implements: +// int rados_watch2(rados_ioctx_t io, const char* o, uint64_t* cookie, +// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, void* arg) +func (ioctx *IOContext) Watch(obj string) (*Watcher, error) { + return ioctx.WatchWithTimeout(obj, 0) +} + +// WatchWithTimeout creates a watcher on an object. Same as Watcher(), but +// different timeout than the default can be specified. +// PREVIEW +// +// Implements: +// int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie, +// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout, +// void *arg); +func (ioctx *IOContext) WatchWithTimeout(oid string, timeout time.Duration) (*Watcher, error) { + cObj := C.CString(oid) + defer C.free(unsafe.Pointer(cObj)) + var id C.uint64_t + watchersMtx.Lock() + defer watchersMtx.Unlock() + ret := C.rados_watch3( + ioctx.ioctx, + cObj, + &id, + (C.rados_watchcb2_t)(C.watchNotifyCb), + (C.rados_watcherrcb_t)(C.watchErrorCb), + C.uint32_t(timeout.Milliseconds()/1000), + nil, + ) + if err := getError(ret); err != nil { + return nil, err + } + evCh := make(chan NotifyEvent) + errCh := make(chan error) + w := &Watcher{ + id: WatcherID(id), + ioctx: ioctx, + oid: oid, + events: evCh, + errors: errCh, + done: make(chan struct{}), + } + watchers[WatcherID(id)] = w + return w, nil +} + +// ID returns the WatcherId of the Watcher +// PREVIEW +func (w *Watcher) ID() WatcherID { + return w.id +} + +// Events returns a read-only channel, that receives all notifications that are +// sent to the object of the Watcher. +// PREVIEW +func (w *Watcher) Events() <-chan NotifyEvent { + return w.events +} + +// Errors returns a read-only channel, that receives all errors for the Watcher. +// PREVIEW +func (w *Watcher) Errors() <-chan error { + return w.errors +} + +// Check on the status of a Watcher. +// PREVIEW +// +// Returns the time since it was last confirmed. If there is an error, the +// Watcher is no longer valid, and should be destroyed with the Delete() method. +// +// Implements: +// int rados_watch_check(rados_ioctx_t io, uint64_t cookie) +func (w *Watcher) Check() (time.Duration, error) { + ret := C.rados_watch_check(w.ioctx.ioctx, C.uint64_t(w.id)) + if ret < 0 { + return 0, getError(ret) + } + return time.Millisecond * time.Duration(ret), nil +} + +// Delete the watcher. This closes both the event and error channel. +// PREVIEW +// +// Implements: +// int rados_unwatch2(rados_ioctx_t io, uint64_t cookie) +func (w *Watcher) Delete() error { + watchersMtx.Lock() + _, ok := watchers[w.id] + if ok { + delete(watchers, w.id) + } + watchersMtx.Unlock() + if !ok { + return nil + } + ret := C.rados_unwatch2(w.ioctx.ioctx, C.uint64_t(w.id)) + if ret != 0 { + return getError(ret) + } + close(w.done) // unblock blocked callbacks + close(w.events) + close(w.errors) + return nil +} + +// Notify sends a notification with the provided data to all Watchers of the +// specified object. +// PREVIEW +// +// CAUTION: even if the error is not nil. the returned slices +// might still contain data. +func (ioctx *IOContext) Notify(obj string, data []byte) ([]NotifyAck, []NotifyTimeout, error) { + return ioctx.NotifyWithTimeout(obj, data, 0) +} + +// NotifyWithTimeout is like Notify() but with a different timeout than the +// default. +// PREVIEW +// +// Implements: +// int rados_notify2(rados_ioctx_t io, const char* o, const char* buf, int buf_len, +// uint64_t timeout_ms, char** reply_buffer, size_t* reply_buffer_len) +func (ioctx *IOContext) NotifyWithTimeout(obj string, data []byte, timeout time.Duration) ([]NotifyAck, + []NotifyTimeout, error) { + cObj := C.CString(obj) + defer C.free(unsafe.Pointer(cObj)) + var cResponse *C.char + defer C.rados_buffer_free(cResponse) + var responseLen C.size_t + var dataPtr *C.char + if len(data) > 0 { + dataPtr = (*C.char)(unsafe.Pointer(&data[0])) + } + ret := C.rados_notify2( + ioctx.ioctx, + cObj, + dataPtr, + C.int(len(data)), + C.uint64_t(timeout.Milliseconds()), + &cResponse, + &responseLen, + ) + // cResponse has been set even if an error is returned, so we decode it anyway + acks, timeouts := decodeNotifyResponse(cResponse, responseLen) + return acks, timeouts, getError(ret) +} + +// Ack sends an acknowledgement with the specified response data to the notfier +// of the NotifyEvent. If a notify is not ack'ed, the originating Notify() call +// blocks and eventiually times out. +// PREVIEW +// +// Implements: +// int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id, +// uint64_t cookie, const char *buf, int buf_len) +func (ne *NotifyEvent) Ack(response []byte) error { + watchersMtx.RLock() + w, ok := watchers[ne.WatcherID] + watchersMtx.RUnlock() + if !ok { + return fmt.Errorf("can't ack on deleted watcher %v", ne.WatcherID) + } + cOID := C.CString(w.oid) + defer C.free(unsafe.Pointer(cOID)) + var respPtr *C.char + if len(response) > 0 { + respPtr = (*C.char)(unsafe.Pointer(&response[0])) + } + ret := C.rados_notify_ack( + w.ioctx.ioctx, + cOID, + C.uint64_t(ne.ID), + C.uint64_t(ne.WatcherID), + respPtr, + C.int(len(response)), + ) + return getError(ret) +} + +// WatcherFlush flushes all pending notifications of the cluster. +// PREVIEW +// +// Implements: +// int rados_watch_flush(rados_t cluster) +func (c *Conn) WatcherFlush() error { + if !c.connected { + return ErrNotConnected + } + ret := C.rados_watch_flush(c.cluster) + return getError(ret) +} + +// decoder for this notify response format: +// le32 num_acks +// { +// le64 gid global id for the client (for client.1234 that's 1234) +// le64 cookie cookie for the client +// le32 buflen length of reply message buffer +// u8 buflen payload +// } num_acks +// le32 num_timeouts +// { +// le64 gid global id for the client +// le64 cookie cookie for the client +// } num_timeouts +// +// NOTE: starting with pacific this is implemented as a C function and this can +// be replaced later +func decodeNotifyResponse(response *C.char, len C.size_t) ([]NotifyAck, []NotifyTimeout) { + if len == 0 || response == nil { + return nil, nil + } + b := (*[math.MaxInt32]byte)(unsafe.Pointer(response))[:len:len] + pos := 0 + + num := binary.LittleEndian.Uint32(b[pos:]) + pos += 4 + acks := make([]NotifyAck, num) + for i := range acks { + acks[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + acks[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + dataLen := binary.LittleEndian.Uint32(b[pos:]) + pos += 4 + if dataLen > 0 { + acks[i].Response = C.GoBytes(unsafe.Pointer(&b[pos]), C.int(dataLen)) + pos += int(dataLen) + } + } + + num = binary.LittleEndian.Uint32(b[pos:]) + pos += 4 + timeouts := make([]NotifyTimeout, num) + for i := range timeouts { + timeouts[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + timeouts[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:])) + pos += 8 + } + return acks, timeouts +} + +//export watchNotifyCb +func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t, + notifierID C.uint64_t, cData unsafe.Pointer, dataLen C.size_t) { + watchersMtx.RLock() + w, ok := watchers[WatcherID(id)] + watchersMtx.RUnlock() + if !ok { + // usually this should not happen, but who knows + // TODO: some log message (once we have logging) + return + } + ev := NotifyEvent{ + ID: NotifyID(notifyID), + WatcherID: WatcherID(id), + NotifierID: NotifierID(notifierID), + } + if dataLen > 0 { + ev.Data = C.GoBytes(cData, C.int(dataLen)) + } + select { + case <-w.done: // unblock when deleted + case w.events <- ev: + } +} + +//export watchErrorCb +func watchErrorCb(_ unsafe.Pointer, id C.uint64_t, err C.int) { + watchersMtx.RLock() + w, ok := watchers[WatcherID(id)] + watchersMtx.RUnlock() + if !ok { + // usually this should not happen, but who knows + // TODO: some log message (once we have logging) + return + } + select { + case <-w.done: // unblock when deleted + case w.errors <- getError(err): + } +} diff --git a/rados/watcher_test.go b/rados/watcher_test.go new file mode 100644 index 0000000..c0a5d6b --- /dev/null +++ b/rados/watcher_test.go @@ -0,0 +1,286 @@ +//go:build ceph_preview +// +build ceph_preview + +package rados + +import ( + "fmt" + "math" + "sync" + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/assert" +) + +func (suite *RadosTestSuite) TestWatcher() { + suite.SetupConnection() + oid := suite.GenObjectName() + err := suite.ioctx.Create(oid, CreateExclusive) + assert.NoError(suite.T(), err) + defer func() { _ = suite.ioctx.Delete(oid) }() + + suite.T().Run("DoubleDelete", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + assert.NoError(t, err) + assert.NotPanics(t, func() { + err := watcher.Delete() + assert.NoError(t, err) + err = watcher.Delete() + assert.NoError(t, err) + }) + }) + + suite.T().Run("DeleteClosesChannels", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + assert.NoError(t, err) + evDone := make(chan struct{}) + errDone := make(chan struct{}) + go func() { // event handler + for ne := range watcher.Events() { + t.Errorf("received event: %v", ne) + } + close(evDone) + }() + go func() { // error handler + for err := range watcher.Errors() { + t.Errorf("received error: %v", err) + } + close(errDone) + }() + err = watcher.Delete() + assert.NoError(t, err) + select { + case <-evDone: + // Delete closed the events channel + case <-time.After(time.Second): + t.Error("timeout closing event channel") + } + select { + case <-errDone: + // Delete closed the error channel + case <-time.After(time.Second): + t.Error("timeout closing error channel") + } + }) + + suite.T().Run("NotifyNoAck", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + defer func() { _ = watcher.Delete() }() + assert.NoError(t, err) + data := []byte("notification") + acks, timeouts, err := suite.ioctx.NotifyWithTimeout(oid, data, time.Second) + assert.Error(t, err) // without ack it must timeout + assert.Len(t, timeouts, 1) + assert.Equal(t, watcher.ID(), timeouts[0].WatcherID) + assert.Empty(t, acks) + select { + case ev := <-watcher.Events(): + assert.Equal(t, data, ev.Data) + assert.Equal(t, timeouts[0].NotifierID, ev.NotifierID) + case err = <-watcher.Errors(): + t.Error(err) + case <-time.After(time.Second): + t.Error("timeout") + } + }) + + suite.T().Run("NotifyDeletedWatcher", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + assert.NoError(t, err) + err = watcher.Delete() + assert.NoError(t, err) + data := []byte("notification") + acks, timeouts, err := suite.ioctx.NotifyWithTimeout(oid, data, time.Second) + assert.NoError(t, err) + assert.Empty(t, timeouts) + assert.Empty(t, acks) + select { + case _, ok := <-watcher.Events(): + assert.False(t, ok) + case <-time.After(time.Second): + t.Error("timeout") + } + }) + + suite.T().Run("NotifyNoAckDeleteUnblocksChannels", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + assert.NoError(t, err) + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + i := i + wg.Add(1) + go func() { + defer wg.Done() + _, _, err := suite.ioctx.Notify(oid, []byte(fmt.Sprintf("notify%d", i))) + assert.NoError(t, err) + }() + } + time.Sleep(time.Millisecond * 100) // wait so that callbacks are blocking + err = watcher.Delete() + assert.NoError(t, err) + wg.Wait() + }) + + suite.T().Run("NotifyWithAck", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + defer func() { _ = watcher.Delete() }() + assert.NoError(t, err) + data := []byte("notification") + response := []byte("response") + var receivedEv NotifyEvent + go func() { // event handler + for ne := range watcher.Events() { + receivedEv = ne + assert.Equal(t, data, ne.Data) + assert.Equal(t, watcher.ID(), ne.WatcherID) + err := ne.Ack(response) + assert.NoError(t, err) + } + }() + go func() { // error handler + for err := range watcher.Errors() { + t.Errorf("received error: %v", err) + } + }() + acks, timeouts, err := suite.ioctx.Notify(oid, data) + assert.NoError(t, err) // without ack it must timeout + assert.Empty(t, timeouts) + assert.Len(t, acks, 1) + assert.Equal(t, watcher.ID(), acks[0].WatcherID) + assert.Equal(t, receivedEv.NotifierID, acks[0].NotifierID) + assert.Equal(t, response, acks[0].Response) + }) + + suite.T().Run("NotifyAckNilData", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + defer func() { _ = watcher.Delete() }() + assert.NoError(t, err) + var receivedEv NotifyEvent + go func() { // event handler + for ne := range watcher.Events() { + receivedEv = ne + assert.Equal(t, []byte(nil), ne.Data) + assert.Equal(t, watcher.ID(), ne.WatcherID) + err := ne.Ack(nil) + assert.NoError(t, err) + } + }() + go func() { // error handler + for err := range watcher.Errors() { + t.Errorf("received error: %v", err) + } + }() + acks, timeouts, err := suite.ioctx.Notify(oid, nil) + assert.NoError(t, err) // without ack it must timeout + assert.Empty(t, timeouts) + assert.Len(t, acks, 1) + assert.Equal(t, watcher.ID(), acks[0].WatcherID) + assert.Equal(t, receivedEv.NotifierID, acks[0].NotifierID) + assert.Equal(t, []byte(nil), acks[0].Response) + }) + + suite.T().Run("Check", func(t *testing.T) { + watcher, err := suite.ioctx.WatchWithTimeout(oid, time.Second) + defer func() { _ = watcher.Delete() }() + assert.NoError(t, err) + last, err := watcher.Check() + assert.NoError(t, err) + assert.Greater(t, int(last), 0) + select { + case err = <-watcher.Errors(): // watcher times out + case <-time.After(time.Second * 2): + t.Error("timeout") + } + assert.Error(t, err) + last, err2 := watcher.Check() + assert.Error(t, err2) + assert.Equal(t, err, err2) + assert.Zero(t, last) + }) + + suite.T().Run("Flush", func(t *testing.T) { + watcher, err := suite.ioctx.Watch(oid) + assert.NoError(t, err) + done := make(chan struct{}) + go func() { + _, _, _ = suite.ioctx.Notify(oid, nil) + close(done) + }() + time.Sleep(time.Millisecond * 100) + flushed := make(chan struct{}) + go func() { + err := suite.conn.WatcherFlush() + assert.NoError(t, err) + close(flushed) + }() + select { + case <-flushed: + t.Error("flush returned before event got received") + case <-time.After(time.Millisecond * 100): + } + <-watcher.Events() + select { + case <-flushed: + case <-time.After(time.Second): + t.Error("flush didn't return after receiving event") + } + err = watcher.Delete() + assert.NoError(t, err) + <-done + }) +} + +func TestDecodeNotifyResponse(t *testing.T) { + testEmptyResponse := [...]byte{ + // le32 num_acks + 0, 0, 0, 0, + // le32 num_timeouts + 0, 0, 0, 0, + } + testResponse := [...]byte{ + // le32 num_acks + 1, 0, 0, 0, + // le64 gid global id for the client (for client.1234 that's 1234) + 0, 1, 0, 0, 0, 0, 0, 0, + // le64 cookie cookie for the client + 1, 1, 0, 0, 0, 0, 0, 0, + // le32 buflen length of reply message buffer + 4, 0, 0, 0, + // u8 buflen payload + 1, 2, 3, 4, + // le32 num_timeouts + 2, 0, 0, 0, + // le64 gid global id for the client + 2, 1, 0, 0, 0, 0, 0, 0, + // le64 cookie cookie for the client + 3, 1, 0, 0, 0, 0, 0, 0, + // le64 gid global id for the client + 4, 1, 0, 0, 0, 0, 0, 0, + // le64 cookie cookie for the client + 255, 255, 255, 255, 255, 255, 255, 255, + } + t.Run("Empty", func(t *testing.T) { + l := _Ctype_size_t(len(testEmptyResponse)) + b := (*_Ctype_char)(unsafe.Pointer(&testEmptyResponse[0])) + acks, tOuts := decodeNotifyResponse(b, l) + assert.Len(t, acks, 0) + assert.Len(t, tOuts, 0) + }) + t.Run("Example", func(t *testing.T) { + l := _Ctype_size_t(len(testResponse)) + b := (*_Ctype_char)(unsafe.Pointer(&testResponse[0])) + acks, tOuts := decodeNotifyResponse(b, l) + assert.Len(t, acks, 1) + assert.Equal(t, acks[0].NotifierID, NotifierID(256)) + assert.Equal(t, acks[0].WatcherID, WatcherID(257)) + assert.Len(t, acks[0].Response, 4) + assert.Equal(t, acks[0].Response, []byte{1, 2, 3, 4}) + assert.Len(t, tOuts, 2) + assert.Equal(t, tOuts[0].NotifierID, NotifierID(258)) + assert.Equal(t, tOuts[0].WatcherID, WatcherID(259)) + assert.Equal(t, tOuts[1].NotifierID, NotifierID(260)) + assert.Equal(t, tOuts[1].WatcherID, WatcherID(math.MaxUint64)) + }) +}