From 63b8e4fb88f09f90997e060d816317e0e0dd4779 Mon Sep 17 00:00:00 2001
From: Thomas Jackson
Date: Thu, 7 Jun 2018 09:27:34 -0700
Subject: [PATCH 1/9] Bubble up errors to promql from populating iterators (#4136)

This changes the Walk/Inspect API inside the promql package to bubble up
errors. This is done by having the inspector return an error (instead of a
bool) and then bubbling that up in the Walk. This way, if any error is
encountered in the Walk(), the walk will stop and return the error. This
avoids issues where errors from the Querier were being ignored (causing
incorrect promql evaluation).

Signed-off-by: Thomas Jackson
Fixes #4136
---
 promql/ast.go    | 79 +++++++++++++++++++++++++++++++-----------------
 promql/engine.go | 16 +++++-----
 2 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/promql/ast.go b/promql/ast.go
index af3ed4189..0c8a3ad2a 100644
--- a/promql/ast.go
+++ b/promql/ast.go
@@ -237,57 +237,80 @@ type VectorMatching struct {
 // Visitor allows visiting a Node and its child nodes. The Visit method is
 // invoked for each node with the path leading to the node provided additionally.
-// If the result visitor w is not nil, Walk visits each of the children
+// If the result visitor w is not nil and no error, Walk visits each of the children
 // of node with the visitor w, followed by a call of w.Visit(nil, nil).
 type Visitor interface {
-	Visit(node Node, path []Node) (w Visitor)
+	Visit(node Node, path []Node) (w Visitor, err error)
 }
 
 // Walk traverses an AST in depth-first order: It starts by calling
 // v.Visit(node, path); node must not be nil. If the visitor w returned by
-// v.Visit(node, path) is not nil, Walk is invoked recursively with visitor
-// w for each of the non-nil children of node, followed by a call of
-// w.Visit(nil).
+// v.Visit(node, path) is not nil and the visitor returns no error, Walk is
+// invoked recursively with visitor w for each of the non-nil children of node,
+// followed by a call of w.Visit(nil), returning an error
 // As the tree is descended the path of previous nodes is provided.
-func Walk(v Visitor, node Node, path []Node) { - if v = v.Visit(node, path); v == nil { - return +func Walk(v Visitor, node Node, path []Node) error { + var err error + if v, err = v.Visit(node, path); v == nil || err != nil { + return err } path = append(path, node) switch n := node.(type) { case Statements: for _, s := range n { - Walk(v, s, path) + if err := Walk(v, s, path); err != nil { + return err + } } case *AlertStmt: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *EvalStmt: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *RecordStmt: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case Expressions: for _, e := range n { - Walk(v, e, path) + if err := Walk(v, e, path); err != nil { + return err + } } case *AggregateExpr: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *BinaryExpr: - Walk(v, n.LHS, path) - Walk(v, n.RHS, path) + if err := Walk(v, n.LHS, path); err != nil { + return err + } + if err := Walk(v, n.RHS, path); err != nil { + return err + } case *Call: - Walk(v, n.Args, path) + if err := Walk(v, n.Args, path); err != nil { + return err + } case *ParenExpr: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *UnaryExpr: - Walk(v, n.Expr, path) + if err := Walk(v, n.Expr, path); err != nil { + return err + } case *MatrixSelector, *NumberLiteral, *StringLiteral, *VectorSelector: // nothing to do @@ -296,21 +319,23 @@ func Walk(v Visitor, node Node, path []Node) { panic(fmt.Errorf("promql.Walk: unhandled node type %T", node)) } - v.Visit(nil, nil) + _, err = v.Visit(nil, nil) + return err } -type inspector func(Node, []Node) bool +type inspector func(Node, []Node) error -func (f inspector) Visit(node Node, path []Node) Visitor { - if f(node, path) { - return f +func (f inspector) Visit(node Node, path []Node) (Visitor, error) { + if err := f(node, path); err == nil { + return f, nil + } else { + return nil, err } - return nil } // Inspect traverses an AST in depth-first order: It starts by calling -// f(node, path); node must not be nil. If f returns true, Inspect invokes f +// f(node, path); node must not be nil. If f returns a nil error, Inspect invokes f // for all the non-nil children of node, recursively. 
-func Inspect(node Node, f func(Node, []Node) bool) { +func Inspect(node Node, f inspector) { Walk(inspector(f), node, nil) } diff --git a/promql/engine.go b/promql/engine.go index aeea2b5d3..48679c515 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -451,7 +451,7 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) ( func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, error) { var maxOffset time.Duration - Inspect(s.Expr, func(node Node, _ []Node) bool { + Inspect(s.Expr, func(node Node, _ []Node) error { switch n := node.(type) { case *VectorSelector: if maxOffset < LookbackDelta { @@ -468,7 +468,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev maxOffset = n.Offset + n.Range } } - return true + return nil }) mint := s.Start.Add(-maxOffset) @@ -478,7 +478,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev return nil, err } - Inspect(s.Expr, func(node Node, path []Node) bool { + Inspect(s.Expr, func(node Node, path []Node) error { var set storage.SeriesSet params := &storage.SelectParams{ Step: int64(s.Interval / time.Millisecond), @@ -491,13 +491,13 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev set, err = querier.Select(params, n.LabelMatchers...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) - return false + return err } n.series, err = expandSeriesSet(set) if err != nil { // TODO(fabxc): use multi-error. level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) - return false + return err } case *MatrixSelector: @@ -506,15 +506,15 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev set, err = querier.Select(params, n.LabelMatchers...) if err != nil { level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) - return false + return err } n.series, err = expandSeriesSet(set) if err != nil { level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) - return false + return err } } - return true + return nil }) return querier, err } From fc7f45ba9e89a60e2893c6982d3a46e29af96c91 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 21 Jun 2018 00:14:51 -0700 Subject: [PATCH 2/9] Timeout if populating iterators takes too long (#4291) Right now promql won't time out a request if populating the iterators takes a long time. Signed-off-by: Thomas Jackson Fixes #4289 --- promql/engine.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/promql/engine.go b/promql/engine.go index 48679c515..a39735889 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -493,7 +493,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return err } - n.series, err = expandSeriesSet(set) + n.series, err = expandSeriesSet(ctx, set) if err != nil { // TODO(fabxc): use multi-error. 
level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) @@ -508,7 +508,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev level.Error(ng.logger).Log("msg", "error selecting series set", "err", err) return err } - n.series, err = expandSeriesSet(set) + n.series, err = expandSeriesSet(ctx, set) if err != nil { level.Error(ng.logger).Log("msg", "error expanding series set", "err", err) return err @@ -538,8 +538,13 @@ func extractFuncFromPath(p []Node) string { return extractFuncFromPath(p[:len(p)-1]) } -func expandSeriesSet(it storage.SeriesSet) (res []storage.Series, err error) { +func expandSeriesSet(ctx context.Context, it storage.SeriesSet) (res []storage.Series, err error) { for it.Next() { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } res = append(res, it.At()) } return res, it.Err() From 86239ee5a59e1f0502683360a91c3fb9456987c2 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Thu, 21 Jun 2018 14:43:31 -0700 Subject: [PATCH 3/9] Check for timeout in each iteration of matrixSelector (#4300) Signed-off-by: Thomas Jackson Fixes #4288 --- promql/engine.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/promql/engine.go b/promql/engine.go index a39735889..5b19f5038 100644 --- a/promql/engine.go +++ b/promql/engine.go @@ -1044,6 +1044,9 @@ func (ev *evaluator) matrixSelector(node *MatrixSelector) Matrix { var it *storage.BufferedSeriesIterator for i, s := range node.series { + if err := contextDone(ev.ctx, "expression evaluation"); err != nil { + ev.error(err) + } if it == nil { it = storage.NewBuffer(s.Iterator(), durationMilliseconds(node.Range)) } else { From 0b93fd6d5e147590516dc7dcbaa48e356d99c1ea Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 6 Jul 2018 10:39:38 +0300 Subject: [PATCH 4/9] fix the zookeper race (#4355) Signed-off-by: Krasi Georgiev --- discovery/zookeeper/zookeeper.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/discovery/zookeeper/zookeeper.go b/discovery/zookeeper/zookeeper.go index 60a26e860..487121457 100644 --- a/discovery/zookeeper/zookeeper.go +++ b/discovery/zookeeper/zookeeper.go @@ -137,8 +137,11 @@ func NewDiscovery( logger = log.NewNopLogger() } - conn, _, err := zk.Connect(srvs, timeout) - conn.SetLogger(treecache.NewZookeeperLogger(logger)) + conn, _, err := zk.Connect( + srvs, timeout, + func(c *zk.Conn) { + c.SetLogger(treecache.NewZookeeperLogger(logger)) + }) if err != nil { return nil } From f7e1a94b037172fc9d5a805310e6c7dfbc8293fd Mon Sep 17 00:00:00 2001 From: Frederic Branczyk Date: Thu, 5 Jul 2018 10:01:59 +0200 Subject: [PATCH 5/9] Merge pull request #4329 from nailgun/4327-ingress-discovery-issue discovery/kubernetes/ingress: fix TLS discovery --- discovery/kubernetes/ingress.go | 15 ++++-- discovery/kubernetes/ingress_test.go | 73 +++++++++++++++++++++------- 2 files changed, 68 insertions(+), 20 deletions(-) diff --git a/discovery/kubernetes/ingress.go b/discovery/kubernetes/ingress.go index 592550212..0ff3b0e0a 100644 --- a/discovery/kubernetes/ingress.go +++ b/discovery/kubernetes/ingress.go @@ -176,13 +176,22 @@ func (s *Ingress) buildIngress(ingress *v1beta1.Ingress) *targetgroup.Group { } tg.Labels = ingressLabels(ingress) - schema := "http" - if ingress.Spec.TLS != nil { - schema = "https" + tlsHosts := make(map[string]struct{}) + for _, tls := range ingress.Spec.TLS { + for _, host := range tls.Hosts { + tlsHosts[host] = struct{}{} + } } + for _, rule := range ingress.Spec.Rules { paths := 
pathsFromIngressRule(&rule.IngressRuleValue) + schema := "http" + _, isTLS := tlsHosts[rule.Host] + if isTLS { + schema = "https" + } + for _, path := range paths { tg.Targets = append(tg.Targets, model.LabelSet{ model.AddressLabel: lv(rule.Host), diff --git a/discovery/kubernetes/ingress_test.go b/discovery/kubernetes/ingress_test.go index b3832ebce..54ea8caf6 100644 --- a/discovery/kubernetes/ingress_test.go +++ b/discovery/kubernetes/ingress_test.go @@ -23,8 +23,16 @@ import ( "k8s.io/client-go/pkg/apis/extensions/v1beta1" ) -func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { - return &v1beta1.Ingress{ +type TLSMode int + +const ( + TLSNo TLSMode = iota + TLSYes + TLSMixed +) + +func makeIngress(tls TLSMode) *v1beta1.Ingress { + ret := &v1beta1.Ingress{ ObjectMeta: metav1.ObjectMeta{ Name: "testingress", Namespace: "default", @@ -32,7 +40,7 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { Annotations: map[string]string{"testannotation": "testannotationvalue"}, }, Spec: v1beta1.IngressSpec{ - TLS: tls, + TLS: nil, Rules: []v1beta1.IngressRule{ { Host: "example.com", @@ -63,31 +71,47 @@ func makeIngress(tls []v1beta1.IngressTLS) *v1beta1.Ingress { }, }, } + + switch tls { + case TLSYes: + ret.Spec.TLS = []v1beta1.IngressTLS{{Hosts: []string{"example.com", "test.example.com"}}} + case TLSMixed: + ret.Spec.TLS = []v1beta1.IngressTLS{{Hosts: []string{"example.com"}}} + } + + return ret } -func expectedTargetGroups(ns string, tls bool) map[string]*targetgroup.Group { - scheme := "http" - if tls { - scheme = "https" +func expectedTargetGroups(ns string, tls TLSMode) map[string]*targetgroup.Group { + scheme1 := "http" + scheme2 := "http" + + switch tls { + case TLSYes: + scheme1 = "https" + scheme2 = "https" + case TLSMixed: + scheme1 = "https" } + key := fmt.Sprintf("ingress/%s/testingress", ns) return map[string]*targetgroup.Group{ key: { Targets: []model.LabelSet{ { - "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_scheme": lv(scheme1), "__meta_kubernetes_ingress_host": "example.com", "__meta_kubernetes_ingress_path": "/", "__address__": "example.com", }, { - "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_scheme": lv(scheme1), "__meta_kubernetes_ingress_host": "example.com", "__meta_kubernetes_ingress_path": "/foo", "__address__": "example.com", }, { - "__meta_kubernetes_ingress_scheme": lv(scheme), + "__meta_kubernetes_ingress_scheme": lv(scheme2), "__meta_kubernetes_ingress_host": "test.example.com", "__address__": "test.example.com", "__meta_kubernetes_ingress_path": "/", @@ -110,12 +134,12 @@ func TestIngressDiscoveryAdd(t *testing.T) { k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeIngress(nil) + obj := makeIngress(TLSNo) c.ExtensionsV1beta1().Ingresses("default").Create(obj) w.Ingresses().Add(obj) }, expectedMaxItems: 1, - expectedRes: expectedTargetGroups("default", false), + expectedRes: expectedTargetGroups("default", TLSNo), }.Run(t) } @@ -125,27 +149,42 @@ func TestIngressDiscoveryAddTLS(t *testing.T) { k8sDiscoveryTest{ discovery: n, afterStart: func() { - obj := makeIngress([]v1beta1.IngressTLS{{}}) + obj := makeIngress(TLSYes) c.ExtensionsV1beta1().Ingresses("default").Create(obj) w.Ingresses().Add(obj) }, expectedMaxItems: 1, - expectedRes: expectedTargetGroups("default", true), + expectedRes: expectedTargetGroups("default", TLSYes), + }.Run(t) +} + +func TestIngressDiscoveryAddMixed(t *testing.T) { + n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: 
[]string{"default"}}) + + k8sDiscoveryTest{ + discovery: n, + afterStart: func() { + obj := makeIngress(TLSMixed) + c.ExtensionsV1beta1().Ingresses("default").Create(obj) + w.Ingresses().Add(obj) + }, + expectedMaxItems: 1, + expectedRes: expectedTargetGroups("default", TLSMixed), }.Run(t) } func TestIngressDiscoveryNamespaces(t *testing.T) { n, c, w := makeDiscovery(RoleIngress, NamespaceDiscovery{Names: []string{"ns1", "ns2"}}) - expected := expectedTargetGroups("ns1", false) - for k, v := range expectedTargetGroups("ns2", false) { + expected := expectedTargetGroups("ns1", TLSNo) + for k, v := range expectedTargetGroups("ns2", TLSNo) { expected[k] = v } k8sDiscoveryTest{ discovery: n, afterStart: func() { for _, ns := range []string{"ns1", "ns2"} { - obj := makeIngress(nil) + obj := makeIngress(TLSNo) obj.Namespace = ns c.ExtensionsV1beta1().Ingresses(obj.Namespace).Create(obj) w.Ingresses().Add(obj) From 5e9056d2f37bb0383bf236756e6628c27feedded Mon Sep 17 00:00:00 2001 From: Michael Khalil Date: Thu, 21 Jun 2018 00:32:26 -0700 Subject: [PATCH 6/9] return error exit status in prometheus cli (#4296) Signed-off-by: mikeykhalil --- cmd/prometheus/main.go | 1 + cmd/prometheus/main_test.go | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index 1188a7bbe..a5985d543 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -595,6 +595,7 @@ func main() { } if err := g.Run(); err != nil { level.Error(logger).Log("err", err) + os.Exit(1) } level.Info(logger).Log("msg", "See you next time!") } diff --git a/cmd/prometheus/main_test.go b/cmd/prometheus/main_test.go index ee805674a..605ba816e 100644 --- a/cmd/prometheus/main_test.go +++ b/cmd/prometheus/main_test.go @@ -20,6 +20,7 @@ import ( "os" "os/exec" "path/filepath" + "syscall" "testing" "time" @@ -155,3 +156,20 @@ func TestComputeExternalURL(t *testing.T) { } } } + +// Let's provide an invalid configuration file and verify the exit status indicates the error. +func TestFailedStartupExitCode(t *testing.T) { + fakeInputFile := "fake-input-file" + expectedExitStatus := 1 + + prom := exec.Command(promPath, "--config.file="+fakeInputFile) + err := prom.Run() + testutil.NotOk(t, err, "") + + if exitError, ok := err.(*exec.ExitError); ok { + status := exitError.Sys().(syscall.WaitStatus) + testutil.Equals(t, expectedExitStatus, status.ExitStatus()) + } else { + t.Errorf("unable to retrieve the exit status for prometheus: %v", err) + } +} From 508662fb24d90c8d33e6d3bb627b336faf3b07ce Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 4 Jul 2018 13:41:16 +0100 Subject: [PATCH 7/9] Reorder startup and shutdown to prevent panics. (#4321) Start rule manager only after tsdb and config is loaded. Stop rule manager before tsdb to avoid writing to closed storage. Wait for any in-progress reloads to complete before shutting down rule manager, so that rule manager doesn't get updated after being shut down. Remove incorrect comment around shutting down query enginge. Log when config reload is completed. 
Fixes #4133 Fixes #4262 Signed-off-by: Brian Brazil --- cmd/prometheus/main.go | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go index a5985d543..fc9e707b9 100644 --- a/cmd/prometheus/main.go +++ b/cmd/prometheus/main.go @@ -472,7 +472,9 @@ func main() { }, func(err error) { - close(cancel) + // Wait for any in-progress reloads to complete to avoid + // reloading things after they have been shutdown. + cancel <- struct{}{} }, ) } @@ -506,6 +508,23 @@ func main() { }, ) } + { + // Rule manager. + // TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel. + cancel := make(chan struct{}) + g.Add( + func() error { + <-reloadReady.C + ruleManager.Run() + <-cancel + return nil + }, + func(err error) { + ruleManager.Stop() + close(cancel) + }, + ) + } { // TSDB. cancel := make(chan struct{}) @@ -547,30 +566,10 @@ func main() { return nil }, func(err error) { - // Keep this interrupt before the ruleManager.Stop(). - // Shutting down the query engine before the rule manager will cause pending queries - // to be canceled and ensures a quick shutdown of the rule manager. cancelWeb() }, ) } - { - // Rule manager. - - // TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel. - cancel := make(chan struct{}) - g.Add( - func() error { - ruleManager.Run() - <-cancel - return nil - }, - func(err error) { - ruleManager.Stop() - close(cancel) - }, - ) - } { // Notifier. @@ -627,6 +626,7 @@ func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config if failed { return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%s)", filename) } + level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename) return nil } From fc2a9c986b64a3354c94777261c6e90ad472dd29 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 11 Jul 2018 14:38:51 +0100 Subject: [PATCH 8/9] Update vendoring for tsdb (#4369) This pulls in tsdb PRs 330 344 348 353 354 356 Signed-off-by: Brian Brazil --- vendor/github.com/prometheus/tsdb/block.go | 31 ++- .../prometheus/tsdb/chunks/chunks.go | 12 +- vendor/github.com/prometheus/tsdb/compact.go | 19 +- vendor/github.com/prometheus/tsdb/db.go | 190 ++++++++---------- vendor/github.com/prometheus/tsdb/head.go | 33 +-- .../github.com/prometheus/tsdb/index/index.go | 4 +- vendor/github.com/prometheus/tsdb/querier.go | 2 +- .../github.com/prometheus/tsdb/tombstones.go | 49 +++-- vendor/vendor.json | 30 +-- 9 files changed, 192 insertions(+), 178 deletions(-) diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 9ae2adbde..1a45fb971 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -164,6 +164,13 @@ type BlockStats struct { NumTombstones uint64 `json:"numTombstones,omitempty"` } +// BlockDesc describes a block by ULID and time range. +type BlockDesc struct { + ULID ulid.ULID `json:"ulid"` + MinTime int64 `json:"minTime"` + MaxTime int64 `json:"maxTime"` +} + // BlockMetaCompaction holds information about compactions a block went through. type BlockMetaCompaction struct { // Maximum number of compaction cycles any source block has @@ -171,6 +178,9 @@ type BlockMetaCompaction struct { Level int `json:"level"` // ULIDs of all source head blocks that went into the block. 
Sources []ulid.ULID `json:"sources,omitempty"` + // Short descriptions of the direct blocks that were used to create + // this block. + Parents []BlockDesc `json:"parents,omitempty"` Failed bool `json:"failed,omitempty"` } @@ -424,7 +434,7 @@ func (pb *Block) Delete(mint, maxt int64, ms ...labels.Matcher) error { ir := pb.indexr // Choose only valid postings which have chunks in the time-range. - stones := memTombstones{} + stones := NewMemTombstones() var lset labels.Labels var chks []chunks.Meta @@ -437,10 +447,10 @@ Outer: } for _, chk := range chks { - if intervalOverlap(mint, maxt, chk.MinTime, chk.MaxTime) { + if chk.OverlapsClosedInterval(mint, maxt) { // Delete only until the current values and not beyond. tmin, tmax := clampInterval(mint, maxt, chks[0].MinTime, chks[len(chks)-1].MaxTime) - stones[p.At()] = Intervals{{tmin, tmax}} + stones.addInterval(p.At(), Interval{tmin, tmax}) continue Outer } } @@ -452,7 +462,7 @@ Outer: err = pb.tombstones.Iter(func(id uint64, ivs Intervals) error { for _, iv := range ivs { - stones.add(id, iv) + stones.addInterval(id, iv) pb.meta.Stats.NumTombstones++ } return nil @@ -475,19 +485,17 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) { pb.tombstones.Iter(func(id uint64, ivs Intervals) error { numStones += len(ivs) - return nil }) - if numStones == 0 { return nil, nil } - uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime) + meta := pb.Meta() + uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta) if err != nil { return nil, err } - return &uid, nil } @@ -531,6 +539,13 @@ func (pb *Block) Snapshot(dir string) error { return nil } +// Returns true if the block overlaps [mint, maxt]. +func (pb *Block) OverlapsClosedInterval(mint, maxt int64) bool { + // The block itself is a half-open interval + // [pb.meta.MinTime, pb.meta.MaxTime). + return pb.meta.MinTime <= maxt && mint < pb.meta.MaxTime +} + func clampInterval(a, b, mint, maxt int64) (int64, int64) { if a < mint { a = mint diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunks.go b/vendor/github.com/prometheus/tsdb/chunks/chunks.go index 9c80767ff..5eab23982 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunks.go @@ -57,6 +57,12 @@ func (cm *Meta) writeHash(h hash.Hash) error { return nil } +// Returns true if the chunk overlaps [mint, maxt]. +func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool { + // The chunk itself is a closed interval [cm.MinTime, cm.MaxTime]. + return cm.MinTime <= maxt && mint <= cm.MaxTime +} + var ( errInvalidSize = fmt.Errorf("invalid size") errInvalidFlag = fmt.Errorf("invalid flag") @@ -296,7 +302,7 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err } // Verify magic number. 
if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { - return nil, fmt.Errorf("invalid magic number %x", m) + return nil, errors.Errorf("invalid magic number %x", m) } } return &cr, nil @@ -357,8 +363,8 @@ func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { r := b.Range(off, off+binary.MaxVarintLen32) l, n := binary.Uvarint(r) - if n < 0 { - return nil, fmt.Errorf("reading chunk length failed") + if n <= 0 { + return nil, errors.Errorf("reading chunk length failed with %d", n) } r = b.Range(off+n, off+n+int(l)) diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 16a3bd747..1da130057 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -55,7 +55,7 @@ type Compactor interface { Plan(dir string) ([]string, error) // Write persists a Block into a directory. - Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) + Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) // Compact runs compaction against the provided directories. Must // only be called concurrently with results of Plan(). @@ -297,6 +297,11 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { for _, s := range b.Compaction.Sources { sources[s] = struct{}{} } + res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{ + ULID: b.ULID, + MinTime: b.MinTime, + MaxTime: b.MaxTime, + }) } res.Compaction.Level++ @@ -367,7 +372,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID, return uid, merr } -func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) { +func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) @@ -379,6 +384,12 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) ( meta.Compaction.Level = 1 meta.Compaction.Sources = []ulid.ULID{uid} + if parent != nil { + meta.Compaction.Parents = []BlockDesc{ + {ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime}, + } + } + err := c.write(dest, meta, b) if err != nil { return uid, err @@ -472,7 +483,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe } // Create an empty tombstones file. - if err := writeTombstoneFile(tmp, EmptyTombstoneReader()); err != nil { + if err := writeTombstoneFile(tmp, NewMemTombstones()); err != nil { return errors.Wrap(err, "write new tombstones file") } @@ -581,7 +592,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, if len(dranges) > 0 { // Re-encode the chunk to not have deleted values. for i, chk := range chks { - if !intervalOverlap(dranges[0].Mint, dranges[len(dranges)-1].Maxt, chk.MinTime, chk.MaxTime) { + if !chk.OverlapsClosedInterval(dranges[0].Mint, dranges[len(dranges)-1].Maxt) { continue } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index 28cb14f33..fcfbeeeb2 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -75,7 +75,7 @@ type Appender interface { // Returned reference numbers are ephemeral and may be rejected in calls // to AddFast() at any point. Adding the sample via Add() returns a new // reference number. 
- // If the reference is the empty string it must not be used for caching. + // If the reference is 0 it must not be used for caching. Add(l labels.Labels, t int64, v float64) (uint64, error) // Add adds a sample pair for the referenced series. It is generally faster @@ -267,17 +267,9 @@ func (db *DB) run() { case <-db.compactc: db.metrics.compactionsTriggered.Inc() - _, err1 := db.retentionCutoff() - if err1 != nil { - level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1) - } - - _, err2 := db.compact() - if err2 != nil { - level.Error(db.logger).Log("msg", "compaction failed", "err", err2) - } - - if err1 != nil || err2 != nil { + _, err := db.compact() + if err != nil { + level.Error(db.logger).Log("msg", "compaction failed", "err", err) backoff = exponential(backoff, 1*time.Second, 1*time.Minute) } else { backoff = 0 @@ -289,19 +281,9 @@ func (db *DB) run() { } } -func (db *DB) retentionCutoff() (b bool, err error) { - defer func() { - if !b && err == nil { - // no data had to be cut off. - return - } - db.metrics.cutoffs.Inc() - if err != nil { - db.metrics.cutoffsFailed.Inc() - } - }() +func (db *DB) beyondRetention(meta *BlockMeta) bool { if db.opts.RetentionDuration == 0 { - return false, nil + return false } db.mtx.RLock() @@ -309,23 +291,13 @@ func (db *DB) retentionCutoff() (b bool, err error) { db.mtx.RUnlock() if len(blocks) == 0 { - return false, nil + return false } last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - dirs, err := retentionCutoffDirs(db.dir, mint) - if err != nil { - return false, err - } - // This will close the dirs and then delete the dirs. - if len(dirs) > 0 { - return true, db.reload(dirs...) - } - - return false, nil + return meta.MaxTime < mint } // Appender opens a new appender against the database. @@ -354,6 +326,13 @@ func (a dbAppender) Commit() error { return err } +// Compact data if possible. After successful compaction blocks are reloaded +// which will also trigger blocks to be deleted that fall out of the retention +// window. +// If no blocks are compacted, the retention window state doesn't change. Thus, +// this is sufficient to reliably delete old data. +// Old blocks are only deleted on reload based on the new block's parent information. +// See DB.reload documentation for further information. func (db *DB) compact() (changes bool, err error) { db.cmtx.Lock() defer db.cmtx.Unlock() @@ -381,9 +360,15 @@ func (db *DB) compact() (changes bool, err error) { head := &rangeHead{ head: db.head, mint: mint, - maxt: maxt, + // We remove 1 millisecond from maxt because block + // intervals are half-open: [b.MinTime, b.MaxTime). But + // chunk intervals are closed: [c.MinTime, c.MaxTime]; + // so in order to make sure that overlaps are evaluated + // consistently, we explicitly remove the last value + // from the block interval here. 
+ maxt: maxt - 1, } - if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil { + if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil { return changes, errors.Wrap(err, "persist head block") } changes = true @@ -418,7 +403,7 @@ func (db *DB) compact() (changes bool, err error) { changes = true runtime.GC() - if err := db.reload(plan...); err != nil { + if err := db.reload(); err != nil { return changes, errors.Wrap(err, "reload blocks") } runtime.GC() @@ -427,39 +412,6 @@ func (db *DB) compact() (changes bool, err error) { return changes, nil } -// retentionCutoffDirs returns all directories of blocks in dir that are strictly -// before mint. -func retentionCutoffDirs(dir string, mint int64) ([]string, error) { - df, err := fileutil.OpenDir(dir) - if err != nil { - return nil, errors.Wrapf(err, "open directory") - } - defer df.Close() - - dirs, err := blockDirs(dir) - if err != nil { - return nil, errors.Wrapf(err, "list block dirs %s", dir) - } - - delDirs := []string{} - - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - return nil, errors.Wrapf(err, "read block meta %s", dir) - } - // The first block we encounter marks that we crossed the boundary - // of deletable blocks. - if meta.MaxTime >= mint { - break - } - - delDirs = append(delDirs, dir) - } - - return delDirs, nil -} - func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { for _, b := range db.blocks { if b.Meta().ULID == id { @@ -469,18 +421,10 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -func stringsContain(set []string, elem string) bool { - for _, e := range set { - if elem == e { - return true - } - } - return false -} - // reload on-disk blocks and trigger head truncation if new blocks appeared. It takes // a list of block directories which should be deleted during reload. -func (db *DB) reload(deleteable ...string) (err error) { +// Blocks that are obsolete due to replacement or retention will be deleted. +func (db *DB) reload() (err error) { defer func() { if err != nil { db.metrics.reloadsFailed.Inc() @@ -492,21 +436,58 @@ func (db *DB) reload(deleteable ...string) (err error) { if err != nil { return errors.Wrap(err, "find blocks") } + // We delete old blocks that have been superseded by new ones by gathering all parents + // from existing blocks. Those parents all have newer replacements and can be safely deleted + // after we loaded the other blocks. + // This makes us resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. By creating + // blocks with their parents, we can pick up the deletion where it left off during a crash. var ( - blocks []*Block - exist = map[ulid.ULID]struct{}{} + blocks []*Block + corrupted = map[ulid.ULID]error{} + opened = map[ulid.ULID]struct{}{} + deleteable = map[ulid.ULID]struct{}{} ) + for _, dir := range dirs { + meta, err := readMetaFile(dir) + if err != nil { + // The block was potentially in the middle of being deleted during a crash. + // Skip it since we may delete it properly further down again. 
+ level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) + ulid, err2 := ulid.Parse(filepath.Base(dir)) + if err2 != nil { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + continue + } + corrupted[ulid] = err + continue + } + if db.beyondRetention(meta) { + deleteable[meta.ULID] = struct{}{} + continue + } + for _, b := range meta.Compaction.Parents { + deleteable[b.ULID] = struct{}{} + } + } + // Blocks we failed to open should all be those we are want to delete anyway. + for c, err := range corrupted { + if _, ok := deleteable[c]; !ok { + return errors.Wrapf(err, "unexpected corrupted block %s", c) + } + } + // Load new blocks into memory. for _, dir := range dirs { meta, err := readMetaFile(dir) if err != nil { return errors.Wrapf(err, "read meta information %s", dir) } - // If the block is pending for deletion, don't add it to the new block set. - if stringsContain(deleteable, dir) { + // Don't load blocks that are scheduled for deletion. + if _, ok := deleteable[meta.ULID]; ok { continue } - + // See if we already have the block in memory or open it otherwise. b, ok := db.getBlock(meta.ULID) if !ok { b, err = OpenBlock(dir, db.chunkPool) @@ -514,9 +495,8 @@ func (db *DB) reload(deleteable ...string) (err error) { return errors.Wrapf(err, "open block %s", dir) } } - blocks = append(blocks, b) - exist[meta.ULID] = struct{}{} + opened[meta.ULID] = struct{}{} } sort.Slice(blocks, func(i, j int) bool { return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime @@ -533,15 +513,19 @@ func (db *DB) reload(deleteable ...string) (err error) { db.blocks = blocks db.mtx.Unlock() + // Drop old blocks from memory. for _, b := range oldBlocks { - if _, ok := exist[b.Meta().ULID]; ok { + if _, ok := opened[b.Meta().ULID]; ok { continue } if err := b.Close(); err != nil { level.Warn(db.logger).Log("msg", "closing block failed", "err", err) } - if err := os.RemoveAll(b.Dir()); err != nil { - level.Warn(db.logger).Log("msg", "deleting block failed", "err", err) + } + // Delete all obsolete blocks. None of them are opened any longer. + for ulid := range deleteable { + if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { + return errors.Wrapf(err, "delete obsolete block %s", ulid) } } @@ -765,7 +749,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { if !withHead { return nil } - _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime()) + _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil) return errors.Wrap(err, "snapshot head block") } @@ -778,8 +762,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { defer db.mtx.RUnlock() for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if b.OverlapsClosedInterval(mint, maxt) { blocks = append(blocks, b) } } @@ -821,8 +804,7 @@ func (db *DB) Delete(mint, maxt int64, ms ...labels.Matcher) error { defer db.mtx.RUnlock() for _, b := range db.blocks { - m := b.Meta() - if intervalOverlap(mint, maxt, m.MinTime, m.MaxTime) { + if b.OverlapsClosedInterval(mint, maxt) { g.Go(func(b *Block) func() error { return func() error { return b.Delete(mint, maxt, ms...) 
} }(b)) @@ -859,27 +841,15 @@ func (db *DB) CleanTombstones() (err error) { blocks := db.blocks[:] db.mtx.RUnlock() - deletable := []string{} for _, b := range blocks { if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil { err = errors.Wrapf(er, "clean tombstones: %s", b.Dir()) return err } else if uid != nil { // New block was created. - deletable = append(deletable, b.Dir()) newUIDs = append(newUIDs, *uid) } } - - if len(deletable) == 0 { - return nil - } - - return errors.Wrap(db.reload(deletable...), "reload blocks") -} - -func intervalOverlap(amin, amax, bmin, bmax int64) bool { - // Checks Overlap: http://stackoverflow.com/questions/3269434/ - return amin <= bmax && bmin <= amax + return errors.Wrap(db.reload(), "reload blocks") } func isBlockDir(fi os.FileInfo) bool { diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index 2c7c7ec38..372842865 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -69,7 +69,7 @@ type Head struct { postings *index.MemPostings // postings lists for terms - tombstones memTombstones + tombstones *memTombstones } type headMetrics struct { @@ -189,7 +189,7 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal WAL, chunkRange int64) ( values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: memTombstones{}, + tombstones: NewMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -300,7 +300,7 @@ func (h *Head) ReadWAL() error { if itv.Maxt < mint { continue } - h.tombstones.add(s.ref, itv) + h.tombstones.addInterval(s.ref, itv) } } } @@ -521,7 +521,8 @@ func (a *headAppender) AddFast(ref uint64, t int64, v float64) error { } func (a *headAppender) Commit() error { - defer a.Rollback() + defer a.head.metrics.activeAppenders.Dec() + defer a.head.putAppendBuffer(a.samples) if err := a.head.wal.LogSeries(a.series); err != nil { return err @@ -565,7 +566,9 @@ func (a *headAppender) Rollback() error { a.head.metrics.activeAppenders.Dec() a.head.putAppendBuffer(a.samples) - return nil + // Series are created in the head memory regardless of rollback. Thus we have + // to log them to the WAL in any case. + return a.head.wal.LogSeries(a.series) } // Delete all samples in the range of [mint, maxt] for series that satisfy the given @@ -602,7 +605,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { return err } for _, s := range stones { - h.tombstones.add(s.ref, s.intervals[0]) + h.tombstones.addInterval(s.ref, s.intervals[0]) } return nil } @@ -732,19 +735,14 @@ func (h *headChunkReader) Chunk(ref uint64) (chunkenc.Chunk, error) { s.Lock() c := s.chunk(int(cid)) - // This means that the chunk has been garbage collected. - if c == nil { + // This means that the chunk has been garbage collected or is outside + // the specified range. + if c == nil || !c.OverlapsClosedInterval(h.mint, h.maxt) { s.Unlock() return nil, ErrNotFound } - - mint, maxt := c.minTime, c.maxTime s.Unlock() - // Do not expose chunks that are outside of the specified range. - if c == nil || !intervalOverlap(mint, maxt, h.mint, h.maxt) { - return nil, ErrNotFound - } return &safeChunk{ Chunk: c.chunk, s: s, @@ -849,7 +847,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]chunks for i, c := range s.chunks { // Do not expose chunks that are outside of the specified range. 
- if !intervalOverlap(c.minTime, c.maxTime, h.mint, h.maxt) { + if !c.OverlapsClosedInterval(h.mint, h.maxt) { continue } *chks = append(*chks, chunks.Meta{ @@ -1288,6 +1286,11 @@ type memChunk struct { minTime, maxTime int64 } +// Returns true if the chunk overlaps [mint, maxt]. +func (mc *memChunk) OverlapsClosedInterval(mint, maxt int64) bool { + return mc.minTime <= maxt && mint <= mc.maxTime +} + type memSafeIterator struct { chunkenc.Iterator diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 72ca3835a..c58ff6ea8 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -740,8 +740,8 @@ func (r *Reader) decbufUvarintAt(off int) decbuf { b := r.b.Range(off, off+binary.MaxVarintLen32) l, n := binary.Uvarint(b) - if n > binary.MaxVarintLen32 { - return decbuf{e: errors.New("invalid uvarint")} + if n <= 0 || n > binary.MaxVarintLen32 { + return decbuf{e: errors.Errorf("invalid uvarint %d", n)} } if r.b.Len() < off+n+int(l)+4 { diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index b5b9ae050..2e048e495 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -478,7 +478,7 @@ type baseChunkSeries struct { // over them. It drops chunks based on tombstones in the given reader. func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher) (ChunkSeriesSet, error) { if tr == nil { - tr = EmptyTombstoneReader() + tr = NewMemTombstones() } p, err := PostingsForMatchers(ir, ms...) if err != nil { diff --git a/vendor/github.com/prometheus/tsdb/tombstones.go b/vendor/github.com/prometheus/tsdb/tombstones.go index 8c760cdce..d4a3d0ef1 100644 --- a/vendor/github.com/prometheus/tsdb/tombstones.go +++ b/vendor/github.com/prometheus/tsdb/tombstones.go @@ -16,12 +16,12 @@ package tsdb import ( "encoding/binary" "fmt" + "github.com/pkg/errors" "io" "io/ioutil" "os" "path/filepath" - - "github.com/pkg/errors" + "sync" ) const tombstoneFilename = "tombstones" @@ -107,10 +107,10 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (memTombstones, error) { +func readTombstones(dir string) (*memTombstones, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return memTombstones{}, nil + return NewMemTombstones(), nil } else if err != nil { return nil, err } @@ -140,7 +140,7 @@ func readTombstones(dir string) (memTombstones, error) { return nil, errors.New("checksum did not match") } - stonesMap := memTombstones{} + stonesMap := NewMemTombstones() for d.len() > 0 { k := d.uvarint64() @@ -150,27 +150,31 @@ func readTombstones(dir string) (memTombstones, error) { return nil, d.err() } - stonesMap.add(k, Interval{mint, maxt}) + stonesMap.addInterval(k, Interval{mint, maxt}) } return stonesMap, nil } -type memTombstones map[uint64]Intervals - -var emptyTombstoneReader = memTombstones{} - -// EmptyTombstoneReader returns a TombstoneReader that is always empty. 
-func EmptyTombstoneReader() TombstoneReader { - return emptyTombstoneReader +type memTombstones struct { + intvlGroups map[uint64]Intervals + mtx sync.RWMutex } -func (t memTombstones) Get(ref uint64) (Intervals, error) { - return t[ref], nil +func NewMemTombstones() *memTombstones { + return &memTombstones{intvlGroups: make(map[uint64]Intervals)} } -func (t memTombstones) Iter(f func(uint64, Intervals) error) error { - for ref, ivs := range t { +func (t *memTombstones) Get(ref uint64) (Intervals, error) { + t.mtx.RLock() + defer t.mtx.RUnlock() + return t.intvlGroups[ref], nil +} + +func (t *memTombstones) Iter(f func(uint64, Intervals) error) error { + t.mtx.RLock() + defer t.mtx.RUnlock() + for ref, ivs := range t.intvlGroups { if err := f(ref, ivs); err != nil { return err } @@ -178,8 +182,13 @@ func (t memTombstones) Iter(f func(uint64, Intervals) error) error { return nil } -func (t memTombstones) add(ref uint64, itv Interval) { - t[ref] = t[ref].add(itv) +// addInterval to an existing memTombstones +func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { + t.mtx.Lock() + defer t.mtx.Unlock() + for _, itv := range itvs { + t.intvlGroups[ref] = t.intvlGroups[ref].add(itv) + } } func (memTombstones) Close() error { @@ -208,7 +217,7 @@ func (tr Interval) isSubrange(dranges Intervals) bool { // Intervals represents a set of increasing and non-overlapping time-intervals. type Intervals []Interval -// This adds the new time-range to the existing ones. +// add the new time-range to the existing ones. // The existing ones must be sorted. func (itvs Intervals) add(n Interval) Intervals { for i, r := range itvs { diff --git a/vendor/vendor.json b/vendor/vendor.json index 1c5c47c51..4ae303726 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -820,40 +820,40 @@ "revisionTime": "2016-04-11T19:08:41Z" }, { - "checksumSHA1": "eohOTRwnox/+qrSrgYmnxeJB2yM=", + "checksumSHA1": "gzvR+g1v/ILXxAt/NuxzIPWk1x0=", "path": "github.com/prometheus/tsdb", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "QI0UME2olSr4kH6Z8UkpffM59Mc=", "path": "github.com/prometheus/tsdb/chunkenc", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { - "checksumSHA1": "746Mjy2y6wdsGjY/FcGhc8tI4w8=", + "checksumSHA1": "+5bPifRe479zdFeTYhZ+CZRLMgw=", "path": "github.com/prometheus/tsdb/chunks", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "dnyelqeik/xHDRCvCmKFv/Op9XQ=", "path": "github.com/prometheus/tsdb/fileutil", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { - "checksumSHA1": "A2uIFwIgeHmXGBzOpna95kM80RY=", + "checksumSHA1": "AZGFK4UtJe8/j8pHqGTNQ8wu27g=", "path": "github.com/prometheus/tsdb/index", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=", "path": 
"github.com/prometheus/tsdb/labels", - "revision": "c848349f07c83bd38d5d19faa5ea71c7fd8923ea", - "revisionTime": "2018-06-05T09:24:13Z" + "revision": "99a2c4314ff70f0673c0d07b512e2ea7a715889e", + "revisionTime": "2018-07-11T11:21:26Z" }, { "checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=", From ebe107b71b38a5cf69804f9d37a4e1f2b570b3f3 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Wed, 11 Jul 2018 15:57:22 +0100 Subject: [PATCH 9/9] Release 2.3.2 Signed-off-by: Brian Brazil --- CHANGELOG.md | 10 ++++++++++ VERSION | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 11699d053..4134fb7eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 2.3.2 / 2018-07-12 + +* [BUGFIX] Fix various tsdb bugs #4369 +* [BUGFIX] Reorder startup and shutdown to prevent panics. #4321 +* [BUGFIX] Exit with non-zero code on error #4296 +* [BUGFIX] discovery/kubernetes/ingress: fix scheme discovery #4329 +* [BUGFIX] Fix race in zookeeper sd #4355 +* [BUGFIX] Better timeout handling in promql #4291 #4300 +* [BUGFIX] Propogate errors when selecting series from the tsdb #4136 + ## 2.3.1 / 2018-06-19 * [BUGFIX] Avoid infinite loop on duplicate NaN values. #4275 diff --git a/VERSION b/VERSION index 2bf1c1ccf..f90b1afc0 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.3.1 +2.3.2