diff --git a/block.go b/block.go index 14275247d..3f92b0863 100644 --- a/block.go +++ b/block.go @@ -64,8 +64,6 @@ type persistedBlock struct { dir string meta BlockMeta - indexf *mmapFile - chunkr *chunkReader indexr *indexReader } @@ -124,24 +122,14 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { if err != nil { return nil, err } - // ir, err := newIndexReader(dir) - // if err != nil { - // return nil, err - // } - - indexf, err := openMmapFile(indexFileName(dir)) + ir, err := newIndexReader(dir) if err != nil { - return nil, errors.Wrap(err, "open index file") - } - ir, err := newIndexReader(indexf.b) - if err != nil { - return nil, errors.Wrap(err, "create index reader") + return nil, err } pb := &persistedBlock{ dir: dir, meta: *meta, - indexf: indexf, chunkr: cr, indexr: ir, } @@ -150,7 +138,7 @@ func newPersistedBlock(dir string) (*persistedBlock, error) { func (pb *persistedBlock) Close() error { err0 := pb.chunkr.Close() - err1 := pb.indexf.Close() + err1 := pb.indexr.Close() if err0 != nil { return err0 diff --git a/compact.go b/compact.go index 67d536b74..4b883b830 100644 --- a/compact.go +++ b/compact.go @@ -163,16 +163,14 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { return err } - indexf, err := os.OpenFile(indexFileName(dir), os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - return errors.Wrap(err, "create index file") - } - - indexw := newIndexWriter(indexf) chunkw, err := newChunkWriter(filepath.Join(dir, "chunks")) if err != nil { return errors.Wrap(err, "open chunk writer") } + indexw, err := newIndexWriter(dir) + if err != nil { + return errors.Wrap(err, "open index writer") + } if err = c.write(dir, blocks, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") @@ -184,12 +182,6 @@ func (c *compactor) compact(dir string, blocks ...Block) (err error) { if err = indexw.Close(); err != nil { return errors.Wrap(err, "close index writer") } - if err = fileutil.Fsync(indexf); err != nil 
{ - return errors.Wrap(err, "fsync index file") - } - if err = indexf.Close(); err != nil { - return errors.Wrap(err, "close index file") - } return nil } diff --git a/reader.go b/reader.go index ee3738b89..d4d816c1f 100644 --- a/reader.go +++ b/reader.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "io" + "path/filepath" "strings" "github.com/fabxc/tsdb/chunks" @@ -26,6 +27,7 @@ type chunkReader struct { // The underlying bytes holding the encoded series data. bs [][]byte + // Closers for resources behind the byte slices. cs []io.Closer } @@ -104,6 +106,9 @@ type IndexReader interface { // LabelIndices returns the label pairs for which indices exist. LabelIndices() ([][]string, error) + + // Close releases the underlying resources of the reader. + Close() error } // StringTuples provides access to a sorted list of string tuples. @@ -118,6 +123,9 @@ type indexReader struct { // The underlying byte slice holding the encoded series data. b []byte + // Closer that releases the underlying resources of the byte slice. + c io.Closer + // Cached hashmaps of section offsets. labels map[string]uint32 postings map[string]uint32 @@ -128,34 +136,38 @@ var ( errInvalidFlag = fmt.Errorf("invalid flag") ) -func newIndexReader(b []byte) (*indexReader, error) { - if len(b) < 4 { - return nil, errors.Wrap(errInvalidSize, "index header") +// newIndexReader returns a new indexReader on the given directory. +func newIndexReader(dir string) (*indexReader, error) { + f, err := openMmapFile(filepath.Join(dir, "index")) + if err != nil { + return nil, err } - r := &indexReader{b: b} + r := &indexReader{b: f.b, c: f} // Verify magic number. 
- if m := binary.BigEndian.Uint32(b[:4]); m != MagicIndex { - return nil, fmt.Errorf("invalid magic number %x", m) + if len(f.b) < 4 { + return nil, errors.Wrap(errInvalidSize, "index header") + } + if m := binary.BigEndian.Uint32(r.b[:4]); m != MagicIndex { + return nil, errors.Errorf("invalid magic number %x", m) } - var err error // The last two 4 bytes hold the pointers to the hashmaps. - loff := binary.BigEndian.Uint32(b[len(b)-8 : len(b)-4]) - poff := binary.BigEndian.Uint32(b[len(b)-4:]) + loff := binary.BigEndian.Uint32(r.b[len(r.b)-8 : len(r.b)-4]) + poff := binary.BigEndian.Uint32(r.b[len(r.b)-4:]) - f, b, err := r.section(loff) + flag, b, err := r.section(loff) if err != nil { return nil, errors.Wrapf(err, "label index hashmap section at %d", loff) } - if r.labels, err = readHashmap(f, b); err != nil { + if r.labels, err = readHashmap(flag, b); err != nil { return nil, errors.Wrap(err, "read label index hashmap") } - f, b, err = r.section(poff) + flag, b, err = r.section(poff) if err != nil { return nil, errors.Wrapf(err, "postings hashmap section at %d", loff) } - if r.postings, err = readHashmap(f, b); err != nil { + if r.postings, err = readHashmap(flag, b); err != nil { return nil, errors.Wrap(err, "read postings hashmap") } @@ -193,6 +205,10 @@ func readHashmap(flag byte, b []byte) (map[string]uint32, error) { return h, nil } +func (r *indexReader) Close() error { + return r.c.Close() +} + func (r *indexReader) section(o uint32) (byte, []byte, error) { b := r.b[o:] diff --git a/writer.go b/writer.go index 6b98b2b95..bafb0e8cd 100644 --- a/writer.go +++ b/writer.go @@ -7,6 +7,7 @@ import ( "hash/crc32" "io" "os" + "path/filepath" "sort" "strings" @@ -35,9 +36,6 @@ type ChunkWriter interface { // is set and can be used to retrieve the chunks from the written data. WriteChunks(chunks ...ChunkMeta) error - // Size returns the size of the data written so far. 
- Size() int64 - // Close writes any required finalization and closes the resources // associated with the underlying writer. Close() error @@ -214,10 +212,6 @@ func (w *chunkWriter) seq() int { return len(w.files) - 1 } -func (w *chunkWriter) Size() int64 { - return w.n -} - func (w *chunkWriter) Close() error { return w.finalizeTail() } @@ -240,7 +234,7 @@ type IndexWriter interface { // of chunks that the index can reference. // The reference number is used to resolve a series against the postings // list iterator. It only has to be available during the write processing. - AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) + AddSeries(ref uint32, l labels.Labels, chunks ...ChunkMeta) error // WriteLabelIndex serializes an index from label names to values. // The passed in values chained tuples of strings of the length of names. @@ -249,9 +243,6 @@ type IndexWriter interface { // WritePostings writes a postings list for a single label pair. WritePostings(name, value string, it Postings) error - // Size returns the size of the data written so far. - Size() int64 - // Close writes any finalization and closes theresources associated with // the underlying writer. Close() error @@ -266,13 +257,12 @@ type indexWriterSeries struct { // indexWriter implements the IndexWriter interface for the standard // serialization format. 
type indexWriter struct { - ow io.Writer - w *bufio.Writer + f *os.File + bufw *bufio.Writer n int64 started bool - series map[uint32]*indexWriterSeries - + series map[uint32]*indexWriterSeries symbols map[string]uint32 // symbol offsets labelIndexes []hashEntry // label index offsets postings []hashEntry // postings lists offsets @@ -280,15 +270,31 @@ type indexWriter struct { crc32 hash.Hash } -func newIndexWriter(w io.Writer) *indexWriter { - return &indexWriter{ - w: bufio.NewWriterSize(w, 1*1024*1024), - ow: w, +func newIndexWriter(dir string) (*indexWriter, error) { + df, err := fileutil.OpenDir(dir) + if err != nil { + return nil, err + } + f, err := os.OpenFile(filepath.Join(dir, "index"), os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return nil, err + } + if err := fileutil.Fsync(df); err != nil { + return nil, errors.Wrap(err, "sync dir") + } + + iw := &indexWriter{ + f: f, + bufw: bufio.NewWriterSize(f, 1*1024*1024), n: 0, symbols: make(map[string]uint32, 4096), series: make(map[uint32]*indexWriterSeries, 4096), crc32: crc32.New(crc32.MakeTable(crc32.Castagnoli)), } + if err := iw.writeMeta(); err != nil { + return nil, err + } + return iw, nil } func (w *indexWriter) write(wr io.Writer, b []byte) error { @@ -300,7 +306,7 @@ func (w *indexWriter) write(wr io.Writer, b []byte) error { // section writes a CRC32 checksummed section of length l and guarded by flag. 
func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) error { w.crc32.Reset() - wr := io.MultiWriter(w.crc32, w.w) + wr := io.MultiWriter(w.crc32, w.bufw) b := [5]byte{flag, 0, 0, 0, 0} binary.BigEndian.PutUint32(b[1:], l) @@ -310,9 +316,9 @@ func (w *indexWriter) section(l uint32, flag byte, f func(w io.Writer) error) er } if err := f(wr); err != nil { - return errors.Wrap(err, "contents write func") + return errors.Wrap(err, "write contents") } - if err := w.write(w.w, w.crc32.Sum(nil)); err != nil { + if err := w.write(w.bufw, w.crc32.Sum(nil)); err != nil { return errors.Wrap(err, "writing checksum") } return nil @@ -324,10 +330,13 @@ func (w *indexWriter) writeMeta() error { binary.BigEndian.PutUint32(b[:4], MagicIndex) b[4] = flagStd - return w.write(w.w, b[:]) + return w.write(w.bufw, b[:]) } -func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) { +func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkMeta) error { + if _, ok := w.series[ref]; ok { + return errors.Errorf("series with reference %d already added", ref) + } // Populate the symbol table from all label sets we have to reference. for _, l := range lset { w.symbols[l.Name] = 0 @@ -338,6 +347,7 @@ func (w *indexWriter) AddSeries(ref uint32, lset labels.Labels, chunks ...ChunkM labels: lset, chunks: chunks, } + return nil } func (w *indexWriter) writeSymbols() error { @@ -425,9 +435,6 @@ func (w *indexWriter) writeSeries() error { } func (w *indexWriter) init() error { - if err := w.writeMeta(); err != nil { - return err - } if err := w.writeSymbols(); err != nil { return err } @@ -524,10 +531,6 @@ func (w *indexWriter) WritePostings(name, value string, it Postings) error { }) } -func (w *indexWriter) Size() int64 { - return w.n -} - type hashEntry struct { name string offset uint32 @@ -567,24 +570,22 @@ func (w *indexWriter) finalize() error { // for any index query. 
// TODO(fabxc): also store offset to series section to allow plain // iteration over all existing series? - // TODO(fabxc): store references like these that are not resolved via direct - // mmap using explicit endianness? b := [8]byte{} binary.BigEndian.PutUint32(b[:4], lo) binary.BigEndian.PutUint32(b[4:], po) - return w.write(w.w, b[:]) + return w.write(w.bufw, b[:]) } func (w *indexWriter) Close() error { - // Handle blocks without any data. - if !w.started { - if err := w.init(); err != nil { - return err - } - } if err := w.finalize(); err != nil { return err } - return w.w.Flush() + if err := w.bufw.Flush(); err != nil { + return err + } + if err := fileutil.Fsync(w.f); err != nil { + return err + } + return w.f.Close() }