commit af9003dcef (parent 0b200798fe)

    Add resilience to crashes during deletion

    Signed-off-by: Fabian Reinartz <freinartz@google.com>
block.go (14 lines changed)

@@ -164,6 +164,13 @@ type BlockStats struct {
     NumTombstones uint64 `json:"numTombstones,omitempty"`
 }
 
+// BlockDesc describes a block by ULID and time range.
+type BlockDesc struct {
+    ULID    ulid.ULID `json:"ulid"`
+    MinTime int64     `json:"minTime"`
+    MaxTime int64     `json:"maxTime"`
+}
+
 // BlockMetaCompaction holds information about compactions a block went through.
 type BlockMetaCompaction struct {
     // Maximum number of compaction cycles any source block has
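The Parents field added in the next hunk is what the rest of the patch builds on: every block now records, durably in its meta, which blocks it replaced. A crash between writing a new block and deleting its inputs therefore leaves enough on-disk information to finish the cleanup on the next reload. As a sketch only (in1 and in2 are hypothetical metas of two input blocks, not names from this patch):

    // Hypothetical: record the two blocks this new block supersedes.
    meta.Compaction.Parents = []BlockDesc{
        {ULID: in1.ULID, MinTime: in1.MinTime, MaxTime: in1.MaxTime},
        {ULID: in2.ULID, MinTime: in2.MinTime, MaxTime: in2.MaxTime},
    }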
@@ -171,6 +178,7 @@ type BlockMetaCompaction struct {
     Level int `json:"level"`
     // ULIDs of all source head blocks that went into the block.
     Sources []ulid.ULID `json:"sources,omitempty"`
+    Parents []BlockDesc `json:"parents,omitempty"`
     Failed bool `json:"failed,omitempty"`
 }
 
@@ -475,19 +483,17 @@ func (pb *Block) CleanTombstones(dest string, c Compactor) (*ulid.ULID, error) {
 
     pb.tombstones.Iter(func(id uint64, ivs Intervals) error {
         numStones += len(ivs)
-
         return nil
     })
-
     if numStones == 0 {
         return nil, nil
     }
 
-    uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime)
+    meta := pb.Meta()
+    uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)
     if err != nil {
         return nil, err
     }
-
     return &uid, nil
 }
 
compact.go (15 lines changed)

@@ -55,7 +55,7 @@ type Compactor interface {
     Plan(dir string) ([]string, error)
 
     // Write persists a Block into a directory.
-    Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error)
+    Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error)
 
     // Compact runs compaction against the provided directories. Must
     // only be called concurrently with results of Plan().
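The new parent argument threads this lineage through every write path. Two call shapes appear elsewhere in this patch: nil when a brand-new block is persisted, and the replaced block's meta when an existing block is rewritten:

    // db.go: persisting the head creates a block with no predecessor.
    _, err = db.compactor.Write(db.dir, head, mint, maxt, nil)

    // block.go: rewriting a block to drop tombstoned data records the old
    // block as the new block's parent, marking it safe to delete later.
    meta := pb.Meta()
    uid, err := c.Write(dest, pb, pb.meta.MinTime, pb.meta.MaxTime, &meta)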
@@ -297,6 +297,11 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
     for _, s := range b.Compaction.Sources {
         sources[s] = struct{}{}
     }
+    res.Compaction.Parents = append(res.Compaction.Parents, BlockDesc{
+        ULID:    b.ULID,
+        MinTime: b.MinTime,
+        MaxTime: b.MaxTime,
+    })
 }
 res.Compaction.Level++
 
@@ -367,7 +372,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs ...string) (uid ulid.ULID,
     return uid, merr
 }
 
-func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
+func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
     entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
     uid := ulid.MustNew(ulid.Now(), entropy)
 
@@ -379,6 +384,12 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64) (
     meta.Compaction.Level = 1
     meta.Compaction.Sources = []ulid.ULID{uid}
 
+    if parent != nil {
+        meta.Compaction.Parents = []BlockDesc{
+            {ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
+        }
+    }
+
     err := c.write(dest, meta, b)
     if err != nil {
         return uid, err
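Pieced together from the two LeveledCompactor hunks above (elided lines are not part of this diff and are marked as such), Write stamps the parent onto the fresh block's meta before anything is persisted, so the lineage lands on disk together with the block itself:

    func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
        entropy := rand.New(rand.NewSource(time.Now().UnixNano()))
        uid := ulid.MustNew(ulid.Now(), entropy)

        // ... meta construction elided ...
        meta.Compaction.Level = 1
        meta.Compaction.Sources = []ulid.ULID{uid}

        if parent != nil {
            meta.Compaction.Parents = []BlockDesc{
                {ULID: parent.ULID, MinTime: parent.MinTime, MaxTime: parent.MaxTime},
            }
        }

        err := c.write(dest, meta, b)
        if err != nil {
            return uid, err
        }
        // ... remainder elided ...
    }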
db.go (151 lines changed)

@@ -267,17 +267,9 @@ func (db *DB) run() {
     case <-db.compactc:
         db.metrics.compactionsTriggered.Inc()
 
-        _, err1 := db.retentionCutoff()
-        if err1 != nil {
-            level.Error(db.logger).Log("msg", "retention cutoff failed", "err", err1)
-        }
-
-        _, err2 := db.compact()
-        if err2 != nil {
-            level.Error(db.logger).Log("msg", "compaction failed", "err", err2)
-        }
-
-        if err1 != nil || err2 != nil {
+        _, err := db.compact()
+        if err != nil {
+            level.Error(db.logger).Log("msg", "compaction failed", "err", err)
             backoff = exponential(backoff, 1*time.Second, 1*time.Minute)
         } else {
             backoff = 0
@@ -289,19 +281,9 @@
     }
 }
 
-func (db *DB) retentionCutoff() (b bool, err error) {
-    defer func() {
-        if !b && err == nil {
-            // no data had to be cut off.
-            return
-        }
-        db.metrics.cutoffs.Inc()
-        if err != nil {
-            db.metrics.cutoffsFailed.Inc()
-        }
-    }()
+func (db *DB) beyondRetention(meta *BlockMeta) bool {
     if db.opts.RetentionDuration == 0 {
-        return false, nil
+        return false
     }
 
     db.mtx.RLock()
@@ -309,23 +291,13 @@
     db.mtx.RUnlock()
 
     if len(blocks) == 0 {
-        return false, nil
+        return false
     }
 
     last := blocks[len(db.blocks)-1]
 
     mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
-    dirs, err := retentionCutoffDirs(db.dir, mint)
-    if err != nil {
-        return false, err
-    }
-
-    // This will close the dirs and then delete the dirs.
-    if len(dirs) > 0 {
-        return true, db.reload(dirs...)
-    }
-
-    return false, nil
+    return meta.MaxTime < mint
 }
 
 // Appender opens a new appender against the database.
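Pieced together from the two hunks above, the helper that replaces retentionCutoff is a pure predicate; the cutoff metrics and the reload call are gone. (The blocks := db.blocks[:] line between the hunks is inferred from context; blank lines are approximate.)

    func (db *DB) beyondRetention(meta *BlockMeta) bool {
        if db.opts.RetentionDuration == 0 {
            return false
        }

        db.mtx.RLock()
        blocks := db.blocks[:]
        db.mtx.RUnlock()

        if len(blocks) == 0 {
            return false
        }

        last := blocks[len(db.blocks)-1]

        mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration)
        return meta.MaxTime < mint
    }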
@@ -383,7 +355,7 @@ func (db *DB) compact() (changes bool, err error) {
     mint: mint,
     maxt: maxt,
 }
-if _, err = db.compactor.Write(db.dir, head, mint, maxt); err != nil {
+if _, err = db.compactor.Write(db.dir, head, mint, maxt, nil); err != nil {
     return changes, errors.Wrap(err, "persist head block")
 }
 changes = true
@@ -418,7 +390,7 @@
     changes = true
     runtime.GC()
 
-    if err := db.reload(plan...); err != nil {
+    if err := db.reload(); err != nil {
         return changes, errors.Wrap(err, "reload blocks")
     }
     runtime.GC()
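compact used to tell reload exactly which directories it had superseded, but that list only lived in memory, so a crash between the compaction and the reload lost it. Now the Parents entries written by the compactor carry the same information on disk, and reload can recompute it at any time.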
@@ -427,39 +399,6 @@
     return changes, nil
 }
 
-// retentionCutoffDirs returns all directories of blocks in dir that are strictly
-// before mint.
-func retentionCutoffDirs(dir string, mint int64) ([]string, error) {
-    df, err := fileutil.OpenDir(dir)
-    if err != nil {
-        return nil, errors.Wrapf(err, "open directory")
-    }
-    defer df.Close()
-
-    dirs, err := blockDirs(dir)
-    if err != nil {
-        return nil, errors.Wrapf(err, "list block dirs %s", dir)
-    }
-
-    delDirs := []string{}
-
-    for _, dir := range dirs {
-        meta, err := readMetaFile(dir)
-        if err != nil {
-            return nil, errors.Wrapf(err, "read block meta %s", dir)
-        }
-        // The first block we encounter marks that we crossed the boundary
-        // of deletable blocks.
-        if meta.MaxTime >= mint {
-            break
-        }
-
-        delDirs = append(delDirs, dir)
-    }
-
-    return delDirs, nil
-}
-
 func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
     for _, b := range db.blocks {
         if b.Meta().ULID == id {
@@ -469,18 +408,10 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
     return nil, false
 }
 
-func stringsContain(set []string, elem string) bool {
-    for _, e := range set {
-        if elem == e {
-            return true
-        }
-    }
-    return false
-}
-
 // reload on-disk blocks and trigger head truncation if new blocks appeared. It takes
 // a list of block directories which should be deleted during reload.
-func (db *DB) reload(deleteable ...string) (err error) {
+// Blocks that are obsolete due to replacement or retention will be deleted.
+func (db *DB) reload() (err error) {
     defer func() {
         if err != nil {
             db.metrics.reloadsFailed.Inc()
@@ -492,21 +423,41 @@ func (db *DB) reload(deleteable ...string) (err error) {
     if err != nil {
         return errors.Wrap(err, "find blocks")
     }
+    // We delete old blocks that have been superseded by new ones by gathering all parents
+    // from existing blocks. Those parents all have newer replacements and can be safely deleted
+    // after we loaded the other blocks.
     var (
         blocks []*Block
-        exist  = map[ulid.ULID]struct{}{}
+        opened     = map[ulid.ULID]struct{}{}
+        deleteable = map[ulid.ULID]struct{}{}
     )
-
     for _, dir := range dirs {
         meta, err := readMetaFile(dir)
         if err != nil {
-            return errors.Wrapf(err, "read meta information %s", dir)
-        }
-        // If the block is pending for deletion, don't add it to the new block set.
-        if stringsContain(deleteable, dir) {
+            // The block was potentially in the middle of being deleted during a crash.
+            // Skip it since we may delete it properly further down again.
+            level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
             continue
         }
-
+        if db.beyondRetention(meta) {
+            deleteable[meta.ULID] = struct{}{}
+            continue
+        }
+        for _, b := range meta.Compaction.Parents {
+            deleteable[b.ULID] = struct{}{}
+        }
+    }
+    for _, dir := range dirs {
+        meta, err := readMetaFile(dir)
+        if err != nil {
+            level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir)
+            continue
+        }
+        // Don't load blocks that are scheduled for deletion.
+        if _, ok := deleteable[meta.ULID]; ok {
+            continue
+        }
+        // See if we already have the block in memory or open it otherwise.
         b, ok := db.getBlock(meta.ULID)
         if !ok {
             b, err = OpenBlock(dir, db.chunkPool)
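The first loop above is the recovery heart of the commit. Isolated for clarity (deletableULIDs is a hypothetical helper for this illustration, not a function in the patch), the marking rule is:

    func deletableULIDs(db *DB, metas []*BlockMeta) map[ulid.ULID]struct{} {
        deleteable := map[ulid.ULID]struct{}{}
        for _, meta := range metas {
            // Blocks that fell out of the retention window are dropped.
            if db.beyondRetention(meta) {
                deleteable[meta.ULID] = struct{}{}
                continue
            }
            // A surviving block supersedes all of its parents, so those
            // can always be dropped as well.
            for _, b := range meta.Compaction.Parents {
                deleteable[b.ULID] = struct{}{}
            }
        }
        return deleteable
    }

Unreadable metas are logged and skipped instead of aborting the reload: a directory that was only half removed when the process died would otherwise wedge the database on startup.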
@@ -514,9 +465,8 @@ func (db *DB) reload(deleteable ...string) (err error) {
             return errors.Wrapf(err, "open block %s", dir)
         }
     }
-
     blocks = append(blocks, b)
-    exist[meta.ULID] = struct{}{}
+    opened[meta.ULID] = struct{}{}
 }
 sort.Slice(blocks, func(i, j int) bool {
     return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime
@@ -533,15 +483,19 @@ func (db *DB) reload(deleteable ...string) (err error) {
     db.blocks = blocks
     db.mtx.Unlock()
 
+    // Drop old blocks from memory.
     for _, b := range oldBlocks {
-        if _, ok := exist[b.Meta().ULID]; ok {
+        if _, ok := opened[b.Meta().ULID]; ok {
             continue
         }
         if err := b.Close(); err != nil {
             level.Warn(db.logger).Log("msg", "closing block failed", "err", err)
         }
-        if err := os.RemoveAll(b.Dir()); err != nil {
-            level.Warn(db.logger).Log("msg", "deleting block failed", "err", err)
+    }
+    // Delete all obsolete blocks. None of them are opened any longer.
+    for ulid := range deleteable {
+        if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
+            return errors.Wrapf(err, "delete obsolete block %s", ulid)
         }
     }
 
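Deletion now happens strictly after the new block set is installed and the dropped blocks are closed, and a failure surfaces as an error rather than a warning. The loop is also safe to re-run: os.RemoveAll returns nil for a path that no longer exists, so repeating the cleanup after a crash simply converges:

    // Re-running this after a crash is harmless; directories that were
    // already removed yield a nil error from os.RemoveAll.
    for ulid := range deleteable {
        if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil {
            return errors.Wrapf(err, "delete obsolete block %s", ulid)
        }
    }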
@@ -765,7 +719,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error {
     if !withHead {
         return nil
     }
-    _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime())
+    _, err := db.compactor.Write(dir, db.head, db.head.MinTime(), db.head.MaxTime(), nil)
     return errors.Wrap(err, "snapshot head block")
 }
 
@@ -859,22 +813,15 @@ func (db *DB) CleanTombstones() (err error) {
     blocks := db.blocks[:]
     db.mtx.RUnlock()
 
-    deletable := []string{}
     for _, b := range blocks {
         if uid, er := b.CleanTombstones(db.Dir(), db.compactor); er != nil {
             err = errors.Wrapf(er, "clean tombstones: %s", b.Dir())
             return err
         } else if uid != nil { // New block was created.
-            deletable = append(deletable, b.Dir())
             newUIDs = append(newUIDs, *uid)
         }
     }
-
-    if len(deletable) == 0 {
-        return nil
-    }
-
-    return errors.Wrap(db.reload(deletable...), "reload blocks")
+    return errors.Wrap(db.reload(), "reload blocks")
 }
 
 func intervalOverlap(amin, amax, bmin, bmax int64) bool {
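CleanTombstones previously collected the rewritten directories itself and handed them to reload. After this patch it always reloads and lets the Parents bookkeeping identify the obsolete originals; since each rewrite passed &meta as the parent (see the block.go hunk above), the pre-rewrite block is found and deleted even if the process crashes before this call. The function's tail reduces to:

    return errors.Wrap(db.reload(), "reload blocks")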
db_test.go

@@ -842,7 +842,7 @@ type mockCompactorFailing struct {
 func (*mockCompactorFailing) Plan(dir string) ([]string, error) {
     return nil, nil
 }
-func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64) (ulid.ULID, error) {
+func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) {
     if len(c.blocks) >= c.max {
         return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail")
     }
@@ -925,10 +925,8 @@ func TestDB_Retention(t *testing.T) {
 
     testutil.Equals(t, 2, len(db.blocks))
 
-    // Now call retention.
-    changes, err := db.retentionCutoff()
-    testutil.Ok(t, err)
-    testutil.Assert(t, changes, "there should be changes")
+    // Reload blocks, which should drop blocks beyond the retention boundary.
+    testutil.Ok(t, db.reload())
     testutil.Equals(t, 1, len(db.blocks))
     testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block.
 }