From f2c5913416db03da1ec62b5a3d9b07b005060a41 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 13 Feb 2018 22:44:51 +0200 Subject: [PATCH 1/4] generalize the pool package and refactor scraper to use it. --- pkg/pool/pool.go | 51 +++++++++++++++++++++++++++++------------------- scrape/scrape.go | 18 +++++++++++------ 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index 7cfa78f42..177389849 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -13,17 +13,23 @@ package pool -import "sync" +import ( + "fmt" + "reflect" + "sync" +) -// BytesPool is a bucketed pool for variably sized byte slices. -type BytesPool struct { +// Pool is a bucketed pool for variably sized byte slices. +type Pool struct { buckets []sync.Pool sizes []int + // initialize is the function used to create an empty slice when none exist yet. + initialize func(int) interface{} } -// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize +// New returns a new Pool with size buckets for minSize to maxSize // increasing by the given factor. -func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool { +func New(minSize, maxSize int, factor float64, newFunc func(int) interface{}) *Pool { if minSize < 1 { panic("invalid minimum pool size") } @@ -40,36 +46,41 @@ func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool { sizes = append(sizes, s) } - p := &BytesPool{ - buckets: make([]sync.Pool, len(sizes)), - sizes: sizes, + p := &Pool{ + buckets: make([]sync.Pool, len(sizes)), + sizes: sizes, + initialize: newFunc, } return p } // Get returns a new byte slices that fits the given size. -func (p *BytesPool) Get(sz int) []byte { +func (p *Pool) Get(sz int) interface{} { for i, bktSize := range p.sizes { if sz > bktSize { continue } - b, ok := p.buckets[i].Get().([]byte) - if !ok { - b = make([]byte, 0, bktSize) + b := p.buckets[i].Get() + if b == nil { + b = p.initialize(bktSize) } return b } - return make([]byte, 0, sz) + return p.initialize(sz) } -// Put returns a byte slice to the right bucket in the pool. -func (p *BytesPool) Put(b []byte) { - for i, bktSize := range p.sizes { - if cap(b) > bktSize { - continue +// Put adds a slice to the right bucket in the pool. +func (p *Pool) Put(s interface{}) error { + slice := reflect.ValueOf(s) + if slice.Kind() == reflect.Slice { + for i, size := range p.sizes { + if slice.Cap() > size { + continue + } + p.buckets[i].Put(slice.Slice(0, 0).Interface()) + return nil } - p.buckets[i].Put(b[:0]) - return } + return fmt.Errorf("%+v is not a slice", slice) } diff --git a/scrape/scrape.go b/scrape/scrape.go index a22dcb9a3..69d7f5839 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -148,7 +148,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) level.Error(logger).Log("msg", "Error creating HTTP client", "err", err) } - buffers := pool.NewBytesPool(163, 100e6, 3) + buffers := pool.New(163, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) ctx, cancel := context.WithCancel(context.Background()) sp := &scrapePool{ @@ -481,7 +481,7 @@ type scrapeLoop struct { l log.Logger cache *scrapeCache lastScrapeSize int - buffers *pool.BytesPool + buffers *pool.Pool appender func() storage.Appender sampleMutator labelsMutator @@ -596,7 +596,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { func newScrapeLoop(ctx context.Context, sc scraper, l log.Logger, - buffers *pool.BytesPool, + buffers *pool.Pool, sampleMutator labelsMutator, reportSampleMutator labelsMutator, appender func() storage.Appender, @@ -605,7 +605,7 @@ func newScrapeLoop(ctx context.Context, l = log.NewNopLogger() } if buffers == nil { - buffers = pool.NewBytesPool(1e3, 1e6, 3) + buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) } sl := &scrapeLoop{ scraper: sc, @@ -662,7 +662,11 @@ mainLoop: time.Since(last).Seconds(), ) } - b := sl.buffers.Get(sl.lastScrapeSize) + b, ok := sl.buffers.Get(sl.lastScrapeSize).([]byte) + if !ok { + b = make([]byte, 0, sl.lastScrapeSize) + level.Error(sl.l).Log("msg", "buffer pool type assertion error") + } buf := bytes.NewBuffer(b) scrapeErr := sl.scraper.scrape(scrapeCtx, buf) @@ -695,7 +699,9 @@ mainLoop: } } - sl.buffers.Put(b) + if err := sl.buffers.Put(b); err != nil { + level.Error(sl.l).Log("msg", "buffer pool error", "err", err) + } if scrapeErr == nil { scrapeErr = appErr From 9878d66484b5afb3f6afe4acd23b291e2b71a489 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 19 Feb 2018 14:31:04 +0200 Subject: [PATCH 2/4] exit early and panic for non slice on buffer.Put --- pkg/pool/pool.go | 20 ++++++++++---------- scrape/scrape.go | 4 +--- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index 177389849..26e1ebe4f 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -71,16 +71,16 @@ func (p *Pool) Get(sz int) interface{} { } // Put adds a slice to the right bucket in the pool. -func (p *Pool) Put(s interface{}) error { +func (p *Pool) Put(s interface{}) { slice := reflect.ValueOf(s) - if slice.Kind() == reflect.Slice { - for i, size := range p.sizes { - if slice.Cap() > size { - continue - } - p.buckets[i].Put(slice.Slice(0, 0).Interface()) - return nil - } + + if slice.Kind() != reflect.Slice { + panic(fmt.Sprintf("%+v is not a slice", slice)) + } + for i, size := range p.sizes { + if slice.Cap() > size { + continue + } + p.buckets[i].Put(slice.Slice(0, 0).Interface()) } - return fmt.Errorf("%+v is not a slice", slice) } diff --git a/scrape/scrape.go b/scrape/scrape.go index 69d7f5839..40f76532d 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -699,9 +699,7 @@ mainLoop: } } - if err := sl.buffers.Put(b); err != nil { - level.Error(sl.l).Log("msg", "buffer pool error", "err", err) - } + sl.buffers.Put(b) if scrapeErr == nil { scrapeErr = appErr From 9b7f0cfa886ab559d6afa408b416b9d0ada206cd Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 19 Feb 2018 14:38:40 +0200 Subject: [PATCH 3/4] rename initialize to make --- pkg/pool/pool.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index 26e1ebe4f..e68367e0b 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -23,13 +23,13 @@ import ( type Pool struct { buckets []sync.Pool sizes []int - // initialize is the function used to create an empty slice when none exist yet. - initialize func(int) interface{} + // make is the function used to create an empty slice when none exist yet. + make func(int) interface{} } // New returns a new Pool with size buckets for minSize to maxSize // increasing by the given factor. -func New(minSize, maxSize int, factor float64, newFunc func(int) interface{}) *Pool { +func New(minSize, maxSize int, factor float64, makeFunc func(int) interface{}) *Pool { if minSize < 1 { panic("invalid minimum pool size") } @@ -47,9 +47,9 @@ func New(minSize, maxSize int, factor float64, newFunc func(int) interface{}) *P } p := &Pool{ - buckets: make([]sync.Pool, len(sizes)), - sizes: sizes, - initialize: newFunc, + buckets: make([]sync.Pool, len(sizes)), + sizes: sizes, + make: makeFunc, } return p @@ -63,11 +63,11 @@ func (p *Pool) Get(sz int) interface{} { } b := p.buckets[i].Get() if b == nil { - b = p.initialize(bktSize) + b = p.make(bktSize) } return b } - return p.initialize(sz) + return p.make(sz) } // Put adds a slice to the right bucket in the pool. From 933ab8a34e0f27ec67012f3ca2e116a1509ffe56 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Tue, 20 Feb 2018 13:32:23 +0200 Subject: [PATCH 4/4] stupid return mistake fix, and dropped the additional assertion check! --- pkg/pool/pool.go | 1 + scrape/scrape.go | 7 ++----- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index e68367e0b..2ee897185 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -82,5 +82,6 @@ func (p *Pool) Put(s interface{}) { continue } p.buckets[i].Put(slice.Slice(0, 0).Interface()) + return } } diff --git a/scrape/scrape.go b/scrape/scrape.go index 40f76532d..0b6e38bb0 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -662,11 +662,8 @@ mainLoop: time.Since(last).Seconds(), ) } - b, ok := sl.buffers.Get(sl.lastScrapeSize).([]byte) - if !ok { - b = make([]byte, 0, sl.lastScrapeSize) - level.Error(sl.l).Log("msg", "buffer pool type assertion error") - } + + b := sl.buffers.Get(sl.lastScrapeSize).([]byte) buf := bytes.NewBuffer(b) scrapeErr := sl.scraper.scrape(scrapeCtx, buf)