diff --git a/db.go b/db.go
index 065205760..074cd8021 100644
--- a/db.go
+++ b/db.go
@@ -3,13 +3,12 @@ package tsdb
 
 import (
 	"encoding/binary"
-	"path/filepath"
+	"sort"
 	"sync"
 	"time"
 
 	"github.com/fabxc/tsdb/chunks"
 	"github.com/prometheus/common/log"
-	"github.com/prometheus/common/model"
 )
 
 // DefaultOptions used for the DB.
@@ -27,10 +26,7 @@ type DB struct {
 	logger log.Logger
 	opts   *Options
 
-	memChunks   *memChunks
-	persistence *persistence
-	indexer     *indexer
-	stopc       chan struct{}
+	shards map[uint64]*TimeShards
 }
 
 // Open or create a new DB.
@@ -39,433 +35,43 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
 		opts = DefaultOptions
 	}
 
-	indexer, err := newMetricIndexer(filepath.Join(path, "index"), defaultIndexerQsize, defaultIndexerTimeout)
-	if err != nil {
-		return nil, err
-	}
-	persistence, err := newPersistence(filepath.Join(path, "chunks"), defaultIndexerQsize, defaultIndexerTimeout)
-	if err != nil {
-		return nil, err
-	}
-
-	mchunks := newMemChunks(l, indexer, persistence, 10, opts.StalenessDelta)
-	indexer.mc = mchunks
-	persistence.mc = mchunks
-
 	c := &DB{
-		logger:      l,
-		opts:        opts,
-		memChunks:   mchunks,
-		persistence: persistence,
-		indexer:     indexer,
-		stopc:       make(chan struct{}),
+		logger: l,
+		opts:   opts,
 	}
-	go c.memChunks.run(c.stopc)
 
 	return c, nil
 }
 
-// Close the storage and persist all writes.
-func (c *DB) Close() error {
-	close(c.stopc)
-	// TODO(fabxc): blocking further writes here necessary?
-	c.indexer.wait()
-	c.persistence.wait()
-
-	err0 := c.indexer.close()
-	err1 := c.persistence.close()
-	if err0 != nil {
-		return err0
-	}
-	return err1
+type Label struct {
+	Name, Value string
 }
 
-// Append ingestes the samples in the scrape into the storage.
-func (c *DB) Append(scrape *Scrape) error {
-	// Sequentially add samples to in-memory chunks.
-	// TODO(fabxc): evaluate cost of making this atomic.
-	for _, s := range scrape.m {
-		if err := c.memChunks.append(s.met, scrape.ts, s.val); err != nil {
-			// TODO(fabxc): collect in multi error.
-			return err
-		}
-		// TODO(fabxc): increment ingested samples metric.
+// LabelSet is a sorted set of labels. Order has to be guaranteed upon
+// instantiation.
+type LabelSet []Label
+
+func (ls LabelSet) Len() int           { return len(ls) }
+func (ls LabelSet) Swap(i, j int)      { ls[i], ls[j] = ls[j], ls[i] }
+func (ls LabelSet) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
+
+// NewLabelSet returns a sorted LabelSet from the given labels.
+// The caller has to guarantee that all label names are unique.
+func NewLabelSet(ls ...Label) LabelSet {
+	set := make(LabelSet, 0, len(ls))
+	for _, l := range ls {
+		set = append(set, l)
 	}
+	sort.Sort(set)
+
+	return set
+}
+
+type Vector struct {
+	LabelSets []LabelSet
+	Values    []float64
+}
+
+func (db *DB) AppendVector(v *Vector) error {
 	return nil
-}
-
-// memChunks holds the chunks that are currently being appended to.
-type memChunks struct {
-	logger         log.Logger
-	stalenessDelta time.Duration
-
-	mtx sync.RWMutex
-	// Chunks by their ID as accessed when retrieving a chunk ID from
-	// an index query.
-	chunks map[ChunkID]*chunkDesc
-	// The highest time slice chunks currently have. A new chunk can not
-	// be in a higher slice before all chunks with lower IDs have been
-	// added to the slice.
-	highTime model.Time
-
-	// Power of 2 of chunk shards.
-	num uint8
-	// Memory chunks sharded by leading bits of the chunk's metric's
-	// fingerprints. Used to quickly find chunks for new incoming samples
-	// where the metric is known but the chunk ID is not.
- shards []*memChunksShard - - indexer *indexer - persistence *persistence -} - -// newMemChunks returns a new memChunks sharded by n locks. -func newMemChunks(l log.Logger, ix *indexer, p *persistence, n uint8, staleness time.Duration) *memChunks { - c := &memChunks{ - logger: l, - stalenessDelta: staleness, - num: n, - chunks: map[ChunkID]*chunkDesc{}, - persistence: p, - indexer: ix, - } - - if n > 63 { - panic("invalid shard power") - } - - // Initialize 2^n shards. - for i := 0; i < 1<>(64-mc.num)] - - cs.Lock() - defer cs.Unlock() - - chkd, created := cs.get(fp, m) - if created { - mc.indexer.enqueue(chkd) - } - if err := chkd.append(ts, v); err != chunks.ErrChunkFull { - return err - } - // Chunk was full, remove it so a new head chunk can be created. - // TODO(fabxc): should we just remove them during maintenance if we set a 'persisted' - // flag? - // If we shutdown we work down the persistence queue before exiting, so we should - // lose no data. If we crash, the last snapshot will still have the chunk. Theoretically, - // deleting it here should not be a problem. - cs.del(fp, chkd) - - mc.persistence.enqueue(chkd) - - // Create a new chunk lazily and continue. - chkd, created = cs.get(fp, m) - if !created { - // Bug if the chunk was not newly created. - panic("expected newly created chunk") - } - mc.indexer.enqueue(chkd) - - return chkd.append(ts, v) -} - -type memChunksShard struct { - sync.RWMutex - - // chunks holds chunk descriptors for one or more chunks - // with a given fingerprint. - descs map[model.Fingerprint][]*chunkDesc - csize int -} - -// get returns the chunk descriptor for the given fingerprint/metric combination. -// If none exists, a new chunk descriptor is created and true is returned. -func (cs *memChunksShard) get(fp model.Fingerprint, m model.Metric) (*chunkDesc, bool) { - chks := cs.descs[fp] - for _, cd := range chks { - if cd != nil && cd.met.Equal(m) { - return cd, false - } - } - // None of the given chunks was for the metric, create a new one. - cd := &chunkDesc{ - met: m, - chunk: chunks.NewPlainChunk(cs.csize), - } - // Try inserting chunk in existing whole before appending. - for i, c := range chks { - if c == nil { - chks[i] = cd - return cd, true - } - } - cs.descs[fp] = append(chks, cd) - return cd, true -} - -// del frees the field of the chunk descriptor for the fingerprint. -func (cs *memChunksShard) del(fp model.Fingerprint, chkd *chunkDesc) { - for i, d := range cs.descs[fp] { - if d == chkd { - cs.descs[fp][i] = nil - return - } - } -} - -// ChunkID is a unique identifier for a chunks. -type ChunkID uint64 - -func (id ChunkID) bytes() []byte { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, uint64(id)) - return b -} - -// ChunkIDs is a sortable list of chunk IDs. -type ChunkIDs []ChunkID - -func (c ChunkIDs) Len() int { return len(c) } -func (c ChunkIDs) Swap(i, j int) { c[i], c[j] = c[j], c[i] } -func (c ChunkIDs) Less(i, j int) bool { return c[i] < c[j] } - -// chunkDesc wraps a plain data chunk and provides cached meta data about it. -type chunkDesc struct { - id ChunkID - met model.Metric - chunk chunks.Chunk - - // Caching fields. - firstTime model.Time - lastSample model.SamplePair - - app chunks.Appender // Current appender for the chunks. -} - -func (cd *chunkDesc) append(ts model.Time, v model.SampleValue) error { - if cd.app == nil { - cd.app = cd.chunk.Appender() - // TODO(fabxc): set correctly once loading from snapshot is added. 
- cd.firstTime = ts - } - cd.lastSample.Timestamp = ts - cd.lastSample.Value = v - - return cd.app.Append(ts, v) -} - -// Scrape gathers samples for a single timestamp. -type Scrape struct { - ts model.Time - m []sample -} - -type sample struct { - met model.Metric - val model.SampleValue -} - -// Reset resets the scrape data and initializes it for a new scrape at -// the given time. The underlying memory remains allocated for the next scrape. -func (s *Scrape) Reset(ts model.Time) { - s.ts = ts - s.m = s.m[:0] -} - -// Dump returns all samples that are part of the scrape. -func (s *Scrape) Dump() []*model.Sample { - d := make([]*model.Sample, 0, len(s.m)) - for _, sa := range s.m { - d = append(d, &model.Sample{ - Metric: sa.met, - Timestamp: s.ts, - Value: sa.val, - }) - } - return d -} - -// Add adds a sample value for the given metric to the scrape. -func (s *Scrape) Add(m model.Metric, v model.SampleValue) { - for ln, lv := range m { - if len(lv) == 0 { - delete(m, ln) - } - } - // TODO(fabxc): pre-sort added samples into the correct buckets - // of fingerprint shards so we only have to lock each memChunkShard once. - s.m = append(s.m, sample{met: m, val: v}) -} - -type chunkBatchProcessor struct { - processf func(...*chunkDesc) error - - mtx sync.RWMutex - logger log.Logger - q []*chunkDesc - - qcap int - timeout time.Duration - - timer *time.Timer - trigger chan struct{} - empty chan struct{} -} - -func newChunkBatchProcessor(l log.Logger, cap int, to time.Duration) *chunkBatchProcessor { - if l == nil { - l = log.NewNopLogger() - } - p := &chunkBatchProcessor{ - logger: l, - qcap: cap, - timeout: to, - timer: time.NewTimer(to), - trigger: make(chan struct{}, 1), - empty: make(chan struct{}), - } - // Start with closed channel so we don't block on wait if nothing - // has ever been indexed. - close(p.empty) - - go p.run() - return p -} - -func (p *chunkBatchProcessor) run() { - for { - // Process pending indexing batch if triggered - // or timeout since last indexing has passed. - select { - case <-p.trigger: - case <-p.timer.C: - } - - if err := p.process(); err != nil { - p.logger. - With("err", err).With("num", len(p.q)). - Error("batch failed, dropping chunks descs") - } - } -} - -func (p *chunkBatchProcessor) process() error { - // TODO(fabxc): locking the entire time will cause lock contention. - p.mtx.Lock() - defer p.mtx.Unlock() - - if len(p.q) == 0 { - return nil - } - // Leave chunk descs behind whether successful or not. - defer func() { - p.q = p.q[:0] - close(p.empty) - }() - - return p.processf(p.q...) -} - -func (p *chunkBatchProcessor) enqueue(cds ...*chunkDesc) { - p.mtx.Lock() - defer p.mtx.Unlock() - - if len(p.q) == 0 { - p.timer.Reset(p.timeout) - p.empty = make(chan struct{}) - } - - p.q = append(p.q, cds...) - if len(p.q) > p.qcap { - select { - case p.trigger <- struct{}{}: - default: - // If we cannot send a signal is already set. - } - } -} - -// wait blocks until the queue becomes empty. 
-func (p *chunkBatchProcessor) wait() { - p.mtx.RLock() - c := p.empty - p.mtx.RUnlock() - <-c -} +} \ No newline at end of file diff --git a/db_test.go b/db_test.go deleted file mode 100644 index 0436a3033..000000000 --- a/db_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package tsdb - -import ( - "fmt" - "io" - "io/ioutil" - "math/rand" - "os" - "testing" - "time" - - "github.com/fabxc/tindex" - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/storage/cinamon/chunk" - "github.com/prometheus/prometheus/storage/metric" - "github.com/stretchr/testify/require" -) - -func TestE2E(t *testing.T) { - dir, err := ioutil.TempDir("", "cinamon_test") - require.NoError(t, err) - defer os.RemoveAll(dir) - - c, err := Open(dir, log.Base(), nil) - require.NoError(t, err) - - c.memChunks.indexer.timeout = 50 * time.Millisecond - - // Set indexer size to be triggered exactly when we hit the limit. - // c.memChunks.indexer.qmax = 10 - - mets := generateMetrics(100000) - // var wg sync.WaitGroup - // for k := 0; k < len(mets)/100+1; k++ { - // wg.Add(1) - // go func(mets []model.Metric) { - var s Scrape - for i := 0; i < 2*64; i++ { - s.Reset(model.Time(i) * 100000) - - for _, m := range mets { - s.Add(m, model.SampleValue(rand.Float64())) - } - require.NoError(t, c.Append(&s)) - } - // wg.Done() - // }(mets[k*100 : (k+1)*100]) - // } - // wg.Wait() - - start := time.Now() - c.memChunks.indexer.wait() - fmt.Println("index wait", time.Since(start)) - - start = time.Now() - q, err := c.Querier() - require.NoError(t, err) - defer q.Close() - - m1, err := metric.NewLabelMatcher(metric.Equal, "job", "somejob") - require.NoError(t, err) - m2, err := metric.NewLabelMatcher(metric.Equal, "label2", "value0") - require.NoError(t, err) - m3, err := metric.NewLabelMatcher(metric.Equal, "label4", "value0") - require.NoError(t, err) - - it, err := q.Iterator(m1, m2, m3) - require.NoError(t, err) - res, err := tindex.ExpandIterator(it) - require.NoError(t, err) - fmt.Println("result len", len(res)) - - fmt.Println("querying", time.Since(start)) -} - -func generateMetrics(n int) (res []model.Metric) { - for i := 0; i < n; i++ { - res = append(res, model.Metric{ - "job": "somejob", - "label5": model.LabelValue(fmt.Sprintf("value%d", i%10)), - "label4": model.LabelValue(fmt.Sprintf("value%d", i%5)), - "label3": model.LabelValue(fmt.Sprintf("value%d", i%3)), - "label2": model.LabelValue(fmt.Sprintf("value%d", i%2)), - "label1": model.LabelValue(fmt.Sprintf("value%d", i)), - }) - } - return res -} - -func TestMemChunksShardGet(t *testing.T) { - cs := &memChunksShard{ - descs: map[model.Fingerprint][]*chunkDesc{}, - csize: 100, - } - cdesc1, created1 := cs.get(123, model.Metric{"x": "1"}) - require.True(t, created1) - require.Equal(t, 1, len(cs.descs[123])) - require.Equal(t, &chunkDesc{ - met: model.Metric{"x": "1"}, - chunk: chunk.NewPlainChunk(100), - }, cdesc1) - - // Add colliding metric. - cdesc2, created2 := cs.get(123, model.Metric{"x": "2"}) - require.True(t, created2) - require.Equal(t, 2, len(cs.descs[123])) - require.Equal(t, &chunkDesc{ - met: model.Metric{"x": "2"}, - chunk: chunk.NewPlainChunk(100), - }, cdesc2) - // First chunk desc can still be retrieved correctly. 
- cdesc1, created1 = cs.get(123, model.Metric{"x": "1"}) - require.False(t, created1) - require.Equal(t, &chunkDesc{ - met: model.Metric{"x": "1"}, - chunk: chunk.NewPlainChunk(100), - }, cdesc1) -} - -func TestChunkSeriesIterator(t *testing.T) { - newChunk := func(s []model.SamplePair) chunk.Chunk { - c := chunk.NewPlainChunk(1000) - app := c.Appender() - for _, sp := range s { - if err := app.Append(sp.Timestamp, sp.Value); err != nil { - t.Fatal(err) - } - } - return c - } - it := newChunkSeriesIterator(metric.Metric{}, []chunk.Chunk{ - newChunk([]model.SamplePair{{1, 1}, {2, 2}, {3, 3}}), - newChunk([]model.SamplePair{{4, 4}, {5, 5}, {6, 6}}), - newChunk([]model.SamplePair{{7, 7}, {8, 8}, {9, 9}}), - }) - - var res []model.SamplePair - for sp, ok := it.Seek(0); ok; sp, ok = it.Next() { - fmt.Println(sp) - res = append(res, sp) - } - require.Equal(t, io.EOF, it.Err()) - require.Equal(t, []model.SamplePair{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 9}}, res) -} diff --git a/index.go b/index.go deleted file mode 100644 index 83506da5c..000000000 --- a/index.go +++ /dev/null @@ -1,130 +0,0 @@ -package tsdb - -import ( - "sort" - "strconv" - "sync/atomic" - "time" - - "github.com/fabxc/tsdb/index" - "github.com/prometheus/common/log" - "github.com/prometheus/common/model" -) - -const ( - defaultIndexerTimeout = 1 * time.Second - defaultIndexerQsize = 500000 -) - -// indexer asynchronously indexes chunks in batches. It indexes all labels -// of a chunk with a forward mapping and additionally indexes the chunk for -// the time slice of its first sample. -type indexer struct { - *chunkBatchProcessor - - ix *index.Index - mc *memChunks -} - -// Create batch indexer that creates new index documents -// and indexes them by the metric fields. -// Its post-indexing hook populates the in-memory chunk forward index. -func newMetricIndexer(path string, qsz int, qto time.Duration) (*indexer, error) { - ix, err := index.Open(path, nil) - if err != nil { - return nil, err - } - - i := &indexer{ - ix: ix, - chunkBatchProcessor: newChunkBatchProcessor(log.Base(), qsz, qto), - } - i.chunkBatchProcessor.processf = i.index - - return i, nil -} - -func (ix *indexer) Querier() (*index.Querier, error) { - return ix.ix.Querier() -} - -const ( - timeSliceField = "__ts__" - timeSliceSize = 3 * time.Hour -) - -func timeSlice(t model.Time) model.Time { - return t - (t % model.Time(timeSliceSize/time.Millisecond)) -} - -func timeString(t model.Time) string { - return strconv.FormatInt(int64(t), 16) -} - -func (ix *indexer) close() error { - return ix.ix.Close() -} - -func (ix *indexer) index(cds ...*chunkDesc) error { - b, err := ix.ix.Batch() - if err != nil { - return err - } - - ids := make([]ChunkID, len(cds)) - for i, cd := range cds { - terms := make(index.Terms, 0, len(cd.met)) - for k, v := range cd.met { - t := index.Term{Field: string(k), Val: string(v)} - terms = append(terms, t) - } - id := b.Add(terms) - ts := timeSlice(cd.firstTime) - - // If the chunk has a higher time slice than the high one, - // don't index. It will be indexed when the next time slice - // is initiated over all memory chunks. 
- if ts <= ix.mc.highTime { - b.SecondaryIndex(id, index.Term{ - Field: timeSliceField, - Val: timeString(ts), - }) - } - - ids[i] = ChunkID(id) - } - - if err := b.Commit(); err != nil { - return err - } - - // We have to lock here already instead of post-commit as otherwise we might - // generate new chunk IDs, skip their indexing, and have a reindexTime being - // called with the chunk ID not being visible yet. - // TODO(fabxc): move back up - ix.mc.mtx.Lock() - defer ix.mc.mtx.Unlock() - - // Make in-memory chunks visible for read. - for i, cd := range cds { - atomic.StoreUint64((*uint64)(&cd.id), uint64(ids[i])) - ix.mc.chunks[cd.id] = cd - } - return nil -} - -// reindexTime creates an initial time slice index over all chunk IDs. -// Any future chunks indexed for the same time slice must have higher IDs. -func (ix *indexer) reindexTime(ids ChunkIDs, ts model.Time) error { - b, err := ix.ix.Batch() - if err != nil { - return err - } - sort.Sort(ids) - t := index.Term{Field: timeSliceField, Val: timeString(ts)} - - for _, id := range ids { - b.SecondaryIndex(index.DocID(id), t) - } - return b.Commit() -} diff --git a/index/LICENSE b/index/LICENSE deleted file mode 100644 index 261eeb9e9..000000000 --- a/index/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. 
- - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. 
You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. 
In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/index/cmd/index/.gitignore b/index/cmd/index/.gitignore deleted file mode 100644 index cc4be6ea2..000000000 --- a/index/cmd/index/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -index -benchout -testdata* diff --git a/index/cmd/index/Makefile b/index/cmd/index/Makefile deleted file mode 100644 index cc2156b3f..000000000 --- a/index/cmd/index/Makefile +++ /dev/null @@ -1,14 +0,0 @@ -all: bench svg - -bench: build - @echo ">> running benchmark" - @./tindex bench write testdata - -build: - @go build . 
- -svg: - @echo ">> create svgs" - @go tool pprof -svg ./tindex benchout/cpu.prof > benchout/cpuprof.svg - @go tool pprof -svg ./tindex benchout/mem.prof > benchout/memprof.svg - @go tool pprof -svg ./tindex benchout/block.prof > benchout/blockprof.svg diff --git a/index/cmd/index/main.go b/index/cmd/index/main.go deleted file mode 100644 index 698a3eb93..000000000 --- a/index/cmd/index/main.go +++ /dev/null @@ -1,253 +0,0 @@ -package main - -import ( - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "runtime" - "runtime/pprof" - "time" - - "github.com/fabxc/tsdb/index" - dto "github.com/prometheus/client_model/go" - "github.com/prometheus/common/expfmt" - "github.com/spf13/cobra" -) - -func main() { - root := &cobra.Command{ - Use: "index", - Short: "CLI tool for index", - } - - root.AddCommand( - NewBenchCommand(), - ) - - root.Execute() -} - -func NewBenchCommand() *cobra.Command { - c := &cobra.Command{ - Use: "bench", - Short: "run benchmarks", - } - c.AddCommand(NewBenchWriteCommand()) - - return c -} - -type writeBenchmark struct { - outPath string - cleanup bool - - cpuprof *os.File - memprof *os.File - blockprof *os.File -} - -func NewBenchWriteCommand() *cobra.Command { - var wb writeBenchmark - c := &cobra.Command{ - Use: "write ", - Short: "run a write performance benchmark", - Run: wb.run, - } - c.PersistentFlags().StringVar(&wb.outPath, "out", "benchout/", "set the output path") - - return c -} - -func (b *writeBenchmark) run(cmd *cobra.Command, args []string) { - if len(args) != 1 { - exitWithError(fmt.Errorf("missing file argument")) - } - if b.outPath == "" { - dir, err := ioutil.TempDir("", "index_bench") - if err != nil { - exitWithError(err) - } - b.outPath = dir - b.cleanup = true - } - if err := os.RemoveAll(b.outPath); err != nil { - exitWithError(err) - } - if err := os.MkdirAll(b.outPath, 0777); err != nil { - exitWithError(err) - } - - var docs []*InsertDoc - - measureTime("readData", func() { - f, err := os.Open(args[0]) - if err != nil { - exitWithError(err) - } - defer f.Close() - - docs, err = readPrometheusLabels(f) - if err != nil { - exitWithError(err) - } - }) - - dir := filepath.Join(b.outPath, "ix") - - ix, err := index.Open(dir, nil) - if err != nil { - exitWithError(err) - } - defer func() { - ix.Close() - reportSize(dir) - if b.cleanup { - os.RemoveAll(b.outPath) - } - }() - - measureTime("indexData", func() { - b.startProfiling() - indexDocs(ix, docs, 100000) - indexDocs(ix, docs, 100000) - indexDocs(ix, docs, 100000) - indexDocs(ix, docs, 100000) - b.stopProfiling() - }) -} - -func (b *writeBenchmark) startProfiling() { - var err error - - // Start CPU profiling. - b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof")) - if err != nil { - exitWithError(fmt.Errorf("bench: could not create cpu profile: %v\n", err)) - } - pprof.StartCPUProfile(b.cpuprof) - - // Start memory profiling. - b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) - if err != nil { - exitWithError(fmt.Errorf("bench: could not create memory profile: %v\n", err)) - } - runtime.MemProfileRate = 4096 - - // Start fatal profiling. 
- b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof")) - if err != nil { - exitWithError(fmt.Errorf("bench: could not create block profile: %v\n", err)) - } - runtime.SetBlockProfileRate(1) -} - -func (b *writeBenchmark) stopProfiling() { - if b.cpuprof != nil { - pprof.StopCPUProfile() - b.cpuprof.Close() - b.cpuprof = nil - } - if b.memprof != nil { - pprof.Lookup("heap").WriteTo(b.memprof, 0) - b.memprof.Close() - b.memprof = nil - } - if b.blockprof != nil { - pprof.Lookup("block").WriteTo(b.blockprof, 0) - b.blockprof.Close() - b.blockprof = nil - runtime.SetBlockProfileRate(0) - } -} - -func indexDocs(ix *index.Index, docs []*InsertDoc, batchSize int) { - remDocs := docs[:] - var ids []index.DocID - - for len(remDocs) > 0 { - n := batchSize - if n > len(remDocs) { - n = len(remDocs) - } - - b, err := ix.Batch() - if err != nil { - exitWithError(err) - } - for _, d := range remDocs[:n] { - id := b.Add(d.Terms) - ids = append(ids, id) - } - if err := b.Commit(); err != nil { - exitWithError(err) - } - - remDocs = remDocs[n:] - } -} - -func reportSize(dir string) { - err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil || path == dir { - return err - } - fmt.Printf(" > file=%s size=%.03fGiB\n", path[len(dir):], float64(info.Size())/1024/1024/1024) - return nil - }) - if err != nil { - exitWithError(err) - } -} - -func measureTime(stage string, f func()) { - fmt.Printf(">> start stage=%s\n", stage) - start := time.Now() - f() - fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start)) -} - -type InsertDoc struct { - Terms index.Terms -} - -func readPrometheusLabels(r io.Reader) ([]*InsertDoc, error) { - dec := expfmt.NewDecoder(r, expfmt.FmtProtoText) - - var docs []*InsertDoc - var mf dto.MetricFamily - - for { - if err := dec.Decode(&mf); err != nil { - if err == io.EOF { - break - } - return nil, err - } - - for _, m := range mf.GetMetric() { - d := &InsertDoc{ - Terms: make(index.Terms, len(m.GetLabel())+1), - } - d.Terms[0] = index.Term{ - Field: "__name__", - Val: mf.GetName(), - } - for i, l := range m.GetLabel() { - d.Terms[i+1] = index.Term{ - Field: l.GetName(), - Val: l.GetValue(), - } - } - docs = append(docs, d) - } - } - - return docs, nil -} - -func exitWithError(err error) { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) -} diff --git a/index/coding.go b/index/coding.go deleted file mode 100644 index fb8ec4da7..000000000 --- a/index/coding.go +++ /dev/null @@ -1,206 +0,0 @@ -package index - -import ( - "encoding/binary" - "errors" - "io" - "sync" - - "github.com/boltdb/bolt" -) - -var encpool buffers - -type buffers struct { - pool sync.Pool -} - -func (b *buffers) get(l int) []byte { - x := b.pool.Get() - if x == nil { - return make([]byte, l) - } - buf := x.([]byte) - if cap(buf) < l { - return make([]byte, l) - } - return buf[:l] -} - -func (b *buffers) getZero(l int) []byte { - buf := b.get(l) - for i := range buf { - buf[i] = 0 - } - return buf -} - -func (b *buffers) put(buf []byte) { - b.pool.Put(buf) -} - -func (b *buffers) bucketPut(bkt *bolt.Bucket, k, v []byte) error { - err := bkt.Put(k, v) - b.put(k) - return err -} - -func (b *buffers) bucketGet(bkt *bolt.Bucket, k []byte) []byte { - v := bkt.Get(k) - b.put(k) - return v -} - -func (b *buffers) uint64be(x uint64) []byte { - buf := b.get(8) - binary.BigEndian.PutUint64(buf, x) - return buf -} - -func (b *buffers) uvarint(x uint64) []byte { - buf := b.get(binary.MaxVarintLen64) - return buf[:binary.PutUvarint(buf, x)] -} - -type txbuffs 
struct { - buffers *buffers - done [][]byte -} - -func (b *txbuffs) get(l int) []byte { - buf := b.buffers.get(l) - b.done = append(b.done, buf) - return buf -} - -func (b *txbuffs) getZero(l int) []byte { - buf := b.buffers.getZero(l) - b.done = append(b.done, buf) - return buf -} - -func (b *txbuffs) release() { - for _, buf := range b.done { - b.buffers.put(buf) - } -} - -func (b *txbuffs) put(buf []byte) { - b.done = append(b.done, buf) -} - -func (b *txbuffs) uint64be(x uint64) []byte { - buf := b.get(8) - binary.BigEndian.PutUint64(buf, x) - return buf -} - -func (b *txbuffs) uvarint(x uint64) []byte { - buf := b.get(binary.MaxVarintLen64) - return buf[:binary.PutUvarint(buf, x)] -} - -// reuse of buffers -var pagePool sync.Pool - -// getBuf returns a buffer from the pool. The length of the returned slice is l. -func getPage(l int) []byte { - x := pagePool.Get() - if x == nil { - return make([]byte, l) - } - buf := x.([]byte) - if cap(buf) < l { - return make([]byte, l) - } - return buf[:l] -} - -// putBuf returns a buffer to the pool. -func putPage(buf []byte) { - pagePool.Put(buf) -} - -// bufPool is a pool for staging buffers. Using a pool allows concurrency-safe -// reuse of buffers -var bufPool sync.Pool - -// getBuf returns a buffer from the pool. The length of the returned slice is l. -func getBuf(l int) []byte { - x := bufPool.Get() - if x == nil { - return make([]byte, l) - } - buf := x.([]byte) - if cap(buf) < l { - return make([]byte, l) - } - return buf[:l] -} - -// putBuf returns a buffer to the pool. -func putBuf(buf []byte) { - bufPool.Put(buf) -} - -func encodeUint64(x uint64) []byte { - buf := getBuf(8) - binary.BigEndian.PutUint64(buf, x) - return buf -} - -func decodeUint64(buf []byte) uint64 { - return binary.BigEndian.Uint64(buf) -} - -func writeUvarint(w io.ByteWriter, x uint64) (i int, err error) { - for x >= 0x80 { - if err = w.WriteByte(byte(x) | 0x80); err != nil { - return i, err - } - x >>= 7 - i++ - } - if err = w.WriteByte(byte(x)); err != nil { - return i, err - } - return i + 1, err -} - -func writeVarint(w io.ByteWriter, x int64) (i int, err error) { - ux := uint64(x) << 1 - if x < 0 { - ux = ^ux - } - return writeUvarint(w, ux) -} - -func readUvarint(r io.ByteReader) (uint64, int, error) { - var ( - x uint64 - s uint - ) - for i := 0; ; i++ { - b, err := r.ReadByte() - if err != nil { - return x, i, err - } - if b < 0x80 { - if i > 9 || i == 9 && b > 1 { - return x, i + 1, errors.New("varint overflows a 64-bit integer") - } - return x | uint64(b)<> 1) - if ux&1 != 0 { - x = ^x - } - return x, n, err -} diff --git a/index/index.go b/index/index.go deleted file mode 100644 index 034cd557e..000000000 --- a/index/index.go +++ /dev/null @@ -1,716 +0,0 @@ -package index - -import ( - "bytes" - "encoding/binary" - "encoding/gob" - "errors" - "fmt" - "io" - "math" - "os" - "path/filepath" - "regexp" - "sync" - - "github.com/boltdb/bolt" - "github.com/fabxc/tsdb/pages" -) - -var ( - errOutOfOrder = errors.New("out of order") - errNotFound = errors.New("not found") -) - -// Options for an Index. -type Options struct { -} - -// DefaultOptions used for opening a new index. -var DefaultOptions = &Options{} - -// Index is a fully persistent inverted index of documents with any number of fields -// that map to exactly one term. -type Index struct { - pbuf *pages.DB - bolt *bolt.DB - meta *meta - - rwlock sync.Mutex -} - -// Open returns an index located in the given path. If none exists a new -// one is created. 
-func Open(path string, opts *Options) (*Index, error) { - if opts == nil { - opts = DefaultOptions - } - - if err := os.MkdirAll(path, 0777); err != nil { - return nil, err - } - - bdb, err := bolt.Open(filepath.Join(path, "kv"), 0666, nil) - if err != nil { - return nil, err - } - pdb, err := pages.Open(filepath.Join(path, "pb"), 0666, &pages.Options{ - PageSize: pageSize, - }) - if err != nil { - return nil, err - } - ix := &Index{ - bolt: bdb, - pbuf: pdb, - meta: &meta{}, - } - if err := ix.bolt.Update(ix.init); err != nil { - return nil, err - } - return ix, nil -} - -// Close closes the index. -func (ix *Index) Close() error { - err0 := ix.pbuf.Close() - err1 := ix.bolt.Close() - if err0 != nil { - return err0 - } - return err1 -} - -var ( - bktMeta = []byte("meta") - bktDocs = []byte("docs") - bktTerms = []byte("terms") - bktTermIDs = []byte("term_ids") - bktSkiplist = []byte("skiplist") - - keyMeta = []byte("meta") -) - -func (ix *Index) init(tx *bolt.Tx) error { - // Ensure all buckets exist. Any other index methods assume - // that these buckets exist and may panic otherwise. - for _, bn := range [][]byte{ - bktMeta, bktTerms, bktTermIDs, bktDocs, bktSkiplist, - } { - if _, err := tx.CreateBucketIfNotExists(bn); err != nil { - return fmt.Errorf("create bucket %q failed: %s", string(bn), err) - } - } - - // Read the meta state if the index was already initialized. - mbkt := tx.Bucket(bktMeta) - if v := mbkt.Get(keyMeta); v != nil { - if err := ix.meta.read(v); err != nil { - return fmt.Errorf("decoding meta failed: %s", err) - } - } else { - // Index not initialized yet, set up meta information. - ix.meta = &meta{ - LastDocID: 0, - LastTermID: 0, - } - v, err := ix.meta.bytes() - if err != nil { - return fmt.Errorf("encoding meta failed: %s", err) - } - if err := mbkt.Put(keyMeta, v); err != nil { - return fmt.Errorf("creating meta failed: %s", err) - } - } - - return nil -} - -// Querier starts a new query session against the index. -func (ix *Index) Querier() (*Querier, error) { - kvtx, err := ix.bolt.Begin(false) - if err != nil { - return nil, err - } - pbtx, err := ix.pbuf.Begin(false) - if err != nil { - kvtx.Rollback() - return nil, err - } - return &Querier{ - kvtx: kvtx, - pbtx: pbtx, - // TODO(fabxc): consider getting these buckets lazily. - termBkt: kvtx.Bucket(bktTerms), - termidBkt: kvtx.Bucket(bktTermIDs), - docBkt: kvtx.Bucket(bktDocs), - skiplistBkt: kvtx.Bucket(bktSkiplist), - }, nil -} - -// Querier encapsulates the index for several queries. -type Querier struct { - kvtx *bolt.Tx - pbtx *pages.Tx - - termBkt *bolt.Bucket - termidBkt *bolt.Bucket - docBkt *bolt.Bucket - skiplistBkt *bolt.Bucket -} - -// Close closes the underlying index transactions. -func (q *Querier) Close() error { - err0 := q.pbtx.Rollback() - err1 := q.kvtx.Rollback() - if err0 != nil { - return err0 - } - return err1 -} - -// Terms returns all terms for the key field matching the provided matcher. -// If the matcher is nil, all terms for the field are returned. -func (q *Querier) Terms(key string, m Matcher) []string { - if m == nil { - m = AnyMatcher - } - return q.termsForMatcher(key, m) -} - -// Search returns an iterator over all document IDs that match all -// provided matchers. 
-func (q *Querier) Search(key string, m Matcher) (Iterator, error) { - tids := q.termIDsForMatcher(key, m) - its := make([]Iterator, 0, len(tids)) - - for _, t := range tids { - it, err := q.postingsIter(t) - if err != nil { - return nil, err - } - its = append(its, it) - } - - if len(its) == 0 { - return nil, nil - } - return Merge(its...), nil -} - -// postingsIter returns an iterator over the postings list of term t. -func (q *Querier) postingsIter(t termid) (Iterator, error) { - b := q.skiplistBkt.Bucket(t.bytes()) - if b == nil { - return nil, errNotFound - } - - it := &skippingIterator{ - skiplist: &boltSkiplistCursor{ - k: uint64(t), - c: b.Cursor(), - bkt: b, - }, - iterators: iteratorStoreFunc(func(k uint64) (Iterator, error) { - data, err := q.pbtx.Get(k) - if err != nil { - return nil, errNotFound - } - // TODO(fabxc): for now, offset is zero, pages have no header - // and are always delta encoded. - return newPageDelta(data).cursor(), nil - }), - } - - return it, nil -} - -func (q *Querier) termsForMatcher(key string, m Matcher) []string { - c := q.termBkt.Cursor() - pref := append([]byte(key), 0xff) - - var terms []string - // TODO(fabxc): We scan the entire term value range for the field. Improvide this by direct - // and prefixed seeks depending on the matcher. - for k, _ := c.Seek(pref); bytes.HasPrefix(k, pref); k, _ = c.Next() { - if m.Match(string(k[len(pref):])) { - terms = append(terms, string(k[len(pref):])) - } - } - return terms -} - -func (q *Querier) termIDsForMatcher(key string, m Matcher) termids { - c := q.termBkt.Cursor() - pref := append([]byte(key), 0xff) - - var ids termids - // TODO(fabxc): We scan the entire term value range for the field. Improvide this by direct - // and prefixed seeks depending on the matcher. - for k, v := c.Seek(pref); bytes.HasPrefix(k, pref); k, v = c.Next() { - if m.Match(string(k[len(pref):])) { - ids = append(ids, newTermID(v)) - } - } - return ids -} - -// Doc returns the document with the given ID. -func (q *Querier) Doc(id DocID) (Terms, error) { - v := q.docBkt.Get(id.bytes()) - if v == nil { - return nil, errNotFound - } - tids := newTermIDs(v) - - // TODO(fabxc): consider at least a per-session cache for these. - terms := make(Terms, len(tids)) - for i, t := range tids { - // TODO(fabxc): is this encode/decode cycle here worth the space savings? - // If we stored plain uint64s we can just pass the slice back in. - v := q.termidBkt.Get(t.bytes()) - if v == nil { - return nil, fmt.Errorf("term not found") - } - term, err := newTerm(v) - if err != nil { - return nil, err - } - terms[i] = term - } - return terms, nil -} - -// Delete removes all documents in the iterator from the index. -// It returns the number of deleted documents. -func (ix *Index) Delete(it Iterator) (int, error) { - panic("not implemented") -} - -// Batch starts a new batch against the index. -func (ix *Index) Batch() (*Batch, error) { - // Lock writes so we can safely pre-allocate term and doc IDs. - ix.rwlock.Lock() - - tx, err := ix.bolt.Begin(false) - if err != nil { - return nil, err - } - b := &Batch{ - ix: ix, - tx: tx, - meta: &meta{}, - termBkt: tx.Bucket(bktTerms), - termidBkt: tx.Bucket(bktTermIDs), - terms: map[Term]*batchTerm{}, - } - *b.meta = *ix.meta - return b, nil -} - -// meta contains information about the state of the index. -type meta struct { - LastDocID DocID - LastTermID termid -} - -// read initilizes the meta from a byte slice. 
-func (m *meta) read(b []byte) error { - return gob.NewDecoder(bytes.NewReader(b)).Decode(m) -} - -// bytes returns a byte slice representation of the meta. -func (m *meta) bytes() ([]byte, error) { - var buf bytes.Buffer - if err := gob.NewEncoder(&buf).Encode(m); err != nil { - return nil, err - } - return buf.Bytes(), nil -} - -// Terms is a sortable list of terms. -type Terms []Term - -func (t Terms) Len() int { return len(t) } -func (t Terms) Swap(i, j int) { t[i], t[j] = t[j], t[i] } - -func (t Terms) Less(i, j int) bool { - if t[i].Field < t[j].Field { - return true - } - if t[i].Field > t[j].Field { - return false - } - return t[i].Val < t[j].Val -} - -// Term is a term for the specified field. -type Term struct { - Field, Val string -} - -func newTerm(b []byte) (t Term, e error) { - c := bytes.SplitN(b, []byte{0xff}, 2) - if len(c) != 2 { - return t, fmt.Errorf("invalid term") - } - t.Field = string(c[0]) - t.Val = string(c[1]) - return t, nil -} - -// bytes returns a byte slice representation of the term. -func (t *Term) bytes() []byte { - b := make([]byte, 0, len(t.Field)+1+len(t.Val)) - b = append(b, []byte(t.Field)...) - b = append(b, 0xff) - return append(b, []byte(t.Val)...) -} - -// Matcher checks whether a value for a key satisfies a check condition. -type Matcher interface { - Match(value string) bool -} - -// AnyMatcher matches any term value for a field. -var AnyMatcher = anyMatcher{} - -type anyMatcher struct{} - -func (anyMatcher) Match(_ string) bool { - return true -} - -// EqualMatcher matches exactly one value for a particular label. -type EqualMatcher struct { - val string -} - -func NewEqualMatcher(val string) *EqualMatcher { - return &EqualMatcher{val: val} -} - -func (m *EqualMatcher) Match(s string) bool { return m.val == s } - -// RegexpMatcher matches labels for the fixed key for which the value -// matches a regular expression. -type RegexpMatcher struct { - re *regexp.Regexp -} - -func NewRegexpMatcher(expr string) (*RegexpMatcher, error) { - re, err := regexp.Compile(expr) - if err != nil { - return nil, err - } - return &RegexpMatcher{re: re}, nil -} - -func (m *RegexpMatcher) Match(s string) bool { return m.re.MatchString(s) } - -// DocID is a unique identifier for a document. -type DocID uint64 - -func newDocID(b []byte) DocID { - return DocID(decodeUint64(b)) -} - -func (d DocID) bytes() []byte { - return encodeUint64(uint64(d)) -} - -type termid uint64 - -func newTermID(b []byte) termid { - return termid(decodeUint64(b)) -} - -func (t termid) bytes() []byte { - return encodeUint64(uint64(t)) -} - -type termids []termid - -func (t termids) Len() int { return len(t) } -func (t termids) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t termids) Less(i, j int) bool { return t[i] < t[j] } - -// newTermIDs reads a sequence of uvarints from b and appends them -// to the term IDs. -func newTermIDs(b []byte) (t termids) { - for len(b) > 0 { - k, n := binary.Uvarint(b) - t = append(t, termid(k)) - b = b[n:] - } - return t -} - -// bytes encodes the term IDs as a sequence of uvarints. -func (t termids) bytes() []byte { - b := make([]byte, len(t)*binary.MaxVarintLen64) - n := 0 - for _, x := range t { - n += binary.PutUvarint(b[n:], uint64(x)) - } - return b[:n] -} - -// Batch collects multiple indexing actions and allows to apply them -// to the persistet index all at once for improved performance. 
-type Batch struct { - ix *Index - tx *bolt.Tx - meta *meta - - termBkt *bolt.Bucket - termidBkt *bolt.Bucket - - docs []*batchDoc - terms map[Term]*batchTerm -} - -type batchDoc struct { - id DocID - terms termids -} - -type batchTerm struct { - id termid // zero if term has not been added yet - docs []DocID // documents to be indexed for the term -} - -// Add adds a new document with the given terms to the index and -// returns a new unique ID for it. -// The ID only becomes valid after the batch has been committed successfully. -func (b *Batch) Add(terms Terms) DocID { - b.meta.LastDocID++ - id := b.meta.LastDocID - tids := make(termids, 0, len(terms)) - - // Subtract last document ID before this batch was started. - for _, t := range terms { - tids = append(tids, b.addTerm(id, t)) - } - - b.docs = append(b.docs, &batchDoc{id: id, terms: tids}) - return id -} - -// SecondaryIndex indexes the document ID for additional terms. The temrs -// are not stored as part of the document's forward index as the initial terms. -// The caller has to ensure that the document IDs are added to terms in -// increasing order. -func (b *Batch) SecondaryIndex(id DocID, terms ...Term) { - for _, t := range terms { - b.addTerm(id, t) - } -} - -// addTerm adds the document ID to the term's postings list and returns -// the Term's ID. -func (b *Batch) addTerm(id DocID, t Term) termid { - tb := b.terms[t] - // Populate term if necessary and allocate a new ID if it - // hasn't been created in the database before. - if tb == nil { - tb = &batchTerm{docs: make([]DocID, 0, 1024)} - b.terms[t] = tb - - if idb := b.termBkt.Get(t.bytes()); idb != nil { - tb.id = termid(decodeUint64(idb)) - } else { - b.meta.LastTermID++ - tb.id = b.meta.LastTermID - } - } - tb.docs = append(tb.docs, id) - return tb.id -} - -// Commit executes the batched indexing against the underlying index. -func (b *Batch) Commit() error { - defer b.ix.rwlock.Unlock() - // Close read transaction to open a write transaction. The outer rwlock - // stil guards against intermittend writes between switching. - if err := b.tx.Rollback(); err != nil { - return err - } - err := b.ix.bolt.Update(func(tx *bolt.Tx) error { - docsBkt := tx.Bucket(bktDocs) - // Add document IDs to forward index, - for _, d := range b.docs { - if err := docsBkt.Put(d.id.bytes(), d.terms.bytes()); err != nil { - return err - } - } - // Add newly allocated terms. - termBkt := tx.Bucket(bktTerms) - termidBkt := tx.Bucket(bktTermIDs) - - for t, tb := range b.terms { - if tb.id > b.ix.meta.LastTermID { - bid := encodeUint64(uint64(tb.id)) - tby := t.bytes() - - if err := termBkt.Put(tby, bid); err != nil { - return fmt.Errorf("setting term failed: %s", err) - } - if err := termidBkt.Put(bid, tby); err != nil { - return fmt.Errorf("setting term failed: %s", err) - } - } - } - - pbtx, err := b.ix.pbuf.Begin(true) - if err != nil { - return err - } - if err := b.writePostingsBatch(tx, pbtx); err != nil { - pbtx.Rollback() - return err - } - if err := pbtx.Commit(); err != nil { - return err - } - return b.updateMeta(tx) - }) - return err -} - -// Rollback drops all changes applied in the batch. -func (b *Batch) Rollback() error { - b.ix.rwlock.Unlock() - return b.tx.Rollback() -} - -// writePostings adds the postings batch to the index. -func (b *Batch) writePostingsBatch(kvtx *bolt.Tx, pbtx *pages.Tx) error { - skiplist := kvtx.Bucket(bktSkiplist) - - // createPage allocates a new delta-encoded page starting with id as its first entry. 
- createPage := func(id DocID) (page, error) { - pg := newPageDelta(make([]byte, pageSize-pages.PageHeaderSize)) - if err := pg.init(id); err != nil { - return nil, err - } - return pg, nil - } - - for _, tb := range b.terms { - ids := tb.docs - - b, err := skiplist.CreateBucketIfNotExists(tb.id.bytes()) - if err != nil { - return err - } - sl := &boltSkiplistCursor{ - k: uint64(tb.id), - c: b.Cursor(), - bkt: b, - } - - var ( - pg page // Page we are currently appending to. - pc pageCursor // Its cursor. - pid uint64 // Its ID. - ) - // Get the most recent page. If none exist, the entire postings list is new. - _, pid, err = sl.seek(math.MaxUint64) - if err != nil { - if err != io.EOF { - return err - } - // No most recent page for the key exists. The postings list is new and - // we have to allocate a new page ID for it. - if pg, err = createPage(ids[0]); err != nil { - return err - } - pc = pg.cursor() - ids = ids[1:] - } else { - // Load the most recent page. - pdata, err := pbtx.Get(pid) - if pdata == nil { - return fmt.Errorf("error getting page for ID %q: %s", pid, err) - } - - pdatac := make([]byte, len(pdata)) - // The byte slice is mmaped from bolt. We have to copy it to make modifications. - // pdatac := make([]byte, len(pdata)) - copy(pdatac, pdata) - - pg = newPageDelta(pdatac) - pc = pg.cursor() - } - - for i := 0; i < len(ids); i++ { - if err = pc.append(ids[i]); err == errPageFull { - // We couldn't append to the page because it was full. - // Store away the old page... - if pid == 0 { - // The page was new. - pid, err = pbtx.Add(pg.data()) - if err != nil { - return err - } - first, err := pc.Seek(0) - if err != nil { - return err - } - if err := sl.append(first, pid); err != nil { - return err - } - } else { - if err = pbtx.Set(pid, pg.data()); err != nil { - return err - } - } - - // ... and allocate a new page. - pid = 0 - if pg, err = createPage(ids[i]); err != nil { - return err - } - pc = pg.cursor() - } else if err != nil { - return err - } - } - // Save the last page we have written to. - if pid == 0 { - // The page was new. - pid, err = pbtx.Add(pg.data()) - if err != nil { - return err - } - first, err := pc.Seek(0) - if err != nil { - return err - } - if err := sl.append(first, pid); err != nil { - return err - } - } else { - if err = pbtx.Set(pid, pg.data()); err != nil { - return err - } - } - } - return nil -} - -// updateMeta updates the index's meta information based on the changes -// applied with the batch. -func (b *Batch) updateMeta(tx *bolt.Tx) error { - b.ix.meta = b.meta - bkt := tx.Bucket([]byte(bktMeta)) - if bkt == nil { - return fmt.Errorf("meta bucket not found") - } - v, err := b.ix.meta.bytes() - if err != nil { - return fmt.Errorf("error encoding meta: %s", err) - } - return bkt.Put([]byte(keyMeta), v) -} diff --git a/index/index_test.go b/index/index_test.go deleted file mode 100644 index 60a0228e1..000000000 --- a/index/index_test.go +++ /dev/null @@ -1 +0,0 @@ -package index diff --git a/index/iter.go b/index/iter.go deleted file mode 100644 index cf44f3678..000000000 --- a/index/iter.go +++ /dev/null @@ -1,294 +0,0 @@ -package index - -import ( - "io" - "sort" -) - -// An Iterator provides sorted iteration over a list of uint64s. -type Iterator interface { - // Next retrieves the next document ID in the postings list. - Next() (DocID, error) - // Seek moves the cursor to ID or the closest following one, if it doesn't exist. - // It returns the ID at the position. 
- Seek(id DocID) (DocID, error) -} - -type mergeIterator struct { - i1, i2 Iterator - v1, v2 DocID - e1, e2 error -} - -func (it *mergeIterator) Next() (DocID, error) { - if it.e1 == io.EOF && it.e2 == io.EOF { - return 0, io.EOF - } - if it.e1 != nil { - if it.e1 != io.EOF { - return 0, it.e1 - } - x := it.v2 - it.v2, it.e2 = it.i2.Next() - return x, nil - } - if it.e2 != nil { - if it.e2 != io.EOF { - return 0, it.e2 - } - x := it.v1 - it.v1, it.e1 = it.i1.Next() - return x, nil - } - if it.v1 < it.v2 { - x := it.v1 - it.v1, it.e1 = it.i1.Next() - return x, nil - } else if it.v2 < it.v1 { - x := it.v2 - it.v2, it.e2 = it.i2.Next() - return x, nil - } else { - x := it.v1 - it.v1, it.e1 = it.i1.Next() - it.v2, it.e2 = it.i2.Next() - return x, nil - } -} - -func (it *mergeIterator) Seek(id DocID) (DocID, error) { - // We just have to advance the first iterator. The next common match is also - // the next seeked ID of the intersection. - it.v1, it.e1 = it.i1.Seek(id) - it.v2, it.e2 = it.i2.Seek(id) - return it.Next() -} - -// Merge returns a new Iterator over the union of the input iterators. -func Merge(its ...Iterator) Iterator { - if len(its) == 0 { - return nil - } - i1 := its[0] - - for _, i2 := range its[1:] { - i1 = &mergeIterator{i1: i1, i2: i2} - } - return i1 -} - -// ExpandIterator walks through the iterator and returns the result list. -// The iterator is closed after completion. -func ExpandIterator(it Iterator) ([]DocID, error) { - var ( - res = []DocID{} - v DocID - err error - ) - for v, err = it.Seek(0); err == nil; v, err = it.Next() { - res = append(res, v) - } - if err == io.EOF { - return res, nil - } - return res, err -} - -type intersectIterator struct { - i1, i2 Iterator - v1, v2 DocID - e1, e2 error -} - -// Intersect returns a new Iterator over the intersection of the input iterators. -func Intersect(its ...Iterator) Iterator { - if len(its) == 0 { - return nil - } - i1 := its[0] - - for _, i2 := range its[1:] { - i1 = &intersectIterator{i1: i1, i2: i2} - } - return i1 -} - -func (it *intersectIterator) Next() (DocID, error) { - for { - if it.e1 != nil { - return 0, it.e1 - } - if it.e2 != nil { - return 0, it.e2 - } - if it.v1 < it.v2 { - it.v1, it.e1 = it.i1.Seek(it.v2) - } else if it.v2 < it.v1 { - it.v2, it.e2 = it.i2.Seek(it.v1) - } else { - v := it.v1 - it.v1, it.e1 = it.i1.Next() - it.v2, it.e2 = it.i2.Next() - return v, nil - } - } -} - -func (it *intersectIterator) Seek(id DocID) (DocID, error) { - // We have to advance both iterators. Otherwise, we get a false-positive - // match on 0 if only on of the iterators has it. - it.v1, it.e1 = it.i1.Seek(id) - it.v2, it.e2 = it.i2.Seek(id) - return it.Next() -} - -// A skiplist iterator iterates through a list of value/pointer pairs. -type skiplistIterator interface { - // seek returns the value and pointer at or before v. - seek(v DocID) (val DocID, ptr uint64, err error) - // next returns the next value/pointer pair. - next() (val DocID, ptr uint64, err error) -} - -// iteratorStore allows to retrieve an iterator based on a key. -type iteratorStore interface { - get(uint64) (Iterator, error) -} - -// skippingIterator implements the iterator interface based on skiplist, which -// allows to jump to the iterator closest to the seeked value. -// -// This iterator allows for speed up in seeks if the underlying data cannot -// be searched in O(log n). -// Ideally, the skiplist is seekable in O(log n). 
-type skippingIterator struct { - skiplist skiplistIterator - iterators iteratorStore - - // The iterator holding the next value. - cur Iterator -} - -// Seek implements the Iterator interface. -func (it *skippingIterator) Seek(id DocID) (DocID, error) { - _, ptr, err := it.skiplist.seek(id) - if err != nil { - return 0, err - } - cur, err := it.iterators.get(ptr) - if err != nil { - return 0, err - } - it.cur = cur - - return it.cur.Seek(id) -} - -// Next implements the Iterator interface. -func (it *skippingIterator) Next() (DocID, error) { - // If next was called initially. - // TODO(fabxc): should this just panic and initial call to seek() be required? - if it.cur == nil { - return it.Seek(0) - } - - if id, err := it.cur.Next(); err == nil { - return id, nil - } else if err != io.EOF { - return 0, err - } - // We reached the end of the current iterator. Get the next iterator through - // our skiplist. - _, ptr, err := it.skiplist.next() - if err != nil { - // Here we return the actual io.EOF if we reached the end of the iterator - // retrieved from the last skiplist entry. - return 0, err - } - // Iterate over the next iterator. - cur, err := it.iterators.get(ptr) - if err != nil { - return 0, err - } - it.cur = cur - - // Return the first value in the new iterator. - return it.cur.Seek(0) -} - -// plainListIterator implements the iterator interface on a sorted list of integers. -type plainListIterator struct { - list list - pos int -} - -func newPlainListIterator(l []DocID) *plainListIterator { - it := &plainListIterator{list: list(l)} - sort.Sort(it.list) - return it -} - -func (it *plainListIterator) Seek(id DocID) (DocID, error) { - it.pos = sort.Search(it.list.Len(), func(i int) bool { return it.list[i] >= id }) - return it.Next() - -} - -func (it *plainListIterator) Next() (DocID, error) { - if it.pos >= it.list.Len() { - return 0, io.EOF - } - x := it.list[it.pos] - it.pos++ - return x, nil -} - -type list []DocID - -func (l list) Len() int { return len(l) } -func (l list) Less(i, j int) bool { return l[i] < l[j] } -func (l list) Swap(i, j int) { l[i], l[j] = l[j], l[i] } - -// plainSkiplistIterator implements the skiplistIterator interface on plain -// in-memory mapping. -type plainSkiplistIterator struct { - m map[DocID]uint64 - keys list - pos int -} - -func newPlainSkiplistIterator(m map[DocID]uint64) *plainSkiplistIterator { - var keys list - for k := range m { - keys = append(keys, k) - } - sort.Sort(keys) - - return &plainSkiplistIterator{ - m: m, - keys: keys, - } -} - -// seek implements the skiplistIterator interface. -func (it *plainSkiplistIterator) seek(id DocID) (DocID, uint64, error) { - pos := sort.Search(len(it.keys), func(i int) bool { return it.keys[i] >= id }) - // The skiplist iterator points to the element at or before the last value. - if pos > 0 && it.keys[pos] > id { - it.pos = pos - 1 - } else { - it.pos = pos - } - return it.next() - -} - -// next implements the skiplistIterator interface. 
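The merge and intersect iterators above compose over any Iterator; the heart of the intersection is the leapfrog walk in intersectIterator.Next. A minimal standalone version of that walk over plain sorted slices (the package's own tests below exercise the same inputs), stepping one ID at a time where the real iterator uses Seek to jump ahead:

package main

import "fmt"

// intersect returns the IDs common to two sorted, duplicate-free postings
// lists by always advancing the side whose current value is smaller.
func intersect(a, b []uint64) []uint64 {
	out := []uint64{}
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			i++
		case b[j] < a[i]:
			j++
		default:
			out = append(out, a[i])
			i++
			j++
		}
	}
	return out
}

func main() {
	a := []uint64{1, 2, 3, 4, 9, 10}
	b := []uint64{1, 4, 5, 6, 7, 8, 10, 11}
	fmt.Println(intersect(a, b)) // [1 4 10]
}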
-func (it *plainSkiplistIterator) next() (DocID, uint64, error) { - if it.pos >= len(it.keys) { - return 0, 0, io.EOF - } - k := it.keys[it.pos] - it.pos++ - return k, it.m[k], nil -} diff --git a/index/iter_test.go b/index/iter_test.go deleted file mode 100644 index 763db7492..000000000 --- a/index/iter_test.go +++ /dev/null @@ -1,228 +0,0 @@ -package index - -import ( - "reflect" - "testing" -) - -func TestMultiIntersect(t *testing.T) { - var cases = []struct { - a, b, c []DocID - res []DocID - }{ - { - a: []DocID{1, 2, 3, 4, 5, 6, 1000, 1001}, - b: []DocID{2, 4, 5, 6, 7, 8, 999, 1001}, - c: []DocID{1, 2, 5, 6, 7, 8, 1001, 1200}, - res: []DocID{2, 5, 6, 1001}, - }, - } - - for _, c := range cases { - i1 := newPlainListIterator(c.a) - i2 := newPlainListIterator(c.b) - i3 := newPlainListIterator(c.c) - - res, err := ExpandIterator(Intersect(i1, i2, i3)) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) - } - } -} - -func TestIntersectIterator(t *testing.T) { - var cases = []struct { - a, b []DocID - res []DocID - }{ - { - a: []DocID{1, 2, 3, 4, 5}, - b: []DocID{6, 7, 8, 9, 10}, - res: []DocID{}, - }, - { - a: []DocID{1, 2, 3, 4, 5}, - b: []DocID{4, 5, 6, 7, 8}, - res: []DocID{4, 5}, - }, - { - a: []DocID{1, 2, 3, 4, 9, 10}, - b: []DocID{1, 4, 5, 6, 7, 8, 10, 11}, - res: []DocID{1, 4, 10}, - }, { - a: []DocID{1}, - b: []DocID{0, 1}, - res: []DocID{1}, - }, - } - - for _, c := range cases { - i1 := newPlainListIterator(c.a) - i2 := newPlainListIterator(c.b) - - res, err := ExpandIterator(&intersectIterator{i1: i1, i2: i2}) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) - } - } -} - -func TestMergeIntersect(t *testing.T) { - var cases = []struct { - a, b, c []DocID - res []DocID - }{ - { - a: []DocID{1, 2, 3, 4, 5, 6, 1000, 1001}, - b: []DocID{2, 4, 5, 6, 7, 8, 999, 1001}, - c: []DocID{1, 2, 5, 6, 7, 8, 1001, 1200}, - res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200}, - }, - } - - for _, c := range cases { - i1 := newPlainListIterator(c.a) - i2 := newPlainListIterator(c.b) - i3 := newPlainListIterator(c.c) - - res, err := ExpandIterator(Merge(i1, i2, i3)) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) - } - } -} - -func BenchmarkIntersect(t *testing.B) { - var a, b, c, d []DocID - - for i := 0; i < 10000000; i += 2 { - a = append(a, DocID(i)) - } - for i := 5000000; i < 5000100; i += 4 { - b = append(b, DocID(i)) - } - for i := 5090000; i < 5090600; i += 4 { - b = append(b, DocID(i)) - } - for i := 4990000; i < 5100000; i++ { - c = append(c, DocID(i)) - } - for i := 4000000; i < 6000000; i++ { - d = append(d, DocID(i)) - } - - i1 := newPlainListIterator(a) - i2 := newPlainListIterator(b) - i3 := newPlainListIterator(c) - i4 := newPlainListIterator(d) - - t.ResetTimer() - - for i := 0; i < t.N; i++ { - if _, err := ExpandIterator(Intersect(i1, i2, i3, i4)); err != nil { - t.Fatal(err) - } - } -} - -func TestMergeIterator(t *testing.T) { - var cases = []struct { - a, b []DocID - res []DocID - }{ - { - a: []DocID{1, 2, 3, 4, 5}, - b: []DocID{6, 7, 8, 9, 10}, - res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, - }, - { - a: []DocID{1, 2, 3, 4, 5}, - b: []DocID{4, 5, 6, 7, 8}, - res: []DocID{1, 2, 3, 4, 5, 6, 7, 8}, - }, - { - a: []DocID{1, 2, 3, 4, 9, 10}, - b: []DocID{1, 4, 5, 6, 7, 8, 10, 
11}, - res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, - }, - } - - for _, c := range cases { - i1 := newPlainListIterator(c.a) - i2 := newPlainListIterator(c.b) - - res, err := ExpandIterator(&mergeIterator{i1: i1, i2: i2}) - if err != nil { - t.Fatalf("Unexpected error: %s", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) - } - } -} - -func TestSkippingIterator(t *testing.T) { - var cases = []struct { - skiplist skiplistIterator - its iteratorStore - res []DocID - }{ - { - skiplist: newPlainSkiplistIterator(map[DocID]uint64{ - 5: 3, - 50: 2, - 500: 1, - }), - its: testIteratorStore{ - 3: newPlainListIterator(list{5, 7, 8, 9}), - 2: newPlainListIterator(list{54, 60, 61}), - 1: newPlainListIterator(list{1200, 1300, 100000}), - }, - res: []DocID{5, 7, 8, 9, 54, 60, 61, 1200, 1300, 100000}, - }, - { - skiplist: newPlainSkiplistIterator(map[DocID]uint64{ - 0: 3, - 50: 2, - }), - its: testIteratorStore{ - 3: newPlainListIterator(list{5, 7, 8, 9}), - 2: newPlainListIterator(list{54, 60, 61}), - }, - res: []DocID{5, 7, 8, 9, 54, 60, 61}, - }, - } - - for _, c := range cases { - it := &skippingIterator{ - skiplist: c.skiplist, - iterators: c.its, - } - res, err := ExpandIterator(it) - if err != nil { - t.Fatalf("Unexpected error", err) - } - if !reflect.DeepEqual(res, c.res) { - t.Fatalf("Expected %v but got %v", c.res, res) - } - } -} - -type testIteratorStore map[uint64]Iterator - -func (s testIteratorStore) get(id uint64) (Iterator, error) { - it, ok := s[id] - if !ok { - return nil, errNotFound - } - return it, nil -} diff --git a/index/page.go b/index/page.go deleted file mode 100644 index 868d7ba14..000000000 --- a/index/page.go +++ /dev/null @@ -1,108 +0,0 @@ -package index - -import ( - "encoding/binary" - "errors" - "io" -) - -const pageSize = 2048 - -var errPageFull = errors.New("page full") - -type pageCursor interface { - Iterator - append(v DocID) error -} - -type page interface { - cursor() pageCursor - init(v DocID) error - data() []byte -} - -type pageDelta struct { - b []byte -} - -type pageType uint8 - -const ( - pageTypeDelta pageType = iota -) - -func newPageDelta(data []byte) *pageDelta { - return &pageDelta{b: data} -} - -func (p *pageDelta) init(v DocID) error { - // Write first value. - binary.PutUvarint(p.b, uint64(v)) - return nil -} - -func (p *pageDelta) cursor() pageCursor { - return &pageDeltaCursor{data: p.b} -} - -func (p *pageDelta) data() []byte { - return p.b -} - -type pageDeltaCursor struct { - data []byte - pos int - cur DocID -} - -func (p *pageDeltaCursor) append(id DocID) error { - // Run to the end. - _, err := p.Next() - for ; err == nil; _, err = p.Next() { - // Consume. - } - if err != io.EOF { - return err - } - if len(p.data)-p.pos < binary.MaxVarintLen64 { - return errPageFull - } - if p.cur >= id { - return errOutOfOrder - } - p.pos += binary.PutUvarint(p.data[p.pos:], uint64(id-p.cur)) - p.cur = id - return nil -} - -func (p *pageDeltaCursor) Close() error { - return nil -} - -func (p *pageDeltaCursor) Seek(min DocID) (v DocID, err error) { - if min < p.cur { - p.pos = 0 - } - for v, err = p.Next(); err == nil && v < min; v, err = p.Next() { - // Consume. 
- } - return p.cur, err -} - -func (p *pageDeltaCursor) Next() (DocID, error) { - var n int - var dv uint64 - if p.pos == 0 { - dv, n = binary.Uvarint(p.data) - p.cur = DocID(dv) - } else { - dv, n = binary.Uvarint(p.data[p.pos:]) - if n <= 0 || dv == 0 { - return 0, io.EOF - } - p.cur += DocID(dv) - } - p.pos += n - - return p.cur, nil -} diff --git a/index/page_test.go b/index/page_test.go deleted file mode 100644 index a3f17bada..000000000 --- a/index/page_test.go +++ /dev/null @@ -1,116 +0,0 @@ -package index - -import ( - "math/rand" - "reflect" - "testing" -) - -func TestPageDelta(t *testing.T) { - var ( - vals []DocID - last DocID - ) - for i := 0; i < 10000; i++ { - vals = append(vals, last) - last += DocID(rand.Int63n(1<<9) + 1) - } - data := make([]byte, pageSize) - page := newPageDelta(data) - - if err := page.init(vals[0]); err != nil { - t.Fatal(err) - } - - var num int - pc := page.cursor() - - for _, v := range vals[1:] { - if err := pc.append(v); err != nil { - if err == errPageFull { - break - } - t.Fatal(err) - } - num++ - } - - res, err := ExpandIterator(pc) - if err != nil { - t.Fatal(err) - } - if !reflect.DeepEqual(res, vals[:num+1]) { - t.Errorf("output did not match") - t.Errorf("expected: %v", vals[:num+1]) - t.Errorf("received: %v", res) - } -} - -func BenchmarkPageDeltaAppend(b *testing.B) { - var ( - vals []DocID - last DocID - ) - for i := 0; i < 10000; i++ { - vals = append(vals, last) - last += DocID(rand.Int63n(1<<10) + 1) - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - data := make([]byte, pageSize) - page := newPageDelta(data) - - if err := page.init(vals[0]); err != nil { - b.Fatal(err) - } - - pc := page.cursor() - - for _, v := range vals[1:] { - if err := pc.append(v); err != nil { - if err == errPageFull { - break - } - b.Fatal(err) - } - } - } -} - -func BenchmarkPageDeltaRead(b *testing.B) { - var ( - vals []DocID - last DocID - ) - for i := 0; i < 10000; i++ { - vals = append(vals, last) - last += DocID(rand.Int63n(1<<10) + 1) - } - data := make([]byte, pageSize) - page := newPageDelta(data) - - if err := page.init(vals[0]); err != nil { - b.Fatal(err) - } - - pc := page.cursor() - - for _, v := range vals[1:] { - if err := pc.append(v); err != nil { - if err == errPageFull { - break - } - b.Fatal(err) - } - } - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - if _, err := ExpandIterator(pc); err != nil { - b.Fatal(err) - } - } -} diff --git a/index/postings.go b/index/postings.go deleted file mode 100644 index 05f8c91ca..000000000 --- a/index/postings.go +++ /dev/null @@ -1,72 +0,0 @@ -package index - -import ( - "io" - - "github.com/boltdb/bolt" -) - -type iteratorStoreFunc func(k uint64) (Iterator, error) - -func (s iteratorStoreFunc) get(k uint64) (Iterator, error) { - return s(k) -} - -// boltSkiplistCursor implements the skiplistCurosr interface. -// -// TODO(fabxc): benchmark the overhead of a bucket per key. -// It might be more performant to have all skiplists in the same bucket. -// -// 20k keys, ~10 skiplist entries avg -> 200k keys, 1 bucket vs 20k buckets, 10 keys -// -type boltSkiplistCursor struct { - // k is currently unused. If the bucket holds entries for more than - // just a single key, it will be necessary. 
- k uint64 - c *bolt.Cursor - bkt *bolt.Bucket -} - -func (s *boltSkiplistCursor) next() (DocID, uint64, error) { - db, pb := s.c.Next() - if db == nil { - return 0, 0, io.EOF - } - return newDocID(db), decodeUint64(pb), nil -} - -func (s *boltSkiplistCursor) seek(k DocID) (DocID, uint64, error) { - db, pb := s.c.Seek(k.bytes()) - if db == nil { - db, pb = s.c.Last() - if db == nil { - return 0, 0, io.EOF - } - } - did, pid := newDocID(db), decodeUint64(pb) - - if did > k { - // If the found entry is behind the seeked ID, try the previous - // entry if it exists. The page it points to contains the range of k. - dbp, pbp := s.c.Prev() - if dbp != nil { - did, pid = newDocID(dbp), decodeUint64(pbp) - } else { - // We skipped before the first entry. The cursor is now out of - // state and subsequent calls to Next() will return nothing. - // Reset it to the first position. - s.c.First() - } - } - return did, pid, nil -} - -func (s *boltSkiplistCursor) append(d DocID, p uint64) error { - k, _ := s.c.Last() - - if k != nil && decodeUint64(k) >= uint64(d) { - return errOutOfOrder - } - - return s.bkt.Put(encodeUint64(uint64(d)), encodeUint64(p)) -} diff --git a/pages/README.md b/pages/README.md deleted file mode 100644 index ecdf0fa1c..000000000 --- a/pages/README.md +++ /dev/null @@ -1,5 +0,0 @@ -# pages - -Pages stores pages of blob data. It is essentially a minimal version of -BoltDB, where the the B+ tree was removed and replaced by simply writing -page-aligned byte slices. diff --git a/pages/db.go b/pages/db.go deleted file mode 100644 index 08a266270..000000000 --- a/pages/db.go +++ /dev/null @@ -1,782 +0,0 @@ -package pages - -import ( - "errors" - "fmt" - "hash/fnv" - "math" - "os" - "runtime" - "sync" - "time" - "unsafe" -) - -// These errors can be returned when opening or calling methods on a DB. -var ( - // ErrDatabaseNotOpen is returned when a DB instance is accessed before it - // is opened or after it is closed. - ErrDatabaseNotOpen = errors.New("database not open") - - // ErrDatabaseOpen is returned when opening a database that is - // already open. - ErrDatabaseOpen = errors.New("database already open") - - // ErrInvalid is returned when both meta pages on a database are invalid. - // This typically occurs when a file is not a bolt database. - ErrInvalid = errors.New("invalid database") - - // ErrVersionMismatch is returned when the data file was created with a - // different version of Bolt. - ErrVersionMismatch = errors.New("version mismatch") - - // ErrChecksum is returned when either meta page checksum does not match. - ErrChecksum = errors.New("checksum error") - - // ErrTimeout is returned when a database cannot obtain an exclusive lock - // on the data file after the timeout passed to Open(). - ErrTimeout = errors.New("timeout") - - // ErrNotFound is returned when a user page for an ID could not be found. - ErrNotFound = errors.New("not found") - - ErrTxClosed = errors.New("transaction closed") - - ErrTxNotWritable = errors.New("transaction not writable") -) - -// Marker value that indicates that a file is a pagebuf file. -const magic uint32 = 0xAFFEAFFE - -// The data file version. -const version = 1 - -// The largest step that can be taken when remapping the mmap. -const maxMmapStep = 1 << 30 // 1GB - -// defaultPageSize of the underlying buffers is set to the OS page size. -var defaultPageSize = os.Getpagesize() - -// DB is an interface providing access to persistent byte chunks that -// are backed by memory-mapped pages. 
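The pageDelta format shown earlier stores the first DocID as a uvarint and every following ID as a uvarint delta to its predecessor, refusing appends once a maximum-size varint might no longer fit in the page. A simplified, self-contained sketch of that encoding without the cursor/Seek machinery; deltaPage and its methods are illustrative names only:

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errPageFull = errors.New("page full")

// deltaPage holds a fixed-size buffer of uvarint-encoded deltas, the write
// position, and the last ID written, mirroring pageDelta/pageDeltaCursor.
type deltaPage struct {
	b   []byte
	pos int
	cur uint64
}

func newDeltaPage(size int, first uint64) *deltaPage {
	p := &deltaPage{b: make([]byte, size)}
	// The first value is stored as an absolute ID.
	p.pos = binary.PutUvarint(p.b, first)
	p.cur = first
	return p
}

func (p *deltaPage) append(id uint64) error {
	if len(p.b)-p.pos < binary.MaxVarintLen64 {
		return errPageFull
	}
	if id <= p.cur {
		return errors.New("out of order")
	}
	// Store the difference to the previous ID.
	p.pos += binary.PutUvarint(p.b[p.pos:], id-p.cur)
	p.cur = id
	return nil
}

// decode re-expands the stored deltas into absolute IDs.
func (p *deltaPage) decode() []uint64 {
	var (
		ids []uint64
		cur uint64
		off int
	)
	for off < p.pos {
		d, n := binary.Uvarint(p.b[off:])
		off += n
		if len(ids) == 0 {
			cur = d // first value is absolute
		} else {
			cur += d
		}
		ids = append(ids, cur)
	}
	return ids
}

func main() {
	pg := newDeltaPage(64, 10)
	for _, id := range []uint64{15, 16, 300, 1024} {
		if err := pg.append(id); err != nil {
			fmt.Println("append:", err)
			return
		}
	}
	fmt.Println(pg.decode()) // [10 15 16 300 1024]
}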
-type DB struct { - // If you want to read the entire database fast, you can set MmapFlag to - // syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead. - MmapFlags int - - // AllocSize is the amount of space allocated when the database - // needs to create new pages. This is done to amortize the cost - // of truncate() and fsync() when growing the data file. - AllocSize int - - path string // location of the pagebuf file - file *os.File // the opened file of path - opened bool - data *[maxMapSize]byte - dataref []byte // mmap'ed readonly, write throws SEGV - datasz int - filesz int // current on disk file size - pageSize int - meta0 *meta - meta1 *meta - freelist *freelist - rwtx *Tx - txs []*Tx - - pagePool sync.Pool - - rwlock sync.Mutex // Allows only one writer at a time. - metalock sync.Mutex // Protects meta page access. - mmaplock sync.RWMutex // Protects mmap access during remapping - - ops struct { - writeAt func(b []byte, off int64) (n int, err error) - } -} - -// Options defines configuration parameters with which a PageBuf is initialized. -type Options struct { - // Timeout is the amount of time to wait to obtain a file lock. - // When set to zero it will wait indefinitely. This option is only - // available on Darwin and Linux. - Timeout time.Duration - - // Sets the DB.MmapFlags flag before memory mapping the file. - MmapFlags int - - // XXX(fabxc): potentially allow setting different allocation strategies - // to fit different use cases. - - // InitialMmapSize is the initial mmap size of the database - // in bytes. - // - // If <=0, the initial map size is 0. - // If initialMmapSize is smaller than the previous database size, - // it takes no effect. - InitialMmapSize int - - // PageSize defines a custom page size used. It cannot be changed later. - // Must be a multiple of the operating system's default page size. - PageSize int -} - -// DefaultOptions specifies a set of default parameters used when a pagebuf -// is opened without explicit options. -var DefaultOptions = Options{ - // Use the OS's default page size. - PageSize: defaultPageSize, -} - -// Default values if not set in a DB instance. -const ( - DefaultAllocSize = 16 * 1024 * 1024 -) - -// Open and create a new database under the given path. -func Open(path string, mode os.FileMode, o *Options) (*DB, error) { - db := &DB{ - opened: true, - } - - // Set default options if no options are provided. - if o == nil { - o = &DefaultOptions - } - db.MmapFlags = o.MmapFlags - - db.AllocSize = DefaultAllocSize - - flag := os.O_RDWR - - // Open data file and separate sync handler for metadata writes. - db.path = path - var err error - if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil { - _ = db.close() - return nil, err - } - - // Lock file so that other processes using pagebuf in read-write mode cannot - // use the underlying data at the same time. - if err := flock(db, mode, true, o.Timeout); err != nil { - _ = db.close() - return nil, err - } - - // Default values for test hooks - db.ops.writeAt = db.file.WriteAt - - // Initialize the database if it doesn't exist. - if info, err := db.file.Stat(); err != nil { - return nil, err - } else if info.Size() == 0 { - // Initialize new files with meta pages. - if err := db.init(o.PageSize); err != nil { - return nil, err - } - } else { - // Read the first meta page to determine the page size. 
- var buf [0x1000]byte - if _, err := db.file.ReadAt(buf[:], 0); err == nil { - m := db.pageInBuffer(buf[:], 0).meta() - if err := m.validate(); err != nil { - // We cannot verify which page sizes are used. - return nil, fmt.Errorf("cannot read page size: %s", err) - } else { - db.pageSize = int(m.pageSize) - } - } else { - return nil, fmt.Errorf("reading first meta page failed: %s", err) - } - } - - // Initialize page pool. - db.pagePool = sync.Pool{ - New: func() interface{} { - return make([]byte, db.pageSize) - }, - } - - // Memory map the data file. - if err := db.mmap(o.InitialMmapSize); err != nil { - _ = db.close() - return nil, err - } - - // Read in the freelist. - db.freelist = newFreelist() - db.freelist.read(db.page(db.meta().freelist)) - - // Mark the database as opened and return. - return db, nil -} - -func validatePageSize(psz int) error { - // Max value the content length can hold. - if defaultPageSize > math.MaxUint16 { - return fmt.Errorf("invalid page size %d", psz) - } - // Page size must be a multiple of OS page size so we stay - // page aligned. - if psz < defaultPageSize { - if defaultPageSize%psz != 0 { - return fmt.Errorf("invalid page size %d", psz) - } - } else if psz > defaultPageSize { - if psz%defaultPageSize != 0 { - return fmt.Errorf("invalid page size %d", psz) - } - } - return nil -} - -// init creates a new database file and initializes its meta pages. -func (db *DB) init(psz int) error { - if err := validatePageSize(psz); err != nil { - return err - } - // Set the page size to the OS page size. - db.pageSize = psz - - // Create two meta pages on a buffer. - buf := make([]byte, db.pageSize*4) - for i := 0; i < 2; i++ { - p := db.pageInBuffer(buf[:], pgid(i)) - p.id = pgid(i) - p.flags = pageFlagMeta - - // Initialize the meta page. - m := p.meta() - m.magic = magic - m.version = version - m.pageSize = uint32(db.pageSize) - m.freelist = 2 - m.txid = txid(i) - m.pgid = 4 // TODO(fabxc): we initialize with zero pages, what to do here? - m.checksum = m.sum64() - } - - // Write an empty freelist at page 3. - p := db.pageInBuffer(buf[:], pgid(2)) - p.id = pgid(2) - p.flags = pageFlagFreelist - p.count = 0 - - // Write the first empty page. - p = db.pageInBuffer(buf[:], pgid(3)) - p.id = pgid(3) - p.flags = pageFlagData - p.count = 0 - - // Write the buffer to our data file. - if _, err := db.ops.writeAt(buf, 0); err != nil { - return err - } - if err := fdatasync(db); err != nil { - return err - } - - return nil -} - -// Sync executes fdatasync() against the database file handle. -func (db *DB) Sync() error { return fdatasync(db) } - -// Close synchronizes and closes the memory-mapped pagebuf file. -func (db *DB) Close() error { - db.rwlock.Lock() - defer db.rwlock.Unlock() - - db.metalock.Lock() - defer db.metalock.Unlock() - - db.mmaplock.RLock() - defer db.mmaplock.RUnlock() - - return db.close() -} - -func (db *DB) close() error { - if !db.opened { - return nil - } - - db.opened = false - db.freelist = nil - db.ops.writeAt = nil - - // Close the mmap. - if err := db.munmap(); err != nil { - return err - } - - // Close file handles. - if db.file != nil { - // Close the file descriptor. - if err := db.file.Close(); err != nil { - return fmt.Errorf("db file close: %s", err) - } - db.file = nil - } - - db.path = "" - return nil -} - -// Update executes a function within the context of a read-write managed transaction. -// If no error is returned from the function then the transaction is committed. 
-// If an error is returned then the entire transaction is rolled back. -// Any error that is returned from the function or returned from the commit is -// returned from the Update() method. -// -// Attempting to manually commit or rollback within the function will cause a panic. -func (db *DB) Update(fn func(*Tx) error) error { - t, err := db.Begin(true) - if err != nil { - return err - } - - // Make sure the transaction rolls back in the event of a panic. - defer func() { - if t.db != nil { - t.rollback() - } - }() - - // Mark as a managed tx so that the inner function cannot manually commit. - t.managed = true - - // If an error is returned from the function then rollback and return error. - err = fn(t) - t.managed = false - if err != nil { - _ = t.Rollback() - return err - } - - return t.Commit() -} - -// View executes a function within the context of a managed read-only transaction. -// Any error that is returned from the function is returned from the View() method. -// -// Attempting to manually rollback within the function will cause a panic. -func (db *DB) View(fn func(*Tx) error) error { - t, err := db.Begin(false) - if err != nil { - return err - } - - // Make sure the transaction rolls back in the event of a panic. - defer func() { - if t.db != nil { - t.rollback() - } - }() - - // Mark as a managed tx so that the inner function cannot manually rollback. - t.managed = true - - // If an error is returned from the function then pass it through. - err = fn(t) - t.managed = false - if err != nil { - _ = t.Rollback() - return err - } - - if err := t.Rollback(); err != nil { - return err - } - - return nil -} - -// pageExists checks whether the page with the given id exists. -func (db *DB) pageExists(id pgid) bool { - // The page exists if it is not in the freelist or out of the data range. - return !db.freelist.cache[pgid(id)] && int(id+1)*db.pageSize < db.datasz -} - -// page retrieves a page reference from the mmap based on the current page size. -func (db *DB) page(id pgid) *page { - pos := id * pgid(db.pageSize) - return (*page)(unsafe.Pointer(&db.data[pos])) -} - -// pageInBuffer retrieves a page reference from a given byte array based on the current -// page size. -func (db *DB) pageInBuffer(b []byte, id pgid) *page { - pos := id * pgid(db.pageSize) - return (*page)(unsafe.Pointer(&b[pos])) -} - -// meta retrieves the current meta page reference. -func (db *DB) meta() *meta { - // We have to return the meta with the highest txid which doesn't fail - // validation. Otherwise, we can cause errors when in fact the database is - // in a consistent state. metaA is the one with the higher txid. - metaA := db.meta0 - metaB := db.meta1 - if db.meta1.txid > db.meta0.txid { - metaA = db.meta1 - metaB = db.meta0 - } - - // Use higher meta page if valid. Otherwise fallback to previous, if valid. - if err := metaA.validate(); err == nil { - return metaA - } else if err := metaB.validate(); err == nil { - return metaB - } - - // This should never be reached, because both meta1 and meta0 were validated - // on mmap() and we do fsync() on every write. - panic("pagebuf.PageBuf.meta(): invalid meta pages") -} - -// allocate returns a contiguous block of memory starting at a given page. -func (db *DB) allocate(count int) (*page, error) { - // Allocate a temporary buffer for the page. 
- var buf []byte - if count == 1 { - buf = db.pagePool.Get().([]byte) - } else { - buf = make([]byte, count*db.pageSize) - } - p := (*page)(unsafe.Pointer(&buf[0])) - p.overflow = uint32(count - 1) - - // Use pages from the freelist if they are available. - if p.id = db.freelist.allocate(count); p.id != 0 { - return p, nil - } - - // Resize mmap() if we're at the end. - p.id = db.rwtx.meta.pgid - var minsz = int((p.id+pgid(count))+1) * db.pageSize - if minsz >= db.datasz { - if err := db.mmap(minsz); err != nil { - return nil, fmt.Errorf("mmap allocate error: %s", err) - } - } - - // Move the page id high water mark. - db.rwtx.meta.pgid += pgid(count) - return p, nil -} - -// grow grows the size of the database to the given sz. -func (db *DB) grow(sz int) error { - // Ignore if the new size is less than available file size. - if sz <= db.filesz { - return nil - } - - // If the data is smaller than the alloc size then only allocate what's needed. - // Once it goes over the allocation size then allocate in chunks. - if db.datasz < db.AllocSize { - sz = db.datasz - } else { - sz += db.AllocSize - } - - // Truncate and fsync to ensure file size metadata is flushed. - // https://github.com/boltdb/bolt/issues/284 - if runtime.GOOS != "windows" { - if err := db.file.Truncate(int64(sz)); err != nil { - return fmt.Errorf("file resize error: %s", err) - } - } - if err := db.file.Sync(); err != nil { - return fmt.Errorf("file sync error: %s", err) - } - - db.filesz = sz - return nil -} - -// mmap opens the underlying memory-mapped file and initializes it. -// minsz is the minimum size that the mmap can be. -func (db *DB) mmap(minsz int) error { - db.mmaplock.Lock() - defer db.mmaplock.Unlock() - - info, err := db.file.Stat() - if err != nil { - return fmt.Errorf("mmap stat error: %s", err) - } else if int(info.Size()) < db.pageSize*2 { - return fmt.Errorf("file size too small") - } - - // Ensure the size is at least the minimum size. - var size = int(info.Size()) - if size < minsz { - size = minsz - } - size, err = db.mmapSize(size) - if err != nil { - return err - } - - // Unmap existing data before continuing. - if err := db.munmap(); err != nil { - return err - } - // Memory-map the data file as a byte slice. - if err := mmap(db, size); err != nil { - return err - } - - // Save references to the meta pages. - db.meta0 = db.page(0).meta() - db.meta1 = db.page(1).meta() - - // Validate the meta pages. We only return an error if both meta pages fail - // validation, since meta0 failing validation means that it wasn't saved - // properly -- but we can recover using meta1. And vice-versa. - err0 := db.meta0.validate() - err1 := db.meta1.validate() - if err0 != nil && err1 != nil { - return err0 - } - return nil -} - -// munmap unmaps the data file from memory. -func (db *DB) munmap() error { - if err := munmap(db); err != nil { - return fmt.Errorf("unmap error: %s", err) - } - return nil -} - -// mmapSize determines the appropriate size for the mmap given the current size -// of the database. The minimum size is 32KB and doubles until it reaches 1GB. -// Returns an error if the new mmap size is greater than the max allowed. -func (db *DB) mmapSize(size int) (int, error) { - // Double the size from 32KB until 1GB. - for i := uint(15); i <= 30; i++ { - if size <= 1< maxMapSize { - return 0, fmt.Errorf("mmap too large") - } - - // If larger than 1GB then grow by 1GB at a time. 
- sz := int64(size) - if remainder := sz % int64(maxMmapStep); remainder > 0 { - sz += int64(maxMmapStep) - remainder - } - - // Ensure that the mmap size is a multiple of the page size. - // This should always be true since we're incrementing in MBs. - pageSize := int64(db.pageSize) - if (sz % pageSize) != 0 { - sz = ((sz / pageSize) + 1) * pageSize - } - - // If we've exceeded the max size then only grow up to the max size. - if sz > maxMapSize { - sz = maxMapSize - } - - return int(sz), nil -} - -func (db *DB) String() string { - return fmt.Sprintf("PageBuf<%s>", db.path) -} - -// Path returns the path to the currently opened pagebuf file. -func (db *DB) Path() string { - return db.path -} - -// Begin starts a new transaction. -// Multiple read-only transactions can be used concurrently but only one -// write transaction can be used at a time. Starting multiple write transactions -// will cause the calls to block and be serialized until the current write -// transaction finishes. -// -// Transactions should not be dependent on one another. Opening a read -// transaction and a write transaction in the same goroutine can cause the -// writer to deadlock because the database periodically needs to re-mmap itself -// as it grows and it cannot do that while a read transaction is open. -// -// If a long running read transaction (for example, a snapshot transaction) is -// needed, you might want to set PageBuf.InitialMmapSize to a large enough value -// to avoid potential blocking of write transaction. -// -// IMPORTANT: You must close read-only transactions after you are finished or -// else the database will not reclaim old pages. -func (db *DB) Begin(writable bool) (*Tx, error) { - if writable { - return db.beginRWTx() - } - return db.beginTx() -} - -func (db *DB) beginTx() (*Tx, error) { - // Lock the meta pages while we initialize the transaction. We obtain - // the meta lock before the mmap lock because that's the order that the - // write transaction will obtain them. - db.metalock.Lock() - - // Obtain a read-only lock on the mmap. When the mmap is remapped it will - // obtain a write lock so all transactions must finish before it can be - // remapped. - db.mmaplock.RLock() - - // Exit if the database is not open yet. - if !db.opened { - db.mmaplock.RUnlock() - db.metalock.Unlock() - return nil, ErrDatabaseNotOpen - } - - // Create a transaction associated with the database. - t := &Tx{} - t.init(db) - - // Keep track of transaction until it closes. - db.txs = append(db.txs, t) - - // Unlock the meta pages. - db.metalock.Unlock() - - return t, nil -} - -func (db *DB) beginRWTx() (*Tx, error) { - // Obtain writer lock. This is released by the transaction when it closes. - // This enforces only one writer transaction at a time. - db.rwlock.Lock() - - // Once we have the writer lock then we can lock the meta pages so that - // we can set up the transaction. - db.metalock.Lock() - defer db.metalock.Unlock() - - // Exit if the database is not open yet. - if !db.opened { - db.rwlock.Unlock() - return nil, ErrDatabaseNotOpen - } - - // Create a transaction associated with the database. - t := &Tx{writable: true} - t.init(db) - db.rwtx = t - - // Free any pages associated with closed read-only transactions. - var minid txid = 0xFFFFFFFFFFFFFFFF - for _, t := range db.txs { - if t.meta.txid < minid { - minid = t.meta.txid - } - } - if minid > 0 { - db.freelist.release(minid - 1) - } - - return t, nil -} - -// removeTx removes a transaction from the database. 
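A standalone rendering of the sizing strategy that mmapSize's comments describe, assuming bolt-style doubling: grow in powers of two from 32KB up to 1GB, then in 1GB steps, keep the result page-aligned, and reject sizes beyond the platform maximum from pages_amd64.go (so 64-bit only). Constant names and the demo values are illustrative:

package main

import (
	"errors"
	"fmt"
)

const (
	pageSize   = 4096
	maxStep    = 1 << 30        // grow in 1GB steps once past 1GB
	maxMapSize = 0xFFFFFFFFFFFF // largest mmap size, as in pages_amd64.go
)

// mmapSize mirrors the documented behavior: double from 32KB until 1GB,
// then grow by 1GB at a time, align to the page size, cap at the maximum.
func mmapSize(size int) (int, error) {
	// Double the size from 32KB until 1GB.
	for i := uint(15); i <= 30; i++ {
		if size <= 1<<i {
			return 1 << i, nil
		}
	}

	// Refuse sizes above the maximum allowed.
	if size > maxMapSize {
		return 0, errors.New("mmap too large")
	}

	// If larger than 1GB then grow by 1GB at a time.
	sz := int64(size)
	if remainder := sz % int64(maxStep); remainder > 0 {
		sz += int64(maxStep) - remainder
	}

	// Keep the mmap size a multiple of the page size.
	if sz%int64(pageSize) != 0 {
		sz = ((sz / int64(pageSize)) + 1) * int64(pageSize)
	}

	// Never grow past the maximum size.
	if sz > maxMapSize {
		sz = maxMapSize
	}
	return int(sz), nil
}

func main() {
	for _, s := range []int{1000, 40 << 10, (1 << 30) + 5} {
		n, err := mmapSize(s)
		fmt.Println(s, "->", n, err)
	}
}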
-func (db *DB) removeTx(tx *Tx) { - // Release the read lock on the mmap. - db.mmaplock.RUnlock() - - // Use the meta lock to restrict access to the DB object. - db.metalock.Lock() - - // Remove the transaction. - for i, t := range db.txs { - if t == tx { - db.txs = append(db.txs[:i], db.txs[i+1:]...) - break - } - } - // Unlock the meta pages. - db.metalock.Unlock() -} - -// Size represents a valid page size. -type Size int8 - -// The valid sizes for allocated pages. -const ( - Size512 Size = -3 - Size1024 = -2 - Size2048 = -1 - Size4096 = 0 - Size8192 = 1 -) - -const ( - upageSizeMin = Size512 - upageSizeMax = Size8192 -) - -type meta struct { - magic uint32 - version uint32 - pageSize uint32 - flags uint32 - freelist pgid - txid txid - pgid pgid - checksum uint64 -} - -// validate checks the marker bytes and version of the meta page to ensure it matches this binary. -func (m *meta) validate() error { - if m.magic != magic { - return ErrInvalid - } else if m.version != version { - return ErrVersionMismatch - } else if m.checksum != 0 && m.checksum != m.sum64() { - return ErrChecksum - } - return nil -} - -// copy copies one meta object to another. -func (m *meta) copy(dest *meta) { - *dest = *m -} - -// write writes the meta onto a page. -func (m *meta) write(p *page) { - if m.freelist >= m.pgid { - panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid)) - } - - // Page id is either going to be 0 or 1 which we can determine by the transaction ID. - p.id = pgid(m.txid % 2) - p.flags |= pageFlagMeta - - // Calculate the checksum. - m.checksum = m.sum64() - - m.copy(p.meta()) -} - -// generates the checksum for the meta. -func (m *meta) sum64() uint64 { - var h = fnv.New64a() - _, _ = h.Write((*[unsafe.Offsetof(meta{}.checksum)]byte)(unsafe.Pointer(m))[:]) - return h.Sum64() -} - -// _assert will panic with a given formatted message if the given condition is false. -func _assert(condition bool, msg string, v ...interface{}) { - if !condition { - panic(fmt.Sprintf("assertion failed: "+msg, v...)) - } -} diff --git a/pages/freelist.go b/pages/freelist.go deleted file mode 100644 index 0f87f3acd..000000000 --- a/pages/freelist.go +++ /dev/null @@ -1,248 +0,0 @@ -package pages - -import ( - "fmt" - "sort" - "unsafe" -) - -// freelist represents a list of all pages that are available for allocation. -// It also tracks pages that have been freed but are still in use by open transactions. -type freelist struct { - ids []pgid // all free and available free page ids. - pending map[txid][]pgid // mapping of soon-to-be free page ids by tx. - cache map[pgid]bool // fast lookup of all free and pending page ids. -} - -// newFreelist returns an empty, initialized freelist. -func newFreelist() *freelist { - return &freelist{ - pending: make(map[txid][]pgid), - cache: make(map[pgid]bool), - } -} - -// size returns the size of the page after serialization. -func (f *freelist) size() int { - return PageHeaderSize + (int(unsafe.Sizeof(pgid(0))) * f.count()) -} - -// count returns count of pages on the freelist -func (f *freelist) count() int { - return f.free_count() + f.pending_count() -} - -// free_count returns count of free pages -func (f *freelist) free_count() int { - return len(f.ids) -} - -// pending_count returns count of pending pages -func (f *freelist) pending_count() int { - var count int - for _, list := range f.pending { - count += len(list) - } - return count -} - -// all returns a list of all free ids and all pending ids in one sorted list. 
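meta.sum64 above hashes the raw bytes of every field that precedes checksum, which is why the array length is unsafe.Offsetof(meta{}.checksum). The same pattern on a smaller stand-in struct; hdr and its fields are illustrative, not part of the package:

package main

import (
	"fmt"
	"hash/fnv"
	"unsafe"
)

// hdr stands in for the meta page: every field before checksum is covered by
// the checksum, the checksum field itself is excluded.
type hdr struct {
	magic    uint32
	version  uint32
	txid     uint64
	checksum uint64
}

func (h *hdr) sum64() uint64 {
	f := fnv.New64a()
	// Hash the raw bytes of all fields preceding checksum (16 bytes here).
	_, _ = f.Write((*[unsafe.Offsetof(hdr{}.checksum)]byte)(unsafe.Pointer(h))[:])
	return f.Sum64()
}

func main() {
	h := &hdr{magic: 0xAFFEAFFE, version: 1, txid: 7}
	h.checksum = h.sum64()
	// Writing the checksum does not change the hashed prefix, so it validates.
	fmt.Printf("checksum=%x valid=%v\n", h.checksum, h.checksum == h.sum64())
}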
-func (f *freelist) all() []pgid { - m := make(pgids, 0) - - for _, list := range f.pending { - m = append(m, list...) - } - - sort.Sort(m) - return pgids(f.ids).merge(m) -} - -// allocate returns the starting page id of a contiguous list of pages of a given size. -// If a contiguous block cannot be found then 0 is returned. -func (f *freelist) allocate(n int) pgid { - if len(f.ids) == 0 { - return 0 - } - - var initial, previd pgid - for i, id := range f.ids { - if id <= 1 { - panic(fmt.Sprintf("invalid page allocation: %d", id)) - } - - // Reset initial page if this is not contiguous. - if previd == 0 || id-previd != 1 { - initial = id - } - - // If we found a contiguous block then remove it and return it. - if (id-initial)+1 == pgid(n) { - // If we're allocating off the beginning then take the fast path - // and just adjust the existing slice. This will use extra memory - // temporarily but the append() in free() will realloc the slice - // as is necessary. - if (i + 1) == n { - f.ids = f.ids[i+1:] - } else { - copy(f.ids[i-n+1:], f.ids[i+1:]) - f.ids = f.ids[:len(f.ids)-n] - } - - // Remove from the free cache. - for i := pgid(0); i < pgid(n); i++ { - delete(f.cache, initial+i) - } - - return initial - } - - previd = id - } - return 0 -} - -// free releases a page and its overflow for a given transaction id. -// If the page is already free then a panic will occur. -func (f *freelist) free(txid txid, p *page) { - if p.id <= 1 { - panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id)) - } - - // Free page and all its overflow pages. - var ids = f.pending[txid] - for id := p.id; id <= p.id+pgid(p.overflow); id++ { - // Verify that page is not already free. - if f.cache[id] { - panic(fmt.Sprintf("page %d already freed", id)) - } - - // Add to the freelist and cache. - ids = append(ids, id) - f.cache[id] = true - } - f.pending[txid] = ids -} - -// release moves all page ids for a transaction id (or older) to the freelist. -func (f *freelist) release(txid txid) { - m := make(pgids, 0) - for tid, ids := range f.pending { - if tid <= txid { - // Move transaction's pending pages to the available freelist. - // Don't remove from the cache since the page is still free. - m = append(m, ids...) - delete(f.pending, tid) - } - } - sort.Sort(m) - f.ids = pgids(f.ids).merge(m) -} - -// rollback removes the pages from a given pending tx. -func (f *freelist) rollback(txid txid) { - // Remove page ids from cache. - for _, id := range f.pending[txid] { - delete(f.cache, id) - } - - // Remove pages from pending list. - delete(f.pending, txid) -} - -// freed returns whether a given page is in the free list. -func (f *freelist) freed(pgid pgid) bool { - return f.cache[pgid] -} - -// read initializes the freelist from a freelist page. -func (f *freelist) read(p *page) { - // If the page.count is at the max uint16 value (64k) then it's considered - // an overflow and the size of the freelist is stored as the first element. - idx, count := 0, int(p.count) - if count == 0xFFFF { - idx = 1 - count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0]) - } - - // Copy the list of page ids from the freelist. - if count == 0 { - f.ids = nil - } else { - ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count] - f.ids = make([]pgid, len(ids)) - copy(f.ids, ids) - - // Make sure they're sorted. - sort.Sort(pgids(f.ids)) - } - - // Rebuild the page cache. - f.reindex() -} - -// write writes the page ids onto a freelist page. 
All free and pending ids are -// saved to disk since in the event of a program crash, all pending ids will -// become free. -func (f *freelist) write(p *page) error { - // Combine the old free pgids and pgids waiting on an open transaction. - ids := f.all() - - // Update the header flag. - p.flags |= pageFlagFreelist - - // The page.count can only hold up to 64k elements so if we overflow that - // number then we handle it by putting the size in the first element. - if len(ids) == 0 { - p.count = uint16(len(ids)) - } else if len(ids) < 0xFFFF { - p.count = uint16(len(ids)) - copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:], ids) - } else { - p.count = 0xFFFF - ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pgid(len(ids)) - copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:], ids) - } - - return nil -} - -// reload reads the freelist from a page and filters out pending items. -func (f *freelist) reload(p *page) { - f.read(p) - - // Build a cache of only pending pages. - pcache := make(map[pgid]bool) - for _, pendingIDs := range f.pending { - for _, pendingID := range pendingIDs { - pcache[pendingID] = true - } - } - - // Check each page in the freelist and build a new available freelist - // with any pages not in the pending lists. - var a []pgid - for _, id := range f.ids { - if !pcache[id] { - a = append(a, id) - } - } - f.ids = a - - // Once the available list is rebuilt then rebuild the free cache so that - // it includes the available and pending free pages. - f.reindex() -} - -// reindex rebuilds the free cache based on available and pending free lists. -func (f *freelist) reindex() { - f.cache = make(map[pgid]bool) - for _, id := range f.ids { - f.cache[id] = true - } - for _, pendingIDs := range f.pending { - for _, pendingID := range pendingIDs { - f.cache[pendingID] = true - } - } -} diff --git a/pages/freelist_test.go b/pages/freelist_test.go deleted file mode 100644 index 9176397ec..000000000 --- a/pages/freelist_test.go +++ /dev/null @@ -1,158 +0,0 @@ -package pages - -import ( - "math/rand" - "reflect" - "sort" - "testing" - "unsafe" -) - -// Ensure that a page is added to a transaction's freelist. -func TestFreelist_free(t *testing.T) { - f := newFreelist() - f.free(100, &page{id: 12}) - if !reflect.DeepEqual([]pgid{12}, f.pending[100]) { - t.Fatalf("exp=%v; got=%v", []pgid{12}, f.pending[100]) - } -} - -// Ensure that a page and its overflow is added to a transaction's freelist. -func TestFreelist_free_overflow(t *testing.T) { - f := newFreelist() - f.free(100, &page{id: 12, overflow: 3}) - if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100]) { - t.Fatalf("exp=%v; got=%v", exp, f.pending[100]) - } -} - -// Ensure that a transaction's free pages can be released. -func TestFreelist_release(t *testing.T) { - f := newFreelist() - f.free(100, &page{id: 12, overflow: 1}) - f.free(100, &page{id: 9}) - f.free(102, &page{id: 39}) - f.release(100) - f.release(101) - if exp := []pgid{9, 12, 13}; !reflect.DeepEqual(exp, f.ids) { - t.Fatalf("exp=%v; got=%v", exp, f.ids) - } - - f.release(102) - if exp := []pgid{9, 12, 13, 39}; !reflect.DeepEqual(exp, f.ids) { - t.Fatalf("exp=%v; got=%v", exp, f.ids) - } -} - -// Ensure that a freelist can find contiguous blocks of pages. 
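freelist.allocate scans the sorted free list for a run of n consecutive page IDs, removes that run, and returns its first ID (0 when no such run exists), which is exactly what TestFreelist_allocate below checks. A plain-slice sketch of that scan, using illustrative names:

package main

import "fmt"

// allocate returns the first id of a run of n contiguous ids from the sorted
// free list and the list with that run removed; it returns 0 when no
// contiguous block of the requested size exists.
func allocate(ids []uint64, n int) (uint64, []uint64) {
	if n <= 0 || len(ids) == 0 {
		return 0, ids
	}
	var initial, previd uint64
	for i, id := range ids {
		// Restart the run whenever the sequence breaks.
		if previd == 0 || id-previd != 1 {
			initial = id
		}
		// A run of length n ends at index i; cut it out of the slice.
		if id-initial+1 == uint64(n) {
			out := append(append([]uint64{}, ids[:i-n+1]...), ids[i+1:]...)
			return initial, out
		}
		previd = id
	}
	return 0, ids
}

func main() {
	free := []uint64{3, 4, 5, 6, 7, 9, 12, 13, 18}
	id, free := allocate(free, 3)
	fmt.Println(id, free) // 3 [6 7 9 12 13 18]
}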
-func TestFreelist_allocate(t *testing.T) { - f := &freelist{ids: []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18}} - if id := int(f.allocate(3)); id != 3 { - t.Fatalf("exp=3; got=%v", id) - } - if id := int(f.allocate(1)); id != 6 { - t.Fatalf("exp=6; got=%v", id) - } - if id := int(f.allocate(3)); id != 0 { - t.Fatalf("exp=0; got=%v", id) - } - if id := int(f.allocate(2)); id != 12 { - t.Fatalf("exp=12; got=%v", id) - } - if id := int(f.allocate(1)); id != 7 { - t.Fatalf("exp=7; got=%v", id) - } - if id := int(f.allocate(0)); id != 0 { - t.Fatalf("exp=0; got=%v", id) - } - if id := int(f.allocate(0)); id != 0 { - t.Fatalf("exp=0; got=%v", id) - } - if exp := []pgid{9, 18}; !reflect.DeepEqual(exp, f.ids) { - t.Fatalf("exp=%v; got=%v", exp, f.ids) - } - - if id := int(f.allocate(1)); id != 9 { - t.Fatalf("exp=9; got=%v", id) - } - if id := int(f.allocate(1)); id != 18 { - t.Fatalf("exp=18; got=%v", id) - } - if id := int(f.allocate(1)); id != 0 { - t.Fatalf("exp=0; got=%v", id) - } - if exp := []pgid{}; !reflect.DeepEqual(exp, f.ids) { - t.Fatalf("exp=%v; got=%v", exp, f.ids) - } -} - -// Ensure that a freelist can deserialize from a freelist page. -func TestFreelist_read(t *testing.T) { - // Create a page. - var buf [4096]byte - page := (*page)(unsafe.Pointer(&buf[0])) - page.flags = pageFlagFreelist - page.count = 2 - - // Insert 2 page ids. - ids := (*[3]pgid)(unsafe.Pointer(&page.ptr)) - ids[0] = 23 - ids[1] = 50 - - // Deserialize page into a freelist. - f := newFreelist() - f.read(page) - - // Ensure that there are two page ids in the freelist. - if exp := []pgid{23, 50}; !reflect.DeepEqual(exp, f.ids) { - t.Fatalf("exp=%v; got=%v", exp, f.ids) - } -} - -// Ensure that a freelist can serialize into a freelist page. -func TestFreelist_write(t *testing.T) { - // Create a freelist and write it to a page. - var buf [4096]byte - f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid][]pgid)} - f.pending[100] = []pgid{28, 11} - f.pending[101] = []pgid{3} - p := (*page)(unsafe.Pointer(&buf[0])) - if err := f.write(p); err != nil { - t.Fatal(err) - } - - // Read the page back out. - f2 := newFreelist() - f2.read(p) - - // Ensure that the freelist is correct. - // All pages should be present and in reverse order. - if exp := []pgid{3, 11, 12, 28, 39}; !reflect.DeepEqual(exp, f2.ids) { - t.Fatalf("exp=%v; got=%v", exp, f2.ids) - } -} - -func Benchmark_FreelistRelease10K(b *testing.B) { benchmark_FreelistRelease(b, 10000) } -func Benchmark_FreelistRelease100K(b *testing.B) { benchmark_FreelistRelease(b, 100000) } -func Benchmark_FreelistRelease1000K(b *testing.B) { benchmark_FreelistRelease(b, 1000000) } -func Benchmark_FreelistRelease10000K(b *testing.B) { benchmark_FreelistRelease(b, 10000000) } - -func benchmark_FreelistRelease(b *testing.B, size int) { - ids := randomPgids(size) - pending := randomPgids(len(ids) / 400) - b.ResetTimer() - for i := 0; i < b.N; i++ { - f := &freelist{ids: ids, pending: map[txid][]pgid{1: pending}} - f.release(1) - } -} - -func randomPgids(n int) []pgid { - rand.Seed(42) - pgids := make(pgids, n) - for i := range pgids { - pgids[i] = pgid(rand.Int63()) - } - sort.Sort(pgids) - return pgids -} diff --git a/pages/page.go b/pages/page.go deleted file mode 100644 index 8c5020734..000000000 --- a/pages/page.go +++ /dev/null @@ -1,103 +0,0 @@ -package pages - -import ( - "fmt" - "os" - "sort" - "unsafe" -) - -const PageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr)) - -const ( - // pageFlag{head,tail,body}? 
- pageFlagMeta = 0x02 - pageFlagFreelist = 0x04 - pageFlagData = 0x08 -) - -type pgid uint64 - -type page struct { - id pgid - flags uint16 - count uint16 - overflow uint32 - ptr uintptr -} - -// typ returns a human readable page type string. -func (p *page) typ() string { - if (p.flags & pageFlagMeta) != 0 { - return "meta" - } else if (p.flags & pageFlagFreelist) != 0 { - return "freelist" - } else if (p.flags & pageFlagData) != 0 { - return "data" - } - return fmt.Sprintf("unknown<%02x>", p.flags) -} - -func (p *page) String() string { - return fmt.Sprintf("page<%s,%016x>", p.typ(), p.id) -} - -// meta returns a pointer to the metadata section of a page. -func (p *page) meta() *meta { - return (*meta)(unsafe.Pointer(&p.ptr)) -} - -// dump writes n bytes of the page to STDERR as hex output. -func (p *page) hexdump(n int) { - buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:n] - fmt.Fprintf(os.Stderr, "%x\n", buf) -} - -type pages []*page - -func (s pages) Len() int { return len(s) } -func (s pages) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s pages) Less(i, j int) bool { return s[i].id < s[j].id } - -type pgids []pgid - -func (s pgids) Len() int { return len(s) } -func (s pgids) Swap(i, j int) { s[i], s[j] = s[j], s[i] } -func (s pgids) Less(i, j int) bool { return s[i] < s[j] } - -// merge returns the sorted union of a and b. -func (a pgids) merge(b pgids) pgids { - // Return the opposite slice if one is nil. - if len(a) == 0 { - return b - } else if len(b) == 0 { - return a - } - - // Create a list to hold all elements from both lists. - merged := make(pgids, 0, len(a)+len(b)) - - // Assign lead to the slice with a lower starting value, follow to the higher value. - lead, follow := a, b - if b[0] < a[0] { - lead, follow = b, a - } - - // Continue while there are elements in the lead. - for len(lead) > 0 { - // Merge largest prefix of lead that is ahead of follow[0]. - n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] }) - merged = append(merged, lead[:n]...) - if n >= len(lead) { - break - } - - // Swap lead and follow. - lead, follow = follow, lead[n:] - } - - // Append what's left in follow. - merged = append(merged, follow...) - - return merged -} diff --git a/pages/page_test.go b/pages/page_test.go deleted file mode 100644 index 3d0d57342..000000000 --- a/pages/page_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package pages - -import ( - "reflect" - "sort" - "testing" - "testing/quick" -) - -// Ensure that the page type can be returned in human readable format. -func TestPage_typ(t *testing.T) { - if typ := (&page{flags: pageFlagData}).typ(); typ != "data" { - t.Fatalf("exp=branch; got=%v", typ) - } - if typ := (&page{flags: pageFlagMeta}).typ(); typ != "meta" { - t.Fatalf("exp=meta; got=%v", typ) - } - if typ := (&page{flags: pageFlagFreelist}).typ(); typ != "freelist" { - t.Fatalf("exp=freelist; got=%v", typ) - } - if typ := (&page{flags: 20000}).typ(); typ != "unknown<4e20>" { - t.Fatalf("exp=unknown<4e20>; got=%v", typ) - } -} - -// Ensure that the hexdump debugging function doesn't blow up. 
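The page struct above is overlaid directly onto mmapped or buffered bytes (page(), pageInBuffer()), and PageHeaderSize is simply the offset of ptr, the first payload byte. A small self-contained demonstration of that overlay with a look-alike struct; header is an illustrative name with the same field layout:

package main

import (
	"fmt"
	"unsafe"
)

// header mimics the on-page layout: fixed fields followed by ptr, whose
// offset is the header size and marks where the payload begins.
type header struct {
	id       uint64
	flags    uint16
	count    uint16
	overflow uint32
	ptr      uintptr
}

var headerSize = int(unsafe.Offsetof(header{}.ptr)) // 16 bytes

func main() {
	// A raw, page-sized buffer such as one read from the memory map.
	buf := make([]byte, 4096)

	// Overlay the header struct on the buffer, as page() and pageInBuffer() do.
	h := (*header)(unsafe.Pointer(&buf[0]))
	h.id = 3
	h.flags = 0x08 // data page
	h.count = 5

	// The payload starts right after the fixed header fields.
	copy(buf[headerSize:], "hello")

	fmt.Println(headerSize, h.id, string(buf[headerSize:headerSize+int(h.count)]))
}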
-func TestPage_dump(t *testing.T) { - (&page{id: 256}).hexdump(16) -} - -func TestPgids_merge(t *testing.T) { - a := pgids{4, 5, 6, 10, 11, 12, 13, 27} - b := pgids{1, 3, 8, 9, 25, 30} - c := a.merge(b) - if !reflect.DeepEqual(c, pgids{1, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30}) { - t.Errorf("mismatch: %v", c) - } - - a = pgids{4, 5, 6, 10, 11, 12, 13, 27, 35, 36} - b = pgids{8, 9, 25, 30} - c = a.merge(b) - if !reflect.DeepEqual(c, pgids{4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30, 35, 36}) { - t.Errorf("mismatch: %v", c) - } -} - -func TestPgids_merge_quick(t *testing.T) { - if err := quick.Check(func(a, b pgids) bool { - // Sort incoming lists. - sort.Sort(a) - sort.Sort(b) - - // Merge the two lists together. - got := a.merge(b) - - // The expected value should be the two lists combined and sorted. - exp := append(a, b...) - sort.Sort(exp) - - if !reflect.DeepEqual(exp, got) { - t.Errorf("\nexp=%+v\ngot=%+v\n", exp, got) - return false - } - - return true - }, nil); err != nil { - t.Fatal(err) - } -} diff --git a/pages/pages_amd64.go b/pages/pages_amd64.go deleted file mode 100644 index 776f8dff2..000000000 --- a/pages/pages_amd64.go +++ /dev/null @@ -1,7 +0,0 @@ -package pages - -// maxMapSize represents the largest mmap size supported by pagebuf. -const maxMapSize = 0xFFFFFFFFFFFF // 256TB - -// maxAllocSize is the size used when creating array pointers. -const maxAllocSize = 0x7FFFFFFF diff --git a/pages/pages_linux.go b/pages/pages_linux.go deleted file mode 100644 index c56a396a8..000000000 --- a/pages/pages_linux.go +++ /dev/null @@ -1,10 +0,0 @@ -package pages - -import ( - "syscall" -) - -// fdatasync flushes written data to a file descriptor. -func fdatasync(pb *Pagebuf) error { - return syscall.Fdatasync(int(pb.file.Fd())) -} diff --git a/pages/pages_unix.go b/pages/pages_unix.go deleted file mode 100644 index b413a073e..000000000 --- a/pages/pages_unix.go +++ /dev/null @@ -1,89 +0,0 @@ -// +build !windows,!plan9,!solaris - -package pages - -import ( - "fmt" - "os" - "syscall" - "time" - "unsafe" -) - -// flock acquires an advisory lock on a file descriptor. -func flock(pb *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error { - var t time.Time - for { - // If we're beyond our timeout then return an error. - // This can only occur after we've attempted a flock once. - if t.IsZero() { - t = time.Now() - } else if timeout > 0 && time.Since(t) > timeout { - return ErrTimeout - } - flag := syscall.LOCK_SH - if exclusive { - flag = syscall.LOCK_EX - } - - // Otherwise attempt to obtain an exclusive lock. - err := syscall.Flock(int(pb.file.Fd()), flag|syscall.LOCK_NB) - if err == nil { - return nil - } else if err != syscall.EWOULDBLOCK { - return err - } - - // Wait for a bit and try again. - time.Sleep(50 * time.Millisecond) - } -} - -// funlock releases an advisory lock on a file descriptor. -func funlock(pb *DB) error { - return syscall.Flock(int(pb.file.Fd()), syscall.LOCK_UN) -} - -// mmap memory maps a PageBuf's data file. -func mmap(pb *DB, sz int) error { - // Map the data file to memory. - b, err := syscall.Mmap(int(pb.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|pb.MmapFlags) - if err != nil { - return err - } - - // Advise the kernel that the mmap is accessed randomly. - if err := madvise(b, syscall.MADV_RANDOM); err != nil { - return fmt.Errorf("madvise: %s", err) - } - - // Save the original byte slice and convert to a byte array pointer. 
- pb.dataref = b - pb.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0])) - pb.datasz = sz - return nil -} - -// munmap unmaps a PageBuf's data file from memory. -func munmap(pb *DB) error { - // Ignore the unmap if we have no mapped data. - if pb.dataref == nil { - return nil - } - - // Unmap using the original byte slice. - err := syscall.Munmap(pb.dataref) - pb.dataref = nil - pb.data = nil - pb.datasz = 0 - return err -} - -// NOTE: This function is copied from stdlib because it is not available on darwin. -func madvise(b []byte, advice int) (err error) { - _, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice)) - if e1 != 0 { - err = e1 - } - return -} diff --git a/pages/pagessync_unix.go b/pages/pagessync_unix.go deleted file mode 100644 index 43a9208c9..000000000 --- a/pages/pagessync_unix.go +++ /dev/null @@ -1,8 +0,0 @@ -// +build !windows,!plan9,!linux,!openbsd - -package pages - -// fdatasync flushes written data to a file descriptor. -func fdatasync(pb *DB) error { - return pb.file.Sync() -} diff --git a/pages/tx.go b/pages/tx.go deleted file mode 100644 index e2e9d065c..000000000 --- a/pages/tx.go +++ /dev/null @@ -1,384 +0,0 @@ -package pages - -import ( - "fmt" - "sort" - "unsafe" -) - -// txid represents the internal transaction identifier. -type txid uint64 - -// Tx represents a read-only or read/write transaction on the page buffer. -// Read-only transactions can be used for retrieving pages. -// Read/write transactions can retrieve and write pages. -// -// IMPORTANT: You must commit or rollback transactions when you are done with -// them. Pages can not be reclaimed by the writer until no more transactions -// are using them. A long running read transaction can cause the database to -// quickly grow. -type Tx struct { - writable bool - managed bool - db *DB - meta *meta - pages map[pgid]*page - delPages map[pgid]bool - - // WriteFlag specifies the flag for write-related methods like WriteTo(). - // Tx opens the database file with the specified flag to copy the data. - // - // By default, the flag is unset, which works well for mostly in-memory - // workloads. For databases that are much larger than available RAM, - // set the flag to syscall.O_DIRECT to avoid trashing the page cache. - WriteFlag int -} - -// init initializes the transaction. -func (tx *Tx) init(db *DB) { - tx.db = db - tx.pages = nil - - // Copy the meta page since it can be changed by the writer. - tx.meta = &meta{} - db.meta().copy(tx.meta) - - // Increment the transaction id and add a page cache for writable transactions. - if tx.writable { - tx.pages = make(map[pgid]*page) - tx.delPages = make(map[pgid]bool) - tx.meta.txid += txid(1) - } -} - -// ID returns the transaction id. -func (tx *Tx) ID() uint64 { - return uint64(tx.meta.txid) -} - -// Size returns current database size in bytes as seen by this transaction. -func (tx *Tx) Size() int64 { - return int64(tx.meta.pgid) * int64(tx.db.pageSize) -} - -// DB returns a reference to the database that created the transaction. -func (tx *Tx) DB() *DB { - return tx.db -} - -// Writable returns whether the transaction can perform write operations. -func (tx *Tx) Writable() bool { - return tx.writable -} - -// Rollback closes the transaction and ignores all previous updates. Read-only -// transactions must be rolled back and not committed. 
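The Tx contract above (every transaction must be committed or rolled back, one writer at a time) implies the usual bolt-style calling pattern. A sketch of how a caller might have used the removed pages package; the import path, file name, and payload are assumptions for illustration, and this cannot build without the deleted package:

package main

import (
	"log"

	"github.com/fabxc/tsdb/pages" // assumed import path; the package is removed by this change
)

func main() {
	// Passing nil options makes Open fall back to DefaultOptions.
	db, err := pages.Open("data.pgb", 0666, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	tx, err := db.Begin(true) // writable: only one such transaction at a time
	if err != nil {
		log.Fatal(err)
	}
	// Guarantee the transaction is closed on early returns; after a successful
	// Commit this simply reports ErrTxClosed, which we ignore.
	defer tx.Rollback()

	id, err := tx.Add([]byte("payload"))
	if err != nil {
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}
	log.Println("stored page", id)
}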
-func (tx *Tx) Rollback() error { - _assert(!tx.managed, "managed tx rollback not allowed") - if tx.db == nil { - return ErrTxClosed - } - tx.rollback() - return nil -} - -func (tx *Tx) rollback() { - if tx.db == nil { - return - } - if tx.writable { - tx.db.freelist.rollback(tx.meta.txid) - tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist)) - } - tx.close() -} - -func (tx *Tx) close() { - if tx.db == nil { - return - } - if tx.writable { - // Remove transaction ref & writer lock. - tx.db.rwtx = nil - tx.db.rwlock.Unlock() - } else { - tx.db.removeTx(tx) - } - - // Clear all references. - tx.db = nil - tx.meta = nil - tx.pages = nil -} - -// page returns a reference to the page with a given id. -// If page has been written to then a temporary buffered page is returned. -func (tx *Tx) page(id pgid) *page { - // Check the dirty pages first. - if tx.pages != nil { - if p, ok := tx.pages[id]; ok { - return p - } - } - - // Otherwise return directly from the mmap. - return tx.db.page(id) -} - -func (tx *Tx) pageExists(id pgid) bool { - // Check whether the page was modified during this transaction. - if tx.pages != nil { - if _, ok := tx.pages[id]; ok { - return true - } - } - // Check whether page was deleted during this transaction. - if tx.delPages != nil { - if tx.delPages[id] { - return false - } - } - // The page was not touched within this transaction. Fallthrough to - // the database's check. - return tx.db.pageExists(id) -} - -// allocate returns a contiguous block of memory starting at a given page. -func (tx *Tx) allocate(count int) (*page, error) { - p, err := tx.db.allocate(count) - if err != nil { - return nil, err - } - - // Save to our page cache. - tx.pages[p.id] = p - return p, nil -} - -// Commit writes all changes to disk and updates the meta page. -// Returns an error if a disk write error occurs, or if Commit is -// called on a read-only transaction. -func (tx *Tx) Commit() error { - _assert(!tx.managed, "managed tx commit not allowed") - if tx.db == nil { - return ErrTxClosed - } else if !tx.writable { - return ErrTxNotWritable - } - - // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. - - opgid := tx.meta.pgid - - // Free the freelist and allocate new pages for it. This will overestimate - // the size of the freelist but not underestimate the size (which would be bad). - tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist)) - p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) - if err != nil { - tx.rollback() - return err - } - if err := tx.db.freelist.write(p); err != nil { - tx.rollback() - return err - } - tx.meta.freelist = p.id - - // If the high water mark has moved up then attempt to grow the database. - if tx.meta.pgid > opgid { - if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil { - tx.rollback() - return err - } - } - - // Write dirty pages to disk. - if err := tx.write(); err != nil { - tx.rollback() - return err - } - - // Write meta to disk. - if err := tx.writeMeta(); err != nil { - tx.rollback() - return err - } - - // Finalize the transaction. - tx.close() - - return nil -} - -// write writes any dirty pages to disk. -func (tx *Tx) write() error { - // Sort pages by id. - pages := make(pages, 0, len(tx.pages)) - for _, p := range tx.pages { - pages = append(pages, p) - } - // Clear out page cache early. - tx.pages = make(map[pgid]*page) - sort.Sort(pages) - - // Write pages to disk in order. 
- for _, p := range pages { - size := (int(p.overflow) + 1) * tx.db.pageSize - offset := int64(p.id) * int64(tx.db.pageSize) - - // Write out page in "max allocation" sized chunks. - ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p)) - for { - // Limit our write to our max allocation size. - sz := size - if sz > maxAllocSize-1 { - sz = maxAllocSize - 1 - } - - // Write chunk to disk. - buf := ptr[:sz] - if _, err := tx.db.ops.writeAt(buf, offset); err != nil { - return err - } - - // Exit inner for loop if we've written all the chunks. - size -= sz - if size == 0 { - break - } - - // Otherwise move offset forward and move pointer to next chunk. - offset += int64(sz) - ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz])) - } - } - - if err := fdatasync(tx.db); err != nil { - return err - } - - // Put small pages back to page pool. - for _, p := range pages { - // Ignore page sizes over 1 page. - // These are allocated using make() instead of the page pool. - if int(p.overflow) != 0 { - continue - } - - buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize] - - // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1 - for i := range buf { - buf[i] = 0 - } - tx.db.pagePool.Put(buf) - } - - return nil -} - -// writeMeta writes the meta to the disk. -func (tx *Tx) writeMeta() error { - // Create a temporary buffer for the meta page. - buf := make([]byte, tx.db.pageSize) - p := tx.db.pageInBuffer(buf, 0) - tx.meta.write(p) - - // Write the meta page to file. - if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil { - return err - } - if err := fdatasync(tx.db); err != nil { - return err - } - - return nil -} - -// Get retrieves the bytes stored in the page with the given id. -// The returned byte slice is only valid for the duration of the transaction. -func (tx *Tx) Get(id uint64) ([]byte, error) { - if !tx.pageExists(pgid(id)) { - return nil, ErrNotFound - } - - p := tx.page(pgid(id)) - size := int(p.overflow)*tx.db.pageSize - PageHeaderSize + int(p.count) - b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[:size] - - return b, nil -} - -// Add creates a new page with the given content. The inserted byte slice -// will be padded at the end to fit the next largest page size. Retrieving the page -// will return the padding as well. -// Inserted data should hence have included termination markers. -func (tx *Tx) Add(c []byte) (uint64, error) { - l := len(c) + PageHeaderSize // total size required - n := 1 // number of pages required - for n*tx.db.pageSize < l { - n++ - } - if l > maxAllocSize { - return 0, fmt.Errorf("page of size %d too large", l) - } - p, err := tx.allocate(n) - if err != nil { - return 0, fmt.Errorf("page alloc error: %s", err) - } - p.flags |= pageFlagData - // count holds the length used in the last page. - p.count = uint16(l - (n-1)*tx.db.pageSize) - - b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[:] - copy(b, c) - - return uint64(p.id), nil -} - -// Del deletes the page witht he given ID. -func (tx *Tx) Del(id uint64) error { - if !tx.pageExists(pgid(id)) { - return ErrNotFound - } - - tx.db.freelist.free(tx.meta.txid, tx.db.page(pgid(id))) - return nil -} - -// Set overwrites the page with the given ID with c. -func (tx *Tx) Set(id uint64, c []byte) error { - if !tx.pageExists(pgid(id)) { - return ErrNotFound - } - p := tx.db.page(pgid(id)) - - l := len(c) + PageHeaderSize // total size required - n := int(p.overflow + 1) - // The contents must fit into the previously allocated pages. 
- if l > n*tx.db.pageSize { - return fmt.Errorf("invalid overwrite size") - } - - // Allocate a temporary buffer for the page. - var buf []byte - if n == 1 { - buf = tx.db.pagePool.Get().([]byte) - } else { - buf = make([]byte, n*tx.db.pageSize) - } - np := tx.db.pageInBuffer(buf, 0) - *np = *p - // count holds the length used in the last page. - np.count = uint16(l - (n-1)*tx.db.pageSize) - - // TODO(fabxc): Potential performance improvement point could be using c directly. - // Just copy it for now. - b := (*[maxAllocSize]byte)(unsafe.Pointer(&np.ptr))[:] - copy(b, c) - - tx.pages[pgid(id)] = np - // TODO(fabxc): truncate and free pages that are no longer needed. - - return nil -} diff --git a/persist.go b/persist.go deleted file mode 100644 index 56a47ed10..000000000 --- a/persist.go +++ /dev/null @@ -1,163 +0,0 @@ -package tsdb - -import ( - "encoding/binary" - "os" - "path/filepath" - "time" - - "github.com/boltdb/bolt" - "github.com/fabxc/pagebuf" - "github.com/prometheus/common/log" -) - -type persistence struct { - *chunkBatchProcessor - - mc *memChunks - chunks *pagebuf.DB - index *bolt.DB -} - -func newPersistence(path string, cap int, to time.Duration) (*persistence, error) { - if err := os.MkdirAll(path, 0777); err != nil { - return nil, err - } - ix, err := bolt.Open(filepath.Join(path, "ix"), 0666, nil) - if err != nil { - return nil, err - } - if err := ix.Update(func(tx *bolt.Tx) error { - _, err := tx.CreateBucketIfNotExists(bktChunks) - return err - }); err != nil { - return nil, err - } - pb, err := pagebuf.Open(filepath.Join(path, "chunks"), 0666, nil) - if err != nil { - return nil, err - } - p := &persistence{ - chunks: pb, - index: ix, - chunkBatchProcessor: newChunkBatchProcessor(log.Base(), cap, to), - } - p.chunkBatchProcessor.processf = p.persist - - return p, nil -} - -var bktChunks = []byte("chunks") - -func (p *persistence) close() error { - // Index must be closed first, otherwise we might deadlock. - err0 := p.index.Close() - err1 := p.chunks.Close() - if err0 != nil { - return err0 - } - return err1 -} - -func (p *persistence) persist(cds ...*chunkDesc) error { - err := p.update(func(tx *persistenceTx) error { - bkt := tx.ix.Bucket(bktChunks) - for _, cd := range cds { - pos, err := tx.chunks.Add(cd.chunk.Data()) - if err != nil { - return err - } - var buf [16]byte - binary.BigEndian.PutUint64(buf[:8], uint64(cd.id)) - binary.BigEndian.PutUint64(buf[8:], pos) - if err := bkt.Put(buf[:8], buf[8:]); err != nil { - return err - } - - tx.ids = append(tx.ids, cd.id) - } - return nil - }) - return err -} - -func (p *persistence) update(f func(*persistenceTx) error) error { - tx, err := p.begin(true) - if err != nil { - return err - } - if err := f(tx); err != nil { - tx.rollback() - return err - } - return tx.commit() -} - -func (p *persistence) view(f func(*persistenceTx) error) error { - tx, err := p.begin(false) - if err != nil { - return err - } - if err := f(tx); err != nil { - tx.rollback() - return err - } - return tx.rollback() -} - -func (p *persistence) begin(writeable bool) (*persistenceTx, error) { - var err error - tx := &persistenceTx{p: p} - // Index transaction is the outer one so we might end up with orphaned - // chunks but never with dangling pointers in the index. 
- tx.ix, err = p.index.Begin(writeable) - if err != nil { - return nil, err - } - tx.chunks, err = p.chunks.Begin(writeable) - if err != nil { - tx.ix.Rollback() - return nil, err - } - - return tx, nil -} - -type persistenceTx struct { - p *persistence - ix *bolt.Tx - chunks *pagebuf.Tx - - ids []ChunkID -} - -func (tx *persistenceTx) commit() error { - if err := tx.chunks.Commit(); err != nil { - tx.ix.Rollback() - return err - } - if err := tx.ix.Commit(); err != nil { - // TODO(fabxc): log orphaned chunks. What about overwritten ones? - // Should we not allows delete and add in the same tx so this cannot happen? - return err - } - - // Successfully persisted chunks, clear them from the in-memory - // forward mapping. - tx.p.mc.mtx.Lock() - defer tx.p.mc.mtx.Unlock() - - for _, id := range tx.ids { - delete(tx.p.mc.chunks, id) - } - return nil -} - -func (tx *persistenceTx) rollback() error { - err0 := tx.chunks.Rollback() - err1 := tx.ix.Rollback() - if err0 != nil { - return err0 - } - return err1 -} diff --git a/querier.go b/querier.go deleted file mode 100644 index 92fa72db4..000000000 --- a/querier.go +++ /dev/null @@ -1,224 +0,0 @@ -package tsdb - -import ( - "encoding/binary" - "fmt" - "io" - - "github.com/fabxc/tsdb/chunks" - "github.com/fabxc/tsdb/index" - "github.com/prometheus/common/model" -) - -// SeriesIterator provides iteration over a time series associated with a metric. -type SeriesIterator interface { - Metric() map[string]string - Seek(model.Time) (model.SamplePair, bool) - Next() (model.SamplePair, bool) - Err() error -} - -type chunkSeriesIterator struct { - m map[string]string - chunks []chunks.Chunk - - err error - cur chunks.Iterator - curPos int -} - -func newChunkSeriesIterator(m map[string]string, chunks []chunks.Chunk) *chunkSeriesIterator { - return &chunkSeriesIterator{ - m: m, - chunks: chunks, - } -} - -func (it *chunkSeriesIterator) Metric() map[string]string { - return it.m -} - -func (it *chunkSeriesIterator) Seek(ts model.Time) (model.SamplePair, bool) { - // Naively go through all chunk's first timestamps and pick the chunk - // containing the seeked timestamp. - // TODO(fabxc): this can be made smarter if it's a bottleneck. - for i, chk := range it.chunks { - cit := chk.Iterator() - first, ok := cit.First() - if !ok { - it.err = cit.Err() - return model.SamplePair{}, false - } - if first.Timestamp > ts { - break - } - it.cur = cit - it.curPos = i - } - return it.cur.Seek(ts) -} - -func (it *chunkSeriesIterator) Next() (model.SamplePair, bool) { - sp, ok := it.cur.Next() - if ok { - return sp, true - } - if it.cur.Err() != io.EOF { - it.err = it.cur.Err() - return model.SamplePair{}, false - } - if len(it.chunks) == it.curPos+1 { - it.err = io.EOF - return model.SamplePair{}, false - } - it.curPos++ - it.cur = it.chunks[it.curPos].Iterator() - - // Return first sample of the new chunks. - return it.cur.Seek(0) -} - -func (it *chunkSeriesIterator) Err() error { - return it.err -} - -// Querier allows several queries over the storage with a consistent view if the data. -type Querier struct { - db *DB - iq *index.Querier -} - -// Querier returns a new Querier on the index at the current point in time. -func (db *DB) Querier() (*Querier, error) { - iq, err := db.indexer.Querier() - if err != nil { - return nil, err - } - return &Querier{db: db, iq: iq}, nil -} - -// Close the querier. This invalidates all previously retrieved iterators. 
-func (q *Querier) Close() error { - return q.iq.Close() -} - -// Iterator returns an iterator over all chunks that match all given -// label matchers. The iterator is only valid until the Querier is closed. -func (q *Querier) Iterator(key string, matcher index.Matcher) (index.Iterator, error) { - return q.iq.Search(key, matcher) -} - -// RangeIterator returns an iterator over chunks that are present in the given time range. -// The returned iterator is only valid until the querier is closed. -func (q *Querier) RangeIterator(start, end model.Time) (index.Iterator, error) { - return nil, nil -} - -// InstantIterator returns an iterator over chunks possibly containing values for -// the given timestamp. The returned iterator is only valid until the querier is closed. -func (q *Querier) InstantIterator(at model.Time) (index.Iterator, error) { - return nil, nil -} - -func hash(m map[string]string) uint64 { - return model.LabelsToSignature(m) -} - -// Series returns a list of series iterators over all chunks in the given iterator. -// The returned series iterators are only valid until the querier is closed. -func (q *Querier) Series(it index.Iterator) ([]SeriesIterator, error) { - mets := map[uint64]map[string]string{} - its := map[uint64][]chunks.Chunk{} - - id, err := it.Seek(0) - for ; err == nil; id, err = it.Next() { - terms, err := q.iq.Doc(id) - if err != nil { - return nil, err - } - met := make(map[string]string, len(terms)) - for _, t := range terms { - met[t.Field] = t.Val - } - fp := hash(met) - - chunk, err := q.chunk(ChunkID(id)) - if err != nil { - return nil, err - } - - its[fp] = append(its[fp], chunk) - if _, ok := mets[fp]; ok { - continue - } - mets[fp] = met - } - if err != io.EOF { - return nil, err - } - - res := make([]SeriesIterator, 0, len(its)) - for fp, chks := range its { - res = append(res, newChunkSeriesIterator(mets[fp], chks)) - } - return res, nil -} - -func (q *Querier) chunk(id ChunkID) (chunks.Chunk, error) { - q.db.memChunks.mtx.RLock() - cd, ok := q.db.memChunks.chunks[id] - q.db.memChunks.mtx.RUnlock() - if ok { - return cd.chunk, nil - } - - var chk chunks.Chunk - // TODO(fabxc): this starts a new read transaction for every - // chunk we have to load from persistence. - // Figure out what's best tradeoff between lock contention and - // data consistency: start transaction when instantiating the querier - // or lazily start transaction on first try. (Not all query operations - // need access to persisted chunks.) - err := q.db.persistence.view(func(tx *persistenceTx) error { - chks := tx.ix.Bucket(bktChunks) - ptr := chks.Get(id.bytes()) - if ptr == nil { - return fmt.Errorf("chunk pointer for ID %d not found", id) - } - cdata, err := tx.chunks.Get(binary.BigEndian.Uint64(ptr)) - if err != nil { - return fmt.Errorf("get chunk data for ID %d: %s", id, err) - } - chk, err = chunks.FromData(cdata) - return err - }) - return chk, err -} - -// Metrics returns the unique metrics found across all chunks in the provided iterator. 
-func (q *Querier) Metrics(it index.Iterator) ([]map[string]string, error) {
-	m := []map[string]string{}
-	fps := map[uint64]struct{}{}
-
-	id, err := it.Seek(0)
-	for ; err == nil; id, err = it.Next() {
-		terms, err := q.iq.Doc(id)
-		if err != nil {
-			return nil, err
-		}
-		met := make(map[string]string, len(terms))
-		for _, t := range terms {
-			met[t.Field] = t.Val
-		}
-		fp := hash(met)
-		if _, ok := fps[fp]; ok {
-			continue
-		}
-		fps[fp] = struct{}{}
-		m = append(m, met)
-	}
-	if err != io.EOF {
-		return nil, err
-	}
-	return m, nil
-}
diff --git a/shard.go b/shard.go
new file mode 100644
index 000000000..e69de29bb
diff --git a/test/hash_test.go b/test/hash_test.go
new file mode 100644
index 000000000..f079aa05d
--- /dev/null
+++ b/test/hash_test.go
@@ -0,0 +1,66 @@
+package test
+
+import (
+	"testing"
+
+	"github.com/cespare/xxhash"
+	sip13 "github.com/dgryski/go-sip13"
+)
+
+type pair struct {
+	name, value string
+}
+
+var testInput = []pair{
+	{"job", "node"},
+	{"instance", "123.123.1.211:9090"},
+	{"path", "/api/v1/namespaces//deployments/"},
+	{"method", "GET"},
+	{"namespace", "system"},
+	{"status", "500"},
+}
+
+func BenchmarkHash(b *testing.B) {
+	input := []byte{}
+	for _, v := range testInput {
+		input = append(input, v.name...)
+		input = append(input, '\xff')
+		input = append(input, v.value...)
+		input = append(input, '\xff')
+	}
+
+	var total uint64
+
+	var k0 uint64 = 0x0706050403020100
+	var k1 uint64 = 0x0f0e0d0c0b0a0908
+
+	for name, f := range map[string]func(b []byte) uint64{
+		"xxhash": xxhash.Sum64,
+		"fnv64":  fnv64a,
+		"sip13":  func(b []byte) uint64 { return sip13.Sum64(k0, k1, b) },
+	} {
+		b.Run(name, func(b *testing.B) {
+			b.SetBytes(int64(len(input)))
+			total = 0
+			for i := 0; i < b.N; i++ {
+				total += f(input)
+			}
+		})
+	}
+
+}
+
+// fnv64a computes the 64-bit FNV-1a hash of the given byte slice.
+func fnv64a(b []byte) uint64 {
+	const (
+		offset64 = 14695981039346656037
+		prime64  = 1099511628211
+	)
+
+	h := uint64(offset64)
+	for _, c := range b {
+		h ^= uint64(c)
+		h *= prime64
+	}
+	return h
+}
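Note on usage: the hash functions compared in BenchmarkHash are applied to a flat byte encoding of label pairs, with each name and value terminated by a 0xff byte, exactly as the benchmark builds its input. The standalone sketch below is not part of the change set; it only illustrates that usage with xxhash.Sum64, and the labelsToBytes helper name is made up for the example. The benchmark itself can be run with the standard tooling, e.g. go test -bench=Hash ./test/.

package main

import (
	"fmt"

	"github.com/cespare/xxhash"
)

// labelsToBytes flattens name/value pairs into one byte slice, terminating
// each component with 0xff, mirroring the input layout of BenchmarkHash.
// (Illustrative helper only; it is not defined anywhere in this change.)
func labelsToBytes(pairs [][2]string) []byte {
	b := make([]byte, 0, 64)
	for _, p := range pairs {
		b = append(b, p[0]...)
		b = append(b, '\xff')
		b = append(b, p[1]...)
		b = append(b, '\xff')
	}
	return b
}

func main() {
	pairs := [][2]string{{"job", "node"}, {"instance", "123.123.1.211:9090"}}
	fmt.Printf("xxhash of labels: %x\n", xxhash.Sum64(labelsToBytes(pairs)))
}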