mirror of
https://github.com/prometheus/prometheus
synced 2025-03-11 07:59:57 +00:00
Merge pull request #6475 from prometheus/constant-compaction
Constant memory compaction
This commit is contained in:
commit
92f31c3ec2
@ -50,10 +50,6 @@ type IndexWriter interface {
|
|||||||
// are added later.
|
// are added later.
|
||||||
AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error
|
AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error
|
||||||
|
|
||||||
// WriteLabelIndex serializes an index from label names to values.
|
|
||||||
// The passed in values chained tuples of strings of the length of names.
|
|
||||||
WriteLabelIndex(names []string, values []string) error
|
|
||||||
|
|
||||||
// Close writes any finalization and closes the resources associated with
|
// Close writes any finalization and closes the resources associated with
|
||||||
// the underlying writer.
|
// the underlying writer.
|
||||||
Close() error
|
Close() error
|
||||||
|
@ -722,11 +722,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||||||
symbols = newMergedStringIter(symbols, syms)
|
symbols = newMergedStringIter(symbols, syms)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
values = map[string]stringset{}
|
|
||||||
ref = uint64(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
for symbols.Next() {
|
for symbols.Next() {
|
||||||
if err := indexw.AddSymbol(symbols.At()); err != nil {
|
if err := indexw.AddSymbol(symbols.At()); err != nil {
|
||||||
return errors.Wrap(err, "add symbol")
|
return errors.Wrap(err, "add symbol")
|
||||||
@ -737,6 +732,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||||||
}
|
}
|
||||||
|
|
||||||
delIter := &deletedIterator{}
|
delIter := &deletedIterator{}
|
||||||
|
ref := uint64(0)
|
||||||
for set.Next() {
|
for set.Next() {
|
||||||
select {
|
select {
|
||||||
case <-c.ctx.Done():
|
case <-c.ctx.Done():
|
||||||
@ -836,33 +832,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, l := range lset {
|
|
||||||
valset, ok := values[l.Name]
|
|
||||||
if !ok {
|
|
||||||
valset = stringset{}
|
|
||||||
values[l.Name] = valset
|
|
||||||
}
|
|
||||||
valset.set(l.Value)
|
|
||||||
}
|
|
||||||
|
|
||||||
ref++
|
ref++
|
||||||
}
|
}
|
||||||
if set.Err() != nil {
|
if set.Err() != nil {
|
||||||
return errors.Wrap(set.Err(), "iterate compaction set")
|
return errors.Wrap(set.Err(), "iterate compaction set")
|
||||||
}
|
}
|
||||||
|
|
||||||
s := make([]string, 0, 256)
|
|
||||||
for n, v := range values {
|
|
||||||
s = s[:0]
|
|
||||||
|
|
||||||
for x := range v {
|
|
||||||
s = append(s, x)
|
|
||||||
}
|
|
||||||
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
|
||||||
return errors.Wrap(err, "write label index")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,6 +15,7 @@ package index
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"hash"
|
"hash"
|
||||||
@ -70,8 +71,6 @@ const (
|
|||||||
idxStageNone indexWriterStage = iota
|
idxStageNone indexWriterStage = iota
|
||||||
idxStageSymbols
|
idxStageSymbols
|
||||||
idxStageSeries
|
idxStageSeries
|
||||||
idxStageLabelIndex
|
|
||||||
idxStagePostings
|
|
||||||
idxStageDone
|
idxStageDone
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -83,10 +82,6 @@ func (s indexWriterStage) String() string {
|
|||||||
return "symbols"
|
return "symbols"
|
||||||
case idxStageSeries:
|
case idxStageSeries:
|
||||||
return "series"
|
return "series"
|
||||||
case idxStageLabelIndex:
|
|
||||||
return "label index"
|
|
||||||
case idxStagePostings:
|
|
||||||
return "postings"
|
|
||||||
case idxStageDone:
|
case idxStageDone:
|
||||||
return "done"
|
return "done"
|
||||||
}
|
}
|
||||||
@ -111,13 +106,20 @@ func newCRC32() hash.Hash32 {
|
|||||||
// Writer implements the IndexWriter interface for the standard
|
// Writer implements the IndexWriter interface for the standard
|
||||||
// serialization format.
|
// serialization format.
|
||||||
type Writer struct {
|
type Writer struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
f *os.File
|
|
||||||
fbuf *bufio.Writer
|
|
||||||
pos uint64
|
|
||||||
|
|
||||||
toc TOC
|
// For the main index file.
|
||||||
stage indexWriterStage
|
f *fileWriter
|
||||||
|
|
||||||
|
// Temporary file for postings.
|
||||||
|
fP *fileWriter
|
||||||
|
// Temporary file for posting offsets table.
|
||||||
|
fPO *fileWriter
|
||||||
|
cntPO uint64
|
||||||
|
|
||||||
|
toc TOC
|
||||||
|
stage indexWriterStage
|
||||||
|
postingsStart uint64 // Due to padding, can differ from TOC entry.
|
||||||
|
|
||||||
// Reusable memory.
|
// Reusable memory.
|
||||||
buf1 encoding.Encbuf
|
buf1 encoding.Encbuf
|
||||||
@ -128,9 +130,8 @@ type Writer struct {
|
|||||||
symbolFile *fileutil.MmapFile
|
symbolFile *fileutil.MmapFile
|
||||||
lastSymbol string
|
lastSymbol string
|
||||||
|
|
||||||
labelIndexes []labelIndexHashEntry // label index offsets
|
labelIndexes []labelIndexHashEntry // Label index offsets.
|
||||||
postings []postingsHashEntry // postings lists offsets
|
labelNames map[string]uint64 // Label names, and their usage.
|
||||||
labelNames map[string]uint64 // label names, and their usage
|
|
||||||
|
|
||||||
// Hold last series to validate that clients insert new series in order.
|
// Hold last series to validate that clients insert new series in order.
|
||||||
lastSeries labels.Labels
|
lastSeries labels.Labels
|
||||||
@ -193,7 +194,18 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
|||||||
return nil, errors.Wrap(err, "remove any existing index at path")
|
return nil, errors.Wrap(err, "remove any existing index at path")
|
||||||
}
|
}
|
||||||
|
|
||||||
f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666)
|
// Main index file we are building.
|
||||||
|
f, err := newFileWriter(fn)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Temporary file for postings.
|
||||||
|
fP, err := newFileWriter(fn + "_tmp_p")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Temporary file for posting offset table.
|
||||||
|
fPO, err := newFileWriter(fn + "_tmp_po")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -204,15 +216,14 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
|||||||
iw := &Writer{
|
iw := &Writer{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
f: f,
|
f: f,
|
||||||
fbuf: bufio.NewWriterSize(f, 1<<22),
|
fP: fP,
|
||||||
pos: 0,
|
fPO: fPO,
|
||||||
stage: idxStageNone,
|
stage: idxStageNone,
|
||||||
|
|
||||||
// Reusable memory.
|
// Reusable memory.
|
||||||
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||||
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||||
|
|
||||||
// Caches.
|
|
||||||
labelNames: make(map[string]uint64, 1<<8),
|
labelNames: make(map[string]uint64, 1<<8),
|
||||||
crc32: newCRC32(),
|
crc32: newCRC32(),
|
||||||
}
|
}
|
||||||
@ -223,9 +234,41 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *Writer) write(bufs ...[]byte) error {
|
func (w *Writer) write(bufs ...[]byte) error {
|
||||||
|
return w.f.write(bufs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) writeAt(buf []byte, pos uint64) error {
|
||||||
|
return w.f.writeAt(buf, pos)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) addPadding(size int) error {
|
||||||
|
return w.f.addPadding(size)
|
||||||
|
}
|
||||||
|
|
||||||
|
type fileWriter struct {
|
||||||
|
f *os.File
|
||||||
|
fbuf *bufio.Writer
|
||||||
|
pos uint64
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFileWriter(name string) (*fileWriter, error) {
|
||||||
|
f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &fileWriter{
|
||||||
|
f: f,
|
||||||
|
fbuf: bufio.NewWriterSize(f, 1<<22),
|
||||||
|
pos: 0,
|
||||||
|
name: name,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fw *fileWriter) write(bufs ...[]byte) error {
|
||||||
for _, b := range bufs {
|
for _, b := range bufs {
|
||||||
n, err := w.fbuf.Write(b)
|
n, err := fw.fbuf.Write(b)
|
||||||
w.pos += uint64(n)
|
fw.pos += uint64(n)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -233,29 +276,47 @@ func (w *Writer) write(bufs ...[]byte) error {
|
|||||||
// offset references in v1 are only 4 bytes large.
|
// offset references in v1 are only 4 bytes large.
|
||||||
// Once we move to compressed/varint representations in those areas, this limitation
|
// Once we move to compressed/varint representations in those areas, this limitation
|
||||||
// can be lifted.
|
// can be lifted.
|
||||||
if w.pos > 16*math.MaxUint32 {
|
if fw.pos > 16*math.MaxUint32 {
|
||||||
return errors.Errorf("exceeding max size of 64GiB")
|
return errors.Errorf("%q exceeding max size of 64GiB", fw.name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Writer) writeAt(buf []byte, pos uint64) error {
|
func (fw *fileWriter) flush() error {
|
||||||
if err := w.fbuf.Flush(); err != nil {
|
return fw.fbuf.Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fw *fileWriter) writeAt(buf []byte, pos uint64) error {
|
||||||
|
if err := fw.flush(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err := w.f.WriteAt(buf, int64(pos))
|
_, err := fw.f.WriteAt(buf, int64(pos))
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// addPadding adds zero byte padding until the file size is a multiple size.
|
// addPadding adds zero byte padding until the file size is a multiple size.
|
||||||
func (w *Writer) addPadding(size int) error {
|
func (fw *fileWriter) addPadding(size int) error {
|
||||||
p := w.pos % uint64(size)
|
p := fw.pos % uint64(size)
|
||||||
if p == 0 {
|
if p == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
p = uint64(size) - p
|
p = uint64(size) - p
|
||||||
return errors.Wrap(w.write(make([]byte, p)), "add padding")
|
return errors.Wrap(fw.write(make([]byte, p)), "add padding")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fw *fileWriter) close() error {
|
||||||
|
if err := fw.flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := fw.f.Sync(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return fw.f.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fw *fileWriter) remove() error {
|
||||||
|
return os.Remove(fw.name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureStage handles transitions between write stages and ensures that IndexWriter
|
// ensureStage handles transitions between write stages and ensures that IndexWriter
|
||||||
@ -270,6 +331,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
|||||||
if w.stage == s {
|
if w.stage == s {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if w.stage < s-1 {
|
||||||
|
// A stage has been skipped.
|
||||||
|
if err := w.ensureStage(s - 1); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
if w.stage > s {
|
if w.stage > s {
|
||||||
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
return errors.Errorf("invalid stage %q, currently at %q", s, w.stage)
|
||||||
}
|
}
|
||||||
@ -277,7 +344,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
|||||||
// Mark start of sections in table of contents.
|
// Mark start of sections in table of contents.
|
||||||
switch s {
|
switch s {
|
||||||
case idxStageSymbols:
|
case idxStageSymbols:
|
||||||
w.toc.Symbols = w.pos
|
w.toc.Symbols = w.f.pos
|
||||||
if err := w.startSymbols(); err != nil {
|
if err := w.startSymbols(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -285,22 +352,30 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
|||||||
if err := w.finishSymbols(); err != nil {
|
if err := w.finishSymbols(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.toc.Series = w.pos
|
w.toc.Series = w.f.pos
|
||||||
|
|
||||||
case idxStageLabelIndex:
|
|
||||||
w.toc.LabelIndices = w.pos
|
|
||||||
|
|
||||||
case idxStageDone:
|
case idxStageDone:
|
||||||
w.toc.Postings = w.pos
|
w.toc.LabelIndices = w.f.pos
|
||||||
|
// LabelIndices generation depends on the posting offset
|
||||||
|
// table produced at this stage.
|
||||||
|
if err := w.writePostingsToTmpFiles(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := w.writeLabelIndices(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
w.toc.Postings = w.f.pos
|
||||||
if err := w.writePostings(); err != nil {
|
if err := w.writePostings(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.toc.LabelIndicesTable = w.pos
|
w.toc.LabelIndicesTable = w.f.pos
|
||||||
if err := w.writeLabelIndexesOffsetTable(); err != nil {
|
if err := w.writeLabelIndexesOffsetTable(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.toc.PostingsTable = w.pos
|
|
||||||
|
w.toc.PostingsTable = w.f.pos
|
||||||
if err := w.writePostingsOffsetTable(); err != nil {
|
if err := w.writePostingsOffsetTable(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -339,15 +414,14 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
|
|||||||
return errors.Errorf("failed to write padding bytes: %v", err)
|
return errors.Errorf("failed to write padding bytes: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.pos%16 != 0 {
|
if w.f.pos%16 != 0 {
|
||||||
return errors.Errorf("series write not 16-byte aligned at %d", w.pos)
|
return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos)
|
||||||
}
|
}
|
||||||
|
|
||||||
w.buf2.Reset()
|
w.buf2.Reset()
|
||||||
w.buf2.PutUvarint(len(lset))
|
w.buf2.PutUvarint(len(lset))
|
||||||
|
|
||||||
for _, l := range lset {
|
for _, l := range lset {
|
||||||
// here we have an index for the symbol file if v2, otherwise it's an offset
|
|
||||||
index, err := w.symbols.ReverseLookup(l.Name)
|
index, err := w.symbols.ReverseLookup(l.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
|
return errors.Errorf("symbol entry for %q does not exist, %v", l.Name, err)
|
||||||
@ -421,24 +495,24 @@ func (w *Writer) AddSymbol(sym string) error {
|
|||||||
func (w *Writer) finishSymbols() error {
|
func (w *Writer) finishSymbols() error {
|
||||||
// Write out the length and symbol count.
|
// Write out the length and symbol count.
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32int(int(w.pos - w.toc.Symbols - 4))
|
w.buf1.PutBE32int(int(w.f.pos - w.toc.Symbols - 4))
|
||||||
w.buf1.PutBE32int(int(w.numSymbols))
|
w.buf1.PutBE32int(int(w.numSymbols))
|
||||||
if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
|
if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
hashPos := w.pos
|
hashPos := w.f.pos
|
||||||
// Leave space for the hash. We can only calculate it
|
// Leave space for the hash. We can only calculate it
|
||||||
// now that the number of symbols is known, so mmap and do it from there.
|
// now that the number of symbols is known, so mmap and do it from there.
|
||||||
if err := w.write([]byte("hash")); err != nil {
|
if err := w.write([]byte("hash")); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := w.fbuf.Flush(); err != nil {
|
if err := w.f.flush(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
w.symbolFile, err = fileutil.OpenMmapFile(w.f.Name())
|
w.symbolFile, err = fileutil.OpenMmapFile(w.f.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -457,31 +531,71 @@ func (w *Writer) finishSymbols() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
func (w *Writer) writeLabelIndices() error {
|
||||||
if len(values)%len(names) != 0 {
|
if err := w.fPO.flush(); err != nil {
|
||||||
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
return err
|
||||||
}
|
|
||||||
if err := w.ensureStage(idxStageLabelIndex); err != nil {
|
|
||||||
return errors.Wrap(err, "ensure stage")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
valt, err := NewStringTuples(values, len(names))
|
// Find all the label values in the tmp posting offset table.
|
||||||
|
f, err := fileutil.OpenMmapFile(w.fPO.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sort.Sort(valt)
|
defer f.Close()
|
||||||
|
|
||||||
|
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos))
|
||||||
|
cnt := w.cntPO
|
||||||
|
current := []byte{}
|
||||||
|
values := []uint32{}
|
||||||
|
for d.Err() == nil && cnt > 0 {
|
||||||
|
cnt--
|
||||||
|
d.Uvarint() // Keycount.
|
||||||
|
name := d.UvarintBytes() // Label name.
|
||||||
|
value := yoloString(d.UvarintBytes()) // Label value.
|
||||||
|
d.Uvarint64() // Offset.
|
||||||
|
if len(name) == 0 {
|
||||||
|
continue // All index is ignored.
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(name, current) && len(values) > 0 {
|
||||||
|
// We've reached a new label name.
|
||||||
|
if err := w.writeLabelIndex(string(current), values); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
values = values[:0]
|
||||||
|
}
|
||||||
|
current = name
|
||||||
|
sid, err := w.symbols.ReverseLookup(value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
values = append(values, sid)
|
||||||
|
}
|
||||||
|
if d.Err() != nil {
|
||||||
|
return d.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle the last label.
|
||||||
|
if len(values) > 0 {
|
||||||
|
if err := w.writeLabelIndex(string(current), values); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) writeLabelIndex(name string, values []uint32) error {
|
||||||
// Align beginning to 4 bytes for more efficient index list scans.
|
// Align beginning to 4 bytes for more efficient index list scans.
|
||||||
if err := w.addPadding(4); err != nil {
|
if err := w.addPadding(4); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
|
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
|
||||||
keys: names,
|
keys: []string{name},
|
||||||
offset: w.pos,
|
offset: w.f.pos,
|
||||||
})
|
})
|
||||||
|
|
||||||
startPos := w.pos
|
startPos := w.f.pos
|
||||||
// Leave 4 bytes of space for the length, which will be calculated later.
|
// Leave 4 bytes of space for the length, which will be calculated later.
|
||||||
if err := w.write([]byte("alen")); err != nil {
|
if err := w.write([]byte("alen")); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -489,21 +603,16 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
|||||||
w.crc32.Reset()
|
w.crc32.Reset()
|
||||||
|
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32int(len(names))
|
w.buf1.PutBE32int(1) // Number of names.
|
||||||
w.buf1.PutBE32int(valt.Len())
|
w.buf1.PutBE32int(len(values))
|
||||||
w.buf1.WriteToHash(w.crc32)
|
w.buf1.WriteToHash(w.crc32)
|
||||||
if err := w.write(w.buf1.Get()); err != nil {
|
if err := w.write(w.buf1.Get()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// here we have an index for the symbol file if v2, otherwise it's an offset
|
for _, v := range values {
|
||||||
for _, v := range valt.entries {
|
|
||||||
sid, err := w.symbols.ReverseLookup(v)
|
|
||||||
if err != nil {
|
|
||||||
return errors.Errorf("symbol entry for %q does not exist: %v", v, err)
|
|
||||||
}
|
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32(sid)
|
w.buf1.PutBE32(v)
|
||||||
w.buf1.WriteToHash(w.crc32)
|
w.buf1.WriteToHash(w.crc32)
|
||||||
if err := w.write(w.buf1.Get()); err != nil {
|
if err := w.write(w.buf1.Get()); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -512,7 +621,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
|||||||
|
|
||||||
// Write out the length.
|
// Write out the length.
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
|
||||||
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -524,7 +633,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
|||||||
|
|
||||||
// writeLabelIndexesOffsetTable writes the label indices offset table.
|
// writeLabelIndexesOffsetTable writes the label indices offset table.
|
||||||
func (w *Writer) writeLabelIndexesOffsetTable() error {
|
func (w *Writer) writeLabelIndexesOffsetTable() error {
|
||||||
startPos := w.pos
|
startPos := w.f.pos
|
||||||
// Leave 4 bytes of space for the length, which will be calculated later.
|
// Leave 4 bytes of space for the length, which will be calculated later.
|
||||||
if err := w.write([]byte("alen")); err != nil {
|
if err := w.write([]byte("alen")); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -552,7 +661,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
|
|||||||
}
|
}
|
||||||
// Write out the length.
|
// Write out the length.
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
|
||||||
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -564,39 +673,69 @@ func (w *Writer) writeLabelIndexesOffsetTable() error {
|
|||||||
|
|
||||||
// writePostingsOffsetTable writes the postings offset table.
|
// writePostingsOffsetTable writes the postings offset table.
|
||||||
func (w *Writer) writePostingsOffsetTable() error {
|
func (w *Writer) writePostingsOffsetTable() error {
|
||||||
startPos := w.pos
|
// Ensure everything is in the temporary file.
|
||||||
|
if err := w.fPO.flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
startPos := w.f.pos
|
||||||
// Leave 4 bytes of space for the length, which will be calculated later.
|
// Leave 4 bytes of space for the length, which will be calculated later.
|
||||||
if err := w.write([]byte("alen")); err != nil {
|
if err := w.write([]byte("alen")); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
w.crc32.Reset()
|
|
||||||
|
// Copy over the tmp posting offset table, however we need to
|
||||||
|
// adjust the offsets.
|
||||||
|
adjustment := w.postingsStart
|
||||||
|
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32int(len(w.postings))
|
w.crc32.Reset()
|
||||||
|
w.buf1.PutBE32int(int(w.cntPO)) // Count.
|
||||||
w.buf1.WriteToHash(w.crc32)
|
w.buf1.WriteToHash(w.crc32)
|
||||||
if err := w.write(w.buf1.Get()); err != nil {
|
if err := w.write(w.buf1.Get()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, e := range w.postings {
|
f, err := fileutil.OpenMmapFile(w.fPO.name)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
d := encoding.NewDecbufRaw(realByteSlice(f.Bytes()), int(w.fPO.pos))
|
||||||
|
cnt := w.cntPO
|
||||||
|
for d.Err() == nil && cnt > 0 {
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutUvarint(2)
|
w.buf1.PutUvarint(d.Uvarint()) // Keycount.
|
||||||
w.buf1.PutUvarintStr(e.name)
|
w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label name.
|
||||||
w.buf1.PutUvarintStr(e.value)
|
w.buf1.PutUvarintStr(yoloString(d.UvarintBytes())) // Label value.
|
||||||
w.buf1.PutUvarint64(e.offset)
|
w.buf1.PutUvarint64(d.Uvarint64() + adjustment) // Offset.
|
||||||
w.buf1.WriteToHash(w.crc32)
|
w.buf1.WriteToHash(w.crc32)
|
||||||
if err := w.write(w.buf1.Get()); err != nil {
|
if err := w.write(w.buf1.Get()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
cnt--
|
||||||
}
|
}
|
||||||
|
if d.Err() != nil {
|
||||||
|
return d.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup temporary file.
|
||||||
|
if err := w.fPO.close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := w.fPO.remove(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.fPO = nil
|
||||||
|
|
||||||
// Write out the length.
|
// Write out the length.
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32int(int(w.pos - startPos - 4))
|
w.buf1.PutBE32int(int(w.f.pos - startPos - 4))
|
||||||
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
if err := w.writeAt(w.buf1.Get(), startPos); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Finally write the hash.
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutHashSum(w.crc32)
|
w.buf1.PutHashSum(w.crc32)
|
||||||
return w.write(w.buf1.Get())
|
return w.write(w.buf1.Get())
|
||||||
@ -619,17 +758,17 @@ func (w *Writer) writeTOC() error {
|
|||||||
return w.write(w.buf1.Get())
|
return w.write(w.buf1.Get())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Writer) writePostings() error {
|
func (w *Writer) writePostingsToTmpFiles() error {
|
||||||
names := make([]string, 0, len(w.labelNames))
|
names := make([]string, 0, len(w.labelNames))
|
||||||
for n := range w.labelNames {
|
for n := range w.labelNames {
|
||||||
names = append(names, n)
|
names = append(names, n)
|
||||||
}
|
}
|
||||||
sort.Strings(names)
|
sort.Strings(names)
|
||||||
|
|
||||||
if err := w.fbuf.Flush(); err != nil {
|
if err := w.f.flush(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
f, err := fileutil.OpenMmapFile(w.f.Name())
|
f, err := fileutil.OpenMmapFile(w.f.name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -647,9 +786,10 @@ func (w *Writer) writePostings() error {
|
|||||||
}
|
}
|
||||||
offsets = append(offsets, uint32(startPos/16))
|
offsets = append(offsets, uint32(startPos/16))
|
||||||
// Skip to next series.
|
// Skip to next series.
|
||||||
d.Skip(d.Uvarint() + crc32.Size)
|
x := d.Uvarint()
|
||||||
|
d.Skip(x + crc32.Size)
|
||||||
if err := d.Err(); err != nil {
|
if err := d.Err(); err != nil {
|
||||||
return nil
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := w.writePosting("", "", offsets); err != nil {
|
if err := w.writePosting("", "", offsets); err != nil {
|
||||||
@ -745,15 +885,20 @@ func (w *Writer) writePostings() error {
|
|||||||
|
|
||||||
func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
||||||
// Align beginning to 4 bytes for more efficient postings list scans.
|
// Align beginning to 4 bytes for more efficient postings list scans.
|
||||||
if err := w.addPadding(4); err != nil {
|
if err := w.fP.addPadding(4); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.postings = append(w.postings, postingsHashEntry{
|
// Write out postings offset table to temporary file as we go.
|
||||||
name: name,
|
w.buf1.Reset()
|
||||||
value: value,
|
w.buf1.PutUvarint(2)
|
||||||
offset: w.pos,
|
w.buf1.PutUvarintStr(name)
|
||||||
})
|
w.buf1.PutUvarintStr(value)
|
||||||
|
w.buf1.PutUvarint64(w.fP.pos) // This is relative to the postings tmp file, not the final index file.
|
||||||
|
if err := w.fPO.write(w.buf1.Get()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.cntPO++
|
||||||
|
|
||||||
w.buf1.Reset()
|
w.buf1.Reset()
|
||||||
w.buf1.PutBE32int(len(offs))
|
w.buf1.PutBE32int(len(offs))
|
||||||
@ -768,7 +913,41 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error {
|
|||||||
w.buf2.Reset()
|
w.buf2.Reset()
|
||||||
w.buf2.PutBE32int(w.buf1.Len())
|
w.buf2.PutBE32int(w.buf1.Len())
|
||||||
w.buf1.PutHash(w.crc32)
|
w.buf1.PutHash(w.crc32)
|
||||||
return w.write(w.buf2.Get(), w.buf1.Get())
|
return w.fP.write(w.buf2.Get(), w.buf1.Get())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Writer) writePostings() error {
|
||||||
|
// There's padding in the tmp file, make sure it actually works.
|
||||||
|
if err := w.f.addPadding(4); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.postingsStart = w.f.pos
|
||||||
|
|
||||||
|
// Copy temporary file into main index.
|
||||||
|
if err := w.fP.flush(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := w.fP.f.Seek(0, 0); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Don't need to calculate a checksum, so can copy directly.
|
||||||
|
n, err := io.CopyBuffer(w.f.fbuf, w.fP.f, make([]byte, 1<<20))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if uint64(n) != w.fP.pos {
|
||||||
|
return errors.Errorf("wrote %d bytes to posting temporary file, but only read back %d", w.fP.pos, n)
|
||||||
|
}
|
||||||
|
w.f.pos += uint64(n)
|
||||||
|
|
||||||
|
if err := w.fP.close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := w.fP.remove(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.fP = nil
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type uint32slice []uint32
|
type uint32slice []uint32
|
||||||
@ -782,11 +961,6 @@ type labelIndexHashEntry struct {
|
|||||||
offset uint64
|
offset uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
type postingsHashEntry struct {
|
|
||||||
name, value string
|
|
||||||
offset uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Writer) Close() error {
|
func (w *Writer) Close() error {
|
||||||
if err := w.ensureStage(idxStageDone); err != nil {
|
if err := w.ensureStage(idxStageDone); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -796,13 +970,17 @@ func (w *Writer) Close() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := w.fbuf.Flush(); err != nil {
|
if w.fP != nil {
|
||||||
return err
|
if err := w.fP.close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if err := w.f.Sync(); err != nil {
|
if w.fPO != nil {
|
||||||
return err
|
if err := w.fPO.close(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return w.f.Close()
|
return w.f.close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// StringTuples provides access to a sorted list of string tuples.
|
// StringTuples provides access to a sorted list of string tuples.
|
||||||
|
@ -83,16 +83,6 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta)
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
|
|
||||||
// TODO support composite indexes
|
|
||||||
if len(names) != 1 {
|
|
||||||
return errors.New("composite indexes not supported yet")
|
|
||||||
}
|
|
||||||
sort.Strings(values)
|
|
||||||
m.labelIndex[names[0]] = values
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m mockIndex) Close() error {
|
func (m mockIndex) Close() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -200,9 +190,6 @@ func TestIndexRW_Postings(t *testing.T) {
|
|||||||
testutil.Ok(t, iw.AddSeries(3, series[2]))
|
testutil.Ok(t, iw.AddSeries(3, series[2]))
|
||||||
testutil.Ok(t, iw.AddSeries(4, series[3]))
|
testutil.Ok(t, iw.AddSeries(4, series[3]))
|
||||||
|
|
||||||
testutil.Ok(t, iw.WriteLabelIndex([]string{"a"}, []string{"1"}))
|
|
||||||
testutil.Ok(t, iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"}))
|
|
||||||
|
|
||||||
testutil.Ok(t, iw.Close())
|
testutil.Ok(t, iw.Close())
|
||||||
|
|
||||||
ir, err := NewFileReader(fn)
|
ir, err := NewFileReader(fn)
|
||||||
@ -289,8 +276,6 @@ func TestPostingsMany(t *testing.T) {
|
|||||||
for i, s := range series {
|
for i, s := range series {
|
||||||
testutil.Ok(t, iw.AddSeries(uint64(i), s))
|
testutil.Ok(t, iw.AddSeries(uint64(i), s))
|
||||||
}
|
}
|
||||||
err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"})
|
|
||||||
testutil.Ok(t, err)
|
|
||||||
testutil.Ok(t, iw.Close())
|
testutil.Ok(t, iw.Close())
|
||||||
|
|
||||||
ir, err := NewFileReader(fn)
|
ir, err := NewFileReader(fn)
|
||||||
@ -427,17 +412,6 @@ func TestPersistence_index_e2e(t *testing.T) {
|
|||||||
postings.Add(uint64(i), s.labels)
|
postings.Add(uint64(i), s.labels)
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, v := range values {
|
|
||||||
var vals []string
|
|
||||||
for e := range v {
|
|
||||||
vals = append(vals, e)
|
|
||||||
}
|
|
||||||
sort.Strings(vals)
|
|
||||||
|
|
||||||
testutil.Ok(t, iw.WriteLabelIndex([]string{k}, vals))
|
|
||||||
testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals))
|
|
||||||
}
|
|
||||||
|
|
||||||
err = iw.Close()
|
err = iw.Close()
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user