Merge pull request #175 from prometheus/seriesnotfound
Clarify postings index semantics, handle staleness
This commit is contained in:
commit
efe9509d04
|
@ -332,20 +332,22 @@ func exitWithError(err error) {
|
|||
os.Exit(1)
|
||||
}
|
||||
|
||||
func printBlocks(blocks []tsdb.DiskBlock) {
|
||||
func printBlocks(blocks []*tsdb.Block) {
|
||||
tw := tsdb.GetNewTabWriter(os.Stdout)
|
||||
defer tw.Flush()
|
||||
|
||||
fmt.Fprintln(tw, "BLOCK ULID\tMIN TIME\tMAX TIME\tNUM SAMPLES\tNUM CHUNKS\tNUM SERIES")
|
||||
for _, b := range blocks {
|
||||
meta := b.Meta()
|
||||
|
||||
fmt.Fprintf(tw,
|
||||
"%v\t%v\t%v\t%v\t%v\t%v\n",
|
||||
b.Meta().ULID,
|
||||
b.Meta().MinTime,
|
||||
b.Meta().MaxTime,
|
||||
b.Meta().Stats.NumSamples,
|
||||
b.Meta().Stats.NumChunks,
|
||||
b.Meta().Stats.NumSeries,
|
||||
meta.ULID,
|
||||
meta.MinTime,
|
||||
meta.MaxTime,
|
||||
meta.Stats.NumSamples,
|
||||
meta.Stats.NumChunks,
|
||||
meta.Stats.NumSeries,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
16
head.go
16
head.go
|
@ -74,6 +74,7 @@ type headMetrics struct {
|
|||
series prometheus.Gauge
|
||||
seriesCreated prometheus.Counter
|
||||
seriesRemoved prometheus.Counter
|
||||
seriesNotFound prometheus.Counter
|
||||
chunks prometheus.Gauge
|
||||
chunksCreated prometheus.Gauge
|
||||
chunksRemoved prometheus.Gauge
|
||||
|
@ -103,6 +104,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
|||
Name: "tsdb_head_series_removed_total",
|
||||
Help: "Total number of series removed in the head",
|
||||
})
|
||||
m.seriesNotFound = prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Name: "tsdb_head_series_not_found",
|
||||
Help: "Total number of requests for series that were not found.",
|
||||
})
|
||||
m.chunks = prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "tsdb_head_chunks",
|
||||
Help: "Total number of chunks in the head block.",
|
||||
|
@ -149,6 +154,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
|
|||
m.series,
|
||||
m.seriesCreated,
|
||||
m.seriesRemoved,
|
||||
m.seriesNotFound,
|
||||
m.minTime,
|
||||
m.maxTime,
|
||||
m.gcDuration,
|
||||
|
@ -833,24 +839,17 @@ func (h *headIndexReader) SortedPostings(p Postings) Postings {
|
|||
if err := p.Err(); err != nil {
|
||||
return errPostings{err: errors.Wrap(err, "expand postings")}
|
||||
}
|
||||
var err error
|
||||
|
||||
sort.Slice(ep, func(i, j int) bool {
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
a := h.head.series.getByID(ep[i])
|
||||
b := h.head.series.getByID(ep[j])
|
||||
|
||||
if a == nil || b == nil {
|
||||
err = errors.Errorf("series not found")
|
||||
level.Debug(h.head.logger).Log("msg", "looked up series not found")
|
||||
return false
|
||||
}
|
||||
return labels.Compare(a.lset, b.lset) < 0
|
||||
})
|
||||
if err != nil {
|
||||
return errPostings{err: err}
|
||||
}
|
||||
return newListPostings(ep)
|
||||
}
|
||||
|
||||
|
@ -859,6 +858,7 @@ func (h *headIndexReader) Series(ref uint64, lbls *labels.Labels, chks *[]ChunkM
|
|||
s := h.head.series.getByID(ref)
|
||||
|
||||
if s == nil {
|
||||
h.head.metrics.seriesNotFound.Inc()
|
||||
return ErrNotFound
|
||||
}
|
||||
*lbls = append((*lbls)[:0], s.lset...)
|
||||
|
|
3
index.go
3
index.go
|
@ -525,6 +525,8 @@ type IndexReader interface {
|
|||
|
||||
// Postings returns the postings list iterator for the label pair.
|
||||
// The Postings here contain the offsets to the series inside the index.
|
||||
// Found IDs are not strictly required to point to a valid Series, e.g. during
|
||||
// background garbage collections.
|
||||
Postings(name, value string) (Postings, error)
|
||||
|
||||
// SortedPostings returns a postings list that is reordered to be sorted
|
||||
|
@ -533,6 +535,7 @@ type IndexReader interface {
|
|||
|
||||
// Series populates the given labels and chunk metas for the series identified
|
||||
// by the reference.
|
||||
// Returns ErrNotFound if the ref does not resolve to a known series.
|
||||
Series(ref uint64, lset *labels.Labels, chks *[]ChunkMeta) error
|
||||
|
||||
// LabelIndices returns the label pairs for which indices exist.
|
||||
|
|
|
@ -450,6 +450,10 @@ Outer:
|
|||
for s.p.Next() {
|
||||
ref := s.p.At()
|
||||
if err := s.index.Series(ref, &lset, &chunks); err != nil {
|
||||
// Postings may be stale. Skip if no underlying series exists.
|
||||
if errors.Cause(err) == ErrNotFound {
|
||||
continue
|
||||
}
|
||||
s.err = err
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -702,9 +702,8 @@ func TestBaseChunkSeries(t *testing.T) {
|
|||
ref: 108,
|
||||
},
|
||||
},
|
||||
postings: []uint64{12, 10, 108},
|
||||
|
||||
expIdxs: []int{0, 1, 3},
|
||||
postings: []uint64{12, 13, 10, 108}, // 13 doesn't exist and should just be skipped over.
|
||||
expIdxs: []int{0, 1, 3},
|
||||
},
|
||||
{
|
||||
series: []refdSeries{
|
||||
|
@ -718,12 +717,11 @@ func TestBaseChunkSeries(t *testing.T) {
|
|||
{
|
||||
lset: labels.New([]labels.Label{{"b", "c"}}...),
|
||||
chunks: []ChunkMeta{{Ref: 8282}},
|
||||
ref: 1,
|
||||
ref: 3,
|
||||
},
|
||||
},
|
||||
postings: []uint64{},
|
||||
|
||||
expIdxs: []int{},
|
||||
expIdxs: []int{},
|
||||
},
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue