cleanup and switching removal of unsafe calls.

This commit is contained in:
Fabian Reinartz 2016-12-10 18:08:50 +01:00
parent eb9af096f9
commit 14dbc59f2b
5 changed files with 184 additions and 178 deletions

View File

@ -1,7 +1,6 @@
package tsdb
import (
"fmt"
"io"
"sort"
"unsafe"
@ -15,69 +14,11 @@ const (
// Block handles reads against a block of time series data within a time window.
type Block interface{}
type persistedSeries struct {
size int
dataref []byte
data *[maxMapSize]byte
}
const (
flagNone = 0
flagStd = 1
)
const (
metaSize = int(unsafe.Sizeof(meta{}))
seriesStatsSize = int(unsafe.Sizeof(blockStats{}))
)
type meta struct {
magic uint32
flag byte
_ [7]byte // padding/reserved
}
type blockStats struct {
chunks uint32
samples uint64
_ [4]byte // padding/reserved
}
func (s *persistedSeries) meta() *meta {
return (*meta)(unsafe.Pointer(&s.data[0]))
}
func (s *persistedSeries) stats() *blockStats {
// The stats start right behind the block meta data.
return (*blockStats)(unsafe.Pointer(&s.data[metaSize]))
}
// seriesAt returns the series stored at offset as a skiplist and the chunks
// it points to as a byte slice.
func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) {
offset += metaSize
offset += seriesStatsSize
switch b := s.data[offset]; b {
case flagStd:
default:
return nil, nil, fmt.Errorf("invalid flag: %x", b)
}
offset++
var (
slLen = *(*uint16)(unsafe.Pointer(&s.data[offset]))
slSize = int(slLen) / int(unsafe.Sizeof(skiplistPair{}))
sl = ((*[maxAllocSize]skiplistPair)(unsafe.Pointer(&s.data[offset+2])))[:slSize]
)
offset += 3
chunksLen := *(*uint32)(unsafe.Pointer(&s.data[offset]))
chunks := ((*[maxAllocSize]byte)(unsafe.Pointer(&s.data[offset])))[:chunksLen]
return simpleSkiplist(sl), chunks, nil
}
// A skiplist maps offsets to values. The values found in the data at an
// offset are strictly greater than the indexed value.
type skiplist interface {

166
db.go
View File

@ -2,6 +2,7 @@
package tsdb
import (
"bytes"
"fmt"
"os"
"path/filepath"
@ -93,6 +94,72 @@ func (db *DB) Querier(start, end int64) Querier {
return nil
}
// Appender adds a batch of samples.
type Appender interface {
// Add adds a sample pair to the appended batch.
Add(l Labels, t int64, v float64)
// Commit submits the collected samples.
Commit() error
}
// Vector is a set of LabelSet associated with one value each.
// Label sets and values must have equal length.
type Vector struct {
Buckets map[uint16][]Sample
reused int
}
type Sample struct {
Hash uint64
Labels Labels
Value float64
}
// Reset the vector but keep resources allocated.
func (v *Vector) Reset() {
// Do a full reset every n-th reusage to avoid memory leaks.
if v.Buckets == nil || v.reused > 100 {
v.Buckets = make(map[uint16][]Sample, 0)
return
}
for x, bkt := range v.Buckets {
v.Buckets[x] = bkt[:0]
}
v.reused++
}
// Add a sample to the vector.
func (v *Vector) Add(lset Labels, val float64) {
h := lset.Hash()
s := uint16(h >> (64 - seriesShardShift))
v.Buckets[s] = append(v.Buckets[s], Sample{
Hash: h,
Labels: lset,
Value: val,
})
}
// func (db *DB) Appender() Appender {
// return &bucketAppender{
// samples: make([]Sample, 1024),
// }
// }
// type bucketAppender struct {
// db *DB
// // buckets []Sam
// }
// func (a *bucketAppender) Add(l Labels, t int64, v float64) {
// }
// func (a *bucketAppender) Commit() error {
// // f
// }
// AppendVector adds values for a list of label sets for the given timestamp
// in milliseconds.
func (db *DB) AppendVector(ts int64, v *Vector) error {
@ -121,59 +188,6 @@ func (db *DB) appendSingle(lset Labels, ts int64, v float64) error {
})
}
// Matcher matches a string.
type Matcher interface {
// Match returns true if the matcher applies to the string value.
Match(v string) bool
}
// Querier provides querying access over time series data of a fixed
// time range.
type Querier interface {
// Range returns the timestamp range of the Querier.
Range() (start, end int64)
// Iterator returns an interator over the inverted index that
// matches the key label by the constraints of Matcher.
Iterator(key string, m Matcher) Iterator
// Labels resolves a label reference into a set of labels.
Labels(ref LabelRefs) (Labels, error)
// Series returns series provided in the index iterator.
Series(Iterator) []Series
// LabelValues returns all potential values for a label name.
LabelValues(string) []string
// LabelValuesFor returns all potential values for a label name.
// under the constraint of another label.
LabelValuesFor(string, Label) []string
// Close releases the resources of the Querier.
Close() error
}
// Series represents a single time series.
type Series interface {
Labels() Labels
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
}
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at ts, it advances to the last value
// before ts.
Seek(ts int64) bool
// Values returns the current timestamp/value pair.
Values() (int64, float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}
const sep = '\xff'
// SeriesShard handles reads and writes of time series falling into
@ -434,40 +448,24 @@ func LabelsFromMap(m map[string]string) Labels {
return NewLabels(l...)
}
// Vector is a set of LabelSet associated with one value each.
// Label sets and values must have equal length.
type Vector struct {
Buckets map[uint16][]Sample
reused int
}
// The MultiError type implements the error interface, and contains the
// Errors used to construct it.
type MultiError []error
type Sample struct {
Hash uint64
Labels Labels
Value float64
}
// Returns a concatenated string of the contained errors
func (es MultiError) Error() string {
var buf bytes.Buffer
// Reset the vector but keep resources allocated.
func (v *Vector) Reset() {
// Do a full reset every n-th reusage to avoid memory leaks.
if v.Buckets == nil || v.reused > 100 {
v.Buckets = make(map[uint16][]Sample, 0)
return
if len(es) > 0 {
fmt.Fprintf(&buf, "%d errors: ", len(es))
}
for x, bkt := range v.Buckets {
v.Buckets[x] = bkt[:0]
for i, err := range es {
if i != 0 {
buf.WriteString("; ")
}
buf.WriteString(err.Error())
}
v.reused++
}
// Add a sample to the vector.
func (v *Vector) Add(lset Labels, val float64) {
h := lset.Hash()
s := uint16(h >> (64 - seriesShardShift))
v.Buckets[s] = append(v.Buckets[s], Sample{
Hash: h,
Labels: lset,
Value: val,
})
return buf.String()
}

View File

@ -56,6 +56,11 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error
return nil
}
type blockStats struct {
chunks uint32
samples uint64
}
func (h *HeadBlock) stats() *blockStats {
return &blockStats{
chunks: uint32(h.index.numSeries()),

54
querier.go Normal file
View File

@ -0,0 +1,54 @@
package tsdb
// Matcher matches a string.
type Matcher interface {
// Match returns true if the matcher applies to the string value.
Match(v string) bool
}
// Querier provides querying access over time series data of a fixed
// time range.
type Querier interface {
// Range returns the timestamp range of the Querier.
Range() (start, end int64)
// Iterator returns an interator over the inverted index that
// matches the key label by the constraints of Matcher.
Iterator(key string, m Matcher) Iterator
// Labels resolves a label reference into a set of labels.
Labels(ref LabelRefs) (Labels, error)
// Series returns series provided in the index iterator.
Series(Iterator) []Series
// LabelValues returns all potential values for a label name.
LabelValues(string) []string
// LabelValuesFor returns all potential values for a label name.
// under the constraint of another label.
LabelValuesFor(string, Label) []string
// Close releases the resources of the Querier.
Close() error
}
// Series represents a single time series.
type Series interface {
Labels() Labels
// Iterator returns a new iterator of the data of the series.
Iterator() SeriesIterator
}
// SeriesIterator iterates over the data of a time series.
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at ts, it advances to the last value
// before tt.
Seek(t int64) bool
// Values returns the current timestamp/value pair.
Values() (t int64, v float64)
// Next advances the iterator by one.
Next() bool
// Err returns the current error.
Err() error
}

View File

@ -7,7 +7,6 @@ import (
"io"
"os"
"sort"
"unsafe"
)
const (
@ -60,10 +59,12 @@ func (w *seriesWriter) write(wr io.Writer, b []byte) error {
}
func (w *seriesWriter) writeMeta() error {
meta := &meta{magic: MagicSeries, flag: flagStd}
metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
b := [64]byte{}
return w.write(w.w, metab)
binary.BigEndian.PutUint32(b[:4], MagicSeries)
b[4] = flagStd
return w.write(w.w, b[:])
}
func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) error {
@ -78,13 +79,16 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e
h := crc32.NewIEEE()
wr := io.MultiWriter(h, w.w)
l := uint32(0)
l := 0
for _, cd := range chks {
l += uint32(len(cd.chunk.Bytes()))
l += len(cd.chunk.Bytes())
}
// For normal reads we don't need the length of the chunk section but
// it allows us to verify checksums without reading the index file.
if err := w.write(wr, ((*[4]byte)(unsafe.Pointer(&l)))[:]); err != nil {
b := [4]byte{}
binary.BigEndian.PutUint32(b[:], uint32(l))
if err := w.write(wr, b[:]); err != nil {
return err
}
@ -210,10 +214,11 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er
h := crc32.NewIEEE()
wr := io.MultiWriter(h, w.w)
if err := w.write(wr, ((*[4]byte)(unsafe.Pointer(&l)))[:]); err != nil {
return err
}
if err := w.write(wr, []byte{flagStd}); err != nil {
b := [5]byte{}
binary.BigEndian.PutUint32(b[:4], l)
b[4] = flagStd
if err := w.write(wr, b[:]); err != nil {
return err
}
@ -224,10 +229,12 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er
}
func (w *indexWriter) writeMeta() error {
meta := &meta{magic: MagicSeries, flag: flagStd}
metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:]
b := [64]byte{}
return w.write(w.w, metab)
binary.BigEndian.PutUint32(b[:4], MagicIndex)
b[4] = flagStd
return w.write(w.w, b[:])
}
func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) {
@ -325,11 +332,11 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
return w.section(l, flagStd, func(wr io.Writer) error {
for _, v := range values {
o := w.symbols[v]
b := ((*[4]byte)(unsafe.Pointer(&o)))[:]
if err := w.write(wr, b); err != nil {
if err := binary.Write(wr, binary.BigEndian, o); err != nil {
return err
}
w.n += 4
}
return nil
})
@ -344,10 +351,13 @@ func (w *indexWriter) WritePostings(name, value string, it Iterator) error {
})
b := make([]byte, 0, 4096)
buf := [4]byte{}
for it.Next() {
v := w.series[it.Value()].offset
b = append(b, ((*[4]byte)(unsafe.Pointer(&v)))[:]...)
binary.BigEndian.PutUint32(buf[:], v)
b = append(b, buf[:]...)
}
return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error {
@ -364,20 +374,21 @@ type hashEntry struct {
offset uint32
}
const hashEntrySize = uint32(unsafe.Sizeof(hashEntry{}))
func (w *indexWriter) writeHashmap(h []hashEntry) error {
l := uint32(len(h)) * hashEntrySize
b := make([]byte, 0, 4096)
buf := make([]byte, 4)
return w.section(l, flagStd, func(wr io.Writer) error {
for _, e := range w.labelIndexes {
b := ((*[hashEntrySize]byte)(unsafe.Pointer(&e)))[:]
for _, e := range h {
binary.PutUvarint(buf, uint64(len(e.name)))
b = append(b, buf...)
b = append(b, e.name...)
if err := w.write(w.w, b); err != nil {
return nil
}
}
return nil
binary.BigEndian.PutUint32(buf, e.offset)
b = append(b, buf...)
}
return w.section(uint32(len(buf)), flagStd, func(wr io.Writer) error {
return w.write(wr, b)
})
}
@ -399,14 +410,11 @@ func (w *indexWriter) finalize() error {
// iteration over all existing series?
// TODO(fabxc): store references like these that are not resolved via direct
// mmap using explicit endianness?
if err := w.write(w.w, ((*[hashEntrySize]byte)(unsafe.Pointer(&lo)))[:]); err != nil {
return err
}
if err := w.write(w.w, ((*[hashEntrySize]byte)(unsafe.Pointer(&po)))[:]); err != nil {
return err
}
b := [8]byte{}
binary.BigEndian.PutUint32(b[:4], lo)
binary.BigEndian.PutUint32(b[4:], po)
return nil
return w.write(w.w, b[:])
}
func (w *indexWriter) Close() error {