remove deleted zookeeper nodes
This commit is contained in:
parent
07a3cd4851
commit
3730255392
|
@ -19,10 +19,31 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/samuel/go-zookeeper/zk"
|
||||
)
|
||||
|
||||
var (
|
||||
failureCounter = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: "prometheus",
|
||||
Subsystem: "treecache",
|
||||
Name: "zookeeper_failures_total",
|
||||
Help: "The total number of ZooKeeper failures.",
|
||||
})
|
||||
numWatchers = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: "prometheus",
|
||||
Subsystem: "treecache",
|
||||
Name: "watcher_goroutines",
|
||||
Help: "The current number of watcher goroutines.",
|
||||
})
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(failureCounter)
|
||||
prometheus.MustRegister(numWatchers)
|
||||
}
|
||||
|
||||
type ZookeeperLogger struct {
|
||||
}
|
||||
|
||||
|
@ -81,6 +102,7 @@ func (tc *ZookeeperTreeCache) loop(failureMode bool) {
|
|||
retryChan := make(chan struct{})
|
||||
|
||||
failure := func() {
|
||||
failureCounter.Inc()
|
||||
failureMode = true
|
||||
time.AfterFunc(time.Second*10, func() {
|
||||
retryChan <- struct{}{}
|
||||
|
@ -129,13 +151,19 @@ func (tc *ZookeeperTreeCache) loop(failureMode bool) {
|
|||
}
|
||||
case <-retryChan:
|
||||
log.Infof("Attempting to resync state with Zookeeper")
|
||||
previousState := &zookeeperTreeCacheNode{
|
||||
children: tc.head.children,
|
||||
}
|
||||
// Reset root child nodes before traversing the Zookeeper path.
|
||||
tc.head.children = make(map[string]*zookeeperTreeCacheNode)
|
||||
err := tc.recursiveNodeUpdate(tc.prefix, tc.head)
|
||||
if err != nil {
|
||||
|
||||
if err := tc.recursiveNodeUpdate(tc.prefix, tc.head); err != nil {
|
||||
log.Errorf("Error during Zookeeper resync: %s", err)
|
||||
// Revert to our previous state.
|
||||
tc.head.children = previousState.children
|
||||
failure()
|
||||
} else {
|
||||
tc.resyncState(tc.prefix, tc.head, previousState)
|
||||
log.Infof("Zookeeper resync successful")
|
||||
failureMode = false
|
||||
}
|
||||
|
@ -199,6 +227,7 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
|
|||
}
|
||||
|
||||
go func() {
|
||||
numWatchers.Inc()
|
||||
// Pass up zookeeper events, until the node is deleted.
|
||||
select {
|
||||
case event := <-dataWatcher:
|
||||
|
@ -207,10 +236,21 @@ func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTr
|
|||
node.events <- event
|
||||
case <-node.done:
|
||||
}
|
||||
numWatchers.Dec()
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tc *ZookeeperTreeCache) resyncState(path string, currentState, previousState *zookeeperTreeCacheNode) {
|
||||
for child, previousNode := range previousState.children {
|
||||
if currentNode, present := currentState.children[child]; present {
|
||||
tc.resyncState(path+"/"+child, currentNode, previousNode)
|
||||
} else {
|
||||
tc.recursiveDelete(path+"/"+child, previousNode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *ZookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) {
|
||||
if !node.stopped {
|
||||
node.done <- struct{}{}
|
||||
|
|
Loading…
Reference in New Issue