diff --git a/storage/local/chunk.go b/storage/local/chunk.go index e1bac7f8bc..243d622d6c 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -124,7 +124,7 @@ func (cd *chunkDesc) lastTime() clientmodel.Timestamp { if cd.chunk == nil { return cd.chunkLastTime } - return cd.chunk.lastTime() + return cd.chunk.newIterator().getLastTimestamp() } func (cd *chunkDesc) isEvicted() bool { @@ -169,7 +169,7 @@ func (cd *chunkDesc) maybeEvict() bool { return false } cd.chunkFirstTime = cd.chunk.firstTime() - cd.chunkLastTime = cd.chunk.lastTime() + cd.chunkLastTime = cd.chunk.newIterator().getLastTimestamp() cd.chunk = nil chunkOps.WithLabelValues(evict).Inc() atomic.AddInt64(&numMemChunks, -1) @@ -188,23 +188,27 @@ type chunk interface { add(sample *metric.SamplePair) []chunk clone() chunk firstTime() clientmodel.Timestamp - lastTime() clientmodel.Timestamp newIterator() chunkIterator marshal(io.Writer) error unmarshal(io.Reader) error unmarshalFromBuf([]byte) encoding() chunkEncoding - // values returns a channel, from which all sample values in the chunk - // can be received in order. The channel is closed after the last - // one. It is generally not safe to mutate the chunk while the channel - // is still open. - values() <-chan *metric.SamplePair } // A chunkIterator enables efficient access to the content of a chunk. It is // generally not safe to use a chunkIterator concurrently with or after chunk // mutation. type chunkIterator interface { + // length returns the number of samples in the chunk. + length() int + // Gets the timestamp of the n-th sample in the chunk. + getTimestampAtIndex(int) clientmodel.Timestamp + // Gets the last timestamp in the chunk. + getLastTimestamp() clientmodel.Timestamp + // Gets the sample value of the n-th sample in the chunk. + getSampleValueAtIndex(int) clientmodel.SampleValue + // Gets the last sample value in the chunk. + getLastSampleValue() clientmodel.SampleValue // Gets the two values that are immediately adjacent to a given time. In // case a value exist at precisely the given time, only that single // value is returned. Only the first or last value is returned (as a @@ -216,6 +220,11 @@ type chunkIterator interface { // Whether a given timestamp is contained between first and last value // in the chunk. contains(clientmodel.Timestamp) bool + // values returns a channel, from which all sample values in the chunk + // can be received in order. The channel is closed after the last + // one. It is generally not safe to mutate the chunk while the channel + // is still open. + values() <-chan *metric.SamplePair } func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { @@ -223,7 +232,7 @@ func transcodeAndAdd(dst chunk, src chunk, s *metric.SamplePair) []chunk { head := dst body := []chunk{} - for v := range src.values() { + for v := range src.newIterator().values() { newChunks := head.add(v) body = append(body, newChunks[:len(newChunks)-1]...) head = newChunks[len(newChunks)-1] diff --git a/storage/local/delta.go b/storage/local/delta.go index 1be169febd..76cfde7fcc 100644 --- a/storage/local/delta.go +++ b/storage/local/delta.go @@ -188,18 +188,19 @@ func (c deltaEncodedChunk) clone() chunk { // firstTime implements chunk. func (c deltaEncodedChunk) firstTime() clientmodel.Timestamp { - return c.valueAtIndex(0).Timestamp -} - -// lastTime implements chunk. -func (c deltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp + return c.baseTime() } // newIterator implements chunk. 
func (c *deltaEncodedChunk) newIterator() chunkIterator { return &deltaEncodedChunkIterator{ - chunk: c, + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseV: c.baseValue(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), } } @@ -237,19 +238,6 @@ func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) { *c = (*c)[:binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])] } -// values implements chunk. -func (c deltaEncodedChunk) values() <-chan *metric.SamplePair { - n := c.len() - valuesChan := make(chan *metric.SamplePair) - go func() { - for i := 0; i < n; i++ { - valuesChan <- c.valueAtIndex(i) - } - close(valuesChan) - }() - return valuesChan -} - // encoding implements chunk. func (c deltaEncodedChunk) encoding() chunkEncoding { return delta } @@ -284,106 +272,157 @@ func (c deltaEncodedChunk) len() int { return (len(c) - deltaHeaderBytes) / c.sampleSize() } -func (c deltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { - offset := deltaHeaderBytes + idx*c.sampleSize() - - var ts clientmodel.Timestamp - switch c.timeBytes() { - case d1: - ts = c.baseTime() + clientmodel.Timestamp(uint8(c[offset])) - case d2: - ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint16(c[offset:])) - case d4: - ts = c.baseTime() + clientmodel.Timestamp(binary.LittleEndian.Uint32(c[offset:])) - case d8: - // Take absolute value for d8. - ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:])) - default: - panic("Invalid number of bytes for time delta") - } - - offset += int(c.timeBytes()) - - var v clientmodel.SampleValue - if c.isInt() { - switch c.valueBytes() { - case d0: - v = c.baseValue() - case d1: - v = c.baseValue() + clientmodel.SampleValue(int8(c[offset])) - case d2: - v = c.baseValue() + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:]))) - case d4: - v = c.baseValue() + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:]))) - // No d8 for ints. - default: - panic("Invalid number of bytes for integer delta") - } - } else { - switch c.valueBytes() { - case d4: - v = c.baseValue() + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:]))) - case d8: - // Take absolute value for d8. - v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:]))) - default: - panic("Invalid number of bytes for floating point delta") - } - } - return &metric.SamplePair{ - Timestamp: ts, - Value: v, - } -} - // deltaEncodedChunkIterator implements chunkIterator. type deltaEncodedChunkIterator struct { - chunk *deltaEncodedChunk - // TODO: add more fields here to keep track of last position. + c deltaEncodedChunk + len int + baseT clientmodel.Timestamp + baseV clientmodel.SampleValue + tBytes, vBytes deltaBytes + isInt bool } +// length implements chunkIterator. +func (it *deltaEncodedChunkIterator) length() int { return it.len } + // getValueAtTime implements chunkIterator. 
func (it *deltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { - i := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(t) + i := sort.Search(it.len, func(i int) bool { + return !it.getTimestampAtIndex(i).Before(t) }) switch i { case 0: - return metric.Values{*it.chunk.valueAtIndex(0)} - case it.chunk.len(): - return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)} + return metric.Values{metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(0), + Value: it.getSampleValueAtIndex(0), + }} + case it.len: + return metric.Values{metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(it.len - 1), + Value: it.getSampleValueAtIndex(it.len - 1), + }} default: - v := it.chunk.valueAtIndex(i) - if v.Timestamp.Equal(t) { - return metric.Values{*v} + ts := it.getTimestampAtIndex(i) + if ts.Equal(t) { + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.getSampleValueAtIndex(i), + }} + } + return metric.Values{ + metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(i - 1), + Value: it.getSampleValueAtIndex(i - 1), + }, + metric.SamplePair{ + Timestamp: ts, + Value: it.getSampleValueAtIndex(i), + }, } - return metric.Values{*it.chunk.valueAtIndex(i - 1), *v} } } // getRangeValues implements chunkIterator. func (it *deltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values { - oldest := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive) + oldest := sort.Search(it.len, func(i int) bool { + return !it.getTimestampAtIndex(i).Before(in.OldestInclusive) }) - newest := sort.Search(it.chunk.len(), func(i int) bool { - return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive) + newest := sort.Search(it.len, func(i int) bool { + return it.getTimestampAtIndex(i).After(in.NewestInclusive) }) - if oldest == it.chunk.len() { + if oldest == it.len { return nil } result := make(metric.Values, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, *it.chunk.valueAtIndex(i)) + result = append(result, metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(i), + Value: it.getSampleValueAtIndex(i), + }) } return result } // contains implements chunkIterator. func (it *deltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool { - return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime()) + return !t.Before(it.baseT) && !t.After(it.getTimestampAtIndex(it.len-1)) +} + +// values implements chunkIterator. +func (it *deltaEncodedChunkIterator) values() <-chan *metric.SamplePair { + valuesChan := make(chan *metric.SamplePair) + go func() { + for i := 0; i < it.len; i++ { + valuesChan <- &metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(i), + Value: it.getSampleValueAtIndex(i), + } + } + close(valuesChan) + }() + return valuesChan +} + +// getTimestampAtIndex implements chunkIterator. +func (it *deltaEncodedChunkIterator) getTimestampAtIndex(idx int) clientmodel.Timestamp { + offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + + switch it.tBytes { + case d1: + return it.baseT + clientmodel.Timestamp(uint8(it.c[offset])) + case d2: + return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint16(it.c[offset:])) + case d4: + return it.baseT + clientmodel.Timestamp(binary.LittleEndian.Uint32(it.c[offset:])) + case d8: + // Take absolute value for d8. 
+ return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) + default: + panic("Invalid number of bytes for time delta") + } +} + +// getLastTimestamp implements chunkIterator. +func (it *deltaEncodedChunkIterator) getLastTimestamp() clientmodel.Timestamp { + return it.getTimestampAtIndex(it.len - 1) +} + +// getSampleValueAtIndex implements chunkIterator. +func (it *deltaEncodedChunkIterator) getSampleValueAtIndex(idx int) clientmodel.SampleValue { + offset := deltaHeaderBytes + idx*int(it.tBytes+it.vBytes) + int(it.tBytes) + + if it.isInt { + switch it.vBytes { + case d0: + return it.baseV + case d1: + return it.baseV + clientmodel.SampleValue(int8(it.c[offset])) + case d2: + return it.baseV + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + case d4: + return it.baseV + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + // No d8 for ints. + default: + panic("Invalid number of bytes for integer delta") + } + } else { + switch it.vBytes { + case d4: + return it.baseV + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + case d8: + // Take absolute value for d8. + return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + default: + panic("Invalid number of bytes for floating point delta") + } + } +} + +// getLastSampleValue implements chunkIterator. +func (it *deltaEncodedChunkIterator) getLastSampleValue() clientmodel.SampleValue { + return it.getSampleValueAtIndex(it.len - 1) } diff --git a/storage/local/doubledelta.go b/storage/local/doubledelta.go index dcfd155ed2..ff6cda7f0e 100644 --- a/storage/local/doubledelta.go +++ b/storage/local/doubledelta.go @@ -199,15 +199,18 @@ func (c doubleDeltaEncodedChunk) firstTime() clientmodel.Timestamp { return c.baseTime() } -// lastTime implements chunk. -func (c doubleDeltaEncodedChunk) lastTime() clientmodel.Timestamp { - return c.valueAtIndex(c.len() - 1).Timestamp -} - // newIterator implements chunk. func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator { return &doubleDeltaEncodedChunkIterator{ - chunk: c, + c: *c, + len: c.len(), + baseT: c.baseTime(), + baseΔT: c.baseTimeDelta(), + baseV: c.baseValue(), + baseΔV: c.baseValueDelta(), + tBytes: c.timeBytes(), + vBytes: c.valueBytes(), + isInt: c.isInt(), } } @@ -245,19 +248,6 @@ func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) { *c = (*c)[:binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])] } -// values implements chunk. -func (c doubleDeltaEncodedChunk) values() <-chan *metric.SamplePair { - n := c.len() - valuesChan := make(chan *metric.SamplePair) - go func() { - for i := 0; i < n; i++ { - valuesChan <- c.valueAtIndex(i) - } - close(valuesChan) - }() - return valuesChan -} - // encoding implements chunk. 
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta } @@ -280,6 +270,9 @@ func (c doubleDeltaEncodedChunk) baseValue() clientmodel.SampleValue { } func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp { + if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 { + return 0 + } return clientmodel.Timestamp( binary.LittleEndian.Uint64( c[doubleDeltaHeaderBaseTimeDeltaOffset:], @@ -288,6 +281,9 @@ func (c doubleDeltaEncodedChunk) baseTimeDelta() clientmodel.Timestamp { } func (c doubleDeltaEncodedChunk) baseValueDelta() clientmodel.SampleValue { + if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 { + return 0 + } return clientmodel.SampleValue( math.Float64frombits( binary.LittleEndian.Uint64( @@ -387,120 +383,56 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s *metric.SamplePair, tb, vb de return []chunk{&c} } -func (c doubleDeltaEncodedChunk) valueAtIndex(idx int) *metric.SamplePair { - if idx == 0 { - return &metric.SamplePair{ - Timestamp: c.baseTime(), - Value: c.baseValue(), - } - } - if idx == 1 { - // If time and/or value bytes are at d8, the time and value is - // saved directly rather than as a difference. - timestamp := c.baseTimeDelta() - if c.timeBytes() < d8 { - timestamp += c.baseTime() - } - value := c.baseValueDelta() - if c.valueBytes() < d8 { - value += c.baseValue() - } - return &metric.SamplePair{ - Timestamp: timestamp, - Value: value, - } - } - offset := doubleDeltaHeaderBytes + (idx-2)*c.sampleSize() - - var ts clientmodel.Timestamp - switch c.timeBytes() { - case d1: - ts = c.baseTime() + - clientmodel.Timestamp(idx)*c.baseTimeDelta() + - clientmodel.Timestamp(int8(c[offset])) - case d2: - ts = c.baseTime() + - clientmodel.Timestamp(idx)*c.baseTimeDelta() + - clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(c[offset:]))) - case d4: - ts = c.baseTime() + - clientmodel.Timestamp(idx)*c.baseTimeDelta() + - clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(c[offset:]))) - case d8: - // Take absolute value for d8. - ts = clientmodel.Timestamp(binary.LittleEndian.Uint64(c[offset:])) - default: - panic("Invalid number of bytes for time delta") - } - - offset += int(c.timeBytes()) - - var v clientmodel.SampleValue - if c.isInt() { - switch c.valueBytes() { - case d0: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() - case d1: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(int8(c[offset])) - case d2: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(c[offset:]))) - case d4: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(c[offset:]))) - // No d8 for ints. - default: - panic("Invalid number of bytes for integer delta") - } - } else { - switch c.valueBytes() { - case d4: - v = c.baseValue() + - clientmodel.SampleValue(idx)*c.baseValueDelta() + - clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(c[offset:]))) - case d8: - // Take absolute value for d8. - v = clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[offset:]))) - default: - panic("Invalid number of bytes for floating point delta") - } - } - return &metric.SamplePair{ - Timestamp: ts, - Value: v, - } -} - // doubleDeltaEncodedChunkIterator implements chunkIterator. 
type doubleDeltaEncodedChunkIterator struct { - chunk *doubleDeltaEncodedChunk - // TODO(beorn7): add more fields here to keep track of last position. + c doubleDeltaEncodedChunk + len int + baseT, baseΔT clientmodel.Timestamp + baseV, baseΔV clientmodel.SampleValue + tBytes, vBytes deltaBytes + isInt bool } +// length implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) length() int { return it.len } + // getValueAtTime implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestamp) metric.Values { // TODO(beorn7): Implement in a more efficient way making use of the // state of the iterator and internals of the doubleDeltaChunk. - i := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(t) + i := sort.Search(it.len, func(i int) bool { + return !it.getTimestampAtIndex(i).Before(t) }) switch i { case 0: - return metric.Values{*it.chunk.valueAtIndex(0)} - case it.chunk.len(): - return metric.Values{*it.chunk.valueAtIndex(it.chunk.len() - 1)} + return metric.Values{metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(0), + Value: it.getSampleValueAtIndex(0), + }} + case it.len: + return metric.Values{metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(it.len - 1), + Value: it.getSampleValueAtIndex(it.len - 1), + }} default: - v := it.chunk.valueAtIndex(i) - if v.Timestamp.Equal(t) { - return metric.Values{*v} + ts := it.getTimestampAtIndex(i) + if ts.Equal(t) { + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.getSampleValueAtIndex(i), + }} + } + return metric.Values{ + metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(i - 1), + Value: it.getSampleValueAtIndex(i - 1), + }, + metric.SamplePair{ + Timestamp: ts, + Value: it.getSampleValueAtIndex(i), + }, } - return metric.Values{*it.chunk.valueAtIndex(i - 1), *v} } } @@ -508,26 +440,143 @@ func (it *doubleDeltaEncodedChunkIterator) getValueAtTime(t clientmodel.Timestam func (it *doubleDeltaEncodedChunkIterator) getRangeValues(in metric.Interval) metric.Values { // TODO(beorn7): Implement in a more efficient way making use of the // state of the iterator and internals of the doubleDeltaChunk. - oldest := sort.Search(it.chunk.len(), func(i int) bool { - return !it.chunk.valueAtIndex(i).Timestamp.Before(in.OldestInclusive) + oldest := sort.Search(it.len, func(i int) bool { + return !it.getTimestampAtIndex(i).Before(in.OldestInclusive) }) - newest := sort.Search(it.chunk.len(), func(i int) bool { - return it.chunk.valueAtIndex(i).Timestamp.After(in.NewestInclusive) + newest := sort.Search(it.len, func(i int) bool { + return it.getTimestampAtIndex(i).After(in.NewestInclusive) }) - if oldest == it.chunk.len() { + if oldest == it.len { return nil } result := make(metric.Values, 0, newest-oldest) for i := oldest; i < newest; i++ { - result = append(result, *it.chunk.valueAtIndex(i)) + result = append(result, metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(i), + Value: it.getSampleValueAtIndex(i), + }) } return result } // contains implements chunkIterator. func (it *doubleDeltaEncodedChunkIterator) contains(t clientmodel.Timestamp) bool { - return !t.Before(it.chunk.firstTime()) && !t.After(it.chunk.lastTime()) + return !t.Before(it.baseT) && !t.After(it.getTimestampAtIndex(it.len-1)) +} + +// values implements chunkIterator. 
+func (it *doubleDeltaEncodedChunkIterator) values() <-chan *metric.SamplePair { + valuesChan := make(chan *metric.SamplePair) + go func() { + for i := 0; i < it.len; i++ { + valuesChan <- &metric.SamplePair{ + Timestamp: it.getTimestampAtIndex(i), + Value: it.getSampleValueAtIndex(i), + } + } + close(valuesChan) + }() + return valuesChan +} + +// getTimestampAtIndex implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) getTimestampAtIndex(idx int) clientmodel.Timestamp { + if idx == 0 { + return it.baseT + } + if idx == 1 { + // If time bytes are at d8, the time is saved directly rather + // than as a difference. + if it.tBytes == d8 { + return it.baseΔT + } + return it.baseT + it.baseΔT + } + + offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + + switch it.tBytes { + case d1: + return it.baseT + + clientmodel.Timestamp(idx)*it.baseΔT + + clientmodel.Timestamp(int8(it.c[offset])) + case d2: + return it.baseT + + clientmodel.Timestamp(idx)*it.baseΔT + + clientmodel.Timestamp(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + case d4: + return it.baseT + + clientmodel.Timestamp(idx)*it.baseΔT + + clientmodel.Timestamp(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + case d8: + // Take absolute value for d8. + return clientmodel.Timestamp(binary.LittleEndian.Uint64(it.c[offset:])) + default: + panic("Invalid number of bytes for time delta") + } +} + +// getLastTimestamp implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) getLastTimestamp() clientmodel.Timestamp { + return it.getTimestampAtIndex(it.len - 1) +} + +// getSampleValueAtIndex implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) getSampleValueAtIndex(idx int) clientmodel.SampleValue { + if idx == 0 { + return it.baseV + } + if idx == 1 { + // If value bytes are at d8, the value is saved directly rather + // than as a difference. + if it.vBytes == d8 { + return it.baseΔV + } + return it.baseV + it.baseΔV + } + + offset := doubleDeltaHeaderBytes + (idx-2)*int(it.tBytes+it.vBytes) + int(it.tBytes) + + if it.isInt { + switch it.vBytes { + case d0: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + case d1: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(int8(it.c[offset])) + case d2: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(int16(binary.LittleEndian.Uint16(it.c[offset:]))) + case d4: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(int32(binary.LittleEndian.Uint32(it.c[offset:]))) + // No d8 for ints. + default: + panic("Invalid number of bytes for integer delta") + } + } else { + switch it.vBytes { + case d4: + return it.baseV + + clientmodel.SampleValue(idx)*it.baseΔV + + clientmodel.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(it.c[offset:]))) + case d8: + // Take absolute value for d8. + return clientmodel.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(it.c[offset:]))) + default: + panic("Invalid number of bytes for floating point delta") + } + } +} + +// getLastSampleValue implements chunkIterator. +func (it *doubleDeltaEncodedChunkIterator) getLastSampleValue() clientmodel.SampleValue { + return it.getSampleValueAtIndex(it.len - 1) } diff --git a/storage/local/persistence.go b/storage/local/persistence.go index 96d0adf08a..4b7f56718e 100644 --- a/storage/local/persistence.go +++ b/storage/local/persistence.go @@ -880,7 +880,7 @@ func (p *persistence) dropAndPersistChunks( // too old. 
If that's the case, the chunks in the series file // are all too old, too. i := 0 - for ; i < len(chunks) && chunks[i].lastTime().Before(beforeTime); i++ { + for ; i < len(chunks) && chunks[i].newIterator().getLastTimestamp().Before(beforeTime); i++ { } if i < len(chunks) { firstTimeNotDropped = chunks[i].firstTime() @@ -1567,8 +1567,14 @@ func chunkIndexForOffset(offset int64) (int, error) { func writeChunkHeader(w io.Writer, c chunk) error { header := make([]byte, chunkHeaderLen) header[chunkHeaderTypeOffset] = byte(c.encoding()) - binary.LittleEndian.PutUint64(header[chunkHeaderFirstTimeOffset:], uint64(c.firstTime())) - binary.LittleEndian.PutUint64(header[chunkHeaderLastTimeOffset:], uint64(c.lastTime())) + binary.LittleEndian.PutUint64( + header[chunkHeaderFirstTimeOffset:], + uint64(c.firstTime()), + ) + binary.LittleEndian.PutUint64( + header[chunkHeaderLastTimeOffset:], + uint64(c.newIterator().getLastTimestamp()), + ) _, err := w.Write(header) return err } diff --git a/storage/local/persistence_test.go b/storage/local/persistence_test.go index 00e36b792e..ce20458020 100644 --- a/storage/local/persistence_test.go +++ b/storage/local/persistence_test.go @@ -70,8 +70,8 @@ func buildTestChunks(encoding chunkEncoding) map[clientmodel.Fingerprint][]chunk } func chunksEqual(c1, c2 chunk) bool { - values2 := c2.values() - for v1 := range c1.values() { + values2 := c2.newIterator().values() + for v1 := range c1.newIterator().values() { v2 := <-values2 if !v1.Equal(v2) { return false diff --git a/storage/local/series.go b/storage/local/series.go index 419ae78bcb..2c733f2377 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -486,41 +486,57 @@ func (it *memorySeriesIterator) GetValueAtTime(t clientmodel.Timestamp) metric.V return it.chunkIt.getValueAtTime(t) } - it.chunkIt = nil - if len(it.chunks) == 0 { return nil } // Before or exactly on the first sample of the series. - if !t.After(it.chunks[0].firstTime()) { + it.chunkIt = it.chunks[0].newIterator() + ts := it.chunkIt.getTimestampAtIndex(0) + if !t.After(ts) { // return first value of first chunk - return it.chunks[0].newIterator().getValueAtTime(t) - } - // After or exactly on the last sample of the series. - if !t.Before(it.chunks[len(it.chunks)-1].lastTime()) { - // return last value of last chunk - return it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(t) + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.chunkIt.getSampleValueAtIndex(0), + }} } - // Find first chunk where lastTime() is after or equal to t. + // After or exactly on the last sample of the series. + it.chunkIt = it.chunks[len(it.chunks)-1].newIterator() + ts = it.chunkIt.getLastTimestamp() + if !t.Before(ts) { + // return last value of last chunk + return metric.Values{metric.SamplePair{ + Timestamp: ts, + Value: it.chunkIt.getSampleValueAtIndex(it.chunkIt.length() - 1), + }} + } + + // Find last chunk where firstTime() is before or equal to t. + l := len(it.chunks) - 1 i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].lastTime().Before(t) + return !it.chunks[l-i].firstTime().After(t) }) if i == len(it.chunks) { panic("out of bounds") } - - if t.Before(it.chunks[i].firstTime()) { + it.chunkIt = it.chunks[l-i].newIterator() + ts = it.chunkIt.getLastTimestamp() + if t.After(ts) { // We ended up between two chunks. 
+ sp1 := metric.SamplePair{ + Timestamp: ts, + Value: it.chunkIt.getSampleValueAtIndex(it.chunkIt.length() - 1), + } + it.chunkIt = it.chunks[l-i+1].newIterator() return metric.Values{ - it.chunks[i-1].newIterator().getValueAtTime(t)[0], - it.chunks[i].newIterator().getValueAtTime(t)[0], + sp1, + metric.SamplePair{ + it.chunkIt.getTimestampAtIndex(0), + it.chunkIt.getSampleValueAtIndex(0), + }, } } - // We ended up in the middle of a chunk. We might stay there for a while, - // so save it as the current chunk iterator. - it.chunkIt = it.chunks[i].newIterator() return it.chunkIt.getValueAtTime(t) } @@ -529,26 +545,34 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val it.lock() defer it.unlock() - // Find the first relevant chunk. + // Find the first chunk for which the first sample is within the interval. i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].lastTime().Before(in.OldestInclusive) + return !it.chunks[i].firstTime().Before(in.OldestInclusive) }) + // Only now check the last timestamp of the previous chunk (which is + // fairly expensive). + if i > 0 && !it.chunks[i-1].newIterator().getLastTimestamp().Before(in.OldestInclusive) { + i-- + } + values := make(metric.Values, 0, 2) - for i, c := range it.chunks[i:] { - var chunkIt chunkIterator + for j, c := range it.chunks[i:] { if c.firstTime().After(in.NewestInclusive) { if len(values) == 1 { - // We found the first value before, but are now + // We found the first value before but are now // already past the last value. The value we // want must be the last value of the previous // chunk. So backtrack... - chunkIt = it.chunks[i-1].newIterator() - values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0]) + chunkIt := it.chunks[j-1].newIterator() + values = append(values, metric.SamplePair{ + Timestamp: chunkIt.getLastTimestamp(), + Value: chunkIt.getLastSampleValue(), + }) } break } + chunkIt := c.newIterator() if len(values) == 0 { - chunkIt = c.newIterator() firstValues := chunkIt.getValueAtTime(in.OldestInclusive) switch len(firstValues) { case 2: @@ -559,20 +583,18 @@ func (it *memorySeriesIterator) GetBoundaryValues(in metric.Interval) metric.Val panic("unexpected return from getValueAtTime") } } - if c.lastTime().After(in.NewestInclusive) { - if chunkIt == nil { - chunkIt = c.newIterator() - } + if chunkIt.getLastTimestamp().After(in.NewestInclusive) { values = append(values, chunkIt.getValueAtTime(in.NewestInclusive)[0]) break } } if len(values) == 1 { // We found exactly one value. In that case, add the most recent we know. - values = append( - values, - it.chunks[len(it.chunks)-1].newIterator().getValueAtTime(in.NewestInclusive)[0], - ) + chunkIt := it.chunks[len(it.chunks)-1].newIterator() + values = append(values, metric.SamplePair{ + Timestamp: chunkIt.getLastTimestamp(), + Value: chunkIt.getLastSampleValue(), + }) } if len(values) == 2 && values[0].Equal(&values[1]) { return values[:1] @@ -585,10 +607,17 @@ func (it *memorySeriesIterator) GetRangeValues(in metric.Interval) metric.Values it.lock() defer it.unlock() - // Find the first relevant chunk. + // Find the first chunk for which the first sample is within the interval. i := sort.Search(len(it.chunks), func(i int) bool { - return !it.chunks[i].lastTime().Before(in.OldestInclusive) + // TODO: Avoid the expensive newIterator().getLastTimestamp() call. 
+ return !it.chunks[i].firstTime().Before(in.OldestInclusive) }) + // Only now check the last timestamp of the previous chunk (which is + // fairly expensive). + if i > 0 && !it.chunks[i-1].newIterator().getLastTimestamp().Before(in.OldestInclusive) { + i-- + } + values := metric.Values{} for _, c := range it.chunks[i:] { if c.firstTime().After(in.NewestInclusive) { diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index efb16d0a0e..43aec3a631 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -208,7 +208,7 @@ func testChunk(t *testing.T, encoding chunkEncoding) { if cd.isEvicted() { continue } - for sample := range cd.chunk.values() { + for sample := range cd.chunk.newIterator().values() { values = append(values, *sample) } }
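
The patch above removes chunk.lastTime() and chunk.values() and moves all sample access behind chunkIterator (length, getTimestampAtIndex, getLastTimestamp, getSampleValueAtIndex, getLastSampleValue, values). A side effect visible in the new iterator structs is that header fields (len, baseT, baseV, tBytes, vBytes, isInt) are decoded once per iterator instead of on every valueAtIndex call. The stand-alone Go sketch below models the caller-side pattern only; Timestamp, SampleValue, and the slice-backed toy iterator are simplified stand-ins for illustration, not the actual storage/local or clientmodel types.

package main

import "fmt"

// Simplified stand-ins for clientmodel.Timestamp and clientmodel.SampleValue.
type Timestamp int64
type SampleValue float64

type SamplePair struct {
	Timestamp Timestamp
	Value     SampleValue
}

// iterator mirrors the shape of the new index-based chunkIterator methods.
type iterator interface {
	length() int
	getTimestampAtIndex(int) Timestamp
	getLastTimestamp() Timestamp
	getSampleValueAtIndex(int) SampleValue
	getLastSampleValue() SampleValue
}

// sliceIterator is a toy implementation backed by a plain slice of samples.
type sliceIterator struct{ samples []SamplePair }

func (it sliceIterator) length() int { return len(it.samples) }
func (it sliceIterator) getTimestampAtIndex(i int) Timestamp { return it.samples[i].Timestamp }
func (it sliceIterator) getSampleValueAtIndex(i int) SampleValue { return it.samples[i].Value }
func (it sliceIterator) getLastTimestamp() Timestamp {
	return it.getTimestampAtIndex(it.length() - 1)
}
func (it sliceIterator) getLastSampleValue() SampleValue {
	return it.getSampleValueAtIndex(it.length() - 1)
}

func main() {
	var it iterator = sliceIterator{samples: []SamplePair{{1, 1.5}, {2, 2.5}, {3, 3.5}}}

	// Instead of ranging over a channel returned by chunk.values(), callers
	// can loop over indices, which avoids a goroutine per traversal.
	for i := 0; i < it.length(); i++ {
		fmt.Println(it.getTimestampAtIndex(i), it.getSampleValueAtIndex(i))
	}

	// What used to be chunk.lastTime() becomes getLastTimestamp() on the
	// chunk's iterator.
	fmt.Println("last:", it.getLastTimestamp(), it.getLastSampleValue())
}

Callers that still want streaming access keep values(), now on the iterator (transcodeAndAdd, chunksEqual, testChunk), while call sites that only need the last sample, such as chunkDesc.lastTime, writeChunkHeader, and dropAndPersistChunks, go through newIterator().getLastTimestamp() instead of the removed chunk.lastTime().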