// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"fmt"
2014-08-14 16:23:49 +00:00
"math/rand"
"reflect"
2014-06-06 09:55:53 +00:00
"testing"
2014-08-14 16:23:49 +00:00
"testing/quick"
2014-06-06 09:55:53 +00:00
"time"
2014-11-13 19:50:25 +00:00
"github.com/golang/glog"
clientmodel "github.com/prometheus/client_golang/model"
"github.com/prometheus/prometheus/storage/metric"
2014-10-28 18:01:41 +00:00
"github.com/prometheus/prometheus/utility/test"
2014-06-06 09:55:53 +00:00
)
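
// TestGetFingerprintsForLabelMatchers appends samples for 100 distinct metrics
// and verifies that equality, inequality, and regex matchers return exactly
// the expected fingerprints.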
func TestGetFingerprintsForLabelMatchers(t *testing.T) {
	storage, closer := NewTestStorage(t)
	defer closer.Close()

	samples := make([]*clientmodel.Sample, 100)
	fingerprints := make(clientmodel.Fingerprints, 100)

	for i := range samples {
		metric := clientmodel.Metric{
			clientmodel.MetricNameLabel: clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i)),
			"label1":                    clientmodel.LabelValue(fmt.Sprintf("test_%d", i/10)),
			"label2":                    clientmodel.LabelValue(fmt.Sprintf("test_%d", (i+5)/10)),
		}
		samples[i] = &clientmodel.Sample{
			Metric:    metric,
			Timestamp: clientmodel.Timestamp(i),
			Value:     clientmodel.SampleValue(i),
		}
		fingerprints[i] = metric.Fingerprint()
	}
	storage.AppendSamples(samples)
	storage.WaitForIndexing()

	newMatcher := func(matchType metric.MatchType, name clientmodel.LabelName, value clientmodel.LabelValue) *metric.LabelMatcher {
		lm, err := metric.NewLabelMatcher(matchType, name, value)
		if err != nil {
			t.Fatalf("error creating label matcher: %s", err)
		}
		return lm
	}

	var matcherTests = []struct {
		matchers metric.LabelMatchers
		expected clientmodel.Fingerprints
	}{
		{
			matchers: metric.LabelMatchers{newMatcher(metric.Equal, "label1", "x")},
			expected: fingerprints[:0],
		},
		{
			matchers: metric.LabelMatchers{newMatcher(metric.Equal, "label1", "test_0")},
			expected: fingerprints[:10],
		},
		{
			matchers: metric.LabelMatchers{
				newMatcher(metric.Equal, "label1", "test_0"),
				newMatcher(metric.Equal, "label2", "test_1"),
			},
			expected: fingerprints[5:10],
		},
		{
			matchers: metric.LabelMatchers{newMatcher(metric.NotEqual, "label1", "x")},
			expected: fingerprints,
		},
		{
			matchers: metric.LabelMatchers{newMatcher(metric.NotEqual, "label1", "test_0")},
			expected: fingerprints[10:],
		},
		{
			matchers: metric.LabelMatchers{
				newMatcher(metric.NotEqual, "label1", "test_0"),
				newMatcher(metric.NotEqual, "label1", "test_1"),
				newMatcher(metric.NotEqual, "label1", "test_2"),
			},
			expected: fingerprints[30:],
		},
		{
			matchers: metric.LabelMatchers{newMatcher(metric.RegexMatch, "label1", `test_[3-5]`)},
			expected: fingerprints[30:60],
		},
		{
			matchers: metric.LabelMatchers{newMatcher(metric.RegexNoMatch, "label1", `test_[3-5]`)},
			expected: append(append(clientmodel.Fingerprints{}, fingerprints[:30]...), fingerprints[60:]...),
		},
		{
			matchers: metric.LabelMatchers{
				newMatcher(metric.RegexMatch, "label1", `test_[3-5]`),
				newMatcher(metric.RegexMatch, "label2", `test_[4-6]`),
			},
			expected: fingerprints[35:60],
		},
		{
			matchers: metric.LabelMatchers{
				newMatcher(metric.RegexMatch, "label1", `test_[3-5]`),
				newMatcher(metric.NotEqual, "label2", `test_4`),
			},
			expected: append(append(clientmodel.Fingerprints{}, fingerprints[30:35]...), fingerprints[45:60]...),
		},
	}

	for _, mt := range matcherTests {
		resfps := storage.GetFingerprintsForLabelMatchers(mt.matchers)
		if len(mt.expected) != len(resfps) {
			t.Fatalf("expected %d matches for %q, found %d", len(mt.expected), mt.matchers, len(resfps))
		}
		for _, fp1 := range resfps {
			found := false
			for _, fp2 := range mt.expected {
				if fp1 == fp2 {
					found = true
					break
				}
			}
			if !found {
				t.Errorf("expected fingerprint %s for %q not in result", fp1, mt.matchers)
			}
		}
	}
}

// TestLoop is just a smoke test for the loop method, checking that we can
// switch it on and off without disaster.
func TestLoop(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping test in short mode.")
	}
	samples := make(clientmodel.Samples, 1000)
	for i := range samples {
		samples[i] = &clientmodel.Sample{
			Timestamp: clientmodel.Timestamp(2 * i),
			Value:     clientmodel.SampleValue(float64(i) * 0.2),
		}
	}
	directory := test.NewTemporaryDirectory("test_storage", t)
	defer directory.Close()
	o := &MemorySeriesStorageOptions{
		MemoryChunks:               50,
		PersistenceRetentionPeriod: 24 * 7 * time.Hour,
		PersistenceStoragePath:     directory.Path(),
		CheckpointInterval:         250 * time.Millisecond,
	}
	storage, err := NewMemorySeriesStorage(o)
	if err != nil {
		t.Fatalf("Error creating storage: %s", err)
	}
	storage.Start()
	storage.AppendSamples(samples)
	storage.WaitForIndexing()
	series, _ := storage.(*memorySeriesStorage).fpToSeries.get(clientmodel.Metric{}.Fingerprint())
	cdsBefore := len(series.chunkDescs)
	time.Sleep(fpMaxWaitDuration + time.Second) // TODO(beorn7): Ugh, need to wait for maintenance to kick in.
	cdsAfter := len(series.chunkDescs)
	storage.Stop()
	if cdsBefore <= cdsAfter {
		t.Errorf(
			"Number of chunk descriptors should have gone down by now. Got before %d, after %d.",
			cdsBefore, cdsAfter,
		)
	}
}
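
// TestChunk appends samples and verifies that the values found in the
// (non-evicted) chunks of each series match the appended samples.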
func TestChunk(t *testing.T) {
	samples := make(clientmodel.Samples, 500000)
	for i := range samples {
		samples[i] = &clientmodel.Sample{
			Timestamp: clientmodel.Timestamp(i),
			Value:     clientmodel.SampleValue(float64(i) * 0.2),
		}
	}
	s, closer := NewTestStorage(t)
	defer closer.Close()

	s.AppendSamples(samples)
	s.WaitForIndexing()

	for m := range s.(*memorySeriesStorage).fpToSeries.iter() {
		s.(*memorySeriesStorage).fpLocker.Lock(m.fp)

		var values metric.Values
		for _, cd := range m.series.chunkDescs {
			if cd.isEvicted() {
				continue
			}
			for sample := range cd.chunk.values() {
				values = append(values, *sample)
			}
		}

		for i, v := range values {
			if samples[i].Timestamp != v.Timestamp {
				t.Errorf("%d. Got %v; want %v", i, v.Timestamp, samples[i].Timestamp)
			}
			if samples[i].Value != v.Value {
				t.Errorf("%d. Got %v; want %v", i, v.Value, samples[i].Value)
			}
		}
		s.(*memorySeriesStorage).fpLocker.Unlock(m.fp)
	}
	glog.Info("test done, closing")
}
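
// TestGetValueAtTime checks GetValueAtTime exactly on sample timestamps,
// between samples, and just outside the range of the series.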
func TestGetValueAtTime(t *testing.T) {
	samples := make(clientmodel.Samples, 1000)
	for i := range samples {
		samples[i] = &clientmodel.Sample{
			Timestamp: clientmodel.Timestamp(2 * i),
			Value:     clientmodel.SampleValue(float64(i) * 0.2),
		}
	}
	s, closer := NewTestStorage(t)
	defer closer.Close()

	s.AppendSamples(samples)
	s.WaitForIndexing()

	fp := clientmodel.Metric{}.Fingerprint()

	it := s.NewIterator(fp)

	// #1 Exactly on a sample.
	for i, expected := range samples {
		actual := it.GetValueAtTime(expected.Timestamp)

		if len(actual) != 1 {
			t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual))
		}
		if expected.Timestamp != actual[0].Timestamp {
			t.Errorf("1.%d. Got %v; want %v", i, actual[0].Timestamp, expected.Timestamp)
		}
		if expected.Value != actual[0].Value {
			t.Errorf("1.%d. Got %v; want %v", i, actual[0].Value, expected.Value)
		}
	}

	// #2 Between samples.
	for i, expected1 := range samples {
		if i == len(samples)-1 {
			continue
		}
		expected2 := samples[i+1]
		actual := it.GetValueAtTime(expected1.Timestamp + 1)

		if len(actual) != 2 {
			t.Fatalf("2.%d. Expected exactly 2 results, got %d.", i, len(actual))
		}
		if expected1.Timestamp != actual[0].Timestamp {
			t.Errorf("2.%d. Got %v; want %v", i, actual[0].Timestamp, expected1.Timestamp)
		}
		if expected1.Value != actual[0].Value {
			t.Errorf("2.%d. Got %v; want %v", i, actual[0].Value, expected1.Value)
		}
		if expected2.Timestamp != actual[1].Timestamp {
			t.Errorf("2.%d. Got %v; want %v", i, actual[1].Timestamp, expected2.Timestamp)
		}
		if expected2.Value != actual[1].Value {
			t.Errorf("2.%d. Got %v; want %v", i, actual[1].Value, expected2.Value)
		}
	}

	// #3 Corner cases: Just before the first sample, just after the last.
	expected := samples[0]
	actual := it.GetValueAtTime(expected.Timestamp - 1)
	if len(actual) != 1 {
		t.Fatalf("3.1. Expected exactly one result, got %d.", len(actual))
	}
	if expected.Timestamp != actual[0].Timestamp {
		t.Errorf("3.1. Got %v; want %v", actual[0].Timestamp, expected.Timestamp)
	}
	if expected.Value != actual[0].Value {
		t.Errorf("3.1. Got %v; want %v", actual[0].Value, expected.Value)
	}
	expected = samples[len(samples)-1]
	actual = it.GetValueAtTime(expected.Timestamp + 1)
	if len(actual) != 1 {
		t.Fatalf("3.2. Expected exactly one result, got %d.", len(actual))
	}
	if expected.Timestamp != actual[0].Timestamp {
		t.Errorf("3.2. Got %v; want %v", actual[0].Timestamp, expected.Timestamp)
	}
	if expected.Value != actual[0].Value {
		t.Errorf("3.2. Got %v; want %v", actual[0].Value, expected.Value)
	}
}
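
// TestGetRangeValues checks GetRangeValues for intervals on, off, and around
// samples, including corner cases at both ends of the series.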
func TestGetRangeValues(t *testing.T) {
	samples := make(clientmodel.Samples, 1000)
	for i := range samples {
		samples[i] = &clientmodel.Sample{
			Timestamp: clientmodel.Timestamp(2 * i),
			Value:     clientmodel.SampleValue(float64(i) * 0.2),
		}
	}
	s, closer := NewTestStorage(t)
	defer closer.Close()

	s.AppendSamples(samples)
	s.WaitForIndexing()

	fp := clientmodel.Metric{}.Fingerprint()

	it := s.NewIterator(fp)

	// #1 Zero length interval at sample.
	for i, expected := range samples {
		actual := it.GetRangeValues(metric.Interval{
			OldestInclusive: expected.Timestamp,
			NewestInclusive: expected.Timestamp,
		})

		if len(actual) != 1 {
			t.Fatalf("1.%d. Expected exactly one result, got %d.", i, len(actual))
		}
		if expected.Timestamp != actual[0].Timestamp {
			t.Errorf("1.%d. Got %v; want %v.", i, actual[0].Timestamp, expected.Timestamp)
		}
		if expected.Value != actual[0].Value {
			t.Errorf("1.%d. Got %v; want %v.", i, actual[0].Value, expected.Value)
		}
	}

	// #2 Zero length interval off sample.
	for i, expected := range samples {
		actual := it.GetRangeValues(metric.Interval{
			OldestInclusive: expected.Timestamp + 1,
			NewestInclusive: expected.Timestamp + 1,
		})

		if len(actual) != 0 {
			t.Fatalf("2.%d. Expected no result, got %d.", i, len(actual))
		}
	}

	// #3 2sec interval around sample.
	for i, expected := range samples {
		actual := it.GetRangeValues(metric.Interval{
			OldestInclusive: expected.Timestamp - 1,
			NewestInclusive: expected.Timestamp + 1,
		})

		if len(actual) != 1 {
			t.Fatalf("3.%d. Expected exactly one result, got %d.", i, len(actual))
		}
		if expected.Timestamp != actual[0].Timestamp {
			t.Errorf("3.%d. Got %v; want %v.", i, actual[0].Timestamp, expected.Timestamp)
		}
		if expected.Value != actual[0].Value {
			t.Errorf("3.%d. Got %v; want %v.", i, actual[0].Value, expected.Value)
		}
	}

	// #4 2sec interval sample to sample.
	for i, expected1 := range samples {
		if i == len(samples)-1 {
			continue
		}
		expected2 := samples[i+1]
		actual := it.GetRangeValues(metric.Interval{
			OldestInclusive: expected1.Timestamp,
			NewestInclusive: expected1.Timestamp + 2,
		})

		if len(actual) != 2 {
			t.Fatalf("4.%d. Expected exactly 2 results, got %d.", i, len(actual))
		}
		if expected1.Timestamp != actual[0].Timestamp {
			t.Errorf("4.%d. Got %v for 1st result; want %v.", i, actual[0].Timestamp, expected1.Timestamp)
		}
		if expected1.Value != actual[0].Value {
			t.Errorf("4.%d. Got %v for 1st result; want %v.", i, actual[0].Value, expected1.Value)
		}
		if expected2.Timestamp != actual[1].Timestamp {
			t.Errorf("4.%d. Got %v for 2nd result; want %v.", i, actual[1].Timestamp, expected2.Timestamp)
		}
		if expected2.Value != actual[1].Value {
			t.Errorf("4.%d. Got %v for 2nd result; want %v.", i, actual[1].Value, expected2.Value)
		}
	}

	// #5 Corner cases: Interval ends at first sample, interval starts
	// at last sample, interval entirely before/after samples.
	expected := samples[0]
	actual := it.GetRangeValues(metric.Interval{
		OldestInclusive: expected.Timestamp - 2,
		NewestInclusive: expected.Timestamp,
	})
	if len(actual) != 1 {
		t.Fatalf("5.1. Expected exactly one result, got %d.", len(actual))
	}
	if expected.Timestamp != actual[0].Timestamp {
		t.Errorf("5.1. Got %v; want %v.", actual[0].Timestamp, expected.Timestamp)
	}
	if expected.Value != actual[0].Value {
		t.Errorf("5.1. Got %v; want %v.", actual[0].Value, expected.Value)
	}

	expected = samples[len(samples)-1]
	actual = it.GetRangeValues(metric.Interval{
		OldestInclusive: expected.Timestamp,
		NewestInclusive: expected.Timestamp + 2,
	})
	if len(actual) != 1 {
		t.Fatalf("5.2. Expected exactly one result, got %d.", len(actual))
	}
	if expected.Timestamp != actual[0].Timestamp {
		t.Errorf("5.2. Got %v; want %v.", actual[0].Timestamp, expected.Timestamp)
	}
	if expected.Value != actual[0].Value {
		t.Errorf("5.2. Got %v; want %v.", actual[0].Value, expected.Value)
	}

	firstSample := samples[0]
	actual = it.GetRangeValues(metric.Interval{
		OldestInclusive: firstSample.Timestamp - 4,
		NewestInclusive: firstSample.Timestamp - 2,
	})
	if len(actual) != 0 {
		t.Fatalf("5.3. Expected no results, got %d.", len(actual))
	}

	lastSample := samples[len(samples)-1]
	actual = it.GetRangeValues(metric.Interval{
		OldestInclusive: lastSample.Timestamp + 2,
		NewestInclusive: lastSample.Timestamp + 4,
	})
	if len(actual) != 0 {
		t.Fatalf("5.4. Expected no results, got %d.", len(actual))
	}
}
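
// TestEvictAndPurgeSeries exercises the internal maintainMemorySeries and
// maintainArchivedSeries methods, first dropping about half of the chunks of a
// series and then all of them, both for an in-memory and an archived series.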
func TestEvictAndPurgeSeries(t *testing.T) {
	samples := make(clientmodel.Samples, 1000)
	for i := range samples {
		samples[i] = &clientmodel.Sample{
			Timestamp: clientmodel.Timestamp(2 * i),
			Value:     clientmodel.SampleValue(float64(i) * 0.2),
		}
	}
	s, closer := NewTestStorage(t)
	defer closer.Close()

	ms := s.(*memorySeriesStorage) // Going to test the internal maintain.*Series methods.

	s.AppendSamples(samples)
	s.WaitForIndexing()

	fp := clientmodel.Metric{}.Fingerprint()

	// Drop ~half of the chunks.
	ms.maintainMemorySeries(fp, 1000)
	it := s.NewIterator(fp)
	actual := it.GetBoundaryValues(metric.Interval{
		OldestInclusive: 0,
		NewestInclusive: 10000,
	})
	if len(actual) != 2 {
		t.Fatal("expected two results after purging half of series")
	}
	if actual[0].Timestamp < 800 || actual[0].Timestamp > 1000 {
		t.Errorf("1st timestamp out of expected range: %v", actual[0].Timestamp)
	}
	want := clientmodel.Timestamp(1998)
	if actual[1].Timestamp != want {
		t.Errorf("2nd timestamp: want %v, got %v", want, actual[1].Timestamp)
	}

	// Drop everything.
	ms.maintainMemorySeries(fp, 10000)
	it = s.NewIterator(fp)
	actual = it.GetBoundaryValues(metric.Interval{
		OldestInclusive: 0,
		NewestInclusive: 10000,
	})
	if len(actual) != 0 {
		t.Fatal("expected zero results after purging the whole series")
	}

	// Recreate series.
	s.AppendSamples(samples)
	s.WaitForIndexing()

	series, ok := ms.fpToSeries.get(fp)
	if !ok {
		t.Fatal("could not find series")
	}

	// Persist head chunk so we can safely archive.
	series.headChunkPersisted = true
	ms.persistQueue <- persistRequest{fp, series.head()}
	time.Sleep(time.Second) // Give time for persisting to happen.

	// Archive metrics.
	ms.fpToSeries.del(fp)
	if err := ms.persistence.archiveMetric(
		fp, series.metric, series.firstTime(), series.head().lastTime(),
	); err != nil {
		t.Fatal(err)
	}

	archived, _, _, err := ms.persistence.hasArchivedMetric(fp)
	if err != nil {
		t.Fatal(err)
	}
	if !archived {
		t.Fatal("not archived")
	}

	// Drop ~half of the chunks of an archived series.
	ms.maintainArchivedSeries(fp, 1000)
	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
	if err != nil {
		t.Fatal(err)
	}
	if !archived {
		t.Fatal("archived series purged although only half of the chunks dropped")
	}

	// Drop everything.
	ms.maintainArchivedSeries(fp, 10000)
	archived, _, _, err = ms.persistence.hasArchivedMetric(fp)
	if err != nil {
		t.Fatal(err)
	}
	if archived {
		t.Fatal("archived series not dropped")
	}
}
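
// BenchmarkAppend appends b.N samples spread over ten different metrics.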
func BenchmarkAppend(b *testing.B) {
	samples := make(clientmodel.Samples, b.N)
	for i := range samples {
		samples[i] = &clientmodel.Sample{
			Metric: clientmodel.Metric{
				clientmodel.MetricNameLabel: clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i%10)),
				"label1":                    clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i%10)),
				"label2":                    clientmodel.LabelValue(fmt.Sprintf("test_metric_%d", i%10)),
			},
			Timestamp: clientmodel.Timestamp(i),
			Value:     clientmodel.SampleValue(i),
		}
	}
	b.ResetTimer()
	s, closer := NewTestStorage(b)
	defer closer.Close()

	s.AppendSamples(samples)
}

// TestFuzz appends a large number of random samples and then checks whether
// they can be read back from the storage correctly.
func TestFuzz(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping test in short mode.")
	}

	check := func(seed int64) bool {
		rand.Seed(seed)
		s, c := NewTestStorage(t)
		defer c.Close()

		samples := createRandomSamples()
		s.AppendSamples(samples)
		s.WaitForIndexing()

		return verifyStorage(t, s, samples, 24*7*time.Hour)
	}

	if err := quick.Check(check, nil); err != nil {
		t.Fatal(err)
	}
}

// BenchmarkFuzz is the benchmark version of TestFuzz. However, it will run
// several append and verify operations in parallel, if GOMAXPROCS is set
// accordingly. Also, the storage options are set such that evictions,
// checkpoints, and purging will happen concurrently, too. This benchmark will
// have a very long runtime (up to minutes). You can use it as an actual
// benchmark. Run it like this:
//
// go test -cpu 1,2,4,8 -short -bench BenchmarkFuzz -benchmem
//
// You can also use it as a test for races. In that case, run it like this
// (will make things even slower):
//
// go test -race -cpu 8 -short -bench BenchmarkFuzz
func BenchmarkFuzz(b *testing.B) {
	b.StopTimer()
	rand.Seed(42)
	directory := test.NewTemporaryDirectory("test_storage", b)
	defer directory.Close()
	o := &MemorySeriesStorageOptions{
		MemoryChunks:               100,
		PersistenceRetentionPeriod: time.Hour,
		PersistenceStoragePath:     directory.Path(),
		CheckpointInterval:         3 * time.Second,
	}
	s, err := NewMemorySeriesStorage(o)
	if err != nil {
		b.Fatalf("Error creating storage: %s", err)
	}
	s.Start()
	defer s.Stop()
	b.StartTimer()

	b.RunParallel(func(pb *testing.PB) {
		var allSamples clientmodel.Samples
		for pb.Next() {
			newSamples := createRandomSamples()
			allSamples = append(allSamples, newSamples[:len(newSamples)/2]...)
			s.AppendSamples(newSamples[:len(newSamples)/2])
			verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod)
			allSamples = append(allSamples, newSamples[len(newSamples)/2:]...)
			s.AppendSamples(newSamples[len(newSamples)/2:])
			verifyStorage(b, s, allSamples, o.PersistenceRetentionPeriod)
		}
	})
}
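
// createRandomSamples generates samples for a random set of metrics, using
// streaks of boolean, integer, and float values that are constant, random, or
// changed by random deltas of various sizes.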
func createRandomSamples() clientmodel.Samples {
	type valueCreator func() clientmodel.SampleValue
	type deltaApplier func(clientmodel.SampleValue) clientmodel.SampleValue

	var (
		maxMetrics         = 5
		maxCycles          = 500
		maxStreakLength    = 500
		maxTimeDelta       = 1000
		maxTimeDeltaFactor = 10
		timestamp          = clientmodel.Now() - clientmodel.Timestamp(maxTimeDelta*maxTimeDeltaFactor*maxCycles*maxStreakLength/16) // So that some timestamps are in the future.
		generators         = []struct {
			createValue valueCreator
			applyDelta  []deltaApplier
		}{
			{ // "Boolean".
				createValue: func() clientmodel.SampleValue {
					return clientmodel.SampleValue(rand.Intn(2))
				},
				applyDelta: []deltaApplier{
					func(_ clientmodel.SampleValue) clientmodel.SampleValue {
						return clientmodel.SampleValue(rand.Intn(2))
					},
				},
			},
			{ // Integer with int deltas of various byte length.
				createValue: func() clientmodel.SampleValue {
					return clientmodel.SampleValue(rand.Int63() - 1<<62)
				},
				applyDelta: []deltaApplier{
					func(v clientmodel.SampleValue) clientmodel.SampleValue {
						return clientmodel.SampleValue(rand.Intn(1<<8) - 1<<7 + int(v))
					},
					func(v clientmodel.SampleValue) clientmodel.SampleValue {
						return clientmodel.SampleValue(rand.Intn(1<<16) - 1<<15 + int(v))
					},
					func(v clientmodel.SampleValue) clientmodel.SampleValue {
						return clientmodel.SampleValue(rand.Intn(1<<32) - 1<<31 + int(v))
					},
				},
			},
			{ // Float with float32 and float64 deltas.
				createValue: func() clientmodel.SampleValue {
					return clientmodel.SampleValue(rand.NormFloat64())
				},
				applyDelta: []deltaApplier{
					func(v clientmodel.SampleValue) clientmodel.SampleValue {
						return v + clientmodel.SampleValue(float32(rand.NormFloat64()))
					},
					func(v clientmodel.SampleValue) clientmodel.SampleValue {
						return v + clientmodel.SampleValue(rand.NormFloat64())
					},
				},
			},
		}
	)

	result := clientmodel.Samples{}
	metrics := []clientmodel.Metric{}
	for n := rand.Intn(maxMetrics); n >= 0; n-- {
		metrics = append(metrics, clientmodel.Metric{
			clientmodel.LabelName(fmt.Sprintf("labelname_%d", n+1)): clientmodel.LabelValue(fmt.Sprintf("labelvalue_%d", rand.Int())),
		})
	}

	for n := rand.Intn(maxCycles); n >= 0; n-- {
		// Pick a metric for this cycle.
		metric := metrics[rand.Intn(len(metrics))]
		timeDelta := rand.Intn(maxTimeDelta) + 1
		generator := generators[rand.Intn(len(generators))]
		createValue := generator.createValue
		applyDelta := generator.applyDelta[rand.Intn(len(generator.applyDelta))]
		incTimestamp := func() { timestamp += clientmodel.Timestamp(timeDelta * (rand.Intn(maxTimeDeltaFactor) + 1)) }
		switch rand.Intn(4) {
		case 0: // A single sample.
			result = append(result, &clientmodel.Sample{
				Metric:    metric,
				Value:     createValue(),
				Timestamp: timestamp,
			})
			incTimestamp()
		case 1: // A streak of random sample values.
			for n := rand.Intn(maxStreakLength); n >= 0; n-- {
				result = append(result, &clientmodel.Sample{
					Metric:    metric,
					Value:     createValue(),
					Timestamp: timestamp,
				})
				incTimestamp()
			}
		case 2: // A streak of sample values with incremental changes.
			value := createValue()
			for n := rand.Intn(maxStreakLength); n >= 0; n-- {
				result = append(result, &clientmodel.Sample{
					Metric:    metric,
					Value:     value,
					Timestamp: timestamp,
				})
				incTimestamp()
				value = applyDelta(value)
			}
		case 3: // A streak of constant sample values.
			value := createValue()
			for n := rand.Intn(maxStreakLength); n >= 0; n-- {
				result = append(result, &clientmodel.Sample{
					Metric:    metric,
					Value:     value,
					Timestamp: timestamp,
				})
				incTimestamp()
			}
		}
	}
	return result
}
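
// verifyStorage checks, in random order, that every sample younger than maxAge
// can be read back from the storage with the correct value and timestamp.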
func verifyStorage(t testing.TB, s Storage, samples clientmodel.Samples, maxAge time.Duration) bool {
	result := true
	for _, i := range rand.Perm(len(samples)) {
		sample := samples[i]
		if sample.Timestamp.Before(clientmodel.TimestampFromTime(time.Now().Add(-maxAge))) {
			continue
			// TODO: Once we have a guaranteed cutoff at the
			// retention period, we can verify here that no results
			// are returned.
		}
		fp := sample.Metric.Fingerprint()
		p := s.NewPreloader()
		p.PreloadRange(fp, sample.Timestamp, sample.Timestamp, time.Hour)
		found := s.NewIterator(fp).GetValueAtTime(sample.Timestamp)
		if len(found) != 1 {
			t.Errorf("Sample %#v: Expected exactly one value, found %d.", sample, len(found))
			result = false
			p.Close()
			continue
		}
		want := float64(sample.Value)
		got := float64(found[0].Value)
		if want != got || sample.Timestamp != found[0].Timestamp {
			t.Errorf(
				"Value (or timestamp) mismatch, want %f (at time %v), got %f (at time %v).",
				want, sample.Timestamp, got, found[0].Timestamp,
			)
			result = false
		}
		p.Close()
	}
	return result
}
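
// TestChunkMaps adds chunk descriptors for several fingerprints and verifies
// that pop returns them bucketed by fingerprint, the fingerprint with the most
// queued chunk descriptors first, each bucket in insertion order.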
func TestChunkMaps(t *testing.T) {
	cm := chunkMaps{}

	cd1 := &chunkDesc{refCount: 1} // Abuse refCount as identifier.
	cd21 := &chunkDesc{refCount: 21}
	cd22 := &chunkDesc{refCount: 22}
	cd31 := &chunkDesc{refCount: 31}
	cd32 := &chunkDesc{refCount: 32}
	cd33 := &chunkDesc{refCount: 33}
	cd41 := &chunkDesc{refCount: 41}
	cd42 := &chunkDesc{refCount: 42}
	cd43 := &chunkDesc{refCount: 43}
	cd44 := &chunkDesc{refCount: 44}
	cd51 := &chunkDesc{refCount: 51}
	cd52 := &chunkDesc{refCount: 52}
	cd53 := &chunkDesc{refCount: 53}
	cd54 := &chunkDesc{refCount: 54}
	cd55 := &chunkDesc{refCount: 55}

	cm.add(5, cd51)
	cm.add(3, cd31)
	cm.add(5, cd52)
	cm.add(1, cd1)
	cm.add(4, cd41)
	cm.add(4, cd42)
	cm.add(5, cd53)
	cm.add(3, cd32)
	cm.add(2, cd21)
	cm.add(5, cd54)
	cm.add(3, cd33)
	cm.add(4, cd43)
	cm.add(2, cd22)
	cm.add(4, cd44)
	cm.add(5, cd55)

	var fpWant, fpGot clientmodel.Fingerprint
	var cdsWant, cdsGot []*chunkDesc

	fpWant = 5
	cdsWant = []*chunkDesc{cd51, cd52, cd53, cd54, cd55}
	fpGot, cdsGot = cm.pop()
	if fpWant != fpGot {
		t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
	}
	if !reflect.DeepEqual(cdsWant, cdsGot) {
		t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
	}

	fpWant = 4
	cdsWant = []*chunkDesc{cd41, cd42, cd43, cd44}
	fpGot, cdsGot = cm.pop()
	if fpWant != fpGot {
		t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
	}
	if !reflect.DeepEqual(cdsWant, cdsGot) {
		t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
	}

	fpWant = 3
	cdsWant = []*chunkDesc{cd31, cd32, cd33}
	fpGot, cdsGot = cm.pop()
	if fpWant != fpGot {
		t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
	}
	if !reflect.DeepEqual(cdsWant, cdsGot) {
		t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
	}

	fpWant = 2
	cdsWant = []*chunkDesc{cd21, cd22}
	fpGot, cdsGot = cm.pop()
	if fpWant != fpGot {
		t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
	}
	if !reflect.DeepEqual(cdsWant, cdsGot) {
		t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
	}

	fpWant = 1
	cdsWant = []*chunkDesc{cd1}
	fpGot, cdsGot = cm.pop()
	if fpWant != fpGot {
		t.Errorf("Want fingerprint %s, got %s.", fpWant, fpGot)
	}
	if !reflect.DeepEqual(cdsWant, cdsGot) {
		t.Errorf("Want chunk descriptors %v, got %v.", cdsWant, cdsGot)
	}
}