diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go index 7cfa78f42..177389849 100644 --- a/pkg/pool/pool.go +++ b/pkg/pool/pool.go @@ -13,17 +13,23 @@ package pool -import "sync" +import ( + "fmt" + "reflect" + "sync" +) -// BytesPool is a bucketed pool for variably sized byte slices. -type BytesPool struct { +// Pool is a bucketed pool for variably sized byte slices. +type Pool struct { buckets []sync.Pool sizes []int + // initialize is the function used to create an empty slice when none exist yet. + initialize func(int) interface{} } -// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize +// New returns a new Pool with size buckets for minSize to maxSize // increasing by the given factor. -func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool { +func New(minSize, maxSize int, factor float64, newFunc func(int) interface{}) *Pool { if minSize < 1 { panic("invalid minimum pool size") } @@ -40,36 +46,41 @@ func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool { sizes = append(sizes, s) } - p := &BytesPool{ - buckets: make([]sync.Pool, len(sizes)), - sizes: sizes, + p := &Pool{ + buckets: make([]sync.Pool, len(sizes)), + sizes: sizes, + initialize: newFunc, } return p } // Get returns a new byte slices that fits the given size. -func (p *BytesPool) Get(sz int) []byte { +func (p *Pool) Get(sz int) interface{} { for i, bktSize := range p.sizes { if sz > bktSize { continue } - b, ok := p.buckets[i].Get().([]byte) - if !ok { - b = make([]byte, 0, bktSize) + b := p.buckets[i].Get() + if b == nil { + b = p.initialize(bktSize) } return b } - return make([]byte, 0, sz) + return p.initialize(sz) } -// Put returns a byte slice to the right bucket in the pool. -func (p *BytesPool) Put(b []byte) { - for i, bktSize := range p.sizes { - if cap(b) > bktSize { - continue +// Put adds a slice to the right bucket in the pool. +func (p *Pool) Put(s interface{}) error { + slice := reflect.ValueOf(s) + if slice.Kind() == reflect.Slice { + for i, size := range p.sizes { + if slice.Cap() > size { + continue + } + p.buckets[i].Put(slice.Slice(0, 0).Interface()) + return nil } - p.buckets[i].Put(b[:0]) - return } + return fmt.Errorf("%+v is not a slice", slice) } diff --git a/scrape/scrape.go b/scrape/scrape.go index a22dcb9a3..69d7f5839 100644 --- a/scrape/scrape.go +++ b/scrape/scrape.go @@ -148,7 +148,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger) level.Error(logger).Log("msg", "Error creating HTTP client", "err", err) } - buffers := pool.NewBytesPool(163, 100e6, 3) + buffers := pool.New(163, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) ctx, cancel := context.WithCancel(context.Background()) sp := &scrapePool{ @@ -481,7 +481,7 @@ type scrapeLoop struct { l log.Logger cache *scrapeCache lastScrapeSize int - buffers *pool.BytesPool + buffers *pool.Pool appender func() storage.Appender sampleMutator labelsMutator @@ -596,7 +596,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) { func newScrapeLoop(ctx context.Context, sc scraper, l log.Logger, - buffers *pool.BytesPool, + buffers *pool.Pool, sampleMutator labelsMutator, reportSampleMutator labelsMutator, appender func() storage.Appender, @@ -605,7 +605,7 @@ func newScrapeLoop(ctx context.Context, l = log.NewNopLogger() } if buffers == nil { - buffers = pool.NewBytesPool(1e3, 1e6, 3) + buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) }) } sl := &scrapeLoop{ scraper: sc, @@ -662,7 +662,11 @@ mainLoop: time.Since(last).Seconds(), ) } - b := sl.buffers.Get(sl.lastScrapeSize) + b, ok := sl.buffers.Get(sl.lastScrapeSize).([]byte) + if !ok { + b = make([]byte, 0, sl.lastScrapeSize) + level.Error(sl.l).Log("msg", "buffer pool type assertion error") + } buf := bytes.NewBuffer(b) scrapeErr := sl.scraper.scrape(scrapeCtx, buf) @@ -695,7 +699,9 @@ mainLoop: } } - sl.buffers.Put(b) + if err := sl.buffers.Put(b); err != nil { + level.Error(sl.l).Log("msg", "buffer pool error", "err", err) + } if scrapeErr == nil { scrapeErr = appErr