mirror of
https://github.com/prometheus/prometheus
synced 2025-01-12 18:01:36 +00:00
Improvements after review.
- Increase samplesQueueCapacity. - Improve docstring for the above. - Accept a short waiting period for the ingest channel to become ready. This should depend on the http timeout, but 100ms is probably good enough to cushion bursts bigger than samplesQueueCapacity, while it is unlikely that anybody ever will set an HTTP timeout similarly short.
This commit is contained in:
parent
0f191629c6
commit
11b3c2387c
2
main.go
2
main.go
@ -52,7 +52,7 @@ var (
|
||||
remoteTSDBUrl = flag.String("storage.remote.url", "", "The URL of the OpenTSDB instance to send samples to.")
|
||||
remoteTSDBTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to OpenTSDB.")
|
||||
|
||||
samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 4096, "The capacity of the queue of samples to be stored.")
|
||||
samplesQueueCapacity = flag.Int("storage.incoming-samples-queue-capacity", 64*1024, "The capacity of the queue of samples to be stored. Note that each slot in the queue takes a whole slice of samples whose size depends on details of the scrape process.")
|
||||
|
||||
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
|
||||
|
||||
|
@ -15,12 +15,15 @@ package retrieval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/extraction"
|
||||
|
||||
clientmodel "github.com/prometheus/client_golang/model"
|
||||
)
|
||||
|
||||
const ingestTimeout = 100 * time.Millisecond // TODO(beorn7): Adjust this to a fraction of the actual HTTP timeout.
|
||||
|
||||
var errIngestChannelFull = errors.New("ingestion channel full")
|
||||
|
||||
// MergeLabelsIngester merges a labelset ontop of a given extraction result and
|
||||
@ -47,13 +50,22 @@ func (i *MergeLabelsIngester) Ingest(samples clientmodel.Samples) error {
|
||||
type ChannelIngester chan<- clientmodel.Samples
|
||||
|
||||
// Ingest ingests the provided extraction result by sending it to its channel.
|
||||
// It returns an error if the channel is not ready to receive. This is important
|
||||
// to fail fast and to not pile up ingestion requests in case of overload.
|
||||
// If the channel was not able to receive the samples within the ingestTimeout,
|
||||
// an error is returned. This is important to fail fast and to not pile up
|
||||
// ingestion requests in case of overload.
|
||||
func (i ChannelIngester) Ingest(s clientmodel.Samples) error {
|
||||
// Since the regular case is that i is ready to receive, first try
|
||||
// without setting a timeout so that we don't need to allocate a timer
|
||||
// most of the time.
|
||||
select {
|
||||
case i <- s:
|
||||
return nil
|
||||
default:
|
||||
return errIngestChannelFull
|
||||
select {
|
||||
case i <- s:
|
||||
return nil
|
||||
case <-time.After(ingestTimeout):
|
||||
return errIngestChannelFull
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user