Separate iterators by offset

Add test that exposes the problem.
This commit is contained in:
beorn7 2016-03-02 13:45:17 +01:00
parent 059295332f
commit 2581648f70
4 changed files with 12 additions and 10 deletions

View File

@ -125,8 +125,10 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
}() }()
// Preload all analyzed ranges. // Preload all analyzed ranges.
iters := map[model.Fingerprint]local.SeriesIterator{} iters := map[time.Duration]map[model.Fingerprint]local.SeriesIterator{}
for offset, pt := range a.offsetPreloadTimes { for offset, pt := range a.offsetPreloadTimes {
itersForDuration := map[model.Fingerprint]local.SeriesIterator{}
iters[offset] = itersForDuration
start := a.Start.Add(-offset) start := a.Start.Add(-offset)
end := a.End.Add(-offset) end := a.End.Add(-offset)
for fp, rangeDuration := range pt.ranges { for fp, rangeDuration := range pt.ranges {
@ -148,7 +150,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
iters[fp] = iter itersForDuration[fp] = iter
} }
for fp := range pt.instants { for fp := range pt.instants {
if err = contextDone(ctx, env); err != nil { if err = contextDone(ctx, env); err != nil {
@ -161,7 +163,7 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
iters[fp] = iter itersForDuration[fp] = iter
} }
} }
@ -170,11 +172,11 @@ func (a *Analyzer) Prepare(ctx context.Context) (local.Preloader, error) {
switch n := node.(type) { switch n := node.(type) {
case *VectorSelector: case *VectorSelector:
for fp := range n.metrics { for fp := range n.metrics {
n.iterators[fp] = iters[fp] n.iterators[fp] = iters[n.Offset][fp]
} }
case *MatrixSelector: case *MatrixSelector:
for fp := range n.metrics { for fp := range n.metrics {
n.iterators[fp] = iters[fp] n.iterators[fp] = iters[n.Offset][fp]
} }
} }
return true return true

View File

@ -54,7 +54,7 @@ const (
) )
// chunkDesc contains meta-data for a chunk. Pay special attention to the // chunkDesc contains meta-data for a chunk. Pay special attention to the
// documented requirements for calling its method concurrently (WRT pinning and // documented requirements for calling its methods concurrently (WRT pinning and
// locking). The doc comments spell out the requirements for each method, but // locking). The doc comments spell out the requirements for each method, but
// here is an overview and general explanation: // here is an overview and general explanation:
// //
@ -71,7 +71,7 @@ const (
// or creation) or by locking the fingerprint of the series the chunkDesc // or creation) or by locking the fingerprint of the series the chunkDesc
// belongs to. The affected methods are: add, maybePopulateLastTime, setChunk. // belongs to. The affected methods are: add, maybePopulateLastTime, setChunk.
// //
// Finally, there is the special cases firstTime and lastTime. lastTime requires // Finally, there are the special cases firstTime and lastTime. lastTime requires
// to have locked the fingerprint of the series but the chunk does not need to // to have locked the fingerprint of the series but the chunk does not need to
// be pinned. That's because the chunkLastTime field in chunkDesc gets populated // be pinned. That's because the chunkLastTime field in chunkDesc gets populated
// upon completion of the chunk (when it is still pinned, and which happens // upon completion of the chunk (when it is still pinned, and which happens
@ -292,7 +292,7 @@ type chunkIterator interface {
// Gets the last sample value in the chunk. // Gets the last sample value in the chunk.
lastSampleValue() model.SampleValue lastSampleValue() model.SampleValue
// Gets the value that is closest before the given time. In case a value // Gets the value that is closest before the given time. In case a value
// exist at precisely the given time, that value is returned. If no // exists at precisely the given time, that value is returned. If no
// applicable value exists, a SamplePair with timestamp model.Earliest // applicable value exists, a SamplePair with timestamp model.Earliest
// and value 0.0 is returned. // and value 0.0 is returned.
valueAtOrBeforeTime(model.Time) model.SamplePair valueAtOrBeforeTime(model.Time) model.SamplePair

View File

@ -72,7 +72,7 @@ type Storage interface {
// of the series prior the modification. // of the series prior the modification.
type SeriesIterator interface { type SeriesIterator interface {
// Gets the value that is closest before the given time. In case a value // Gets the value that is closest before the given time. In case a value
// exist at precisely the given time, that value is returned. If no // exists at precisely the given time, that value is returned. If no
// applicable value exists, a SamplePair with timestamp model.Earliest // applicable value exists, a SamplePair with timestamp model.Earliest
// and value 0.0 is returned. // and value 0.0 is returned.
ValueAtOrBeforeTime(model.Time) model.SamplePair ValueAtOrBeforeTime(model.Time) model.SamplePair

View File

@ -28,7 +28,7 @@ func (p *memorySeriesPreloader) PreloadRange(
) (SeriesIterator, error) { ) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through) cds, iter, err := p.storage.preloadChunksForRange(fp, from, through)
if err != nil { if err != nil {
return iter, err return nil, err
} }
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...) p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter, nil return iter, nil