Add barrier to benchmark writer

This adds a barrier to avoid issues with unfair goroutine scheduling
that causes some fake scrapers to run away from the other ones.
This commit is contained in:
Fabian Reinartz 2017-01-11 12:54:18 +01:00
parent c32a94d409
commit 80affd98a8
2 changed files with 50 additions and 41 deletions

View File

@ -3,7 +3,7 @@ build:
bench: build bench: build
@echo ">> running benchmark" @echo ">> running benchmark"
@./tsdb bench write --out=$(OUT) --metrics=$(NUM_METRICS) testdata.100k @./tsdb bench write --metrics=$(NUM_METRICS) testdata.100k
@go tool pprof -svg ./tsdb benchout/cpu.prof > benchout/cpuprof.svg @go tool pprof -svg ./tsdb benchout/cpu.prof > benchout/cpuprof.svg
@go tool pprof -svg ./tsdb benchout/mem.prof > benchout/memprof.svg @go tool pprof -svg ./tsdb benchout/mem.prof > benchout/memprof.svg
@go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg @go tool pprof -svg ./tsdb benchout/block.prof > benchout/blockprof.svg

View File

@ -138,43 +138,7 @@ func (b *writeBenchmark) ingestScrapes(metrics []model.Metric, scrapeCount int)
var mu sync.Mutex var mu sync.Mutex
var total uint64 var total uint64
for len(metrics) > 0 { lbls := make([]labels.Labels, 0, len(metrics))
l := 1000
if len(metrics) < 1000 {
l = len(metrics)
}
batch := metrics[:l]
metrics = metrics[l:]
wg.Add(1)
go func(batch []model.Metric) {
n, err := b.ingestScrapesShard(batch, scrapeCount)
if err != nil {
// exitWithError(err)
fmt.Println(" err", err)
}
mu.Lock()
total += n
mu.Unlock()
wg.Done()
}(batch)
}
wg.Wait()
fmt.Println("> total samples:", total)
return nil
}
func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount int) (uint64, error) {
app := b.storage.Appender()
ts := int64(model.Now())
type sample struct {
labels labels.Labels
value int64
}
scrape := make(map[uint64]*sample, len(metrics))
for _, m := range metrics { for _, m := range metrics {
lset := make(labels.Labels, 0, len(m)) lset := make(labels.Labels, 0, len(m))
@ -183,10 +147,55 @@ func (b *writeBenchmark) ingestScrapesShard(metrics []model.Metric, scrapeCount
} }
sort.Sort(lset) sort.Sort(lset)
scrape[lset.Hash()] = &sample{ lbls = append(lbls, lset)
labels: lset, }
value: 123456789,
for i := 0; i < scrapeCount; i += 25 {
lbls := lbls
for len(lbls) > 0 {
l := 1000
if len(lbls) < 1000 {
l = len(lbls)
}
batch := lbls[:l]
lbls = lbls[l:]
wg.Add(1)
go func() {
n, err := b.ingestScrapesShard(batch, 25, int64(30000*i))
if err != nil {
// exitWithError(err)
fmt.Println(" err", err)
}
mu.Lock()
total += n
mu.Unlock()
wg.Done()
}()
} }
wg.Wait()
}
fmt.Println("> total samples:", total)
return nil
}
func (b *writeBenchmark) ingestScrapesShard(metrics []labels.Labels, scrapeCount int, baset int64) (uint64, error) {
app := b.storage.Appender()
ts := baset
type sample struct {
labels labels.Labels
value int64
}
scrape := make([]*sample, 0, len(metrics))
for _, m := range metrics {
scrape = append(scrape, &sample{
labels: m,
value: 123456789,
})
} }
total := uint64(0) total := uint64(0)