mirror of
https://github.com/ceph/go-ceph
synced 2025-01-24 23:23:23 +00:00
21f192a484
This change implements the bindings for the watch/notify APIs of librados. Instead of callbacks, the watcher is implemented with more Go-idiomatic channels. A watcher object exposes two read-only channels, one to receive the notify events and one to receive occuring errors. Signed-off-by: Sven Anderson <sven@redhat.com>
287 lines
7.8 KiB
Go
287 lines
7.8 KiB
Go
//go:build ceph_preview
|
|
// +build ceph_preview
|
|
|
|
package rados
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
"unsafe"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
)
|
|
|
|
func (suite *RadosTestSuite) TestWatcher() {
|
|
suite.SetupConnection()
|
|
oid := suite.GenObjectName()
|
|
err := suite.ioctx.Create(oid, CreateExclusive)
|
|
assert.NoError(suite.T(), err)
|
|
defer func() { _ = suite.ioctx.Delete(oid) }()
|
|
|
|
suite.T().Run("DoubleDelete", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
assert.NoError(t, err)
|
|
assert.NotPanics(t, func() {
|
|
err := watcher.Delete()
|
|
assert.NoError(t, err)
|
|
err = watcher.Delete()
|
|
assert.NoError(t, err)
|
|
})
|
|
})
|
|
|
|
suite.T().Run("DeleteClosesChannels", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
assert.NoError(t, err)
|
|
evDone := make(chan struct{})
|
|
errDone := make(chan struct{})
|
|
go func() { // event handler
|
|
for ne := range watcher.Events() {
|
|
t.Errorf("received event: %v", ne)
|
|
}
|
|
close(evDone)
|
|
}()
|
|
go func() { // error handler
|
|
for err := range watcher.Errors() {
|
|
t.Errorf("received error: %v", err)
|
|
}
|
|
close(errDone)
|
|
}()
|
|
err = watcher.Delete()
|
|
assert.NoError(t, err)
|
|
select {
|
|
case <-evDone:
|
|
// Delete closed the events channel
|
|
case <-time.After(time.Second):
|
|
t.Error("timeout closing event channel")
|
|
}
|
|
select {
|
|
case <-errDone:
|
|
// Delete closed the error channel
|
|
case <-time.After(time.Second):
|
|
t.Error("timeout closing error channel")
|
|
}
|
|
})
|
|
|
|
suite.T().Run("NotifyNoAck", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
defer func() { _ = watcher.Delete() }()
|
|
assert.NoError(t, err)
|
|
data := []byte("notification")
|
|
acks, timeouts, err := suite.ioctx.NotifyWithTimeout(oid, data, time.Second)
|
|
assert.Error(t, err) // without ack it must timeout
|
|
assert.Len(t, timeouts, 1)
|
|
assert.Equal(t, watcher.ID(), timeouts[0].WatcherID)
|
|
assert.Empty(t, acks)
|
|
select {
|
|
case ev := <-watcher.Events():
|
|
assert.Equal(t, data, ev.Data)
|
|
assert.Equal(t, timeouts[0].NotifierID, ev.NotifierID)
|
|
case err = <-watcher.Errors():
|
|
t.Error(err)
|
|
case <-time.After(time.Second):
|
|
t.Error("timeout")
|
|
}
|
|
})
|
|
|
|
suite.T().Run("NotifyDeletedWatcher", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
assert.NoError(t, err)
|
|
err = watcher.Delete()
|
|
assert.NoError(t, err)
|
|
data := []byte("notification")
|
|
acks, timeouts, err := suite.ioctx.NotifyWithTimeout(oid, data, time.Second)
|
|
assert.NoError(t, err)
|
|
assert.Empty(t, timeouts)
|
|
assert.Empty(t, acks)
|
|
select {
|
|
case _, ok := <-watcher.Events():
|
|
assert.False(t, ok)
|
|
case <-time.After(time.Second):
|
|
t.Error("timeout")
|
|
}
|
|
})
|
|
|
|
suite.T().Run("NotifyNoAckDeleteUnblocksChannels", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
assert.NoError(t, err)
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < 10; i++ {
|
|
i := i
|
|
wg.Add(1)
|
|
go func() {
|
|
defer wg.Done()
|
|
_, _, err := suite.ioctx.Notify(oid, []byte(fmt.Sprintf("notify%d", i)))
|
|
assert.NoError(t, err)
|
|
}()
|
|
}
|
|
time.Sleep(time.Millisecond * 100) // wait so that callbacks are blocking
|
|
err = watcher.Delete()
|
|
assert.NoError(t, err)
|
|
wg.Wait()
|
|
})
|
|
|
|
suite.T().Run("NotifyWithAck", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
defer func() { _ = watcher.Delete() }()
|
|
assert.NoError(t, err)
|
|
data := []byte("notification")
|
|
response := []byte("response")
|
|
var receivedEv NotifyEvent
|
|
go func() { // event handler
|
|
for ne := range watcher.Events() {
|
|
receivedEv = ne
|
|
assert.Equal(t, data, ne.Data)
|
|
assert.Equal(t, watcher.ID(), ne.WatcherID)
|
|
err := ne.Ack(response)
|
|
assert.NoError(t, err)
|
|
}
|
|
}()
|
|
go func() { // error handler
|
|
for err := range watcher.Errors() {
|
|
t.Errorf("received error: %v", err)
|
|
}
|
|
}()
|
|
acks, timeouts, err := suite.ioctx.Notify(oid, data)
|
|
assert.NoError(t, err) // without ack it must timeout
|
|
assert.Empty(t, timeouts)
|
|
assert.Len(t, acks, 1)
|
|
assert.Equal(t, watcher.ID(), acks[0].WatcherID)
|
|
assert.Equal(t, receivedEv.NotifierID, acks[0].NotifierID)
|
|
assert.Equal(t, response, acks[0].Response)
|
|
})
|
|
|
|
suite.T().Run("NotifyAckNilData", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
defer func() { _ = watcher.Delete() }()
|
|
assert.NoError(t, err)
|
|
var receivedEv NotifyEvent
|
|
go func() { // event handler
|
|
for ne := range watcher.Events() {
|
|
receivedEv = ne
|
|
assert.Equal(t, []byte(nil), ne.Data)
|
|
assert.Equal(t, watcher.ID(), ne.WatcherID)
|
|
err := ne.Ack(nil)
|
|
assert.NoError(t, err)
|
|
}
|
|
}()
|
|
go func() { // error handler
|
|
for err := range watcher.Errors() {
|
|
t.Errorf("received error: %v", err)
|
|
}
|
|
}()
|
|
acks, timeouts, err := suite.ioctx.Notify(oid, nil)
|
|
assert.NoError(t, err) // without ack it must timeout
|
|
assert.Empty(t, timeouts)
|
|
assert.Len(t, acks, 1)
|
|
assert.Equal(t, watcher.ID(), acks[0].WatcherID)
|
|
assert.Equal(t, receivedEv.NotifierID, acks[0].NotifierID)
|
|
assert.Equal(t, []byte(nil), acks[0].Response)
|
|
})
|
|
|
|
suite.T().Run("Check", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.WatchWithTimeout(oid, time.Second)
|
|
defer func() { _ = watcher.Delete() }()
|
|
assert.NoError(t, err)
|
|
last, err := watcher.Check()
|
|
assert.NoError(t, err)
|
|
assert.Greater(t, int(last), 0)
|
|
select {
|
|
case err = <-watcher.Errors(): // watcher times out
|
|
case <-time.After(time.Second * 2):
|
|
t.Error("timeout")
|
|
}
|
|
assert.Error(t, err)
|
|
last, err2 := watcher.Check()
|
|
assert.Error(t, err2)
|
|
assert.Equal(t, err, err2)
|
|
assert.Zero(t, last)
|
|
})
|
|
|
|
suite.T().Run("Flush", func(t *testing.T) {
|
|
watcher, err := suite.ioctx.Watch(oid)
|
|
assert.NoError(t, err)
|
|
done := make(chan struct{})
|
|
go func() {
|
|
_, _, _ = suite.ioctx.Notify(oid, nil)
|
|
close(done)
|
|
}()
|
|
time.Sleep(time.Millisecond * 100)
|
|
flushed := make(chan struct{})
|
|
go func() {
|
|
err := suite.conn.WatcherFlush()
|
|
assert.NoError(t, err)
|
|
close(flushed)
|
|
}()
|
|
select {
|
|
case <-flushed:
|
|
t.Error("flush returned before event got received")
|
|
case <-time.After(time.Millisecond * 100):
|
|
}
|
|
<-watcher.Events()
|
|
select {
|
|
case <-flushed:
|
|
case <-time.After(time.Second):
|
|
t.Error("flush didn't return after receiving event")
|
|
}
|
|
err = watcher.Delete()
|
|
assert.NoError(t, err)
|
|
<-done
|
|
})
|
|
}
|
|
|
|
func TestDecodeNotifyResponse(t *testing.T) {
|
|
testEmptyResponse := [...]byte{
|
|
// le32 num_acks
|
|
0, 0, 0, 0,
|
|
// le32 num_timeouts
|
|
0, 0, 0, 0,
|
|
}
|
|
testResponse := [...]byte{
|
|
// le32 num_acks
|
|
1, 0, 0, 0,
|
|
// le64 gid global id for the client (for client.1234 that's 1234)
|
|
0, 1, 0, 0, 0, 0, 0, 0,
|
|
// le64 cookie cookie for the client
|
|
1, 1, 0, 0, 0, 0, 0, 0,
|
|
// le32 buflen length of reply message buffer
|
|
4, 0, 0, 0,
|
|
// u8 buflen payload
|
|
1, 2, 3, 4,
|
|
// le32 num_timeouts
|
|
2, 0, 0, 0,
|
|
// le64 gid global id for the client
|
|
2, 1, 0, 0, 0, 0, 0, 0,
|
|
// le64 cookie cookie for the client
|
|
3, 1, 0, 0, 0, 0, 0, 0,
|
|
// le64 gid global id for the client
|
|
4, 1, 0, 0, 0, 0, 0, 0,
|
|
// le64 cookie cookie for the client
|
|
255, 255, 255, 255, 255, 255, 255, 255,
|
|
}
|
|
t.Run("Empty", func(t *testing.T) {
|
|
l := _Ctype_size_t(len(testEmptyResponse))
|
|
b := (*_Ctype_char)(unsafe.Pointer(&testEmptyResponse[0]))
|
|
acks, tOuts := decodeNotifyResponse(b, l)
|
|
assert.Len(t, acks, 0)
|
|
assert.Len(t, tOuts, 0)
|
|
})
|
|
t.Run("Example", func(t *testing.T) {
|
|
l := _Ctype_size_t(len(testResponse))
|
|
b := (*_Ctype_char)(unsafe.Pointer(&testResponse[0]))
|
|
acks, tOuts := decodeNotifyResponse(b, l)
|
|
assert.Len(t, acks, 1)
|
|
assert.Equal(t, acks[0].NotifierID, NotifierID(256))
|
|
assert.Equal(t, acks[0].WatcherID, WatcherID(257))
|
|
assert.Len(t, acks[0].Response, 4)
|
|
assert.Equal(t, acks[0].Response, []byte{1, 2, 3, 4})
|
|
assert.Len(t, tOuts, 2)
|
|
assert.Equal(t, tOuts[0].NotifierID, NotifierID(258))
|
|
assert.Equal(t, tOuts[0].WatcherID, WatcherID(259))
|
|
assert.Equal(t, tOuts[1].NotifierID, NotifierID(260))
|
|
assert.Equal(t, tOuts[1].WatcherID, WatcherID(math.MaxUint64))
|
|
})
|
|
}
|