2015-03-04 12:40:18 +00:00
|
|
|
// Copyright 2014 The Prometheus Authors
|
|
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
// you may not use this file except in compliance with the License.
|
|
|
|
// You may obtain a copy of the License at
|
|
|
|
//
|
|
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
//
|
|
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
// See the License for the specific language governing permissions and
|
|
|
|
// limitations under the License.
|
|
|
|
|
2016-09-21 21:44:27 +00:00
|
|
|
package chunk
|
2015-03-04 12:40:18 +00:00
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"math"
|
|
|
|
|
2015-08-20 15:18:46 +00:00
|
|
|
"github.com/prometheus/common/model"
|
2015-03-04 12:40:18 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// The 37-byte header of a double-delta-encoded chunk looks like:
//
// - used buf bytes:           2 bytes
// - time double-delta bytes:  1 byte
// - value double-delta bytes: 1 byte
// - is integer:               1 byte
// - base time:                8 bytes
// - base value:               8 bytes
// - base time delta:          8 bytes
// - base value delta:         8 bytes
//
// Every offset below is derived from the previous field's offset plus that
// field's width, so the layout cannot silently drift out of sync with the
// table above.
const (
	doubleDeltaHeaderBufLenOffset         = 0
	doubleDeltaHeaderTimeBytesOffset      = doubleDeltaHeaderBufLenOffset + 2
	doubleDeltaHeaderValueBytesOffset     = doubleDeltaHeaderTimeBytesOffset + 1
	doubleDeltaHeaderIsIntOffset          = doubleDeltaHeaderValueBytesOffset + 1
	doubleDeltaHeaderBaseTimeOffset       = doubleDeltaHeaderIsIntOffset + 1
	doubleDeltaHeaderBaseValueOffset      = doubleDeltaHeaderBaseTimeOffset + 8
	doubleDeltaHeaderBaseTimeDeltaOffset  = doubleDeltaHeaderBaseValueOffset + 8
	doubleDeltaHeaderBaseValueDeltaOffset = doubleDeltaHeaderBaseTimeDeltaOffset + 8

	// Size of the complete header (ends after the base value delta).
	doubleDeltaHeaderBytes = doubleDeltaHeaderBaseValueDeltaOffset + 8
	// The header isn't full for a chunk with only one sample: it ends after
	// the base value, i.e. where the base time delta would start.
	doubleDeltaHeaderMinBytes = doubleDeltaHeaderBaseTimeDeltaOffset
)
|
|
|
|
|
|
|
|
// doubleDeltaEncodedChunk is a byte slice holding samples in an adaptive
// double-delta encoding. The header keeps a base timestamp and base value
// plus a base delta for each of them; the payload holds one double-delta per
// sample, i.e. the deviation of the sample from the timestamp and value
// projected from the base and the base delta. Value double-deltas are stored
// as integers or floats, and both kinds of double-delta use a configurable
// byte width. Once 8 bytes would be needed for a double-delta, the encoding
// falls back to absolute numbers (timestamps directly as int64, values
// directly as float64).
//
// doubleDeltaEncodedChunk implements the chunk interface.
type doubleDeltaEncodedChunk []byte
|
|
|
|
|
|
|
|
// newDoubleDeltaEncodedChunk returns a newly allocated doubleDeltaEncodedChunk.
|
|
|
|
func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doubleDeltaEncodedChunk {
|
|
|
|
if tb < 1 {
|
|
|
|
panic("need at least 1 time delta byte")
|
|
|
|
}
|
|
|
|
if length < doubleDeltaHeaderBytes+16 {
|
|
|
|
panic(fmt.Errorf(
|
|
|
|
"chunk length %d bytes is insufficient, need at least %d",
|
|
|
|
length, doubleDeltaHeaderBytes+16,
|
|
|
|
))
|
|
|
|
}
|
|
|
|
c := make(doubleDeltaEncodedChunk, doubleDeltaHeaderIsIntOffset+1, length)
|
|
|
|
|
|
|
|
c[doubleDeltaHeaderTimeBytesOffset] = byte(tb)
|
|
|
|
c[doubleDeltaHeaderValueBytesOffset] = byte(vb)
|
|
|
|
if vb < d8 && isInt { // Only use int for fewer than 8 value double-delta bytes.
|
|
|
|
c[doubleDeltaHeaderIsIntOffset] = 1
|
|
|
|
} else {
|
|
|
|
c[doubleDeltaHeaderIsIntOffset] = 0
|
|
|
|
}
|
|
|
|
return &c
|
|
|
|
}
|
|
|
|
|
2016-10-06 19:53:40 +00:00
|
|
|
// Add implements chunk.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
|
2016-03-12 20:34:51 +00:00
|
|
|
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
|
2015-03-06 11:53:00 +00:00
|
|
|
if c.len() == 0 {
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
return c.addFirstSample(s), nil
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
tb := c.timeBytes()
|
|
|
|
vb := c.valueBytes()
|
|
|
|
|
2015-03-06 11:53:00 +00:00
|
|
|
if c.len() == 1 {
|
|
|
|
return c.addSecondSample(s, tb, vb)
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
remainingBytes := cap(c) - len(c)
|
|
|
|
sampleSize := c.sampleSize()
|
|
|
|
|
|
|
|
// Do we generally have space for another sample in this chunk? If not,
|
|
|
|
// overflow into a new one.
|
|
|
|
if remainingBytes < sampleSize {
|
2016-03-12 20:34:51 +00:00
|
|
|
return addToOverflowChunk(&c, s)
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
|
2015-08-20 15:18:46 +00:00
|
|
|
projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta()
|
2015-03-06 15:03:03 +00:00
|
|
|
ddt := s.Timestamp - projectedTime
|
|
|
|
|
2015-08-20 15:18:46 +00:00
|
|
|
projectedValue := c.baseValue() + model.SampleValue(c.len())*c.baseValueDelta()
|
2015-03-06 15:03:03 +00:00
|
|
|
ddv := s.Value - projectedValue
|
2015-03-04 12:40:18 +00:00
|
|
|
|
2015-03-13 14:49:07 +00:00
|
|
|
ntb, nvb, nInt := tb, vb, c.isInt()
|
2015-03-04 12:40:18 +00:00
|
|
|
// If the new sample is incompatible with the current encoding, reencode the
|
|
|
|
// existing chunk data into new chunk(s).
|
2015-03-06 15:03:03 +00:00
|
|
|
if c.isInt() && !isInt64(ddv) {
|
2015-03-13 14:49:07 +00:00
|
|
|
// int->float.
|
|
|
|
nvb = d4
|
|
|
|
nInt = false
|
2015-08-20 15:18:46 +00:00
|
|
|
} else if !c.isInt() && vb == d4 && projectedValue+model.SampleValue(float32(ddv)) != s.Value {
|
2015-03-13 14:49:07 +00:00
|
|
|
// float32->float64.
|
|
|
|
nvb = d8
|
|
|
|
} else {
|
|
|
|
if tb < d8 {
|
|
|
|
// Maybe more bytes for timestamp.
|
|
|
|
ntb = max(tb, bytesNeededForSignedTimestampDelta(ddt))
|
|
|
|
}
|
|
|
|
if c.isInt() && vb < d8 {
|
|
|
|
// Maybe more bytes for sample value.
|
|
|
|
nvb = max(vb, bytesNeededForIntegerSampleValueDelta(ddv))
|
|
|
|
}
|
2015-03-06 15:03:03 +00:00
|
|
|
}
|
2015-03-13 14:49:07 +00:00
|
|
|
if tb != ntb || vb != nvb || c.isInt() != nInt {
|
|
|
|
if len(c)*2 < cap(c) {
|
|
|
|
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
|
|
|
|
}
|
|
|
|
// Chunk is already half full. Better create a new one and save the transcoding efforts.
|
2016-03-12 20:34:51 +00:00
|
|
|
return addToOverflowChunk(&c, s)
|
2015-03-06 15:03:03 +00:00
|
|
|
}
|
|
|
|
|
2015-03-04 12:40:18 +00:00
|
|
|
offset := len(c)
|
|
|
|
c = c[:offset+sampleSize]
|
|
|
|
|
|
|
|
switch tb {
|
|
|
|
case d1:
|
2015-03-06 15:03:03 +00:00
|
|
|
c[offset] = byte(ddt)
|
2015-03-04 12:40:18 +00:00
|
|
|
case d2:
|
2015-03-06 15:03:03 +00:00
|
|
|
binary.LittleEndian.PutUint16(c[offset:], uint16(ddt))
|
2015-03-04 12:40:18 +00:00
|
|
|
case d4:
|
2015-03-06 15:03:03 +00:00
|
|
|
binary.LittleEndian.PutUint32(c[offset:], uint32(ddt))
|
2015-03-04 12:40:18 +00:00
|
|
|
case d8:
|
|
|
|
// Store the absolute value (no delta) in case of d8.
|
|
|
|
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
|
|
|
|
default:
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
offset += int(tb)
|
|
|
|
|
|
|
|
if c.isInt() {
|
|
|
|
switch vb {
|
|
|
|
case d0:
|
|
|
|
// No-op. Constant delta is stored as base value.
|
|
|
|
case d1:
|
2015-07-13 09:19:11 +00:00
|
|
|
c[offset] = byte(int8(ddv))
|
2015-03-04 12:40:18 +00:00
|
|
|
case d2:
|
2015-07-13 09:19:11 +00:00
|
|
|
binary.LittleEndian.PutUint16(c[offset:], uint16(int16(ddv)))
|
2015-03-04 12:40:18 +00:00
|
|
|
case d4:
|
2015-07-13 09:19:11 +00:00
|
|
|
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv)))
|
2015-03-04 12:40:18 +00:00
|
|
|
// d8 must not happen. Those samples are encoded as float64.
|
|
|
|
default:
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
} else {
|
|
|
|
switch vb {
|
|
|
|
case d4:
|
2015-03-06 15:03:03 +00:00
|
|
|
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv)))
|
2015-03-04 12:40:18 +00:00
|
|
|
case d8:
|
|
|
|
// Store the absolute value (no delta) in case of d8.
|
|
|
|
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
|
|
|
|
default:
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
}
|
2016-09-21 15:56:55 +00:00
|
|
|
return []Chunk{&c}, nil
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
|
2016-10-06 19:53:40 +00:00
|
|
|
// Clone implements chunk.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) Clone() Chunk {
|
2015-03-13 14:49:07 +00:00
|
|
|
clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
|
|
|
|
copy(clone, c)
|
|
|
|
return &clone
|
|
|
|
}
|
|
|
|
|
2016-09-21 21:44:27 +00:00
|
|
|
// FirstTime implements chunk.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
|
2015-03-13 14:49:07 +00:00
|
|
|
return c.baseTime()
|
|
|
|
}
|
|
|
|
|
2016-09-21 15:56:55 +00:00
|
|
|
// NewIterator( implements chunk.
|
2016-09-21 21:44:27 +00:00
|
|
|
func (c *doubleDeltaEncodedChunk) NewIterator() Iterator {
|
2016-03-07 19:23:14 +00:00
|
|
|
return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
|
|
|
|
c: *c,
|
|
|
|
baseT: c.baseTime(),
|
|
|
|
baseΔT: c.baseTimeDelta(),
|
|
|
|
baseV: c.baseValue(),
|
|
|
|
baseΔV: c.baseValueDelta(),
|
|
|
|
tBytes: c.timeBytes(),
|
|
|
|
vBytes: c.valueBytes(),
|
|
|
|
isInt: c.isInt(),
|
|
|
|
})
|
2015-03-13 14:49:07 +00:00
|
|
|
}
|
|
|
|
|
2016-10-06 19:53:40 +00:00
|
|
|
// Marshal implements chunk.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error {
|
2015-03-13 14:49:07 +00:00
|
|
|
if len(c) > math.MaxUint16 {
|
2016-01-25 15:36:36 +00:00
|
|
|
panic("chunk buffer length would overflow a 16 bit uint")
|
2015-03-13 14:49:07 +00:00
|
|
|
}
|
|
|
|
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
|
|
|
|
|
|
|
|
n, err := w.Write(c[:cap(c)])
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if n != cap(c) {
|
2016-01-25 15:36:36 +00:00
|
|
|
return fmt.Errorf("wanted to write %d bytes, wrote %d", cap(c), n)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-09-21 15:56:55 +00:00
|
|
|
// MarshalToBuf implements chunk.
|
|
|
|
func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
|
2016-01-25 15:36:36 +00:00
|
|
|
if len(c) > math.MaxUint16 {
|
|
|
|
panic("chunk buffer length would overflow a 16 bit uint")
|
|
|
|
}
|
|
|
|
binary.LittleEndian.PutUint16(c[doubleDeltaHeaderBufLenOffset:], uint16(len(c)))
|
|
|
|
|
|
|
|
n := copy(buf, c)
|
|
|
|
if n != len(c) {
|
|
|
|
return fmt.Errorf("wanted to copy %d bytes to buffer, copied %d", len(c), n)
|
2015-03-13 14:49:07 +00:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-10-06 19:53:40 +00:00
|
|
|
// Unmarshal implements chunk.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error {
|
2015-03-13 14:49:07 +00:00
|
|
|
*c = (*c)[:cap(*c)]
|
2015-04-13 18:20:26 +00:00
|
|
|
if _, err := io.ReadFull(r, *c); err != nil {
|
|
|
|
return err
|
2015-03-13 14:49:07 +00:00
|
|
|
}
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
|
|
|
|
if int(l) > cap(*c) {
|
|
|
|
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
|
|
|
|
}
|
2016-08-29 15:22:12 +00:00
|
|
|
if int(l) < doubleDeltaHeaderMinBytes {
|
|
|
|
return fmt.Errorf("chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
|
2016-08-25 15:16:11 +00:00
|
|
|
}
|
|
|
|
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
*c = (*c)[:l]
|
2015-03-13 14:49:07 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-10-06 19:53:40 +00:00
|
|
|
// UnmarshalFromBuf implements chunk.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
|
2015-04-13 18:20:26 +00:00
|
|
|
*c = (*c)[:cap(*c)]
|
|
|
|
copy(*c, buf)
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
|
|
|
|
if int(l) > cap(*c) {
|
|
|
|
return fmt.Errorf("chunk length exceeded during unmarshaling: %d", l)
|
|
|
|
}
|
2016-08-29 15:22:12 +00:00
|
|
|
if int(l) < doubleDeltaHeaderMinBytes {
|
|
|
|
return fmt.Errorf("chunk length less than header size: %d < %d", l, doubleDeltaHeaderMinBytes)
|
2016-08-25 15:16:11 +00:00
|
|
|
}
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
*c = (*c)[:l]
|
|
|
|
return nil
|
2015-04-13 18:20:26 +00:00
|
|
|
}
|
|
|
|
|
2016-10-06 19:53:40 +00:00
|
|
|
// Encoding implements chunk.
|
2016-09-21 21:44:27 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta }
|
2015-03-13 14:49:07 +00:00
|
|
|
|
2016-10-05 18:32:55 +00:00
|
|
|
// Utilization implements chunk.
|
|
|
|
func (c doubleDeltaEncodedChunk) Utilization() float64 {
|
|
|
|
return float64(len(c)) / float64(cap(c))
|
|
|
|
}
|
|
|
|
|
2015-08-20 15:18:46 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) baseTime() model.Time {
|
|
|
|
return model.Time(
|
2015-03-13 14:49:07 +00:00
|
|
|
binary.LittleEndian.Uint64(
|
|
|
|
c[doubleDeltaHeaderBaseTimeOffset:],
|
|
|
|
),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2015-08-20 15:18:46 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) baseValue() model.SampleValue {
|
|
|
|
return model.SampleValue(
|
2015-03-13 14:49:07 +00:00
|
|
|
math.Float64frombits(
|
|
|
|
binary.LittleEndian.Uint64(
|
|
|
|
c[doubleDeltaHeaderBaseValueOffset:],
|
|
|
|
),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2015-08-20 15:18:46 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) baseTimeDelta() model.Time {
|
2015-04-14 11:46:38 +00:00
|
|
|
if len(c) < doubleDeltaHeaderBaseTimeDeltaOffset+8 {
|
|
|
|
return 0
|
|
|
|
}
|
2015-08-20 15:18:46 +00:00
|
|
|
return model.Time(
|
2015-03-13 14:49:07 +00:00
|
|
|
binary.LittleEndian.Uint64(
|
|
|
|
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
|
|
|
),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
2015-08-20 15:18:46 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) baseValueDelta() model.SampleValue {
|
2015-04-14 11:46:38 +00:00
|
|
|
if len(c) < doubleDeltaHeaderBaseValueDeltaOffset+8 {
|
|
|
|
return 0
|
|
|
|
}
|
2015-08-20 15:18:46 +00:00
|
|
|
return model.SampleValue(
|
2015-03-13 14:49:07 +00:00
|
|
|
math.Float64frombits(
|
|
|
|
binary.LittleEndian.Uint64(
|
|
|
|
c[doubleDeltaHeaderBaseValueDeltaOffset:],
|
|
|
|
),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c doubleDeltaEncodedChunk) timeBytes() deltaBytes {
|
|
|
|
return deltaBytes(c[doubleDeltaHeaderTimeBytesOffset])
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c doubleDeltaEncodedChunk) valueBytes() deltaBytes {
|
|
|
|
return deltaBytes(c[doubleDeltaHeaderValueBytesOffset])
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c doubleDeltaEncodedChunk) sampleSize() int {
|
|
|
|
return int(c.timeBytes() + c.valueBytes())
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c doubleDeltaEncodedChunk) len() int {
|
|
|
|
if len(c) <= doubleDeltaHeaderIsIntOffset+1 {
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
if len(c) <= doubleDeltaHeaderBaseValueOffset+8 {
|
|
|
|
return 1
|
|
|
|
}
|
|
|
|
return (len(c)-doubleDeltaHeaderBytes)/c.sampleSize() + 2
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c doubleDeltaEncodedChunk) isInt() bool {
|
|
|
|
return c[doubleDeltaHeaderIsIntOffset] == 1
|
|
|
|
}
|
|
|
|
|
2015-03-06 11:53:00 +00:00
|
|
|
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
|
|
|
|
// value as base time and value.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk {
|
2015-03-06 11:53:00 +00:00
|
|
|
c = c[:doubleDeltaHeaderBaseValueOffset+8]
|
|
|
|
binary.LittleEndian.PutUint64(
|
|
|
|
c[doubleDeltaHeaderBaseTimeOffset:],
|
|
|
|
uint64(s.Timestamp),
|
|
|
|
)
|
|
|
|
binary.LittleEndian.PutUint64(
|
|
|
|
c[doubleDeltaHeaderBaseValueOffset:],
|
|
|
|
math.Float64bits(float64(s.Value)),
|
|
|
|
)
|
2016-09-21 15:56:55 +00:00
|
|
|
return []Chunk{&c}
|
2015-03-06 11:53:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// addSecondSample is a helper method only used by c.add(). It calculates the
|
|
|
|
// base delta from the provided sample and adds it to the chunk.
|
2016-09-21 15:56:55 +00:00
|
|
|
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) {
|
2015-03-06 11:53:00 +00:00
|
|
|
baseTimeDelta := s.Timestamp - c.baseTime()
|
|
|
|
if baseTimeDelta < 0 {
|
Handle errors caused by data corruption more gracefully
This requires all the panic calls upon unexpected data to be converted
into errors returned. This pollute the function signatures quite
lot. Well, this is Go...
The ideas behind this are the following:
- panic only if it's a programming error. Data corruptions happen, and
they are not programming errors.
- If we detect a data corruption, we "quarantine" the series,
essentially removing it from the database and putting its data into
a separate directory for forensics.
- Failure during writing to a series file is not considered corruption
automatically. It will call setDirty, though, so that a
crashrecovery upon the next restart will commence and check for
that.
- Series quarantining and setDirty calls are logged and counted in
metrics, but are hidden from the user of the interfaces in
interface.go, whith the notable exception of Append(). The reasoning
is that we treat corruption by removing the corrupted series, i.e. a
query for it will return no results on its next call anyway, so
return no results right now. In the case of Append(), we want to
tell the user that no data has been appended, though.
Minor side effects:
- Now consistently using filepath.* instead of path.*.
- Introduced structured logging where I touched it. This makes things
less consistent, but a complete change to structured logging would
be out of scope for this PR.
2016-02-25 11:23:42 +00:00
|
|
|
return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
|
2015-03-06 11:53:00 +00:00
|
|
|
}
|
|
|
|
c = c[:doubleDeltaHeaderBytes]
|
|
|
|
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
|
|
|
|
// If already the base delta needs d8 (or we are at d8
|
|
|
|
// already, anyway), we better encode this timestamp
|
|
|
|
// directly rather than as a delta and switch everything
|
|
|
|
// to d8.
|
|
|
|
c[doubleDeltaHeaderTimeBytesOffset] = byte(d8)
|
|
|
|
binary.LittleEndian.PutUint64(
|
|
|
|
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
|
|
|
uint64(s.Timestamp),
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
binary.LittleEndian.PutUint64(
|
|
|
|
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
|
|
|
|
uint64(baseTimeDelta),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
baseValue := c.baseValue()
|
|
|
|
baseValueDelta := s.Value - baseValue
|
|
|
|
if vb >= d8 || baseValue+baseValueDelta != s.Value {
|
|
|
|
// If we can't reproduce the original sample value (or
|
|
|
|
// if we are at d8 already, anyway), we better encode
|
|
|
|
// this value directly rather than as a delta and switch
|
|
|
|
// everything to d8.
|
|
|
|
c[doubleDeltaHeaderValueBytesOffset] = byte(d8)
|
|
|
|
c[doubleDeltaHeaderIsIntOffset] = 0
|
|
|
|
binary.LittleEndian.PutUint64(
|
|
|
|
c[doubleDeltaHeaderBaseValueDeltaOffset:],
|
|
|
|
math.Float64bits(float64(s.Value)),
|
|
|
|
)
|
|
|
|
} else {
|
|
|
|
binary.LittleEndian.PutUint64(
|
|
|
|
c[doubleDeltaHeaderBaseValueDeltaOffset:],
|
|
|
|
math.Float64bits(float64(baseValueDelta)),
|
|
|
|
)
|
|
|
|
}
|
2016-09-21 15:56:55 +00:00
|
|
|
return []Chunk{&c}, nil
|
2015-03-06 11:53:00 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
// doubleDeltaEncodedIndexAccessor implements indexAccessor.
|
|
|
|
type doubleDeltaEncodedIndexAccessor struct {
|
2015-04-14 11:46:38 +00:00
|
|
|
c doubleDeltaEncodedChunk
|
2015-08-20 15:18:46 +00:00
|
|
|
baseT, baseΔT model.Time
|
|
|
|
baseV, baseΔV model.SampleValue
|
2015-04-14 11:46:38 +00:00
|
|
|
tBytes, vBytes deltaBytes
|
|
|
|
isInt bool
|
2016-03-07 18:50:13 +00:00
|
|
|
lastErr error
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
func (acc *doubleDeltaEncodedIndexAccessor) err() error {
|
|
|
|
return acc.lastErr
|
2016-03-07 18:50:13 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
func (acc *doubleDeltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
|
2015-04-14 11:46:38 +00:00
|
|
|
if idx == 0 {
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseT
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
|
|
|
if idx == 1 {
|
|
|
|
// If time bytes are at d8, the time is saved directly rather
|
|
|
|
// than as a difference.
|
2016-03-07 19:23:14 +00:00
|
|
|
if acc.tBytes == d8 {
|
|
|
|
return acc.baseΔT
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseT + acc.baseΔT
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes)
|
2015-04-14 11:46:38 +00:00
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
switch acc.tBytes {
|
2015-04-14 11:46:38 +00:00
|
|
|
case d1:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseT +
|
|
|
|
model.Time(idx)*acc.baseΔT +
|
|
|
|
model.Time(int8(acc.c[offset]))
|
2015-04-14 11:46:38 +00:00
|
|
|
case d2:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseT +
|
|
|
|
model.Time(idx)*acc.baseΔT +
|
|
|
|
model.Time(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
|
2015-04-14 11:46:38 +00:00
|
|
|
case d4:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseT +
|
|
|
|
model.Time(idx)*acc.baseΔT +
|
|
|
|
model.Time(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
|
2015-04-14 11:46:38 +00:00
|
|
|
case d8:
|
|
|
|
// Take absolute value for d8.
|
2016-03-07 19:23:14 +00:00
|
|
|
return model.Time(binary.LittleEndian.Uint64(acc.c[offset:]))
|
2015-04-14 11:46:38 +00:00
|
|
|
default:
|
2016-03-07 19:23:14 +00:00
|
|
|
acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
|
2016-03-17 13:37:24 +00:00
|
|
|
return model.Earliest
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
func (acc *doubleDeltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
|
2015-04-14 11:46:38 +00:00
|
|
|
if idx == 0 {
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseV
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
|
|
|
if idx == 1 {
|
|
|
|
// If value bytes are at d8, the value is saved directly rather
|
|
|
|
// than as a difference.
|
2016-03-07 19:23:14 +00:00
|
|
|
if acc.vBytes == d8 {
|
|
|
|
return acc.baseΔV
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseV + acc.baseΔV
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
offset := doubleDeltaHeaderBytes + (idx-2)*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)
|
2015-04-14 11:46:38 +00:00
|
|
|
|
2016-03-07 19:23:14 +00:00
|
|
|
if acc.isInt {
|
|
|
|
switch acc.vBytes {
|
2015-04-14 11:46:38 +00:00
|
|
|
case d0:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseV +
|
|
|
|
model.SampleValue(idx)*acc.baseΔV
|
2015-04-14 11:46:38 +00:00
|
|
|
case d1:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseV +
|
|
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
|
|
model.SampleValue(int8(acc.c[offset]))
|
2015-04-14 11:46:38 +00:00
|
|
|
case d2:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseV +
|
|
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
|
|
model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[offset:])))
|
2015-04-14 11:46:38 +00:00
|
|
|
case d4:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseV +
|
|
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
|
|
model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[offset:])))
|
2015-04-14 11:46:38 +00:00
|
|
|
// No d8 for ints.
|
|
|
|
default:
|
2016-03-07 19:23:14 +00:00
|
|
|
acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
|
2016-03-17 13:37:24 +00:00
|
|
|
return 0
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
|
|
|
} else {
|
2016-03-07 19:23:14 +00:00
|
|
|
switch acc.vBytes {
|
2015-04-14 11:46:38 +00:00
|
|
|
case d4:
|
2016-03-07 19:23:14 +00:00
|
|
|
return acc.baseV +
|
|
|
|
model.SampleValue(idx)*acc.baseΔV +
|
|
|
|
model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[offset:])))
|
2015-04-14 11:46:38 +00:00
|
|
|
case d8:
|
|
|
|
// Take absolute value for d8.
|
2016-03-07 19:23:14 +00:00
|
|
|
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[offset:])))
|
2015-04-14 11:46:38 +00:00
|
|
|
default:
|
2016-03-07 19:23:14 +00:00
|
|
|
acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
|
2016-03-17 13:37:24 +00:00
|
|
|
return 0
|
2015-04-14 11:46:38 +00:00
|
|
|
}
|
|
|
|
}
|
2015-03-04 12:40:18 +00:00
|
|
|
}
|