Rework logging to use explicitly passed logger

Mostly cleaned up the global logger use. Still some uses in discovery
package.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>
This commit is contained in:
Goutham Veeramachaneni 2017-06-16 15:52:44 +05:30
parent c399563ec1
commit 507790a357
No known key found for this signature in database
GPG Key ID: F1C217E8E9023CAD
13 changed files with 118 additions and 72 deletions

View File

@ -56,9 +56,8 @@ var cfg = struct {
alertmanagerURLs stringset
prometheusURL string
// Deprecated storage flags, kept for backwards compatibility.
deprecatedMemoryChunks uint64
deprecatedMaxChunksToPersist uint64
logFormat string
logLevel string
}{
alertmanagerURLs: stringset{},
notifier: notifier.Options{

View File

@ -171,10 +171,17 @@ func newRootCmd() *cobra.Command {
"Maximum number of queries executed concurrently.",
)
cfg.fs = rootCmd.PersistentFlags()
// Logging.
rootCmd.PersistentFlags().StringVar(
&cfg.logLevel, "log.level", "info",
"Only log messages with the given severity or above. Valid levels: [debug, info, warn, error, fatal]",
)
rootCmd.PersistentFlags().StringVar(
&cfg.logFormat, "log.format", "logger:stderr",
`Set the log target and format. Example: "logger:syslog?appname=bob&local=7" or "logger:stdout?json=true"`,
)
// TODO(gouthamve): Flags from the log package have to be added explicitly to our custom flag set.
//log.AddFlags(rootCmd.PersistentFlags())
cfg.fs = rootCmd.PersistentFlags()
return rootCmd
}
@ -185,14 +192,18 @@ func Main() int {
return 2
}
logger := log.NewLogger(os.Stdout)
logger.SetLevel(cfg.logLevel)
logger.SetFormat(cfg.logFormat)
if cfg.printVersion {
fmt.Fprintln(os.Stdout, version.Print("prometheus"))
return 0
}
log.Infoln("Starting prometheus", version.Info())
log.Infoln("Build context", version.BuildContext())
log.Infoln("Host details", Uname())
logger.Infoln("Starting prometheus", version.Info())
logger.Infoln("Build context", version.BuildContext())
logger.Infoln("Host details", Uname())
var (
// sampleAppender = storage.Fanout{}
@ -204,21 +215,22 @@ func Main() int {
hup := make(chan os.Signal)
hupReady := make(chan bool)
signal.Notify(hup, syscall.SIGHUP)
log.Infoln("Starting tsdb")
logger.Infoln("Starting tsdb")
localStorage, err := tsdb.Open(cfg.localStoragePath, prometheus.DefaultRegisterer, &cfg.tsdb)
if err != nil {
log.Errorf("Opening storage failed: %s", err)
return 1
}
log.Infoln("tsdb started")
logger.Infoln("tsdb started")
// remoteStorage := &remote.Storage{}
// sampleAppender = append(sampleAppender, remoteStorage)
// reloadables = append(reloadables, remoteStorage)
cfg.queryEngine.Logger = logger
var (
notifier = notifier.New(&cfg.notifier, log.Base())
targetManager = retrieval.NewTargetManager(localStorage, log.Base())
notifier = notifier.New(&cfg.notifier, logger)
targetManager = retrieval.NewTargetManager(localStorage, logger)
queryEngine = promql.NewEngine(localStorage, &cfg.queryEngine)
ctx, cancelCtx = context.WithCancel(context.Background())
)
@ -229,6 +241,7 @@ func Main() int {
QueryEngine: queryEngine,
Context: ctx,
ExternalURL: cfg.web.ExternalURL,
Logger: logger,
})
cfg.web.Context = ctx
@ -256,8 +269,8 @@ func Main() int {
reloadables = append(reloadables, targetManager, ruleManager, webHandler, notifier)
if err := reloadConfig(cfg.configFile, reloadables...); err != nil {
log.Errorf("Error loading config: %s", err)
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
logger.Errorf("Error loading config: %s", err)
return 1
}
@ -269,12 +282,12 @@ func Main() int {
for {
select {
case <-hup:
if err := reloadConfig(cfg.configFile, reloadables...); err != nil {
log.Errorf("Error reloading config: %s", err)
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
logger.Errorf("Error reloading config: %s", err)
}
case rc := <-webHandler.Reload():
if err := reloadConfig(cfg.configFile, reloadables...); err != nil {
log.Errorf("Error reloading config: %s", err)
if err := reloadConfig(cfg.configFile, logger, reloadables...); err != nil {
logger.Errorf("Error reloading config: %s", err)
rc <- err
} else {
rc <- nil
@ -286,7 +299,7 @@ func Main() int {
// Start all components. The order is NOT arbitrary.
defer func() {
if err := localStorage.Close(); err != nil {
log.Errorln("Error stopping storage:", err)
logger.Errorln("Error stopping storage:", err)
}
}()
@ -319,14 +332,14 @@ func Main() int {
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
select {
case <-term:
log.Warn("Received SIGTERM, exiting gracefully...")
logger.Warn("Received SIGTERM, exiting gracefully...")
case <-webHandler.Quit():
log.Warn("Received termination request via web service, exiting gracefully...")
logger.Warn("Received termination request via web service, exiting gracefully...")
case err := <-webHandler.ListenError():
log.Errorln("Error starting web server, exiting gracefully:", err)
logger.Errorln("Error starting web server, exiting gracefully:", err)
}
log.Info("See you next time!")
logger.Info("See you next time!")
return 0
}
@ -336,8 +349,8 @@ type Reloadable interface {
ApplyConfig(*config.Config) error
}
func reloadConfig(filename string, rls ...Reloadable) (err error) {
log.Infof("Loading configuration file %s", filename)
func reloadConfig(filename string, logger log.Logger, rls ...Reloadable) (err error) {
logger.Infof("Loading configuration file %s", filename)
defer func() {
if err == nil {
configSuccess.Set(1)
@ -355,7 +368,7 @@ func reloadConfig(filename string, rls ...Reloadable) (err error) {
failed := false
for _, rl := range rls {
if err := rl.ApplyConfig(conf); err != nil {
log.Error("Failed to apply configuration: ", err)
logger.Error("Failed to apply configuration: ", err)
failed = true
}
}

View File

@ -77,7 +77,7 @@ func NewDiscovery(
logger: logger,
}
for _, path := range paths {
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates))
sd.treeCaches = append(sd.treeCaches, treecache.NewZookeeperTreeCache(conn, path, updates, logger))
}
return sd
}

View File

@ -205,6 +205,8 @@ type Engine struct {
// The gate limiting the maximum number of concurrent and waiting queries.
gate *queryGate
options *EngineOptions
logger log.Logger
}
// Queryable allows opening a storage querier.
@ -222,6 +224,7 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
queryable: queryable,
gate: newQueryGate(o.MaxConcurrentQueries),
options: o,
logger: o.Logger,
}
}
@ -229,12 +232,14 @@ func NewEngine(queryable Queryable, o *EngineOptions) *Engine {
type EngineOptions struct {
MaxConcurrentQueries int
Timeout time.Duration
Logger log.Logger
}
// DefaultEngineOptions are the default engine options.
var DefaultEngineOptions = &EngineOptions{
MaxConcurrentQueries: 20,
Timeout: 2 * time.Minute,
Logger: log.Base(),
}
// NewInstantQuery returns an evaluation query for the given expression at the given time.
@ -374,6 +379,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
evaluator := &evaluator{
Timestamp: start,
ctx: ctx,
logger: ng.logger,
}
val, err := evaluator.Eval(s.Expr)
if err != nil {
@ -409,6 +415,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
evaluator := &evaluator{
Timestamp: t,
ctx: ctx,
logger: ng.logger,
}
val, err := evaluator.Eval(s.Expr)
if err != nil {
@ -510,7 +517,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
// TODO(fabxc): use multi-error.
log.Errorln("expand series set:", err)
ng.logger.Errorln("expand series set:", err)
return false
}
for _, s := range n.series {
@ -521,7 +528,7 @@ func (ng *Engine) populateIterators(ctx context.Context, s *EvalStmt) (storage.Q
case *MatrixSelector:
n.series, err = expandSeriesSet(querier.Select(n.LabelMatchers...))
if err != nil {
log.Errorln("expand series set:", err)
ng.logger.Errorln("expand series set:", err)
return false
}
for _, s := range n.series {
@ -550,6 +557,8 @@ type evaluator struct {
Timestamp int64 // time in milliseconds
finalizers []func()
logger log.Logger
}
func (ev *evaluator) close() {
@ -577,7 +586,7 @@ func (ev *evaluator) recover(errp *error) {
buf := make([]byte, 64<<10)
buf = buf[:runtime.Stack(buf, false)]
log.Errorf("parser panic: %v\n%s", e, buf)
ev.logger.Errorf("parser panic: %v\n%s", e, buf)
*errp = fmt.Errorf("unexpected error")
} else {
*errp = e.(error)

View File

@ -21,6 +21,7 @@ import (
"golang.org/x/net/context"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
)
@ -295,7 +296,9 @@ load 10s
}
func TestRecoverEvaluatorRuntime(t *testing.T) {
var ev *evaluator
ev := &evaluator{
logger: log.Base(),
}
var err error
defer ev.recover(&err)
@ -309,7 +312,7 @@ func TestRecoverEvaluatorRuntime(t *testing.T) {
}
func TestRecoverEvaluatorError(t *testing.T) {
var ev *evaluator
ev := &evaluator{logger: log.Base()}
var err error
e := fmt.Errorf("custom error")

View File

@ -112,13 +112,15 @@ type scrapePool struct {
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(context.Context, scraper, func() storage.Appender, func() storage.Appender, log.Logger) loop
logger log.Logger
}
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable) *scrapePool {
func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable, logger log.Logger) *scrapePool {
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
logger.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
newLoop := func(
@ -138,6 +140,7 @@ func newScrapePool(ctx context.Context, cfg *config.ScrapeConfig, app Appendable
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
logger: logger,
}
}
@ -175,7 +178,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
client, err := httputil.NewClientFromConfig(cfg.HTTPClientConfig)
if err != nil {
// Any errors that could occur here should be caught during config validation.
log.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
sp.logger.Errorf("Error creating HTTP client for job %q: %s", cfg.JobName, err)
}
sp.config = cfg
sp.client = client
@ -197,7 +200,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
func() storage.Appender {
return sp.reportAppender(t)
},
log.With("target", t.labels.String()),
sp.logger.With("target", t.labels.String()),
)
)
wg.Add(1)
@ -227,7 +230,7 @@ func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
for _, tg := range tgs {
targets, err := targetsFromGroup(tg, sp.config)
if err != nil {
log.With("err", err).Error("creating targets failed")
sp.logger.With("err", err).Error("creating targets failed")
continue
}
all = append(all, targets...)
@ -267,7 +270,7 @@ func (sp *scrapePool) sync(targets []*Target) {
func() storage.Appender {
return sp.reportAppender(t)
},
log.With("target", t.labels.String()),
sp.logger.With("target", t.labels.String()),
)
sp.targets[hash] = t

View File

@ -44,7 +44,7 @@ func TestNewScrapePool(t *testing.T) {
var (
app = &nopAppendable{}
cfg = &config.ScrapeConfig{}
sp = newScrapePool(context.Background(), cfg, app)
sp = newScrapePool(context.Background(), cfg, app, log.Base())
)
if a, ok := sp.appendable.(*nopAppendable); !ok || a != app {
@ -167,6 +167,7 @@ func TestScrapePoolReload(t *testing.T) {
targets: map[uint64]*Target{},
loops: map[uint64]loop{},
newLoop: newLoop,
logger: log.Base(),
}
// Reloading a scrape pool with a new scrape configuration must stop all scrape
@ -236,7 +237,7 @@ func TestScrapePoolReportAppender(t *testing.T) {
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app)
sp := newScrapePool(context.Background(), cfg, app, log.Base())
cfg.HonorLabels = false
wrapped := sp.reportAppender(target)
@ -271,7 +272,7 @@ func TestScrapePoolSampleAppender(t *testing.T) {
target := newTestTarget("example.com:80", 10*time.Millisecond, nil)
app := &nopAppendable{}
sp := newScrapePool(context.Background(), cfg, app)
sp := newScrapePool(context.Background(), cfg, app, log.Base())
cfg.HonorLabels = false
wrapped := sp.sampleAppender(target)

View File

@ -106,7 +106,7 @@ func (tm *TargetManager) reload() {
ts = &targetSet{
ctx: ctx,
cancel: cancel,
sp: newScrapePool(ctx, scfg, tm.append),
sp: newScrapePool(ctx, scfg, tm.append, tm.logger),
}
ts.ts = discovery.NewTargetSet(ts.sp)

View File

@ -102,10 +102,12 @@ type AlertingRule struct {
// A map of alerts which are currently active (Pending or Firing), keyed by
// the fingerprint of the labelset they correspond to.
active map[uint64]*Alert
logger log.Logger
}
// NewAlertingRule constructs a new AlertingRule.
func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns labels.Labels) *AlertingRule {
func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, anns labels.Labels, logger log.Logger) *AlertingRule {
return &AlertingRule{
name: name,
vector: vec,
@ -113,6 +115,7 @@ func NewAlertingRule(name string, vec promql.Expr, hold time.Duration, lbls, ann
labels: lbls,
annotations: anns,
active: map[uint64]*Alert{},
logger: logger.With("alert", name),
}
}
@ -197,7 +200,7 @@ func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, engine *promql.En
result, err := tmpl.Expand()
if err != nil {
result = fmt.Sprintf("<error expanding template: %s>", err)
log.Warnf("Error expanding alert template %v with data '%v': %s", r.Name(), tmplData, err)
r.logger.Warnf("Error expanding alert template %v with data '%v': %s", r.Name(), tmplData, err)
}
return result
}

View File

@ -16,6 +16,7 @@ package rules
import (
"testing"
"github.com/prometheus/common/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
)
@ -25,7 +26,7 @@ func TestAlertingRuleHTMLSnippet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
rule := NewAlertingRule("testrule", expr, 0, labels.FromStrings("html", "<b>BOLD</b>"), labels.FromStrings("html", "<b>BOLD</b>"))
rule := NewAlertingRule("testrule", expr, 0, labels.FromStrings("html", "<b>BOLD</b>"), labels.FromStrings("html", "<b>BOLD</b>"), log.Base())
const want = `ALERT <a href="/test/prefix/graph?g0.expr=ALERTS%7Balertname%3D%22testrule%22%7D&g0.tab=0">testrule</a>
IF <a href="/test/prefix/graph?g0.expr=foo%7Bhtml%3D%22%3Cb%3EBOLD%3Cb%3E%22%7D&g0.tab=0">foo{html=&#34;&lt;b&gt;BOLD&lt;b&gt;&#34;}</a>

View File

@ -134,6 +134,8 @@ type Group struct {
done chan struct{}
terminated chan struct{}
logger log.Logger
}
// NewGroup makes a new Group with the given name, options, and rules.
@ -146,6 +148,7 @@ func NewGroup(name string, interval time.Duration, rules []Rule, opts *ManagerOp
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
terminated: make(chan struct{}),
logger: opts.Logger.With("group", name),
}
}
@ -293,7 +296,7 @@ func (g *Group) Eval(ts time.Time) {
// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
log.Warnf("Error while evaluating rule %q: %s", rule, err)
g.logger.Warnf("Error while evaluating rule %q: %s", rule, err)
}
evalFailures.WithLabelValues(rtyp).Inc()
return
@ -309,7 +312,7 @@ func (g *Group) Eval(ts time.Time) {
app, err := g.opts.Appendable.Appender()
if err != nil {
log.With("err", err).Warn("creating appender failed")
g.logger.With("err", err).Warn("creating appender failed")
return
}
@ -319,22 +322,22 @@ func (g *Group) Eval(ts time.Time) {
switch err {
case storage.ErrOutOfOrderSample:
numOutOfOrder++
log.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
g.logger.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
log.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
g.logger.With("sample", s).With("err", err).Debug("Rule evaluation result discarded")
default:
log.With("sample", s).With("err", err).Warn("Rule evaluation result discarded")
g.logger.With("sample", s).With("err", err).Warn("Rule evaluation result discarded")
}
} else {
seriesReturned[s.Metric.String()] = s.Metric
}
}
if numOutOfOrder > 0 {
log.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order result from rule evaluation")
g.logger.With("numDropped", numOutOfOrder).Warn("Error on ingesting out-of-order result from rule evaluation")
}
if numDuplicates > 0 {
log.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp")
g.logger.With("numDropped", numDuplicates).Warn("Error on ingesting results from rule evaluation with different value but same timestamp")
}
for metric, lset := range g.seriesInPreviousEval[i] {
@ -347,12 +350,12 @@ func (g *Group) Eval(ts time.Time) {
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
log.With("sample", metric).With("err", err).Warn("adding stale sample failed")
g.logger.With("sample", metric).With("err", err).Warn("adding stale sample failed")
}
}
}
if err := app.Commit(); err != nil {
log.With("err", err).Warn("rule sample appending failed")
g.logger.With("err", err).Warn("rule sample appending failed")
} else {
g.seriesInPreviousEval[i] = seriesReturned
}
@ -397,6 +400,8 @@ type Manager struct {
groups map[string]*Group
mtx sync.RWMutex
block chan struct{}
logger log.Logger
}
type Appendable interface {
@ -410,17 +415,18 @@ type ManagerOptions struct {
Context context.Context
Notifier *notifier.Notifier
Appendable Appendable
Logger log.Logger
}
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) *Manager {
manager := &Manager{
return &Manager{
groups: map[string]*Group{},
opts: o,
block: make(chan struct{}),
logger: o.Logger,
}
return manager
}
// Run starts processing of the rule manager.
@ -433,13 +439,13 @@ func (m *Manager) Stop() {
m.mtx.Lock()
defer m.mtx.Unlock()
log.Info("Stopping rule manager...")
m.logger.Info("Stopping rule manager...")
for _, eg := range m.groups {
eg.stop()
}
log.Info("Rule manager stopped.")
m.logger.Info("Rule manager stopped.")
}
// ApplyConfig updates the rule manager's state as the config requires. If
@ -522,7 +528,7 @@ func (m *Manager) loadGroups(interval time.Duration, filenames ...string) (map[s
switch r := stmt.(type) {
case *promql.AlertStmt:
rule = NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations)
rule = NewAlertingRule(r.Name, r.Expr, r.Duration, r.Labels, r.Annotations, m.logger)
case *promql.RecordStmt:
rule = NewRecordingRule(r.Name, r.Expr, r.Labels)

View File

@ -22,6 +22,7 @@ import (
"testing"
"time"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -58,6 +59,7 @@ func TestAlertingRule(t *testing.T) {
time.Minute,
labels.FromStrings("severity", "{{\"c\"}}ritical"),
nil,
log.Base(),
)
baseTime := time.Unix(0, 0)
@ -165,6 +167,7 @@ func TestStaleness(t *testing.T) {
QueryEngine: engine,
Appendable: storage,
Context: context.Background(),
Logger: log.Base(),
}
expr, err := promql.ParseExpr("a + 1")
@ -241,7 +244,7 @@ func readSeriesSet(ss storage.SeriesSet) (map[string][]promql.Point, error) {
func TestCopyState(t *testing.T) {
oldGroup := &Group{
rules: []Rule{
NewAlertingRule("alert", nil, 0, nil, nil),
NewAlertingRule("alert", nil, 0, nil, nil, log.Base()),
NewRecordingRule("rule1", nil, nil),
NewRecordingRule("rule2", nil, nil),
NewRecordingRule("rule3", nil, nil),
@ -261,7 +264,7 @@ func TestCopyState(t *testing.T) {
NewRecordingRule("rule3", nil, nil),
NewRecordingRule("rule3", nil, nil),
NewRecordingRule("rule3", nil, nil),
NewAlertingRule("alert", nil, 0, nil, nil),
NewAlertingRule("alert", nil, 0, nil, nil, log.Base()),
NewRecordingRule("rule1", nil, nil),
NewRecordingRule("rule4", nil, nil),
},

View File

@ -45,11 +45,12 @@ func init() {
}
type ZookeeperLogger struct {
logger log.Logger
}
// Implements zk.Logger
func (zl ZookeeperLogger) Printf(s string, i ...interface{}) {
log.Infof(s, i...)
zl.logger.Infof(s, i...)
}
type ZookeeperTreeCache struct {
@ -59,6 +60,8 @@ type ZookeeperTreeCache struct {
zkEvents chan zk.Event
stop chan struct{}
head *zookeeperTreeCacheNode
logger log.Logger
}
type ZookeeperTreeCacheEvent struct {
@ -74,12 +77,14 @@ type zookeeperTreeCacheNode struct {
children map[string]*zookeeperTreeCacheNode
}
func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTreeCacheEvent) *ZookeeperTreeCache {
func NewZookeeperTreeCache(conn *zk.Conn, path string, events chan ZookeeperTreeCacheEvent, logger log.Logger) *ZookeeperTreeCache {
tc := &ZookeeperTreeCache{
conn: conn,
prefix: path,
events: events,
stop: make(chan struct{}),
logger: logger,
}
tc.head = &zookeeperTreeCacheNode{
events: make(chan zk.Event),
@ -108,20 +113,20 @@ func (tc *ZookeeperTreeCache) loop(path string) {
err := tc.recursiveNodeUpdate(path, tc.head)
if err != nil {
log.Errorf("Error during initial read of Zookeeper: %s", err)
tc.logger.Errorf("Error during initial read of Zookeeper: %s", err)
failure()
}
for {
select {
case ev := <-tc.head.events:
log.Debugf("Received Zookeeper event: %s", ev)
tc.logger.Debugf("Received Zookeeper event: %s", ev)
if failureMode {
continue
}
if ev.Type == zk.EventNotWatching {
log.Infof("Lost connection to Zookeeper.")
tc.logger.Infof("Lost connection to Zookeeper.")
failure()
} else {
path := strings.TrimPrefix(ev.Path, tc.prefix)
@ -142,15 +147,15 @@ func (tc *ZookeeperTreeCache) loop(path string) {
err := tc.recursiveNodeUpdate(ev.Path, node)
if err != nil {
log.Errorf("Error during processing of Zookeeper event: %s", err)
tc.logger.Errorf("Error during processing of Zookeeper event: %s", err)
failure()
} else if tc.head.data == nil {
log.Errorf("Error during processing of Zookeeper event: path %s no longer exists", tc.prefix)
tc.logger.Errorf("Error during processing of Zookeeper event: path %s no longer exists", tc.prefix)
failure()
}
}
case <-retryChan:
log.Infof("Attempting to resync state with Zookeeper")
tc.logger.Infof("Attempting to resync state with Zookeeper")
previousState := &zookeeperTreeCacheNode{
children: tc.head.children,
}
@ -158,13 +163,13 @@ func (tc *ZookeeperTreeCache) loop(path string) {
tc.head.children = make(map[string]*zookeeperTreeCacheNode)
if err := tc.recursiveNodeUpdate(tc.prefix, tc.head); err != nil {
log.Errorf("Error during Zookeeper resync: %s", err)
tc.logger.Errorf("Error during Zookeeper resync: %s", err)
// Revert to our previous state.
tc.head.children = previousState.children
failure()
} else {
tc.resyncState(tc.prefix, tc.head, previousState)
log.Infof("Zookeeper resync successful")
tc.logger.Infof("Zookeeper resync successful")
failureMode = false
}
case <-tc.stop: