mirror of
https://github.com/prometheus/prometheus
synced 2024-12-29 02:02:17 +00:00
Group args to append to memSeries in chunkOpts
Signed-off-by: Justin Lei <justin.lei@grafana.com>
This commit is contained in:
parent
cb045c0e4b
commit
4c4454e4c9
@ -881,9 +881,13 @@ func (a *headAppender) Commit() (err error) {
|
||||
oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef
|
||||
oooRecords [][]byte
|
||||
oooCapMax = a.head.opts.OutOfOrderCapMax.Load()
|
||||
chunkRange = a.head.chunkRange.Load()
|
||||
series *memSeries
|
||||
enc record.Encoder
|
||||
appendChunkOpts = chunkOpts{
|
||||
chunkDiskMapper: a.head.chunkDiskMapper,
|
||||
chunkRange: a.head.chunkRange.Load(),
|
||||
samplesPerChunk: a.head.opts.SamplesPerChunk,
|
||||
}
|
||||
enc record.Encoder
|
||||
)
|
||||
defer func() {
|
||||
for i := range oooRecords {
|
||||
@ -987,7 +991,7 @@ func (a *headAppender) Commit() (err error) {
|
||||
samplesAppended--
|
||||
}
|
||||
default:
|
||||
ok, chunkCreated = series.append(s.T, s.V, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
|
||||
ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts)
|
||||
if ok {
|
||||
if s.T < inOrderMint {
|
||||
inOrderMint = s.T
|
||||
@ -1016,7 +1020,7 @@ func (a *headAppender) Commit() (err error) {
|
||||
for i, s := range a.histograms {
|
||||
series = a.histogramSeries[i]
|
||||
series.Lock()
|
||||
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
|
||||
ok, chunkCreated := series.appendHistogram(s.T, s.H, a.appendID, appendChunkOpts)
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
@ -1042,7 +1046,7 @@ func (a *headAppender) Commit() (err error) {
|
||||
for i, s := range a.floatHistograms {
|
||||
series = a.floatHistogramSeries[i]
|
||||
series.Lock()
|
||||
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, a.head.chunkDiskMapper, chunkRange, a.head.opts.SamplesPerChunk)
|
||||
ok, chunkCreated := series.appendFloatHistogram(s.T, s.FH, a.appendID, appendChunkOpts)
|
||||
series.cleanupAppendIDsBelow(a.cleanupAppendIDsBelow)
|
||||
series.pendingCommit = false
|
||||
series.Unlock()
|
||||
@ -1118,12 +1122,19 @@ func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDisk
|
||||
return ok, chunkCreated, mmapRef
|
||||
}
|
||||
|
||||
// chunkOpts are chunk-level options that are passed when appending to a memSeries.
|
||||
type chunkOpts struct {
|
||||
chunkDiskMapper *chunks.ChunkDiskMapper
|
||||
chunkRange int64
|
||||
samplesPerChunk int
|
||||
}
|
||||
|
||||
// append adds the sample (t, v) to the series. The caller also has to provide
|
||||
// the appendID for isolation. (The appendID can be zero, which results in no
|
||||
// isolation for this append.)
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, chunkDiskMapper, chunkRange, samplesPerChunk)
|
||||
func (s *memSeries) append(t int64, v float64, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncXOR, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
@ -1144,7 +1155,7 @@ func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper
|
||||
|
||||
// appendHistogram adds the histogram.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
|
||||
func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards. We check for Appendable from appender before
|
||||
// appendPreprocessor because in case it ends up creating a new chunk,
|
||||
@ -1157,7 +1168,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
||||
pMergedSpans, nMergedSpans []histogram.Span
|
||||
okToAppend, counterReset, gauge bool
|
||||
)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange, samplesPerChunk)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
@ -1193,7 +1204,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
||||
// - okToAppend and no inserts → Chunk is ready to support our histogram.
|
||||
switch {
|
||||
case !okToAppend || counterReset:
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, chunkDiskMapper, chunkRange)
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncHistogram, o.chunkDiskMapper, o.chunkRange)
|
||||
chunkCreated = true
|
||||
case len(pForwardInserts) > 0 || len(nForwardInserts) > 0:
|
||||
// New buckets have appeared. We need to recode all
|
||||
@ -1238,7 +1249,7 @@ func (s *memSeries) appendHistogram(t int64, h *histogram.Histogram, appendID ui
|
||||
|
||||
// appendFloatHistogram adds the float histogram.
|
||||
// It is unsafe to call this concurrently with s.iterator(...) without holding the series lock.
|
||||
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper, chunkRange int64, samplesPerChunk int) (sampleInOrder, chunkCreated bool) {
|
||||
func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram, appendID uint64, o chunkOpts) (sampleInOrder, chunkCreated bool) {
|
||||
// Head controls the execution of recoding, so that we own the proper
|
||||
// chunk reference afterwards. We check for Appendable from appender before
|
||||
// appendPreprocessor because in case it ends up creating a new chunk,
|
||||
@ -1251,7 +1262,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
||||
pMergedSpans, nMergedSpans []histogram.Span
|
||||
okToAppend, counterReset, gauge bool
|
||||
)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange, samplesPerChunk)
|
||||
c, sampleInOrder, chunkCreated := s.appendPreprocessor(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange, o.samplesPerChunk)
|
||||
if !sampleInOrder {
|
||||
return sampleInOrder, chunkCreated
|
||||
}
|
||||
@ -1287,7 +1298,7 @@ func (s *memSeries) appendFloatHistogram(t int64, fh *histogram.FloatHistogram,
|
||||
// - okToAppend and no inserts → Chunk is ready to support our histogram.
|
||||
switch {
|
||||
case !okToAppend || counterReset:
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, chunkDiskMapper, chunkRange)
|
||||
c = s.cutNewHeadChunk(t, chunkenc.EncFloatHistogram, o.chunkDiskMapper, o.chunkRange)
|
||||
chunkCreated = true
|
||||
case len(pForwardInserts) > 0 || len(nForwardInserts) > 0:
|
||||
// New buckets have appeared. We need to recode all
|
||||
|
@ -283,10 +283,15 @@ func BenchmarkLoadWAL(b *testing.B) {
|
||||
if c.mmappedChunkT != 0 {
|
||||
chunkDiskMapper, err := chunks.NewChunkDiskMapper(nil, mmappedChunksDir(dir), chunkenc.NewPool(), chunks.DefaultWriteBufferSize, chunks.DefaultWriteQueueSize)
|
||||
require.NoError(b, err)
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: c.mmappedChunkT,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
for k := 0; k < c.batches*c.seriesPerBatch; k++ {
|
||||
// Create one mmapped chunk per series, with one sample at the given time.
|
||||
s := newMemSeries(labels.Labels{}, chunks.HeadSeriesRef(k)*101, defaultIsolationDisabled)
|
||||
s.append(c.mmappedChunkT, 42, 0, chunkDiskMapper, c.mmappedChunkT, DefaultSamplesPerChunk)
|
||||
s.append(c.mmappedChunkT, 42, 0, cOpts)
|
||||
s.mmapCurrentHeadChunk(chunkDiskMapper)
|
||||
}
|
||||
require.NoError(b, chunkDiskMapper.Close())
|
||||
@ -799,7 +804,11 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
||||
defer func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
const chunkRange = 2000
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: 2000,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
memChunkPool := sync.Pool{
|
||||
New: func() interface{} {
|
||||
@ -810,7 +819,7 @@ func TestMemSeries_truncateChunks(t *testing.T) {
|
||||
s := newMemSeries(labels.FromStrings("a", "b"), 1, defaultIsolationDisabled)
|
||||
|
||||
for i := 0; i < 4000; i += 5 {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
|
||||
@ -1336,26 +1345,30 @@ func TestMemSeries_append(t *testing.T) {
|
||||
defer func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
const chunkRange = 500
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: 500,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
|
||||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
// New chunk must correctly be cut at 1000.
|
||||
ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated := s.append(998, 1, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "first sample created chunk")
|
||||
|
||||
ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(999, 2, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(1000, 3, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "expected new chunk on boundary")
|
||||
|
||||
ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(1001, 4, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
@ -1368,7 +1381,7 @@ func TestMemSeries_append(t *testing.T) {
|
||||
// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
|
||||
// at approximately 120 samples per chunk.
|
||||
for i := 1; i < 1000; i++ {
|
||||
ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, _ := s.append(1001+int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
}
|
||||
|
||||
@ -1390,7 +1403,11 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
||||
defer func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
chunkRange := int64(1000)
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: int64(1000),
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
|
||||
@ -1404,19 +1421,19 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
||||
// Add first two samples at the very end of a chunk range and the next two
|
||||
// on and after it.
|
||||
// New chunk must correctly be cut at 1000.
|
||||
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated := s.appendHistogram(998, histograms[0], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "first sample created chunk")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.appendHistogram(999, histograms[1], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.appendHistogram(1000, histograms[2], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.True(t, chunkCreated, "expected new chunk on boundary")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.appendHistogram(1001, histograms[3], 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "second sample should use same chunk")
|
||||
|
||||
@ -1426,7 +1443,7 @@ func TestMemSeries_appendHistogram(t *testing.T) {
|
||||
require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range")
|
||||
require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range")
|
||||
|
||||
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.appendHistogram(1002, histogramWithOneMoreBucket, 0, cOpts)
|
||||
require.True(t, ok, "append failed")
|
||||
require.False(t, chunkCreated, "third sample should trigger a re-encoded chunk")
|
||||
|
||||
@ -1446,7 +1463,11 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
})
|
||||
chunkRange := DefaultBlockDuration
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: DefaultBlockDuration,
|
||||
samplesPerChunk: samplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
|
||||
@ -1456,7 +1477,7 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||
var nextTs int64
|
||||
var totalAppendedSamples int
|
||||
for i := 0; i < samplesPerChunk/4; i++ {
|
||||
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
|
||||
require.Truef(t, ok, "slow sample %d was not appended", i)
|
||||
nextTs += slowRate
|
||||
totalAppendedSamples++
|
||||
@ -1465,12 +1486,12 @@ func TestMemSeries_append_atVariableRate(t *testing.T) {
|
||||
|
||||
// Suddenly, the rate increases and we receive a sample every millisecond.
|
||||
for i := 0; i < math.MaxUint16; i++ {
|
||||
ok, _ := s.append(nextTs, float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, _ := s.append(nextTs, float64(i), 0, cOpts)
|
||||
require.Truef(t, ok, "quick sample %d was not appended", i)
|
||||
nextTs++
|
||||
totalAppendedSamples++
|
||||
}
|
||||
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated := s.append(DefaultBlockDuration, float64(0), 0, cOpts)
|
||||
require.True(t, ok, "new chunk sample was not appended")
|
||||
require.True(t, chunkCreated, "sample at block duration timestamp should create a new chunk")
|
||||
|
||||
@ -1490,23 +1511,29 @@ func TestGCChunkAccess(t *testing.T) {
|
||||
require.NoError(t, h.Close())
|
||||
}()
|
||||
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: chunkRange,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
h.initTime(0)
|
||||
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
|
||||
// Appending 2 samples for the first chunk.
|
||||
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated := s.append(0, 0, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(999, 999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
// A new chunks should be created here as it's beyond the chunk range.
|
||||
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
@ -1543,23 +1570,29 @@ func TestGCSeriesAccess(t *testing.T) {
|
||||
require.NoError(t, h.Close())
|
||||
}()
|
||||
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: chunkRange,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
h.initTime(0)
|
||||
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
|
||||
// Appending 2 samples for the first chunk.
|
||||
ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated := s.append(0, 0, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(999, 999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
// A new chunks should be created here as it's beyond the chunk range.
|
||||
ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(1000, 1000, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunks was not created")
|
||||
ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(1999, 1999, 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunks was created")
|
||||
|
||||
@ -1791,14 +1824,20 @@ func TestHeadReadWriterRepair(t *testing.T) {
|
||||
require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal))
|
||||
require.NoError(t, h.Init(math.MinInt64))
|
||||
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: chunkRange,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
require.True(t, created, "series was not created")
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.True(t, chunkCreated, "chunk was not created")
|
||||
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, cOpts)
|
||||
require.True(t, ok, "series append failed")
|
||||
require.False(t, chunkCreated, "chunk was created")
|
||||
h.chunkDiskMapper.CutNewFile()
|
||||
@ -2144,9 +2183,15 @@ func TestIsolationAppendIDZeroIsNoop(t *testing.T) {
|
||||
|
||||
h.initTime(0)
|
||||
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: h.chunkRange.Load(),
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
|
||||
|
||||
ok, _ := s.append(0, 0, 0, h.chunkDiskMapper, h.chunkRange.Load(), DefaultSamplesPerChunk)
|
||||
ok, _ := s.append(0, 0, 0, cOpts)
|
||||
require.True(t, ok, "Series append failed.")
|
||||
require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.")
|
||||
}
|
||||
@ -2608,12 +2653,16 @@ func TestIteratorSeekIntoBuffer(t *testing.T) {
|
||||
defer func() {
|
||||
require.NoError(t, chunkDiskMapper.Close())
|
||||
}()
|
||||
const chunkRange = 500
|
||||
cOpts := chunkOpts{
|
||||
chunkDiskMapper: chunkDiskMapper,
|
||||
chunkRange: 500,
|
||||
samplesPerChunk: DefaultSamplesPerChunk,
|
||||
}
|
||||
|
||||
s := newMemSeries(labels.Labels{}, 1, defaultIsolationDisabled)
|
||||
|
||||
for i := 0; i < 7; i++ {
|
||||
ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper, chunkRange, DefaultSamplesPerChunk)
|
||||
ok, _ := s.append(int64(i), float64(i), 0, cOpts)
|
||||
require.True(t, ok, "sample append failed")
|
||||
}
|
||||
|
||||
|
@ -564,7 +564,11 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
|
||||
minValidTime := h.minValidTime.Load()
|
||||
mint, maxt := int64(math.MaxInt64), int64(math.MinInt64)
|
||||
chunkRange := h.chunkRange.Load()
|
||||
appendChunkOpts := chunkOpts{
|
||||
chunkDiskMapper: h.chunkDiskMapper,
|
||||
chunkRange: h.chunkRange.Load(),
|
||||
samplesPerChunk: h.opts.SamplesPerChunk,
|
||||
}
|
||||
|
||||
for in := range wp.input {
|
||||
if in.existingSeries != nil {
|
||||
@ -588,7 +592,7 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
if s.T <= ms.mmMaxTime {
|
||||
continue
|
||||
}
|
||||
if _, chunkCreated := ms.append(s.T, s.V, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk); chunkCreated {
|
||||
if _, chunkCreated := ms.append(s.T, s.V, 0, appendChunkOpts); chunkCreated {
|
||||
h.metrics.chunksCreated.Inc()
|
||||
h.metrics.chunks.Inc()
|
||||
}
|
||||
@ -618,9 +622,9 @@ func (wp *walSubsetProcessor) processWALSamples(h *Head, mmappedChunks, oooMmapp
|
||||
}
|
||||
var chunkCreated bool
|
||||
if s.h != nil {
|
||||
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk)
|
||||
_, chunkCreated = ms.appendHistogram(s.t, s.h, 0, appendChunkOpts)
|
||||
} else {
|
||||
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, h.chunkDiskMapper, chunkRange, h.opts.SamplesPerChunk)
|
||||
_, chunkCreated = ms.appendFloatHistogram(s.t, s.fh, 0, appendChunkOpts)
|
||||
}
|
||||
if chunkCreated {
|
||||
h.metrics.chunksCreated.Inc()
|
||||
|
Loading…
Reference in New Issue
Block a user