This commit is contained in:
Fabian Reinartz 2016-12-02 17:49:05 +01:00
parent 0b6d621471
commit 6f93a699e6
30 changed files with 97 additions and 5163 deletions

457
db.go
View File

@ -3,13 +3,11 @@ package tsdb
import (
"encoding/binary"
"path/filepath"
"sync"
"time"
"github.com/fabxc/tsdb/chunks"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
)
// DefaultOptions used for the DB.
@ -27,10 +25,7 @@ type DB struct {
logger log.Logger
opts *Options
memChunks *memChunks
persistence *persistence
indexer *indexer
stopc chan struct{}
shards map[uint64]*TimeShards
}
// Open or create a new DB.
@ -39,433 +34,43 @@ func Open(path string, l log.Logger, opts *Options) (*DB, error) {
opts = DefaultOptions
}
indexer, err := newMetricIndexer(filepath.Join(path, "index"), defaultIndexerQsize, defaultIndexerTimeout)
if err != nil {
return nil, err
}
persistence, err := newPersistence(filepath.Join(path, "chunks"), defaultIndexerQsize, defaultIndexerTimeout)
if err != nil {
return nil, err
}
mchunks := newMemChunks(l, indexer, persistence, 10, opts.StalenessDelta)
indexer.mc = mchunks
persistence.mc = mchunks
c := &DB{
logger: l,
opts: opts,
memChunks: mchunks,
persistence: persistence,
indexer: indexer,
stopc: make(chan struct{}),
logger: l,
opts: opts,
}
go c.memChunks.run(c.stopc)
return c, nil
}
// Close the storage and persist all writes.
func (c *DB) Close() error {
close(c.stopc)
// TODO(fabxc): blocking further writes here necessary?
c.indexer.wait()
c.persistence.wait()
err0 := c.indexer.close()
err1 := c.persistence.close()
if err0 != nil {
return err0
}
return err1
type Label struct {
Name, Value string
}
// Append ingestes the samples in the scrape into the storage.
func (c *DB) Append(scrape *Scrape) error {
// Sequentially add samples to in-memory chunks.
// TODO(fabxc): evaluate cost of making this atomic.
for _, s := range scrape.m {
if err := c.memChunks.append(s.met, scrape.ts, s.val); err != nil {
// TODO(fabxc): collect in multi error.
return err
}
// TODO(fabxc): increment ingested samples metric.
// LabelSet is a sorted set of labels. Order has to be guaranteed upon
// instantiation.
type LabelSet []Label
func (ls LabelSet) Len() int { return len(ls) }
func (ls LabelSet) Swap(i, j int) { ls[i], ls[j] = ls[j], ls[i]}
func (ls LabelSet) Less(i, j int) bool { return ls[i].Name < ls[j].Name }
// NewLabelSet returns a sorted LabelSet from the given labels.
// The caller has to guarantee that all label names are unique.
func NewLabelSet(ls ...Label) LabelSet {
set := make(LabelSet, 0, len(l))
for _, l := range ls {
set = append(set, l)
}
sort.Sort(set)
return set
}
type Vector struct {
LabelSets []LabelSet
Values []float64
}
func (db *DB) AppendVector(v *Vector) error {
return nil
}
// memChunks holds the chunks that are currently being appended to.
type memChunks struct {
logger log.Logger
stalenessDelta time.Duration
mtx sync.RWMutex
// Chunks by their ID as accessed when retrieving a chunk ID from
// an index query.
chunks map[ChunkID]*chunkDesc
// The highest time slice chunks currently have. A new chunk can not
// be in a higher slice before all chunks with lower IDs have been
// added to the slice.
highTime model.Time
// Power of 2 of chunk shards.
num uint8
// Memory chunks sharded by leading bits of the chunk's metric's
// fingerprints. Used to quickly find chunks for new incoming samples
// where the metric is known but the chunk ID is not.
shards []*memChunksShard
indexer *indexer
persistence *persistence
}
// newMemChunks returns a new memChunks sharded by n locks.
func newMemChunks(l log.Logger, ix *indexer, p *persistence, n uint8, staleness time.Duration) *memChunks {
c := &memChunks{
logger: l,
stalenessDelta: staleness,
num: n,
chunks: map[ChunkID]*chunkDesc{},
persistence: p,
indexer: ix,
}
if n > 63 {
panic("invalid shard power")
}
// Initialize 2^n shards.
for i := 0; i < 1<<n; i++ {
c.shards = append(c.shards, &memChunksShard{
descs: map[model.Fingerprint][]*chunkDesc{},
csize: 1024,
})
}
return c
}
func (mc *memChunks) run(stopc <-chan struct{}) {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
f := func() error {
for _, cs := range mc.shards {
mc.gc(cs)
}
// Wait for persistence and indexing to finish before reindexing
// memory chunks for the new time slice.
mc.persistence.wait()
mc.indexer.wait()
mc.mtx.Lock()
defer mc.mtx.Unlock()
curTimeSlice := timeSlice(model.Now())
// If the next time slice is in the future, we are done.
if curTimeSlice <= mc.highTime {
return nil
}
ids := make(ChunkIDs, 0, len(mc.chunks))
for id := range mc.chunks {
ids = append(ids, id)
}
if err := mc.indexer.reindexTime(ids, curTimeSlice); err != nil {
return err
}
mc.highTime = curTimeSlice
return nil
}
for {
select {
case <-ticker.C:
if err := f(); err != nil {
mc.logger.With("err", err).Error("memory chunk maintenance failed")
}
case <-stopc:
return
}
}
}
// gc writes stale and incomplete chunks to persistence and removes them
// from the shard.
func (mc *memChunks) gc(cs *memChunksShard) {
cs.RLock()
defer cs.RUnlock()
mint := model.Now().Add(-mc.stalenessDelta)
for fp, cdescs := range cs.descs {
for _, cd := range cdescs {
// If the last sample was added before the staleness delta, consider
// the chunk inactive and persist it.
if cd.lastSample.Timestamp.Before(mint) {
mc.persistence.enqueue(cd)
cs.del(fp, cd)
}
}
}
return
}
func (mc *memChunks) append(m model.Metric, ts model.Time, v model.SampleValue) error {
fp := m.FastFingerprint()
cs := mc.shards[fp>>(64-mc.num)]
cs.Lock()
defer cs.Unlock()
chkd, created := cs.get(fp, m)
if created {
mc.indexer.enqueue(chkd)
}
if err := chkd.append(ts, v); err != chunks.ErrChunkFull {
return err
}
// Chunk was full, remove it so a new head chunk can be created.
// TODO(fabxc): should we just remove them during maintenance if we set a 'persisted'
// flag?
// If we shutdown we work down the persistence queue before exiting, so we should
// lose no data. If we crash, the last snapshot will still have the chunk. Theoretically,
// deleting it here should not be a problem.
cs.del(fp, chkd)
mc.persistence.enqueue(chkd)
// Create a new chunk lazily and continue.
chkd, created = cs.get(fp, m)
if !created {
// Bug if the chunk was not newly created.
panic("expected newly created chunk")
}
mc.indexer.enqueue(chkd)
return chkd.append(ts, v)
}
type memChunksShard struct {
sync.RWMutex
// chunks holds chunk descriptors for one or more chunks
// with a given fingerprint.
descs map[model.Fingerprint][]*chunkDesc
csize int
}
// get returns the chunk descriptor for the given fingerprint/metric combination.
// If none exists, a new chunk descriptor is created and true is returned.
func (cs *memChunksShard) get(fp model.Fingerprint, m model.Metric) (*chunkDesc, bool) {
chks := cs.descs[fp]
for _, cd := range chks {
if cd != nil && cd.met.Equal(m) {
return cd, false
}
}
// None of the given chunks was for the metric, create a new one.
cd := &chunkDesc{
met: m,
chunk: chunks.NewPlainChunk(cs.csize),
}
// Try inserting chunk in existing whole before appending.
for i, c := range chks {
if c == nil {
chks[i] = cd
return cd, true
}
}
cs.descs[fp] = append(chks, cd)
return cd, true
}
// del frees the field of the chunk descriptor for the fingerprint.
func (cs *memChunksShard) del(fp model.Fingerprint, chkd *chunkDesc) {
for i, d := range cs.descs[fp] {
if d == chkd {
cs.descs[fp][i] = nil
return
}
}
}
// ChunkID is a unique identifier for a chunks.
type ChunkID uint64
func (id ChunkID) bytes() []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(id))
return b
}
// ChunkIDs is a sortable list of chunk IDs.
type ChunkIDs []ChunkID
func (c ChunkIDs) Len() int { return len(c) }
func (c ChunkIDs) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
func (c ChunkIDs) Less(i, j int) bool { return c[i] < c[j] }
// chunkDesc wraps a plain data chunk and provides cached meta data about it.
type chunkDesc struct {
id ChunkID
met model.Metric
chunk chunks.Chunk
// Caching fields.
firstTime model.Time
lastSample model.SamplePair
app chunks.Appender // Current appender for the chunks.
}
func (cd *chunkDesc) append(ts model.Time, v model.SampleValue) error {
if cd.app == nil {
cd.app = cd.chunk.Appender()
// TODO(fabxc): set correctly once loading from snapshot is added.
cd.firstTime = ts
}
cd.lastSample.Timestamp = ts
cd.lastSample.Value = v
return cd.app.Append(ts, v)
}
// Scrape gathers samples for a single timestamp.
type Scrape struct {
ts model.Time
m []sample
}
type sample struct {
met model.Metric
val model.SampleValue
}
// Reset resets the scrape data and initializes it for a new scrape at
// the given time. The underlying memory remains allocated for the next scrape.
func (s *Scrape) Reset(ts model.Time) {
s.ts = ts
s.m = s.m[:0]
}
// Dump returns all samples that are part of the scrape.
func (s *Scrape) Dump() []*model.Sample {
d := make([]*model.Sample, 0, len(s.m))
for _, sa := range s.m {
d = append(d, &model.Sample{
Metric: sa.met,
Timestamp: s.ts,
Value: sa.val,
})
}
return d
}
// Add adds a sample value for the given metric to the scrape.
func (s *Scrape) Add(m model.Metric, v model.SampleValue) {
for ln, lv := range m {
if len(lv) == 0 {
delete(m, ln)
}
}
// TODO(fabxc): pre-sort added samples into the correct buckets
// of fingerprint shards so we only have to lock each memChunkShard once.
s.m = append(s.m, sample{met: m, val: v})
}
type chunkBatchProcessor struct {
processf func(...*chunkDesc) error
mtx sync.RWMutex
logger log.Logger
q []*chunkDesc
qcap int
timeout time.Duration
timer *time.Timer
trigger chan struct{}
empty chan struct{}
}
func newChunkBatchProcessor(l log.Logger, cap int, to time.Duration) *chunkBatchProcessor {
if l == nil {
l = log.NewNopLogger()
}
p := &chunkBatchProcessor{
logger: l,
qcap: cap,
timeout: to,
timer: time.NewTimer(to),
trigger: make(chan struct{}, 1),
empty: make(chan struct{}),
}
// Start with closed channel so we don't block on wait if nothing
// has ever been indexed.
close(p.empty)
go p.run()
return p
}
func (p *chunkBatchProcessor) run() {
for {
// Process pending indexing batch if triggered
// or timeout since last indexing has passed.
select {
case <-p.trigger:
case <-p.timer.C:
}
if err := p.process(); err != nil {
p.logger.
With("err", err).With("num", len(p.q)).
Error("batch failed, dropping chunks descs")
}
}
}
func (p *chunkBatchProcessor) process() error {
// TODO(fabxc): locking the entire time will cause lock contention.
p.mtx.Lock()
defer p.mtx.Unlock()
if len(p.q) == 0 {
return nil
}
// Leave chunk descs behind whether successful or not.
defer func() {
p.q = p.q[:0]
close(p.empty)
}()
return p.processf(p.q...)
}
func (p *chunkBatchProcessor) enqueue(cds ...*chunkDesc) {
p.mtx.Lock()
defer p.mtx.Unlock()
if len(p.q) == 0 {
p.timer.Reset(p.timeout)
p.empty = make(chan struct{})
}
p.q = append(p.q, cds...)
if len(p.q) > p.qcap {
select {
case p.trigger <- struct{}{}:
default:
// If we cannot send a signal is already set.
}
}
}
// wait blocks until the queue becomes empty.
func (p *chunkBatchProcessor) wait() {
p.mtx.RLock()
c := p.empty
p.mtx.RUnlock()
<-c
}
}

View File

@ -1,145 +0,0 @@
package tsdb
import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"testing"
"time"
"github.com/fabxc/tindex"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/storage/cinamon/chunk"
"github.com/prometheus/prometheus/storage/metric"
"github.com/stretchr/testify/require"
)
func TestE2E(t *testing.T) {
dir, err := ioutil.TempDir("", "cinamon_test")
require.NoError(t, err)
defer os.RemoveAll(dir)
c, err := Open(dir, log.Base(), nil)
require.NoError(t, err)
c.memChunks.indexer.timeout = 50 * time.Millisecond
// Set indexer size to be triggered exactly when we hit the limit.
// c.memChunks.indexer.qmax = 10
mets := generateMetrics(100000)
// var wg sync.WaitGroup
// for k := 0; k < len(mets)/100+1; k++ {
// wg.Add(1)
// go func(mets []model.Metric) {
var s Scrape
for i := 0; i < 2*64; i++ {
s.Reset(model.Time(i) * 100000)
for _, m := range mets {
s.Add(m, model.SampleValue(rand.Float64()))
}
require.NoError(t, c.Append(&s))
}
// wg.Done()
// }(mets[k*100 : (k+1)*100])
// }
// wg.Wait()
start := time.Now()
c.memChunks.indexer.wait()
fmt.Println("index wait", time.Since(start))
start = time.Now()
q, err := c.Querier()
require.NoError(t, err)
defer q.Close()
m1, err := metric.NewLabelMatcher(metric.Equal, "job", "somejob")
require.NoError(t, err)
m2, err := metric.NewLabelMatcher(metric.Equal, "label2", "value0")
require.NoError(t, err)
m3, err := metric.NewLabelMatcher(metric.Equal, "label4", "value0")
require.NoError(t, err)
it, err := q.Iterator(m1, m2, m3)
require.NoError(t, err)
res, err := tindex.ExpandIterator(it)
require.NoError(t, err)
fmt.Println("result len", len(res))
fmt.Println("querying", time.Since(start))
}
func generateMetrics(n int) (res []model.Metric) {
for i := 0; i < n; i++ {
res = append(res, model.Metric{
"job": "somejob",
"label5": model.LabelValue(fmt.Sprintf("value%d", i%10)),
"label4": model.LabelValue(fmt.Sprintf("value%d", i%5)),
"label3": model.LabelValue(fmt.Sprintf("value%d", i%3)),
"label2": model.LabelValue(fmt.Sprintf("value%d", i%2)),
"label1": model.LabelValue(fmt.Sprintf("value%d", i)),
})
}
return res
}
func TestMemChunksShardGet(t *testing.T) {
cs := &memChunksShard{
descs: map[model.Fingerprint][]*chunkDesc{},
csize: 100,
}
cdesc1, created1 := cs.get(123, model.Metric{"x": "1"})
require.True(t, created1)
require.Equal(t, 1, len(cs.descs[123]))
require.Equal(t, &chunkDesc{
met: model.Metric{"x": "1"},
chunk: chunk.NewPlainChunk(100),
}, cdesc1)
// Add colliding metric.
cdesc2, created2 := cs.get(123, model.Metric{"x": "2"})
require.True(t, created2)
require.Equal(t, 2, len(cs.descs[123]))
require.Equal(t, &chunkDesc{
met: model.Metric{"x": "2"},
chunk: chunk.NewPlainChunk(100),
}, cdesc2)
// First chunk desc can still be retrieved correctly.
cdesc1, created1 = cs.get(123, model.Metric{"x": "1"})
require.False(t, created1)
require.Equal(t, &chunkDesc{
met: model.Metric{"x": "1"},
chunk: chunk.NewPlainChunk(100),
}, cdesc1)
}
func TestChunkSeriesIterator(t *testing.T) {
newChunk := func(s []model.SamplePair) chunk.Chunk {
c := chunk.NewPlainChunk(1000)
app := c.Appender()
for _, sp := range s {
if err := app.Append(sp.Timestamp, sp.Value); err != nil {
t.Fatal(err)
}
}
return c
}
it := newChunkSeriesIterator(metric.Metric{}, []chunk.Chunk{
newChunk([]model.SamplePair{{1, 1}, {2, 2}, {3, 3}}),
newChunk([]model.SamplePair{{4, 4}, {5, 5}, {6, 6}}),
newChunk([]model.SamplePair{{7, 7}, {8, 8}, {9, 9}}),
})
var res []model.SamplePair
for sp, ok := it.Seek(0); ok; sp, ok = it.Next() {
fmt.Println(sp)
res = append(res, sp)
}
require.Equal(t, io.EOF, it.Err())
require.Equal(t, []model.SamplePair{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}, {7, 7}, {8, 8}, {9, 9}}, res)
}

130
index.go
View File

@ -1,130 +0,0 @@
package tsdb
import (
"sort"
"strconv"
"sync/atomic"
"time"
"github.com/fabxc/tsdb/index"
"github.com/prometheus/common/log"
"github.com/prometheus/common/model"
)
const (
defaultIndexerTimeout = 1 * time.Second
defaultIndexerQsize = 500000
)
// indexer asynchronously indexes chunks in batches. It indexes all labels
// of a chunk with a forward mapping and additionally indexes the chunk for
// the time slice of its first sample.
type indexer struct {
*chunkBatchProcessor
ix *index.Index
mc *memChunks
}
// Create batch indexer that creates new index documents
// and indexes them by the metric fields.
// Its post-indexing hook populates the in-memory chunk forward index.
func newMetricIndexer(path string, qsz int, qto time.Duration) (*indexer, error) {
ix, err := index.Open(path, nil)
if err != nil {
return nil, err
}
i := &indexer{
ix: ix,
chunkBatchProcessor: newChunkBatchProcessor(log.Base(), qsz, qto),
}
i.chunkBatchProcessor.processf = i.index
return i, nil
}
func (ix *indexer) Querier() (*index.Querier, error) {
return ix.ix.Querier()
}
const (
timeSliceField = "__ts__"
timeSliceSize = 3 * time.Hour
)
func timeSlice(t model.Time) model.Time {
return t - (t % model.Time(timeSliceSize/time.Millisecond))
}
func timeString(t model.Time) string {
return strconv.FormatInt(int64(t), 16)
}
func (ix *indexer) close() error {
return ix.ix.Close()
}
func (ix *indexer) index(cds ...*chunkDesc) error {
b, err := ix.ix.Batch()
if err != nil {
return err
}
ids := make([]ChunkID, len(cds))
for i, cd := range cds {
terms := make(index.Terms, 0, len(cd.met))
for k, v := range cd.met {
t := index.Term{Field: string(k), Val: string(v)}
terms = append(terms, t)
}
id := b.Add(terms)
ts := timeSlice(cd.firstTime)
// If the chunk has a higher time slice than the high one,
// don't index. It will be indexed when the next time slice
// is initiated over all memory chunks.
if ts <= ix.mc.highTime {
b.SecondaryIndex(id, index.Term{
Field: timeSliceField,
Val: timeString(ts),
})
}
ids[i] = ChunkID(id)
}
if err := b.Commit(); err != nil {
return err
}
// We have to lock here already instead of post-commit as otherwise we might
// generate new chunk IDs, skip their indexing, and have a reindexTime being
// called with the chunk ID not being visible yet.
// TODO(fabxc): move back up
ix.mc.mtx.Lock()
defer ix.mc.mtx.Unlock()
// Make in-memory chunks visible for read.
for i, cd := range cds {
atomic.StoreUint64((*uint64)(&cd.id), uint64(ids[i]))
ix.mc.chunks[cd.id] = cd
}
return nil
}
// reindexTime creates an initial time slice index over all chunk IDs.
// Any future chunks indexed for the same time slice must have higher IDs.
func (ix *indexer) reindexTime(ids ChunkIDs, ts model.Time) error {
b, err := ix.ix.Batch()
if err != nil {
return err
}
sort.Sort(ids)
t := index.Term{Field: timeSliceField, Val: timeString(ts)}
for _, id := range ids {
b.SecondaryIndex(index.DocID(id), t)
}
return b.Commit()
}

View File

@ -1,201 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -1,3 +0,0 @@
index
benchout
testdata*

View File

@ -1,14 +0,0 @@
all: bench svg
bench: build
@echo ">> running benchmark"
@./tindex bench write testdata
build:
@go build .
svg:
@echo ">> create svgs"
@go tool pprof -svg ./tindex benchout/cpu.prof > benchout/cpuprof.svg
@go tool pprof -svg ./tindex benchout/mem.prof > benchout/memprof.svg
@go tool pprof -svg ./tindex benchout/block.prof > benchout/blockprof.svg

View File

@ -1,253 +0,0 @@
package main
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"time"
"github.com/fabxc/tsdb/index"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/spf13/cobra"
)
func main() {
root := &cobra.Command{
Use: "index",
Short: "CLI tool for index",
}
root.AddCommand(
NewBenchCommand(),
)
root.Execute()
}
func NewBenchCommand() *cobra.Command {
c := &cobra.Command{
Use: "bench",
Short: "run benchmarks",
}
c.AddCommand(NewBenchWriteCommand())
return c
}
type writeBenchmark struct {
outPath string
cleanup bool
cpuprof *os.File
memprof *os.File
blockprof *os.File
}
func NewBenchWriteCommand() *cobra.Command {
var wb writeBenchmark
c := &cobra.Command{
Use: "write <file>",
Short: "run a write performance benchmark",
Run: wb.run,
}
c.PersistentFlags().StringVar(&wb.outPath, "out", "benchout/", "set the output path")
return c
}
func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
if len(args) != 1 {
exitWithError(fmt.Errorf("missing file argument"))
}
if b.outPath == "" {
dir, err := ioutil.TempDir("", "index_bench")
if err != nil {
exitWithError(err)
}
b.outPath = dir
b.cleanup = true
}
if err := os.RemoveAll(b.outPath); err != nil {
exitWithError(err)
}
if err := os.MkdirAll(b.outPath, 0777); err != nil {
exitWithError(err)
}
var docs []*InsertDoc
measureTime("readData", func() {
f, err := os.Open(args[0])
if err != nil {
exitWithError(err)
}
defer f.Close()
docs, err = readPrometheusLabels(f)
if err != nil {
exitWithError(err)
}
})
dir := filepath.Join(b.outPath, "ix")
ix, err := index.Open(dir, nil)
if err != nil {
exitWithError(err)
}
defer func() {
ix.Close()
reportSize(dir)
if b.cleanup {
os.RemoveAll(b.outPath)
}
}()
measureTime("indexData", func() {
b.startProfiling()
indexDocs(ix, docs, 100000)
indexDocs(ix, docs, 100000)
indexDocs(ix, docs, 100000)
indexDocs(ix, docs, 100000)
b.stopProfiling()
})
}
func (b *writeBenchmark) startProfiling() {
var err error
// Start CPU profiling.
b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create cpu profile: %v\n", err))
}
pprof.StartCPUProfile(b.cpuprof)
// Start memory profiling.
b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create memory profile: %v\n", err))
}
runtime.MemProfileRate = 4096
// Start fatal profiling.
b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof"))
if err != nil {
exitWithError(fmt.Errorf("bench: could not create block profile: %v\n", err))
}
runtime.SetBlockProfileRate(1)
}
func (b *writeBenchmark) stopProfiling() {
if b.cpuprof != nil {
pprof.StopCPUProfile()
b.cpuprof.Close()
b.cpuprof = nil
}
if b.memprof != nil {
pprof.Lookup("heap").WriteTo(b.memprof, 0)
b.memprof.Close()
b.memprof = nil
}
if b.blockprof != nil {
pprof.Lookup("block").WriteTo(b.blockprof, 0)
b.blockprof.Close()
b.blockprof = nil
runtime.SetBlockProfileRate(0)
}
}
func indexDocs(ix *index.Index, docs []*InsertDoc, batchSize int) {
remDocs := docs[:]
var ids []index.DocID
for len(remDocs) > 0 {
n := batchSize
if n > len(remDocs) {
n = len(remDocs)
}
b, err := ix.Batch()
if err != nil {
exitWithError(err)
}
for _, d := range remDocs[:n] {
id := b.Add(d.Terms)
ids = append(ids, id)
}
if err := b.Commit(); err != nil {
exitWithError(err)
}
remDocs = remDocs[n:]
}
}
func reportSize(dir string) {
err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil || path == dir {
return err
}
fmt.Printf(" > file=%s size=%.03fGiB\n", path[len(dir):], float64(info.Size())/1024/1024/1024)
return nil
})
if err != nil {
exitWithError(err)
}
}
func measureTime(stage string, f func()) {
fmt.Printf(">> start stage=%s\n", stage)
start := time.Now()
f()
fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start))
}
type InsertDoc struct {
Terms index.Terms
}
func readPrometheusLabels(r io.Reader) ([]*InsertDoc, error) {
dec := expfmt.NewDecoder(r, expfmt.FmtProtoText)
var docs []*InsertDoc
var mf dto.MetricFamily
for {
if err := dec.Decode(&mf); err != nil {
if err == io.EOF {
break
}
return nil, err
}
for _, m := range mf.GetMetric() {
d := &InsertDoc{
Terms: make(index.Terms, len(m.GetLabel())+1),
}
d.Terms[0] = index.Term{
Field: "__name__",
Val: mf.GetName(),
}
for i, l := range m.GetLabel() {
d.Terms[i+1] = index.Term{
Field: l.GetName(),
Val: l.GetValue(),
}
}
docs = append(docs, d)
}
}
return docs, nil
}
func exitWithError(err error) {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

View File

@ -1,206 +0,0 @@
package index
import (
"encoding/binary"
"errors"
"io"
"sync"
"github.com/boltdb/bolt"
)
var encpool buffers
type buffers struct {
pool sync.Pool
}
func (b *buffers) get(l int) []byte {
x := b.pool.Get()
if x == nil {
return make([]byte, l)
}
buf := x.([]byte)
if cap(buf) < l {
return make([]byte, l)
}
return buf[:l]
}
func (b *buffers) getZero(l int) []byte {
buf := b.get(l)
for i := range buf {
buf[i] = 0
}
return buf
}
func (b *buffers) put(buf []byte) {
b.pool.Put(buf)
}
func (b *buffers) bucketPut(bkt *bolt.Bucket, k, v []byte) error {
err := bkt.Put(k, v)
b.put(k)
return err
}
func (b *buffers) bucketGet(bkt *bolt.Bucket, k []byte) []byte {
v := bkt.Get(k)
b.put(k)
return v
}
func (b *buffers) uint64be(x uint64) []byte {
buf := b.get(8)
binary.BigEndian.PutUint64(buf, x)
return buf
}
func (b *buffers) uvarint(x uint64) []byte {
buf := b.get(binary.MaxVarintLen64)
return buf[:binary.PutUvarint(buf, x)]
}
type txbuffs struct {
buffers *buffers
done [][]byte
}
func (b *txbuffs) get(l int) []byte {
buf := b.buffers.get(l)
b.done = append(b.done, buf)
return buf
}
func (b *txbuffs) getZero(l int) []byte {
buf := b.buffers.getZero(l)
b.done = append(b.done, buf)
return buf
}
func (b *txbuffs) release() {
for _, buf := range b.done {
b.buffers.put(buf)
}
}
func (b *txbuffs) put(buf []byte) {
b.done = append(b.done, buf)
}
func (b *txbuffs) uint64be(x uint64) []byte {
buf := b.get(8)
binary.BigEndian.PutUint64(buf, x)
return buf
}
func (b *txbuffs) uvarint(x uint64) []byte {
buf := b.get(binary.MaxVarintLen64)
return buf[:binary.PutUvarint(buf, x)]
}
// reuse of buffers
var pagePool sync.Pool
// getBuf returns a buffer from the pool. The length of the returned slice is l.
func getPage(l int) []byte {
x := pagePool.Get()
if x == nil {
return make([]byte, l)
}
buf := x.([]byte)
if cap(buf) < l {
return make([]byte, l)
}
return buf[:l]
}
// putBuf returns a buffer to the pool.
func putPage(buf []byte) {
pagePool.Put(buf)
}
// bufPool is a pool for staging buffers. Using a pool allows concurrency-safe
// reuse of buffers
var bufPool sync.Pool
// getBuf returns a buffer from the pool. The length of the returned slice is l.
func getBuf(l int) []byte {
x := bufPool.Get()
if x == nil {
return make([]byte, l)
}
buf := x.([]byte)
if cap(buf) < l {
return make([]byte, l)
}
return buf[:l]
}
// putBuf returns a buffer to the pool.
func putBuf(buf []byte) {
bufPool.Put(buf)
}
func encodeUint64(x uint64) []byte {
buf := getBuf(8)
binary.BigEndian.PutUint64(buf, x)
return buf
}
func decodeUint64(buf []byte) uint64 {
return binary.BigEndian.Uint64(buf)
}
func writeUvarint(w io.ByteWriter, x uint64) (i int, err error) {
for x >= 0x80 {
if err = w.WriteByte(byte(x) | 0x80); err != nil {
return i, err
}
x >>= 7
i++
}
if err = w.WriteByte(byte(x)); err != nil {
return i, err
}
return i + 1, err
}
func writeVarint(w io.ByteWriter, x int64) (i int, err error) {
ux := uint64(x) << 1
if x < 0 {
ux = ^ux
}
return writeUvarint(w, ux)
}
func readUvarint(r io.ByteReader) (uint64, int, error) {
var (
x uint64
s uint
)
for i := 0; ; i++ {
b, err := r.ReadByte()
if err != nil {
return x, i, err
}
if b < 0x80 {
if i > 9 || i == 9 && b > 1 {
return x, i + 1, errors.New("varint overflows a 64-bit integer")
}
return x | uint64(b)<<s, i + 1, nil
}
x |= uint64(b&0x7f) << s
s += 7
}
}
func readVarint(r io.ByteReader) (int64, int, error) {
ux, n, err := readUvarint(r)
x := int64(ux >> 1)
if ux&1 != 0 {
x = ^x
}
return x, n, err
}

View File

@ -1,716 +0,0 @@
package index
import (
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
"io"
"math"
"os"
"path/filepath"
"regexp"
"sync"
"github.com/boltdb/bolt"
"github.com/fabxc/tsdb/pages"
)
var (
errOutOfOrder = errors.New("out of order")
errNotFound = errors.New("not found")
)
// Options for an Index.
type Options struct {
}
// DefaultOptions used for opening a new index.
var DefaultOptions = &Options{}
// Index is a fully persistent inverted index of documents with any number of fields
// that map to exactly one term.
type Index struct {
pbuf *pages.DB
bolt *bolt.DB
meta *meta
rwlock sync.Mutex
}
// Open returns an index located in the given path. If none exists a new
// one is created.
func Open(path string, opts *Options) (*Index, error) {
if opts == nil {
opts = DefaultOptions
}
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
bdb, err := bolt.Open(filepath.Join(path, "kv"), 0666, nil)
if err != nil {
return nil, err
}
pdb, err := pages.Open(filepath.Join(path, "pb"), 0666, &pages.Options{
PageSize: pageSize,
})
if err != nil {
return nil, err
}
ix := &Index{
bolt: bdb,
pbuf: pdb,
meta: &meta{},
}
if err := ix.bolt.Update(ix.init); err != nil {
return nil, err
}
return ix, nil
}
// Close closes the index.
func (ix *Index) Close() error {
err0 := ix.pbuf.Close()
err1 := ix.bolt.Close()
if err0 != nil {
return err0
}
return err1
}
var (
bktMeta = []byte("meta")
bktDocs = []byte("docs")
bktTerms = []byte("terms")
bktTermIDs = []byte("term_ids")
bktSkiplist = []byte("skiplist")
keyMeta = []byte("meta")
)
func (ix *Index) init(tx *bolt.Tx) error {
// Ensure all buckets exist. Any other index methods assume
// that these buckets exist and may panic otherwise.
for _, bn := range [][]byte{
bktMeta, bktTerms, bktTermIDs, bktDocs, bktSkiplist,
} {
if _, err := tx.CreateBucketIfNotExists(bn); err != nil {
return fmt.Errorf("create bucket %q failed: %s", string(bn), err)
}
}
// Read the meta state if the index was already initialized.
mbkt := tx.Bucket(bktMeta)
if v := mbkt.Get(keyMeta); v != nil {
if err := ix.meta.read(v); err != nil {
return fmt.Errorf("decoding meta failed: %s", err)
}
} else {
// Index not initialized yet, set up meta information.
ix.meta = &meta{
LastDocID: 0,
LastTermID: 0,
}
v, err := ix.meta.bytes()
if err != nil {
return fmt.Errorf("encoding meta failed: %s", err)
}
if err := mbkt.Put(keyMeta, v); err != nil {
return fmt.Errorf("creating meta failed: %s", err)
}
}
return nil
}
// Querier starts a new query session against the index.
func (ix *Index) Querier() (*Querier, error) {
kvtx, err := ix.bolt.Begin(false)
if err != nil {
return nil, err
}
pbtx, err := ix.pbuf.Begin(false)
if err != nil {
kvtx.Rollback()
return nil, err
}
return &Querier{
kvtx: kvtx,
pbtx: pbtx,
// TODO(fabxc): consider getting these buckets lazily.
termBkt: kvtx.Bucket(bktTerms),
termidBkt: kvtx.Bucket(bktTermIDs),
docBkt: kvtx.Bucket(bktDocs),
skiplistBkt: kvtx.Bucket(bktSkiplist),
}, nil
}
// Querier encapsulates the index for several queries.
type Querier struct {
kvtx *bolt.Tx
pbtx *pages.Tx
termBkt *bolt.Bucket
termidBkt *bolt.Bucket
docBkt *bolt.Bucket
skiplistBkt *bolt.Bucket
}
// Close closes the underlying index transactions.
func (q *Querier) Close() error {
err0 := q.pbtx.Rollback()
err1 := q.kvtx.Rollback()
if err0 != nil {
return err0
}
return err1
}
// Terms returns all terms for the key field matching the provided matcher.
// If the matcher is nil, all terms for the field are returned.
func (q *Querier) Terms(key string, m Matcher) []string {
if m == nil {
m = AnyMatcher
}
return q.termsForMatcher(key, m)
}
// Search returns an iterator over all document IDs that match all
// provided matchers.
func (q *Querier) Search(key string, m Matcher) (Iterator, error) {
tids := q.termIDsForMatcher(key, m)
its := make([]Iterator, 0, len(tids))
for _, t := range tids {
it, err := q.postingsIter(t)
if err != nil {
return nil, err
}
its = append(its, it)
}
if len(its) == 0 {
return nil, nil
}
return Merge(its...), nil
}
// postingsIter returns an iterator over the postings list of term t.
func (q *Querier) postingsIter(t termid) (Iterator, error) {
b := q.skiplistBkt.Bucket(t.bytes())
if b == nil {
return nil, errNotFound
}
it := &skippingIterator{
skiplist: &boltSkiplistCursor{
k: uint64(t),
c: b.Cursor(),
bkt: b,
},
iterators: iteratorStoreFunc(func(k uint64) (Iterator, error) {
data, err := q.pbtx.Get(k)
if err != nil {
return nil, errNotFound
}
// TODO(fabxc): for now, offset is zero, pages have no header
// and are always delta encoded.
return newPageDelta(data).cursor(), nil
}),
}
return it, nil
}
func (q *Querier) termsForMatcher(key string, m Matcher) []string {
c := q.termBkt.Cursor()
pref := append([]byte(key), 0xff)
var terms []string
// TODO(fabxc): We scan the entire term value range for the field. Improvide this by direct
// and prefixed seeks depending on the matcher.
for k, _ := c.Seek(pref); bytes.HasPrefix(k, pref); k, _ = c.Next() {
if m.Match(string(k[len(pref):])) {
terms = append(terms, string(k[len(pref):]))
}
}
return terms
}
func (q *Querier) termIDsForMatcher(key string, m Matcher) termids {
c := q.termBkt.Cursor()
pref := append([]byte(key), 0xff)
var ids termids
// TODO(fabxc): We scan the entire term value range for the field. Improvide this by direct
// and prefixed seeks depending on the matcher.
for k, v := c.Seek(pref); bytes.HasPrefix(k, pref); k, v = c.Next() {
if m.Match(string(k[len(pref):])) {
ids = append(ids, newTermID(v))
}
}
return ids
}
// Doc returns the document with the given ID.
func (q *Querier) Doc(id DocID) (Terms, error) {
v := q.docBkt.Get(id.bytes())
if v == nil {
return nil, errNotFound
}
tids := newTermIDs(v)
// TODO(fabxc): consider at least a per-session cache for these.
terms := make(Terms, len(tids))
for i, t := range tids {
// TODO(fabxc): is this encode/decode cycle here worth the space savings?
// If we stored plain uint64s we can just pass the slice back in.
v := q.termidBkt.Get(t.bytes())
if v == nil {
return nil, fmt.Errorf("term not found")
}
term, err := newTerm(v)
if err != nil {
return nil, err
}
terms[i] = term
}
return terms, nil
}
// Delete removes all documents in the iterator from the index.
// It returns the number of deleted documents.
func (ix *Index) Delete(it Iterator) (int, error) {
panic("not implemented")
}
// Batch starts a new batch against the index.
func (ix *Index) Batch() (*Batch, error) {
// Lock writes so we can safely pre-allocate term and doc IDs.
ix.rwlock.Lock()
tx, err := ix.bolt.Begin(false)
if err != nil {
return nil, err
}
b := &Batch{
ix: ix,
tx: tx,
meta: &meta{},
termBkt: tx.Bucket(bktTerms),
termidBkt: tx.Bucket(bktTermIDs),
terms: map[Term]*batchTerm{},
}
*b.meta = *ix.meta
return b, nil
}
// meta contains information about the state of the index.
type meta struct {
LastDocID DocID
LastTermID termid
}
// read initilizes the meta from a byte slice.
func (m *meta) read(b []byte) error {
return gob.NewDecoder(bytes.NewReader(b)).Decode(m)
}
// bytes returns a byte slice representation of the meta.
func (m *meta) bytes() ([]byte, error) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(m); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Terms is a sortable list of terms.
type Terms []Term
func (t Terms) Len() int { return len(t) }
func (t Terms) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t Terms) Less(i, j int) bool {
if t[i].Field < t[j].Field {
return true
}
if t[i].Field > t[j].Field {
return false
}
return t[i].Val < t[j].Val
}
// Term is a term for the specified field.
type Term struct {
Field, Val string
}
func newTerm(b []byte) (t Term, e error) {
c := bytes.SplitN(b, []byte{0xff}, 2)
if len(c) != 2 {
return t, fmt.Errorf("invalid term")
}
t.Field = string(c[0])
t.Val = string(c[1])
return t, nil
}
// bytes returns a byte slice representation of the term.
func (t *Term) bytes() []byte {
b := make([]byte, 0, len(t.Field)+1+len(t.Val))
b = append(b, []byte(t.Field)...)
b = append(b, 0xff)
return append(b, []byte(t.Val)...)
}
// Matcher checks whether a value for a key satisfies a check condition.
type Matcher interface {
Match(value string) bool
}
// AnyMatcher matches any term value for a field.
var AnyMatcher = anyMatcher{}
type anyMatcher struct{}
func (anyMatcher) Match(_ string) bool {
return true
}
// EqualMatcher matches exactly one value for a particular label.
type EqualMatcher struct {
val string
}
func NewEqualMatcher(val string) *EqualMatcher {
return &EqualMatcher{val: val}
}
func (m *EqualMatcher) Match(s string) bool { return m.val == s }
// RegexpMatcher matches labels for the fixed key for which the value
// matches a regular expression.
type RegexpMatcher struct {
re *regexp.Regexp
}
func NewRegexpMatcher(expr string) (*RegexpMatcher, error) {
re, err := regexp.Compile(expr)
if err != nil {
return nil, err
}
return &RegexpMatcher{re: re}, nil
}
func (m *RegexpMatcher) Match(s string) bool { return m.re.MatchString(s) }
// DocID is a unique identifier for a document.
type DocID uint64
func newDocID(b []byte) DocID {
return DocID(decodeUint64(b))
}
func (d DocID) bytes() []byte {
return encodeUint64(uint64(d))
}
type termid uint64
func newTermID(b []byte) termid {
return termid(decodeUint64(b))
}
func (t termid) bytes() []byte {
return encodeUint64(uint64(t))
}
type termids []termid
func (t termids) Len() int { return len(t) }
func (t termids) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t termids) Less(i, j int) bool { return t[i] < t[j] }
// newTermIDs reads a sequence of uvarints from b and appends them
// to the term IDs.
func newTermIDs(b []byte) (t termids) {
for len(b) > 0 {
k, n := binary.Uvarint(b)
t = append(t, termid(k))
b = b[n:]
}
return t
}
// bytes encodes the term IDs as a sequence of uvarints.
func (t termids) bytes() []byte {
b := make([]byte, len(t)*binary.MaxVarintLen64)
n := 0
for _, x := range t {
n += binary.PutUvarint(b[n:], uint64(x))
}
return b[:n]
}
// Batch collects multiple indexing actions and allows to apply them
// to the persistet index all at once for improved performance.
type Batch struct {
ix *Index
tx *bolt.Tx
meta *meta
termBkt *bolt.Bucket
termidBkt *bolt.Bucket
docs []*batchDoc
terms map[Term]*batchTerm
}
type batchDoc struct {
id DocID
terms termids
}
type batchTerm struct {
id termid // zero if term has not been added yet
docs []DocID // documents to be indexed for the term
}
// Add adds a new document with the given terms to the index and
// returns a new unique ID for it.
// The ID only becomes valid after the batch has been committed successfully.
func (b *Batch) Add(terms Terms) DocID {
b.meta.LastDocID++
id := b.meta.LastDocID
tids := make(termids, 0, len(terms))
// Subtract last document ID before this batch was started.
for _, t := range terms {
tids = append(tids, b.addTerm(id, t))
}
b.docs = append(b.docs, &batchDoc{id: id, terms: tids})
return id
}
// SecondaryIndex indexes the document ID for additional terms. The temrs
// are not stored as part of the document's forward index as the initial terms.
// The caller has to ensure that the document IDs are added to terms in
// increasing order.
func (b *Batch) SecondaryIndex(id DocID, terms ...Term) {
for _, t := range terms {
b.addTerm(id, t)
}
}
// addTerm adds the document ID to the term's postings list and returns
// the Term's ID.
func (b *Batch) addTerm(id DocID, t Term) termid {
tb := b.terms[t]
// Populate term if necessary and allocate a new ID if it
// hasn't been created in the database before.
if tb == nil {
tb = &batchTerm{docs: make([]DocID, 0, 1024)}
b.terms[t] = tb
if idb := b.termBkt.Get(t.bytes()); idb != nil {
tb.id = termid(decodeUint64(idb))
} else {
b.meta.LastTermID++
tb.id = b.meta.LastTermID
}
}
tb.docs = append(tb.docs, id)
return tb.id
}
// Commit executes the batched indexing against the underlying index.
func (b *Batch) Commit() error {
defer b.ix.rwlock.Unlock()
// Close read transaction to open a write transaction. The outer rwlock
// stil guards against intermittend writes between switching.
if err := b.tx.Rollback(); err != nil {
return err
}
err := b.ix.bolt.Update(func(tx *bolt.Tx) error {
docsBkt := tx.Bucket(bktDocs)
// Add document IDs to forward index,
for _, d := range b.docs {
if err := docsBkt.Put(d.id.bytes(), d.terms.bytes()); err != nil {
return err
}
}
// Add newly allocated terms.
termBkt := tx.Bucket(bktTerms)
termidBkt := tx.Bucket(bktTermIDs)
for t, tb := range b.terms {
if tb.id > b.ix.meta.LastTermID {
bid := encodeUint64(uint64(tb.id))
tby := t.bytes()
if err := termBkt.Put(tby, bid); err != nil {
return fmt.Errorf("setting term failed: %s", err)
}
if err := termidBkt.Put(bid, tby); err != nil {
return fmt.Errorf("setting term failed: %s", err)
}
}
}
pbtx, err := b.ix.pbuf.Begin(true)
if err != nil {
return err
}
if err := b.writePostingsBatch(tx, pbtx); err != nil {
pbtx.Rollback()
return err
}
if err := pbtx.Commit(); err != nil {
return err
}
return b.updateMeta(tx)
})
return err
}
// Rollback drops all changes applied in the batch.
func (b *Batch) Rollback() error {
b.ix.rwlock.Unlock()
return b.tx.Rollback()
}
// writePostings adds the postings batch to the index.
func (b *Batch) writePostingsBatch(kvtx *bolt.Tx, pbtx *pages.Tx) error {
skiplist := kvtx.Bucket(bktSkiplist)
// createPage allocates a new delta-encoded page starting with id as its first entry.
createPage := func(id DocID) (page, error) {
pg := newPageDelta(make([]byte, pageSize-pages.PageHeaderSize))
if err := pg.init(id); err != nil {
return nil, err
}
return pg, nil
}
for _, tb := range b.terms {
ids := tb.docs
b, err := skiplist.CreateBucketIfNotExists(tb.id.bytes())
if err != nil {
return err
}
sl := &boltSkiplistCursor{
k: uint64(tb.id),
c: b.Cursor(),
bkt: b,
}
var (
pg page // Page we are currently appending to.
pc pageCursor // Its cursor.
pid uint64 // Its ID.
)
// Get the most recent page. If none exist, the entire postings list is new.
_, pid, err = sl.seek(math.MaxUint64)
if err != nil {
if err != io.EOF {
return err
}
// No most recent page for the key exists. The postings list is new and
// we have to allocate a new page ID for it.
if pg, err = createPage(ids[0]); err != nil {
return err
}
pc = pg.cursor()
ids = ids[1:]
} else {
// Load the most recent page.
pdata, err := pbtx.Get(pid)
if pdata == nil {
return fmt.Errorf("error getting page for ID %q: %s", pid, err)
}
pdatac := make([]byte, len(pdata))
// The byte slice is mmaped from bolt. We have to copy it to make modifications.
// pdatac := make([]byte, len(pdata))
copy(pdatac, pdata)
pg = newPageDelta(pdatac)
pc = pg.cursor()
}
for i := 0; i < len(ids); i++ {
if err = pc.append(ids[i]); err == errPageFull {
// We couldn't append to the page because it was full.
// Store away the old page...
if pid == 0 {
// The page was new.
pid, err = pbtx.Add(pg.data())
if err != nil {
return err
}
first, err := pc.Seek(0)
if err != nil {
return err
}
if err := sl.append(first, pid); err != nil {
return err
}
} else {
if err = pbtx.Set(pid, pg.data()); err != nil {
return err
}
}
// ... and allocate a new page.
pid = 0
if pg, err = createPage(ids[i]); err != nil {
return err
}
pc = pg.cursor()
} else if err != nil {
return err
}
}
// Save the last page we have written to.
if pid == 0 {
// The page was new.
pid, err = pbtx.Add(pg.data())
if err != nil {
return err
}
first, err := pc.Seek(0)
if err != nil {
return err
}
if err := sl.append(first, pid); err != nil {
return err
}
} else {
if err = pbtx.Set(pid, pg.data()); err != nil {
return err
}
}
}
return nil
}
// updateMeta updates the index's meta information based on the changes
// applied with the batch.
func (b *Batch) updateMeta(tx *bolt.Tx) error {
b.ix.meta = b.meta
bkt := tx.Bucket([]byte(bktMeta))
if bkt == nil {
return fmt.Errorf("meta bucket not found")
}
v, err := b.ix.meta.bytes()
if err != nil {
return fmt.Errorf("error encoding meta: %s", err)
}
return bkt.Put([]byte(keyMeta), v)
}

View File

@ -1 +0,0 @@
package index

View File

@ -1,294 +0,0 @@
package index
import (
"io"
"sort"
)
// An Iterator provides sorted iteration over a list of uint64s.
type Iterator interface {
// Next retrieves the next document ID in the postings list.
Next() (DocID, error)
// Seek moves the cursor to ID or the closest following one, if it doesn't exist.
// It returns the ID at the position.
Seek(id DocID) (DocID, error)
}
type mergeIterator struct {
i1, i2 Iterator
v1, v2 DocID
e1, e2 error
}
func (it *mergeIterator) Next() (DocID, error) {
if it.e1 == io.EOF && it.e2 == io.EOF {
return 0, io.EOF
}
if it.e1 != nil {
if it.e1 != io.EOF {
return 0, it.e1
}
x := it.v2
it.v2, it.e2 = it.i2.Next()
return x, nil
}
if it.e2 != nil {
if it.e2 != io.EOF {
return 0, it.e2
}
x := it.v1
it.v1, it.e1 = it.i1.Next()
return x, nil
}
if it.v1 < it.v2 {
x := it.v1
it.v1, it.e1 = it.i1.Next()
return x, nil
} else if it.v2 < it.v1 {
x := it.v2
it.v2, it.e2 = it.i2.Next()
return x, nil
} else {
x := it.v1
it.v1, it.e1 = it.i1.Next()
it.v2, it.e2 = it.i2.Next()
return x, nil
}
}
func (it *mergeIterator) Seek(id DocID) (DocID, error) {
// We just have to advance the first iterator. The next common match is also
// the next seeked ID of the intersection.
it.v1, it.e1 = it.i1.Seek(id)
it.v2, it.e2 = it.i2.Seek(id)
return it.Next()
}
// Merge returns a new Iterator over the union of the input iterators.
func Merge(its ...Iterator) Iterator {
if len(its) == 0 {
return nil
}
i1 := its[0]
for _, i2 := range its[1:] {
i1 = &mergeIterator{i1: i1, i2: i2}
}
return i1
}
// ExpandIterator walks through the iterator and returns the result list.
// The iterator is closed after completion.
func ExpandIterator(it Iterator) ([]DocID, error) {
var (
res = []DocID{}
v DocID
err error
)
for v, err = it.Seek(0); err == nil; v, err = it.Next() {
res = append(res, v)
}
if err == io.EOF {
return res, nil
}
return res, err
}
type intersectIterator struct {
i1, i2 Iterator
v1, v2 DocID
e1, e2 error
}
// Intersect returns a new Iterator over the intersection of the input iterators.
func Intersect(its ...Iterator) Iterator {
if len(its) == 0 {
return nil
}
i1 := its[0]
for _, i2 := range its[1:] {
i1 = &intersectIterator{i1: i1, i2: i2}
}
return i1
}
func (it *intersectIterator) Next() (DocID, error) {
for {
if it.e1 != nil {
return 0, it.e1
}
if it.e2 != nil {
return 0, it.e2
}
if it.v1 < it.v2 {
it.v1, it.e1 = it.i1.Seek(it.v2)
} else if it.v2 < it.v1 {
it.v2, it.e2 = it.i2.Seek(it.v1)
} else {
v := it.v1
it.v1, it.e1 = it.i1.Next()
it.v2, it.e2 = it.i2.Next()
return v, nil
}
}
}
func (it *intersectIterator) Seek(id DocID) (DocID, error) {
// We have to advance both iterators. Otherwise, we get a false-positive
// match on 0 if only on of the iterators has it.
it.v1, it.e1 = it.i1.Seek(id)
it.v2, it.e2 = it.i2.Seek(id)
return it.Next()
}
// A skiplist iterator iterates through a list of value/pointer pairs.
type skiplistIterator interface {
// seek returns the value and pointer at or before v.
seek(v DocID) (val DocID, ptr uint64, err error)
// next returns the next value/pointer pair.
next() (val DocID, ptr uint64, err error)
}
// iteratorStore allows to retrieve an iterator based on a key.
type iteratorStore interface {
get(uint64) (Iterator, error)
}
// skippingIterator implements the iterator interface based on skiplist, which
// allows to jump to the iterator closest to the seeked value.
//
// This iterator allows for speed up in seeks if the underlying data cannot
// be searched in O(log n).
// Ideally, the skiplist is seekable in O(log n).
type skippingIterator struct {
skiplist skiplistIterator
iterators iteratorStore
// The iterator holding the next value.
cur Iterator
}
// Seek implements the Iterator interface.
func (it *skippingIterator) Seek(id DocID) (DocID, error) {
_, ptr, err := it.skiplist.seek(id)
if err != nil {
return 0, err
}
cur, err := it.iterators.get(ptr)
if err != nil {
return 0, err
}
it.cur = cur
return it.cur.Seek(id)
}
// Next implements the Iterator interface.
func (it *skippingIterator) Next() (DocID, error) {
// If next was called initially.
// TODO(fabxc): should this just panic and initial call to seek() be required?
if it.cur == nil {
return it.Seek(0)
}
if id, err := it.cur.Next(); err == nil {
return id, nil
} else if err != io.EOF {
return 0, err
}
// We reached the end of the current iterator. Get the next iterator through
// our skiplist.
_, ptr, err := it.skiplist.next()
if err != nil {
// Here we return the actual io.EOF if we reached the end of the iterator
// retrieved from the last skiplist entry.
return 0, err
}
// Iterate over the next iterator.
cur, err := it.iterators.get(ptr)
if err != nil {
return 0, err
}
it.cur = cur
// Return the first value in the new iterator.
return it.cur.Seek(0)
}
// plainListIterator implements the iterator interface on a sorted list of integers.
type plainListIterator struct {
list list
pos int
}
func newPlainListIterator(l []DocID) *plainListIterator {
it := &plainListIterator{list: list(l)}
sort.Sort(it.list)
return it
}
func (it *plainListIterator) Seek(id DocID) (DocID, error) {
it.pos = sort.Search(it.list.Len(), func(i int) bool { return it.list[i] >= id })
return it.Next()
}
func (it *plainListIterator) Next() (DocID, error) {
if it.pos >= it.list.Len() {
return 0, io.EOF
}
x := it.list[it.pos]
it.pos++
return x, nil
}
type list []DocID
func (l list) Len() int { return len(l) }
func (l list) Less(i, j int) bool { return l[i] < l[j] }
func (l list) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
// plainSkiplistIterator implements the skiplistIterator interface on plain
// in-memory mapping.
type plainSkiplistIterator struct {
m map[DocID]uint64
keys list
pos int
}
func newPlainSkiplistIterator(m map[DocID]uint64) *plainSkiplistIterator {
var keys list
for k := range m {
keys = append(keys, k)
}
sort.Sort(keys)
return &plainSkiplistIterator{
m: m,
keys: keys,
}
}
// seek implements the skiplistIterator interface.
func (it *plainSkiplistIterator) seek(id DocID) (DocID, uint64, error) {
pos := sort.Search(len(it.keys), func(i int) bool { return it.keys[i] >= id })
// The skiplist iterator points to the element at or before the last value.
if pos > 0 && it.keys[pos] > id {
it.pos = pos - 1
} else {
it.pos = pos
}
return it.next()
}
// next implements the skiplistIterator interface.
func (it *plainSkiplistIterator) next() (DocID, uint64, error) {
if it.pos >= len(it.keys) {
return 0, 0, io.EOF
}
k := it.keys[it.pos]
it.pos++
return k, it.m[k], nil
}

View File

@ -1,228 +0,0 @@
package index
import (
"reflect"
"testing"
)
func TestMultiIntersect(t *testing.T) {
var cases = []struct {
a, b, c []DocID
res []DocID
}{
{
a: []DocID{1, 2, 3, 4, 5, 6, 1000, 1001},
b: []DocID{2, 4, 5, 6, 7, 8, 999, 1001},
c: []DocID{1, 2, 5, 6, 7, 8, 1001, 1200},
res: []DocID{2, 5, 6, 1001},
},
}
for _, c := range cases {
i1 := newPlainListIterator(c.a)
i2 := newPlainListIterator(c.b)
i3 := newPlainListIterator(c.c)
res, err := ExpandIterator(Intersect(i1, i2, i3))
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if !reflect.DeepEqual(res, c.res) {
t.Fatalf("Expected %v but got %v", c.res, res)
}
}
}
func TestIntersectIterator(t *testing.T) {
var cases = []struct {
a, b []DocID
res []DocID
}{
{
a: []DocID{1, 2, 3, 4, 5},
b: []DocID{6, 7, 8, 9, 10},
res: []DocID{},
},
{
a: []DocID{1, 2, 3, 4, 5},
b: []DocID{4, 5, 6, 7, 8},
res: []DocID{4, 5},
},
{
a: []DocID{1, 2, 3, 4, 9, 10},
b: []DocID{1, 4, 5, 6, 7, 8, 10, 11},
res: []DocID{1, 4, 10},
}, {
a: []DocID{1},
b: []DocID{0, 1},
res: []DocID{1},
},
}
for _, c := range cases {
i1 := newPlainListIterator(c.a)
i2 := newPlainListIterator(c.b)
res, err := ExpandIterator(&intersectIterator{i1: i1, i2: i2})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if !reflect.DeepEqual(res, c.res) {
t.Fatalf("Expected %v but got %v", c.res, res)
}
}
}
func TestMergeIntersect(t *testing.T) {
var cases = []struct {
a, b, c []DocID
res []DocID
}{
{
a: []DocID{1, 2, 3, 4, 5, 6, 1000, 1001},
b: []DocID{2, 4, 5, 6, 7, 8, 999, 1001},
c: []DocID{1, 2, 5, 6, 7, 8, 1001, 1200},
res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 999, 1000, 1001, 1200},
},
}
for _, c := range cases {
i1 := newPlainListIterator(c.a)
i2 := newPlainListIterator(c.b)
i3 := newPlainListIterator(c.c)
res, err := ExpandIterator(Merge(i1, i2, i3))
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if !reflect.DeepEqual(res, c.res) {
t.Fatalf("Expected %v but got %v", c.res, res)
}
}
}
func BenchmarkIntersect(t *testing.B) {
var a, b, c, d []DocID
for i := 0; i < 10000000; i += 2 {
a = append(a, DocID(i))
}
for i := 5000000; i < 5000100; i += 4 {
b = append(b, DocID(i))
}
for i := 5090000; i < 5090600; i += 4 {
b = append(b, DocID(i))
}
for i := 4990000; i < 5100000; i++ {
c = append(c, DocID(i))
}
for i := 4000000; i < 6000000; i++ {
d = append(d, DocID(i))
}
i1 := newPlainListIterator(a)
i2 := newPlainListIterator(b)
i3 := newPlainListIterator(c)
i4 := newPlainListIterator(d)
t.ResetTimer()
for i := 0; i < t.N; i++ {
if _, err := ExpandIterator(Intersect(i1, i2, i3, i4)); err != nil {
t.Fatal(err)
}
}
}
func TestMergeIterator(t *testing.T) {
var cases = []struct {
a, b []DocID
res []DocID
}{
{
a: []DocID{1, 2, 3, 4, 5},
b: []DocID{6, 7, 8, 9, 10},
res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
},
{
a: []DocID{1, 2, 3, 4, 5},
b: []DocID{4, 5, 6, 7, 8},
res: []DocID{1, 2, 3, 4, 5, 6, 7, 8},
},
{
a: []DocID{1, 2, 3, 4, 9, 10},
b: []DocID{1, 4, 5, 6, 7, 8, 10, 11},
res: []DocID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
}
for _, c := range cases {
i1 := newPlainListIterator(c.a)
i2 := newPlainListIterator(c.b)
res, err := ExpandIterator(&mergeIterator{i1: i1, i2: i2})
if err != nil {
t.Fatalf("Unexpected error: %s", err)
}
if !reflect.DeepEqual(res, c.res) {
t.Fatalf("Expected %v but got %v", c.res, res)
}
}
}
func TestSkippingIterator(t *testing.T) {
var cases = []struct {
skiplist skiplistIterator
its iteratorStore
res []DocID
}{
{
skiplist: newPlainSkiplistIterator(map[DocID]uint64{
5: 3,
50: 2,
500: 1,
}),
its: testIteratorStore{
3: newPlainListIterator(list{5, 7, 8, 9}),
2: newPlainListIterator(list{54, 60, 61}),
1: newPlainListIterator(list{1200, 1300, 100000}),
},
res: []DocID{5, 7, 8, 9, 54, 60, 61, 1200, 1300, 100000},
},
{
skiplist: newPlainSkiplistIterator(map[DocID]uint64{
0: 3,
50: 2,
}),
its: testIteratorStore{
3: newPlainListIterator(list{5, 7, 8, 9}),
2: newPlainListIterator(list{54, 60, 61}),
},
res: []DocID{5, 7, 8, 9, 54, 60, 61},
},
}
for _, c := range cases {
it := &skippingIterator{
skiplist: c.skiplist,
iterators: c.its,
}
res, err := ExpandIterator(it)
if err != nil {
t.Fatalf("Unexpected error", err)
}
if !reflect.DeepEqual(res, c.res) {
t.Fatalf("Expected %v but got %v", c.res, res)
}
}
}
type testIteratorStore map[uint64]Iterator
func (s testIteratorStore) get(id uint64) (Iterator, error) {
it, ok := s[id]
if !ok {
return nil, errNotFound
}
return it, nil
}

View File

@ -1,108 +0,0 @@
package index
import (
"encoding/binary"
"errors"
"io"
)
const pageSize = 2048
var errPageFull = errors.New("page full")
type pageCursor interface {
Iterator
append(v DocID) error
}
type page interface {
cursor() pageCursor
init(v DocID) error
data() []byte
}
type pageDelta struct {
b []byte
}
type pageType uint8
const (
pageTypeDelta pageType = iota
)
func newPageDelta(data []byte) *pageDelta {
return &pageDelta{b: data}
}
func (p *pageDelta) init(v DocID) error {
// Write first value.
binary.PutUvarint(p.b, uint64(v))
return nil
}
func (p *pageDelta) cursor() pageCursor {
return &pageDeltaCursor{data: p.b}
}
func (p *pageDelta) data() []byte {
return p.b
}
type pageDeltaCursor struct {
data []byte
pos int
cur DocID
}
func (p *pageDeltaCursor) append(id DocID) error {
// Run to the end.
_, err := p.Next()
for ; err == nil; _, err = p.Next() {
// Consume.
}
if err != io.EOF {
return err
}
if len(p.data)-p.pos < binary.MaxVarintLen64 {
return errPageFull
}
if p.cur >= id {
return errOutOfOrder
}
p.pos += binary.PutUvarint(p.data[p.pos:], uint64(id-p.cur))
p.cur = id
return nil
}
func (p *pageDeltaCursor) Close() error {
return nil
}
func (p *pageDeltaCursor) Seek(min DocID) (v DocID, err error) {
if min < p.cur {
p.pos = 0
}
for v, err = p.Next(); err == nil && v < min; v, err = p.Next() {
// Consume.
}
return p.cur, err
}
func (p *pageDeltaCursor) Next() (DocID, error) {
var n int
var dv uint64
if p.pos == 0 {
dv, n = binary.Uvarint(p.data)
p.cur = DocID(dv)
} else {
dv, n = binary.Uvarint(p.data[p.pos:])
if n <= 0 || dv == 0 {
return 0, io.EOF
}
p.cur += DocID(dv)
}
p.pos += n
return p.cur, nil
}

View File

@ -1,116 +0,0 @@
package index
import (
"math/rand"
"reflect"
"testing"
)
func TestPageDelta(t *testing.T) {
var (
vals []DocID
last DocID
)
for i := 0; i < 10000; i++ {
vals = append(vals, last)
last += DocID(rand.Int63n(1<<9) + 1)
}
data := make([]byte, pageSize)
page := newPageDelta(data)
if err := page.init(vals[0]); err != nil {
t.Fatal(err)
}
var num int
pc := page.cursor()
for _, v := range vals[1:] {
if err := pc.append(v); err != nil {
if err == errPageFull {
break
}
t.Fatal(err)
}
num++
}
res, err := ExpandIterator(pc)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(res, vals[:num+1]) {
t.Errorf("output did not match")
t.Errorf("expected: %v", vals[:num+1])
t.Errorf("received: %v", res)
}
}
func BenchmarkPageDeltaAppend(b *testing.B) {
var (
vals []DocID
last DocID
)
for i := 0; i < 10000; i++ {
vals = append(vals, last)
last += DocID(rand.Int63n(1<<10) + 1)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
data := make([]byte, pageSize)
page := newPageDelta(data)
if err := page.init(vals[0]); err != nil {
b.Fatal(err)
}
pc := page.cursor()
for _, v := range vals[1:] {
if err := pc.append(v); err != nil {
if err == errPageFull {
break
}
b.Fatal(err)
}
}
}
}
func BenchmarkPageDeltaRead(b *testing.B) {
var (
vals []DocID
last DocID
)
for i := 0; i < 10000; i++ {
vals = append(vals, last)
last += DocID(rand.Int63n(1<<10) + 1)
}
data := make([]byte, pageSize)
page := newPageDelta(data)
if err := page.init(vals[0]); err != nil {
b.Fatal(err)
}
pc := page.cursor()
for _, v := range vals[1:] {
if err := pc.append(v); err != nil {
if err == errPageFull {
break
}
b.Fatal(err)
}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := ExpandIterator(pc); err != nil {
b.Fatal(err)
}
}
}

View File

@ -1,72 +0,0 @@
package index
import (
"io"
"github.com/boltdb/bolt"
)
type iteratorStoreFunc func(k uint64) (Iterator, error)
func (s iteratorStoreFunc) get(k uint64) (Iterator, error) {
return s(k)
}
// boltSkiplistCursor implements the skiplistCurosr interface.
//
// TODO(fabxc): benchmark the overhead of a bucket per key.
// It might be more performant to have all skiplists in the same bucket.
//
// 20k keys, ~10 skiplist entries avg -> 200k keys, 1 bucket vs 20k buckets, 10 keys
//
type boltSkiplistCursor struct {
// k is currently unused. If the bucket holds entries for more than
// just a single key, it will be necessary.
k uint64
c *bolt.Cursor
bkt *bolt.Bucket
}
func (s *boltSkiplistCursor) next() (DocID, uint64, error) {
db, pb := s.c.Next()
if db == nil {
return 0, 0, io.EOF
}
return newDocID(db), decodeUint64(pb), nil
}
func (s *boltSkiplistCursor) seek(k DocID) (DocID, uint64, error) {
db, pb := s.c.Seek(k.bytes())
if db == nil {
db, pb = s.c.Last()
if db == nil {
return 0, 0, io.EOF
}
}
did, pid := newDocID(db), decodeUint64(pb)
if did > k {
// If the found entry is behind the seeked ID, try the previous
// entry if it exists. The page it points to contains the range of k.
dbp, pbp := s.c.Prev()
if dbp != nil {
did, pid = newDocID(dbp), decodeUint64(pbp)
} else {
// We skipped before the first entry. The cursor is now out of
// state and subsequent calls to Next() will return nothing.
// Reset it to the first position.
s.c.First()
}
}
return did, pid, nil
}
func (s *boltSkiplistCursor) append(d DocID, p uint64) error {
k, _ := s.c.Last()
if k != nil && decodeUint64(k) >= uint64(d) {
return errOutOfOrder
}
return s.bkt.Put(encodeUint64(uint64(d)), encodeUint64(p))
}

View File

@ -1,5 +0,0 @@
# pages
Pages stores pages of blob data. It is essentially a minimal version of
BoltDB, where the the B+ tree was removed and replaced by simply writing
page-aligned byte slices.

View File

@ -1,782 +0,0 @@
package pages
import (
"errors"
"fmt"
"hash/fnv"
"math"
"os"
"runtime"
"sync"
"time"
"unsafe"
)
// These errors can be returned when opening or calling methods on a DB.
var (
// ErrDatabaseNotOpen is returned when a DB instance is accessed before it
// is opened or after it is closed.
ErrDatabaseNotOpen = errors.New("database not open")
// ErrDatabaseOpen is returned when opening a database that is
// already open.
ErrDatabaseOpen = errors.New("database already open")
// ErrInvalid is returned when both meta pages on a database are invalid.
// This typically occurs when a file is not a bolt database.
ErrInvalid = errors.New("invalid database")
// ErrVersionMismatch is returned when the data file was created with a
// different version of Bolt.
ErrVersionMismatch = errors.New("version mismatch")
// ErrChecksum is returned when either meta page checksum does not match.
ErrChecksum = errors.New("checksum error")
// ErrTimeout is returned when a database cannot obtain an exclusive lock
// on the data file after the timeout passed to Open().
ErrTimeout = errors.New("timeout")
// ErrNotFound is returned when a user page for an ID could not be found.
ErrNotFound = errors.New("not found")
ErrTxClosed = errors.New("transaction closed")
ErrTxNotWritable = errors.New("transaction not writable")
)
// Marker value that indicates that a file is a pagebuf file.
const magic uint32 = 0xAFFEAFFE
// The data file version.
const version = 1
// The largest step that can be taken when remapping the mmap.
const maxMmapStep = 1 << 30 // 1GB
// defaultPageSize of the underlying buffers is set to the OS page size.
var defaultPageSize = os.Getpagesize()
// DB is an interface providing access to persistent byte chunks that
// are backed by memory-mapped pages.
type DB struct {
// If you want to read the entire database fast, you can set MmapFlag to
// syscall.MAP_POPULATE on Linux 2.6.23+ for sequential read-ahead.
MmapFlags int
// AllocSize is the amount of space allocated when the database
// needs to create new pages. This is done to amortize the cost
// of truncate() and fsync() when growing the data file.
AllocSize int
path string // location of the pagebuf file
file *os.File // the opened file of path
opened bool
data *[maxMapSize]byte
dataref []byte // mmap'ed readonly, write throws SEGV
datasz int
filesz int // current on disk file size
pageSize int
meta0 *meta
meta1 *meta
freelist *freelist
rwtx *Tx
txs []*Tx
pagePool sync.Pool
rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
mmaplock sync.RWMutex // Protects mmap access during remapping
ops struct {
writeAt func(b []byte, off int64) (n int, err error)
}
}
// Options defines configuration parameters with which a PageBuf is initialized.
type Options struct {
// Timeout is the amount of time to wait to obtain a file lock.
// When set to zero it will wait indefinitely. This option is only
// available on Darwin and Linux.
Timeout time.Duration
// Sets the DB.MmapFlags flag before memory mapping the file.
MmapFlags int
// XXX(fabxc): potentially allow setting different allocation strategies
// to fit different use cases.
// InitialMmapSize is the initial mmap size of the database
// in bytes.
//
// If <=0, the initial map size is 0.
// If initialMmapSize is smaller than the previous database size,
// it takes no effect.
InitialMmapSize int
// PageSize defines a custom page size used. It cannot be changed later.
// Must be a multiple of the operating system's default page size.
PageSize int
}
// DefaultOptions specifies a set of default parameters used when a pagebuf
// is opened without explicit options.
var DefaultOptions = Options{
// Use the OS's default page size.
PageSize: defaultPageSize,
}
// Default values if not set in a DB instance.
const (
DefaultAllocSize = 16 * 1024 * 1024
)
// Open and create a new database under the given path.
func Open(path string, mode os.FileMode, o *Options) (*DB, error) {
db := &DB{
opened: true,
}
// Set default options if no options are provided.
if o == nil {
o = &DefaultOptions
}
db.MmapFlags = o.MmapFlags
db.AllocSize = DefaultAllocSize
flag := os.O_RDWR
// Open data file and separate sync handler for metadata writes.
db.path = path
var err error
if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
_ = db.close()
return nil, err
}
// Lock file so that other processes using pagebuf in read-write mode cannot
// use the underlying data at the same time.
if err := flock(db, mode, true, o.Timeout); err != nil {
_ = db.close()
return nil, err
}
// Default values for test hooks
db.ops.writeAt = db.file.WriteAt
// Initialize the database if it doesn't exist.
if info, err := db.file.Stat(); err != nil {
return nil, err
} else if info.Size() == 0 {
// Initialize new files with meta pages.
if err := db.init(o.PageSize); err != nil {
return nil, err
}
} else {
// Read the first meta page to determine the page size.
var buf [0x1000]byte
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
m := db.pageInBuffer(buf[:], 0).meta()
if err := m.validate(); err != nil {
// We cannot verify which page sizes are used.
return nil, fmt.Errorf("cannot read page size: %s", err)
} else {
db.pageSize = int(m.pageSize)
}
} else {
return nil, fmt.Errorf("reading first meta page failed: %s", err)
}
}
// Initialize page pool.
db.pagePool = sync.Pool{
New: func() interface{} {
return make([]byte, db.pageSize)
},
}
// Memory map the data file.
if err := db.mmap(o.InitialMmapSize); err != nil {
_ = db.close()
return nil, err
}
// Read in the freelist.
db.freelist = newFreelist()
db.freelist.read(db.page(db.meta().freelist))
// Mark the database as opened and return.
return db, nil
}
func validatePageSize(psz int) error {
// Max value the content length can hold.
if defaultPageSize > math.MaxUint16 {
return fmt.Errorf("invalid page size %d", psz)
}
// Page size must be a multiple of OS page size so we stay
// page aligned.
if psz < defaultPageSize {
if defaultPageSize%psz != 0 {
return fmt.Errorf("invalid page size %d", psz)
}
} else if psz > defaultPageSize {
if psz%defaultPageSize != 0 {
return fmt.Errorf("invalid page size %d", psz)
}
}
return nil
}
// init creates a new database file and initializes its meta pages.
func (db *DB) init(psz int) error {
if err := validatePageSize(psz); err != nil {
return err
}
// Set the page size to the OS page size.
db.pageSize = psz
// Create two meta pages on a buffer.
buf := make([]byte, db.pageSize*4)
for i := 0; i < 2; i++ {
p := db.pageInBuffer(buf[:], pgid(i))
p.id = pgid(i)
p.flags = pageFlagMeta
// Initialize the meta page.
m := p.meta()
m.magic = magic
m.version = version
m.pageSize = uint32(db.pageSize)
m.freelist = 2
m.txid = txid(i)
m.pgid = 4 // TODO(fabxc): we initialize with zero pages, what to do here?
m.checksum = m.sum64()
}
// Write an empty freelist at page 3.
p := db.pageInBuffer(buf[:], pgid(2))
p.id = pgid(2)
p.flags = pageFlagFreelist
p.count = 0
// Write the first empty page.
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
p.flags = pageFlagData
p.count = 0
// Write the buffer to our data file.
if _, err := db.ops.writeAt(buf, 0); err != nil {
return err
}
if err := fdatasync(db); err != nil {
return err
}
return nil
}
// Sync executes fdatasync() against the database file handle.
func (db *DB) Sync() error { return fdatasync(db) }
// Close synchronizes and closes the memory-mapped pagebuf file.
func (db *DB) Close() error {
db.rwlock.Lock()
defer db.rwlock.Unlock()
db.metalock.Lock()
defer db.metalock.Unlock()
db.mmaplock.RLock()
defer db.mmaplock.RUnlock()
return db.close()
}
func (db *DB) close() error {
if !db.opened {
return nil
}
db.opened = false
db.freelist = nil
db.ops.writeAt = nil
// Close the mmap.
if err := db.munmap(); err != nil {
return err
}
// Close file handles.
if db.file != nil {
// Close the file descriptor.
if err := db.file.Close(); err != nil {
return fmt.Errorf("db file close: %s", err)
}
db.file = nil
}
db.path = ""
return nil
}
// Update executes a function within the context of a read-write managed transaction.
// If no error is returned from the function then the transaction is committed.
// If an error is returned then the entire transaction is rolled back.
// Any error that is returned from the function or returned from the commit is
// returned from the Update() method.
//
// Attempting to manually commit or rollback within the function will cause a panic.
func (db *DB) Update(fn func(*Tx) error) error {
t, err := db.Begin(true)
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if t.db != nil {
t.rollback()
}
}()
// Mark as a managed tx so that the inner function cannot manually commit.
t.managed = true
// If an error is returned from the function then rollback and return error.
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
return t.Commit()
}
// View executes a function within the context of a managed read-only transaction.
// Any error that is returned from the function is returned from the View() method.
//
// Attempting to manually rollback within the function will cause a panic.
func (db *DB) View(fn func(*Tx) error) error {
t, err := db.Begin(false)
if err != nil {
return err
}
// Make sure the transaction rolls back in the event of a panic.
defer func() {
if t.db != nil {
t.rollback()
}
}()
// Mark as a managed tx so that the inner function cannot manually rollback.
t.managed = true
// If an error is returned from the function then pass it through.
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
if err := t.Rollback(); err != nil {
return err
}
return nil
}
// pageExists checks whether the page with the given id exists.
func (db *DB) pageExists(id pgid) bool {
// The page exists if it is not in the freelist or out of the data range.
return !db.freelist.cache[pgid(id)] && int(id+1)*db.pageSize < db.datasz
}
// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id pgid) *page {
pos := id * pgid(db.pageSize)
return (*page)(unsafe.Pointer(&db.data[pos]))
}
// pageInBuffer retrieves a page reference from a given byte array based on the current
// page size.
func (db *DB) pageInBuffer(b []byte, id pgid) *page {
pos := id * pgid(db.pageSize)
return (*page)(unsafe.Pointer(&b[pos]))
}
// meta retrieves the current meta page reference.
func (db *DB) meta() *meta {
// We have to return the meta with the highest txid which doesn't fail
// validation. Otherwise, we can cause errors when in fact the database is
// in a consistent state. metaA is the one with the higher txid.
metaA := db.meta0
metaB := db.meta1
if db.meta1.txid > db.meta0.txid {
metaA = db.meta1
metaB = db.meta0
}
// Use higher meta page if valid. Otherwise fallback to previous, if valid.
if err := metaA.validate(); err == nil {
return metaA
} else if err := metaB.validate(); err == nil {
return metaB
}
// This should never be reached, because both meta1 and meta0 were validated
// on mmap() and we do fsync() on every write.
panic("pagebuf.PageBuf.meta(): invalid meta pages")
}
// allocate returns a contiguous block of memory starting at a given page.
func (db *DB) allocate(count int) (*page, error) {
// Allocate a temporary buffer for the page.
var buf []byte
if count == 1 {
buf = db.pagePool.Get().([]byte)
} else {
buf = make([]byte, count*db.pageSize)
}
p := (*page)(unsafe.Pointer(&buf[0]))
p.overflow = uint32(count - 1)
// Use pages from the freelist if they are available.
if p.id = db.freelist.allocate(count); p.id != 0 {
return p, nil
}
// Resize mmap() if we're at the end.
p.id = db.rwtx.meta.pgid
var minsz = int((p.id+pgid(count))+1) * db.pageSize
if minsz >= db.datasz {
if err := db.mmap(minsz); err != nil {
return nil, fmt.Errorf("mmap allocate error: %s", err)
}
}
// Move the page id high water mark.
db.rwtx.meta.pgid += pgid(count)
return p, nil
}
// grow grows the size of the database to the given sz.
func (db *DB) grow(sz int) error {
// Ignore if the new size is less than available file size.
if sz <= db.filesz {
return nil
}
// If the data is smaller than the alloc size then only allocate what's needed.
// Once it goes over the allocation size then allocate in chunks.
if db.datasz < db.AllocSize {
sz = db.datasz
} else {
sz += db.AllocSize
}
// Truncate and fsync to ensure file size metadata is flushed.
// https://github.com/boltdb/bolt/issues/284
if runtime.GOOS != "windows" {
if err := db.file.Truncate(int64(sz)); err != nil {
return fmt.Errorf("file resize error: %s", err)
}
}
if err := db.file.Sync(); err != nil {
return fmt.Errorf("file sync error: %s", err)
}
db.filesz = sz
return nil
}
// mmap opens the underlying memory-mapped file and initializes it.
// minsz is the minimum size that the mmap can be.
func (db *DB) mmap(minsz int) error {
db.mmaplock.Lock()
defer db.mmaplock.Unlock()
info, err := db.file.Stat()
if err != nil {
return fmt.Errorf("mmap stat error: %s", err)
} else if int(info.Size()) < db.pageSize*2 {
return fmt.Errorf("file size too small")
}
// Ensure the size is at least the minimum size.
var size = int(info.Size())
if size < minsz {
size = minsz
}
size, err = db.mmapSize(size)
if err != nil {
return err
}
// Unmap existing data before continuing.
if err := db.munmap(); err != nil {
return err
}
// Memory-map the data file as a byte slice.
if err := mmap(db, size); err != nil {
return err
}
// Save references to the meta pages.
db.meta0 = db.page(0).meta()
db.meta1 = db.page(1).meta()
// Validate the meta pages. We only return an error if both meta pages fail
// validation, since meta0 failing validation means that it wasn't saved
// properly -- but we can recover using meta1. And vice-versa.
err0 := db.meta0.validate()
err1 := db.meta1.validate()
if err0 != nil && err1 != nil {
return err0
}
return nil
}
// munmap unmaps the data file from memory.
func (db *DB) munmap() error {
if err := munmap(db); err != nil {
return fmt.Errorf("unmap error: %s", err)
}
return nil
}
// mmapSize determines the appropriate size for the mmap given the current size
// of the database. The minimum size is 32KB and doubles until it reaches 1GB.
// Returns an error if the new mmap size is greater than the max allowed.
func (db *DB) mmapSize(size int) (int, error) {
// Double the size from 32KB until 1GB.
for i := uint(15); i <= 30; i++ {
if size <= 1<<i {
return 1 << i, nil
}
}
// Verify the requested size is not above the maximum allowed.
if size > maxMapSize {
return 0, fmt.Errorf("mmap too large")
}
// If larger than 1GB then grow by 1GB at a time.
sz := int64(size)
if remainder := sz % int64(maxMmapStep); remainder > 0 {
sz += int64(maxMmapStep) - remainder
}
// Ensure that the mmap size is a multiple of the page size.
// This should always be true since we're incrementing in MBs.
pageSize := int64(db.pageSize)
if (sz % pageSize) != 0 {
sz = ((sz / pageSize) + 1) * pageSize
}
// If we've exceeded the max size then only grow up to the max size.
if sz > maxMapSize {
sz = maxMapSize
}
return int(sz), nil
}
func (db *DB) String() string {
return fmt.Sprintf("PageBuf<%s>", db.path)
}
// Path returns the path to the currently opened pagebuf file.
func (db *DB) Path() string {
return db.path
}
// Begin starts a new transaction.
// Multiple read-only transactions can be used concurrently but only one
// write transaction can be used at a time. Starting multiple write transactions
// will cause the calls to block and be serialized until the current write
// transaction finishes.
//
// Transactions should not be dependent on one another. Opening a read
// transaction and a write transaction in the same goroutine can cause the
// writer to deadlock because the database periodically needs to re-mmap itself
// as it grows and it cannot do that while a read transaction is open.
//
// If a long running read transaction (for example, a snapshot transaction) is
// needed, you might want to set PageBuf.InitialMmapSize to a large enough value
// to avoid potential blocking of write transaction.
//
// IMPORTANT: You must close read-only transactions after you are finished or
// else the database will not reclaim old pages.
func (db *DB) Begin(writable bool) (*Tx, error) {
if writable {
return db.beginRWTx()
}
return db.beginTx()
}
func (db *DB) beginTx() (*Tx, error) {
// Lock the meta pages while we initialize the transaction. We obtain
// the meta lock before the mmap lock because that's the order that the
// write transaction will obtain them.
db.metalock.Lock()
// Obtain a read-only lock on the mmap. When the mmap is remapped it will
// obtain a write lock so all transactions must finish before it can be
// remapped.
db.mmaplock.RLock()
// Exit if the database is not open yet.
if !db.opened {
db.mmaplock.RUnlock()
db.metalock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{}
t.init(db)
// Keep track of transaction until it closes.
db.txs = append(db.txs, t)
// Unlock the meta pages.
db.metalock.Unlock()
return t, nil
}
func (db *DB) beginRWTx() (*Tx, error) {
// Obtain writer lock. This is released by the transaction when it closes.
// This enforces only one writer transaction at a time.
db.rwlock.Lock()
// Once we have the writer lock then we can lock the meta pages so that
// we can set up the transaction.
db.metalock.Lock()
defer db.metalock.Unlock()
// Exit if the database is not open yet.
if !db.opened {
db.rwlock.Unlock()
return nil, ErrDatabaseNotOpen
}
// Create a transaction associated with the database.
t := &Tx{writable: true}
t.init(db)
db.rwtx = t
// Free any pages associated with closed read-only transactions.
var minid txid = 0xFFFFFFFFFFFFFFFF
for _, t := range db.txs {
if t.meta.txid < minid {
minid = t.meta.txid
}
}
if minid > 0 {
db.freelist.release(minid - 1)
}
return t, nil
}
// removeTx removes a transaction from the database.
func (db *DB) removeTx(tx *Tx) {
// Release the read lock on the mmap.
db.mmaplock.RUnlock()
// Use the meta lock to restrict access to the DB object.
db.metalock.Lock()
// Remove the transaction.
for i, t := range db.txs {
if t == tx {
db.txs = append(db.txs[:i], db.txs[i+1:]...)
break
}
}
// Unlock the meta pages.
db.metalock.Unlock()
}
// Size represents a valid page size.
type Size int8
// The valid sizes for allocated pages.
const (
Size512 Size = -3
Size1024 = -2
Size2048 = -1
Size4096 = 0
Size8192 = 1
)
const (
upageSizeMin = Size512
upageSizeMax = Size8192
)
type meta struct {
magic uint32
version uint32
pageSize uint32
flags uint32
freelist pgid
txid txid
pgid pgid
checksum uint64
}
// validate checks the marker bytes and version of the meta page to ensure it matches this binary.
func (m *meta) validate() error {
if m.magic != magic {
return ErrInvalid
} else if m.version != version {
return ErrVersionMismatch
} else if m.checksum != 0 && m.checksum != m.sum64() {
return ErrChecksum
}
return nil
}
// copy copies one meta object to another.
func (m *meta) copy(dest *meta) {
*dest = *m
}
// write writes the meta onto a page.
func (m *meta) write(p *page) {
if m.freelist >= m.pgid {
panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid))
}
// Page id is either going to be 0 or 1 which we can determine by the transaction ID.
p.id = pgid(m.txid % 2)
p.flags |= pageFlagMeta
// Calculate the checksum.
m.checksum = m.sum64()
m.copy(p.meta())
}
// generates the checksum for the meta.
func (m *meta) sum64() uint64 {
var h = fnv.New64a()
_, _ = h.Write((*[unsafe.Offsetof(meta{}.checksum)]byte)(unsafe.Pointer(m))[:])
return h.Sum64()
}
// _assert will panic with a given formatted message if the given condition is false.
func _assert(condition bool, msg string, v ...interface{}) {
if !condition {
panic(fmt.Sprintf("assertion failed: "+msg, v...))
}
}

View File

@ -1,248 +0,0 @@
package pages
import (
"fmt"
"sort"
"unsafe"
)
// freelist represents a list of all pages that are available for allocation.
// It also tracks pages that have been freed but are still in use by open transactions.
type freelist struct {
ids []pgid // all free and available free page ids.
pending map[txid][]pgid // mapping of soon-to-be free page ids by tx.
cache map[pgid]bool // fast lookup of all free and pending page ids.
}
// newFreelist returns an empty, initialized freelist.
func newFreelist() *freelist {
return &freelist{
pending: make(map[txid][]pgid),
cache: make(map[pgid]bool),
}
}
// size returns the size of the page after serialization.
func (f *freelist) size() int {
return PageHeaderSize + (int(unsafe.Sizeof(pgid(0))) * f.count())
}
// count returns count of pages on the freelist
func (f *freelist) count() int {
return f.free_count() + f.pending_count()
}
// free_count returns count of free pages
func (f *freelist) free_count() int {
return len(f.ids)
}
// pending_count returns count of pending pages
func (f *freelist) pending_count() int {
var count int
for _, list := range f.pending {
count += len(list)
}
return count
}
// all returns a list of all free ids and all pending ids in one sorted list.
func (f *freelist) all() []pgid {
m := make(pgids, 0)
for _, list := range f.pending {
m = append(m, list...)
}
sort.Sort(m)
return pgids(f.ids).merge(m)
}
// allocate returns the starting page id of a contiguous list of pages of a given size.
// If a contiguous block cannot be found then 0 is returned.
func (f *freelist) allocate(n int) pgid {
if len(f.ids) == 0 {
return 0
}
var initial, previd pgid
for i, id := range f.ids {
if id <= 1 {
panic(fmt.Sprintf("invalid page allocation: %d", id))
}
// Reset initial page if this is not contiguous.
if previd == 0 || id-previd != 1 {
initial = id
}
// If we found a contiguous block then remove it and return it.
if (id-initial)+1 == pgid(n) {
// If we're allocating off the beginning then take the fast path
// and just adjust the existing slice. This will use extra memory
// temporarily but the append() in free() will realloc the slice
// as is necessary.
if (i + 1) == n {
f.ids = f.ids[i+1:]
} else {
copy(f.ids[i-n+1:], f.ids[i+1:])
f.ids = f.ids[:len(f.ids)-n]
}
// Remove from the free cache.
for i := pgid(0); i < pgid(n); i++ {
delete(f.cache, initial+i)
}
return initial
}
previd = id
}
return 0
}
// free releases a page and its overflow for a given transaction id.
// If the page is already free then a panic will occur.
func (f *freelist) free(txid txid, p *page) {
if p.id <= 1 {
panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
}
// Free page and all its overflow pages.
var ids = f.pending[txid]
for id := p.id; id <= p.id+pgid(p.overflow); id++ {
// Verify that page is not already free.
if f.cache[id] {
panic(fmt.Sprintf("page %d already freed", id))
}
// Add to the freelist and cache.
ids = append(ids, id)
f.cache[id] = true
}
f.pending[txid] = ids
}
// release moves all page ids for a transaction id (or older) to the freelist.
func (f *freelist) release(txid txid) {
m := make(pgids, 0)
for tid, ids := range f.pending {
if tid <= txid {
// Move transaction's pending pages to the available freelist.
// Don't remove from the cache since the page is still free.
m = append(m, ids...)
delete(f.pending, tid)
}
}
sort.Sort(m)
f.ids = pgids(f.ids).merge(m)
}
// rollback removes the pages from a given pending tx.
func (f *freelist) rollback(txid txid) {
// Remove page ids from cache.
for _, id := range f.pending[txid] {
delete(f.cache, id)
}
// Remove pages from pending list.
delete(f.pending, txid)
}
// freed returns whether a given page is in the free list.
func (f *freelist) freed(pgid pgid) bool {
return f.cache[pgid]
}
// read initializes the freelist from a freelist page.
func (f *freelist) read(p *page) {
// If the page.count is at the max uint16 value (64k) then it's considered
// an overflow and the size of the freelist is stored as the first element.
idx, count := 0, int(p.count)
if count == 0xFFFF {
idx = 1
count = int(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0])
}
// Copy the list of page ids from the freelist.
if count == 0 {
f.ids = nil
} else {
ids := ((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[idx:count]
f.ids = make([]pgid, len(ids))
copy(f.ids, ids)
// Make sure they're sorted.
sort.Sort(pgids(f.ids))
}
// Rebuild the page cache.
f.reindex()
}
// write writes the page ids onto a freelist page. All free and pending ids are
// saved to disk since in the event of a program crash, all pending ids will
// become free.
func (f *freelist) write(p *page) error {
// Combine the old free pgids and pgids waiting on an open transaction.
ids := f.all()
// Update the header flag.
p.flags |= pageFlagFreelist
// The page.count can only hold up to 64k elements so if we overflow that
// number then we handle it by putting the size in the first element.
if len(ids) == 0 {
p.count = uint16(len(ids))
} else if len(ids) < 0xFFFF {
p.count = uint16(len(ids))
copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[:], ids)
} else {
p.count = 0xFFFF
((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[0] = pgid(len(ids))
copy(((*[maxAllocSize]pgid)(unsafe.Pointer(&p.ptr)))[1:], ids)
}
return nil
}
// reload reads the freelist from a page and filters out pending items.
func (f *freelist) reload(p *page) {
f.read(p)
// Build a cache of only pending pages.
pcache := make(map[pgid]bool)
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
pcache[pendingID] = true
}
}
// Check each page in the freelist and build a new available freelist
// with any pages not in the pending lists.
var a []pgid
for _, id := range f.ids {
if !pcache[id] {
a = append(a, id)
}
}
f.ids = a
// Once the available list is rebuilt then rebuild the free cache so that
// it includes the available and pending free pages.
f.reindex()
}
// reindex rebuilds the free cache based on available and pending free lists.
func (f *freelist) reindex() {
f.cache = make(map[pgid]bool)
for _, id := range f.ids {
f.cache[id] = true
}
for _, pendingIDs := range f.pending {
for _, pendingID := range pendingIDs {
f.cache[pendingID] = true
}
}
}

View File

@ -1,158 +0,0 @@
package pages
import (
"math/rand"
"reflect"
"sort"
"testing"
"unsafe"
)
// Ensure that a page is added to a transaction's freelist.
func TestFreelist_free(t *testing.T) {
f := newFreelist()
f.free(100, &page{id: 12})
if !reflect.DeepEqual([]pgid{12}, f.pending[100]) {
t.Fatalf("exp=%v; got=%v", []pgid{12}, f.pending[100])
}
}
// Ensure that a page and its overflow is added to a transaction's freelist.
func TestFreelist_free_overflow(t *testing.T) {
f := newFreelist()
f.free(100, &page{id: 12, overflow: 3})
if exp := []pgid{12, 13, 14, 15}; !reflect.DeepEqual(exp, f.pending[100]) {
t.Fatalf("exp=%v; got=%v", exp, f.pending[100])
}
}
// Ensure that a transaction's free pages can be released.
func TestFreelist_release(t *testing.T) {
f := newFreelist()
f.free(100, &page{id: 12, overflow: 1})
f.free(100, &page{id: 9})
f.free(102, &page{id: 39})
f.release(100)
f.release(101)
if exp := []pgid{9, 12, 13}; !reflect.DeepEqual(exp, f.ids) {
t.Fatalf("exp=%v; got=%v", exp, f.ids)
}
f.release(102)
if exp := []pgid{9, 12, 13, 39}; !reflect.DeepEqual(exp, f.ids) {
t.Fatalf("exp=%v; got=%v", exp, f.ids)
}
}
// Ensure that a freelist can find contiguous blocks of pages.
func TestFreelist_allocate(t *testing.T) {
f := &freelist{ids: []pgid{3, 4, 5, 6, 7, 9, 12, 13, 18}}
if id := int(f.allocate(3)); id != 3 {
t.Fatalf("exp=3; got=%v", id)
}
if id := int(f.allocate(1)); id != 6 {
t.Fatalf("exp=6; got=%v", id)
}
if id := int(f.allocate(3)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if id := int(f.allocate(2)); id != 12 {
t.Fatalf("exp=12; got=%v", id)
}
if id := int(f.allocate(1)); id != 7 {
t.Fatalf("exp=7; got=%v", id)
}
if id := int(f.allocate(0)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if id := int(f.allocate(0)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if exp := []pgid{9, 18}; !reflect.DeepEqual(exp, f.ids) {
t.Fatalf("exp=%v; got=%v", exp, f.ids)
}
if id := int(f.allocate(1)); id != 9 {
t.Fatalf("exp=9; got=%v", id)
}
if id := int(f.allocate(1)); id != 18 {
t.Fatalf("exp=18; got=%v", id)
}
if id := int(f.allocate(1)); id != 0 {
t.Fatalf("exp=0; got=%v", id)
}
if exp := []pgid{}; !reflect.DeepEqual(exp, f.ids) {
t.Fatalf("exp=%v; got=%v", exp, f.ids)
}
}
// Ensure that a freelist can deserialize from a freelist page.
func TestFreelist_read(t *testing.T) {
// Create a page.
var buf [4096]byte
page := (*page)(unsafe.Pointer(&buf[0]))
page.flags = pageFlagFreelist
page.count = 2
// Insert 2 page ids.
ids := (*[3]pgid)(unsafe.Pointer(&page.ptr))
ids[0] = 23
ids[1] = 50
// Deserialize page into a freelist.
f := newFreelist()
f.read(page)
// Ensure that there are two page ids in the freelist.
if exp := []pgid{23, 50}; !reflect.DeepEqual(exp, f.ids) {
t.Fatalf("exp=%v; got=%v", exp, f.ids)
}
}
// Ensure that a freelist can serialize into a freelist page.
func TestFreelist_write(t *testing.T) {
// Create a freelist and write it to a page.
var buf [4096]byte
f := &freelist{ids: []pgid{12, 39}, pending: make(map[txid][]pgid)}
f.pending[100] = []pgid{28, 11}
f.pending[101] = []pgid{3}
p := (*page)(unsafe.Pointer(&buf[0]))
if err := f.write(p); err != nil {
t.Fatal(err)
}
// Read the page back out.
f2 := newFreelist()
f2.read(p)
// Ensure that the freelist is correct.
// All pages should be present and in reverse order.
if exp := []pgid{3, 11, 12, 28, 39}; !reflect.DeepEqual(exp, f2.ids) {
t.Fatalf("exp=%v; got=%v", exp, f2.ids)
}
}
func Benchmark_FreelistRelease10K(b *testing.B) { benchmark_FreelistRelease(b, 10000) }
func Benchmark_FreelistRelease100K(b *testing.B) { benchmark_FreelistRelease(b, 100000) }
func Benchmark_FreelistRelease1000K(b *testing.B) { benchmark_FreelistRelease(b, 1000000) }
func Benchmark_FreelistRelease10000K(b *testing.B) { benchmark_FreelistRelease(b, 10000000) }
func benchmark_FreelistRelease(b *testing.B, size int) {
ids := randomPgids(size)
pending := randomPgids(len(ids) / 400)
b.ResetTimer()
for i := 0; i < b.N; i++ {
f := &freelist{ids: ids, pending: map[txid][]pgid{1: pending}}
f.release(1)
}
}
func randomPgids(n int) []pgid {
rand.Seed(42)
pgids := make(pgids, n)
for i := range pgids {
pgids[i] = pgid(rand.Int63())
}
sort.Sort(pgids)
return pgids
}

View File

@ -1,103 +0,0 @@
package pages
import (
"fmt"
"os"
"sort"
"unsafe"
)
const PageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr))
const (
// pageFlag{head,tail,body}?
pageFlagMeta = 0x02
pageFlagFreelist = 0x04
pageFlagData = 0x08
)
type pgid uint64
type page struct {
id pgid
flags uint16
count uint16
overflow uint32
ptr uintptr
}
// typ returns a human readable page type string.
func (p *page) typ() string {
if (p.flags & pageFlagMeta) != 0 {
return "meta"
} else if (p.flags & pageFlagFreelist) != 0 {
return "freelist"
} else if (p.flags & pageFlagData) != 0 {
return "data"
}
return fmt.Sprintf("unknown<%02x>", p.flags)
}
func (p *page) String() string {
return fmt.Sprintf("page<%s,%016x>", p.typ(), p.id)
}
// meta returns a pointer to the metadata section of a page.
func (p *page) meta() *meta {
return (*meta)(unsafe.Pointer(&p.ptr))
}
// dump writes n bytes of the page to STDERR as hex output.
func (p *page) hexdump(n int) {
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:n]
fmt.Fprintf(os.Stderr, "%x\n", buf)
}
type pages []*page
func (s pages) Len() int { return len(s) }
func (s pages) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s pages) Less(i, j int) bool { return s[i].id < s[j].id }
type pgids []pgid
func (s pgids) Len() int { return len(s) }
func (s pgids) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s pgids) Less(i, j int) bool { return s[i] < s[j] }
// merge returns the sorted union of a and b.
func (a pgids) merge(b pgids) pgids {
// Return the opposite slice if one is nil.
if len(a) == 0 {
return b
} else if len(b) == 0 {
return a
}
// Create a list to hold all elements from both lists.
merged := make(pgids, 0, len(a)+len(b))
// Assign lead to the slice with a lower starting value, follow to the higher value.
lead, follow := a, b
if b[0] < a[0] {
lead, follow = b, a
}
// Continue while there are elements in the lead.
for len(lead) > 0 {
// Merge largest prefix of lead that is ahead of follow[0].
n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] })
merged = append(merged, lead[:n]...)
if n >= len(lead) {
break
}
// Swap lead and follow.
lead, follow = follow, lead[n:]
}
// Append what's left in follow.
merged = append(merged, follow...)
return merged
}

View File

@ -1,69 +0,0 @@
package pages
import (
"reflect"
"sort"
"testing"
"testing/quick"
)
// Ensure that the page type can be returned in human readable format.
func TestPage_typ(t *testing.T) {
if typ := (&page{flags: pageFlagData}).typ(); typ != "data" {
t.Fatalf("exp=branch; got=%v", typ)
}
if typ := (&page{flags: pageFlagMeta}).typ(); typ != "meta" {
t.Fatalf("exp=meta; got=%v", typ)
}
if typ := (&page{flags: pageFlagFreelist}).typ(); typ != "freelist" {
t.Fatalf("exp=freelist; got=%v", typ)
}
if typ := (&page{flags: 20000}).typ(); typ != "unknown<4e20>" {
t.Fatalf("exp=unknown<4e20>; got=%v", typ)
}
}
// Ensure that the hexdump debugging function doesn't blow up.
func TestPage_dump(t *testing.T) {
(&page{id: 256}).hexdump(16)
}
func TestPgids_merge(t *testing.T) {
a := pgids{4, 5, 6, 10, 11, 12, 13, 27}
b := pgids{1, 3, 8, 9, 25, 30}
c := a.merge(b)
if !reflect.DeepEqual(c, pgids{1, 3, 4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30}) {
t.Errorf("mismatch: %v", c)
}
a = pgids{4, 5, 6, 10, 11, 12, 13, 27, 35, 36}
b = pgids{8, 9, 25, 30}
c = a.merge(b)
if !reflect.DeepEqual(c, pgids{4, 5, 6, 8, 9, 10, 11, 12, 13, 25, 27, 30, 35, 36}) {
t.Errorf("mismatch: %v", c)
}
}
func TestPgids_merge_quick(t *testing.T) {
if err := quick.Check(func(a, b pgids) bool {
// Sort incoming lists.
sort.Sort(a)
sort.Sort(b)
// Merge the two lists together.
got := a.merge(b)
// The expected value should be the two lists combined and sorted.
exp := append(a, b...)
sort.Sort(exp)
if !reflect.DeepEqual(exp, got) {
t.Errorf("\nexp=%+v\ngot=%+v\n", exp, got)
return false
}
return true
}, nil); err != nil {
t.Fatal(err)
}
}

View File

@ -1,7 +0,0 @@
package pages
// maxMapSize represents the largest mmap size supported by pagebuf.
const maxMapSize = 0xFFFFFFFFFFFF // 256TB
// maxAllocSize is the size used when creating array pointers.
const maxAllocSize = 0x7FFFFFFF

View File

@ -1,10 +0,0 @@
package pages
import (
"syscall"
)
// fdatasync flushes written data to a file descriptor.
func fdatasync(pb *Pagebuf) error {
return syscall.Fdatasync(int(pb.file.Fd()))
}

View File

@ -1,89 +0,0 @@
// +build !windows,!plan9,!solaris
package pages
import (
"fmt"
"os"
"syscall"
"time"
"unsafe"
)
// flock acquires an advisory lock on a file descriptor.
func flock(pb *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
var t time.Time
for {
// If we're beyond our timeout then return an error.
// This can only occur after we've attempted a flock once.
if t.IsZero() {
t = time.Now()
} else if timeout > 0 && time.Since(t) > timeout {
return ErrTimeout
}
flag := syscall.LOCK_SH
if exclusive {
flag = syscall.LOCK_EX
}
// Otherwise attempt to obtain an exclusive lock.
err := syscall.Flock(int(pb.file.Fd()), flag|syscall.LOCK_NB)
if err == nil {
return nil
} else if err != syscall.EWOULDBLOCK {
return err
}
// Wait for a bit and try again.
time.Sleep(50 * time.Millisecond)
}
}
// funlock releases an advisory lock on a file descriptor.
func funlock(pb *DB) error {
return syscall.Flock(int(pb.file.Fd()), syscall.LOCK_UN)
}
// mmap memory maps a PageBuf's data file.
func mmap(pb *DB, sz int) error {
// Map the data file to memory.
b, err := syscall.Mmap(int(pb.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|pb.MmapFlags)
if err != nil {
return err
}
// Advise the kernel that the mmap is accessed randomly.
if err := madvise(b, syscall.MADV_RANDOM); err != nil {
return fmt.Errorf("madvise: %s", err)
}
// Save the original byte slice and convert to a byte array pointer.
pb.dataref = b
pb.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
pb.datasz = sz
return nil
}
// munmap unmaps a PageBuf's data file from memory.
func munmap(pb *DB) error {
// Ignore the unmap if we have no mapped data.
if pb.dataref == nil {
return nil
}
// Unmap using the original byte slice.
err := syscall.Munmap(pb.dataref)
pb.dataref = nil
pb.data = nil
pb.datasz = 0
return err
}
// NOTE: This function is copied from stdlib because it is not available on darwin.
func madvise(b []byte, advice int) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
if e1 != 0 {
err = e1
}
return
}

View File

@ -1,8 +0,0 @@
// +build !windows,!plan9,!linux,!openbsd
package pages
// fdatasync flushes written data to a file descriptor.
func fdatasync(pb *DB) error {
return pb.file.Sync()
}

View File

@ -1,384 +0,0 @@
package pages
import (
"fmt"
"sort"
"unsafe"
)
// txid represents the internal transaction identifier.
type txid uint64
// Tx represents a read-only or read/write transaction on the page buffer.
// Read-only transactions can be used for retrieving pages.
// Read/write transactions can retrieve and write pages.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct {
writable bool
managed bool
db *DB
meta *meta
pages map[pgid]*page
delPages map[pgid]bool
// WriteFlag specifies the flag for write-related methods like WriteTo().
// Tx opens the database file with the specified flag to copy the data.
//
// By default, the flag is unset, which works well for mostly in-memory
// workloads. For databases that are much larger than available RAM,
// set the flag to syscall.O_DIRECT to avoid trashing the page cache.
WriteFlag int
}
// init initializes the transaction.
func (tx *Tx) init(db *DB) {
tx.db = db
tx.pages = nil
// Copy the meta page since it can be changed by the writer.
tx.meta = &meta{}
db.meta().copy(tx.meta)
// Increment the transaction id and add a page cache for writable transactions.
if tx.writable {
tx.pages = make(map[pgid]*page)
tx.delPages = make(map[pgid]bool)
tx.meta.txid += txid(1)
}
}
// ID returns the transaction id.
func (tx *Tx) ID() uint64 {
return uint64(tx.meta.txid)
}
// Size returns current database size in bytes as seen by this transaction.
func (tx *Tx) Size() int64 {
return int64(tx.meta.pgid) * int64(tx.db.pageSize)
}
// DB returns a reference to the database that created the transaction.
func (tx *Tx) DB() *DB {
return tx.db
}
// Writable returns whether the transaction can perform write operations.
func (tx *Tx) Writable() bool {
return tx.writable
}
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
func (tx *Tx) Rollback() error {
_assert(!tx.managed, "managed tx rollback not allowed")
if tx.db == nil {
return ErrTxClosed
}
tx.rollback()
return nil
}
func (tx *Tx) rollback() {
if tx.db == nil {
return
}
if tx.writable {
tx.db.freelist.rollback(tx.meta.txid)
tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
}
tx.close()
}
func (tx *Tx) close() {
if tx.db == nil {
return
}
if tx.writable {
// Remove transaction ref & writer lock.
tx.db.rwtx = nil
tx.db.rwlock.Unlock()
} else {
tx.db.removeTx(tx)
}
// Clear all references.
tx.db = nil
tx.meta = nil
tx.pages = nil
}
// page returns a reference to the page with a given id.
// If page has been written to then a temporary buffered page is returned.
func (tx *Tx) page(id pgid) *page {
// Check the dirty pages first.
if tx.pages != nil {
if p, ok := tx.pages[id]; ok {
return p
}
}
// Otherwise return directly from the mmap.
return tx.db.page(id)
}
func (tx *Tx) pageExists(id pgid) bool {
// Check whether the page was modified during this transaction.
if tx.pages != nil {
if _, ok := tx.pages[id]; ok {
return true
}
}
// Check whether page was deleted during this transaction.
if tx.delPages != nil {
if tx.delPages[id] {
return false
}
}
// The page was not touched within this transaction. Fallthrough to
// the database's check.
return tx.db.pageExists(id)
}
// allocate returns a contiguous block of memory starting at a given page.
func (tx *Tx) allocate(count int) (*page, error) {
p, err := tx.db.allocate(count)
if err != nil {
return nil, err
}
// Save to our page cache.
tx.pages[p.id] = p
return p, nil
}
// Commit writes all changes to disk and updates the meta page.
// Returns an error if a disk write error occurs, or if Commit is
// called on a read-only transaction.
func (tx *Tx) Commit() error {
_assert(!tx.managed, "managed tx commit not allowed")
if tx.db == nil {
return ErrTxClosed
} else if !tx.writable {
return ErrTxNotWritable
}
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
opgid := tx.meta.pgid
// Free the freelist and allocate new pages for it. This will overestimate
// the size of the freelist but not underestimate the size (which would be bad).
tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
if err != nil {
tx.rollback()
return err
}
if err := tx.db.freelist.write(p); err != nil {
tx.rollback()
return err
}
tx.meta.freelist = p.id
// If the high water mark has moved up then attempt to grow the database.
if tx.meta.pgid > opgid {
if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
tx.rollback()
return err
}
}
// Write dirty pages to disk.
if err := tx.write(); err != nil {
tx.rollback()
return err
}
// Write meta to disk.
if err := tx.writeMeta(); err != nil {
tx.rollback()
return err
}
// Finalize the transaction.
tx.close()
return nil
}
// write writes any dirty pages to disk.
func (tx *Tx) write() error {
// Sort pages by id.
pages := make(pages, 0, len(tx.pages))
for _, p := range tx.pages {
pages = append(pages, p)
}
// Clear out page cache early.
tx.pages = make(map[pgid]*page)
sort.Sort(pages)
// Write pages to disk in order.
for _, p := range pages {
size := (int(p.overflow) + 1) * tx.db.pageSize
offset := int64(p.id) * int64(tx.db.pageSize)
// Write out page in "max allocation" sized chunks.
ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
for {
// Limit our write to our max allocation size.
sz := size
if sz > maxAllocSize-1 {
sz = maxAllocSize - 1
}
// Write chunk to disk.
buf := ptr[:sz]
if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
return err
}
// Exit inner for loop if we've written all the chunks.
size -= sz
if size == 0 {
break
}
// Otherwise move offset forward and move pointer to next chunk.
offset += int64(sz)
ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
}
}
if err := fdatasync(tx.db); err != nil {
return err
}
// Put small pages back to page pool.
for _, p := range pages {
// Ignore page sizes over 1 page.
// These are allocated using make() instead of the page pool.
if int(p.overflow) != 0 {
continue
}
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]
// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
for i := range buf {
buf[i] = 0
}
tx.db.pagePool.Put(buf)
}
return nil
}
// writeMeta writes the meta to the disk.
func (tx *Tx) writeMeta() error {
// Create a temporary buffer for the meta page.
buf := make([]byte, tx.db.pageSize)
p := tx.db.pageInBuffer(buf, 0)
tx.meta.write(p)
// Write the meta page to file.
if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
return err
}
if err := fdatasync(tx.db); err != nil {
return err
}
return nil
}
// Get retrieves the bytes stored in the page with the given id.
// The returned byte slice is only valid for the duration of the transaction.
func (tx *Tx) Get(id uint64) ([]byte, error) {
if !tx.pageExists(pgid(id)) {
return nil, ErrNotFound
}
p := tx.page(pgid(id))
size := int(p.overflow)*tx.db.pageSize - PageHeaderSize + int(p.count)
b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[:size]
return b, nil
}
// Add creates a new page with the given content. The inserted byte slice
// will be padded at the end to fit the next largest page size. Retrieving the page
// will return the padding as well.
// Inserted data should hence have included termination markers.
func (tx *Tx) Add(c []byte) (uint64, error) {
l := len(c) + PageHeaderSize // total size required
n := 1 // number of pages required
for n*tx.db.pageSize < l {
n++
}
if l > maxAllocSize {
return 0, fmt.Errorf("page of size %d too large", l)
}
p, err := tx.allocate(n)
if err != nil {
return 0, fmt.Errorf("page alloc error: %s", err)
}
p.flags |= pageFlagData
// count holds the length used in the last page.
p.count = uint16(l - (n-1)*tx.db.pageSize)
b := (*[maxAllocSize]byte)(unsafe.Pointer(&p.ptr))[:]
copy(b, c)
return uint64(p.id), nil
}
// Del deletes the page witht he given ID.
func (tx *Tx) Del(id uint64) error {
if !tx.pageExists(pgid(id)) {
return ErrNotFound
}
tx.db.freelist.free(tx.meta.txid, tx.db.page(pgid(id)))
return nil
}
// Set overwrites the page with the given ID with c.
func (tx *Tx) Set(id uint64, c []byte) error {
if !tx.pageExists(pgid(id)) {
return ErrNotFound
}
p := tx.db.page(pgid(id))
l := len(c) + PageHeaderSize // total size required
n := int(p.overflow + 1)
// The contents must fit into the previously allocated pages.
if l > n*tx.db.pageSize {
return fmt.Errorf("invalid overwrite size")
}
// Allocate a temporary buffer for the page.
var buf []byte
if n == 1 {
buf = tx.db.pagePool.Get().([]byte)
} else {
buf = make([]byte, n*tx.db.pageSize)
}
np := tx.db.pageInBuffer(buf, 0)
*np = *p
// count holds the length used in the last page.
np.count = uint16(l - (n-1)*tx.db.pageSize)
// TODO(fabxc): Potential performance improvement point could be using c directly.
// Just copy it for now.
b := (*[maxAllocSize]byte)(unsafe.Pointer(&np.ptr))[:]
copy(b, c)
tx.pages[pgid(id)] = np
// TODO(fabxc): truncate and free pages that are no longer needed.
return nil
}

View File

@ -1,163 +0,0 @@
package tsdb
import (
"encoding/binary"
"os"
"path/filepath"
"time"
"github.com/boltdb/bolt"
"github.com/fabxc/pagebuf"
"github.com/prometheus/common/log"
)
type persistence struct {
*chunkBatchProcessor
mc *memChunks
chunks *pagebuf.DB
index *bolt.DB
}
func newPersistence(path string, cap int, to time.Duration) (*persistence, error) {
if err := os.MkdirAll(path, 0777); err != nil {
return nil, err
}
ix, err := bolt.Open(filepath.Join(path, "ix"), 0666, nil)
if err != nil {
return nil, err
}
if err := ix.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(bktChunks)
return err
}); err != nil {
return nil, err
}
pb, err := pagebuf.Open(filepath.Join(path, "chunks"), 0666, nil)
if err != nil {
return nil, err
}
p := &persistence{
chunks: pb,
index: ix,
chunkBatchProcessor: newChunkBatchProcessor(log.Base(), cap, to),
}
p.chunkBatchProcessor.processf = p.persist
return p, nil
}
var bktChunks = []byte("chunks")
func (p *persistence) close() error {
// Index must be closed first, otherwise we might deadlock.
err0 := p.index.Close()
err1 := p.chunks.Close()
if err0 != nil {
return err0
}
return err1
}
func (p *persistence) persist(cds ...*chunkDesc) error {
err := p.update(func(tx *persistenceTx) error {
bkt := tx.ix.Bucket(bktChunks)
for _, cd := range cds {
pos, err := tx.chunks.Add(cd.chunk.Data())
if err != nil {
return err
}
var buf [16]byte
binary.BigEndian.PutUint64(buf[:8], uint64(cd.id))
binary.BigEndian.PutUint64(buf[8:], pos)
if err := bkt.Put(buf[:8], buf[8:]); err != nil {
return err
}
tx.ids = append(tx.ids, cd.id)
}
return nil
})
return err
}
func (p *persistence) update(f func(*persistenceTx) error) error {
tx, err := p.begin(true)
if err != nil {
return err
}
if err := f(tx); err != nil {
tx.rollback()
return err
}
return tx.commit()
}
func (p *persistence) view(f func(*persistenceTx) error) error {
tx, err := p.begin(false)
if err != nil {
return err
}
if err := f(tx); err != nil {
tx.rollback()
return err
}
return tx.rollback()
}
func (p *persistence) begin(writeable bool) (*persistenceTx, error) {
var err error
tx := &persistenceTx{p: p}
// Index transaction is the outer one so we might end up with orphaned
// chunks but never with dangling pointers in the index.
tx.ix, err = p.index.Begin(writeable)
if err != nil {
return nil, err
}
tx.chunks, err = p.chunks.Begin(writeable)
if err != nil {
tx.ix.Rollback()
return nil, err
}
return tx, nil
}
type persistenceTx struct {
p *persistence
ix *bolt.Tx
chunks *pagebuf.Tx
ids []ChunkID
}
func (tx *persistenceTx) commit() error {
if err := tx.chunks.Commit(); err != nil {
tx.ix.Rollback()
return err
}
if err := tx.ix.Commit(); err != nil {
// TODO(fabxc): log orphaned chunks. What about overwritten ones?
// Should we not allows delete and add in the same tx so this cannot happen?
return err
}
// Successfully persisted chunks, clear them from the in-memory
// forward mapping.
tx.p.mc.mtx.Lock()
defer tx.p.mc.mtx.Unlock()
for _, id := range tx.ids {
delete(tx.p.mc.chunks, id)
}
return nil
}
func (tx *persistenceTx) rollback() error {
err0 := tx.chunks.Rollback()
err1 := tx.ix.Rollback()
if err0 != nil {
return err0
}
return err1
}

View File

@ -1,224 +0,0 @@
package tsdb
import (
"encoding/binary"
"fmt"
"io"
"github.com/fabxc/tsdb/chunks"
"github.com/fabxc/tsdb/index"
"github.com/prometheus/common/model"
)
// SeriesIterator provides iteration over a time series associated with a metric.
type SeriesIterator interface {
Metric() map[string]string
Seek(model.Time) (model.SamplePair, bool)
Next() (model.SamplePair, bool)
Err() error
}
type chunkSeriesIterator struct {
m map[string]string
chunks []chunks.Chunk
err error
cur chunks.Iterator
curPos int
}
func newChunkSeriesIterator(m map[string]string, chunks []chunks.Chunk) *chunkSeriesIterator {
return &chunkSeriesIterator{
m: m,
chunks: chunks,
}
}
func (it *chunkSeriesIterator) Metric() map[string]string {
return it.m
}
func (it *chunkSeriesIterator) Seek(ts model.Time) (model.SamplePair, bool) {
// Naively go through all chunk's first timestamps and pick the chunk
// containing the seeked timestamp.
// TODO(fabxc): this can be made smarter if it's a bottleneck.
for i, chk := range it.chunks {
cit := chk.Iterator()
first, ok := cit.First()
if !ok {
it.err = cit.Err()
return model.SamplePair{}, false
}
if first.Timestamp > ts {
break
}
it.cur = cit
it.curPos = i
}
return it.cur.Seek(ts)
}
func (it *chunkSeriesIterator) Next() (model.SamplePair, bool) {
sp, ok := it.cur.Next()
if ok {
return sp, true
}
if it.cur.Err() != io.EOF {
it.err = it.cur.Err()
return model.SamplePair{}, false
}
if len(it.chunks) == it.curPos+1 {
it.err = io.EOF
return model.SamplePair{}, false
}
it.curPos++
it.cur = it.chunks[it.curPos].Iterator()
// Return first sample of the new chunks.
return it.cur.Seek(0)
}
func (it *chunkSeriesIterator) Err() error {
return it.err
}
// Querier allows several queries over the storage with a consistent view if the data.
type Querier struct {
db *DB
iq *index.Querier
}
// Querier returns a new Querier on the index at the current point in time.
func (db *DB) Querier() (*Querier, error) {
iq, err := db.indexer.Querier()
if err != nil {
return nil, err
}
return &Querier{db: db, iq: iq}, nil
}
// Close the querier. This invalidates all previously retrieved iterators.
func (q *Querier) Close() error {
return q.iq.Close()
}
// Iterator returns an iterator over all chunks that match all given
// label matchers. The iterator is only valid until the Querier is closed.
func (q *Querier) Iterator(key string, matcher index.Matcher) (index.Iterator, error) {
return q.iq.Search(key, matcher)
}
// RangeIterator returns an iterator over chunks that are present in the given time range.
// The returned iterator is only valid until the querier is closed.
func (q *Querier) RangeIterator(start, end model.Time) (index.Iterator, error) {
return nil, nil
}
// InstantIterator returns an iterator over chunks possibly containing values for
// the given timestamp. The returned iterator is only valid until the querier is closed.
func (q *Querier) InstantIterator(at model.Time) (index.Iterator, error) {
return nil, nil
}
func hash(m map[string]string) uint64 {
return model.LabelsToSignature(m)
}
// Series returns a list of series iterators over all chunks in the given iterator.
// The returned series iterators are only valid until the querier is closed.
func (q *Querier) Series(it index.Iterator) ([]SeriesIterator, error) {
mets := map[uint64]map[string]string{}
its := map[uint64][]chunks.Chunk{}
id, err := it.Seek(0)
for ; err == nil; id, err = it.Next() {
terms, err := q.iq.Doc(id)
if err != nil {
return nil, err
}
met := make(map[string]string, len(terms))
for _, t := range terms {
met[t.Field] = t.Val
}
fp := hash(met)
chunk, err := q.chunk(ChunkID(id))
if err != nil {
return nil, err
}
its[fp] = append(its[fp], chunk)
if _, ok := mets[fp]; ok {
continue
}
mets[fp] = met
}
if err != io.EOF {
return nil, err
}
res := make([]SeriesIterator, 0, len(its))
for fp, chks := range its {
res = append(res, newChunkSeriesIterator(mets[fp], chks))
}
return res, nil
}
func (q *Querier) chunk(id ChunkID) (chunks.Chunk, error) {
q.db.memChunks.mtx.RLock()
cd, ok := q.db.memChunks.chunks[id]
q.db.memChunks.mtx.RUnlock()
if ok {
return cd.chunk, nil
}
var chk chunks.Chunk
// TODO(fabxc): this starts a new read transaction for every
// chunk we have to load from persistence.
// Figure out what's best tradeoff between lock contention and
// data consistency: start transaction when instantiating the querier
// or lazily start transaction on first try. (Not all query operations
// need access to persisted chunks.)
err := q.db.persistence.view(func(tx *persistenceTx) error {
chks := tx.ix.Bucket(bktChunks)
ptr := chks.Get(id.bytes())
if ptr == nil {
return fmt.Errorf("chunk pointer for ID %d not found", id)
}
cdata, err := tx.chunks.Get(binary.BigEndian.Uint64(ptr))
if err != nil {
return fmt.Errorf("get chunk data for ID %d: %s", id, err)
}
chk, err = chunks.FromData(cdata)
return err
})
return chk, err
}
// Metrics returns the unique metrics found across all chunks in the provided iterator.
func (q *Querier) Metrics(it index.Iterator) ([]map[string]string, error) {
m := []map[string]string{}
fps := map[uint64]struct{}{}
id, err := it.Seek(0)
for ; err == nil; id, err = it.Next() {
terms, err := q.iq.Doc(id)
if err != nil {
return nil, err
}
met := make(map[string]string, len(terms))
for _, t := range terms {
met[t.Field] = t.Val
}
fp := hash(met)
if _, ok := fps[fp]; ok {
continue
}
fps[fp] = struct{}{}
m = append(m, met)
}
if err != io.EOF {
return nil, err
}
return m, nil
}

0
shard.go Normal file
View File

66
test/hash_test.go Normal file
View File

@ -0,0 +1,66 @@
package test
import (
"testing"
"github.com/cespare/xxhash"
sip13 "github.com/dgryski/go-sip13"
)
type pair struct {
name, value string
}
var testInput = []pair{
{"job", "node"},
{"instance", "123.123.1.211:9090"},
{"path", "/api/v1/namespaces/<namespace>/deployments/<name>"},
{"method", "GET"},
{"namespace", "system"},
{"status", "500"},
}
func BenchmarkHash(b *testing.B) {
input := []byte{}
for _, v := range testInput {
input = append(input, v.name...)
input = append(input, '\xff')
input = append(input, v.value...)
input = append(input, '\xff')
}
var total uint64
var k0 uint64 = 0x0706050403020100
var k1 uint64 = 0x0f0e0d0c0b0a0908
for name, f := range map[string]func(b []byte) uint64{
"xxhash": xxhash.Sum64,
"fnv64": fnv64a,
"sip13": func(b []byte) uint64 { return sip13.Sum64(k0, k1, b) },
} {
b.Run(name, func(b *testing.B) {
b.SetBytes(int64(len(input)))
total = 0
for i := 0; i < b.N; i++ {
total += f(input)
}
})
}
}
// hashAdd adds a string to a fnv64a hash value, returning the updated hash.
func fnv64a(b []byte) uint64 {
const (
offset64 = 14695981039346656037
prime64 = 1099511628211
)
h := uint64(offset64)
for x := range b {
h ^= uint64(x)
h *= prime64
}
return h
}