tsdb/agent: allow ingestion of OOO samples (#12897)
Signed-off-by: Paschalis Tsilias <paschalis.tsilias@grafana.com>
This commit is contained in:
parent
f5913266a1
commit
afab845e65
|
@ -802,11 +802,6 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
|
|||
series.Lock()
|
||||
defer series.Unlock()
|
||||
|
||||
if t < series.lastTs {
|
||||
a.metrics.totalOutOfOrderSamples.Inc()
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
// NOTE: always modify pendingSamples and sampleSeries together.
|
||||
a.pendingSamples = append(a.pendingSamples, record.RefSample{
|
||||
Ref: series.ref,
|
||||
|
@ -930,11 +925,6 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
|
|||
series.Lock()
|
||||
defer series.Unlock()
|
||||
|
||||
if t < series.lastTs {
|
||||
a.metrics.totalOutOfOrderSamples.Inc()
|
||||
return 0, storage.ErrOutOfOrderSample
|
||||
}
|
||||
|
||||
switch {
|
||||
case h != nil:
|
||||
// NOTE: always modify pendingHistograms and histogramSeries together
|
||||
|
|
|
@ -751,3 +751,130 @@ func TestStorage_DuplicateExemplarsIgnored(t *testing.T) {
|
|||
// We had 9 calls to AppendExemplar but only 4 of those should have gotten through.
|
||||
require.Equal(t, 4, walExemplarsCount)
|
||||
}
|
||||
|
||||
func TestDBAllowOOOSamples(t *testing.T) {
|
||||
const (
|
||||
numDatapoints = 5
|
||||
numHistograms = 5
|
||||
numSeries = 4
|
||||
offset = 100
|
||||
)
|
||||
|
||||
reg := prometheus.NewRegistry()
|
||||
s := createTestAgentDB(t, reg, DefaultOptions())
|
||||
app := s.Appender(context.TODO())
|
||||
|
||||
// Let's add some samples in the [offset, offset+numDatapoints) range.
|
||||
lbls := labelsForTest(t.Name(), numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
for i := offset; i < numDatapoints+offset; i++ {
|
||||
ref, err := app.Append(0, lset, int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
|
||||
e := exemplar.Exemplar{
|
||||
Labels: lset,
|
||||
Ts: int64(i) * 2,
|
||||
Value: float64(i),
|
||||
HasTs: true,
|
||||
}
|
||||
_, err = app.AppendExemplar(ref, lset, e)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
|
||||
|
||||
for i := offset; i < numDatapoints+offset; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i-offset], nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
|
||||
|
||||
for i := offset; i < numDatapoints+offset; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i-offset])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
m := gatherFamily(t, reg, "prometheus_agent_samples_appended_total")
|
||||
require.Equal(t, float64(20), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
|
||||
require.Equal(t, float64(40), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
|
||||
require.NoError(t, s.Close())
|
||||
|
||||
// Hack: s.wal.Dir() is the /wal subdirectory of the original storage path.
|
||||
// We need the original directory so we can recreate the storage for replay.
|
||||
storageDir := filepath.Dir(s.wal.Dir())
|
||||
|
||||
// Replay the storage so that the lastTs for each series is recorded.
|
||||
reg2 := prometheus.NewRegistry()
|
||||
db, err := Open(s.logger, reg2, nil, storageDir, s.opts)
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create storage for the agent: %v", err)
|
||||
}
|
||||
|
||||
app = db.Appender(context.Background())
|
||||
|
||||
// Now the lastTs will have been recorded successfully.
|
||||
// Let's try appending twice as many OOO samples in the [0, numDatapoints) range.
|
||||
lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
for i := 0; i < numDatapoints; i++ {
|
||||
ref, err := app.Append(0, lset, int64(i), float64(i))
|
||||
require.NoError(t, err)
|
||||
|
||||
e := exemplar.Exemplar{
|
||||
Labels: lset,
|
||||
Ts: int64(i) * 2,
|
||||
Value: float64(i),
|
||||
HasTs: true,
|
||||
}
|
||||
_, err = app.AppendExemplar(ref, lset, e)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_histogram", numSeries*2)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
histograms := tsdbutil.GenerateTestHistograms(numHistograms)
|
||||
|
||||
for i := 0; i < numDatapoints; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(i), histograms[i], nil)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
lbls = labelsForTest(t.Name()+"_float_histogram", numSeries*2)
|
||||
for _, l := range lbls {
|
||||
lset := labels.New(l...)
|
||||
|
||||
floatHistograms := tsdbutil.GenerateTestFloatHistograms(numHistograms)
|
||||
|
||||
for i := 0; i < numDatapoints; i++ {
|
||||
_, err := app.AppendHistogram(0, lset, int64(i), nil, floatHistograms[i])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
require.NoError(t, app.Commit())
|
||||
m = gatherFamily(t, reg2, "prometheus_agent_samples_appended_total")
|
||||
require.Equal(t, float64(40), m.Metric[0].Counter.GetValue(), "agent wal mismatch of total appended samples")
|
||||
require.Equal(t, float64(80), m.Metric[1].Counter.GetValue(), "agent wal mismatch of total appended histograms")
|
||||
require.NoError(t, db.Close())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue