prometheus/tsdb/head_read.go

510 lines
14 KiB
Go
Raw Normal View History

// Copyright 2021 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tsdb
import (
"context"
"math"
"sort"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/index"
)
func (h *Head) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
return h.exemplars.ExemplarQuerier(ctx)
}
// Index returns an IndexReader against the block.
func (h *Head) Index() (IndexReader, error) {
return h.indexRange(math.MinInt64, math.MaxInt64), nil
}
func (h *Head) indexRange(mint, maxt int64) *headIndexReader {
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
return &headIndexReader{head: h, mint: mint, maxt: maxt}
}
type headIndexReader struct {
head *Head
mint, maxt int64
}
func (h *headIndexReader) Close() error {
return nil
}
func (h *headIndexReader) Symbols() index.StringIter {
return h.head.postings.Symbols()
}
// SortedLabelValues returns label values present in the head for the
// specific label name that are within the time range mint to maxt.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (h *headIndexReader) SortedLabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
values, err := h.LabelValues(name, matchers...)
if err == nil {
sort.Strings(values)
}
return values, err
}
// LabelValues returns label values present in the head for the
// specific label name that are within the time range mint to maxt.
// If matchers are specified the returned result set is reduced
// to label values of metrics matching the matchers.
func (h *headIndexReader) LabelValues(name string, matchers ...*labels.Matcher) ([]string, error) {
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
return []string{}, nil
}
if len(matchers) == 0 {
return h.head.postings.LabelValues(name), nil
}
return labelValuesWithMatchers(h, name, matchers...)
}
// LabelNames returns all the unique label names present in the head
// that are within the time range mint to maxt.
func (h *headIndexReader) LabelNames(matchers ...*labels.Matcher) ([]string, error) {
if h.maxt < h.head.MinTime() || h.mint > h.head.MaxTime() {
return []string{}, nil
}
if len(matchers) == 0 {
labelNames := h.head.postings.LabelNames()
sort.Strings(labelNames)
return labelNames, nil
}
return labelNamesWithMatchers(h, matchers...)
}
// Postings returns the postings list iterator for the label pairs.
func (h *headIndexReader) Postings(name string, values ...string) (index.Postings, error) {
Label values with matchers by intersecting postings (#9907) * LabelValues w/matchers by intersecting postings Instead of iterating all matched series to find the values, this checks if each one of the label values is present in the matched series (postings). Pending to be benchmarked. Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Benchmark labelValuesWithMatchers name old time/op new time/op Querier/Head/labelValuesWithMatchers/i_with_n="1" 157ms ± 0% 48ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 1.80s ± 0% 0.46s ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 144ms ± 0% 57ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304ms ± 0% 111ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 761ms ± 0% 164ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 6.11µs ± 0% 6.62µs ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 117ms ± 0% 62ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 1.44s ± 0% 0.24s ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 92.1ms ± 0% 70.3ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 196ms ± 0% 115ms ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 1.23s ± 0% 0.21s ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 1.06ms ± 0% 0.88ms ± 0% name old alloc/op new alloc/op Querier/Head/labelValuesWithMatchers/i_with_n="1" 29.5MB ± 0% 26.9MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 46.8MB ± 0% 251.5MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 29.5MB ± 0% 22.3MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 46.8MB ± 0% 23.9MB ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 10.3kB ± 0% 138535.2kB ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 5.54kB ± 0% 7.09kB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 39.1MB ± 0% 28.5MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 287MB ± 0% 253MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 34.3MB ± 0% 23.9MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 51.6MB ± 0% 25.5MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 144MB ± 0% 139MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 6.43kB ± 0% 8.66kB ± 0% name old allocs/op new allocs/op Querier/Head/labelValuesWithMatchers/i_with_n="1" 104k ± 0% 500k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 204k ± 0% 600k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 104k ± 0% 500k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 204k ± 0% 500k ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 66.0 ± 0% 255.0 ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 205.0 ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 304k ± 0% 600k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 5.20M ± 0% 0.70M ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 204k ± 0% 600k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304k ± 0% 600k ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 3.00M ± 0% 0.00M ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 247.0 ± 0% Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Don't expand postings to intersect them Using a min heap we can check whether matched postings intersect with each one of the label values postings. This avoid expanding postings (and thus having all of them in memory at any point). Slightly slower than the expanding postings version for some cases, but definitely pays the price once the cardinality grows. Still offers 10x latency improvement where previous latencies were reaching 1s. Benchmark results: name \ time/op old.txt intersect.txt intersect_noexpand.txt Querier/Head/labelValuesWithMatchers/i_with_n="1" 157ms ± 0% 48ms ± 0% 110ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 1.80s ± 0% 0.46s ± 0% 0.18s ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 144ms ± 0% 57ms ± 0% 125ms ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304ms ± 0% 111ms ± 0% 177ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 761ms ± 0% 164ms ± 0% 134ms ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 6.11µs ± 0% 6.62µs ± 0% 4.29µs ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 117ms ± 0% 62ms ± 0% 120ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 1.44s ± 0% 0.24s ± 0% 0.15s ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 92.1ms ± 0% 70.3ms ± 0% 125.4ms ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 196ms ± 0% 115ms ± 0% 170ms ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 1.23s ± 0% 0.21s ± 0% 0.14s ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 1.06ms ± 0% 0.88ms ± 0% 0.92ms ± 0% name \ alloc/op old.txt intersect.txt intersect_noexpand.txt Querier/Head/labelValuesWithMatchers/i_with_n="1" 29.5MB ± 0% 26.9MB ± 0% 19.1MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 46.8MB ± 0% 251.5MB ± 0% 36.3MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 29.5MB ± 0% 22.3MB ± 0% 19.1MB ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 46.8MB ± 0% 23.9MB ± 0% 20.7MB ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 10.3kB ± 0% 138535.2kB ± 0% 6.4kB ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 5.54kB ± 0% 7.09kB ± 0% 4.30kB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 39.1MB ± 0% 28.5MB ± 0% 20.7MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 287MB ± 0% 253MB ± 0% 38MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 34.3MB ± 0% 23.9MB ± 0% 20.7MB ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 51.6MB ± 0% 25.5MB ± 0% 22.3MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 144MB ± 0% 139MB ± 0% 0MB ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 6.43kB ± 0% 8.66kB ± 0% 5.86kB ± 0% name \ allocs/op old.txt intersect.txt intersect_noexpand.txt Querier/Head/labelValuesWithMatchers/i_with_n="1" 104k ± 0% 500k ± 0% 300k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="^.+$" 204k ± 0% 600k ± 0% 400k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",j!="foo" 104k ± 0% 500k ± 0% 300k ± 0% Querier/Head/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 204k ± 0% 500k ± 0% 300k ± 0% Querier/Head/labelValuesWithMatchers/n_with_j!="foo" 66.0 ± 0% 255.0 ± 0% 139.0 ± 0% Querier/Head/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 205.0 ± 0% 87.0 ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1" 304k ± 0% 600k ± 0% 400k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="^.+$" 5.20M ± 0% 0.70M ± 0% 0.50M ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",j!="foo" 204k ± 0% 600k ± 0% 400k ± 0% Querier/Block/labelValuesWithMatchers/i_with_n="1",i=~"^.*$",j!="foo" 304k ± 0% 600k ± 0% 400k ± 0% Querier/Block/labelValuesWithMatchers/n_with_j!="foo" 3.00M ± 0% 0.00M ± 0% 0.00M ± 0% Querier/Block/labelValuesWithMatchers/n_with_i="1" 61.0 ± 0% 247.0 ± 0% 129.0 ± 0% Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Apply comment suggestions from the code review Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * Change else { if } to else if Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> * Remove sorting of label values We were not sorting them before, so no need to sort them now Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com>
2021-12-28 14:59:03 +00:00
switch len(values) {
case 0:
return index.EmptyPostings(), nil
case 1:
return h.head.postings.Get(name, values[0]), nil
default:
res := make([]index.Postings, 0, len(values))
for _, value := range values {
res = append(res, h.head.postings.Get(name, value))
}
return index.Merge(res...), nil
}
}
func (h *headIndexReader) SortedPostings(p index.Postings) index.Postings {
series := make([]*memSeries, 0, 128)
// Fetch all the series only once.
for p.Next() {
s := h.head.series.getByID(chunks.HeadSeriesRef(p.At()))
if s == nil {
level.Debug(h.head.logger).Log("msg", "Looked up series not found")
} else {
series = append(series, s)
}
}
if err := p.Err(); err != nil {
return index.ErrPostings(errors.Wrap(err, "expand postings"))
}
sort.Slice(series, func(i, j int) bool {
return labels.Compare(series[i].lset, series[j].lset) < 0
})
// Convert back to list.
ep := make([]storage.SeriesRef, 0, len(series))
for _, p := range series {
ep = append(ep, storage.SeriesRef(p.ref))
}
return index.NewListPostings(ep)
}
// Series returns the series for the given reference.
func (h *headIndexReader) Series(ref storage.SeriesRef, lbls *labels.Labels, chks *[]chunks.Meta) error {
s := h.head.series.getByID(chunks.HeadSeriesRef(ref))
if s == nil {
h.head.metrics.seriesNotFound.Inc()
return storage.ErrNotFound
}
*lbls = append((*lbls)[:0], s.lset...)
s.Lock()
defer s.Unlock()
*chks = (*chks)[:0]
for i, c := range s.mmappedChunks {
// Do not expose chunks that are outside of the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
continue
}
*chks = append(*chks, chunks.Meta{
MinTime: c.minTime,
MaxTime: c.maxTime,
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(i))),
})
}
if s.headChunk != nil && s.headChunk.OverlapsClosedInterval(h.mint, h.maxt) {
*chks = append(*chks, chunks.Meta{
MinTime: s.headChunk.minTime,
MaxTime: math.MaxInt64, // Set the head chunks as open (being appended to).
Ref: chunks.ChunkRef(chunks.NewHeadChunkRef(s.ref, s.headChunkID(len(s.mmappedChunks)))),
})
}
return nil
}
// headChunkID returns the HeadChunkID corresponding to .mmappedChunks[pos]
func (s *memSeries) headChunkID(pos int) chunks.HeadChunkID {
return chunks.HeadChunkID(pos) + s.firstChunkID
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
func (h *headIndexReader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id))
if memSeries == nil {
return "", storage.ErrNotFound
}
value := memSeries.lset.Get(label)
if value == "" {
return "", storage.ErrNotFound
}
return value, nil
}
// LabelNamesFor returns all the label names for the series referred to by IDs.
// The names returned are sorted.
func (h *headIndexReader) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) {
namesMap := make(map[string]struct{})
for _, id := range ids {
memSeries := h.head.series.getByID(chunks.HeadSeriesRef(id))
if memSeries == nil {
return nil, storage.ErrNotFound
}
for _, lbl := range memSeries.lset {
namesMap[lbl.Name] = struct{}{}
}
}
names := make([]string, 0, len(namesMap))
for name := range namesMap {
names = append(names, name)
}
sort.Strings(names)
return names, nil
}
// Chunks returns a ChunkReader against the block.
func (h *Head) Chunks() (ChunkReader, error) {
return h.chunksRange(math.MinInt64, math.MaxInt64, h.iso.State(math.MinInt64, math.MaxInt64))
}
func (h *Head) chunksRange(mint, maxt int64, is *isolationState) (*headChunkReader, error) {
h.closedMtx.Lock()
defer h.closedMtx.Unlock()
if h.closed {
return nil, errors.New("can't read from a closed head")
}
if hmin := h.MinTime(); hmin > mint {
mint = hmin
}
return &headChunkReader{
head: h,
mint: mint,
maxt: maxt,
isoState: is,
}, nil
}
type headChunkReader struct {
head *Head
mint, maxt int64
isoState *isolationState
}
func (h *headChunkReader) Close() error {
h.isoState.Close()
return nil
}
// Chunk returns the chunk for the reference number.
func (h *headChunkReader) Chunk(ref chunks.ChunkRef) (chunkenc.Chunk, error) {
sid, cid := chunks.HeadChunkRef(ref).Unpack()
s := h.head.series.getByID(sid)
// This means that the series has been garbage collected.
if s == nil {
return nil, storage.ErrNotFound
}
s.Lock()
c, garbageCollect, err := s.chunk(cid, h.head.chunkDiskMapper)
if err != nil {
s.Unlock()
return nil, err
}
defer func() {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
c.chunk = nil
s.memChunkPool.Put(c)
}
}()
// This means that the chunk is outside the specified range.
if !c.OverlapsClosedInterval(h.mint, h.maxt) {
s.Unlock()
return nil, storage.ErrNotFound
}
s.Unlock()
return &safeChunk{
Chunk: c.chunk,
s: s,
cid: cid,
isoState: h.isoState,
chunkDiskMapper: h.head.chunkDiskMapper,
}, nil
}
// chunk returns the chunk for the HeadChunkID from memory or by m-mapping it from the disk.
// If garbageCollect is true, it means that the returned *memChunk
Add basic initial developer docs for TSDB (#9451) * Add basic initial developer docs for TSDB There's a decent amount of content already out there (blog posts, conference talks, etc), but: * when they get stale, they don't tend to get updated * they still leave me with questions that I'ld like to answer for developers (like me) who want to use, or work with, TSDB What I propose is developer docs inside the prometheus repository. Easy to find and harness the power of the community to expand it and keep it up to date. * perfect is the enemy of good. Let's have a base and incrementally improve * Markdown docs should be broad but not too deep. Source code comments can complement them, and are the ideal place for implementation details. Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * use example code that works out of the box Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Apply suggestions from code review Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * PR feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * more docs Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * PR feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Apply suggestions from code review Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> * Apply suggestions from code review Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Update tsdb/docs/usage.md Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * final tweaks Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * workaround docs versioning issue Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Move example code to real executable, testable example. Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * cleanup example test and make sure it always reproduces Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * obtain temp dir in a way that works with older Go versions Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Fix Ganesh's comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
2021-11-17 10:21:27 +00:00
// (and not the chunkenc.Chunk inside it) can be garbage collected after its usage.
func (s *memSeries) chunk(id chunks.HeadChunkID, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
// ix represents the index of chunk in the s.mmappedChunks slice. The chunk id's are
// incremented by 1 when new chunk is created, hence (id - firstChunkID) gives the slice index.
// The max index for the s.mmappedChunks slice can be len(s.mmappedChunks)-1, hence if the ix
// is len(s.mmappedChunks), it represents the next chunk, which is the head chunk.
ix := int(id) - int(s.firstChunkID)
if ix < 0 || ix > len(s.mmappedChunks) {
return nil, false, storage.ErrNotFound
}
if ix == len(s.mmappedChunks) {
if s.headChunk == nil {
return nil, false, errors.New("invalid head chunk")
}
return s.headChunk, false, nil
}
chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref)
if err != nil {
if _, ok := err.(*chunks.CorruptionErr); ok {
panic(err)
}
return nil, false, err
}
mc := s.memChunkPool.Get().(*memChunk)
mc.chunk = chk
mc.minTime = s.mmappedChunks[ix].minTime
mc.maxTime = s.mmappedChunks[ix].maxTime
return mc, true, nil
}
type safeChunk struct {
chunkenc.Chunk
s *memSeries
cid chunks.HeadChunkID
isoState *isolationState
chunkDiskMapper *chunks.ChunkDiskMapper
}
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
c.s.Lock()
it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, reuseIter)
c.s.Unlock()
return it
}
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
c, garbageCollect, err := s.chunk(id, chunkDiskMapper)
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
// series's chunk, which got then garbage collected before it got
// accessed. We must ensure to not garbage collect as long as any
// readers still hold a reference.
if err != nil {
return chunkenc.NewNopIterator()
}
defer func() {
if garbageCollect {
// Set this to nil so that Go GC can collect it after it has been used.
// This should be done always at the end.
c.chunk = nil
s.memChunkPool.Put(c)
}
}()
ix := int(id) - int(s.firstChunkID)
numSamples := c.chunk.NumSamples()
stopAfter := numSamples
if isoState != nil && !isoState.IsolationDisabled() {
totalSamples := 0 // Total samples in this series.
previousSamples := 0 // Samples before this chunk.
for j, d := range s.mmappedChunks {
totalSamples += int(d.numSamples)
if j < ix {
previousSamples += int(d.numSamples)
}
}
if s.headChunk != nil {
totalSamples += s.headChunk.chunk.NumSamples()
}
// Removing the extra transactionIDs that are relevant for samples that
// come after this chunk, from the total transactionIDs.
appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))
// Iterate over the appendIDs, find the first one that the isolation state says not
// to return.
it := s.txs.iterator()
for index := 0; index < appendIDsToConsider; index++ {
appendID := it.At()
if appendID <= isoState.maxAppendID { // Easy check first.
if _, ok := isoState.incompleteAppends[appendID]; !ok {
it.Next()
continue
}
}
stopAfter = numSamples - (appendIDsToConsider - index)
if stopAfter < 0 {
stopAfter = 0 // Stopped in a previous chunk.
}
break
}
}
if stopAfter == 0 {
return chunkenc.NewNopIterator()
}
if int(id)-int(s.firstChunkID) < len(s.mmappedChunks) {
if stopAfter == numSamples {
return c.chunk.Iterator(it)
}
if msIter, ok := it.(*stopIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.stopAfter = stopAfter
return msIter
}
return &stopIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
stopAfter: stopAfter,
}
}
// Serve the last 4 samples for the last chunk from the sample buffer
// as their compressed bytes may be mutated by added samples.
if msIter, ok := it.(*memSafeIterator); ok {
msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
msIter.i = -1
msIter.total = numSamples
msIter.stopAfter = stopAfter
msIter.buf = s.sampleBuf
return msIter
}
return &memSafeIterator{
stopIterator: stopIterator{
Iterator: c.chunk.Iterator(it),
i: -1,
stopAfter: stopAfter,
},
total: numSamples,
buf: s.sampleBuf,
}
}
Add basic initial developer docs for TSDB (#9451) * Add basic initial developer docs for TSDB There's a decent amount of content already out there (blog posts, conference talks, etc), but: * when they get stale, they don't tend to get updated * they still leave me with questions that I'ld like to answer for developers (like me) who want to use, or work with, TSDB What I propose is developer docs inside the prometheus repository. Easy to find and harness the power of the community to expand it and keep it up to date. * perfect is the enemy of good. Let's have a base and incrementally improve * Markdown docs should be broad but not too deep. Source code comments can complement them, and are the ideal place for implementation details. Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * use example code that works out of the box Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Apply suggestions from code review Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * PR feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * more docs Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * PR feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Apply suggestions from code review Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> * Apply suggestions from code review Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Update tsdb/docs/usage.md Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * final tweaks Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * workaround docs versioning issue Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Move example code to real executable, testable example. Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * cleanup example test and make sure it always reproduces Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * obtain temp dir in a way that works with older Go versions Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Fix Ganesh's comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
2021-11-17 10:21:27 +00:00
// memSafeIterator returns values from the wrapped stopIterator
// except the last 4, which come from buf.
type memSafeIterator struct {
stopIterator
total int
buf [4]sample
}
func (it *memSafeIterator) Seek(t int64) bool {
if it.Err() != nil {
return false
}
ts, _ := it.At()
for t > ts || it.i == -1 {
if !it.Next() {
return false
}
ts, _ = it.At()
}
return true
}
func (it *memSafeIterator) Next() bool {
if it.i+1 >= it.stopAfter {
return false
}
it.i++
if it.total-it.i > 4 {
return it.Iterator.Next()
}
return true
}
func (it *memSafeIterator) At() (int64, float64) {
if it.total-it.i > 4 {
return it.Iterator.At()
}
s := it.buf[4-(it.total-it.i)]
return s.t, s.v
}
Add basic initial developer docs for TSDB (#9451) * Add basic initial developer docs for TSDB There's a decent amount of content already out there (blog posts, conference talks, etc), but: * when they get stale, they don't tend to get updated * they still leave me with questions that I'ld like to answer for developers (like me) who want to use, or work with, TSDB What I propose is developer docs inside the prometheus repository. Easy to find and harness the power of the community to expand it and keep it up to date. * perfect is the enemy of good. Let's have a base and incrementally improve * Markdown docs should be broad but not too deep. Source code comments can complement them, and are the ideal place for implementation details. Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * use example code that works out of the box Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Apply suggestions from code review Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * PR feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * more docs Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * PR feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Apply suggestions from code review Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> * Apply suggestions from code review Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * feedback Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Update tsdb/docs/usage.md Signed-off-by: Dieter Plaetinck <dieter@grafana.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> * final tweaks Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * workaround docs versioning issue Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Move example code to real executable, testable example. Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * cleanup example test and make sure it always reproduces Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * obtain temp dir in a way that works with older Go versions Signed-off-by: Dieter Plaetinck <dieter@grafana.com> * Fix Ganesh's comments Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com> Co-authored-by: Ganesh Vernekar <15064823+codesome@users.noreply.github.com> Co-authored-by: Bartlomiej Plotka <bwplotka@gmail.com> Co-authored-by: Ganesh Vernekar <ganeshvern@gmail.com>
2021-11-17 10:21:27 +00:00
// stopIterator wraps an Iterator, but only returns the first
// stopAfter values, if initialized with i=-1.
type stopIterator struct {
chunkenc.Iterator
i, stopAfter int
}
func (it *stopIterator) Next() bool {
if it.i+1 >= it.stopAfter {
return false
}
it.i++
return it.Iterator.Next()
}