Make vertical compaction and query merge optional
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
This commit is contained in:
parent
752e022aba
commit
28c73f531f
|
@ -51,8 +51,8 @@ func ExponentialBlockRanges(minSize int64, steps, stepSize int) []int64 {
|
||||||
// Compactor provides compaction against an underlying storage
|
// Compactor provides compaction against an underlying storage
|
||||||
// of time series data.
|
// of time series data.
|
||||||
type Compactor interface {
|
type Compactor interface {
|
||||||
// Plan returns a set of non-overlapping directories that can
|
// Plan returns a set of directories that can be compacted concurrently.
|
||||||
// be compacted concurrently.
|
// The directories can be overlapping.
|
||||||
// Results returned when compactions are in progress are undefined.
|
// Results returned when compactions are in progress are undefined.
|
||||||
Plan(dir string) ([]string, error)
|
Plan(dir string) ([]string, error)
|
||||||
|
|
||||||
|
|
37
db.go
37
db.go
|
@ -45,10 +45,11 @@ import (
|
||||||
// DefaultOptions used for the DB. They are sane for setups using
|
// DefaultOptions used for the DB. They are sane for setups using
|
||||||
// millisecond precision timestamps.
|
// millisecond precision timestamps.
|
||||||
var DefaultOptions = &Options{
|
var DefaultOptions = &Options{
|
||||||
WALSegmentSize: wal.DefaultSegmentSize,
|
WALSegmentSize: wal.DefaultSegmentSize,
|
||||||
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
|
||||||
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
|
BlockRanges: ExponentialBlockRanges(int64(2*time.Hour)/1e6, 3, 5),
|
||||||
NoLockfile: false,
|
NoLockfile: false,
|
||||||
|
AllowOverlappingBlock: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Options of the DB storage.
|
// Options of the DB storage.
|
||||||
|
@ -71,6 +72,10 @@ type Options struct {
|
||||||
|
|
||||||
// NoLockfile disables creation and consideration of a lock file.
|
// NoLockfile disables creation and consideration of a lock file.
|
||||||
NoLockfile bool
|
NoLockfile bool
|
||||||
|
|
||||||
|
// Overlapping blocks are allowed iff AllowOverlappingBlock is true.
|
||||||
|
// This in-turn enables vertical compaction and vertical query merge.
|
||||||
|
AllowOverlappingBlock bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Appender allows appending a batch of data. It must be completed with a
|
// Appender allows appending a batch of data. It must be completed with a
|
||||||
|
@ -548,6 +553,11 @@ func (db *DB) reload() (err error) {
|
||||||
sort.Slice(loadable, func(i, j int) bool {
|
sort.Slice(loadable, func(i, j int) bool {
|
||||||
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
|
return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
|
||||||
})
|
})
|
||||||
|
if !db.opts.AllowOverlappingBlock {
|
||||||
|
if err := validateBlockSequence(loadable); err != nil {
|
||||||
|
return errors.Wrap(err, "invalid block sequence")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Swap new blocks first for subsequently created readers to be seen.
|
// Swap new blocks first for subsequently created readers to be seen.
|
||||||
db.mtx.Lock()
|
db.mtx.Lock()
|
||||||
|
@ -699,6 +709,25 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence.
|
||||||
|
func validateBlockSequence(bs []*Block) error {
|
||||||
|
if len(bs) <= 1 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var metas []BlockMeta
|
||||||
|
for _, b := range bs {
|
||||||
|
metas = append(metas, b.meta)
|
||||||
|
}
|
||||||
|
|
||||||
|
overlaps := OverlappingBlocks(metas)
|
||||||
|
if len(overlaps) > 0 {
|
||||||
|
return errors.Errorf("block time ranges overlap: %s", overlaps)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// TimeRange specifies minTime and maxTime range.
|
// TimeRange specifies minTime and maxTime range.
|
||||||
type TimeRange struct {
|
type TimeRange struct {
|
||||||
Min, Max int64
|
Min, Max int64
|
||||||
|
|
|
@ -1932,7 +1932,9 @@ func TestVerticalCompaction(t *testing.T) {
|
||||||
for _, series := range c.blockSeries {
|
for _, series := range c.blockSeries {
|
||||||
createBlock(t, tmpdir, series)
|
createBlock(t, tmpdir, series)
|
||||||
}
|
}
|
||||||
db, err := Open(tmpdir, nil, nil, nil)
|
opts := *DefaultOptions
|
||||||
|
opts.AllowOverlappingBlock = true
|
||||||
|
db, err := Open(tmpdir, nil, nil, &opts)
|
||||||
testutil.Ok(t, err)
|
testutil.Ok(t, err)
|
||||||
defer func() {
|
defer func() {
|
||||||
testutil.Ok(t, db.Close())
|
testutil.Ok(t, db.Close())
|
||||||
|
|
Loading…
Reference in New Issue