mirror of
https://github.com/prometheus/prometheus
synced 2025-01-16 11:53:16 +00:00
Merge pull request #12003 from codesome/redundant-chunk-access
Remove unnecessary chunk fetch in Head queries
This commit is contained in:
commit
66da1d51fd
@ -304,12 +304,10 @@ func (h *headChunkReader) Chunk(meta chunks.Meta) (chunkenc.Chunk, error) {
|
||||
s.Unlock()
|
||||
|
||||
return &safeChunk{
|
||||
Chunk: c.chunk,
|
||||
s: s,
|
||||
cid: cid,
|
||||
isoState: h.isoState,
|
||||
chunkDiskMapper: h.head.chunkDiskMapper,
|
||||
memChunkPool: &h.head.memChunkPool,
|
||||
Chunk: c.chunk,
|
||||
s: s,
|
||||
cid: cid,
|
||||
isoState: h.isoState,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -600,43 +598,24 @@ func (b boundedIterator) Seek(t int64) chunkenc.ValueType {
|
||||
// safeChunk makes sure that the chunk can be accessed without a race condition
|
||||
type safeChunk struct {
|
||||
chunkenc.Chunk
|
||||
s *memSeries
|
||||
cid chunks.HeadChunkID
|
||||
isoState *isolationState
|
||||
chunkDiskMapper *chunks.ChunkDiskMapper
|
||||
memChunkPool *sync.Pool
|
||||
s *memSeries
|
||||
cid chunks.HeadChunkID
|
||||
isoState *isolationState
|
||||
}
|
||||
|
||||
func (c *safeChunk) Iterator(reuseIter chunkenc.Iterator) chunkenc.Iterator {
|
||||
c.s.Lock()
|
||||
it := c.s.iterator(c.cid, c.isoState, c.chunkDiskMapper, c.memChunkPool, reuseIter)
|
||||
it := c.s.iterator(c.cid, c.Chunk, c.isoState, reuseIter)
|
||||
c.s.Unlock()
|
||||
return it
|
||||
}
|
||||
|
||||
// iterator returns a chunk iterator for the requested chunkID, or a NopIterator if the requested ID is out of range.
|
||||
// It is unsafe to call this concurrently with s.append(...) without holding the series lock.
|
||||
func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, memChunkPool *sync.Pool, it chunkenc.Iterator) chunkenc.Iterator {
|
||||
c, garbageCollect, err := s.chunk(id, chunkDiskMapper, memChunkPool)
|
||||
// TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
|
||||
// series's chunk, which got then garbage collected before it got
|
||||
// accessed. We must ensure to not garbage collect as long as any
|
||||
// readers still hold a reference.
|
||||
if err != nil {
|
||||
return chunkenc.NewNopIterator()
|
||||
}
|
||||
defer func() {
|
||||
if garbageCollect {
|
||||
// Set this to nil so that Go GC can collect it after it has been used.
|
||||
// This should be done always at the end.
|
||||
c.chunk = nil
|
||||
memChunkPool.Put(c)
|
||||
}
|
||||
}()
|
||||
|
||||
func (s *memSeries) iterator(id chunks.HeadChunkID, c chunkenc.Chunk, isoState *isolationState, it chunkenc.Iterator) chunkenc.Iterator {
|
||||
ix := int(id) - int(s.firstChunkID)
|
||||
|
||||
numSamples := c.chunk.NumSamples()
|
||||
numSamples := c.NumSamples()
|
||||
stopAfter := numSamples
|
||||
|
||||
if isoState != nil && !isoState.IsolationDisabled() {
|
||||
@ -681,9 +660,9 @@ func (s *memSeries) iterator(id chunks.HeadChunkID, isoState *isolationState, ch
|
||||
return chunkenc.NewNopIterator()
|
||||
}
|
||||
if stopAfter == numSamples {
|
||||
return c.chunk.Iterator(it)
|
||||
return c.Iterator(it)
|
||||
}
|
||||
return makeStopIterator(c.chunk, it, stopAfter)
|
||||
return makeStopIterator(c, it, stopAfter)
|
||||
}
|
||||
|
||||
// stopIterator wraps an Iterator, but only returns the first
|
||||
|
@ -537,11 +537,18 @@ func TestHead_ReadWAL(t *testing.T) {
|
||||
require.NoError(t, c.Err())
|
||||
return x
|
||||
}
|
||||
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
|
||||
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
|
||||
|
||||
c, _, err := s10.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{100, 2, nil, nil}, {101, 5, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
c, _, err = s50.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 6, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
// The samples before the new series record should be discarded since a duplicate record
|
||||
// is only possible when old samples were compacted.
|
||||
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil, nil)))
|
||||
c, _, err = s100.chunk(0, head.chunkDiskMapper, &head.memChunkPool)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, []sample{{101, 7, nil, nil}}, expandChunk(c.chunk.Iterator(nil)))
|
||||
|
||||
q, err := head.ExemplarQuerier(context.Background())
|
||||
require.NoError(t, err)
|
||||
@ -2566,7 +2573,13 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
|
||||
it := s.iterator(s.headChunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil, nil)
|
||||
c, _, err := s.chunk(0, chunkDiskMapper, &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return &memChunk{}
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
it := c.chunk.Iterator(nil)
|
||||
|
||||
// First point.
|
||||
require.Equal(t, chunkenc.ValFloat, it.Seek(0))
|
||||
|
0
tsdb/test.txt
Normal file
0
tsdb/test.txt
Normal file
Loading…
Reference in New Issue
Block a user