db: Addressed comments.
Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
This commit is contained in:
parent
cc306ef0d5
commit
15b5d89222
58
db.go
58
db.go
|
@ -569,22 +569,41 @@ func validateBlockSequence(bs []*Block) error {
|
|||
|
||||
overlaps := OverlappingBlocks(metas)
|
||||
if len(overlaps) > 0 {
|
||||
return errors.Errorf("block time ranges overlap: %s", SprintOverlappedBlocks(overlaps))
|
||||
return errors.Errorf("block time ranges overlap: %s", overlaps)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// TimeRange specifies minTime and maxTime range.
|
||||
type TimeRange struct {
|
||||
Min, Max int64
|
||||
}
|
||||
|
||||
// OverlappingBlocks returns all overlapping blocks from given meta files aggregated by overlaping range.
|
||||
// We sort blocks by minTime. Then we iterate over each block minTime and treat it as our "current" timestamp.
|
||||
// We check all the pending blocks (blocks that we have seen their minTimes, but their maxTime was still ahead current
|
||||
// timestamp) if they finish. If not, it means they overlap with our current b. In the same time b is assumed as
|
||||
// pending.
|
||||
func OverlappingBlocks(bm []BlockMeta) map[TimeRange][]BlockMeta {
|
||||
// Overlaps contains overlapping blocks aggregated by overlapping range.
|
||||
type Overlaps map[TimeRange][]BlockMeta
|
||||
|
||||
// String returns human readable string form of overlapped blocks.
|
||||
func (o Overlaps) String() string {
|
||||
var res []string
|
||||
for r, overlaps := range o {
|
||||
var groups []string
|
||||
for _, m := range overlaps {
|
||||
groups = append(groups, fmt.Sprintf(
|
||||
"[id: %s mint: %d maxt: %d range: %s]",
|
||||
m.ULID.String(),
|
||||
m.MinTime,
|
||||
m.MaxTime,
|
||||
(time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(),
|
||||
))
|
||||
}
|
||||
res = append(res, fmt.Sprintf("[%d %d]: <%s> ", r.Min, r.Max, strings.Join(groups, "")))
|
||||
}
|
||||
return strings.Join(res, "")
|
||||
}
|
||||
|
||||
// OverlappingBlocks returns all overlapping blocks from given meta files.
|
||||
func OverlappingBlocks(bm []BlockMeta) Overlaps {
|
||||
if len(bm) <= 1 {
|
||||
return nil
|
||||
}
|
||||
|
@ -600,6 +619,10 @@ func OverlappingBlocks(bm []BlockMeta) map[TimeRange][]BlockMeta {
|
|||
// continuousPending helps to aggregate same overlaps to single group.
|
||||
continuousPending = true
|
||||
)
|
||||
|
||||
// We have here blocks sorted by minTime. We iterate over each block and treat its minTime as our "current" timestamp.
|
||||
// We check if any of the pending block finished (blocks that we have seen before, but their maxTime was still ahead current
|
||||
// timestamp). If not, it means they overlap with our current block. In the same time current block is assumed pending.
|
||||
for _, b := range bm[1:] {
|
||||
var newPending []BlockMeta
|
||||
|
||||
|
@ -631,7 +654,7 @@ func OverlappingBlocks(bm []BlockMeta) map[TimeRange][]BlockMeta {
|
|||
}
|
||||
|
||||
// Fetch the critical overlapped time range foreach overlap groups.
|
||||
overlapGroups := map[TimeRange][]BlockMeta{}
|
||||
overlapGroups := Overlaps{}
|
||||
for _, overlap := range overlaps {
|
||||
|
||||
minRange := TimeRange{Min: 0, Max: math.MaxInt64}
|
||||
|
@ -650,25 +673,6 @@ func OverlappingBlocks(bm []BlockMeta) map[TimeRange][]BlockMeta {
|
|||
return overlapGroups
|
||||
}
|
||||
|
||||
// SprintOverlappedBlocks returns human readable string form of overlapped blocks.
|
||||
func SprintOverlappedBlocks(overlaps map[TimeRange][]BlockMeta) string {
|
||||
var res []string
|
||||
for r, o := range overlaps {
|
||||
var groups []string
|
||||
for _, m := range o {
|
||||
groups = append(groups, fmt.Sprintf(
|
||||
"[id: %s mint: %d maxt: %d range: %s]",
|
||||
m.ULID.String(),
|
||||
m.MinTime,
|
||||
m.MaxTime,
|
||||
(time.Duration((m.MaxTime-m.MinTime)/1000)*time.Second).String(),
|
||||
))
|
||||
}
|
||||
res = append(res, fmt.Sprintf("[%d %d]: <%s> ", r.Min, r.Max, strings.Join(groups, "")))
|
||||
}
|
||||
return strings.Join(res, "")
|
||||
}
|
||||
|
||||
func (db *DB) String() string {
|
||||
return "HEAD"
|
||||
}
|
||||
|
|
16
db_test.go
16
db_test.go
|
@ -907,13 +907,13 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
|
|||
|
||||
// o1 overlaps with 10-20.
|
||||
o1 := BlockMeta{MinTime: 15, MaxTime: 17}
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 15, Max: 17}: {metas[1], o1},
|
||||
}, OverlappingBlocks(append(metas, o1)))
|
||||
|
||||
// o2 overlaps with 20-30 and 30-40.
|
||||
o2 := BlockMeta{MinTime: 21, MaxTime: 31}
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 21, Max: 30}: {metas[2], o2},
|
||||
{Min: 30, Max: 31}: {o2, metas[3]},
|
||||
}, OverlappingBlocks(append(metas, o2)))
|
||||
|
@ -921,19 +921,19 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
|
|||
// o3a and o3b overlaps with 30-40 and each other.
|
||||
o3a := BlockMeta{MinTime: 33, MaxTime: 39}
|
||||
o3b := BlockMeta{MinTime: 34, MaxTime: 36}
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 34, Max: 36}: {metas[3], o3a, o3b},
|
||||
}, OverlappingBlocks(append(metas, o3a, o3b)))
|
||||
|
||||
// o4 is 1:1 overlap with 50-60.
|
||||
o4 := BlockMeta{MinTime: 50, MaxTime: 60}
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 50, Max: 60}: {metas[5], o4},
|
||||
}, OverlappingBlocks(append(metas, o4)))
|
||||
|
||||
// o5 overlaps with 60-70, 70-80 and 80-90.
|
||||
o5 := BlockMeta{MinTime: 61, MaxTime: 85}
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 61, Max: 70}: {metas[6], o5},
|
||||
{Min: 70, Max: 80}: {o5, metas[7]},
|
||||
{Min: 80, Max: 85}: {o5, metas[8]},
|
||||
|
@ -942,13 +942,13 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
|
|||
// o6a overlaps with 90-100, 100-110 and o6b, o6b overlaps with 90-100 and o6a.
|
||||
o6a := BlockMeta{MinTime: 92, MaxTime: 105}
|
||||
o6b := BlockMeta{MinTime: 94, MaxTime: 99}
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 94, Max: 99}: {metas[9], o6a, o6b},
|
||||
{Min: 100, Max: 105}: {o6a, metas[10]},
|
||||
}, OverlappingBlocks(append(metas, o6a, o6b)))
|
||||
|
||||
// All together.
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 15, Max: 17}: {metas[1], o1},
|
||||
{Min: 21, Max: 30}: {metas[2], o2}, {Min: 30, Max: 31}: {o2, metas[3]},
|
||||
{Min: 34, Max: 36}: {metas[3], o3a, o3b},
|
||||
|
@ -969,7 +969,7 @@ func TestOverlappingBlocksDetectsAllOverlaps(t *testing.T) {
|
|||
nc1 = append(nc1, BlockMeta{MinTime: 5, MaxTime: 7})
|
||||
nc1 = append(nc1, BlockMeta{MinTime: 7, MaxTime: 10})
|
||||
nc1 = append(nc1, BlockMeta{MinTime: 8, MaxTime: 9})
|
||||
testutil.Equals(t, map[TimeRange][]BlockMeta{
|
||||
testutil.Equals(t, Overlaps{
|
||||
{Min: 2, Max: 3}: {nc1[0], nc1[1], nc1[2], nc1[3], nc1[4], nc1[5]}, // 1-5, 2-3, 2-3, 2-3, 2-3, 2,6
|
||||
{Min: 3, Max: 5}: {nc1[0], nc1[5], nc1[6]}, // 1-5, 2-6, 3-5
|
||||
{Min: 5, Max: 6}: {nc1[5], nc1[7]}, // 2-6, 5-7
|
||||
|
|
Loading…
Reference in New Issue