Add storage method to delete time series

Fabian Reinartz 2015-05-27 17:41:57 +02:00
parent 0de6edbdfc
commit 5c6c0e2faa
3 changed files with 106 additions and 5 deletions

View File

@@ -47,6 +47,9 @@ type Storage interface {
// The iterator will never return samples older than retention time,
// relative to the time NewIterator was called.
NewIterator(clientmodel.Fingerprint) SeriesIterator
// Drop all time series associated with the given fingerprints. This operation
// will not show up in the series operations metrics.
DropMetricsForFingerprints(...clientmodel.Fingerprint)
// Run the various maintenance loops in goroutines. Returns when the
// storage is ready to use. Keeps everything running in the background
// until Stop is called.
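A minimal usage sketch of the new interface method (not part of this commit), assuming a Storage value s and following the pattern of TestDropMetrics below: resolve fingerprints via label matchers, drop them, and wait for indexing before querying again.

// Sketch only: drop every series whose metric name is "test".
matchers := metric.LabelMatchers{{
	Type:  metric.Equal,
	Name:  clientmodel.MetricNameLabel,
	Value: "test",
}}
fps := s.FingerprintsForLabelMatchers(matchers)
// Drops both in-memory and archived series for these fingerprints.
s.DropMetricsForFingerprints(fps...)
// Wait until the unindexing is visible to subsequent lookups.
s.WaitForIndexing()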

View File

@@ -430,6 +430,26 @@ func (s *memorySeriesStorage) MetricForFingerprint(fp clientmodel.Fingerprint) c
}
}
// DropMetricsForFingerprints implements Storage.
func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...clientmodel.Fingerprint) {
for _, fp := range fps {
s.fpLocker.Lock(fp)
if series, ok := s.fpToSeries.get(fp); ok {
s.fpToSeries.del(fp)
s.numSeries.Dec()
s.persistence.unindexMetric(fp, series.metric)
if _, err := s.persistence.deleteSeriesFile(fp); err != nil {
log.Errorf("Error deleting series file for %v: %v", fp, err)
}
} else if err := s.persistence.purgeArchivedMetric(fp); err != nil {
log.Errorf("Error purging metric with fingerprint %v: %v", fp, err)
}
s.fpLocker.Unlock(fp)
}
}
// Append implements Storage.
func (s *memorySeriesStorage) Append(sample *clientmodel.Sample) {
if s.getNumChunksToPersist() >= s.maxChunksToPersist {
@@ -694,7 +714,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan clientmode
for {
archivedFPs, err := s.persistence.fingerprintsModifiedBefore(
- clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter),
+ clientmodel.Now().Add(-s.dropAfter),
)
if err != nil {
log.Error("Failed to lookup archived fingerprint ranges: ", err)
@@ -750,7 +770,7 @@ loop:
dirtySeriesCount = 0
checkpointTimer.Reset(s.checkpointInterval)
case fp := <-memoryFingerprints:
- if s.maintainMemorySeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)) {
+ if s.maintainMemorySeries(fp, clientmodel.Now().Add(-s.dropAfter)) {
dirtySeriesCount++
// Check if we have enough "dirty" series so that we need an early checkpoint.
// However, if we are already behind persisting chunks, creating a checkpoint
@@ -764,7 +784,7 @@
}
}
case fp := <-archivedFingerprints:
- s.maintainArchivedSeries(fp, clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter))
+ s.maintainArchivedSeries(fp, clientmodel.Now().Add(-s.dropAfter))
}
}
// Wait until both channels are closed.
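The remaining hunks in this file only change how the retention cutoff timestamp is computed in the maintenance loops. A small sketch of the presumed equivalence, assuming clientmodel.Now() is shorthand for converting time.Now() to a clientmodel timestamp:

// Both forms express "current time minus the retention window".
cutoffOld := clientmodel.TimestampFromTime(time.Now()).Add(-s.dropAfter)
cutoffNew := clientmodel.Now().Add(-s.dropAfter) // shorter form used after this commit (assumed equivalent)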

View File

@@ -152,8 +152,7 @@ func TestRetentionCutoff(t *testing.T) {
s.dropAfter = 1 * time.Hour
- samples := make(clientmodel.Samples, 120)
- for i := range samples {
+ for i := 0; i < 120; i++ {
smpl := &clientmodel.Sample{
Metric: clientmodel.Metric{"job": "test"},
Timestamp: insertStart.Add(time.Duration(i) * time.Minute), // 1 minute intervals.
@@ -204,6 +203,85 @@ func TestRetentionCutoff(t *testing.T) {
}
}
func TestDropMetrics(t *testing.T) {
now := clientmodel.Now()
insertStart := now.Add(-2 * time.Hour)
s, closer := NewTestStorage(t, 1)
defer closer.Close()
m1 := clientmodel.Metric{clientmodel.MetricNameLabel: "test", "n1": "v1"}
m2 := clientmodel.Metric{clientmodel.MetricNameLabel: "test", "n1": "v2"}
N := 120000
for j, m := range []clientmodel.Metric{m1, m2} {
for i := 0; i < N; i++ {
smpl := &clientmodel.Sample{
Metric: m,
Timestamp: insertStart.Add(time.Duration(i) * time.Millisecond), // 1 millisecond intervals.
Value: clientmodel.SampleValue(j),
}
s.Append(smpl)
}
}
s.WaitForIndexing()
matcher := metric.LabelMatchers{{
Type: metric.Equal,
Name: clientmodel.MetricNameLabel,
Value: "test",
}}
fps := s.FingerprintsForLabelMatchers(matcher)
if len(fps) != 2 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps))
}
it := s.NewIterator(fps[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
it = s.NewIterator(fps[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
s.DropMetricsForFingerprints(fps[0])
s.WaitForIndexing()
fps2 := s.FingerprintsForLabelMatchers(matcher)
if len(fps2) != 1 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps2))
}
it = s.NewIterator(fps[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
it = s.NewIterator(fps[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != N {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
s.DropMetricsForFingerprints(fps...)
s.WaitForIndexing()
fps3 := s.FingerprintsForLabelMatchers(matcher)
if len(fps3) != 0 {
t.Fatalf("unexpected number of fingerprints: %d", len(fps3))
}
it = s.NewIterator(fps[0])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
it = s.NewIterator(fps[1])
if vals := it.RangeValues(metric.Interval{insertStart, now}); len(vals) != 0 {
t.Fatalf("unexpected number of samples: %d", len(vals))
}
}
// TestLoop is just a smoke test for the loop method: check that we can switch it
// on and off without disaster.
func TestLoop(t *testing.T) {