mirror of https://github.com/ceph/go-ceph
rados: implement watch/notify APIs
This change implements the bindings for the watch/notify APIs of librados. Instead of callbacks, the watcher is implemented with more Go-idiomatic channels. A watcher object exposes two read-only channels, one to receive the notify events and one to receive occuring errors. Signed-off-by: Sven Anderson <sven@redhat.com>
This commit is contained in:
parent
dae56d0c65
commit
21f192a484
|
@ -0,0 +1,379 @@
|
||||||
|
//go:build ceph_preview
|
||||||
|
// +build ceph_preview
|
||||||
|
|
||||||
|
package rados
|
||||||
|
|
||||||
|
/*
|
||||||
|
#cgo LDFLAGS: -lrados
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <rados/librados.h>
|
||||||
|
extern void watchNotifyCb(void*, uint64_t, uint64_t, uint64_t, void*, size_t);
|
||||||
|
extern void watchErrorCb(void*, uint64_t, int);
|
||||||
|
*/
|
||||||
|
import "C"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
// WatcherID is the unique id of a Watcher.
|
||||||
|
WatcherID uint64
|
||||||
|
// NotifyID is the unique id of a NotifyEvent.
|
||||||
|
NotifyID uint64
|
||||||
|
// NotifierID is the unique id of a notifying client.
|
||||||
|
NotifierID uint64
|
||||||
|
)
|
||||||
|
|
||||||
|
// NotifyEvent is received by a watcher for each notification.
|
||||||
|
type NotifyEvent struct {
|
||||||
|
ID NotifyID
|
||||||
|
WatcherID WatcherID
|
||||||
|
NotifierID NotifierID
|
||||||
|
Data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyAck represents an acknowleged notification.
|
||||||
|
type NotifyAck struct {
|
||||||
|
WatcherID WatcherID
|
||||||
|
NotifierID NotifierID
|
||||||
|
Response []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyTimeout represents an unacknowleged notification.
|
||||||
|
type NotifyTimeout struct {
|
||||||
|
WatcherID WatcherID
|
||||||
|
NotifierID NotifierID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Watcher receives all notifications for certain object.
|
||||||
|
type Watcher struct {
|
||||||
|
id WatcherID
|
||||||
|
oid string
|
||||||
|
ioctx *IOContext
|
||||||
|
events chan NotifyEvent
|
||||||
|
errors chan error
|
||||||
|
done chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
watchers = map[WatcherID]*Watcher{}
|
||||||
|
watchersMtx sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
// Watch creates a Watcher for the specified object.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// A Watcher receives all notifications that are sent to the object on which it
|
||||||
|
// has been created. It exposes two read-only channels: Events() receives all
|
||||||
|
// the NotifyEvents and Errors() receives all occuring errors. A typical code
|
||||||
|
// creating a Watcher could look like this:
|
||||||
|
//
|
||||||
|
// watcher, err := ioctx.Watch(oid)
|
||||||
|
// go func() { // event handler
|
||||||
|
// for ne := range watcher.Events() {
|
||||||
|
// ...
|
||||||
|
// ne.Ack([]byte("response data..."))
|
||||||
|
// ...
|
||||||
|
// }
|
||||||
|
// }()
|
||||||
|
// go func() { // error handler
|
||||||
|
// for err := range watcher.Errors() {
|
||||||
|
// ... handle err ...
|
||||||
|
// }
|
||||||
|
// }()
|
||||||
|
//
|
||||||
|
// CAUTION: the Watcher references the IOContext in which it has been created.
|
||||||
|
// Therefore all watchers must be deleted with the Delete() method before the
|
||||||
|
// IOContext is being destroyed.
|
||||||
|
//
|
||||||
|
// Implements:
|
||||||
|
// int rados_watch2(rados_ioctx_t io, const char* o, uint64_t* cookie,
|
||||||
|
// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, void* arg)
|
||||||
|
func (ioctx *IOContext) Watch(obj string) (*Watcher, error) {
|
||||||
|
return ioctx.WatchWithTimeout(obj, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WatchWithTimeout creates a watcher on an object. Same as Watcher(), but
|
||||||
|
// different timeout than the default can be specified.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// Implements:
|
||||||
|
// int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie,
|
||||||
|
// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout,
|
||||||
|
// void *arg);
|
||||||
|
func (ioctx *IOContext) WatchWithTimeout(oid string, timeout time.Duration) (*Watcher, error) {
|
||||||
|
cObj := C.CString(oid)
|
||||||
|
defer C.free(unsafe.Pointer(cObj))
|
||||||
|
var id C.uint64_t
|
||||||
|
watchersMtx.Lock()
|
||||||
|
defer watchersMtx.Unlock()
|
||||||
|
ret := C.rados_watch3(
|
||||||
|
ioctx.ioctx,
|
||||||
|
cObj,
|
||||||
|
&id,
|
||||||
|
(C.rados_watchcb2_t)(C.watchNotifyCb),
|
||||||
|
(C.rados_watcherrcb_t)(C.watchErrorCb),
|
||||||
|
C.uint32_t(timeout.Milliseconds()/1000),
|
||||||
|
nil,
|
||||||
|
)
|
||||||
|
if err := getError(ret); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
evCh := make(chan NotifyEvent)
|
||||||
|
errCh := make(chan error)
|
||||||
|
w := &Watcher{
|
||||||
|
id: WatcherID(id),
|
||||||
|
ioctx: ioctx,
|
||||||
|
oid: oid,
|
||||||
|
events: evCh,
|
||||||
|
errors: errCh,
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
watchers[WatcherID(id)] = w
|
||||||
|
return w, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ID returns the WatcherId of the Watcher
|
||||||
|
// PREVIEW
|
||||||
|
func (w *Watcher) ID() WatcherID {
|
||||||
|
return w.id
|
||||||
|
}
|
||||||
|
|
||||||
|
// Events returns a read-only channel, that receives all notifications that are
|
||||||
|
// sent to the object of the Watcher.
|
||||||
|
// PREVIEW
|
||||||
|
func (w *Watcher) Events() <-chan NotifyEvent {
|
||||||
|
return w.events
|
||||||
|
}
|
||||||
|
|
||||||
|
// Errors returns a read-only channel, that receives all errors for the Watcher.
|
||||||
|
// PREVIEW
|
||||||
|
func (w *Watcher) Errors() <-chan error {
|
||||||
|
return w.errors
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check on the status of a Watcher.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// Returns the time since it was last confirmed. If there is an error, the
|
||||||
|
// Watcher is no longer valid, and should be destroyed with the Delete() method.
|
||||||
|
//
|
||||||
|
// Implements:
|
||||||
|
// int rados_watch_check(rados_ioctx_t io, uint64_t cookie)
|
||||||
|
func (w *Watcher) Check() (time.Duration, error) {
|
||||||
|
ret := C.rados_watch_check(w.ioctx.ioctx, C.uint64_t(w.id))
|
||||||
|
if ret < 0 {
|
||||||
|
return 0, getError(ret)
|
||||||
|
}
|
||||||
|
return time.Millisecond * time.Duration(ret), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the watcher. This closes both the event and error channel.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// Implements:
|
||||||
|
// int rados_unwatch2(rados_ioctx_t io, uint64_t cookie)
|
||||||
|
func (w *Watcher) Delete() error {
|
||||||
|
watchersMtx.Lock()
|
||||||
|
_, ok := watchers[w.id]
|
||||||
|
if ok {
|
||||||
|
delete(watchers, w.id)
|
||||||
|
}
|
||||||
|
watchersMtx.Unlock()
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
ret := C.rados_unwatch2(w.ioctx.ioctx, C.uint64_t(w.id))
|
||||||
|
if ret != 0 {
|
||||||
|
return getError(ret)
|
||||||
|
}
|
||||||
|
close(w.done) // unblock blocked callbacks
|
||||||
|
close(w.events)
|
||||||
|
close(w.errors)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify sends a notification with the provided data to all Watchers of the
|
||||||
|
// specified object.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// CAUTION: even if the error is not nil. the returned slices
|
||||||
|
// might still contain data.
|
||||||
|
func (ioctx *IOContext) Notify(obj string, data []byte) ([]NotifyAck, []NotifyTimeout, error) {
|
||||||
|
return ioctx.NotifyWithTimeout(obj, data, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotifyWithTimeout is like Notify() but with a different timeout than the
|
||||||
|
// default.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// Implements:
|
||||||
|
// int rados_notify2(rados_ioctx_t io, const char* o, const char* buf, int buf_len,
|
||||||
|
// uint64_t timeout_ms, char** reply_buffer, size_t* reply_buffer_len)
|
||||||
|
func (ioctx *IOContext) NotifyWithTimeout(obj string, data []byte, timeout time.Duration) ([]NotifyAck,
|
||||||
|
[]NotifyTimeout, error) {
|
||||||
|
cObj := C.CString(obj)
|
||||||
|
defer C.free(unsafe.Pointer(cObj))
|
||||||
|
var cResponse *C.char
|
||||||
|
defer C.rados_buffer_free(cResponse)
|
||||||
|
var responseLen C.size_t
|
||||||
|
var dataPtr *C.char
|
||||||
|
if len(data) > 0 {
|
||||||
|
dataPtr = (*C.char)(unsafe.Pointer(&data[0]))
|
||||||
|
}
|
||||||
|
ret := C.rados_notify2(
|
||||||
|
ioctx.ioctx,
|
||||||
|
cObj,
|
||||||
|
dataPtr,
|
||||||
|
C.int(len(data)),
|
||||||
|
C.uint64_t(timeout.Milliseconds()),
|
||||||
|
&cResponse,
|
||||||
|
&responseLen,
|
||||||
|
)
|
||||||
|
// cResponse has been set even if an error is returned, so we decode it anyway
|
||||||
|
acks, timeouts := decodeNotifyResponse(cResponse, responseLen)
|
||||||
|
return acks, timeouts, getError(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ack sends an acknowledgement with the specified response data to the notfier
|
||||||
|
// of the NotifyEvent. If a notify is not ack'ed, the originating Notify() call
|
||||||
|
// blocks and eventiually times out.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// Implements:
|
||||||
|
// int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id,
|
||||||
|
// uint64_t cookie, const char *buf, int buf_len)
|
||||||
|
func (ne *NotifyEvent) Ack(response []byte) error {
|
||||||
|
watchersMtx.RLock()
|
||||||
|
w, ok := watchers[ne.WatcherID]
|
||||||
|
watchersMtx.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("can't ack on deleted watcher %v", ne.WatcherID)
|
||||||
|
}
|
||||||
|
cOID := C.CString(w.oid)
|
||||||
|
defer C.free(unsafe.Pointer(cOID))
|
||||||
|
var respPtr *C.char
|
||||||
|
if len(response) > 0 {
|
||||||
|
respPtr = (*C.char)(unsafe.Pointer(&response[0]))
|
||||||
|
}
|
||||||
|
ret := C.rados_notify_ack(
|
||||||
|
w.ioctx.ioctx,
|
||||||
|
cOID,
|
||||||
|
C.uint64_t(ne.ID),
|
||||||
|
C.uint64_t(ne.WatcherID),
|
||||||
|
respPtr,
|
||||||
|
C.int(len(response)),
|
||||||
|
)
|
||||||
|
return getError(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
// WatcherFlush flushes all pending notifications of the cluster.
|
||||||
|
// PREVIEW
|
||||||
|
//
|
||||||
|
// Implements:
|
||||||
|
// int rados_watch_flush(rados_t cluster)
|
||||||
|
func (c *Conn) WatcherFlush() error {
|
||||||
|
if !c.connected {
|
||||||
|
return ErrNotConnected
|
||||||
|
}
|
||||||
|
ret := C.rados_watch_flush(c.cluster)
|
||||||
|
return getError(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
// decoder for this notify response format:
|
||||||
|
// le32 num_acks
|
||||||
|
// {
|
||||||
|
// le64 gid global id for the client (for client.1234 that's 1234)
|
||||||
|
// le64 cookie cookie for the client
|
||||||
|
// le32 buflen length of reply message buffer
|
||||||
|
// u8 buflen payload
|
||||||
|
// } num_acks
|
||||||
|
// le32 num_timeouts
|
||||||
|
// {
|
||||||
|
// le64 gid global id for the client
|
||||||
|
// le64 cookie cookie for the client
|
||||||
|
// } num_timeouts
|
||||||
|
//
|
||||||
|
// NOTE: starting with pacific this is implemented as a C function and this can
|
||||||
|
// be replaced later
|
||||||
|
func decodeNotifyResponse(response *C.char, len C.size_t) ([]NotifyAck, []NotifyTimeout) {
|
||||||
|
if len == 0 || response == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
b := (*[math.MaxInt32]byte)(unsafe.Pointer(response))[:len:len]
|
||||||
|
pos := 0
|
||||||
|
|
||||||
|
num := binary.LittleEndian.Uint32(b[pos:])
|
||||||
|
pos += 4
|
||||||
|
acks := make([]NotifyAck, num)
|
||||||
|
for i := range acks {
|
||||||
|
acks[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:]))
|
||||||
|
pos += 8
|
||||||
|
acks[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:]))
|
||||||
|
pos += 8
|
||||||
|
dataLen := binary.LittleEndian.Uint32(b[pos:])
|
||||||
|
pos += 4
|
||||||
|
if dataLen > 0 {
|
||||||
|
acks[i].Response = C.GoBytes(unsafe.Pointer(&b[pos]), C.int(dataLen))
|
||||||
|
pos += int(dataLen)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
num = binary.LittleEndian.Uint32(b[pos:])
|
||||||
|
pos += 4
|
||||||
|
timeouts := make([]NotifyTimeout, num)
|
||||||
|
for i := range timeouts {
|
||||||
|
timeouts[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:]))
|
||||||
|
pos += 8
|
||||||
|
timeouts[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:]))
|
||||||
|
pos += 8
|
||||||
|
}
|
||||||
|
return acks, timeouts
|
||||||
|
}
|
||||||
|
|
||||||
|
//export watchNotifyCb
|
||||||
|
func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t,
|
||||||
|
notifierID C.uint64_t, cData unsafe.Pointer, dataLen C.size_t) {
|
||||||
|
watchersMtx.RLock()
|
||||||
|
w, ok := watchers[WatcherID(id)]
|
||||||
|
watchersMtx.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
// usually this should not happen, but who knows
|
||||||
|
// TODO: some log message (once we have logging)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ev := NotifyEvent{
|
||||||
|
ID: NotifyID(notifyID),
|
||||||
|
WatcherID: WatcherID(id),
|
||||||
|
NotifierID: NotifierID(notifierID),
|
||||||
|
}
|
||||||
|
if dataLen > 0 {
|
||||||
|
ev.Data = C.GoBytes(cData, C.int(dataLen))
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-w.done: // unblock when deleted
|
||||||
|
case w.events <- ev:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//export watchErrorCb
|
||||||
|
func watchErrorCb(_ unsafe.Pointer, id C.uint64_t, err C.int) {
|
||||||
|
watchersMtx.RLock()
|
||||||
|
w, ok := watchers[WatcherID(id)]
|
||||||
|
watchersMtx.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
// usually this should not happen, but who knows
|
||||||
|
// TODO: some log message (once we have logging)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-w.done: // unblock when deleted
|
||||||
|
case w.errors <- getError(err):
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,286 @@
|
||||||
|
//go:build ceph_preview
|
||||||
|
// +build ceph_preview
|
||||||
|
|
||||||
|
package rados
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (suite *RadosTestSuite) TestWatcher() {
|
||||||
|
suite.SetupConnection()
|
||||||
|
oid := suite.GenObjectName()
|
||||||
|
err := suite.ioctx.Create(oid, CreateExclusive)
|
||||||
|
assert.NoError(suite.T(), err)
|
||||||
|
defer func() { _ = suite.ioctx.Delete(oid) }()
|
||||||
|
|
||||||
|
suite.T().Run("DoubleDelete", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotPanics(t, func() {
|
||||||
|
err := watcher.Delete()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = watcher.Delete()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("DeleteClosesChannels", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
evDone := make(chan struct{})
|
||||||
|
errDone := make(chan struct{})
|
||||||
|
go func() { // event handler
|
||||||
|
for ne := range watcher.Events() {
|
||||||
|
t.Errorf("received event: %v", ne)
|
||||||
|
}
|
||||||
|
close(evDone)
|
||||||
|
}()
|
||||||
|
go func() { // error handler
|
||||||
|
for err := range watcher.Errors() {
|
||||||
|
t.Errorf("received error: %v", err)
|
||||||
|
}
|
||||||
|
close(errDone)
|
||||||
|
}()
|
||||||
|
err = watcher.Delete()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
select {
|
||||||
|
case <-evDone:
|
||||||
|
// Delete closed the events channel
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Error("timeout closing event channel")
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-errDone:
|
||||||
|
// Delete closed the error channel
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Error("timeout closing error channel")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("NotifyNoAck", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
defer func() { _ = watcher.Delete() }()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
data := []byte("notification")
|
||||||
|
acks, timeouts, err := suite.ioctx.NotifyWithTimeout(oid, data, time.Second)
|
||||||
|
assert.Error(t, err) // without ack it must timeout
|
||||||
|
assert.Len(t, timeouts, 1)
|
||||||
|
assert.Equal(t, watcher.ID(), timeouts[0].WatcherID)
|
||||||
|
assert.Empty(t, acks)
|
||||||
|
select {
|
||||||
|
case ev := <-watcher.Events():
|
||||||
|
assert.Equal(t, data, ev.Data)
|
||||||
|
assert.Equal(t, timeouts[0].NotifierID, ev.NotifierID)
|
||||||
|
case err = <-watcher.Errors():
|
||||||
|
t.Error(err)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Error("timeout")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("NotifyDeletedWatcher", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
err = watcher.Delete()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
data := []byte("notification")
|
||||||
|
acks, timeouts, err := suite.ioctx.NotifyWithTimeout(oid, data, time.Second)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, timeouts)
|
||||||
|
assert.Empty(t, acks)
|
||||||
|
select {
|
||||||
|
case _, ok := <-watcher.Events():
|
||||||
|
assert.False(t, ok)
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Error("timeout")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("NotifyNoAckDeleteUnblocksChannels", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
i := i
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
_, _, err := suite.ioctx.Notify(oid, []byte(fmt.Sprintf("notify%d", i)))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond * 100) // wait so that callbacks are blocking
|
||||||
|
err = watcher.Delete()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
wg.Wait()
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("NotifyWithAck", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
defer func() { _ = watcher.Delete() }()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
data := []byte("notification")
|
||||||
|
response := []byte("response")
|
||||||
|
var receivedEv NotifyEvent
|
||||||
|
go func() { // event handler
|
||||||
|
for ne := range watcher.Events() {
|
||||||
|
receivedEv = ne
|
||||||
|
assert.Equal(t, data, ne.Data)
|
||||||
|
assert.Equal(t, watcher.ID(), ne.WatcherID)
|
||||||
|
err := ne.Ack(response)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() { // error handler
|
||||||
|
for err := range watcher.Errors() {
|
||||||
|
t.Errorf("received error: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
acks, timeouts, err := suite.ioctx.Notify(oid, data)
|
||||||
|
assert.NoError(t, err) // without ack it must timeout
|
||||||
|
assert.Empty(t, timeouts)
|
||||||
|
assert.Len(t, acks, 1)
|
||||||
|
assert.Equal(t, watcher.ID(), acks[0].WatcherID)
|
||||||
|
assert.Equal(t, receivedEv.NotifierID, acks[0].NotifierID)
|
||||||
|
assert.Equal(t, response, acks[0].Response)
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("NotifyAckNilData", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
defer func() { _ = watcher.Delete() }()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
var receivedEv NotifyEvent
|
||||||
|
go func() { // event handler
|
||||||
|
for ne := range watcher.Events() {
|
||||||
|
receivedEv = ne
|
||||||
|
assert.Equal(t, []byte(nil), ne.Data)
|
||||||
|
assert.Equal(t, watcher.ID(), ne.WatcherID)
|
||||||
|
err := ne.Ack(nil)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
go func() { // error handler
|
||||||
|
for err := range watcher.Errors() {
|
||||||
|
t.Errorf("received error: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
acks, timeouts, err := suite.ioctx.Notify(oid, nil)
|
||||||
|
assert.NoError(t, err) // without ack it must timeout
|
||||||
|
assert.Empty(t, timeouts)
|
||||||
|
assert.Len(t, acks, 1)
|
||||||
|
assert.Equal(t, watcher.ID(), acks[0].WatcherID)
|
||||||
|
assert.Equal(t, receivedEv.NotifierID, acks[0].NotifierID)
|
||||||
|
assert.Equal(t, []byte(nil), acks[0].Response)
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("Check", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.WatchWithTimeout(oid, time.Second)
|
||||||
|
defer func() { _ = watcher.Delete() }()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
last, err := watcher.Check()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Greater(t, int(last), 0)
|
||||||
|
select {
|
||||||
|
case err = <-watcher.Errors(): // watcher times out
|
||||||
|
case <-time.After(time.Second * 2):
|
||||||
|
t.Error("timeout")
|
||||||
|
}
|
||||||
|
assert.Error(t, err)
|
||||||
|
last, err2 := watcher.Check()
|
||||||
|
assert.Error(t, err2)
|
||||||
|
assert.Equal(t, err, err2)
|
||||||
|
assert.Zero(t, last)
|
||||||
|
})
|
||||||
|
|
||||||
|
suite.T().Run("Flush", func(t *testing.T) {
|
||||||
|
watcher, err := suite.ioctx.Watch(oid)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
_, _, _ = suite.ioctx.Notify(oid, nil)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
flushed := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
err := suite.conn.WatcherFlush()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
close(flushed)
|
||||||
|
}()
|
||||||
|
select {
|
||||||
|
case <-flushed:
|
||||||
|
t.Error("flush returned before event got received")
|
||||||
|
case <-time.After(time.Millisecond * 100):
|
||||||
|
}
|
||||||
|
<-watcher.Events()
|
||||||
|
select {
|
||||||
|
case <-flushed:
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Error("flush didn't return after receiving event")
|
||||||
|
}
|
||||||
|
err = watcher.Delete()
|
||||||
|
assert.NoError(t, err)
|
||||||
|
<-done
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDecodeNotifyResponse(t *testing.T) {
|
||||||
|
testEmptyResponse := [...]byte{
|
||||||
|
// le32 num_acks
|
||||||
|
0, 0, 0, 0,
|
||||||
|
// le32 num_timeouts
|
||||||
|
0, 0, 0, 0,
|
||||||
|
}
|
||||||
|
testResponse := [...]byte{
|
||||||
|
// le32 num_acks
|
||||||
|
1, 0, 0, 0,
|
||||||
|
// le64 gid global id for the client (for client.1234 that's 1234)
|
||||||
|
0, 1, 0, 0, 0, 0, 0, 0,
|
||||||
|
// le64 cookie cookie for the client
|
||||||
|
1, 1, 0, 0, 0, 0, 0, 0,
|
||||||
|
// le32 buflen length of reply message buffer
|
||||||
|
4, 0, 0, 0,
|
||||||
|
// u8 buflen payload
|
||||||
|
1, 2, 3, 4,
|
||||||
|
// le32 num_timeouts
|
||||||
|
2, 0, 0, 0,
|
||||||
|
// le64 gid global id for the client
|
||||||
|
2, 1, 0, 0, 0, 0, 0, 0,
|
||||||
|
// le64 cookie cookie for the client
|
||||||
|
3, 1, 0, 0, 0, 0, 0, 0,
|
||||||
|
// le64 gid global id for the client
|
||||||
|
4, 1, 0, 0, 0, 0, 0, 0,
|
||||||
|
// le64 cookie cookie for the client
|
||||||
|
255, 255, 255, 255, 255, 255, 255, 255,
|
||||||
|
}
|
||||||
|
t.Run("Empty", func(t *testing.T) {
|
||||||
|
l := _Ctype_size_t(len(testEmptyResponse))
|
||||||
|
b := (*_Ctype_char)(unsafe.Pointer(&testEmptyResponse[0]))
|
||||||
|
acks, tOuts := decodeNotifyResponse(b, l)
|
||||||
|
assert.Len(t, acks, 0)
|
||||||
|
assert.Len(t, tOuts, 0)
|
||||||
|
})
|
||||||
|
t.Run("Example", func(t *testing.T) {
|
||||||
|
l := _Ctype_size_t(len(testResponse))
|
||||||
|
b := (*_Ctype_char)(unsafe.Pointer(&testResponse[0]))
|
||||||
|
acks, tOuts := decodeNotifyResponse(b, l)
|
||||||
|
assert.Len(t, acks, 1)
|
||||||
|
assert.Equal(t, acks[0].NotifierID, NotifierID(256))
|
||||||
|
assert.Equal(t, acks[0].WatcherID, WatcherID(257))
|
||||||
|
assert.Len(t, acks[0].Response, 4)
|
||||||
|
assert.Equal(t, acks[0].Response, []byte{1, 2, 3, 4})
|
||||||
|
assert.Len(t, tOuts, 2)
|
||||||
|
assert.Equal(t, tOuts[0].NotifierID, NotifierID(258))
|
||||||
|
assert.Equal(t, tOuts[0].WatcherID, WatcherID(259))
|
||||||
|
assert.Equal(t, tOuts[1].NotifierID, NotifierID(260))
|
||||||
|
assert.Equal(t, tOuts[1].WatcherID, WatcherID(math.MaxUint64))
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in New Issue