Matt T. Proud 2013-03-07 11:01:32 -08:00
parent 1e0d740f2a
commit 62b5d7ce20
4 changed files with 211 additions and 71 deletions

View File

@@ -16,7 +16,6 @@ package metric
import (
"code.google.com/p/goprotobuf/proto"
"flag"
"fmt"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/coding/indexable"
"github.com/prometheus/prometheus/model"
@@ -200,10 +199,6 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error)
}
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
c := len(samples)
if c > 1 {
fmt.Printf("Appending %d samples...", c)
}
begin := time.Now()
defer func() {
duration := time.Now().Sub(begin)
@@ -244,6 +239,58 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
doneSorting.Wait()
var (
doneCommitting = sync.WaitGroup{}
)
// Add to the wait group before launching the goroutine so Wait cannot observe a zero counter first.
doneCommitting.Add(1)
go func() {
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
defer doneCommitting.Done()
for fingerprint, group := range fingerprintToSamples {
for {
lengthOfGroup := len(group)
if lengthOfGroup == 0 {
break
}
take := maximumChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
chunk := group[0:take]
group = group[take:lengthOfGroup]
key := &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
SampleCount: proto.Uint32(uint32(take)),
}
value := &dto.SampleValueSeries{}
for _, sample := range chunk {
value.Value = append(value.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(sample.Timestamp.Unix()),
Value: proto.Float32(float32(sample.Value)),
})
}
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
}
}
err = l.metricSamples.Commit(samplesBatch)
if err != nil {
panic(err)
}
}()
var (
absentFingerprints = map[model.Fingerprint]model.Samples{}
)
@@ -454,48 +501,8 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
}
}
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
doneCommitting.Wait()
for fingerprint, group := range fingerprintToSamples {
for {
lengthOfGroup := len(group)
if lengthOfGroup == 0 {
break
}
take := maximumChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
chunk := group[0:take]
group = group[take:lengthOfGroup]
key := &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
SampleCount: proto.Uint32(uint32(take)),
}
value := &dto.SampleValueSeries{}
for _, sample := range chunk {
value.Value = append(value.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(sample.Timestamp.Unix()),
Value: proto.Float32(float32(sample.Value)),
})
}
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
}
}
err = l.metricSamples.Commit(samplesBatch)
if err != nil {
panic(err)
}
return
}
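
For reference, the chunking loop introduced above splits each fingerprint's time-sorted samples into fixed-size chunks and writes one key/value pair per chunk, keyed by the chunk's first and last timestamps and its sample count. A minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the real model.Sample, SampleKey, and SampleValueSeries:

package main

import (
	"fmt"
	"time"
)

// sample and chunk are simplified stand-ins for model.Sample and for the
// SampleKey/SampleValueSeries pair that AppendSamples writes per batch entry.
type sample struct {
	Timestamp time.Time
	Value     float64
}

type chunk struct {
	First, Last time.Time // key: first and last timestamp covered by the chunk
	Count       int       // key: number of samples in the chunk
	Values      []sample  // value: the samples themselves
}

// chunkSamples splits a time-sorted group of samples into chunks of at most
// maximumChunkSize, mirroring the loop in AppendSamples above.
func chunkSamples(group []sample, maximumChunkSize int) (chunks []chunk) {
	for len(group) > 0 {
		take := maximumChunkSize
		if len(group) < take {
			take = len(group)
		}
		c := group[:take]
		group = group[take:]
		chunks = append(chunks, chunk{
			First:  c[0].Timestamp,
			Last:   c[take-1].Timestamp,
			Count:  take,
			Values: c,
		})
	}
	return chunks
}

func main() {
	start := time.Unix(0, 0)
	var samples []sample
	for i := 0; i < 10; i++ {
		samples = append(samples, sample{Timestamp: start.Add(time.Duration(i) * time.Second), Value: float64(i)})
	}
	for _, c := range chunkSamples(samples, 4) {
		fmt.Printf("%d samples from %d to %d\n", c.Count, c.First.Unix(), c.Last.Unix())
	}
}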

View File

@@ -62,7 +62,6 @@ func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilt
if s.values.Len() == 0 {
return
}
iterator := s.values.SeekToLast()
defer iterator.Close()
@@ -91,7 +90,6 @@ func (s stream) forEach(decoder storage.RecordDecoder, filter storage.RecordFilt
break
}
}
if !iterator.Previous() {
break
}
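
The surrounding forEach walks the stored values newest-to-oldest: seek to the last entry, then step backwards with Previous until the iterator is exhausted or the caller stops early. A compact sketch of that reverse-scan shape over a slice-backed stand-in iterator (illustrative only, not the storage package's actual interface):

package main

import "fmt"

// sliceIter is a simplified stand-in for the on-disk iterator used above.
type sliceIter struct {
	values []string
	pos    int
}

func (it *sliceIter) seekToLast() bool { it.pos = len(it.values) - 1; return it.pos >= 0 }
func (it *sliceIter) previous() bool   { it.pos--; return it.pos >= 0 }
func (it *sliceIter) value() string    { return it.values[it.pos] }

// reverseScan visits every value from newest to oldest, stopping early if the
// callback returns false, mirroring the loop shape in stream.forEach.
func reverseScan(it *sliceIter, visit func(string) bool) {
	if !it.seekToLast() {
		return
	}
	for {
		if !visit(it.value()) {
			break
		}
		if !it.previous() {
			break
		}
	}
}

func main() {
	it := &sliceIter{values: []string{"oldest", "middle", "newest"}}
	reverseScan(it, func(v string) bool { fmt.Println(v); return true })
}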

View File

@@ -202,9 +202,10 @@ func (t *tieredStorage) Flush() {
// Write all pending appends.
func (t *tieredStorage) flush() (err error) {
// Trim any old values to reduce iterative write costs.
t.flushMemory()
t.writeMemory()
t.flushMemory()
return
}
@@ -283,7 +284,11 @@ func (f *memoryToDiskFlusher) Flush() {
for i := 0; i < length; i++ {
samples = append(samples, <-f.toDiskQueue)
}
start := time.Now()
f.disk.AppendSamples(samples)
if false {
fmt.Printf("Took %s to append...\n", time.Since(start))
}
}
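
The Flush method above drains every pending entry from the queue into a slice and then hands the whole batch to AppendSamples in a single call. A small sketch of that channel-draining pattern, with a plain int standing in for the queued sample type:

package main

import "fmt"

// drainQueue receives exactly n buffered items from the queue, mirroring the
// counting loop in memoryToDiskFlusher.Flush above.
func drainQueue(queue chan int, n int) []int {
	drained := make([]int, 0, n)
	for i := 0; i < n; i++ {
		drained = append(drained, <-queue)
	}
	return drained
}

func main() {
	queue := make(chan int, 8)
	for i := 0; i < 5; i++ {
		queue <- i
	}
	fmt.Println(drainQueue(queue, len(queue))) // [0 1 2 3 4]
}
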
func (f memoryToDiskFlusher) Close() {
@@ -382,8 +387,13 @@ func (t *tieredStorage) renderView(viewJob viewJob) (err error) {
continue
}
if seriesFrontier.lastSupertime.Before(operation.StartsAt()) && !seriesFrontier.lastTime.Before(operation.StartsAt()) {
targetKey.Timestamp = indexable.EncodeTime(seriesFrontier.lastSupertime)
} else {
targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt())
}
targetKey.Fingerprint = scanJob.fingerprint.ToDTO()
targetKey.Timestamp = indexable.EncodeTime(operation.StartsAt())
rawKey, _ := coding.NewProtocolBufferEncoder(targetKey).Encode()
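
The branch added above decides where the on-disk scan should begin: when the operation's start time falls inside the series' last stored chunk (after that chunk's supertime but not after its last sample), the scan seeks to the chunk's supertime; otherwise it seeks directly to the operation's start. A minimal sketch of that decision, with bare time values standing in for the series frontier:

package main

import (
	"fmt"
	"time"
)

// seekTime mirrors the conditional in renderView above. lastSupertime and
// lastTime stand in for seriesFrontier.lastSupertime and seriesFrontier.lastTime.
func seekTime(lastSupertime, lastTime, operationStart time.Time) time.Time {
	if lastSupertime.Before(operationStart) && !lastTime.Before(operationStart) {
		return lastSupertime
	}
	return operationStart
}

func main() {
	base := time.Unix(0, 0)
	super := base.Add(10 * time.Second) // first timestamp of the last chunk
	last := base.Add(20 * time.Second)  // last timestamp of the last chunk

	fmt.Println(seekTime(super, last, base.Add(15*time.Second))) // inside the chunk: seek from its supertime
	fmt.Println(seekTime(super, last, base.Add(25*time.Second))) // past the chunk: seek from the operation start
}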

View File

@@ -14,6 +14,7 @@
package metric
import (
"fmt"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/utility/test"
"io/ioutil"
@@ -22,6 +23,24 @@ import (
"time"
)
func sampleIncrement(from, to time.Time, interval time.Duration, m model.Metric) (v []model.Sample) {
var (
i model.SampleValue = 0
)
for from.Before(to) {
v = append(v, model.Sample{
Metric: m,
Value: i,
Timestamp: from,
})
from = from.Add(interval)
i++
}
return
}
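
Illustrative use of the helper above, matching the commented-out scenario further down that builds one sample per second across two weeks (exampleDenseSeries is a hypothetical name, not part of this change):

// exampleDenseSeries shows how sampleIncrement would be called to produce a
// dense, monotonically increasing series for a given metric.
func exampleDenseSeries(instant time.Time, m model.Metric) []model.Sample {
	return sampleIncrement(instant, instant.Add(14*24*time.Hour), time.Second, m)
}
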
func testMakeView(t test.Tester) {
type in struct {
atTime []getValuesAtTimeOp
@@ -104,28 +123,132 @@ func testMakeView(t test.Tester) {
},
},
},
{
data: []model.Sample{
{
Metric: metric,
Value: 0,
Timestamp: instant,
},
{
Metric: metric,
Value: 1,
Timestamp: instant.Add(time.Second),
},
{
Metric: metric,
Value: 2,
Timestamp: instant.Add(time.Second * 2),
},
},
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second),
},
},
},
out: out{
atTime: [][]model.SamplePair{
{
{
Timestamp: instant.Add(time.Second),
Value: 1,
},
{
Timestamp: instant.Add(time.Second * 2),
Value: 2,
},
},
},
},
},
{
data: []model.Sample{
{
Metric: metric,
Value: 0,
Timestamp: instant,
},
{
Metric: metric,
Value: 1,
Timestamp: instant.Add(time.Second * 2),
},
{
Metric: metric,
Value: 2,
Timestamp: instant.Add(time.Second * 4),
},
},
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second),
},
},
},
out: out{
atTime: [][]model.SamplePair{
{
{
Timestamp: instant,
Value: 0,
},
{
Timestamp: instant.Add(time.Second * 2),
Value: 1,
},
},
},
},
},
{
data: []model.Sample{
{
Metric: metric,
Value: 0,
Timestamp: instant,
},
{
Metric: metric,
Value: 1,
Timestamp: instant.Add(time.Second * 2),
},
{
Metric: metric,
Value: 2,
Timestamp: instant.Add(time.Second * 4),
},
},
in: in{
atTime: []getValuesAtTimeOp{
{
time: instant.Add(time.Second * 3),
},
},
},
out: out{
atTime: [][]model.SamplePair{
{
{
Timestamp: instant.Add(time.Second * 2),
Value: 1,
},
{
Timestamp: instant.Add(time.Second * 4),
Value: 2,
},
},
},
},
},
// {
// data: []model.Sample{
// {
// Metric: metric,
// Value: 0,
// Timestamp: instant,
// },
// {
// Metric: metric,
// Value: 1,
// Timestamp: instant.Add(time.Second),
// },
// {
// Metric: metric,
// Value: 2,
// Timestamp: instant.Add(time.Second * 2),
// },
// },
// data: sampleIncrement(instant, instant.Add(14*24*time.Hour), time.Second, metric),
// in: in{
// atTime: []getValuesAtTimeOp{
// {
// time: instant.Add(time.Second),
// time: instant.Add(time.Second * 3),
// },
// },
// },
@@ -133,11 +256,11 @@ func testMakeView(t test.Tester) {
// atTime: [][]model.SamplePair{
// {
// {
// Timestamp: instant.Add(time.Second),
// Timestamp: instant.Add(time.Second * 2),
// Value: 1,
// },
// {
// Timestamp: instant.Add(time.Second * 2),
// Timestamp: instant.Add(time.Second * 4),
// Value: 2,
// },
// },
@@ -150,7 +273,7 @@ func testMakeView(t test.Tester) {
for i, scenario := range scenarios {
var (
temporary, _ = ioutil.TempDir("", "test_make_view")
tiered = NewTieredStorage(100, 100, 100, time.Second, time.Second, 0*time.Second, temporary)
tiered = NewTieredStorage(5000000, 250, 1000, 5*time.Second, 15*time.Second, 0*time.Second, temporary)
)
if tiered == nil {
@@ -171,7 +294,9 @@ func testMakeView(t test.Tester) {
}
}
start := time.Now()
tiered.Flush()
fmt.Printf("Took %s to flush %d items...\n", time.Since(start), len(scenario.data))
requestBuilder := NewViewRequestBuilder()