From 1733724e304b6e201a718ed8eb04da06cfd2aeaf Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 17 Dec 2019 21:54:13 +0000 Subject: [PATCH] Factor out index file writing code. Now that we have more than one file open at a time, deduplicate a bit. Signed-off-by: Brian Brazil --- tsdb/index/index.go | 173 ++++++++++++++++++++++++++------------------ 1 file changed, 104 insertions(+), 69 deletions(-) diff --git a/tsdb/index/index.go b/tsdb/index/index.go index d28be4f72..ed21417d2 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -111,16 +111,14 @@ func newCRC32() hash.Hash32 { // Writer implements the IndexWriter interface for the standard // serialization format. type Writer struct { - ctx context.Context - f *os.File - fbuf *bufio.Writer - pos uint64 + ctx context.Context + + // For the main index file. + f *fileWriter // Temporary file for posting offsets table. - fPO *os.File - fbufPO *bufio.Writer - posPO uint64 - cntPO uint64 + fPO *fileWriter + cntPO uint64 toc TOC stage indexWriterStage @@ -200,12 +198,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } // Main index file we are building. - f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666) + f, err := newFileWriter(fn) if err != nil { return nil, err } // Temporary file for posting offset table. - fPO, err := os.OpenFile(fn+"_tmp_po", os.O_CREATE|os.O_RDWR, 0666) + fPO, err := newFileWriter(fn + "_tmp_po") if err != nil { return nil, err } @@ -214,14 +212,10 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } iw := &Writer{ - ctx: ctx, - f: f, - fbuf: bufio.NewWriterSize(f, 1<<22), - pos: 0, - fPO: fPO, - fbufPO: bufio.NewWriterSize(fPO, 1<<22), - posPO: 0, - stage: idxStageNone, + ctx: ctx, + f: f, + fPO: fPO, + stage: idxStageNone, // Reusable memory. buf1: encoding.Encbuf{B: make([]byte, 0, 1<<22)}, @@ -238,9 +232,41 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) { } func (w *Writer) write(bufs ...[]byte) error { + return w.f.write(bufs...) +} + +func (w *Writer) writeAt(buf []byte, pos uint64) error { + return w.f.writeAt(buf, pos) +} + +func (w *Writer) addPadding(size int) error { + return w.f.addPadding(size) +} + +type fileWriter struct { + f *os.File + fbuf *bufio.Writer + pos uint64 + name string +} + +func newFileWriter(name string) (*fileWriter, error) { + f, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + return nil, err + } + return &fileWriter{ + f: f, + fbuf: bufio.NewWriterSize(f, 1<<22), + pos: 0, + name: name, + }, nil +} + +func (fw *fileWriter) write(bufs ...[]byte) error { for _, b := range bufs { - n, err := w.fbuf.Write(b) - w.pos += uint64(n) + n, err := fw.fbuf.Write(b) + fw.pos += uint64(n) if err != nil { return err } @@ -248,29 +274,47 @@ func (w *Writer) write(bufs ...[]byte) error { // offset references in v1 are only 4 bytes large. // Once we move to compressed/varint representations in those areas, this limitation // can be lifted. - if w.pos > 16*math.MaxUint32 { + if fw.pos > 16*math.MaxUint32 { return errors.Errorf("exceeding max size of 64GiB") } } return nil } -func (w *Writer) writeAt(buf []byte, pos uint64) error { - if err := w.fbuf.Flush(); err != nil { +func (fw *fileWriter) flush() error { + return fw.fbuf.Flush() +} + +func (fw *fileWriter) writeAt(buf []byte, pos uint64) error { + if err := fw.flush(); err != nil { return err } - _, err := w.f.WriteAt(buf, int64(pos)) + _, err := fw.f.WriteAt(buf, int64(pos)) return err } // addPadding adds zero byte padding until the file size is a multiple size. -func (w *Writer) addPadding(size int) error { - p := w.pos % uint64(size) +func (fw *fileWriter) addPadding(size int) error { + p := fw.pos % uint64(size) if p == 0 { return nil } p = uint64(size) - p - return errors.Wrap(w.write(make([]byte, p)), "add padding") + return errors.Wrap(fw.write(make([]byte, p)), "add padding") +} + +func (fw *fileWriter) close() error { + if err := fw.flush(); err != nil { + return err + } + if err := fw.f.Sync(); err != nil { + return err + } + return fw.f.Close() +} + +func (fw *fileWriter) remove() error { + return os.Remove(fw.name) } // ensureStage handles transitions between write stages and ensures that IndexWriter @@ -292,7 +336,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error { // Mark start of sections in table of contents. switch s { case idxStageSymbols: - w.toc.Symbols = w.pos + w.toc.Symbols = w.f.pos if err := w.startSymbols(); err != nil { return err } @@ -300,22 +344,22 @@ func (w *Writer) ensureStage(s indexWriterStage) error { if err := w.finishSymbols(); err != nil { return err } - w.toc.Series = w.pos + w.toc.Series = w.f.pos case idxStageLabelIndex: - w.toc.LabelIndices = w.pos + w.toc.LabelIndices = w.f.pos case idxStageDone: - w.toc.Postings = w.pos + w.toc.Postings = w.f.pos if err := w.writePostings(); err != nil { return err } - w.toc.LabelIndicesTable = w.pos + w.toc.LabelIndicesTable = w.f.pos if err := w.writeLabelIndexesOffsetTable(); err != nil { return err } - w.toc.PostingsTable = w.pos + w.toc.PostingsTable = w.f.pos if err := w.writePostingsOffsetTable(); err != nil { return err } @@ -354,8 +398,8 @@ func (w *Writer) AddSeries(ref uint64, lset labels.Labels, chunks ...chunks.Meta return errors.Errorf("failed to write padding bytes: %v", err) } - if w.pos%16 != 0 { - return errors.Errorf("series write not 16-byte aligned at %d", w.pos) + if w.f.pos%16 != 0 { + return errors.Errorf("series write not 16-byte aligned at %d", w.f.pos) } w.buf2.Reset() @@ -436,24 +480,24 @@ func (w *Writer) AddSymbol(sym string) error { func (w *Writer) finishSymbols() error { // Write out the length and symbol count. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - w.toc.Symbols - 4)) + w.buf1.PutBE32int(int(w.f.pos - w.toc.Symbols - 4)) w.buf1.PutBE32int(int(w.numSymbols)) if err := w.writeAt(w.buf1.Get(), w.toc.Symbols); err != nil { return err } - hashPos := w.pos + hashPos := w.f.pos // Leave space for the hash. We can only calculate it // now that the number of symbols is known, so mmap and do it from there. if err := w.write([]byte("hash")); err != nil { return err } - if err := w.fbuf.Flush(); err != nil { + if err := w.f.flush(); err != nil { return err } var err error - w.symbolFile, err = fileutil.OpenMmapFile(w.f.Name()) + w.symbolFile, err = fileutil.OpenMmapFile(w.f.name) if err != nil { return err } @@ -493,10 +537,10 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { w.labelIndexes = append(w.labelIndexes, labelIndexHashEntry{ keys: names, - offset: w.pos, + offset: w.f.pos, }) - startPos := w.pos + startPos := w.f.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err @@ -527,7 +571,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } @@ -539,7 +583,7 @@ func (w *Writer) WriteLabelIndex(names []string, values []string) error { // writeLabelIndexesOffsetTable writes the label indices offset table. func (w *Writer) writeLabelIndexesOffsetTable() error { - startPos := w.pos + startPos := w.f.pos // Leave 4 bytes of space for the length, which will be calculated later. if err := w.write([]byte("alen")); err != nil { return err @@ -567,7 +611,7 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { } // Write out the length. w.buf1.Reset() - w.buf1.PutBE32int(int(w.pos - startPos - 4)) + w.buf1.PutBE32int(int(w.f.pos - startPos - 4)) if err := w.writeAt(w.buf1.Get(), startPos); err != nil { return err } @@ -580,12 +624,12 @@ func (w *Writer) writeLabelIndexesOffsetTable() error { // writePostingsOffsetTable writes the postings offset table. func (w *Writer) writePostingsOffsetTable() error { // Ensure everything is in the temporary file. - if err := w.fbufPO.Flush(); err != nil { + if err := w.fPO.flush(); err != nil { return err } w.buf1.Reset() - w.buf1.PutBE32int(int(w.posPO) + 4) // Length, including the count. + w.buf1.PutBE32int(int(w.fPO.pos) + 4) // Length, including the count. if err := w.write(w.buf1.Get()); err != nil { return err } @@ -598,14 +642,14 @@ func (w *Writer) writePostingsOffsetTable() error { return err } // Copy temporary file into main index. - if _, err := w.fPO.Seek(0, 0); err != nil { + if _, err := w.fPO.f.Seek(0, 0); err != nil { return err } buf := make([]byte, 1<<20) l := 0 for { - n, err := w.fPO.Read(buf) + n, err := w.fPO.f.Read(buf) if err != nil && err != io.EOF { return err } @@ -618,19 +662,18 @@ func (w *Writer) writePostingsOffsetTable() error { return err } } - if w.posPO != uint64(l) { - return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.posPO, l) + if w.fPO.pos != uint64(l) { + return errors.Errorf("wrote %d bytes to posting offset temporary file, but only read back %d", w.fPO.pos, l) } // Cleanup temporary file. - name := w.fPO.Name() - if err := w.fPO.Close(); err != nil { + if err := w.fPO.close(); err != nil { + return err + } + if err := w.fPO.remove(); err != nil { return err } w.fPO = nil - if err := os.Remove(name); err != nil { - return err - } // Finally write the hash. w.buf1.Reset() @@ -662,10 +705,10 @@ func (w *Writer) writePostings() error { } sort.Strings(names) - if err := w.fbuf.Flush(); err != nil { + if err := w.f.flush(); err != nil { return err } - f, err := fileutil.OpenMmapFile(w.f.Name()) + f, err := fileutil.OpenMmapFile(w.f.name) if err != nil { return err } @@ -790,11 +833,9 @@ func (w *Writer) writePosting(name, value string, offs []uint32) error { w.buf1.PutUvarint(2) w.buf1.PutUvarintStr(name) w.buf1.PutUvarintStr(value) - w.buf1.PutUvarint64(w.pos) - if n, err := w.fbufPO.Write(w.buf1.Get()); err != nil { + w.buf1.PutUvarint64(w.f.pos) + if err := w.fPO.write(w.buf1.Get()); err != nil { return err - } else { - w.posPO += uint64(n) } w.cntPO++ @@ -840,17 +881,11 @@ func (w *Writer) Close() error { } } if w.fPO != nil { - if err := w.fPO.Close(); err != nil { + if err := w.fPO.close(); err != nil { return err } } - if err := w.fbuf.Flush(); err != nil { - return err - } - if err := w.f.Sync(); err != nil { - return err - } - return w.f.Close() + return w.f.close() } // StringTuples provides access to a sorted list of string tuples.