Merge pull request #3835 from krasi-georgiev/pool-package-generalize
Pool package generalize
This commit is contained in:
commit
ef567ceb7d
|
@ -13,17 +13,23 @@
|
||||||
|
|
||||||
package pool
|
package pool
|
||||||
|
|
||||||
import "sync"
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
// BytesPool is a bucketed pool for variably sized byte slices.
|
// Pool is a bucketed pool for variably sized byte slices.
|
||||||
type BytesPool struct {
|
type Pool struct {
|
||||||
buckets []sync.Pool
|
buckets []sync.Pool
|
||||||
sizes []int
|
sizes []int
|
||||||
|
// make is the function used to create an empty slice when none exist yet.
|
||||||
|
make func(int) interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBytesPool returns a new BytesPool with size buckets for minSize to maxSize
|
// New returns a new Pool with size buckets for minSize to maxSize
|
||||||
// increasing by the given factor.
|
// increasing by the given factor.
|
||||||
func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool {
|
func New(minSize, maxSize int, factor float64, makeFunc func(int) interface{}) *Pool {
|
||||||
if minSize < 1 {
|
if minSize < 1 {
|
||||||
panic("invalid minimum pool size")
|
panic("invalid minimum pool size")
|
||||||
}
|
}
|
||||||
|
@ -40,36 +46,42 @@ func NewBytesPool(minSize, maxSize int, factor float64) *BytesPool {
|
||||||
sizes = append(sizes, s)
|
sizes = append(sizes, s)
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &BytesPool{
|
p := &Pool{
|
||||||
buckets: make([]sync.Pool, len(sizes)),
|
buckets: make([]sync.Pool, len(sizes)),
|
||||||
sizes: sizes,
|
sizes: sizes,
|
||||||
|
make: makeFunc,
|
||||||
}
|
}
|
||||||
|
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a new byte slices that fits the given size.
|
// Get returns a new byte slices that fits the given size.
|
||||||
func (p *BytesPool) Get(sz int) []byte {
|
func (p *Pool) Get(sz int) interface{} {
|
||||||
for i, bktSize := range p.sizes {
|
for i, bktSize := range p.sizes {
|
||||||
if sz > bktSize {
|
if sz > bktSize {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
b, ok := p.buckets[i].Get().([]byte)
|
b := p.buckets[i].Get()
|
||||||
if !ok {
|
if b == nil {
|
||||||
b = make([]byte, 0, bktSize)
|
b = p.make(bktSize)
|
||||||
}
|
}
|
||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
return make([]byte, 0, sz)
|
return p.make(sz)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put returns a byte slice to the right bucket in the pool.
|
// Put adds a slice to the right bucket in the pool.
|
||||||
func (p *BytesPool) Put(b []byte) {
|
func (p *Pool) Put(s interface{}) {
|
||||||
for i, bktSize := range p.sizes {
|
slice := reflect.ValueOf(s)
|
||||||
if cap(b) > bktSize {
|
|
||||||
|
if slice.Kind() != reflect.Slice {
|
||||||
|
panic(fmt.Sprintf("%+v is not a slice", slice))
|
||||||
|
}
|
||||||
|
for i, size := range p.sizes {
|
||||||
|
if slice.Cap() > size {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
p.buckets[i].Put(b[:0])
|
p.buckets[i].Put(slice.Slice(0, 0).Interface())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -148,7 +148,7 @@ func newScrapePool(cfg *config.ScrapeConfig, app Appendable, logger log.Logger)
|
||||||
level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
|
level.Error(logger).Log("msg", "Error creating HTTP client", "err", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
buffers := pool.NewBytesPool(163, 100e6, 3)
|
buffers := pool.New(163, 100e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
sp := &scrapePool{
|
sp := &scrapePool{
|
||||||
|
@ -487,7 +487,7 @@ type scrapeLoop struct {
|
||||||
l log.Logger
|
l log.Logger
|
||||||
cache *scrapeCache
|
cache *scrapeCache
|
||||||
lastScrapeSize int
|
lastScrapeSize int
|
||||||
buffers *pool.BytesPool
|
buffers *pool.Pool
|
||||||
|
|
||||||
appender func() storage.Appender
|
appender func() storage.Appender
|
||||||
sampleMutator labelsMutator
|
sampleMutator labelsMutator
|
||||||
|
@ -602,7 +602,7 @@ func (c *scrapeCache) forEachStale(f func(labels.Labels) bool) {
|
||||||
func newScrapeLoop(ctx context.Context,
|
func newScrapeLoop(ctx context.Context,
|
||||||
sc scraper,
|
sc scraper,
|
||||||
l log.Logger,
|
l log.Logger,
|
||||||
buffers *pool.BytesPool,
|
buffers *pool.Pool,
|
||||||
sampleMutator labelsMutator,
|
sampleMutator labelsMutator,
|
||||||
reportSampleMutator labelsMutator,
|
reportSampleMutator labelsMutator,
|
||||||
appender func() storage.Appender,
|
appender func() storage.Appender,
|
||||||
|
@ -611,7 +611,7 @@ func newScrapeLoop(ctx context.Context,
|
||||||
l = log.NewNopLogger()
|
l = log.NewNopLogger()
|
||||||
}
|
}
|
||||||
if buffers == nil {
|
if buffers == nil {
|
||||||
buffers = pool.NewBytesPool(1e3, 1e6, 3)
|
buffers = pool.New(1e3, 1e6, 3, func(sz int) interface{} { return make([]byte, 0, sz) })
|
||||||
}
|
}
|
||||||
sl := &scrapeLoop{
|
sl := &scrapeLoop{
|
||||||
scraper: sc,
|
scraper: sc,
|
||||||
|
@ -668,7 +668,8 @@ mainLoop:
|
||||||
time.Since(last).Seconds(),
|
time.Since(last).Seconds(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
b := sl.buffers.Get(sl.lastScrapeSize)
|
|
||||||
|
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
|
||||||
buf := bytes.NewBuffer(b)
|
buf := bytes.NewBuffer(b)
|
||||||
|
|
||||||
scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
|
scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
|
||||||
|
|
Loading…
Reference in New Issue