mirror of
https://github.com/prometheus/prometheus
synced 2025-01-12 09:40:00 +00:00
Move writing of index label indices into IndexWriter.
Now you only need to provide symbols and series to IndexWriter. Signed-off-by: Brian Brazil <brian.brazil@robustperception.io>
This commit is contained in:
parent
1733724e30
commit
dee6981a6c
@ -50,10 +50,6 @@ type IndexWriter interface {
|
||||
// are added later.
|
||||
AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta) error
|
||||
|
||||
// WriteLabelIndex serializes an index from label names to values.
|
||||
// The passed in values chained tuples of strings of the length of names.
|
||||
WriteLabelIndex(names []string, values []string) error
|
||||
|
||||
// Close writes any finalization and closes the resources associated with
|
||||
// the underlying writer.
|
||||
Close() error
|
||||
|
@ -722,11 +722,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
symbols = newMergedStringIter(symbols, syms)
|
||||
}
|
||||
|
||||
var (
|
||||
values = map[string]stringset{}
|
||||
ref = uint64(0)
|
||||
)
|
||||
|
||||
for symbols.Next() {
|
||||
if err := indexw.AddSymbol(symbols.At()); err != nil {
|
||||
return errors.Wrap(err, "add symbol")
|
||||
@ -737,6 +732,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
}
|
||||
|
||||
delIter := &deletedIterator{}
|
||||
ref := uint64(0)
|
||||
for set.Next() {
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
@ -836,33 +832,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
|
||||
}
|
||||
}
|
||||
|
||||
for _, l := range lset {
|
||||
valset, ok := values[l.Name]
|
||||
if !ok {
|
||||
valset = stringset{}
|
||||
values[l.Name] = valset
|
||||
}
|
||||
valset.set(l.Value)
|
||||
}
|
||||
|
||||
ref++
|
||||
}
|
||||
if set.Err() != nil {
|
||||
return errors.Wrap(set.Err(), "iterate compaction set")
|
||||
}
|
||||
|
||||
s := make([]string, 0, 256)
|
||||
for n, v := range values {
|
||||
s = s[:0]
|
||||
|
||||
for x := range v {
|
||||
s = append(s, x)
|
||||
}
|
||||
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
|
||||
return errors.Wrap(err, "write label index")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -70,8 +70,6 @@ const (
|
||||
idxStageNone indexWriterStage = iota
|
||||
idxStageSymbols
|
||||
idxStageSeries
|
||||
idxStageLabelIndex
|
||||
idxStagePostings
|
||||
idxStageDone
|
||||
)
|
||||
|
||||
@ -83,10 +81,6 @@ func (s indexWriterStage) String() string {
|
||||
return "symbols"
|
||||
case idxStageSeries:
|
||||
return "series"
|
||||
case idxStageLabelIndex:
|
||||
return "label index"
|
||||
case idxStagePostings:
|
||||
return "postings"
|
||||
case idxStageDone:
|
||||
return "done"
|
||||
}
|
||||
@ -132,9 +126,9 @@ type Writer struct {
|
||||
symbolFile *fileutil.MmapFile
|
||||
lastSymbol string
|
||||
|
||||
labelIndexes []labelIndexHashEntry // label index offsets
|
||||
postings []postingsHashEntry // postings lists offsets
|
||||
labelNames map[string]uint64 // label names, and their usage
|
||||
labelIndexes []labelIndexHashEntry // Label index offsets.
|
||||
labelValues map[string]map[uint32]struct{} // Label names, and their values's symbol indexes.
|
||||
labelNames map[string]uint64 // Label names, and their usage.
|
||||
|
||||
// Hold last series to validate that clients insert new series in order.
|
||||
lastSeries labels.Labels
|
||||
@ -221,9 +215,9 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
|
||||
buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||
buf2: encoding.Encbuf{B: make([]byte, 0, 1<<22)},
|
||||
|
||||
// Caches.
|
||||
labelNames: make(map[string]uint64, 1<<8),
|
||||
crc32: newCRC32(),
|
||||
labelNames: make(map[string]uint64, 1<<8),
|
||||
labelValues: make(map[string]map[uint32]struct{}, 1<<8),
|
||||
crc32: newCRC32(),
|
||||
}
|
||||
if err := iw.writeMeta(); err != nil {
|
||||
return nil, err
|
||||
@ -346,10 +340,12 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
|
||||
}
|
||||
w.toc.Series = w.f.pos
|
||||
|
||||
case idxStageLabelIndex:
|
||||
w.toc.LabelIndices = w.f.pos
|
||||
|
||||
case idxStageDone:
|
||||
w.toc.LabelIndices = w.f.pos
|
||||
if err := w.writeLabelIndices(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.toc.Postings = w.f.pos
|
||||
if err := w.writePostings(); err != nil {
|
||||
return err
|
||||
@ -419,6 +415,11 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta
|
||||
return errors.Errorf("symbol entry for %q does not exist, %v", l.Value, err)
|
||||
}
|
||||
w.buf2.PutUvarint32(index)
|
||||
|
||||
if _, ok := w.labelValues[l.Name]; !ok {
|
||||
w.labelValues[l.Name] = map[uint32]struct{}{}
|
||||
}
|
||||
w.labelValues[l.Name][index] = struct{}{}
|
||||
}
|
||||
|
||||
w.buf2.PutUvarint(len(chunks))
|
||||
@ -516,27 +517,34 @@ func (w *Writer) finishSymbols() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
||||
if len(values)%len(names) != 0 {
|
||||
return errors.Errorf("invalid value list length %d for %d names", len(values), len(names))
|
||||
}
|
||||
if err := w.ensureStage(idxStageLabelIndex); err != nil {
|
||||
return errors.Wrap(err, "ensure stage")
|
||||
func (w *Writer) writeLabelIndices() error {
|
||||
names := make([]string, 0, len(w.labelValues))
|
||||
for n := range w.labelValues {
|
||||
names = append(names, n)
|
||||
}
|
||||
sort.Strings(names)
|
||||
|
||||
valt, err := NewStringTuples(values, len(names))
|
||||
if err != nil {
|
||||
return err
|
||||
for _, n := range names {
|
||||
values := make([]uint32, 0, len(w.labelValues[n]))
|
||||
for v := range w.labelValues[n] {
|
||||
values = append(values, v)
|
||||
}
|
||||
sort.Sort(uint32slice(values))
|
||||
if err := w.writeLabelIndex(n, values); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
sort.Sort(valt)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *Writer) writeLabelIndex(name string, values []uint32) error {
|
||||
// Align beginning to 4 bytes for more efficient index list scans.
|
||||
if err := w.addPadding(4); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{
|
||||
keys: names,
|
||||
keys: []string{name},
|
||||
offset: w.f.pos,
|
||||
})
|
||||
|
||||
@ -548,21 +556,16 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error {
|
||||
w.crc32.Reset()
|
||||
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32int(len(names))
|
||||
w.buf1.PutBE32int(valt.Len())
|
||||
w.buf1.PutBE32int(1) // Number of names.
|
||||
w.buf1.PutBE32int(len(values))
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// here we have an index for the symbol file if v2, otherwise it's an offset
|
||||
for _, v := range valt.entries {
|
||||
sid, err := w.symbols.ReverseLookup(v)
|
||||
if err != nil {
|
||||
return errors.Errorf("symbol entry for %q does not exist: %v", v, err)
|
||||
}
|
||||
for _, v := range values {
|
||||
w.buf1.Reset()
|
||||
w.buf1.PutBE32(sid)
|
||||
w.buf1.PutBE32(v)
|
||||
w.buf1.WriteToHash(w.crc32)
|
||||
if err := w.write(w.buf1.Get()); err != nil {
|
||||
return err
|
||||
|
@ -83,16 +83,6 @@ func (m mockIndex) AddSeries(ref uint64, l labels.Labels, chunks ...chunks.Meta)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockIndex) WriteLabelIndex(names []string, values []string) error {
|
||||
// TODO support composite indexes
|
||||
if len(names) != 1 {
|
||||
return errors.New("composite indexes not supported yet")
|
||||
}
|
||||
sort.Strings(values)
|
||||
m.labelIndex[names[0]] = values
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m mockIndex) Close() error {
|
||||
return nil
|
||||
}
|
||||
@ -200,9 +190,6 @@ func TestIndexRW_Postings(t *testing.T) {
|
||||
testutil.Ok(t, iw.AddSeries(3, series[2]))
|
||||
testutil.Ok(t, iw.AddSeries(4, series[3]))
|
||||
|
||||
testutil.Ok(t, iw.WriteLabelIndex([]string{"a"}, []string{"1"}))
|
||||
testutil.Ok(t, iw.WriteLabelIndex([]string{"b"}, []string{"1", "2", "3", "4"}))
|
||||
|
||||
testutil.Ok(t, iw.Close())
|
||||
|
||||
ir, err := NewFileReader(fn)
|
||||
@ -289,8 +276,6 @@ func TestPostingsMany(t *testing.T) {
|
||||
for i, s := range series {
|
||||
testutil.Ok(t, iw.AddSeries(uint64(i), s))
|
||||
}
|
||||
err = iw.WriteLabelIndex([]string{"foo"}, []string{"bar"})
|
||||
testutil.Ok(t, err)
|
||||
testutil.Ok(t, iw.Close())
|
||||
|
||||
ir, err := NewFileReader(fn)
|
||||
@ -427,17 +412,6 @@ func TestPersistence_index_e2e(t *testing.T) {
|
||||
postings.Add(uint64(i), s.labels)
|
||||
}
|
||||
|
||||
for k, v := range values {
|
||||
var vals []string
|
||||
for e := range v {
|
||||
vals = append(vals, e)
|
||||
}
|
||||
sort.Strings(vals)
|
||||
|
||||
testutil.Ok(t, iw.WriteLabelIndex([]string{k}, vals))
|
||||
testutil.Ok(t, mi.WriteLabelIndex([]string{k}, vals))
|
||||
}
|
||||
|
||||
err = iw.Close()
|
||||
testutil.Ok(t, err)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user