diff --git a/block.go b/block.go index e11a3fe46..aa70017d9 100644 --- a/block.go +++ b/block.go @@ -1,7 +1,6 @@ package tsdb import ( - "fmt" "io" "sort" "unsafe" @@ -15,69 +14,11 @@ const ( // Block handles reads against a block of time series data within a time window. type Block interface{} -type persistedSeries struct { - size int - dataref []byte - data *[maxMapSize]byte -} - const ( flagNone = 0 flagStd = 1 ) -const ( - metaSize = int(unsafe.Sizeof(meta{})) - seriesStatsSize = int(unsafe.Sizeof(blockStats{})) -) - -type meta struct { - magic uint32 - flag byte - _ [7]byte // padding/reserved -} - -type blockStats struct { - chunks uint32 - samples uint64 - _ [4]byte // padding/reserved -} - -func (s *persistedSeries) meta() *meta { - return (*meta)(unsafe.Pointer(&s.data[0])) -} - -func (s *persistedSeries) stats() *blockStats { - // The stats start right behind the block meta data. - return (*blockStats)(unsafe.Pointer(&s.data[metaSize])) -} - -// seriesAt returns the series stored at offset as a skiplist and the chunks -// it points to as a byte slice. -func (s *persistedSeries) seriesAt(offset int) (skiplist, []byte, error) { - offset += metaSize - offset += seriesStatsSize - - switch b := s.data[offset]; b { - case flagStd: - default: - return nil, nil, fmt.Errorf("invalid flag: %x", b) - } - offset++ - - var ( - slLen = *(*uint16)(unsafe.Pointer(&s.data[offset])) - slSize = int(slLen) / int(unsafe.Sizeof(skiplistPair{})) - sl = ((*[maxAllocSize]skiplistPair)(unsafe.Pointer(&s.data[offset+2])))[:slSize] - ) - offset += 3 - - chunksLen := *(*uint32)(unsafe.Pointer(&s.data[offset])) - chunks := ((*[maxAllocSize]byte)(unsafe.Pointer(&s.data[offset])))[:chunksLen] - - return simpleSkiplist(sl), chunks, nil -} - // A skiplist maps offsets to values. The values found in the data at an // offset are strictly greater than the indexed value. 
type skiplist interface { diff --git a/db.go b/db.go index 54e4632bd..6decbd611 100644 --- a/db.go +++ b/db.go @@ -2,6 +2,7 @@ package tsdb import ( + "bytes" "fmt" "os" "path/filepath" @@ -93,6 +94,72 @@ func (db *DB) Querier(start, end int64) Querier { return nil } +// Appender adds a batch of samples. +type Appender interface { + // Add adds a sample pair to the appended batch. + Add(l Labels, t int64, v float64) + + // Commit submits the collected samples. + Commit() error +} + +// Vector is a set of LabelSet associated with one value each. +// Label sets and values must have equal length. +type Vector struct { + Buckets map[uint16][]Sample + reused int +} + +type Sample struct { + Hash uint64 + Labels Labels + Value float64 +} + +// Reset the vector but keep resources allocated. +func (v *Vector) Reset() { + // Do a full reset every n-th reusage to avoid memory leaks. + if v.Buckets == nil || v.reused > 100 { + v.Buckets = make(map[uint16][]Sample, 0) + return + } + for x, bkt := range v.Buckets { + v.Buckets[x] = bkt[:0] + } + v.reused++ +} + +// Add a sample to the vector. +func (v *Vector) Add(lset Labels, val float64) { + h := lset.Hash() + s := uint16(h >> (64 - seriesShardShift)) + + v.Buckets[s] = append(v.Buckets[s], Sample{ + Hash: h, + Labels: lset, + Value: val, + }) +} + +// func (db *DB) Appender() Appender { +// return &bucketAppender{ +// samples: make([]Sample, 1024), +// } +// } + +// type bucketAppender struct { +// db *DB +// // buckets []Sam +// } + +// func (a *bucketAppender) Add(l Labels, t int64, v float64) { + +// } + +// func (a *bucketAppender) Commit() error { +// // f +// } + // AppendVector adds values for a list of label sets for the given timestamp // in milliseconds. func (db *DB) AppendVector(ts int64, v *Vector) error { @@ -121,59 +188,6 @@ func (db *DB) appendSingle(lset Labels, ts int64, v float64) error { }) } -// Matcher matches a string. 
-type Matcher interface { - // Match returns true if the matcher applies to the string value. - Match(v string) bool -} - -// Querier provides querying access over time series data of a fixed -// time range. -type Querier interface { - // Range returns the timestamp range of the Querier. - Range() (start, end int64) - - // Iterator returns an interator over the inverted index that - // matches the key label by the constraints of Matcher. - Iterator(key string, m Matcher) Iterator - - // Labels resolves a label reference into a set of labels. - Labels(ref LabelRefs) (Labels, error) - - // Series returns series provided in the index iterator. - Series(Iterator) []Series - - // LabelValues returns all potential values for a label name. - LabelValues(string) []string - // LabelValuesFor returns all potential values for a label name. - // under the constraint of another label. - LabelValuesFor(string, Label) []string - - // Close releases the resources of the Querier. - Close() error -} - -// Series represents a single time series. -type Series interface { - Labels() Labels - // Iterator returns a new iterator of the data of the series. - Iterator() SeriesIterator -} - -// SeriesIterator iterates over the data of a time series. -type SeriesIterator interface { - // Seek advances the iterator forward to the given timestamp. - // If there's no value exactly at ts, it advances to the last value - // before ts. - Seek(ts int64) bool - // Values returns the current timestamp/value pair. - Values() (int64, float64) - // Next advances the iterator by one. - Next() bool - // Err returns the current error. - Err() error -} - const sep = '\xff' // SeriesShard handles reads and writes of time series falling into @@ -434,40 +448,24 @@ func LabelsFromMap(m map[string]string) Labels { return NewLabels(l...) } -// Vector is a set of LabelSet associated with one value each. -// Label sets and values must have equal length. 
-type Vector struct { - Buckets map[uint16][]Sample - reused int -} +// The MultiError type implements the error interface, and contains the +// Errors used to construct it. +type MultiError []error -type Sample struct { - Hash uint64 - Labels Labels - Value float64 -} +// Returns a concatenated string of the contained errors +func (es MultiError) Error() string { + var buf bytes.Buffer -// Reset the vector but keep resources allocated. -func (v *Vector) Reset() { - // Do a full reset every n-th reusage to avoid memory leaks. - if v.Buckets == nil || v.reused > 100 { - v.Buckets = make(map[uint16][]Sample, 0) - return + if len(es) > 0 { + fmt.Fprintf(&buf, "%d errors: ", len(es)) } - for x, bkt := range v.Buckets { - v.Buckets[x] = bkt[:0] + + for i, err := range es { + if i != 0 { + buf.WriteString("; ") + } + buf.WriteString(err.Error()) } - v.reused++ -} -// Add a sample to the vector. -func (v *Vector) Add(lset Labels, val float64) { - h := lset.Hash() - s := uint16(h >> (64 - seriesShardShift)) - - v.Buckets[s] = append(v.Buckets[s], Sample{ - Hash: h, - Labels: lset, - Value: val, - }) + return buf.String() } diff --git a/head.go b/head.go index c14be744b..73db76086 100644 --- a/head.go +++ b/head.go @@ -56,6 +56,11 @@ func (h *HeadBlock) append(hash uint64, lset Labels, ts int64, v float64) error return nil } +type blockStats struct { + chunks uint32 + samples uint64 +} + func (h *HeadBlock) stats() *blockStats { return &blockStats{ chunks: uint32(h.index.numSeries()), diff --git a/querier.go b/querier.go new file mode 100644 index 000000000..dd5c160fe --- /dev/null +++ b/querier.go @@ -0,0 +1,54 @@ +package tsdb + +// Matcher matches a string. +type Matcher interface { + // Match returns true if the matcher applies to the string value. + Match(v string) bool +} + +// Querier provides querying access over time series data of a fixed +// time range. +type Querier interface { + // Range returns the timestamp range of the Querier. 
+	Range() (start, end int64)
+
+	// Iterator returns an iterator over the inverted index that
+	// matches the key label by the constraints of Matcher.
+	Iterator(key string, m Matcher) Iterator
+
+	// Labels resolves a label reference into a set of labels.
+	Labels(ref LabelRefs) (Labels, error)
+
+	// Series returns series provided in the index iterator.
+	Series(Iterator) []Series
+
+	// LabelValues returns all potential values for a label name.
+	LabelValues(string) []string
+	// LabelValuesFor returns all potential values for a label name
+	// under the constraint of another label.
+	LabelValuesFor(string, Label) []string
+
+	// Close releases the resources of the Querier.
+	Close() error
+}
+
+// Series represents a single time series.
+type Series interface {
+	Labels() Labels
+	// Iterator returns a new iterator of the data of the series.
+	Iterator() SeriesIterator
+}
+
+// SeriesIterator iterates over the data of a time series.
+type SeriesIterator interface {
+	// Seek advances the iterator forward to the given timestamp.
+	// If there's no value exactly at t, it advances to the last value
+	// before t.
+	Seek(t int64) bool
+	// Values returns the current timestamp/value pair.
+	Values() (t int64, v float64)
+	// Next advances the iterator by one.
+	// Err returns the current error.
+ Err() error +} diff --git a/writer.go b/writer.go index 2adb50e6d..98e1fbbd7 100644 --- a/writer.go +++ b/writer.go @@ -7,7 +7,6 @@ import ( "io" "os" "sort" - "unsafe" ) const ( @@ -60,10 +59,12 @@ func (w *seriesWriter) write(wr io.Writer, b []byte) error { } func (w *seriesWriter) writeMeta() error { - meta := &meta{magic: MagicSeries, flag: flagStd} - metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:] + b := [64]byte{} - return w.write(w.w, metab) + binary.BigEndian.PutUint32(b[:4], MagicSeries) + b[4] = flagStd + + return w.write(w.w, b[:]) } func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) error { @@ -78,13 +79,16 @@ func (w *seriesWriter) WriteSeries(ref uint32, lset Labels, chks []*chunkDesc) e h := crc32.NewIEEE() wr := io.MultiWriter(h, w.w) - l := uint32(0) + l := 0 for _, cd := range chks { - l += uint32(len(cd.chunk.Bytes())) + l += len(cd.chunk.Bytes()) } // For normal reads we don't need the length of the chunk section but // it allows us to verify checksums without reading the index file. 
- if err := w.write(wr, ((*[4]byte)(unsafe.Pointer(&l)))[:]); err != nil { + b := [4]byte{} + binary.BigEndian.PutUint32(b[:], uint32(l)) + + if err := w.write(wr, b[:]); err != nil { return err } @@ -210,10 +214,11 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er h := crc32.NewIEEE() wr := io.MultiWriter(h, w.w) - if err := w.write(wr, ((*[4]byte)(unsafe.Pointer(&l)))[:]); err != nil { - return err - } - if err := w.write(wr, []byte{flagStd}); err != nil { + b := [5]byte{} + binary.BigEndian.PutUint32(b[:4], l) + b[4] = flagStd + + if err := w.write(wr, b[:]); err != nil { return err } @@ -224,10 +229,12 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er } func (w *indexWriter) writeMeta() error { - meta := &meta{magic: MagicSeries, flag: flagStd} - metab := ((*[metaSize]byte)(unsafe.Pointer(meta)))[:] + b := [64]byte{} - return w.write(w.w, metab) + binary.BigEndian.PutUint32(b[:4], MagicIndex) + b[4] = flagStd + + return w.write(w.w, b[:]) } func (w *indexWriter) AddSeries(ref uint32, lset Labels, offsets ...ChunkOffset) { @@ -325,11 +332,11 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error { return w.section(l, flagStd, func(wr io.Writer) error { for _, v := range values { o := w.symbols[v] - b := ((*[4]byte)(unsafe.Pointer(&o)))[:] - if err := w.write(wr, b); err != nil { + if err := binary.Write(wr, binary.BigEndian, o); err != nil { return err } + w.n += 4 } return nil }) @@ -344,10 +351,13 @@ func (w *indexWriter) WritePostings(name, value string, it Iterator) error { }) b := make([]byte, 0, 4096) + buf := [4]byte{} for it.Next() { v := w.series[it.Value()].offset - b = append(b, ((*[4]byte)(unsafe.Pointer(&v)))[:]...) + binary.BigEndian.PutUint32(buf[:], v) + + b = append(b, buf[:]...) 
 	}
 
 	return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error {
@@ -364,20 +374,21 @@ type hashEntry struct {
 	offset uint32
 }
 
-const hashEntrySize = uint32(unsafe.Sizeof(hashEntry{}))
-
 func (w *indexWriter) writeHashmap(h []hashEntry) error {
-	l := uint32(len(h)) * hashEntrySize
+	b := make([]byte, 0, 4096)
+	buf := make([]byte, 4)
 
-	return w.section(l, flagStd, func(wr io.Writer) error {
-		for _, e := range w.labelIndexes {
-			b := ((*[hashEntrySize]byte)(unsafe.Pointer(&e)))[:]
+	for _, e := range h {
+		n := binary.PutUvarint(buf, uint64(len(e.name)))
+		b = append(b, buf[:n]...)
+		b = append(b, e.name...)
 
-			if err := w.write(w.w, b); err != nil {
-				return nil
-			}
-		}
-		return nil
+		binary.BigEndian.PutUint32(buf, e.offset)
+		b = append(b, buf...)
+	}
+
+	return w.section(uint32(len(b)), flagStd, func(wr io.Writer) error {
+		return w.write(wr, b)
 	})
 }
 
@@ -399,14 +410,11 @@
 	// iteration over all existing series?
 	// TODO(fabxc): store references like these that are not resolved via direct
 	// mmap using explicit endianness?
-	if err := w.write(w.w, ((*[hashEntrySize]byte)(unsafe.Pointer(&lo)))[:]); err != nil {
-		return err
-	}
-	if err := w.write(w.w, ((*[hashEntrySize]byte)(unsafe.Pointer(&po)))[:]); err != nil {
-		return err
-	}
+	b := [8]byte{}
+	binary.BigEndian.PutUint32(b[:4], lo)
+	binary.BigEndian.PutUint32(b[4:], po)
 
-	return nil
+	return w.write(w.w, b[:])
 }
 
 func (w *indexWriter) Close() error {