prometheus/storage/local/gorilla.go

1102 lines
34 KiB
Go

// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package local
import (
"encoding/binary"
"fmt"
"io"
"math"
"github.com/prometheus/common/model"
)
// Gorilla chunk encoding is inspired by the following paper:
// Gorilla: A Fast, Scalable, In-Memory Time Series Database
// T. Pelkonen et al., Facebook Inc.
// http://www.vldb.org/pvldb/vol8/p1816-teller.pdf
// Note that there are significant differences in detail, some due to the way
// Prometheus chunks work, others to optimize for the Prometheus use-case.
//
// Layout of a 1024 byte gorilla chunk (big endian, wherever it matters):
// - first time (int64): 8 bytes bit 0000-0063
// - first value (float64): 8 bytes bit 0064-0127
// - last time (int64): 8 bytes bit 0128-0191
// - last value (float64): 8 bytes bit 0192-0255
// - first Δt (t1-t0, unsigned): 3 bytes bit 0256-0279
// - flags (so far just encoding, byte) 1 byte bit 0280-0287
// - bit offset for next sample 2 bytes bit 0288-0303
// - first Δv for value encoding 1, otherwise payload
// 4 bytes bit 0304-0335
// - payload 973 bytes bit 0336-8119
// The following only exists if the chunk is still open. Otherwise, it might be
// used by payload.
// - bit offset for current ΔΔt=0 count 2 bytes bit 8120-8135
// - last Δt 3 bytes bit 8136-8159
// - special bytes for value encoding 4 bytes bit 8160-8191
// - for encoding 1: last Δv 4 bytes bit 8160-8191
// - for encoding 2: count of
// - last leading zeros (1 byte) 1 byte bit 8160-8167
// - last significant bits (1 byte) 1 byte bit 8168-8175
//
// TIMESTAMP ENCODING
//
// The 1st timestamp is saved directly.
//
// The difference to the 2nd timestamp is saved as first Δt. 3 bytes is enough
// for about 4.5h. Since we close a chunk after sitting idle for 1h, this
// limitation has no practical consequences. Should, for whatever reason, a
// larger delta be required, the chunk would be closed and the new sample added
// to a new chunk.
//
// From the 3rd timestamp on, a double-delta (ΔΔt) is saved:
// (t_{n} - t_{n-1}) - (t_{n-1} - t_{n-2})
// To perform that operation, the last Δt is saved at the end of the chunk for
// as long as the chunk is not closed yet (see above).
//
// Most of the time, ΔΔt is zero, even with the ms-precision of
// Prometheus. Therefore, we save a ΔΔt of zero as a leading '0' bit followed by
// 7 bits counting the number of consecutive ΔΔt==0 (the count is offset by -1,
// so the range of 0 to 127 represents 1 to 128 repetitions).
//
// If ΔΔt != 0, we essentially apply the Gorilla scheme verbatim (cf. section
// 4.1.1 in the paper), but with different bit buckets (as Prometheus uses ms
// rather than s, and the default scrape interval is 1m rather than 4m). In
// particular:
//
// - If ΔΔt is between [-32,31], store '10' followed by a 6 bit value. This is
// for minor irregularities in the scrape interval.
//
// - If ΔΔt is between [-65536,65535], store '110' followed by a 17 bit
// value. This will typically happen if a scrape is missed completely.
//
// - If ΔΔt is between [-4194304,4194303], store '111' followed by a 23 bit
// value. This spans more than 1h, which is usually enough as we close a
// chunk anyway if it doesn't receive any sample in 1h.
//
// - Should we nevertheless encounter a larger ΔΔt, we simply close the chunk
// and overflow into a new chunk.
//
// VALUE ENCODING
//
// Value encoding can change and is determined by the two least significant bits
// of the 'flags' byte at bit position 280. (The remaining bits could be used
// for other flags in the future.) The encoding can be changed without
// transcoding upon adding the 3rd sample. After that, an encoding change
// results either in transcoding or in closing the chunk and overflowing into a
// new chunk.
//
// The 1st sample value is always saved directly. The 2nd sample value is saved
// as the last value. Upon saving the 3rd value, an encoding is chosen, and the
// chunk is prepared accordingly.
//
// The following value encodings exist (with their value in the flags byte):
//
// 0: "Zero encoding".
//
// In many time series, the value simply stays constant over a long time
// (e.g. the "up" time series). In that case, all sample values are determined
// by the 1st value, and no further value encoding is happening at all. The
// payload consists entirely of timestamps.
//
// 1: Integer double-delta encoding.
//
// Many Prometheus metrics are integer counters and change in a quite regular
// fashion, similar to timestamps. Thus, the same double-delta encoding can be
// applied. This encoding works like the timestamp encoding described above, but
// with different bit buckets and without counting of repeated ΔΔv=0. The case
// of ΔΔv=0 is represented by a single '0' bit for each occurrence. The first Δv
// is saved as an int32 at bit position 304. The most recent Δv is saved as an
// int32 at the end of the chunk (see above). If Δv cannot be represented as a
// 32 bit signed integer, no integer double-delta encoding can be applied.
//
// Bit buckets (lead-in bits followed by (signed) value bits):
// - '0': 0 bit
// - '10': 6 bit
// - '110': 13 bit
// - '1110': 20 bit
// - '1111': 33 bit
// Since Δv is restricted to 32 bit, 33 bit are always enough for ΔΔv.
//
// 2: XOR encoding.
//
// This follows verbatim the Gorilla value encoding (cf. section 4.1.2 of the
// paper). The last count of leading zeros and the last count of meaningful bits
// in the XOR value is saved at the end of the chunk for as long as the chunk is
// not closed yet (see above). Note, though, that the number of significant bits
// is saved as (count-1), i.e. a saved value of 0 means 1 significant bit, a
// saved value of 1 means 2, and so on. Also, we save the numbers of leading
// zeros and significant bits anew if they drop a lot. Otherwise, you can easily
// be locked in with a high number of significant bits.
//
// 3: Direct encoding.
//
// If the sample values are just random, it is most efficient to save sample
// values directly as float64.
//
// ZIPPING TIMESTAMPS AND VALUES TOGETHER
//
// Usually, encoded timestamps and encoded values simply alternate. There are
// two exceptions:
//
// (1) With the "zero encoding" for values, the payload only contains
// timestamps.
//
// (2) In a consecutive row of up to 128 ΔΔt=0 repeats, the count of timestamps
// determines how many sample values will follow directly after another.
// Size limits and byte/bit offsets describing the layout of a gorilla chunk.
// Offsets named "...Offset" below are byte offsets unless stated otherwise.
const (
// gorillaMinLength and gorillaMaxLength bound the chunk length in bytes
// (chunkLen) that newGorillaChunk accepts.
// NOTE(review): the upper bound presumably exists because bit offsets are
// kept in uint16 values — confirm.
gorillaMinLength = 128
gorillaMaxLength = 8192
// Useful byte offsets (header part of the chunk).
gorillaFirstTimeOffset = 0
gorillaFirstValueOffset = 8
gorillaLastTimeOffset = 16
gorillaLastValueOffset = 24
gorillaFirstTimeDeltaOffset = 32
gorillaFlagOffset = 35
gorillaNextSampleBitOffsetOffset = 36
gorillaFirstValueDeltaOffset = 38
// The following are in the "footer" and only usable if the chunk is
// still open. The last value delta (int encoding) shares its bytes with
// the leading-zeros/significant-bits counts (XOR encoding).
gorillaCountOffsetBitOffset = chunkLen - 9
gorillaLastTimeDeltaOffset = chunkLen - 7
gorillaLastValueDeltaOffset = chunkLen - 4
gorillaLastLeadingZerosCountOffset = chunkLen - 4
gorillaLastSignificantBitsCountOffset = chunkLen - 3
// Bit offsets (not byte offsets) used as markers for the first samples.
gorillaFirstSampleBitOffset uint16 = 0 // Symbolic, don't really read or write here.
gorillaSecondSampleBitOffset uint16 = 1 // Symbolic, don't really read or write here.
// gorillaThirdSampleBitOffset is a bit special. Depending on the encoding, there can
// be various things at this offset. It's most of the time symbolic, but in the best
// case (zero encoding for values), it will be the real offset for the 3rd sample.
gorillaThirdSampleBitOffset uint16 = gorillaFirstValueDeltaOffset * 8
// If the bit offset for the next sample is above this threshold, no new
// samples can be added to the chunk (because the payload has already
// reached the footer). The chunk is considered closed.
gorillaNextSampleBitOffsetThreshold = 8 * gorillaCountOffsetBitOffset
// Upper limit used when checking time deltas against the 3-byte fields.
// Note that the value 1<<24 itself needs 25 bits; the largest Δt that
// can be stored in 3 bytes is 1<<24 - 1.
gorillaMaxTimeDelta = 1 << 24 // What fits into a 3-byte timestamp.
)
// gorillaValueEncoding identifies how sample values are encoded in a gorilla
// chunk. It is stored in the flags byte of the chunk (gorillaFlagOffset).
type gorillaValueEncoding byte
// The value encodings, numbered 0 to 3 in declaration order via iota. The
// numeric order matters: code elsewhere in this file compares encodings with
// <= and > (e.g. to never pick an encoding "below" the one initially set).
const (
gorillaZeroEncoding gorillaValueEncoding = iota
gorillaIntDoubleDeltaEncoding
gorillaXOREncoding
gorillaDirectEncoding
)
// gorillaWorstCaseBitsPerSample provides the worst-case number of bits needed
// per sample with the various value encodings. The counts already include the
// up to 27 bits taken by a timestamp. It is consulted before adding a sample to
// decide whether the sample is guaranteed to fit into the chunk.
// NOTE(review): the widest timestamp entry written by addDDTime is 3 control
// bits plus a 23 bit value (26 bits), and the widest int entry written by
// addDDValue is 4 control bits plus 33 bits (37 bits), so 27 and 38 look like
// deliberately conservative bounds — confirm.
var gorillaWorstCaseBitsPerSample = map[gorillaValueEncoding]int{
gorillaZeroEncoding: 27 + 0,
gorillaIntDoubleDeltaEncoding: 27 + 38,
gorillaXOREncoding: 27 + 13 + 64,
gorillaDirectEncoding: 27 + 64,
}
// gorillaChunk implements the chunk interface. It is the raw byte
// representation of the chunk; header, payload, and footer are all addressed
// via the offsets declared in this file.
type gorillaChunk []byte
// newGorillaChunk allocates a fresh gorillaChunk of chunkLen bytes and records
// the requested value encoding in its flags byte. All Gorilla chunks share the
// length given by the chunkLen constant. It panics if chunkLen lies outside
// [gorillaMinLength, gorillaMaxLength] or if enc is not a known encoding.
func newGorillaChunk(enc gorillaValueEncoding) *gorillaChunk {
	lengthOK := chunkLen >= gorillaMinLength && chunkLen <= gorillaMaxLength
	if !lengthOK {
		err := fmt.Errorf(
			"invalid chunk length of %d bytes , need at least %d bytes and at most %d bytes",
			chunkLen, gorillaMinLength, gorillaMaxLength,
		)
		panic(err)
	}
	if enc > gorillaDirectEncoding {
		err := fmt.Errorf("unknown Gorilla value encoding: %v", enc)
		panic(err)
	}

	fresh := make(gorillaChunk, chunkLen)
	fresh[gorillaFlagOffset] = byte(enc)
	return &fresh
}
// add implements chunk. It dispatches on the bit offset recorded for the next
// sample: a chunk whose payload has reached the footer overflows into a new
// chunk, the first and second sample have dedicated helpers, and everything
// else is handled by addLaterSample.
func (c *gorillaChunk) add(s model.SamplePair) ([]chunk, error) {
	offset := c.nextSampleOffset()
	if offset > gorillaNextSampleBitOffsetThreshold {
		// Chunk is closed.
		return addToOverflowChunk(c, s)
	}
	switch offset {
	case gorillaFirstSampleBitOffset:
		return c.addFirstSample(s), nil
	case gorillaSecondSampleBitOffset:
		return c.addSecondSample(s)
	default:
		return c.addLaterSample(s, offset)
	}
}
// clone implements chunk. The returned chunk owns a fresh byte slice of the
// same length holding a copy of c's bytes.
func (c gorillaChunk) clone() chunk {
	duplicate := make(gorillaChunk, len(c))
	copy(duplicate, c)
	return &duplicate
}
// newIterator implements chunk. It returns a gorillaChunkIterator created by
// newGorillaChunkIterator for c.
func (c gorillaChunk) newIterator() chunkIterator {
return newGorillaChunkIterator(c)
}
// marshal implements chunk. It writes the len(c) bytes of the chunk to w and
// returns the writer's error, or an error of its own if the writer reports
// fewer bytes written than the chunk holds.
func (c gorillaChunk) marshal(w io.Writer) error {
	n, err := w.Write(c)
	if err != nil {
		return err
	}
	// Write operates on len(c) bytes, so that (and not the capacity of the
	// underlying array) is what the written count has to be checked against.
	if n != len(c) {
		return fmt.Errorf("wanted to write %d bytes, wrote %d", len(c), n)
	}
	return nil
}
// marshalToBuf implements chunk. It copies the chunk into buf and fails if buf
// is too small to take all len(c) bytes.
func (c gorillaChunk) marshalToBuf(buf []byte) error {
	if copied := copy(buf, c); copied != len(c) {
		return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), copied)
	}
	return nil
}
// unmarshal implements chunk. It fills the chunk completely (len(c) bytes) from
// r and passes on any error reported by io.ReadFull.
func (c gorillaChunk) unmarshal(r io.Reader) error {
	if _, err := io.ReadFull(r, c); err != nil {
		return err
	}
	return nil
}
// unmarshalFromBuf implements chunk. It fills the chunk from buf and returns
// an error if buf holds fewer than len(c) bytes.
func (c gorillaChunk) unmarshalFromBuf(buf []byte) error {
	// copy transfers at most len(c) bytes, so the copied count has to be
	// compared to the length of the chunk, not to its capacity.
	if copied := copy(c, buf); copied != len(c) {
		return fmt.Errorf("insufficient bytes copied from buffer during unmarshaling, want %d, got %d", len(c), copied)
	}
	return nil
}
// encoding implements chunk. It always reports the gorilla chunk encoding.
func (c gorillaChunk) encoding() chunkEncoding { return gorilla }
// firstTime implements chunk. It decodes the timestamp of the first sample
// from the 8 big-endian bytes starting at gorillaFirstTimeOffset.
func (c gorillaChunk) firstTime() model.Time {
	raw := binary.BigEndian.Uint64(c[gorillaFirstTimeOffset:])
	return model.Time(raw)
}
// firstValue returns the value of the first sample, stored as the big-endian
// IEEE 754 bit pattern of a float64 at gorillaFirstValueOffset.
func (c gorillaChunk) firstValue() model.SampleValue {
	bits := binary.BigEndian.Uint64(c[gorillaFirstValueOffset:])
	return model.SampleValue(math.Float64frombits(bits))
}
// lastTime returns the timestamp of the most recently added sample, decoded
// from the 8 big-endian bytes starting at gorillaLastTimeOffset.
func (c gorillaChunk) lastTime() model.Time {
	raw := binary.BigEndian.Uint64(c[gorillaLastTimeOffset:])
	return model.Time(raw)
}
// lastValue returns the value of the most recently added sample, stored as the
// big-endian IEEE 754 bit pattern of a float64 at gorillaLastValueOffset.
func (c gorillaChunk) lastValue() model.SampleValue {
	bits := binary.BigEndian.Uint64(c[gorillaLastValueOffset:])
	return model.SampleValue(math.Float64frombits(bits))
}
// firstTimeDelta returns the first Δt (t1-t0). It is stored as an unsigned
// 3-byte big-endian number at gorillaFirstTimeDeltaOffset, so the value is
// assembled byte by byte.
func (c gorillaChunk) firstTimeDelta() model.Time {
	low := model.Time(c[gorillaFirstTimeDeltaOffset+2])
	middle := model.Time(c[gorillaFirstTimeDeltaOffset+1])
	high := model.Time(c[gorillaFirstTimeDeltaOffset])
	return low | middle<<8 | high<<16
}
// firstValueDelta reads the first Δv as a big-endian int32 from
// gorillaFirstValueDeltaOffset. The result is undefined unless the value
// encoding is 1 (integer double-delta).
func (c gorillaChunk) firstValueDelta() int32 {
	raw := binary.BigEndian.Uint32(c[gorillaFirstValueDeltaOffset:])
	return int32(raw)
}
// lastTimeDelta reads the most recent Δt from the footer, where it is kept as
// an unsigned 3-byte big-endian number. The result is undefined if the chunk is
// closed already.
func (c gorillaChunk) lastTimeDelta() model.Time {
	low := model.Time(c[gorillaLastTimeDeltaOffset+2])
	middle := model.Time(c[gorillaLastTimeDeltaOffset+1])
	high := model.Time(c[gorillaLastTimeDeltaOffset])
	return low | middle<<8 | high<<16
}
// setLastTimeDelta stores dT as an unsigned 3-byte big-endian number in the
// footer. It must not be called if the chunk is closed already. It must not be
// called with a time that doesn't fit into 24bit, either; doing so panics.
func (c gorillaChunk) setLastTimeDelta(dT model.Time) {
	// gorillaMaxTimeDelta is 1<<24, which itself needs 25 bits. It has to be
	// rejected, too, or it would silently be stored as 0.
	if dT >= gorillaMaxTimeDelta {
		panic("Δt overflows 24 bit")
	}
	c[gorillaLastTimeDeltaOffset] = byte(dT >> 16)
	c[gorillaLastTimeDeltaOffset+1] = byte(dT >> 8)
	c[gorillaLastTimeDeltaOffset+2] = byte(dT)
}
// lastValueDelta reads the most recent Δv as a big-endian int32 from the
// footer. The result is undefined if the chunk is closed already.
func (c gorillaChunk) lastValueDelta() int32 {
	raw := binary.BigEndian.Uint32(c[gorillaLastValueDeltaOffset:])
	return int32(raw)
}
// setLastValueDelta writes dV as a big-endian int32 into the footer. It must
// not be called if the chunk is closed already.
func (c gorillaChunk) setLastValueDelta(dV int32) {
	footerField := c[gorillaLastValueDeltaOffset:]
	binary.BigEndian.PutUint32(footerField, uint32(dV))
}
// nextSampleOffset returns the bit offset at which the next sample will be
// written, read as a big-endian uint16 from the chunk header.
func (c gorillaChunk) nextSampleOffset() uint16 {
	headerField := c[gorillaNextSampleBitOffsetOffset:]
	return binary.BigEndian.Uint16(headerField)
}
// setNextSampleOffset records the bit offset at which the next sample will be
// written as a big-endian uint16 in the chunk header.
func (c gorillaChunk) setNextSampleOffset(offset uint16) {
	headerField := c[gorillaNextSampleBitOffsetOffset:]
	binary.BigEndian.PutUint16(headerField, offset)
}
// zeroDDTRepeats returns the current count of consecutive ΔΔt==0 entries and
// the bit offset of the 7 bit counter in the payload. The counter offset is
// read from the footer; an offset of 0 means that no counter is active, in
// which case (0, 0) is returned. The counter is stored offset by -1, hence the
// +1 when reading it back.
func (c gorillaChunk) zeroDDTRepeats() (repeats uint64, offset uint16) {
	offset = binary.BigEndian.Uint16(c[gorillaCountOffsetBitOffset:])
	if offset != 0 {
		repeats = c.readBitPattern(offset, 7) + 1
	}
	return repeats, offset
}
// setZeroDDTRepeats updates the counter of consecutive ΔΔt==0 entries.
//
// repeats is the new count, offset the bit offset of the 7 bit counter in the
// payload. With repeats==0, only the counter offset kept in the footer is
// cleared and nothing is written to the payload. Otherwise, repeats-1 is
// written as a 7 bit pattern at offset (the stored count is offset by -1).
func (c gorillaChunk) setZeroDDTRepeats(repeats uint64, offset uint16) {
switch repeats {
case 0:
// Just clear the offset.
binary.BigEndian.PutUint16(c[gorillaCountOffsetBitOffset:], 0)
return
case 1:
// First time we set a repeat here, so set the offset. But only
// if we haven't reached the footer yet. (If that's the case, we
// would overwrite ourselves below, and we don't need the offset
// later anyway because no more samples will be added to this
// chunk.)
if offset+7 <= gorillaNextSampleBitOffsetThreshold {
binary.BigEndian.PutUint16(c[gorillaCountOffsetBitOffset:], offset)
}
default:
// For a change, we are writing somewhere where we have written
// before. We need to clear the bits first.
// NOTE(review): bitMask is defined elsewhere; from its use in this
// file, bitMask[n][p] appears to select n bits starting at bit p
// (counted from the most significant bit), cut off at the byte
// boundary — confirm.
posIn1stByte := offset % 8
c[offset/8] &^= bitMask[7][posIn1stByte]
// With the counter starting at bit position 2 or later, the 7 bits
// spill over into the next byte by posIn1stByte-1 bits.
if posIn1stByte > 1 {
c[offset/8+1] &^= bitMask[posIn1stByte-1][0]
}
}
// addBitPattern ORs the bits in, which is why the default case above has
// to clear the previously written counter first.
c.addBitPattern(offset, repeats-1, 7)
}
// setLastSample records s as the most recent sample in the chunk header: the
// timestamp as a big-endian 64 bit integer, the value as the big-endian bit
// pattern of a float64.
func (c gorillaChunk) setLastSample(s model.SamplePair) {
	timeField := c[gorillaLastTimeOffset:]
	binary.BigEndian.PutUint64(timeField, uint64(s.Timestamp))

	valueField := c[gorillaLastValueOffset:]
	binary.BigEndian.PutUint64(valueField, math.Float64bits(float64(s.Value)))
}
// addFirstSample is a helper method only used by c.add(). It stores timestamp
// and value of s as base time and base value of the chunk.
func (c *gorillaChunk) addFirstSample(s model.SamplePair) []chunk {
	raw := *c
	binary.BigEndian.PutUint64(raw[gorillaFirstTimeOffset:], uint64(s.Timestamp))
	binary.BigEndian.PutUint64(raw[gorillaFirstValueOffset:], math.Float64bits(float64(s.Value)))

	// The first sample doubles as the last one, which simplifies the
	// handling of single-sample chunks.
	c.setLastSample(s)
	c.setNextSampleOffset(gorillaSecondSampleBitOffset)
	return []chunk{c}
}
// addSecondSample is a helper method only used by c.add(). It calculates the
// first time delta from the provided sample and adds it to the chunk together
// with the provided sample as the last sample.
//
// It returns an error if the sample is older than the first sample. If the
// time delta does not fit into the 3 bytes reserved for it, the sample is added
// to an overflow chunk instead.
func (c *gorillaChunk) addSecondSample(s model.SamplePair) ([]chunk, error) {
	firstTimeDelta := s.Timestamp - c.firstTime()
	if firstTimeDelta < 0 {
		return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta)
	}
	// gorillaMaxTimeDelta (1<<24) itself does not fit into 3 bytes anymore
	// (it would be stored as 0), so it has to overflow, too.
	if firstTimeDelta >= gorillaMaxTimeDelta {
		return addToOverflowChunk(c, s)
	}
	(*c)[gorillaFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16)
	(*c)[gorillaFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8)
	(*c)[gorillaFirstTimeDeltaOffset+2] = byte(firstTimeDelta)

	// Also set firstTimeDelta as the last time delta to be able to use the
	// normal methods for adding later samples.
	c.setLastTimeDelta(firstTimeDelta)

	c.setLastSample(s)
	c.setNextSampleOffset(gorillaThirdSampleBitOffset)
	return []chunk{c}, nil
}
// addLaterSample is a helper method only used by c.add(). It adds a third or
// later sample at the given bit offset.
//
// It returns an error if the sample is older than the last sample in the chunk
// or if the chunk carries an unknown value encoding. If the sample cannot be
// represented in this chunk (time delta too large, not enough space left, or
// an encoding change that is not worth transcoding), the sample goes into an
// overflow chunk. If the value encoding has to change and the chunk is at most
// half full, the chunk is transcoded into a new chunk with a more general
// encoding.
func (c *gorillaChunk) addLaterSample(s model.SamplePair, offset uint16) ([]chunk, error) {
	var (
		lastTime      = c.lastTime()
		lastTimeDelta = c.lastTimeDelta()
		newTimeDelta  = s.Timestamp - lastTime
		lastValue     = c.lastValue()
		encoding      = gorillaValueEncoding((*c)[gorillaFlagOffset])
	)

	if newTimeDelta < 0 {
		return nil, fmt.Errorf("Δt is less than zero: %v", newTimeDelta)
	}
	// gorillaMaxTimeDelta (1<<24) itself does not fit into the 3 bytes
	// reserved for the last Δt anymore (it would be stored as 0), so it has
	// to overflow, too.
	if newTimeDelta >= gorillaMaxTimeDelta {
		return addToOverflowChunk(c, s)
	}

	if offset == gorillaThirdSampleBitOffset {
		offset, encoding = c.prepForThirdSample(lastValue, s.Value, encoding)
	}

	// Analyze worst case, does it fit? If not, overflow into new chunk.
	if int(offset)+gorillaWorstCaseBitsPerSample[encoding] > chunkLen*8 {
		return addToOverflowChunk(c, s)
	}

	// Transcoding/overflow decisions first.
	if encoding == gorillaZeroEncoding && s.Value != lastValue {
		// Cannot go on with zero encoding.
		if offset > chunkLen*4 {
			// Chunk already half full. Don't transcode, overflow instead.
			return addToOverflowChunk(c, s)
		}
		if isInt32(s.Value - lastValue) {
			// Trying int encoding looks promising.
			return transcodeAndAdd(newGorillaChunk(gorillaIntDoubleDeltaEncoding), c, s)
		}
		return transcodeAndAdd(newGorillaChunk(gorillaXOREncoding), c, s)
	}
	if encoding == gorillaIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) {
		// Cannot go on with int encoding.
		if offset > chunkLen*4 {
			// Chunk already half full. Don't transcode, overflow instead.
			return addToOverflowChunk(c, s)
		}
		return transcodeAndAdd(newGorillaChunk(gorillaXOREncoding), c, s)
	}

	// The timestamp goes first, the value (if any) follows.
	offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta)
	if overflow {
		return addToOverflowChunk(c, s)
	}
	switch encoding {
	case gorillaZeroEncoding:
		// Nothing to do.
	case gorillaIntDoubleDeltaEncoding:
		offset = c.addDDValue(offset, lastValue, s.Value)
	case gorillaXOREncoding:
		offset = c.addXORValue(offset, lastValue, s.Value)
	case gorillaDirectEncoding:
		offset = c.addBitPattern(offset, math.Float64bits(float64(s.Value)), 64)
	default:
		return nil, fmt.Errorf("unknown Gorilla value encoding: %v", encoding)
	}

	c.setNextSampleOffset(offset)
	c.setLastSample(s)
	return []chunk{c}, nil
}
// prepForThirdSample picks the value encoding for the chunk once the third
// sample is about to be added and writes the value of the second sample
// (lastValue) in the way the picked encoding requires.
//
// lastValue is the value of the second sample, newValue the value of the third
// sample, and encoding the encoding currently set in the chunk. It returns the
// bit offset at which the third sample has to be added and the encoding picked.
// Both are also saved in the chunk (flags byte and next sample offset).
func (c gorillaChunk) prepForThirdSample(
lastValue, newValue model.SampleValue, encoding gorillaValueEncoding,
) (uint16, gorillaValueEncoding) {
var (
offset = gorillaThirdSampleBitOffset
firstValue = c.firstValue()
firstValueDelta = lastValue - firstValue
firstXOR = math.Float64bits(float64(firstValue)) ^ math.Float64bits(float64(lastValue))
_, firstSignificantBits = countBits(firstXOR)
secondXOR = math.Float64bits(float64(lastValue)) ^ math.Float64bits(float64(newValue))
_, secondSignificantBits = countBits(secondXOR)
)
// Now pick an initial encoding and prepare things accordingly.
// However, never pick an encoding "below" the one initially set.
switch {
case encoding == gorillaZeroEncoding && lastValue == firstValue && lastValue == newValue:
// Stay at zero encoding.
// No value to be set.
// No offset change required.
case encoding <= gorillaIntDoubleDeltaEncoding && isInt32(firstValueDelta):
encoding = gorillaIntDoubleDeltaEncoding
// The first Δv is saved in the header and also as the last Δv in the
// footer so that the third sample can be double-delta encoded.
binary.BigEndian.PutUint32(
c[gorillaFirstValueDeltaOffset:],
uint32(int32(firstValueDelta)),
)
c.setLastValueDelta(int32(firstValueDelta))
offset += 32
case encoding == gorillaDirectEncoding || firstSignificantBits+secondSignificantBits > 100:
// Heuristics based on three samples only is a bit weak,
// but if we need 50+13 = 63 bits per sample already
// now, we might be better off going for direct encoding.
encoding = gorillaDirectEncoding
// Put bit pattern directly where otherwise the delta would have gone.
binary.BigEndian.PutUint64(
c[gorillaFirstValueDeltaOffset:],
math.Float64bits(float64(lastValue)),
)
offset += 64
default:
encoding = gorillaXOREncoding
// The second value is XOR-encoded against the first value.
offset = c.addXORValue(offset, firstValue, lastValue)
}
c[gorillaFlagOffset] = byte(encoding)
c.setNextSampleOffset(offset)
return offset, encoding
}
// addDDTime encodes the timestamp of a new sample as double-delta
// (newTimeDelta - lastTimeDelta) at the given bit offset and saves newTimeDelta
// as the last Δt in the footer. It requires that lastTimeDelta and newTimeDelta
// are positive and don't overflow 24bit.
//
// It returns the bit offset following the written timestamp. If the
// double-delta cannot be represented as a 23 bit signed integer, nothing is
// written, and overflow is returned as true together with the unchanged offset.
func (c gorillaChunk) addDDTime(offset uint16, lastTimeDelta, newTimeDelta model.Time) (newOffset uint16, overflow bool) {
timeDD := newTimeDelta - lastTimeDelta
if !isSignedIntN(int64(timeDD), 23) {
return offset, true
}
c.setLastTimeDelta(newTimeDelta)
repeats, repeatsOffset := c.zeroDDTRepeats()
if timeDD == 0 {
if repeats == 0 || repeats == 128 {
// First zeroDDT, or counter full, prepare new counter.
// The counter consists of a '0' bit followed by 7 bits for
// the count, which is written below by setZeroDDTRepeats.
offset = c.addZeroBit(offset)
repeatsOffset = offset
offset += 7
repeats = 0
}
// If an existing counter is merely incremented, the offset for the
// next sample does not change.
c.setZeroDDTRepeats(repeats+1, repeatsOffset)
return offset, false
}
// No zero repeat. If we had any before, clear the DDT offset.
c.setZeroDDTRepeats(0, repeatsOffset)
// Pick the smallest bit bucket the double-delta fits into: control bits
// '10', '110', or '111', followed by 6, 17, or 23 value bits.
switch {
case isSignedIntN(int64(timeDD), 6):
offset = c.addOneBitsWithTrailingZero(offset, 1)
offset = c.addSignedInt(offset, int64(timeDD), 6)
case isSignedIntN(int64(timeDD), 17):
offset = c.addOneBitsWithTrailingZero(offset, 2)
offset = c.addSignedInt(offset, int64(timeDD), 17)
case isSignedIntN(int64(timeDD), 23):
offset = c.addOneBits(offset, 3)
offset = c.addSignedInt(offset, int64(timeDD), 23)
default:
panic("unexpected required bits for ΔΔt")
}
return offset, false
}
// addDDValue encodes the value of a new sample as integer double-delta at the
// given bit offset and returns the bit offset following the written entry. The
// new Δv is saved as the last Δv in the footer. It requires that
// newValue-lastValue can be represented with an int32.
//
// A ΔΔv of zero is written as a single '0' bit. Otherwise, the smallest
// matching bucket is used: control bits '10', '110', '1110', or '1111',
// followed by 6, 13, 20, or 33 value bits.
func (c gorillaChunk) addDDValue(offset uint16, lastValue, newValue model.SampleValue) uint16 {
	delta := int64(newValue - lastValue)
	doubleDelta := delta - int64(c.lastValueDelta())
	c.setLastValueDelta(int32(delta))

	if doubleDelta == 0 {
		return c.addZeroBit(offset)
	}

	var valueBits uint16
	switch {
	case isSignedIntN(doubleDelta, 6):
		offset, valueBits = c.addOneBitsWithTrailingZero(offset, 1), 6
	case isSignedIntN(doubleDelta, 13):
		offset, valueBits = c.addOneBitsWithTrailingZero(offset, 2), 13
	case isSignedIntN(doubleDelta, 20):
		offset, valueBits = c.addOneBitsWithTrailingZero(offset, 3), 20
	case isSignedIntN(doubleDelta, 33):
		offset, valueBits = c.addOneBits(offset, 4), 33
	default:
		panic("unexpected required bits for ΔΔv")
	}
	return c.addSignedInt(offset, doubleDelta, valueBits)
}
// addXORValue encodes newValue at the given bit offset as the XOR of its bit
// pattern with the bit pattern of lastValue. It returns the bit offset
// following the written entry.
//
// Three kinds of entries are written:
//   - '0': The XOR is zero, i.e. the value has not changed.
//   - '10' followed by the significant bits: The counts of leading zeros and
//     significant bits saved in the footer are reused.
//   - '11' followed by 5 bits for the count of leading zeros, 6 bits for the
//     count of significant bits minus one, and the significant bits: The new
//     counts are also saved in the footer.
func (c gorillaChunk) addXORValue(offset uint16, lastValue, newValue model.SampleValue) uint16 {
lastPattern := math.Float64bits(float64(lastValue))
newPattern := math.Float64bits(float64(newValue))
xor := lastPattern ^ newPattern
if xor == 0 {
return c.addZeroBit(offset)
}
lastLeadingBits := c[gorillaLastLeadingZerosCountOffset]
lastSignificantBits := c[gorillaLastSignificantBitsCountOffset]
// NOTE(review): countBits is defined elsewhere; the count of leading zeros
// it returns is written with 5 bits below, so it presumably never exceeds
// 31 — confirm.
newLeadingBits, newSignificantBits := countBits(xor)
// Short entry if the new significant bits fit into the same box as the
// last significant bits. However, should the new significant bits be
// shorter by 10 or more, go for a long entry instead, as we will
// probably save more (11 bit one-time overhead, potentially more to
// save later).
if newLeadingBits >= lastLeadingBits &&
newLeadingBits+newSignificantBits <= lastLeadingBits+lastSignificantBits &&
lastSignificantBits-newSignificantBits < 10 {
offset = c.addOneBitsWithTrailingZero(offset, 1)
// Shift out the trailing zeros after the "box" of the last entry.
return c.addBitPattern(
offset,
xor>>(64-lastLeadingBits-lastSignificantBits),
uint16(lastSignificantBits),
)
}
// Long entry.
// The footer is updated before anything is written to the payload.
c[gorillaLastLeadingZerosCountOffset] = newLeadingBits
c[gorillaLastSignificantBitsCountOffset] = newSignificantBits
offset = c.addOneBits(offset, 2)
offset = c.addBitPattern(offset, uint64(newLeadingBits), 5)
offset = c.addBitPattern(offset, uint64(newSignificantBits-1), 6) // Note -1!
return c.addBitPattern(
offset,
xor>>(64-newLeadingBits-newSignificantBits),
uint16(newSignificantBits),
)
}
// addZeroBit adds a single '0' bit at the given bit offset and returns the
// offset following it. Below the footer threshold, the bit is in a never
// touched area and therefore zero already, so only the offset is advanced. From
// the threshold on, the bit is cleared explicitly.
func (c gorillaChunk) addZeroBit(offset uint16) uint16 {
	if offset >= gorillaNextSampleBitOffsetThreshold {
		c[offset/8] &^= bitMask[1][offset%8]
	}
	return offset + 1
}
// addOneBits sets n consecutive bits to '1', starting at the given bit offset,
// and returns the offset following them. The bits may straddle a byte
// boundary. It panics if n is larger than 7.
func (c gorillaChunk) addOneBits(offset uint16, n uint16) uint16 {
	if n > 7 {
		panic("unexpected number of control bits")
	}
	posInByte := offset % 8

	// As many bits as possible go into the byte the offset points into.
	inFirstByte := 8 - posInByte
	if n < inFirstByte {
		inFirstByte = n
	}
	c[offset/8] |= bitMask[inFirstByte][posInByte]
	offset += inFirstByte

	// The remainder, if any, starts at the beginning of the next byte.
	if remaining := n - inFirstByte; remaining > 0 {
		c[offset/8] |= bitMask[remaining][0]
		offset += remaining
	}
	return offset
}
// addOneBitsWithTrailingZero adds n '1' bits followed by a single '0' bit at
// the given bit offset and returns the offset following them.
func (c gorillaChunk) addOneBitsWithTrailingZero(offset uint16, n uint16) uint16 {
	afterOnes := c.addOneBits(offset, n)
	return c.addZeroBit(afterOnes)
}
// addSignedInt adds i as a signed integer with n bits at the given bit offset
// and returns the offset following it. It requires i to be representable as
// such. (Check with isSignedIntN first.) A negative i is mapped into the upper
// half of the n bit range by adding 2^n before it is written.
func (c gorillaChunk) addSignedInt(offset uint16, i int64, n uint16) uint16 {
	pattern := uint64(i)
	if i < 0 && n < 64 {
		pattern = uint64(i + 1<<n)
	}
	return c.addBitPattern(offset, pattern, n)
}
// addBitPattern adds the last n bits of the given pattern at the given bit
// offset and returns the offset following them. Other bits in the pattern must
// be 0.
//
// Bits are ORed into the chunk. Below the footer threshold, that is safe
// because the area has never been written to. From the threshold on, the target
// bits might still contain footer data and are cleared first.
func (c gorillaChunk) addBitPattern(offset uint16, pattern uint64, n uint16) uint16 {
	var (
		byteOffset  = offset / 8
		bitsToWrite = 8 - offset%8
		newOffset   = offset + n
	)

	// Clean up the parts of the footer we will write into, but not more. The
	// value related part of the footer is still needed after a timestamp has
	// been written into the timestamp related part, and bits written into
	// the footer area by a previous call must not be wiped again.
	if newOffset > gorillaNextSampleBitOffsetThreshold {
		pos := offset
		if pos < gorillaNextSampleBitOffsetThreshold {
			pos = gorillaNextSampleBitOffsetThreshold
		}
		for pos < newOffset {
			posInByte := pos % 8
			bitsToClear := newOffset - pos
			if bitsToClear > 8-posInByte {
				bitsToClear = 8 - posInByte
			}
			c[pos/8] &^= bitMask[bitsToClear][posInByte]
			pos += bitsToClear
		}
	}

	for n > 0 {
		if n <= bitsToWrite {
			// The rest fits into the current byte. Align it to the
			// current bit position.
			c[byteOffset] |= byte(pattern << (bitsToWrite - n))
			break
		}
		// Fill up the current byte with the most significant bits that
		// are still to be written.
		c[byteOffset] |= byte(pattern >> (n - bitsToWrite))
		n -= bitsToWrite
		bitsToWrite = 8
		byteOffset++
	}
	return newOffset
}
// readBitPattern reads n bits at the given bit offset and returns them as the
// last n bits in a uint64. The bits are collected byte by byte, most
// significant bit first.
func (c gorillaChunk) readBitPattern(offset, n uint16) uint64 {
	var pattern uint64
	byteIdx, posInByte := offset/8, offset%8

	// After the first byte, reading always continues at the start of the
	// next byte.
	for ; n > 0; byteIdx, posInByte = byteIdx+1, 0 {
		available := 8 - posInByte
		take, unused := available, uint16(0)
		if n < available {
			// The pattern ends within this byte. The bits behind it
			// have to be shifted out.
			take, unused = n, available-n
		}
		piece := (c[byteIdx] & bitMask[take][posInByte]) >> unused
		pattern = pattern<<take | uint64(piece)
		n -= take
	}
	return pattern
}
// gorillaChunkIterator iterates over the samples in a gorillaChunk. Since
// samples are encoded relative to their predecessors, the iterator keeps the
// decoding state needed to get from one sample to the next.
type gorillaChunkIterator struct {
// The chunk iterated over.
c gorillaChunk
// pos is the bit offset at which reading continues. len is the bit offset
// for the next sample as found in the chunk when the iterator was created,
// i.e. the end of the data.
pos, len uint16
// Timestamp of the current sample and the most recent Δt.
t, dT model.Time
repeats byte // Repeats of ΔΔt=0.
// Value of the current sample.
v model.SampleValue
dV int64 // Only used for int value encoding.
// Most recently read counts of leading zeros and significant bits (used
// when reading XOR-encoded values).
leading, significant uint16
// Value encoding as read from the flags byte of the chunk.
enc gorillaValueEncoding
// Once set, scan refuses to proceed.
lastError error
// rewound is set by rewind and cleared by scan and reset.
rewound bool
nextT model.Time // Only for rewound state.
nextV model.SampleValue // Only for rewound state.
}
// newGorillaChunkIterator returns an iterator positioned before the first
// sample of c. The end of the data and the value encoding are read from the
// chunk once, upon creation of the iterator.
func newGorillaChunkIterator(c gorillaChunk) *gorillaChunkIterator {
	it := &gorillaChunkIterator{
		c:           c,
		t:           model.Earliest,
		significant: 1,
	}
	it.len = c.nextSampleOffset()
	it.enc = gorillaValueEncoding(c[gorillaFlagOffset])
	return it
}
// lastTimestamp implements chunkIterator. For a chunk without any samples,
// model.Earliest is returned. The returned error is the last error encountered
// by the iterator, if any.
func (it *gorillaChunkIterator) lastTimestamp() (model.Time, error) {
	ts := model.Earliest
	if it.len != gorillaFirstSampleBitOffset {
		// There is at least one sample in the chunk.
		ts = it.c.lastTime()
	}
	return ts, it.lastError
}
// contains implements chunkIterator. It reports whether t lies within the
// closed interval spanned by the first and the last timestamp of the chunk.
func (it *gorillaChunkIterator) contains(t model.Time) (bool, error) {
	last, err := it.lastTimestamp()
	if err != nil {
		it.lastError = err
		return false, err
	}
	outside := t.Before(it.c.firstTime()) || t.After(last)
	return !outside, it.lastError
}
// scan implements chunkIterator. It advances the iterator to the next sample
// and returns true if there is one. It returns false if the end of the data is
// reached or if an error has been encountered (now or earlier).
func (it *gorillaChunkIterator) scan() bool {
if it.lastError != nil {
return false
}
if it.rewound {
// The next sample has already been decoded before the rewind. Just
// restore it.
it.t = it.nextT
it.v = it.nextV
it.rewound = false
return true
}
// Pending ΔΔt=0 repeats mean that more samples follow even if pos has
// reached the end of the data already.
if it.pos >= it.len && it.repeats == 0 {
return false
}
if it.pos == gorillaFirstSampleBitOffset {
// 1st sample: Saved directly in the header.
it.t = it.c.firstTime()
it.v = it.c.firstValue()
it.pos = gorillaSecondSampleBitOffset
return it.lastError == nil
}
if it.pos == gorillaSecondSampleBitOffset {
if it.len == gorillaThirdSampleBitOffset {
// Special case: Chunk has only two samples.
it.t = it.c.lastTime()
it.v = it.c.lastValue()
it.pos = it.len
return it.lastError == nil
}
// 2nd sample: The timestamp is the first time plus the first Δt.
it.dT = it.c.firstTimeDelta()
it.t += it.dT
// Value depends on encoding.
switch it.enc {
case gorillaZeroEncoding:
it.pos = gorillaThirdSampleBitOffset
case gorillaIntDoubleDeltaEncoding:
it.dV = int64(it.c.firstValueDelta())
it.v += model.SampleValue(it.dV)
it.pos = gorillaThirdSampleBitOffset + 32
case gorillaXOREncoding:
it.pos = gorillaThirdSampleBitOffset
it.readXOR()
case gorillaDirectEncoding:
it.v = model.SampleValue(math.Float64frombits(
binary.BigEndian.Uint64(it.c[gorillaThirdSampleBitOffset/8:]),
))
it.pos = gorillaThirdSampleBitOffset + 64
default:
it.lastError = fmt.Errorf("unknown Gorilla value encoding: %v", it.enc)
}
return it.lastError == nil
}
// 3rd sample or later does not have special cases anymore.
// The timestamp always comes first, followed by the value (if the
// encoding saves values at all).
it.readDDT()
switch it.enc {
case gorillaZeroEncoding:
// Do nothing.
case gorillaIntDoubleDeltaEncoding:
it.readDDV()
case gorillaXOREncoding:
it.readXOR()
case gorillaDirectEncoding:
it.v = model.SampleValue(math.Float64frombits(it.readBitPattern(64)))
return it.lastError == nil
default:
it.lastError = fmt.Errorf("unknown Gorilla value encoding: %v", it.enc)
return false
}
return it.lastError == nil
}
// findAtOrBefore implements chunkIterator. It positions the iterator at the
// last sample with a timestamp at or before t and returns true on success. It
// returns false if the chunk is empty, if t is before the first sample, or if
// an error has been encountered.
func (it *gorillaChunkIterator) findAtOrBefore(t model.Time) bool {
	if it.len == 0 || t.Before(it.c.firstTime()) {
		return false
	}
	last := it.c.lastTime()
	if !t.Before(last) {
		// Shortcut: The last sample is saved in the header.
		it.t = last
		it.v = it.c.lastValue()
		it.pos = it.len
		// The iterator is at the end of the data now. Drop left-over
		// state from earlier calls, or a following scan would return
		// the sample saved by rewind or continue a ΔΔt=0 repeat beyond
		// the last sample.
		it.repeats = 0
		it.rewound = false
		return true
	}
	if t == it.t {
		return it.lastError == nil
	}
	if t.Before(it.t) || it.rewound {
		it.reset()
	}

	var (
		prevT = model.Earliest
		prevV model.SampleValue
	)
	for it.scan() && t.After(it.t) {
		prevT = it.t
		prevV = it.v
		// TODO(beorn7): If we are in a repeat, we could iterate forward
		// much faster.
	}
	if t == it.t {
		return it.lastError == nil
	}
	// The iterator has overshot by one sample. Step back to the previous
	// one.
	it.rewind(prevT, prevV)
	return it.lastError == nil
}
// findAtOrAfter implements chunkIterator. It positions the iterator at the
// first sample with a timestamp at or after t and returns true on success. It
// returns false if the chunk is empty, if t is after the last sample, or if an
// error has been encountered.
func (it *gorillaChunkIterator) findAtOrAfter(t model.Time) bool {
	if it.len == 0 || t.After(it.c.lastTime()) {
		return false
	}
	if first := it.c.firstTime(); !t.After(first) {
		// The first sample is the one to go for.
		it.reset()
		return it.scan()
	}
	switch {
	case t == it.t:
		// Already there.
		return it.lastError == nil
	case t.Before(it.t):
		// Iterating backwards is impossible. Start over.
		it.reset()
	}
	for it.scan() && t.After(it.t) {
		// TODO(beorn7): If we are in a repeat, we could iterate forward
		// much faster.
	}
	return it.lastError == nil
}
// value implements chunkIterator. It returns the sample the iterator is
// currently positioned at.
func (it *gorillaChunkIterator) value() model.SamplePair {
	var current model.SamplePair
	current.Timestamp = it.t
	current.Value = it.v
	return current
}
// err implements chunkIterator. It returns the last error recorded by the
// iterator, or nil if there is none.
func (it *gorillaChunkIterator) err() error {
return it.lastError
}
// readDDT advances the timestamp of the iterator by one sample. While repeats
// of ΔΔt=0 are pending, one of them is consumed and nothing is read from the
// chunk. Otherwise, the next timestamp entry is read: '0' starts a new run of
// repeats (the following 7 bits tell how many more samples with unchanged Δt
// follow the current one), while '10', '110', and '111' are followed by a ΔΔt
// of 6, 17, and 23 bits, respectively.
func (it *gorillaChunkIterator) readDDT() {
	if it.repeats == 0 {
		ddtBits := [...]uint16{1: 6, 2: 17, 3: 23}
		switch ones := it.readControlBits(3); {
		case ones == 0:
			it.repeats = byte(it.readBitPattern(7))
		case ones <= 3:
			it.dT += model.Time(it.readSignedInt(ddtBits[ones]))
		default:
			panic("unexpected number of control bits")
		}
	} else {
		it.repeats--
	}
	it.t += it.dT
}
// readDDV reads the next value entry of an integer double-delta encoded chunk
// and advances the value of the iterator accordingly. '0' means that Δv has
// not changed, while '10', '110', '1110', and '1111' are followed by a ΔΔv of
// 6, 13, 20, and 33 bits, respectively.
func (it *gorillaChunkIterator) readDDV() {
	ddvBits := [...]uint16{1: 6, 2: 13, 3: 20, 4: 33}
	switch ones := it.readControlBits(4); {
	case ones == 0:
		// ΔΔv is zero. Δv stays as it is.
	case ones <= 4:
		it.dV += it.readSignedInt(ddvBits[ones])
	default:
		panic("unexpected number of control bits")
	}
	it.v += model.SampleValue(it.dV)
}
// readXOR reads the next value entry of an XOR encoded chunk and updates the
// value of the iterator accordingly. '0' means that the value has not changed.
// '10' is followed by the significant bits only, reusing the most recently
// read counts of leading zeros and significant bits. '11' is followed by new
// counts (5 bits for the leading zeros, 6 bits for the significant bits minus
// one) and then by the significant bits.
func (it *gorillaChunkIterator) readXOR() {
	switch ones := it.readControlBits(2); ones {
	case 0:
		return
	case 1:
		// Counts stay as they are.
	case 2:
		it.leading = uint16(it.readBitPattern(5))
		it.significant = uint16(it.readBitPattern(6)) + 1
	default:
		panic("unexpected number of control bits")
	}
	current := math.Float64bits(float64(it.v))
	significantBits := it.readBitPattern(it.significant)
	trailing := 64 - it.significant - it.leading
	it.v = model.SampleValue(math.Float64frombits(current ^ significantBits<<trailing))
}
// readControlBits reads successive 1-bits and stops after reading the first
// 0-bit. It also stops once it has read max bits. It returns the number of read
// 1-bits. If the read position is at or beyond the end of the chunk after
// reading without having encountered a 0-bit, errChunkBoundsExceeded is
// recorded as the last error.
func (it *gorillaChunkIterator) readControlBits(max uint16) uint16 {
	var ones uint16
	for ; ones < max && int(it.pos/8) < len(it.c); ones++ {
		bit := it.c[it.pos/8] & bitMask[1][it.pos%8]
		it.pos++
		if bit == 0 {
			return ones
		}
	}
	if int(it.pos/8) >= len(it.c) {
		it.lastError = errChunkBoundsExceeded
	}
	return ones
}
// readBitPattern reads n bits at the current position and advances the
// position accordingly. If fewer than n bits are left in the chunk, it records
// errChunkBoundsExceeded as the last error and returns 0 without moving the
// position.
func (it *gorillaChunkIterator) readBitPattern(n uint16) uint64 {
	end := int(it.pos) + int(n)
	if end > len(it.c)*8 {
		it.lastError = errChunkBoundsExceeded
		return 0
	}
	bits := it.c.readBitPattern(it.pos, n)
	it.pos += n
	return bits
}
// readSignedInt reads n bits at the current position and interprets them as a
// signed integer. Patterns in the upper half of the n bit range (i.e. with the
// most significant of the n bits set) represent negative numbers and are mapped
// back by subtracting 2^n.
func (it *gorillaChunkIterator) readSignedInt(n uint16) int64 {
	raw := it.readBitPattern(n)
	if n >= 64 || raw < 1<<(n-1) {
		return int64(raw)
	}
	return int64(raw - 1<<n)
}
// reset puts the decoding state of the chunk iterator (read position,
// timestamp, value, deltas, repeat counter, bit counts, and rewound flag) back
// to what it was upon creation. The chunk itself, the end of the data, the
// encoding, and the last error are left alone.
func (it *gorillaChunkIterator) reset() {
	it.rewound = false
	it.pos, it.repeats = 0, 0
	it.t, it.dT = model.Earliest, 0
	it.v, it.dV = 0, 0
	it.leading, it.significant = 0, 1
}
// rewind "rewinds" the chunk iterator by one step. A Gorilla chunk cannot
// simply be read backwards, so the caller has to provide timestamp and value of
// the previous sample. The current sample is remembered and will be returned
// by the next call of scan. Rewinding an already rewound chunk panics. After a
// call of scan or reset, a chunk can be rewound again.
func (it *gorillaChunkIterator) rewind(t model.Time, v model.SampleValue) {
	if it.rewound {
		panic("cannet rewind Gorilla chunk twice")
	}
	it.nextT, it.nextV = it.t, it.v
	it.t, it.v = t, v
	it.rewound = true
}