mirror of
https://github.com/prometheus/prometheus
synced 2024-12-26 00:23:18 +00:00
8fdfa8abea
i) Uses the more idiomatic Wrap and Wrapf methods for creating nested errors. ii) Fixes some incorrect usages of fmt.Errorf where the error messages don't have any formatting directives. iii) Does away with the use of fmt package for errors in favour of pkg/errors Signed-off-by: tariqibrahim <tariq181290@gmail.com>
294 lines
7.9 KiB
Go
294 lines
7.9 KiB
Go
// Copyright 2016 The Prometheus Authors
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package treecache
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/samuel/go-zookeeper/zk"
|
|
)
|
|
|
|
var (
|
|
failureCounter = prometheus.NewCounter(prometheus.CounterOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "treecache",
|
|
Name: "zookeeper_failures_total",
|
|
Help: "The total number of ZooKeeper failures.",
|
|
})
|
|
numWatchers = prometheus.NewGauge(prometheus.GaugeOpts{
|
|
Namespace: "prometheus",
|
|
Subsystem: "treecache",
|
|
Name: "watcher_goroutines",
|
|
Help: "The current number of watcher goroutines.",
|
|
})
|
|
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(failureCounter)
|
|
prometheus.MustRegister(numWatchers)
|
|
}
|
|
|
|
// ZookeeperLogger wraps a log.Logger into a zk.Logger.
|
|
type ZookeeperLogger struct {
|
|
logger log.Logger
|
|
}
|
|
|
|
// NewZookeeperLogger is a constructor for ZookeeperLogger.
|
|
func NewZookeeperLogger(logger log.Logger) ZookeeperLogger {
|
|
return ZookeeperLogger{logger: logger}
|
|
}
|
|
|
|
// Printf implements zk.Logger.
|
|
func (zl ZookeeperLogger) Printf(s string, i ...interface{}) {
|
|
level.Info(zl.logger).Log("msg", fmt.Sprintf(s, i...))
|
|
}
|
|
|
|
// A ZookeeperTreeCache keeps data from all children of a Zookeeper path
|
|
// locally cached and updated according to received events.
|
|
type ZookeeperTreeCache struct {
|
|
conn *zk.Conn
|
|
prefix string
|
|
events chan ZookeeperTreeCacheEvent
|
|
stop chan struct{}
|
|
head *zookeeperTreeCacheNode
|
|
|
|
logger log.Logger
|
|
}
|
|
|
|
// A ZookeeperTreeCacheEvent models a Zookeeper event for a path.
|
|
type ZookeeperTreeCacheEvent struct {
|
|
Path string
|
|
Data *[]byte
|
|
}
|
|
|
|
type zookeeperTreeCacheNode struct {
|
|
data *[]byte
|
|
events chan zk.Event
|
|
done chan struct{}
|
|
stopped bool
|
|
children map[string]*zookeeperTreeCacheNode
|
|
}
|
|
|
|
// NewZookeeperTreeCache creates a new ZookeeperTreeCache for a given path.
|
|
func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTreeCacheEvent, logger log.Logger) *ZookeeperTreeCache {
|
|
tc := &ZookeeperTreeCache{
|
|
conn: conn,
|
|
prefix: path,
|
|
events: events,
|
|
stop: make(chan struct{}),
|
|
|
|
logger: logger,
|
|
}
|
|
tc.head = &zookeeperTreeCacheNode{
|
|
events: make(chan zk.Event),
|
|
children: map[string]*zookeeperTreeCacheNode{},
|
|
stopped: true,
|
|
}
|
|
go tc.loop(path)
|
|
return tc
|
|
}
|
|
|
|
// Stop stops the tree cache.
|
|
func (tc *ZookeeperTreeCache) Stop() {
|
|
tc.stop <- struct{}{}
|
|
}
|
|
|
|
func (tc *ZookeeperTreeCache) loop(path string) {
|
|
failureMode := false
|
|
retryChan := make(chan struct{})
|
|
|
|
failure := func() {
|
|
failureCounter.Inc()
|
|
failureMode = true
|
|
time.AfterFunc(time.Second*10, func() {
|
|
retryChan <- struct{}{}
|
|
})
|
|
}
|
|
|
|
err := tc.recursiveNodeUpdate(path, tc.head)
|
|
if err != nil {
|
|
level.Error(tc.logger).Log("msg", "Error during initial read of Zookeeper", "err", err)
|
|
failure()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case ev := <-tc.head.events:
|
|
level.Debug(tc.logger).Log("msg", "Received Zookeeper event", "event", ev)
|
|
if failureMode {
|
|
continue
|
|
}
|
|
|
|
if ev.Type == zk.EventNotWatching {
|
|
level.Info(tc.logger).Log("msg", "Lost connection to Zookeeper.")
|
|
failure()
|
|
} else {
|
|
path := strings.TrimPrefix(ev.Path, tc.prefix)
|
|
parts := strings.Split(path, "/")
|
|
node := tc.head
|
|
for _, part := range parts[1:] {
|
|
childNode := node.children[part]
|
|
if childNode == nil {
|
|
childNode = &zookeeperTreeCacheNode{
|
|
events: tc.head.events,
|
|
children: map[string]*zookeeperTreeCacheNode{},
|
|
done: make(chan struct{}, 1),
|
|
}
|
|
node.children[part] = childNode
|
|
}
|
|
node = childNode
|
|
}
|
|
|
|
err := tc.recursiveNodeUpdate(ev.Path, node)
|
|
if err != nil {
|
|
level.Error(tc.logger).Log("msg", "Error during processing of Zookeeper event", "err", err)
|
|
failure()
|
|
} else if tc.head.data == nil {
|
|
level.Error(tc.logger).Log("msg", "Error during processing of Zookeeper event", "err", "path no longer exists", "path", tc.prefix)
|
|
failure()
|
|
}
|
|
}
|
|
case <-retryChan:
|
|
level.Info(tc.logger).Log("msg", "Attempting to resync state with Zookeeper")
|
|
previousState := &zookeeperTreeCacheNode{
|
|
children: tc.head.children,
|
|
}
|
|
// Reset root child nodes before traversing the Zookeeper path.
|
|
tc.head.children = make(map[string]*zookeeperTreeCacheNode)
|
|
|
|
if err := tc.recursiveNodeUpdate(tc.prefix, tc.head); err != nil {
|
|
level.Error(tc.logger).Log("msg", "Error during Zookeeper resync", "err", err)
|
|
// Revert to our previous state.
|
|
tc.head.children = previousState.children
|
|
failure()
|
|
} else {
|
|
tc.resyncState(tc.prefix, tc.head, previousState)
|
|
level.Info(tc.logger).Log("Zookeeper resync successful")
|
|
failureMode = false
|
|
}
|
|
case <-tc.stop:
|
|
tc.recursiveStop(tc.head)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tc *ZookeeperTreeCache) recursiveNodeUpdate(path string, node *zookeeperTreeCacheNode) error {
|
|
data, _, dataWatcher, err := tc.conn.GetW(path)
|
|
if err == zk.ErrNoNode {
|
|
tc.recursiveDelete(path, node)
|
|
if node == tc.head {
|
|
return errors.Errorf("path %s does not exist", path)
|
|
}
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
if node.data == nil || !bytes.Equal(*node.data, data) {
|
|
node.data = &data
|
|
tc.events <- ZookeeperTreeCacheEvent{Path: path, Data: node.data}
|
|
}
|
|
|
|
children, _, childWatcher, err := tc.conn.ChildrenW(path)
|
|
if err == zk.ErrNoNode {
|
|
tc.recursiveDelete(path, node)
|
|
return nil
|
|
} else if err != nil {
|
|
return err
|
|
}
|
|
|
|
currentChildren := map[string]struct{}{}
|
|
for _, child := range children {
|
|
currentChildren[child] = struct{}{}
|
|
childNode := node.children[child]
|
|
// Does not already exists or we previous had a watch that
|
|
// triggered.
|
|
if childNode == nil || childNode.stopped {
|
|
node.children[child] = &zookeeperTreeCacheNode{
|
|
events: node.events,
|
|
children: map[string]*zookeeperTreeCacheNode{},
|
|
done: make(chan struct{}, 1),
|
|
}
|
|
err = tc.recursiveNodeUpdate(path+"/"+child, node.children[child])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Remove nodes that no longer exist
|
|
for name, childNode := range node.children {
|
|
if _, ok := currentChildren[name]; !ok || node.data == nil {
|
|
tc.recursiveDelete(path+"/"+name, childNode)
|
|
delete(node.children, name)
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
numWatchers.Inc()
|
|
// Pass up zookeeper events, until the node is deleted.
|
|
select {
|
|
case event := <-dataWatcher:
|
|
node.events <- event
|
|
case event := <-childWatcher:
|
|
node.events <- event
|
|
case <-node.done:
|
|
}
|
|
numWatchers.Dec()
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (tc *ZookeeperTreeCache) resyncState(path string, currentState, previousState *zookeeperTreeCacheNode) {
|
|
for child, previousNode := range previousState.children {
|
|
if currentNode, present := currentState.children[child]; present {
|
|
tc.resyncState(path+"/"+child, currentNode, previousNode)
|
|
} else {
|
|
tc.recursiveDelete(path+"/"+child, previousNode)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (tc *ZookeeperTreeCache) recursiveDelete(path string, node *zookeeperTreeCacheNode) {
|
|
if !node.stopped {
|
|
node.done <- struct{}{}
|
|
node.stopped = true
|
|
}
|
|
if node.data != nil {
|
|
tc.events <- ZookeeperTreeCacheEvent{Path: path, Data: nil}
|
|
node.data = nil
|
|
}
|
|
for name, childNode := range node.children {
|
|
tc.recursiveDelete(path+"/"+name, childNode)
|
|
}
|
|
}
|
|
|
|
func (tc *ZookeeperTreeCache) recursiveStop(node *zookeeperTreeCacheNode) {
|
|
if !node.stopped {
|
|
node.done <- struct{}{}
|
|
node.stopped = true
|
|
}
|
|
for _, childNode := range node.children {
|
|
tc.recursiveStop(childNode)
|
|
}
|
|
}
|