diff --git a/tsdb/index/index.go b/tsdb/index/index.go index fb68c71a2..d28be4f72 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -116,6 +116,12 @@ type Writer struct { fbuf *bufio.Writer pos uint64 + // Temporary file for posting offsets table. + fPO *os.File + fbufPO *bufio.Writer + posPO uint64 + cntPO uint64 + toc TOC stage indexWriterStage @@ -193,7 +199,13 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { return nil, errors.Wrap(err, "remove any existing index at path") } - f, err := os.OpenFile(fn, os.O_CREATE|os.O_WRONLY, 0666) + // Main index file we are building. + f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + // Temporary file for posting offset table. + fPO, err := os.OpenFile(fn+"_tmp_po", os.O_CREATE|os.O_RDWR, 0666) if err != nil { return nil, err } @@ -202,11 +214,14 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } iw := &Writer{ - ctx: ctx, - f: f, - fbuf: bufio.NewWriterSize(f, 1<<22), - pos: 0, - stage: idxStageNone, + ctx: ctx, + f: f, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, + fPO: fPO, + fbufPO: bufio.NewWriterSize(fPO, 1<<22), + posPO: 0, + stage: idxStageNone, // Reusable memory. buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, @@ -564,39 +579,60 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { - startPos := w.pos - // Leave 4 bytes of space for the length, which will be calculated later. - if err := w.write([]byte("alen")); err != nil { + // Ensure everything is in the temporary file. + if err := w.fbufPO.Flush(); err != nil { return err } - w.crc32.Reset() w.buf1.Reset() - w.buf1.PutBE32int(len(w.postings)) - w.buf1.WriteToHash(w.crc32) + w.buf1.PutBE32int(int(w.posPO) + 4) // Length, including the count. if err := w.write(w.buf1.Get()); err != nil { return err } - for _, e := range w.postings { - w.buf1.Reset() - w.buf1.PutUvarint(2) - w.buf1.PutUvarintStr(e.name) - w.buf1.PutUvarintStr(e.value) - w.buf1.PutUvarint64(e.offset) - w.buf1.WriteToHash(w.crc32) - if err := w.write(w.buf1.Get()); err != nil { - return err - } - } - - // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) - if err := w.writeAt(w.buf1.Get(), startPos); err != nil { + w.crc32.Reset() + w.buf1.PutBE32int(int(w.cntPO)) // Count. + w.buf1.WriteToHash(w.crc32) + if err := w.write(w.buf1.Get()); err != nil { + return err + } + // Copy temporary file into main index. + if _, err := w.fPO.Seek(0, 0); err != nil { return err } + buf := make([]byte, 1<<20) + l := 0 + for { + n, err := w.fPO.Read(buf) + if err != nil && err != io.EOF { + return err + } + if n == 0 { + break + } + l += n + w.crc32.Write(buf[:n]) + if err := w.write(buf[:n]); err != nil { + return err + } + } + if w.posPO != uint64(l) { + return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.posPO, l) + } + + // Cleanup temporary file. + name := w.fPO.Name() + if err := w.fPO.Close(); err != nil { + return err + } + w.fPO = nil + if err := os.Remove(name); err != nil { + return err + } + + // Finally write the hash. w.buf1.Reset() w.buf1.PutHashSum(w.crc32) return w.write(w.buf1.Get()) @@ -749,11 +785,18 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { return err } - w.postings = append(w.postings, postingsHashEntry{ - name: name, - value: value, - offset: w.pos, - }) + // Write out postings offset table to temporary file as we go. + w.buf1.Reset() + w.buf1.PutUvarint(2) + w.buf1.PutUvarintStr(name) + w.buf1.PutUvarintStr(value) + w.buf1.PutUvarint64(w.pos) + if n, err := w.fbufPO.Write(w.buf1.Get()); err != nil { + return err + } else { + w.posPO += uint64(n) + } + w.cntPO++ w.buf1.Reset() w.buf1.PutBE32int(len(offs)) @@ -796,6 +839,11 @@ func (w *Writer) Close() error { return err } } + if w.fPO != nil { + if err := w.fPO.Close(); err != nil { + return err + } + } if err := w.fbuf.Flush(); err != nil { return err }