1279 lines
34 KiB
Go
1279 lines
34 KiB
Go
// Package zk is a native Go client library for the ZooKeeper orchestration service.
|
|
package zk
|
|
|
|
/*
|
|
TODO:
|
|
* make sure a ping response comes back in a reasonable time
|
|
|
|
Possible watcher events:
|
|
* Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}
|
|
*/
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// ErrNoServer indicates that an operation cannot be completed
|
|
// because attempts to connect to all servers in the list failed.
|
|
var ErrNoServer = errors.New("zk: could not connect to a server")
|
|
|
|
// ErrInvalidPath indicates that an operation was being attempted on
|
|
// an invalid path. (e.g. empty path)
|
|
var ErrInvalidPath = errors.New("zk: invalid path")
|
|
|
|
// DefaultLogger uses the stdlib log package for logging.
|
|
var DefaultLogger Logger = defaultLogger{}
|
|
|
|
const (
|
|
bufferSize = 1536 * 1024
|
|
eventChanSize = 6
|
|
sendChanSize = 16
|
|
protectedPrefix = "_c_"
|
|
)
|
|
|
|
type watchType int
|
|
|
|
const (
|
|
watchTypeData = iota
|
|
watchTypeExist
|
|
watchTypeChild
|
|
)
|
|
|
|
type watchPathType struct {
|
|
path string
|
|
wType watchType
|
|
}
|
|
|
|
type Dialer func(network, address string, timeout time.Duration) (net.Conn, error)
|
|
|
|
// Logger is an interface that can be implemented to provide custom log output.
|
|
type Logger interface {
|
|
Printf(string, ...interface{})
|
|
}
|
|
|
|
type authCreds struct {
|
|
scheme string
|
|
auth []byte
|
|
}
|
|
|
|
type Conn struct {
|
|
lastZxid int64
|
|
sessionID int64
|
|
state State // must be 32-bit aligned
|
|
xid uint32
|
|
sessionTimeoutMs int32 // session timeout in milliseconds
|
|
passwd []byte
|
|
|
|
dialer Dialer
|
|
hostProvider HostProvider
|
|
serverMu sync.Mutex // protects server
|
|
server string // remember the address/port of the current server
|
|
conn net.Conn
|
|
eventChan chan Event
|
|
eventCallback EventCallback // may be nil
|
|
shouldQuit chan struct{}
|
|
pingInterval time.Duration
|
|
recvTimeout time.Duration
|
|
connectTimeout time.Duration
|
|
maxBufferSize int
|
|
|
|
creds []authCreds
|
|
credsMu sync.Mutex // protects server
|
|
|
|
sendChan chan *request
|
|
requests map[int32]*request // Xid -> pending request
|
|
requestsLock sync.Mutex
|
|
watchers map[watchPathType][]chan Event
|
|
watchersLock sync.Mutex
|
|
closeChan chan struct{} // channel to tell send loop stop
|
|
|
|
// Debug (used by unit tests)
|
|
reconnectLatch chan struct{}
|
|
setWatchLimit int
|
|
setWatchCallback func([]*setWatchesRequest)
|
|
// Debug (for recurring re-auth hang)
|
|
debugCloseRecvLoop bool
|
|
debugReauthDone chan struct{}
|
|
|
|
logger Logger
|
|
logInfo bool // true if information messages are logged; false if only errors are logged
|
|
|
|
buf []byte
|
|
}
|
|
|
|
// connOption represents a connection option.
|
|
type connOption func(c *Conn)
|
|
|
|
type request struct {
|
|
xid int32
|
|
opcode int32
|
|
pkt interface{}
|
|
recvStruct interface{}
|
|
recvChan chan response
|
|
|
|
// Because sending and receiving happen in separate go routines, there's
|
|
// a possible race condition when creating watches from outside the read
|
|
// loop. We must ensure that a watcher gets added to the list synchronously
|
|
// with the response from the server on any request that creates a watch.
|
|
// In order to not hard code the watch logic for each opcode in the recv
|
|
// loop the caller can use recvFunc to insert some synchronously code
|
|
// after a response.
|
|
recvFunc func(*request, *responseHeader, error)
|
|
}
|
|
|
|
type response struct {
|
|
zxid int64
|
|
err error
|
|
}
|
|
|
|
type Event struct {
|
|
Type EventType
|
|
State State
|
|
Path string // For non-session events, the path of the watched node.
|
|
Err error
|
|
Server string // For connection events
|
|
}
|
|
|
|
// HostProvider is used to represent a set of hosts a ZooKeeper client should connect to.
|
|
// It is an analog of the Java equivalent:
|
|
// http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/client/HostProvider.java?view=markup
|
|
type HostProvider interface {
|
|
// Init is called first, with the servers specified in the connection string.
|
|
Init(servers []string) error
|
|
// Len returns the number of servers.
|
|
Len() int
|
|
// Next returns the next server to connect to. retryStart will be true if we've looped through
|
|
// all known servers without Connected() being called.
|
|
Next() (server string, retryStart bool)
|
|
// Notify the HostProvider of a successful connection.
|
|
Connected()
|
|
}
|
|
|
|
// ConnectWithDialer establishes a new connection to a pool of zookeeper servers
|
|
// using a custom Dialer. See Connect for further information about session timeout.
|
|
// This method is deprecated and provided for compatibility: use the WithDialer option instead.
|
|
func ConnectWithDialer(servers []string, sessionTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error) {
|
|
return Connect(servers, sessionTimeout, WithDialer(dialer))
|
|
}
|
|
|
|
// Connect establishes a new connection to a pool of zookeeper
|
|
// servers. The provided session timeout sets the amount of time for which
|
|
// a session is considered valid after losing connection to a server. Within
|
|
// the session timeout it's possible to reestablish a connection to a different
|
|
// server and keep the same session. This is means any ephemeral nodes and
|
|
// watches are maintained.
|
|
func Connect(servers []string, sessionTimeout time.Duration, options ...connOption) (*Conn, <-chan Event, error) {
|
|
if len(servers) == 0 {
|
|
return nil, nil, errors.New("zk: server list must not be empty")
|
|
}
|
|
|
|
srvs := make([]string, len(servers))
|
|
|
|
for i, addr := range servers {
|
|
if strings.Contains(addr, ":") {
|
|
srvs[i] = addr
|
|
} else {
|
|
srvs[i] = addr + ":" + strconv.Itoa(DefaultPort)
|
|
}
|
|
}
|
|
|
|
// Randomize the order of the servers to avoid creating hotspots
|
|
stringShuffle(srvs)
|
|
|
|
ec := make(chan Event, eventChanSize)
|
|
conn := &Conn{
|
|
dialer: net.DialTimeout,
|
|
hostProvider: &DNSHostProvider{},
|
|
conn: nil,
|
|
state: StateDisconnected,
|
|
eventChan: ec,
|
|
shouldQuit: make(chan struct{}),
|
|
connectTimeout: 1 * time.Second,
|
|
sendChan: make(chan *request, sendChanSize),
|
|
requests: make(map[int32]*request),
|
|
watchers: make(map[watchPathType][]chan Event),
|
|
passwd: emptyPassword,
|
|
logger: DefaultLogger,
|
|
logInfo: true, // default is true for backwards compatability
|
|
buf: make([]byte, bufferSize),
|
|
}
|
|
|
|
// Set provided options.
|
|
for _, option := range options {
|
|
option(conn)
|
|
}
|
|
|
|
if err := conn.hostProvider.Init(srvs); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
conn.setTimeouts(int32(sessionTimeout / time.Millisecond))
|
|
|
|
go func() {
|
|
conn.loop()
|
|
conn.flushRequests(ErrClosing)
|
|
conn.invalidateWatches(ErrClosing)
|
|
close(conn.eventChan)
|
|
}()
|
|
return conn, ec, nil
|
|
}
|
|
|
|
// WithDialer returns a connection option specifying a non-default Dialer.
|
|
func WithDialer(dialer Dialer) connOption {
|
|
return func(c *Conn) {
|
|
c.dialer = dialer
|
|
}
|
|
}
|
|
|
|
// WithHostProvider returns a connection option specifying a non-default HostProvider.
|
|
func WithHostProvider(hostProvider HostProvider) connOption {
|
|
return func(c *Conn) {
|
|
c.hostProvider = hostProvider
|
|
}
|
|
}
|
|
|
|
// WithLogger returns a connection option specifying a non-default Logger
|
|
func WithLogger(logger Logger) connOption {
|
|
return func(c *Conn) {
|
|
c.logger = logger
|
|
}
|
|
}
|
|
|
|
// WithLogInfo returns a connection option specifying whether or not information messages
|
|
// shoud be logged.
|
|
func WithLogInfo(logInfo bool) connOption {
|
|
return func(c *Conn) {
|
|
c.logInfo = logInfo
|
|
}
|
|
}
|
|
|
|
// EventCallback is a function that is called when an Event occurs.
|
|
type EventCallback func(Event)
|
|
|
|
// WithEventCallback returns a connection option that specifies an event
|
|
// callback.
|
|
// The callback must not block - doing so would delay the ZK go routines.
|
|
func WithEventCallback(cb EventCallback) connOption {
|
|
return func(c *Conn) {
|
|
c.eventCallback = cb
|
|
}
|
|
}
|
|
|
|
// WithMaxBufferSize sets the maximum buffer size used to read and decode
|
|
// packets received from the Zookeeper server. The standard Zookeeper client for
|
|
// Java defaults to a limit of 1mb. For backwards compatibility, this Go client
|
|
// defaults to unbounded unless overridden via this option. A value that is zero
|
|
// or negative indicates that no limit is enforced.
|
|
//
|
|
// This is meant to prevent resource exhaustion in the face of potentially
|
|
// malicious data in ZK. It should generally match the server setting (which
|
|
// also defaults ot 1mb) so that clients and servers agree on the limits for
|
|
// things like the size of data in an individual znode and the total size of a
|
|
// transaction.
|
|
//
|
|
// For production systems, this should be set to a reasonable value (ideally
|
|
// that matches the server configuration). For ops tooling, it is handy to use a
|
|
// much larger limit, in order to do things like clean-up problematic state in
|
|
// the ZK tree. For example, if a single znode has a huge number of children, it
|
|
// is possible for the response to a "list children" operation to exceed this
|
|
// buffer size and cause errors in clients. The only way to subsequently clean
|
|
// up the tree (by removing superfluous children) is to use a client configured
|
|
// with a larger buffer size that can successfully query for all of the child
|
|
// names and then remove them. (Note there are other tools that can list all of
|
|
// the child names without an increased buffer size in the client, but they work
|
|
// by inspecting the servers' transaction logs to enumerate children instead of
|
|
// sending an online request to a server.
|
|
func WithMaxBufferSize(maxBufferSize int) connOption {
|
|
return func(c *Conn) {
|
|
c.maxBufferSize = maxBufferSize
|
|
}
|
|
}
|
|
|
|
// WithMaxConnBufferSize sets maximum buffer size used to send and encode
|
|
// packets to Zookeeper server. The standard Zookeepeer client for java defaults
|
|
// to a limit of 1mb. This option should be used for non-standard server setup
|
|
// where znode is bigger than default 1mb.
|
|
func WithMaxConnBufferSize(maxBufferSize int) connOption {
|
|
return func(c *Conn) {
|
|
c.buf = make([]byte, maxBufferSize)
|
|
}
|
|
}
|
|
|
|
func (c *Conn) Close() {
|
|
close(c.shouldQuit)
|
|
|
|
select {
|
|
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
|
|
case <-time.After(time.Second):
|
|
}
|
|
}
|
|
|
|
// State returns the current state of the connection.
|
|
func (c *Conn) State() State {
|
|
return State(atomic.LoadInt32((*int32)(&c.state)))
|
|
}
|
|
|
|
// SessionID returns the current session id of the connection.
|
|
func (c *Conn) SessionID() int64 {
|
|
return atomic.LoadInt64(&c.sessionID)
|
|
}
|
|
|
|
// SetLogger sets the logger to be used for printing errors.
|
|
// Logger is an interface provided by this package.
|
|
func (c *Conn) SetLogger(l Logger) {
|
|
c.logger = l
|
|
}
|
|
|
|
func (c *Conn) setTimeouts(sessionTimeoutMs int32) {
|
|
c.sessionTimeoutMs = sessionTimeoutMs
|
|
sessionTimeout := time.Duration(sessionTimeoutMs) * time.Millisecond
|
|
c.recvTimeout = sessionTimeout * 2 / 3
|
|
c.pingInterval = c.recvTimeout / 2
|
|
}
|
|
|
|
func (c *Conn) setState(state State) {
|
|
atomic.StoreInt32((*int32)(&c.state), int32(state))
|
|
c.sendEvent(Event{Type: EventSession, State: state, Server: c.Server()})
|
|
}
|
|
|
|
func (c *Conn) sendEvent(evt Event) {
|
|
if c.eventCallback != nil {
|
|
c.eventCallback(evt)
|
|
}
|
|
|
|
select {
|
|
case c.eventChan <- evt:
|
|
default:
|
|
// panic("zk: event channel full - it must be monitored and never allowed to be full")
|
|
}
|
|
}
|
|
|
|
func (c *Conn) connect() error {
|
|
var retryStart bool
|
|
for {
|
|
c.serverMu.Lock()
|
|
c.server, retryStart = c.hostProvider.Next()
|
|
c.serverMu.Unlock()
|
|
c.setState(StateConnecting)
|
|
if retryStart {
|
|
c.flushUnsentRequests(ErrNoServer)
|
|
select {
|
|
case <-time.After(time.Second):
|
|
// pass
|
|
case <-c.shouldQuit:
|
|
c.setState(StateDisconnected)
|
|
c.flushUnsentRequests(ErrClosing)
|
|
return ErrClosing
|
|
}
|
|
}
|
|
|
|
zkConn, err := c.dialer("tcp", c.Server(), c.connectTimeout)
|
|
if err == nil {
|
|
c.conn = zkConn
|
|
c.setState(StateConnected)
|
|
if c.logInfo {
|
|
c.logger.Printf("Connected to %s", c.Server())
|
|
}
|
|
return nil
|
|
}
|
|
|
|
c.logger.Printf("Failed to connect to %s: %+v", c.Server(), err)
|
|
}
|
|
}
|
|
|
|
func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
|
|
shouldCancel := func() bool {
|
|
select {
|
|
case <-c.shouldQuit:
|
|
return true
|
|
case <-c.closeChan:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
c.credsMu.Lock()
|
|
defer c.credsMu.Unlock()
|
|
|
|
defer close(reauthReadyChan)
|
|
|
|
if c.logInfo {
|
|
c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds))
|
|
}
|
|
|
|
for _, cred := range c.creds {
|
|
if shouldCancel() {
|
|
return
|
|
}
|
|
resChan, err := c.sendRequest(
|
|
opSetAuth,
|
|
&setAuthRequest{Type: 0,
|
|
Scheme: cred.scheme,
|
|
Auth: cred.auth,
|
|
},
|
|
&setAuthResponse{},
|
|
nil)
|
|
|
|
if err != nil {
|
|
c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err)
|
|
// FIXME(prozlach): lets ignore errors for now
|
|
continue
|
|
}
|
|
|
|
var res response
|
|
select {
|
|
case res = <-resChan:
|
|
case <-c.closeChan:
|
|
c.logger.Printf("recv closed, cancel re-submitting credentials")
|
|
return
|
|
case <-c.shouldQuit:
|
|
c.logger.Printf("should quit, cancel re-submitting credentials")
|
|
return
|
|
}
|
|
if res.err != nil {
|
|
c.logger.Printf("credential re-submit failed: %s", res.err)
|
|
// FIXME(prozlach): lets ignore errors for now
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Conn) sendRequest(
|
|
opcode int32,
|
|
req interface{},
|
|
res interface{},
|
|
recvFunc func(*request, *responseHeader, error),
|
|
) (
|
|
<-chan response,
|
|
error,
|
|
) {
|
|
rq := &request{
|
|
xid: c.nextXid(),
|
|
opcode: opcode,
|
|
pkt: req,
|
|
recvStruct: res,
|
|
recvChan: make(chan response, 1),
|
|
recvFunc: recvFunc,
|
|
}
|
|
|
|
if err := c.sendData(rq); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return rq.recvChan, nil
|
|
}
|
|
|
|
func (c *Conn) loop() {
|
|
for {
|
|
if err := c.connect(); err != nil {
|
|
// c.Close() was called
|
|
return
|
|
}
|
|
|
|
err := c.authenticate()
|
|
switch {
|
|
case err == ErrSessionExpired:
|
|
c.logger.Printf("authentication failed: %s", err)
|
|
c.invalidateWatches(err)
|
|
case err != nil && c.conn != nil:
|
|
c.logger.Printf("authentication failed: %s", err)
|
|
c.conn.Close()
|
|
case err == nil:
|
|
if c.logInfo {
|
|
c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
|
|
}
|
|
c.hostProvider.Connected() // mark success
|
|
c.closeChan = make(chan struct{}) // channel to tell send loop stop
|
|
reauthChan := make(chan struct{}) // channel to tell send loop that authdata has been resubmitted
|
|
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
go func() {
|
|
<-reauthChan
|
|
if c.debugCloseRecvLoop {
|
|
close(c.debugReauthDone)
|
|
}
|
|
err := c.sendLoop()
|
|
if err != nil || c.logInfo {
|
|
c.logger.Printf("send loop terminated: err=%v", err)
|
|
}
|
|
c.conn.Close() // causes recv loop to EOF/exit
|
|
wg.Done()
|
|
}()
|
|
|
|
wg.Add(1)
|
|
go func() {
|
|
var err error
|
|
if c.debugCloseRecvLoop {
|
|
err = errors.New("DEBUG: close recv loop")
|
|
} else {
|
|
err = c.recvLoop(c.conn)
|
|
}
|
|
if err != io.EOF || c.logInfo {
|
|
c.logger.Printf("recv loop terminated: err=%v", err)
|
|
}
|
|
if err == nil {
|
|
panic("zk: recvLoop should never return nil error")
|
|
}
|
|
close(c.closeChan) // tell send loop to exit
|
|
wg.Done()
|
|
}()
|
|
|
|
c.resendZkAuth(reauthChan)
|
|
|
|
c.sendSetWatches()
|
|
wg.Wait()
|
|
}
|
|
|
|
c.setState(StateDisconnected)
|
|
|
|
select {
|
|
case <-c.shouldQuit:
|
|
c.flushRequests(ErrClosing)
|
|
return
|
|
default:
|
|
}
|
|
|
|
if err != ErrSessionExpired {
|
|
err = ErrConnectionClosed
|
|
}
|
|
c.flushRequests(err)
|
|
|
|
if c.reconnectLatch != nil {
|
|
select {
|
|
case <-c.shouldQuit:
|
|
return
|
|
case <-c.reconnectLatch:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Conn) flushUnsentRequests(err error) {
|
|
for {
|
|
select {
|
|
default:
|
|
return
|
|
case req := <-c.sendChan:
|
|
req.recvChan <- response{-1, err}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send error to all pending requests and clear request map
|
|
func (c *Conn) flushRequests(err error) {
|
|
c.requestsLock.Lock()
|
|
for _, req := range c.requests {
|
|
req.recvChan <- response{-1, err}
|
|
}
|
|
c.requests = make(map[int32]*request)
|
|
c.requestsLock.Unlock()
|
|
}
|
|
|
|
// Send error to all watchers and clear watchers map
|
|
func (c *Conn) invalidateWatches(err error) {
|
|
c.watchersLock.Lock()
|
|
defer c.watchersLock.Unlock()
|
|
|
|
if len(c.watchers) >= 0 {
|
|
for pathType, watchers := range c.watchers {
|
|
ev := Event{Type: EventNotWatching, State: StateDisconnected, Path: pathType.path, Err: err}
|
|
for _, ch := range watchers {
|
|
ch <- ev
|
|
close(ch)
|
|
}
|
|
}
|
|
c.watchers = make(map[watchPathType][]chan Event)
|
|
}
|
|
}
|
|
|
|
func (c *Conn) sendSetWatches() {
|
|
c.watchersLock.Lock()
|
|
defer c.watchersLock.Unlock()
|
|
|
|
if len(c.watchers) == 0 {
|
|
return
|
|
}
|
|
|
|
// NB: A ZK server, by default, rejects packets >1mb. So, if we have too
|
|
// many watches to reset, we need to break this up into multiple packets
|
|
// to avoid hitting that limit. Mirroring the Java client behavior: we are
|
|
// conservative in that we limit requests to 128kb (since server limit is
|
|
// is actually configurable and could conceivably be configured smaller
|
|
// than default of 1mb).
|
|
limit := 128 * 1024
|
|
if c.setWatchLimit > 0 {
|
|
limit = c.setWatchLimit
|
|
}
|
|
|
|
var reqs []*setWatchesRequest
|
|
var req *setWatchesRequest
|
|
var sizeSoFar int
|
|
|
|
n := 0
|
|
for pathType, watchers := range c.watchers {
|
|
if len(watchers) == 0 {
|
|
continue
|
|
}
|
|
addlLen := 4 + len(pathType.path)
|
|
if req == nil || sizeSoFar+addlLen > limit {
|
|
if req != nil {
|
|
// add to set of requests that we'll send
|
|
reqs = append(reqs, req)
|
|
}
|
|
sizeSoFar = 28 // fixed overhead of a set-watches packet
|
|
req = &setWatchesRequest{
|
|
RelativeZxid: c.lastZxid,
|
|
DataWatches: make([]string, 0),
|
|
ExistWatches: make([]string, 0),
|
|
ChildWatches: make([]string, 0),
|
|
}
|
|
}
|
|
sizeSoFar += addlLen
|
|
switch pathType.wType {
|
|
case watchTypeData:
|
|
req.DataWatches = append(req.DataWatches, pathType.path)
|
|
case watchTypeExist:
|
|
req.ExistWatches = append(req.ExistWatches, pathType.path)
|
|
case watchTypeChild:
|
|
req.ChildWatches = append(req.ChildWatches, pathType.path)
|
|
}
|
|
n++
|
|
}
|
|
if n == 0 {
|
|
return
|
|
}
|
|
if req != nil { // don't forget any trailing packet we were building
|
|
reqs = append(reqs, req)
|
|
}
|
|
|
|
if c.setWatchCallback != nil {
|
|
c.setWatchCallback(reqs)
|
|
}
|
|
|
|
go func() {
|
|
res := &setWatchesResponse{}
|
|
// TODO: Pipeline these so queue all of them up before waiting on any
|
|
// response. That will require some investigation to make sure there
|
|
// aren't failure modes where a blocking write to the channel of requests
|
|
// could hang indefinitely and cause this goroutine to leak...
|
|
for _, req := range reqs {
|
|
_, err := c.request(opSetWatches, req, res, nil)
|
|
if err != nil {
|
|
c.logger.Printf("Failed to set previous watches: %s", err.Error())
|
|
break
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (c *Conn) authenticate() error {
|
|
buf := make([]byte, 256)
|
|
|
|
// Encode and send a connect request.
|
|
n, err := encodePacket(buf[4:], &connectRequest{
|
|
ProtocolVersion: protocolVersion,
|
|
LastZxidSeen: c.lastZxid,
|
|
TimeOut: c.sessionTimeoutMs,
|
|
SessionID: c.SessionID(),
|
|
Passwd: c.passwd,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
binary.BigEndian.PutUint32(buf[:4], uint32(n))
|
|
|
|
if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
|
|
return err
|
|
}
|
|
_, err = c.conn.Write(buf[:n+4])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Receive and decode a connect response.
|
|
if err := c.conn.SetReadDeadline(time.Now().Add(c.recvTimeout * 10)); err != nil {
|
|
return err
|
|
}
|
|
_, err = io.ReadFull(c.conn, buf[:4])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := c.conn.SetReadDeadline(time.Time{}); err != nil {
|
|
return err
|
|
}
|
|
|
|
blen := int(binary.BigEndian.Uint32(buf[:4]))
|
|
if cap(buf) < blen {
|
|
buf = make([]byte, blen)
|
|
}
|
|
|
|
_, err = io.ReadFull(c.conn, buf[:blen])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r := connectResponse{}
|
|
_, err = decodePacket(buf[:blen], &r)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if r.SessionID == 0 {
|
|
atomic.StoreInt64(&c.sessionID, int64(0))
|
|
c.passwd = emptyPassword
|
|
c.lastZxid = 0
|
|
c.setState(StateExpired)
|
|
return ErrSessionExpired
|
|
}
|
|
|
|
atomic.StoreInt64(&c.sessionID, r.SessionID)
|
|
c.setTimeouts(r.TimeOut)
|
|
c.passwd = r.Passwd
|
|
c.setState(StateHasSession)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) sendData(req *request) error {
|
|
header := &requestHeader{req.xid, req.opcode}
|
|
n, err := encodePacket(c.buf[4:], header)
|
|
if err != nil {
|
|
req.recvChan <- response{-1, err}
|
|
return nil
|
|
}
|
|
|
|
n2, err := encodePacket(c.buf[4+n:], req.pkt)
|
|
if err != nil {
|
|
req.recvChan <- response{-1, err}
|
|
return nil
|
|
}
|
|
|
|
n += n2
|
|
|
|
binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
|
|
|
|
c.requestsLock.Lock()
|
|
select {
|
|
case <-c.closeChan:
|
|
req.recvChan <- response{-1, ErrConnectionClosed}
|
|
c.requestsLock.Unlock()
|
|
return ErrConnectionClosed
|
|
default:
|
|
}
|
|
c.requests[req.xid] = req
|
|
c.requestsLock.Unlock()
|
|
|
|
if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
|
|
return err
|
|
}
|
|
_, err = c.conn.Write(c.buf[:n+4])
|
|
if err != nil {
|
|
req.recvChan <- response{-1, err}
|
|
c.conn.Close()
|
|
return err
|
|
}
|
|
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) sendLoop() error {
|
|
pingTicker := time.NewTicker(c.pingInterval)
|
|
defer pingTicker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case req := <-c.sendChan:
|
|
if err := c.sendData(req); err != nil {
|
|
return err
|
|
}
|
|
case <-pingTicker.C:
|
|
n, err := encodePacket(c.buf[4:], &requestHeader{Xid: -2, Opcode: opPing})
|
|
if err != nil {
|
|
panic("zk: opPing should never fail to serialize")
|
|
}
|
|
|
|
binary.BigEndian.PutUint32(c.buf[:4], uint32(n))
|
|
|
|
if err := c.conn.SetWriteDeadline(time.Now().Add(c.recvTimeout)); err != nil {
|
|
return err
|
|
}
|
|
_, err = c.conn.Write(c.buf[:n+4])
|
|
if err != nil {
|
|
c.conn.Close()
|
|
return err
|
|
}
|
|
if err := c.conn.SetWriteDeadline(time.Time{}); err != nil {
|
|
return err
|
|
}
|
|
case <-c.closeChan:
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Conn) recvLoop(conn net.Conn) error {
|
|
sz := bufferSize
|
|
if c.maxBufferSize > 0 && sz > c.maxBufferSize {
|
|
sz = c.maxBufferSize
|
|
}
|
|
buf := make([]byte, sz)
|
|
for {
|
|
// package length
|
|
if err := conn.SetReadDeadline(time.Now().Add(c.recvTimeout)); err != nil {
|
|
c.logger.Printf("failed to set connection deadline: %v", err)
|
|
}
|
|
_, err := io.ReadFull(conn, buf[:4])
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read from connection: %v", err)
|
|
}
|
|
|
|
blen := int(binary.BigEndian.Uint32(buf[:4]))
|
|
if cap(buf) < blen {
|
|
if c.maxBufferSize > 0 && blen > c.maxBufferSize {
|
|
return fmt.Errorf("received packet from server with length %d, which exceeds max buffer size %d", blen, c.maxBufferSize)
|
|
}
|
|
buf = make([]byte, blen)
|
|
}
|
|
|
|
_, err = io.ReadFull(conn, buf[:blen])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := conn.SetReadDeadline(time.Time{}); err != nil {
|
|
return err
|
|
}
|
|
|
|
res := responseHeader{}
|
|
_, err = decodePacket(buf[:16], &res)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if res.Xid == -1 {
|
|
res := &watcherEvent{}
|
|
_, err := decodePacket(buf[16:blen], res)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ev := Event{
|
|
Type: res.Type,
|
|
State: res.State,
|
|
Path: res.Path,
|
|
Err: nil,
|
|
}
|
|
c.sendEvent(ev)
|
|
wTypes := make([]watchType, 0, 2)
|
|
switch res.Type {
|
|
case EventNodeCreated:
|
|
wTypes = append(wTypes, watchTypeExist)
|
|
case EventNodeDeleted, EventNodeDataChanged:
|
|
wTypes = append(wTypes, watchTypeExist, watchTypeData, watchTypeChild)
|
|
case EventNodeChildrenChanged:
|
|
wTypes = append(wTypes, watchTypeChild)
|
|
}
|
|
c.watchersLock.Lock()
|
|
for _, t := range wTypes {
|
|
wpt := watchPathType{res.Path, t}
|
|
if watchers, ok := c.watchers[wpt]; ok {
|
|
for _, ch := range watchers {
|
|
ch <- ev
|
|
close(ch)
|
|
}
|
|
delete(c.watchers, wpt)
|
|
}
|
|
}
|
|
c.watchersLock.Unlock()
|
|
} else if res.Xid == -2 {
|
|
// Ping response. Ignore.
|
|
} else if res.Xid < 0 {
|
|
c.logger.Printf("Xid < 0 (%d) but not ping or watcher event", res.Xid)
|
|
} else {
|
|
if res.Zxid > 0 {
|
|
c.lastZxid = res.Zxid
|
|
}
|
|
|
|
c.requestsLock.Lock()
|
|
req, ok := c.requests[res.Xid]
|
|
if ok {
|
|
delete(c.requests, res.Xid)
|
|
}
|
|
c.requestsLock.Unlock()
|
|
|
|
if !ok {
|
|
c.logger.Printf("Response for unknown request with xid %d", res.Xid)
|
|
} else {
|
|
if res.Err != 0 {
|
|
err = res.Err.toError()
|
|
} else {
|
|
_, err = decodePacket(buf[16:blen], req.recvStruct)
|
|
}
|
|
if req.recvFunc != nil {
|
|
req.recvFunc(req, &res, err)
|
|
}
|
|
req.recvChan <- response{res.Zxid, err}
|
|
if req.opcode == opClose {
|
|
return io.EOF
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Conn) nextXid() int32 {
|
|
return int32(atomic.AddUint32(&c.xid, 1) & 0x7fffffff)
|
|
}
|
|
|
|
func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
|
|
c.watchersLock.Lock()
|
|
defer c.watchersLock.Unlock()
|
|
|
|
ch := make(chan Event, 1)
|
|
wpt := watchPathType{path, watchType}
|
|
c.watchers[wpt] = append(c.watchers[wpt], ch)
|
|
return ch
|
|
}
|
|
|
|
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
|
|
rq := &request{
|
|
xid: c.nextXid(),
|
|
opcode: opcode,
|
|
pkt: req,
|
|
recvStruct: res,
|
|
recvChan: make(chan response, 1),
|
|
recvFunc: recvFunc,
|
|
}
|
|
c.sendChan <- rq
|
|
return rq.recvChan
|
|
}
|
|
|
|
func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
|
|
r := <-c.queueRequest(opcode, req, res, recvFunc)
|
|
return r.zxid, r.err
|
|
}
|
|
|
|
func (c *Conn) AddAuth(scheme string, auth []byte) error {
|
|
_, err := c.request(opSetAuth, &setAuthRequest{Type: 0, Scheme: scheme, Auth: auth}, &setAuthResponse{}, nil)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Remember authdata so that it can be re-submitted on reconnect
|
|
//
|
|
// FIXME(prozlach): For now we treat "userfoo:passbar" and "userfoo:passbar2"
|
|
// as two different entries, which will be re-submitted on reconnet. Some
|
|
// research is needed on how ZK treats these cases and
|
|
// then maybe switch to something like "map[username] = password" to allow
|
|
// only single password for given user with users being unique.
|
|
obj := authCreds{
|
|
scheme: scheme,
|
|
auth: auth,
|
|
}
|
|
|
|
c.credsMu.Lock()
|
|
c.creds = append(c.creds, obj)
|
|
c.credsMu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Conn) Children(path string) ([]string, *Stat, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
res := &getChildren2Response{}
|
|
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: false}, res, nil)
|
|
return res.Children, &res.Stat, err
|
|
}
|
|
|
|
func (c *Conn) ChildrenW(path string) ([]string, *Stat, <-chan Event, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
var ech <-chan Event
|
|
res := &getChildren2Response{}
|
|
_, err := c.request(opGetChildren2, &getChildren2Request{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
|
|
if err == nil {
|
|
ech = c.addWatcher(path, watchTypeChild)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
return res.Children, &res.Stat, ech, err
|
|
}
|
|
|
|
func (c *Conn) Get(path string) ([]byte, *Stat, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
res := &getDataResponse{}
|
|
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: false}, res, nil)
|
|
return res.Data, &res.Stat, err
|
|
}
|
|
|
|
// GetW returns the contents of a znode and sets a watch
|
|
func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
|
|
var ech <-chan Event
|
|
res := &getDataResponse{}
|
|
_, err := c.request(opGetData, &getDataRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
|
|
if err == nil {
|
|
ech = c.addWatcher(path, watchTypeData)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return nil, nil, nil, err
|
|
}
|
|
return res.Data, &res.Stat, ech, err
|
|
}
|
|
|
|
func (c *Conn) Set(path string, data []byte, version int32) (*Stat, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := &setDataResponse{}
|
|
_, err := c.request(opSetData, &SetDataRequest{path, data, version}, res, nil)
|
|
return &res.Stat, err
|
|
}
|
|
|
|
func (c *Conn) Create(path string, data []byte, flags int32, acl []ACL) (string, error) {
|
|
if err := validatePath(path, flags&FlagSequence == FlagSequence); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
res := &createResponse{}
|
|
_, err := c.request(opCreate, &CreateRequest{path, data, acl, flags}, res, nil)
|
|
return res.Path, err
|
|
}
|
|
|
|
// CreateProtectedEphemeralSequential fixes a race condition if the server crashes
|
|
// after it creates the node. On reconnect the session may still be valid so the
|
|
// ephemeral node still exists. Therefore, on reconnect we need to check if a node
|
|
// with a GUID generated on create exists.
|
|
func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl []ACL) (string, error) {
|
|
if err := validatePath(path, true); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
var guid [16]byte
|
|
_, err := io.ReadFull(rand.Reader, guid[:16])
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
guidStr := fmt.Sprintf("%x", guid)
|
|
|
|
parts := strings.Split(path, "/")
|
|
parts[len(parts)-1] = fmt.Sprintf("%s%s-%s", protectedPrefix, guidStr, parts[len(parts)-1])
|
|
rootPath := strings.Join(parts[:len(parts)-1], "/")
|
|
protectedPath := strings.Join(parts, "/")
|
|
|
|
var newPath string
|
|
for i := 0; i < 3; i++ {
|
|
newPath, err = c.Create(protectedPath, data, FlagEphemeral|FlagSequence, acl)
|
|
switch err {
|
|
case ErrSessionExpired:
|
|
// No need to search for the node since it can't exist. Just try again.
|
|
case ErrConnectionClosed:
|
|
children, _, err := c.Children(rootPath)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
for _, p := range children {
|
|
parts := strings.Split(p, "/")
|
|
if pth := parts[len(parts)-1]; strings.HasPrefix(pth, protectedPrefix) {
|
|
if g := pth[len(protectedPrefix) : len(protectedPrefix)+32]; g == guidStr {
|
|
return rootPath + "/" + p, nil
|
|
}
|
|
}
|
|
}
|
|
case nil:
|
|
return newPath, nil
|
|
default:
|
|
return "", err
|
|
}
|
|
}
|
|
return "", err
|
|
}
|
|
|
|
func (c *Conn) Delete(path string, version int32) error {
|
|
if err := validatePath(path, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err := c.request(opDelete, &DeleteRequest{path, version}, &deleteResponse{}, nil)
|
|
return err
|
|
}
|
|
|
|
func (c *Conn) Exists(path string) (bool, *Stat, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return false, nil, err
|
|
}
|
|
|
|
res := &existsResponse{}
|
|
_, err := c.request(opExists, &existsRequest{Path: path, Watch: false}, res, nil)
|
|
exists := true
|
|
if err == ErrNoNode {
|
|
exists = false
|
|
err = nil
|
|
}
|
|
return exists, &res.Stat, err
|
|
}
|
|
|
|
func (c *Conn) ExistsW(path string) (bool, *Stat, <-chan Event, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return false, nil, nil, err
|
|
}
|
|
|
|
var ech <-chan Event
|
|
res := &existsResponse{}
|
|
_, err := c.request(opExists, &existsRequest{Path: path, Watch: true}, res, func(req *request, res *responseHeader, err error) {
|
|
if err == nil {
|
|
ech = c.addWatcher(path, watchTypeData)
|
|
} else if err == ErrNoNode {
|
|
ech = c.addWatcher(path, watchTypeExist)
|
|
}
|
|
})
|
|
exists := true
|
|
if err == ErrNoNode {
|
|
exists = false
|
|
err = nil
|
|
}
|
|
if err != nil {
|
|
return false, nil, nil, err
|
|
}
|
|
return exists, &res.Stat, ech, err
|
|
}
|
|
|
|
func (c *Conn) GetACL(path string) ([]ACL, *Stat, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
res := &getAclResponse{}
|
|
_, err := c.request(opGetAcl, &getAclRequest{Path: path}, res, nil)
|
|
return res.Acl, &res.Stat, err
|
|
}
|
|
func (c *Conn) SetACL(path string, acl []ACL, version int32) (*Stat, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res := &setAclResponse{}
|
|
_, err := c.request(opSetAcl, &setAclRequest{Path: path, Acl: acl, Version: version}, res, nil)
|
|
return &res.Stat, err
|
|
}
|
|
|
|
func (c *Conn) Sync(path string) (string, error) {
|
|
if err := validatePath(path, false); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
res := &syncResponse{}
|
|
_, err := c.request(opSync, &syncRequest{Path: path}, res, nil)
|
|
return res.Path, err
|
|
}
|
|
|
|
type MultiResponse struct {
|
|
Stat *Stat
|
|
String string
|
|
Error error
|
|
}
|
|
|
|
// Multi executes multiple ZooKeeper operations or none of them. The provided
|
|
// ops must be one of *CreateRequest, *DeleteRequest, *SetDataRequest, or
|
|
// *CheckVersionRequest.
|
|
func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
|
|
req := &multiRequest{
|
|
Ops: make([]multiRequestOp, 0, len(ops)),
|
|
DoneHeader: multiHeader{Type: -1, Done: true, Err: -1},
|
|
}
|
|
for _, op := range ops {
|
|
var opCode int32
|
|
switch op.(type) {
|
|
case *CreateRequest:
|
|
opCode = opCreate
|
|
case *SetDataRequest:
|
|
opCode = opSetData
|
|
case *DeleteRequest:
|
|
opCode = opDelete
|
|
case *CheckVersionRequest:
|
|
opCode = opCheck
|
|
default:
|
|
return nil, fmt.Errorf("unknown operation type %T", op)
|
|
}
|
|
req.Ops = append(req.Ops, multiRequestOp{multiHeader{opCode, false, -1}, op})
|
|
}
|
|
res := &multiResponse{}
|
|
_, err := c.request(opMulti, req, res, nil)
|
|
mr := make([]MultiResponse, len(res.Ops))
|
|
for i, op := range res.Ops {
|
|
mr[i] = MultiResponse{Stat: op.Stat, String: op.String, Error: op.Err.toError()}
|
|
}
|
|
return mr, err
|
|
}
|
|
|
|
// IncrementalReconfig is the zookeeper reconfiguration api that allows adding and removing servers
|
|
// by lists of members.
|
|
// Return the new configuration stats.
|
|
func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error) {
|
|
// TODO: validate the shape of the member string to give early feedback.
|
|
request := &reconfigRequest{
|
|
JoiningServers: []byte(strings.Join(joining, ",")),
|
|
LeavingServers: []byte(strings.Join(leaving, ",")),
|
|
CurConfigId: version,
|
|
}
|
|
|
|
return c.internalReconfig(request)
|
|
}
|
|
|
|
// Reconfig is the non-incremental update functionality for Zookeeper where the list preovided
|
|
// is the entire new member list.
|
|
// the optional version allows for conditional reconfigurations, -1 ignores the condition.
|
|
func (c *Conn) Reconfig(members []string, version int64) (*Stat, error) {
|
|
request := &reconfigRequest{
|
|
NewMembers: []byte(strings.Join(members, ",")),
|
|
CurConfigId: version,
|
|
}
|
|
|
|
return c.internalReconfig(request)
|
|
}
|
|
|
|
func (c *Conn) internalReconfig(request *reconfigRequest) (*Stat, error) {
|
|
response := &reconfigReponse{}
|
|
_, err := c.request(opReconfig, request, response, nil)
|
|
return &response.Stat, err
|
|
}
|
|
|
|
// Server returns the current or last-connected server name.
|
|
func (c *Conn) Server() string {
|
|
c.serverMu.Lock()
|
|
defer c.serverMu.Unlock()
|
|
return c.server
|
|
}
|