Merge pull request #7385 from brancz/fix-flaky-kube-test

discovery/kubernetes: Fix incorrect premature break of reading results
This commit is contained in:
Frederic Branczyk 2020-06-11 19:22:59 +02:00 committed by GitHub
commit 7c31fe1541
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 9 additions and 15 deletions

View File

@ -107,14 +107,18 @@ func (d k8sDiscoveryTest) Run(t *testing.T) {
// readResultWithTimeout reads all targegroups from channel with timeout. // readResultWithTimeout reads all targegroups from channel with timeout.
// It merges targegroups by source and sends the result to result channel. // It merges targegroups by source and sends the result to result channel.
func readResultWithTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) { func readResultWithTimeout(t *testing.T, ch <-chan []*targetgroup.Group, max int, timeout time.Duration, resChan chan<- map[string]*targetgroup.Group) {
allTgs := make([][]*targetgroup.Group, 0) res := make(map[string]*targetgroup.Group)
Loop: Loop:
for { for {
select { select {
case tgs := <-ch: case tgs := <-ch:
allTgs = append(allTgs, tgs) for _, tg := range tgs {
if len(allTgs) == max { if tg == nil {
continue
}
res[tg.Source] = tg
}
if len(res) == max {
// Reached max target groups we may get, break fast. // Reached max target groups we may get, break fast.
break Loop break Loop
} }
@ -122,21 +126,11 @@ Loop:
// Because we use queue, an object that is created then // Because we use queue, an object that is created then
// deleted or updated may be processed only once. // deleted or updated may be processed only once.
// So possibly we may skip events, timed out here. // So possibly we may skip events, timed out here.
t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(allTgs), max) t.Logf("timed out, got %d (max: %d) items, some events are skipped", len(res), max)
break Loop break Loop
} }
} }
// Merge by source and sent it to channel.
res := make(map[string]*targetgroup.Group)
for _, tgs := range allTgs {
for _, tg := range tgs {
if tg == nil {
continue
}
res[tg.Source] = tg
}
}
resChan <- res resChan <- res
} }