Generalize the pool package and refactor the scraper to use it.

This commit is contained in:
Krasi Georgiev 2018-02-13 22:44:51 +02:00
parent 1fe05d40e4
commit f2c5913416
2 changed files with 43 additions and 26 deletions

View File

@ -13,17 +13,23 @@
package pool package pool
import "sync" import (
"fmt"
"reflect"
"sync"
)
// Pool is a bucketed pool for variably sized slices. Each bucket holds
// slices up to a fixed capacity; requests are served from the smallest
// bucket that fits.
type Pool struct {
	buckets []sync.Pool
	sizes   []int
	// initialize creates a new, empty slice with the given capacity when
	// the matching bucket has nothing to reuse.
	initialize func(int) interface{}
}
// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize // New returns a new Pool with size buckets for minSize to maxSize
// increasing by the given factor. // increasing by the given factor.
func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool { func New(minSize, maxSize int, factor float64, newFunc func(int) interface{}) *Pool {
if minSize < 1 { if minSize < 1 {
panic("invalid minimum pool size") panic("invalid minimum pool size")
} }
@ -40,36 +46,41 @@ func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool {
sizes = append(sizes, s) sizes = append(sizes, s)
} }
p := &BytesPool{ p := &Pool{
buckets: make([]sync.Pool, len(sizes)), buckets: make([]sync.Pool, len(sizes)),
sizes: sizes, sizes: sizes,
initialize: newFunc,
} }
return p return p
} }
// Get returns a new byte slices that fits the given size. // Get returns a new byte slices that fits the given size.
func (p *BytesPool) Get(sz int) []byte { func (p *Pool) Get(sz int) interface{} {
for i, bktSize := range p.sizes { for i, bktSize := range p.sizes {
if sz > bktSize { if sz > bktSize {
continue continue
} }
b, ok := p.buckets[i].Get().([]byte) b := p.buckets[i].Get()
if !ok { if b == nil {
b = make([]byte, 0, bktSize) b = p.initialize(bktSize)
} }
return b return b
} }
return make([]byte, 0, sz) return p.initialize(sz)
} }
// Put returns a byte slice to the right bucket in the pool. // Put adds a slice to the right bucket in the pool.
func (p *BytesPool) Put(b []byte) { func (p *Pool) Put(s interface{}) error {
for i, bktSize := range p.sizes { slice := reflect.ValueOf(s)
if cap(b) > bktSize { if slice.Kind() == reflect.Slice {
continue for i, size := range p.sizes {
if slice.Cap() > size {
continue
}
p.buckets[i].Put(slice.Slice(0, 0).Interface())
return nil
} }
p.buckets[i].Put(b[:0])
return
} }
return fmt.Errorf("%+v is not a slice", slice)
} }

View File

@ -148,7 +148,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
level.Error(logger).Log("msg", "Error creating HTTP client", "err", err) level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
} }
buffers := pool.NewBytesPool(163, 100e6, 3) buffers := pool.New(163, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{ sp := &scrapePool{
@ -481,7 +481,7 @@ type scrapeLoop struct {
l log.Logger l log.Logger
cache *scrapeCache cache *scrapeCache
lastScrapeSize int lastScrapeSize int
buffers *pool.BytesPool buffers *pool.Pool
appender func() storage.Appender appender func() storage.Appender
sampleMutator labelsMutator sampleMutator labelsMutator
@ -596,7 +596,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
func newScrapeLoop(ctx context.Context, func newScrapeLoop(ctx context.Context,
sc scraper, sc scraper,
l log.Logger, l log.Logger,
buffers *pool.BytesPool, buffers *pool.Pool,
sampleMutator labelsMutator, sampleMutator labelsMutator,
reportSampleMutator labelsMutator, reportSampleMutator labelsMutator,
appender func() storage.Appender, appender func() storage.Appender,
@ -605,7 +605,7 @@ func newScrapeLoop(ctx context.Context,
l = log.NewNopLogger() l = log.NewNopLogger()
} }
if buffers == nil { if buffers == nil {
buffers = pool.NewBytesPool(1e3, 1e6, 3) buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
} }
sl := &scrapeLoop{ sl := &scrapeLoop{
scraper: sc, scraper: sc,
@ -662,7 +662,11 @@ mainLoop:
time.Since(last).Seconds(), time.Since(last).Seconds(),
) )
} }
b := sl.buffers.Get(sl.lastScrapeSize) b, ok := sl.buffers.Get(sl.lastScrapeSize).([]byte)
if !ok {
b = make([]byte, 0, sl.lastScrapeSize)
level.Error(sl.l).Log("msg", "buffer pool type assertion error")
}
buf := bytes.NewBuffer(b) buf := bytes.NewBuffer(b)
scrapeErr := sl.scraper.scrape(scrapeCtx, buf) scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
@ -695,7 +699,9 @@ mainLoop:
} }
} }
sl.buffers.Put(b) if err := sl.buffers.Put(b); err != nil {
level.Error(sl.l).Log("msg", "buffer pool error", "err", err)
}
if scrapeErr == nil { if scrapeErr == nil {
scrapeErr = appErr scrapeErr = appErr