Write chunk skiplist and add series reader
This commit is contained in:
parent
ae379f385b
commit
b334c3ade8
|
@ -34,9 +34,9 @@ type Querier interface {
|
||||||
|
|
||||||
// Series represents a single time series.
|
// Series represents a single time series.
|
||||||
type Series interface {
|
type Series interface {
|
||||||
Labels() Labels
|
Labels() (Labels, error)
|
||||||
// Iterator returns a new iterator of the data of the series.
|
// Iterator returns a new iterator of the data of the series.
|
||||||
Iterator() SeriesIterator
|
Iterator() (SeriesIterator, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SeriesIterator iterates over the data of a time series.
|
// SeriesIterator iterates over the data of a time series.
|
||||||
|
|
100
reader.go
100
reader.go
|
@ -188,18 +188,112 @@ func (r *indexReader) LabelValues(names ...string) (StringTuples, error) {
|
||||||
if flag != flagStd {
|
if flag != flagStd {
|
||||||
return nil, errInvalidFlag
|
return nil, errInvalidFlag
|
||||||
}
|
}
|
||||||
if len(b) < 1 {
|
l, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
return nil, errInvalidSize
|
return nil, errInvalidSize
|
||||||
}
|
}
|
||||||
|
|
||||||
st := &serializedStringTuples{
|
st := &serializedStringTuples{
|
||||||
l: int(b[0]),
|
l: int(l),
|
||||||
b: b[1:],
|
b: b[n:],
|
||||||
lookup: r.lookupSymbol,
|
lookup: r.lookupSymbol,
|
||||||
}
|
}
|
||||||
return st, nil
|
return st, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *indexReader) Series(ref uint32) (Series, error) {
|
||||||
|
k, n := binary.Uvarint(r.b[ref:])
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errInvalidSize
|
||||||
|
}
|
||||||
|
|
||||||
|
b := r.b[int(ref)+n:]
|
||||||
|
offsets := make([]uint32, 0, k)
|
||||||
|
|
||||||
|
for i := 0; i < int(k); i++ {
|
||||||
|
o, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errInvalidSize
|
||||||
|
}
|
||||||
|
offsets = append(offsets, uint32(o))
|
||||||
|
|
||||||
|
b = b[n:]
|
||||||
|
}
|
||||||
|
// Offests must occur in pairs representing name and value.
|
||||||
|
if len(offsets)&1 != 0 {
|
||||||
|
return nil, errInvalidSize
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(fabxc): Fully materialize series for now. Figure out later if it
|
||||||
|
// makes sense to decode those lazily.
|
||||||
|
// The references are expected to be sorted and match the order of
|
||||||
|
// the underlying strings.
|
||||||
|
labels := make(Labels, 0, k)
|
||||||
|
|
||||||
|
for i := 0; i < int(k); i += 2 {
|
||||||
|
n, err := r.lookupSymbol(offsets[i])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
v, err := r.lookupSymbol(offsets[i+1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
labels = append(labels, Label{
|
||||||
|
Name: string(n),
|
||||||
|
Value: string(v),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read the chunk offsets.
|
||||||
|
k, n = binary.Uvarint(r.b[ref:])
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errInvalidSize
|
||||||
|
}
|
||||||
|
|
||||||
|
b = b[n:]
|
||||||
|
coffsets := make([]ChunkOffset, 0, k)
|
||||||
|
|
||||||
|
for i := 0; i < int(k); i++ {
|
||||||
|
v, n := binary.Varint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errInvalidSize
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
|
||||||
|
o, n := binary.Uvarint(b)
|
||||||
|
if n < 1 {
|
||||||
|
return nil, errInvalidSize
|
||||||
|
}
|
||||||
|
b = b[n:]
|
||||||
|
|
||||||
|
coffsets = append(coffsets, ChunkOffset{
|
||||||
|
Offset: uint32(o),
|
||||||
|
Value: v,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
s := &series{
|
||||||
|
labels: labels,
|
||||||
|
offsets: coffsets,
|
||||||
|
}
|
||||||
|
return s, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type series struct {
|
||||||
|
labels Labels
|
||||||
|
offsets []ChunkOffset
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *series) Labels() (Labels, error) {
|
||||||
|
return s.labels, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *series) Iterator() (SeriesIterator, error) {
|
||||||
|
// dereference skiplist and construct from chunk iterators.
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
type stringTuples struct {
|
type stringTuples struct {
|
||||||
l int // tuple length
|
l int // tuple length
|
||||||
s []string // flattened tuple entries
|
s []string // flattened tuple entries
|
||||||
|
|
29
writer.go
29
writer.go
|
@ -295,18 +295,31 @@ func (w *indexWriter) writeSymbols() error {
|
||||||
|
|
||||||
func (w *indexWriter) writeSeries() error {
|
func (w *indexWriter) writeSeries() error {
|
||||||
b := make([]byte, 0, 4096)
|
b := make([]byte, 0, 4096)
|
||||||
buf := [binary.MaxVarintLen32]byte{}
|
buf := make([]byte, binary.MaxVarintLen64)
|
||||||
|
|
||||||
for _, s := range w.series {
|
for _, s := range w.series {
|
||||||
|
// Write label set symbol references.
|
||||||
s.offset = uint32(w.n) + uint32(len(b))
|
s.offset = uint32(w.n) + uint32(len(b))
|
||||||
|
|
||||||
n := binary.PutUvarint(buf[:], uint64(len(s.labels)))
|
n := binary.PutUvarint(buf, uint64(len(s.labels)))
|
||||||
b = append(b, buf[:n]...)
|
b = append(b, buf[:n]...)
|
||||||
|
|
||||||
for _, l := range s.labels {
|
for _, l := range s.labels {
|
||||||
n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Name]))
|
n = binary.PutUvarint(buf, uint64(w.symbols[l.Name]))
|
||||||
b = append(b, buf[:n]...)
|
b = append(b, buf[:n]...)
|
||||||
n = binary.PutUvarint(buf[:], uint64(w.symbols[l.Value]))
|
n = binary.PutUvarint(buf, uint64(w.symbols[l.Value]))
|
||||||
|
b = append(b, buf[:n]...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write skiplist to chunk offsets.
|
||||||
|
n = binary.PutUvarint(buf, uint64(len(s.chunks)))
|
||||||
|
b = append(b, buf[:n]...)
|
||||||
|
|
||||||
|
for _, c := range s.chunks {
|
||||||
|
n = binary.PutVarint(buf, c.Value)
|
||||||
|
b = append(b, buf[:n]...)
|
||||||
|
|
||||||
|
n = binary.PutUvarint(buf, uint64(c.Offset))
|
||||||
b = append(b, buf[:n]...)
|
b = append(b, buf[:n]...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -330,14 +343,16 @@ func (w *indexWriter) WriteLabelIndex(names []string, values []string) error {
|
||||||
offset: uint32(w.n),
|
offset: uint32(w.n),
|
||||||
})
|
})
|
||||||
|
|
||||||
l := 1 + uint32(len(values)*4)
|
buf := make([]byte, binary.MaxVarintLen32)
|
||||||
|
n := binary.PutUvarint(buf, uint64(len(names)))
|
||||||
|
|
||||||
|
l := uint32(n) + uint32(len(values)*4)
|
||||||
|
|
||||||
return w.section(l, flagStd, func(wr io.Writer) error {
|
return w.section(l, flagStd, func(wr io.Writer) error {
|
||||||
// First byte indicates tuple size for index.
|
// First byte indicates tuple size for index.
|
||||||
if err := w.write(wr, []byte{byte(len(names))}); err != nil {
|
if err := w.write(wr, buf[:n]); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
buf := make([]byte, 4)
|
|
||||||
|
|
||||||
for _, v := range valt.s {
|
for _, v := range valt.s {
|
||||||
binary.BigEndian.PutUint32(buf, w.symbols[v])
|
binary.BigEndian.PutUint32(buf, w.symbols[v])
|
||||||
|
|
Loading…
Reference in New Issue