2017-05-10 09:44:13 +00:00
// Copyright 2013 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remote
import (
2018-05-29 08:51:29 +00:00
"context"
2017-05-10 09:44:13 +00:00
"math"
2018-09-07 21:26:04 +00:00
"strconv"
2017-05-10 09:44:13 +00:00
"sync"
"time"
2017-08-11 18:45:52 +00:00
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
2018-09-07 21:26:04 +00:00
"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
2020-06-01 15:21:13 +00:00
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
2020-07-30 07:45:42 +00:00
"go.uber.org/atomic"
2018-09-07 21:26:04 +00:00
2020-11-19 15:23:03 +00:00
"github.com/prometheus/client_golang/prometheus"
2021-02-10 22:25:37 +00:00
"github.com/prometheus/common/model"
2017-05-10 09:44:13 +00:00
"github.com/prometheus/prometheus/config"
2019-03-08 16:29:25 +00:00
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
2017-10-23 20:28:17 +00:00
"github.com/prometheus/prometheus/prompb"
2020-11-19 15:23:03 +00:00
"github.com/prometheus/prometheus/scrape"
2019-09-19 09:15:41 +00:00
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
2017-05-10 09:44:13 +00:00
)
const (
	// ewmaWeight is the weight given to the Exponentially Weighted Moving
	// Averages that track samples in/out and how long pushes take.
	ewmaWeight = 0.2

	// shardUpdateDuration is both the tick interval of every ewmaRate created
	// by the queue manager and the period of its shard recalculation loop.
	shardUpdateDuration = 10 * time.Second

	// shardToleranceFraction is the relative band around the current shard
	// count inside which a newly computed desired count is ignored; 0.3
	// tolerates 30% too many (or too few) shards before resharding.
	shardToleranceFraction = 0.3
)
2020-02-03 21:47:03 +00:00
type queueManagerMetrics struct {
2020-04-25 03:39:46 +00:00
reg prometheus . Registerer
2020-11-19 15:23:03 +00:00
samplesTotal prometheus . Counter
metadataTotal prometheus . Counter
failedSamplesTotal prometheus . Counter
failedMetadataTotal prometheus . Counter
retriedSamplesTotal prometheus . Counter
retriedMetadataTotal prometheus . Counter
droppedSamplesTotal prometheus . Counter
enqueueRetriesTotal prometheus . Counter
sentBatchDuration prometheus . Histogram
highestSentTimestamp * maxTimestamp
pendingSamples prometheus . Gauge
shardCapacity prometheus . Gauge
numShards prometheus . Gauge
maxNumShards prometheus . Gauge
minNumShards prometheus . Gauge
desiredNumShards prometheus . Gauge
samplesBytesTotal prometheus . Counter
metadataBytesTotal prometheus . Counter
maxSamplesPerSend prometheus . Gauge
2020-02-03 21:47:03 +00:00
}
2020-04-25 03:39:46 +00:00
func newQueueManagerMetrics ( r prometheus . Registerer , rn , e string ) * queueManagerMetrics {
m := & queueManagerMetrics {
reg : r ,
}
constLabels := prometheus . Labels {
remoteName : rn ,
endpoint : e ,
}
2020-11-19 15:23:03 +00:00
m . samplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "samples_total" ,
Help : "Total number of samples sent to remote storage." ,
ConstLabels : constLabels ,
} )
m . metadataTotal = prometheus . NewCounter ( prometheus . CounterOpts {
2020-04-25 03:39:46 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "metadata_total" ,
Help : "Total number of metadata entries sent to remote storage." ,
2020-04-25 03:39:46 +00:00
ConstLabels : constLabels ,
} )
m . failedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_failed_total" ,
2020-04-25 03:39:46 +00:00
Help : "Total number of samples which failed on send to remote storage, non-recoverable errors." ,
ConstLabels : constLabels ,
} )
2020-11-19 15:23:03 +00:00
m . failedMetadataTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "metadata_failed_total" ,
Help : "Total number of metadata entries which failed on send to remote storage, non-recoverable errors." ,
ConstLabels : constLabels ,
} )
2020-04-25 03:39:46 +00:00
m . retriedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_retried_total" ,
2020-04-25 03:39:46 +00:00
Help : "Total number of samples which failed on send to remote storage but were retried because the send error was recoverable." ,
ConstLabels : constLabels ,
} )
2020-11-19 15:23:03 +00:00
m . retriedMetadataTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "metadata_retried_total" ,
Help : "Total number of metadata entries which failed on send to remote storage but were retried because the send error was recoverable." ,
ConstLabels : constLabels ,
} )
2020-04-25 03:39:46 +00:00
m . droppedSamplesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_dropped_total" ,
2020-04-25 03:39:46 +00:00
Help : "Total number of samples which were dropped after being read from the WAL before being sent via remote write." ,
ConstLabels : constLabels ,
} )
m . enqueueRetriesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "enqueue_retries_total" ,
Help : "Total number of times enqueue has failed because a shards queue was full." ,
ConstLabels : constLabels ,
} )
m . sentBatchDuration = prometheus . NewHistogram ( prometheus . HistogramOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "sent_batch_duration_seconds" ,
2020-11-19 15:23:03 +00:00
Help : "Duration of send calls to the remote storage." ,
increase the remote write bucket range (#7323)
* increase the remote write bucket range
Increase the range of remote write buckets to capture times above 10s for laggy scenarios
Buckets had been: {.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
Buckets are now: {0.03125, 0.0625, 0.125, 0.25, 0.5, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512}
Signed-off-by: Bert Hartmann <berthartm@gmail.com>
* revert back to DefBuckets with addons to be backwards compatible
Signed-off-by: Bert Hartmann <berthartm@gmail.com>
* shuffle the buckets to maintain 2-2.5x increases
Signed-off-by: Bert Hartmann <berthartm@gmail.com>
2020-06-04 19:54:47 +00:00
Buckets : append ( prometheus . DefBuckets , 25 , 60 , 120 , 300 ) ,
2020-04-25 03:39:46 +00:00
ConstLabels : constLabels ,
} )
2020-10-15 21:53:59 +00:00
m . highestSentTimestamp = & maxTimestamp {
2020-04-25 03:39:46 +00:00
Gauge : prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "queue_highest_sent_timestamp_seconds" ,
Help : "Timestamp from a WAL sample, the highest timestamp successfully sent by this queue, in seconds since epoch." ,
ConstLabels : constLabels ,
} ) ,
}
m . pendingSamples = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "samples_pending" ,
2020-04-25 03:39:46 +00:00
Help : "The number of samples pending in the queues shards to be sent to the remote storage." ,
ConstLabels : constLabels ,
} )
m . shardCapacity = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shard_capacity" ,
Help : "The capacity of each shard of the queue used for parallel sending to the remote storage." ,
ConstLabels : constLabels ,
} )
m . numShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards" ,
Help : "The number of shards used for parallel sending to the remote storage." ,
ConstLabels : constLabels ,
} )
m . maxNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_max" ,
Help : "The maximum number of shards that the queue is allowed to run." ,
ConstLabels : constLabels ,
} )
m . minNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_min" ,
Help : "The minimum number of shards that the queue is allowed to run." ,
ConstLabels : constLabels ,
} )
m . desiredNumShards = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "shards_desired" ,
Help : "The number of shards that the queues shard calculation wants to run based on the rate of samples in vs. samples out." ,
ConstLabels : constLabels ,
} )
2020-11-19 15:23:03 +00:00
m . samplesBytesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "samples_bytes_total" ,
Help : "The total number of bytes of samples sent by the queue after compression." ,
ConstLabels : constLabels ,
} )
m . metadataBytesTotal = prometheus . NewCounter ( prometheus . CounterOpts {
2020-04-25 03:39:46 +00:00
Namespace : namespace ,
Subsystem : subsystem ,
2020-11-19 15:23:03 +00:00
Name : "metadata_bytes_total" ,
Help : "The total number of bytes of metadata sent by the queue after compression." ,
2020-04-25 03:39:46 +00:00
ConstLabels : constLabels ,
} )
2020-10-28 11:39:36 +00:00
m . maxSamplesPerSend = prometheus . NewGauge ( prometheus . GaugeOpts {
Namespace : namespace ,
Subsystem : subsystem ,
Name : "max_samples_per_send" ,
Help : "The maximum number of samples to be sent, in a single request, to the remote storage." ,
ConstLabels : constLabels ,
} )
2020-02-03 21:47:03 +00:00
2020-06-26 06:33:52 +00:00
return m
}
func ( m * queueManagerMetrics ) register ( ) {
if m . reg != nil {
m . reg . MustRegister (
2020-11-19 15:23:03 +00:00
m . samplesTotal ,
m . metadataTotal ,
2020-02-03 21:47:03 +00:00
m . failedSamplesTotal ,
2020-11-19 15:23:03 +00:00
m . failedMetadataTotal ,
2020-02-03 21:47:03 +00:00
m . retriedSamplesTotal ,
2020-11-19 15:23:03 +00:00
m . retriedMetadataTotal ,
2020-02-03 21:47:03 +00:00
m . droppedSamplesTotal ,
m . enqueueRetriesTotal ,
m . sentBatchDuration ,
2020-04-25 03:39:46 +00:00
m . highestSentTimestamp ,
m . pendingSamples ,
2020-02-03 21:47:03 +00:00
m . shardCapacity ,
m . numShards ,
m . maxNumShards ,
m . minNumShards ,
m . desiredNumShards ,
2020-11-19 15:23:03 +00:00
m . samplesBytesTotal ,
m . metadataBytesTotal ,
2020-10-28 11:39:36 +00:00
m . maxSamplesPerSend ,
2020-02-03 21:47:03 +00:00
)
}
}
2017-05-10 09:44:13 +00:00
2020-04-25 03:39:46 +00:00
func ( m * queueManagerMetrics ) unregister ( ) {
if m . reg != nil {
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . samplesTotal )
m . reg . Unregister ( m . metadataTotal )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . failedSamplesTotal )
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . failedMetadataTotal )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . retriedSamplesTotal )
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . retriedMetadataTotal )
2020-04-25 03:39:46 +00:00
m . reg . Unregister ( m . droppedSamplesTotal )
m . reg . Unregister ( m . enqueueRetriesTotal )
m . reg . Unregister ( m . sentBatchDuration )
m . reg . Unregister ( m . highestSentTimestamp )
m . reg . Unregister ( m . pendingSamples )
m . reg . Unregister ( m . shardCapacity )
m . reg . Unregister ( m . numShards )
m . reg . Unregister ( m . maxNumShards )
m . reg . Unregister ( m . minNumShards )
m . reg . Unregister ( m . desiredNumShards )
2020-11-19 15:23:03 +00:00
m . reg . Unregister ( m . samplesBytesTotal )
m . reg . Unregister ( m . metadataBytesTotal )
2020-10-28 11:39:36 +00:00
m . reg . Unregister ( m . maxSamplesPerSend )
2020-04-25 03:39:46 +00:00
}
}
2020-06-24 13:41:52 +00:00
// WriteClient is the contract a remote-write backend must satisfy so that a
// queue can push batches to an external timeseries database.
type WriteClient interface {
	// Store sends one already-encoded request body to the remote storage.
	Store(ctx context.Context, req []byte) error

	// Name uniquely identifies the remote storage.
	Name() string

	// Endpoint is the remote read or write endpoint for the storage client.
	Endpoint() string
}
// QueueManager manages a queue of samples to be sent to the Storage
2020-06-24 13:41:52 +00:00
// indicated by the provided WriteClient. Implements writeTo interface
2018-09-07 21:26:04 +00:00
// used by WAL Watcher.
2017-05-10 09:44:13 +00:00
type QueueManager struct {
2020-07-30 07:45:42 +00:00
lastSendTimestamp atomic . Int64
2019-10-21 21:54:25 +00:00
2020-11-19 15:23:03 +00:00
logger log . Logger
flushDeadline time . Duration
cfg config . QueueConfig
mcfg config . MetadataConfig
externalLabels labels . Labels
relabelConfigs [ ] * relabel . Config
watcher * wal . Watcher
metadataWatcher * MetadataWatcher
2018-09-07 21:26:04 +00:00
2020-03-31 03:39:29 +00:00
clientMtx sync . RWMutex
2020-06-24 13:41:52 +00:00
storeClient WriteClient
2020-03-31 03:39:29 +00:00
2019-09-13 17:23:58 +00:00
seriesMtx sync . Mutex
2019-08-07 19:39:07 +00:00
seriesLabels map [ uint64 ] labels . Labels
2018-09-07 21:26:04 +00:00
seriesSegmentIndexes map [ uint64 ] int
droppedSeries map [ uint64 ] struct { }
2017-05-10 09:44:13 +00:00
shards * shards
numShards int
reshardChan chan int
2019-01-18 12:48:16 +00:00
quit chan struct { }
wg sync . WaitGroup
2017-05-10 09:44:13 +00:00
2019-02-20 07:51:08 +00:00
samplesIn , samplesDropped , samplesOut , samplesOutDuration * ewmaRate
2019-03-05 12:21:11 +00:00
2020-09-24 18:44:18 +00:00
metrics * queueManagerMetrics
interner * pool
2020-10-15 21:53:59 +00:00
highestRecvTimestamp * maxTimestamp
2017-05-10 09:44:13 +00:00
}
// NewQueueManager builds a new QueueManager.
2020-04-25 03:39:46 +00:00
func NewQueueManager (
metrics * queueManagerMetrics ,
watcherMetrics * wal . WatcherMetrics ,
readerMetrics * wal . LiveReaderMetrics ,
logger log . Logger ,
walDir string ,
samplesIn * ewmaRate ,
cfg config . QueueConfig ,
2020-11-19 15:23:03 +00:00
mCfg config . MetadataConfig ,
2020-04-25 03:39:46 +00:00
externalLabels labels . Labels ,
relabelConfigs [ ] * relabel . Config ,
2020-06-24 13:41:52 +00:00
client WriteClient ,
2020-04-25 03:39:46 +00:00
flushDeadline time . Duration ,
2020-09-24 18:44:18 +00:00
interner * pool ,
2020-10-15 21:53:59 +00:00
highestRecvTimestamp * maxTimestamp ,
2020-11-19 15:23:03 +00:00
sm ReadyScrapeManager ,
2020-04-25 03:39:46 +00:00
) * QueueManager {
2017-08-11 18:45:52 +00:00
if logger == nil {
logger = log . NewNopLogger ( )
}
2019-03-05 12:21:11 +00:00
2019-12-12 20:47:23 +00:00
logger = log . With ( logger , remoteName , client . Name ( ) , endpoint , client . Endpoint ( ) )
2017-05-10 09:44:13 +00:00
t := & QueueManager {
2019-03-05 12:21:11 +00:00
logger : logger ,
2018-05-23 14:03:54 +00:00
flushDeadline : flushDeadline ,
2017-05-10 09:44:13 +00:00
cfg : cfg ,
2020-11-19 15:23:03 +00:00
mcfg : mCfg ,
2017-05-10 09:44:13 +00:00
externalLabels : externalLabels ,
relabelConfigs : relabelConfigs ,
2020-03-31 03:39:29 +00:00
storeClient : client ,
2017-05-10 09:44:13 +00:00
2019-08-07 19:39:07 +00:00
seriesLabels : make ( map [ uint64 ] labels . Labels ) ,
2018-09-07 21:26:04 +00:00
seriesSegmentIndexes : make ( map [ uint64 ] int ) ,
droppedSeries : make ( map [ uint64 ] struct { } ) ,
2018-12-04 17:32:14 +00:00
numShards : cfg . MinShards ,
2017-05-10 09:44:13 +00:00
reshardChan : make ( chan int ) ,
quit : make ( chan struct { } ) ,
2018-09-07 21:26:04 +00:00
samplesIn : samplesIn ,
2019-02-20 07:51:08 +00:00
samplesDropped : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2017-05-10 09:44:13 +00:00
samplesOut : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
samplesOutDuration : newEWMARate ( ewmaWeight , shardUpdateDuration ) ,
2020-02-03 21:47:03 +00:00
2020-09-24 18:44:18 +00:00
metrics : metrics ,
interner : interner ,
highestRecvTimestamp : highestRecvTimestamp ,
2019-03-01 19:04:26 +00:00
}
2018-09-07 21:26:04 +00:00
2020-03-20 16:34:15 +00:00
t . watcher = wal . NewWatcher ( watcherMetrics , readerMetrics , logger , client . Name ( ) , t , walDir )
2020-11-19 15:23:03 +00:00
if t . mcfg . Send {
t . metadataWatcher = NewMetadataWatcher ( logger , sm , client . Name ( ) , t , t . mcfg . SendInterval , flushDeadline )
}
2019-03-05 12:21:11 +00:00
t . shards = t . newShards ( )
2017-05-10 09:44:13 +00:00
return t
}
2020-11-19 15:23:03 +00:00
// AppendMetadata sends metadata to the remote storage. Metadata is sent all
// at once and is not parallelized. A failed send is not returned to the
// caller: it is counted in failedMetadataTotal and logged.
func (t *QueueManager) AppendMetadata(ctx context.Context, metadata []scrape.MetricMetadata) {
	// Translate each scrape entry into its protobuf counterpart.
	protoMetadata := make([]prompb.MetricMetadata, len(metadata))
	for i := range metadata {
		entry := metadata[i]
		protoMetadata[i] = prompb.MetricMetadata{
			MetricFamilyName: entry.Metric,
			Help:             entry.Help,
			Type:             metricTypeToMetricTypeProto(entry.Type),
			Unit:             entry.Unit,
		}
	}

	if err := t.sendMetadataWithBackoff(ctx, protoMetadata); err != nil {
		t.metrics.failedMetadataTotal.Add(float64(len(metadata)))
		level.Error(t.logger).Log("msg", "non-recoverable error while sending metadata", "count", len(metadata), "err", err)
	}
}
func ( t * QueueManager ) sendMetadataWithBackoff ( ctx context . Context , metadata [ ] prompb . MetricMetadata ) error {
// Build the WriteRequest with no samples.
req , _ , err := buildWriteRequest ( nil , metadata , nil )
if err != nil {
return err
}
metadataCount := len ( metadata )
attemptStore := func ( try int ) error {
span , ctx := opentracing . StartSpanFromContext ( ctx , "Remote Metadata Send Batch" )
defer span . Finish ( )
span . SetTag ( "metadata" , metadataCount )
span . SetTag ( "try" , try )
span . SetTag ( "remote_name" , t . storeClient . Name ( ) )
span . SetTag ( "remote_url" , t . storeClient . Endpoint ( ) )
begin := time . Now ( )
err := t . storeClient . Store ( ctx , req )
t . metrics . sentBatchDuration . Observe ( time . Since ( begin ) . Seconds ( ) )
if err != nil {
span . LogKV ( "error" , err )
ext . Error . Set ( span , true )
return err
}
return nil
}
retry := func ( ) {
t . metrics . retriedMetadataTotal . Add ( float64 ( len ( metadata ) ) )
}
2021-02-04 13:38:32 +00:00
err = sendWriteRequestWithBackoff ( ctx , t . cfg , t . logger , attemptStore , retry )
2020-11-19 15:23:03 +00:00
if err != nil {
return err
}
t . metrics . metadataTotal . Add ( float64 ( len ( metadata ) ) )
t . metrics . metadataBytesTotal . Add ( float64 ( len ( req ) ) )
return nil
}
2018-09-07 21:26:04 +00:00
// Append queues a sample to be sent to the remote storage. Blocks until all samples are
// enqueued on their shards or a shutdown signal is received.
2019-09-19 09:15:41 +00:00
func ( t * QueueManager ) Append ( samples [ ] record . RefSample ) bool {
2019-06-27 18:48:21 +00:00
outer :
2019-08-12 16:22:02 +00:00
for _ , s := range samples {
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
2019-08-12 16:22:02 +00:00
lbls , ok := t . seriesLabels [ s . Ref ]
2019-06-27 18:48:21 +00:00
if ! ok {
2020-04-25 03:39:46 +00:00
t . metrics . droppedSamplesTotal . Inc ( )
2019-02-20 07:51:08 +00:00
t . samplesDropped . incr ( 1 )
2019-08-12 16:22:02 +00:00
if _ , ok := t . droppedSeries [ s . Ref ] ; ! ok {
2020-04-11 08:22:18 +00:00
level . Info ( t . logger ) . Log ( "msg" , "Dropped sample for series that was not explicitly dropped via relabelling" , "ref" , s . Ref )
2018-09-07 21:26:04 +00:00
}
2019-09-13 17:23:58 +00:00
t . seriesMtx . Unlock ( )
2018-09-07 21:26:04 +00:00
continue
}
2019-09-13 17:23:58 +00:00
t . seriesMtx . Unlock ( )
2019-01-18 12:48:16 +00:00
// This will only loop if the queues are being resharded.
backoff := t . cfg . MinBackoff
2018-09-07 21:26:04 +00:00
for {
select {
case <- t . quit :
return false
default :
}
2017-05-10 09:44:13 +00:00
2019-08-12 16:22:02 +00:00
if t . shards . enqueue ( s . Ref , sample {
labels : lbls ,
t : s . T ,
v : s . V ,
} ) {
2018-09-07 21:26:04 +00:00
continue outer
}
2019-01-18 12:48:16 +00:00
2020-04-25 03:39:46 +00:00
t . metrics . enqueueRetriesTotal . Inc ( )
2018-09-07 21:26:04 +00:00
time . Sleep ( time . Duration ( backoff ) )
backoff = backoff * 2
if backoff > t . cfg . MaxBackoff {
backoff = t . cfg . MaxBackoff
}
2017-05-10 09:44:13 +00:00
}
}
2018-09-07 21:26:04 +00:00
return true
2017-05-10 09:44:13 +00:00
}
// Start the queue manager sending samples to the remote storage.
// Does not block.
func ( t * QueueManager ) Start ( ) {
2020-06-26 06:33:52 +00:00
// Register and initialise some metrics.
t . metrics . register ( )
2020-04-25 03:39:46 +00:00
t . metrics . shardCapacity . Set ( float64 ( t . cfg . Capacity ) )
t . metrics . maxNumShards . Set ( float64 ( t . cfg . MaxShards ) )
t . metrics . minNumShards . Set ( float64 ( t . cfg . MinShards ) )
t . metrics . desiredNumShards . Set ( float64 ( t . cfg . MinShards ) )
2020-10-28 11:39:36 +00:00
t . metrics . maxSamplesPerSend . Set ( float64 ( t . cfg . MaxSamplesPerSend ) )
2019-04-23 08:49:17 +00:00
2018-09-07 21:26:04 +00:00
t . shards . start ( t . numShards )
t . watcher . Start ( )
2020-11-19 15:23:03 +00:00
if t . mcfg . Send {
t . metadataWatcher . Start ( )
}
2018-09-07 21:26:04 +00:00
2017-05-10 09:44:13 +00:00
t . wg . Add ( 2 )
go t . updateShardsLoop ( )
go t . reshardLoop ( )
}
// Stop stops sending samples to the remote storage and waits for pending
// sends to complete.
func ( t * QueueManager ) Stop ( ) {
2017-08-11 18:45:52 +00:00
level . Info ( t . logger ) . Log ( "msg" , "Stopping remote storage..." )
2018-09-07 21:26:04 +00:00
defer level . Info ( t . logger ) . Log ( "msg" , "Remote storage stopped." )
2017-05-10 09:44:13 +00:00
close ( t . quit )
2019-04-16 10:25:19 +00:00
t . wg . Wait ( )
2020-11-19 15:23:03 +00:00
// Wait for all QueueManager routines to end before stopping shards, metadata watcher, and WAL watcher. This
2019-04-16 10:25:19 +00:00
// is to ensure we don't end up executing a reshard and shards.stop() at the same time, which
// causes a closed channel panic.
2018-09-07 21:26:04 +00:00
t . shards . stop ( )
t . watcher . Stop ( )
2020-11-19 15:23:03 +00:00
if t . mcfg . Send {
t . metadataWatcher . Stop ( )
}
2019-03-13 10:02:36 +00:00
// On shutdown, release the strings in the labels from the intern pool.
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
2019-03-13 10:02:36 +00:00
for _ , labels := range t . seriesLabels {
2020-09-24 18:44:18 +00:00
t . releaseLabels ( labels )
2019-03-13 10:02:36 +00:00
}
2019-09-13 17:23:58 +00:00
t . seriesMtx . Unlock ( )
2020-04-25 03:39:46 +00:00
t . metrics . unregister ( )
2018-09-07 21:26:04 +00:00
}
// StoreSeries keeps track of which series we know about for lookups when sending samples to remote.
2019-09-19 09:15:41 +00:00
func ( t * QueueManager ) StoreSeries ( series [ ] record . RefSeries , index int ) {
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2018-09-07 21:26:04 +00:00
for _ , s := range series {
2021-01-22 15:03:10 +00:00
// Just make sure all the Refs of Series will insert into seriesSegmentIndexes map for tracking.
t . seriesSegmentIndexes [ s . Ref ] = index
2019-03-08 16:29:25 +00:00
ls := processExternalLabels ( s . Labels , t . externalLabels )
2019-08-07 19:39:07 +00:00
lbls := relabel . Process ( ls , t . relabelConfigs ... )
if len ( lbls ) == 0 {
2018-09-07 21:26:04 +00:00
t . droppedSeries [ s . Ref ] = struct { } { }
continue
}
2020-09-24 18:44:18 +00:00
t . internLabels ( lbls )
2019-03-11 23:44:23 +00:00
2019-03-13 10:02:36 +00:00
// We should not ever be replacing a series labels in the map, but just
// in case we do we need to ensure we do not leak the replaced interned
// strings.
2019-06-27 18:48:21 +00:00
if orig , ok := t . seriesLabels [ s . Ref ] ; ok {
2020-09-24 18:44:18 +00:00
t . releaseLabels ( orig )
2019-03-11 23:44:23 +00:00
}
2019-08-07 19:39:07 +00:00
t . seriesLabels [ s . Ref ] = lbls
2018-09-07 21:26:04 +00:00
}
}
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
// SeriesReset is used when reading a checkpoint. WAL Watcher should have
// stored series records with the checkpoints index number, so we can now
// delete any ref ID's lower than that # from the two maps.
func ( t * QueueManager ) SeriesReset ( index int ) {
2019-09-13 17:23:58 +00:00
t . seriesMtx . Lock ( )
defer t . seriesMtx . Unlock ( )
2018-09-07 21:26:04 +00:00
// Check for series that are in segments older than the checkpoint
// that were not also present in the checkpoint.
for k , v := range t . seriesSegmentIndexes {
if v < index {
delete ( t . seriesSegmentIndexes , k )
2020-09-24 18:44:18 +00:00
t . releaseLabels ( t . seriesLabels [ k ] )
2019-03-11 23:44:23 +00:00
delete ( t . seriesLabels , k )
2019-09-13 17:23:58 +00:00
delete ( t . droppedSeries , k )
2018-09-07 21:26:04 +00:00
}
}
}
2017-08-11 18:45:52 +00:00
2020-03-31 03:39:29 +00:00
// SetClient updates the client used by a queue. Used when only client specific
// fields are updated to avoid restarting the queue.
2020-06-24 13:41:52 +00:00
func ( t * QueueManager ) SetClient ( c WriteClient ) {
2020-03-31 03:39:29 +00:00
t . clientMtx . Lock ( )
t . storeClient = c
t . clientMtx . Unlock ( )
}
2020-06-24 13:41:52 +00:00
func ( t * QueueManager ) client ( ) WriteClient {
2020-03-31 03:39:29 +00:00
t . clientMtx . RLock ( )
defer t . clientMtx . RUnlock ( )
return t . storeClient
}
2020-09-24 18:44:18 +00:00
func ( t * QueueManager ) internLabels ( lbls labels . Labels ) {
2019-08-07 19:39:07 +00:00
for i , l := range lbls {
2020-09-24 18:44:18 +00:00
lbls [ i ] . Name = t . interner . intern ( l . Name )
lbls [ i ] . Value = t . interner . intern ( l . Value )
2019-08-07 19:39:07 +00:00
}
}
2020-09-24 18:44:18 +00:00
func ( t * QueueManager ) releaseLabels ( ls labels . Labels ) {
2019-03-11 23:44:23 +00:00
for _ , l := range ls {
2020-09-24 18:44:18 +00:00
t . interner . release ( l . Name )
t . interner . release ( l . Value )
2019-03-11 23:44:23 +00:00
}
}
2019-03-13 10:02:36 +00:00
// processExternalLabels merges externalLabels into ls. If ls contains
2019-03-08 16:29:25 +00:00
// a label in externalLabels, the value in ls wins.
2019-11-18 19:53:33 +00:00
func processExternalLabels ( ls labels . Labels , externalLabels labels . Labels ) labels . Labels {
2019-03-08 16:29:25 +00:00
i , j , result := 0 , 0 , make ( labels . Labels , 0 , len ( ls ) + len ( externalLabels ) )
for i < len ( ls ) && j < len ( externalLabels ) {
if ls [ i ] . Name < externalLabels [ j ] . Name {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
} else if ls [ i ] . Name > externalLabels [ j ] . Name {
result = append ( result , externalLabels [ j ] )
j ++
} else {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
i ++
j ++
2018-09-07 21:26:04 +00:00
}
}
2019-03-08 16:29:25 +00:00
for ; i < len ( ls ) ; i ++ {
result = append ( result , labels . Label {
Name : ls [ i ] . Name ,
Value : ls [ i ] . Value ,
} )
}
result = append ( result , externalLabels [ j : ] ... )
return result
2017-05-10 09:44:13 +00:00
}
func ( t * QueueManager ) updateShardsLoop ( ) {
defer t . wg . Done ( )
2017-10-09 16:36:20 +00:00
ticker := time . NewTicker ( shardUpdateDuration )
defer ticker . Stop ( )
2017-05-10 09:44:13 +00:00
for {
select {
2017-10-09 16:36:20 +00:00
case <- ticker . C :
2019-10-21 21:54:25 +00:00
desiredShards := t . calculateDesiredShards ( )
2020-04-20 22:20:39 +00:00
if ! t . shouldReshard ( desiredShards ) {
2019-10-21 21:54:25 +00:00
continue
}
// Resharding can take some time, and we want this loop
// to stay close to shardUpdateDuration.
select {
case t . reshardChan <- desiredShards :
2019-11-26 13:22:56 +00:00
level . Info ( t . logger ) . Log ( "msg" , "Remote storage resharding" , "from" , t . numShards , "to" , desiredShards )
2019-10-21 21:54:25 +00:00
t . numShards = desiredShards
default :
level . Info ( t . logger ) . Log ( "msg" , "Currently resharding, skipping." )
}
2017-05-10 09:44:13 +00:00
case <- t . quit :
return
}
}
}
2020-04-20 22:20:39 +00:00
// shouldReshard returns if resharding should occur
func ( t * QueueManager ) shouldReshard ( desiredShards int ) bool {
if desiredShards == t . numShards {
return false
}
// We shouldn't reshard if Prometheus hasn't been able to send to the
// remote endpoint successfully within some period of time.
minSendTimestamp := time . Now ( ) . Add ( - 2 * time . Duration ( t . cfg . BatchSendDeadline ) ) . Unix ( )
2020-07-30 07:45:42 +00:00
lsts := t . lastSendTimestamp . Load ( )
2020-04-20 22:20:39 +00:00
if lsts < minSendTimestamp {
level . Warn ( t . logger ) . Log ( "msg" , "Skipping resharding, last successful send was beyond threshold" , "lastSendTimestamp" , lsts , "minSendTimestamp" , minSendTimestamp )
return false
}
return true
}
2019-10-21 21:54:25 +00:00
// calculateDesiredShards returns the number of desired shards, which will be
// the current QueueManager.numShards if resharding should not occur for reasons
// outlined in this functions implementation. It is up to the caller to reshard, or not,
// based on the return value.
func ( t * QueueManager ) calculateDesiredShards ( ) int {
2017-05-10 09:44:13 +00:00
t . samplesOut . tick ( )
2019-02-20 07:51:08 +00:00
t . samplesDropped . tick ( )
2017-05-10 09:44:13 +00:00
t . samplesOutDuration . tick ( )
// We use the number of incoming samples as a prediction of how much work we
// will need to do next iteration. We add to this any pending samples
// (received - send) so we can catch up with any backlog. We use the average
// outgoing batch latency to work out how many shards we need.
var (
2019-08-13 09:10:21 +00:00
samplesInRate = t . samplesIn . rate ( )
samplesOutRate = t . samplesOut . rate ( )
samplesKeptRatio = samplesOutRate / ( t . samplesDropped . rate ( ) + samplesOutRate )
samplesOutDuration = t . samplesOutDuration . rate ( ) / float64 ( time . Second )
samplesPendingRate = samplesInRate * samplesKeptRatio - samplesOutRate
2020-04-25 03:39:46 +00:00
highestSent = t . metrics . highestSentTimestamp . Get ( )
2020-09-24 18:44:18 +00:00
highestRecv = t . highestRecvTimestamp . Get ( )
2020-01-02 17:30:56 +00:00
delay = highestRecv - highestSent
samplesPending = delay * samplesInRate * samplesKeptRatio
2017-05-10 09:44:13 +00:00
)
2019-08-13 09:10:21 +00:00
if samplesOutRate <= 0 {
2019-10-21 21:54:25 +00:00
return t . numShards
2017-05-10 09:44:13 +00:00
}
2019-12-23 18:03:54 +00:00
// When behind we will try to catch up on a proporation of samples per tick.
// This works similarly to an integral accumulator in that pending samples
// is the result of the error integral.
const integralGain = 0.1 / float64 ( shardUpdateDuration / time . Second )
2017-05-10 09:44:13 +00:00
var (
2019-08-13 09:10:21 +00:00
timePerSample = samplesOutDuration / samplesOutRate
2019-12-23 18:03:54 +00:00
desiredShards = timePerSample * ( samplesInRate * samplesKeptRatio + integralGain * samplesPending )
2017-05-10 09:44:13 +00:00
)
2020-04-25 03:39:46 +00:00
t . metrics . desiredNumShards . Set ( desiredShards )
2019-08-10 15:24:58 +00:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.calculateDesiredShards" ,
2019-08-13 09:10:21 +00:00
"samplesInRate" , samplesInRate ,
"samplesOutRate" , samplesOutRate ,
2019-03-01 19:04:26 +00:00
"samplesKeptRatio" , samplesKeptRatio ,
2019-08-13 09:10:21 +00:00
"samplesPendingRate" , samplesPendingRate ,
2019-03-01 19:04:26 +00:00
"samplesPending" , samplesPending ,
"samplesOutDuration" , samplesOutDuration ,
"timePerSample" , timePerSample ,
"desiredShards" , desiredShards ,
"highestSent" , highestSent ,
2019-08-13 09:10:21 +00:00
"highestRecv" , highestRecv ,
)
2017-05-10 09:44:13 +00:00
// Changes in the number of shards must be greater than shardToleranceFraction.
var (
lowerBound = float64 ( t . numShards ) * ( 1. - shardToleranceFraction )
upperBound = float64 ( t . numShards ) * ( 1. + shardToleranceFraction )
)
2017-08-11 18:45:52 +00:00
level . Debug ( t . logger ) . Log ( "msg" , "QueueManager.updateShardsLoop" ,
"lowerBound" , lowerBound , "desiredShards" , desiredShards , "upperBound" , upperBound )
2017-05-10 09:44:13 +00:00
if lowerBound <= desiredShards && desiredShards <= upperBound {
2019-10-21 21:54:25 +00:00
return t . numShards
2017-05-10 09:44:13 +00:00
}
numShards := int ( math . Ceil ( desiredShards ) )
2020-01-02 17:30:56 +00:00
// Do not downshard if we are more than ten seconds back.
if numShards < t . numShards && delay > 10.0 {
level . Debug ( t . logger ) . Log ( "msg" , "Not downsharding due to being too far behind" )
return t . numShards
}
2017-05-10 09:44:13 +00:00
if numShards > t . cfg . MaxShards {
numShards = t . cfg . MaxShards
2018-12-04 17:32:14 +00:00
} else if numShards < t . cfg . MinShards {
numShards = t . cfg . MinShards
2017-05-10 09:44:13 +00:00
}
2019-10-21 21:54:25 +00:00
return numShards
2017-05-10 09:44:13 +00:00
}
func ( t * QueueManager ) reshardLoop ( ) {
defer t . wg . Done ( )
for {
select {
case numShards := <- t . reshardChan :
2018-09-07 21:26:04 +00:00
// We start the newShards after we have stopped (the therefore completely
// flushed) the oldShards, to guarantee we only every deliver samples in
// order.
t . shards . stop ( )
t . shards . start ( numShards )
2017-05-10 09:44:13 +00:00
case <- t . quit :
return
}
}
}
2018-09-07 21:26:04 +00:00
// newShards returns a shards value bound to this QueueManager. Only the qm
// back-reference and the done channel are initialised here.
func (t *QueueManager) newShards() *shards {
	return &shards{
		qm:   t,
		done: make(chan struct{}),
	}
}
2017-05-10 09:44:13 +00:00
2019-08-12 16:22:02 +00:00
// sample is one value of one series as it travels through a shard queue.
type sample struct {
	// labels is the label set of the series the value belongs to.
	labels labels.Labels
	// t is the sample timestamp.
	t int64
	// v is the sample value.
	v float64
}
2017-05-10 09:44:13 +00:00
type shards struct {
2018-09-07 21:26:04 +00:00
mtx sync . RWMutex // With the WAL, this is never actually contended.
qm * QueueManager
2019-08-12 16:22:02 +00:00
queues [ ] chan sample
2018-09-07 21:26:04 +00:00
// Emulate a wait group with a channel and an atomic int, as you
// cannot select on a wait group.
2018-02-01 13:20:38 +00:00
done chan struct { }
2020-07-30 07:45:42 +00:00
running atomic . Int32
2018-09-07 21:26:04 +00:00
// Soft shutdown context will prevent new enqueues and deadlocks.
softShutdown chan struct { }
// Hard shutdown context is used to terminate outgoing HTTP connections
// after giving them a chance to terminate.
2020-06-25 20:48:30 +00:00
hardShutdown context . CancelFunc
2020-07-30 07:45:42 +00:00
droppedOnHardShutdown atomic . Uint32
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
// start the shards; must be called before any call to enqueue.
func ( s * shards ) start ( n int ) {
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
2020-06-25 20:48:30 +00:00
s . qm . metrics . pendingSamples . Set ( 0 )
s . qm . metrics . numShards . Set ( float64 ( n ) )
2019-08-12 16:22:02 +00:00
newQueues := make ( [ ] chan sample , n )
2018-09-07 21:26:04 +00:00
for i := 0 ; i < n ; i ++ {
2019-08-12 16:22:02 +00:00
newQueues [ i ] = make ( chan sample , s . qm . cfg . Capacity )
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
s . queues = newQueues
var hardShutdownCtx context . Context
hardShutdownCtx , s . hardShutdown = context . WithCancel ( context . Background ( ) )
s . softShutdown = make ( chan struct { } )
2020-07-30 07:45:42 +00:00
s . running . Store ( int32 ( n ) )
2018-09-07 21:26:04 +00:00
s . done = make ( chan struct { } )
2020-07-30 07:45:42 +00:00
s . droppedOnHardShutdown . Store ( 0 )
2018-09-07 21:26:04 +00:00
for i := 0 ; i < n ; i ++ {
go s . runShard ( hardShutdownCtx , i , newQueues [ i ] )
2017-05-10 09:44:13 +00:00
}
}
2018-09-07 21:26:04 +00:00
// stop the shards; subsequent call to enqueue will return false.
func ( s * shards ) stop ( ) {
// Attempt a clean shutdown, but only wait flushDeadline for all the shards
2019-03-03 11:35:29 +00:00
// to cleanly exit. As we're doing RPCs, enqueue can block indefinitely.
2018-09-07 21:26:04 +00:00
// We must be able so call stop concurrently, hence we can only take the
// RLock here.
s . mtx . RLock ( )
close ( s . softShutdown )
s . mtx . RUnlock ( )
// Enqueue should now be unblocked, so we can take the write lock. This
// also ensures we don't race with writes to the queues, and get a panic:
// send on closed channel.
s . mtx . Lock ( )
defer s . mtx . Unlock ( )
for _ , queue := range s . queues {
close ( queue )
2017-05-10 09:44:13 +00:00
}
2018-01-31 15:41:48 +00:00
select {
2018-02-01 13:20:38 +00:00
case <- s . done :
2018-05-29 08:51:29 +00:00
return
2018-09-07 21:26:04 +00:00
case <- time . After ( s . qm . flushDeadline ) :
2018-01-31 15:41:48 +00:00
}
2018-05-29 08:51:29 +00:00
2018-05-29 10:35:43 +00:00
// Force an unclean shutdown.
2018-09-07 21:26:04 +00:00
s . hardShutdown ( )
2018-05-29 08:51:29 +00:00
<- s . done
2020-07-30 07:45:42 +00:00
if dropped := s . droppedOnHardShutdown . Load ( ) ; dropped > 0 {
2020-06-25 20:48:30 +00:00
level . Error ( s . qm . logger ) . Log ( "msg" , "Failed to flush all samples on shutdown" , "count" , dropped )
}
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
// enqueue a sample. If we are currently in the process of shutting down or resharding,
// will return false; in this case, you should back off and retry.
2019-08-12 16:22:02 +00:00
func ( s * shards ) enqueue ( ref uint64 , sample sample ) bool {
2018-09-07 21:26:04 +00:00
s . mtx . RLock ( )
defer s . mtx . RUnlock ( )
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
select {
case <- s . softShutdown :
return false
default :
}
2017-05-10 09:44:13 +00:00
2018-09-07 21:26:04 +00:00
shard := uint64 ( ref ) % uint64 ( len ( s . queues ) )
2017-05-10 09:44:13 +00:00
select {
2018-09-07 21:26:04 +00:00
case <- s . softShutdown :
return false
2017-05-10 09:44:13 +00:00
case s . queues [ shard ] <- sample :
2020-06-25 20:48:30 +00:00
s . qm . metrics . pendingSamples . Inc ( )
2017-05-10 09:44:13 +00:00
return true
}
}
2019-08-12 16:22:02 +00:00
func ( s * shards ) runShard ( ctx context . Context , shardID int , queue chan sample ) {
2018-02-01 13:20:38 +00:00
defer func ( ) {
2020-07-30 07:45:42 +00:00
if s . running . Dec ( ) == 0 {
2018-02-01 13:20:38 +00:00
close ( s . done )
}
} ( )
2019-08-12 16:22:02 +00:00
shardNum := strconv . Itoa ( shardID )
2017-05-10 09:44:13 +00:00
// Send batches of at most MaxSamplesPerSend samples to the remote storage.
// If we have fewer samples than that, flush them out after a deadline
// anyways.
2019-08-12 16:22:02 +00:00
var (
max = s . qm . cfg . MaxSamplesPerSend
nPending = 0
pendingSamples = allocateTimeSeries ( max )
buf [ ] byte
)
2019-06-27 18:48:21 +00:00
2018-08-24 14:55:21 +00:00
timer := time . NewTimer ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2018-03-12 14:27:48 +00:00
stop := func ( ) {
2018-03-09 12:00:26 +00:00
if ! timer . Stop ( ) {
select {
case <- timer . C :
default :
}
}
2018-03-12 14:27:48 +00:00
}
defer stop ( )
2018-01-24 12:36:29 +00:00
2017-05-10 09:44:13 +00:00
for {
select {
2018-09-07 21:26:04 +00:00
case <- ctx . Done ( ) :
2020-06-25 20:48:30 +00:00
// In this case we drop all samples in the buffer and the queue.
// Remove them from pending and mark them as failed.
droppedSamples := nPending + len ( queue )
s . qm . metrics . pendingSamples . Sub ( float64 ( droppedSamples ) )
s . qm . metrics . failedSamplesTotal . Add ( float64 ( droppedSamples ) )
2020-07-30 07:45:42 +00:00
s . droppedOnHardShutdown . Add ( uint32 ( droppedSamples ) )
2018-05-29 08:51:29 +00:00
return
2017-05-10 09:44:13 +00:00
case sample , ok := <- queue :
if ! ok {
2019-08-12 16:22:02 +00:00
if nPending > 0 {
level . Debug ( s . qm . logger ) . Log ( "msg" , "Flushing samples to remote storage..." , "count" , nPending )
s . sendSamples ( ctx , pendingSamples [ : nPending ] , & buf )
2020-04-25 03:39:46 +00:00
s . qm . metrics . pendingSamples . Sub ( float64 ( nPending ) )
2017-08-11 18:45:52 +00:00
level . Debug ( s . qm . logger ) . Log ( "msg" , "Done flushing." )
2017-05-10 09:44:13 +00:00
}
return
}
2018-09-07 21:26:04 +00:00
// Number of pending samples is limited by the fact that sendSamples (via sendSamplesWithBackoff)
2019-08-12 16:22:02 +00:00
// retries endlessly, so once we reach max samples, if we can never send to the endpoint we'll
// stop reading from the queue. This makes it safe to reference pendingSamples by index.
pendingSamples [ nPending ] . Labels = labelsToLabelsProto ( sample . labels , pendingSamples [ nPending ] . Labels )
pendingSamples [ nPending ] . Samples [ 0 ] . Timestamp = sample . t
pendingSamples [ nPending ] . Samples [ 0 ] . Value = sample . v
nPending ++
2017-05-10 09:44:13 +00:00
2019-08-12 16:22:02 +00:00
if nPending >= max {
s . sendSamples ( ctx , pendingSamples , & buf )
nPending = 0
2020-04-25 03:39:46 +00:00
s . qm . metrics . pendingSamples . Sub ( float64 ( max ) )
2018-03-09 12:00:26 +00:00
2018-03-12 14:27:48 +00:00
stop ( )
2018-08-24 14:55:21 +00:00
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2017-05-10 09:44:13 +00:00
}
2018-03-09 12:00:26 +00:00
2018-01-24 12:36:29 +00:00
case <- timer . C :
2019-08-12 16:22:02 +00:00
if nPending > 0 {
level . Debug ( s . qm . logger ) . Log ( "msg" , "runShard timer ticked, sending samples" , "samples" , nPending , "shard" , shardNum )
s . sendSamples ( ctx , pendingSamples [ : nPending ] , & buf )
2020-04-25 03:39:46 +00:00
s . qm . metrics . pendingSamples . Sub ( float64 ( nPending ) )
2020-02-10 03:51:21 +00:00
nPending = 0
2017-05-10 09:44:13 +00:00
}
2018-08-24 14:55:21 +00:00
timer . Reset ( time . Duration ( s . qm . cfg . BatchSendDeadline ) )
2017-05-10 09:44:13 +00:00
}
}
}
2019-06-27 18:48:21 +00:00
func ( s * shards ) sendSamples ( ctx context . Context , samples [ ] prompb . TimeSeries , buf * [ ] byte ) {
2017-05-10 09:44:13 +00:00
begin := time . Now ( )
2019-06-27 18:48:21 +00:00
err := s . sendSamplesWithBackoff ( ctx , samples , buf )
2019-02-12 14:58:25 +00:00
if err != nil {
2018-09-07 21:26:04 +00:00
level . Error ( s . qm . logger ) . Log ( "msg" , "non-recoverable error" , "count" , len ( samples ) , "err" , err )
2020-04-25 03:39:46 +00:00
s . qm . metrics . failedSamplesTotal . Add ( float64 ( len ( samples ) ) )
2018-09-07 21:26:04 +00:00
}
2017-05-10 09:44:13 +00:00
2018-04-08 09:51:54 +00:00
// These counters are used to calculate the dynamic sharding, and as such
2017-05-10 09:44:13 +00:00
// should be maintained irrespective of success or failure.
s . qm . samplesOut . incr ( int64 ( len ( samples ) ) )
s . qm . samplesOutDuration . incr ( int64 ( time . Since ( begin ) ) )
2020-07-30 07:45:42 +00:00
s . qm . lastSendTimestamp . Store ( time . Now ( ) . Unix ( ) )
2017-05-10 09:44:13 +00:00
}
// sendSamples to the remote storage with backoff for recoverable errors.
2019-06-27 18:48:21 +00:00
func ( s * shards ) sendSamplesWithBackoff ( ctx context . Context , samples [ ] prompb . TimeSeries , buf * [ ] byte ) error {
2020-11-19 15:23:03 +00:00
// Build the WriteRequest with no metadata.
req , highest , err := buildWriteRequest ( samples , nil , * buf )
2018-09-07 21:26:04 +00:00
if err != nil {
2019-03-01 19:04:26 +00:00
// Failing to build the write request is non-recoverable, since it will
// only error if marshaling the proto to bytes fails.
2018-09-07 21:26:04 +00:00
return err
}
2019-03-01 19:04:26 +00:00
2020-06-01 15:21:13 +00:00
reqSize := len ( * buf )
sampleCount := len ( samples )
* buf = req
// An anonymous function allows us to defer the completion of our per-try spans
// without causing a memory leak, and it has the nice effect of not propagating any
// parameters for sendSamplesWithBackoff/3.
2020-11-19 15:23:03 +00:00
attemptStore := func ( try int ) error {
2020-06-01 15:21:13 +00:00
span , ctx := opentracing . StartSpanFromContext ( ctx , "Remote Send Batch" )
defer span . Finish ( )
span . SetTag ( "samples" , sampleCount )
span . SetTag ( "request_size" , reqSize )
span . SetTag ( "try" , try )
span . SetTag ( "remote_name" , s . qm . storeClient . Name ( ) )
span . SetTag ( "remote_url" , s . qm . storeClient . Endpoint ( ) )
begin := time . Now ( )
2020-11-19 15:23:03 +00:00
s . qm . metrics . samplesTotal . Add ( float64 ( sampleCount ) )
2020-06-01 15:21:13 +00:00
err := s . qm . client ( ) . Store ( ctx , * buf )
s . qm . metrics . sentBatchDuration . Observe ( time . Since ( begin ) . Seconds ( ) )
if err != nil {
span . LogKV ( "error" , err )
ext . Error . Set ( span , true )
return err
}
return nil
}
2020-11-19 15:23:03 +00:00
onRetry := func ( ) {
s . qm . metrics . retriedSamplesTotal . Add ( float64 ( sampleCount ) )
}
2021-02-04 13:38:32 +00:00
err = sendWriteRequestWithBackoff ( ctx , s . qm . cfg , s . qm . logger , attemptStore , onRetry )
2020-11-19 15:23:03 +00:00
if err != nil {
return err
}
s . qm . metrics . samplesBytesTotal . Add ( float64 ( reqSize ) )
s . qm . metrics . highestSentTimestamp . Set ( float64 ( highest / 1000 ) )
return nil
}
2021-02-04 13:38:32 +00:00
func sendWriteRequestWithBackoff ( ctx context . Context , cfg config . QueueConfig , l log . Logger , attempt func ( int ) error , onRetry func ( ) ) error {
2020-11-19 15:23:03 +00:00
backoff := cfg . MinBackoff
2021-02-10 22:25:37 +00:00
sleepDuration := model . Duration ( 0 )
2020-11-19 15:23:03 +00:00
try := 0
2018-09-07 21:26:04 +00:00
for {
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2017-05-10 09:44:13 +00:00
2020-11-19 15:23:03 +00:00
err := attempt ( try )
2018-09-07 21:26:04 +00:00
2020-11-19 15:23:03 +00:00
if err == nil {
return nil
}
2017-05-10 09:44:13 +00:00
2020-11-19 15:23:03 +00:00
// If the error is unrecoverable, we should not retry.
2021-02-10 22:25:37 +00:00
backoffErr , ok := err . ( RecoverableError )
if ! ok {
2020-11-19 15:23:03 +00:00
return err
}
2020-06-01 15:21:13 +00:00
2021-02-10 22:25:37 +00:00
sleepDuration = backoff
if backoffErr . retryAfter > 0 {
sleepDuration = backoffErr . retryAfter
level . Info ( l ) . Log ( "msg" , "Retrying after duration specified by Retry-After header" , "duration" , sleepDuration )
} else if backoffErr . retryAfter < 0 {
level . Debug ( l ) . Log ( "msg" , "retry-after cannot be in past, retrying using default backoff mechanism" )
}
select {
case <- ctx . Done ( ) :
case <- time . After ( time . Duration ( sleepDuration ) ) :
}
2020-11-19 15:23:03 +00:00
// If we make it this far, we've encountered a recoverable error and will retry.
onRetry ( )
2021-01-27 16:38:34 +00:00
level . Warn ( l ) . Log ( "msg" , "Failed to send batch, retrying" , "err" , err )
2018-09-07 21:26:04 +00:00
2021-02-10 22:25:37 +00:00
backoff = sleepDuration * 2
2020-11-19 15:23:03 +00:00
if backoff > cfg . MaxBackoff {
backoff = cfg . MaxBackoff
2017-05-10 09:44:13 +00:00
}
2020-06-01 15:21:13 +00:00
2020-11-19 15:23:03 +00:00
try ++
continue
2017-05-10 09:44:13 +00:00
}
2018-09-07 21:26:04 +00:00
}
2020-11-19 15:23:03 +00:00
func buildWriteRequest ( samples [ ] prompb . TimeSeries , metadata [ ] prompb . MetricMetadata , buf [ ] byte ) ( [ ] byte , int64 , error ) {
2018-09-07 21:26:04 +00:00
var highest int64
for _ , ts := range samples {
// At the moment we only ever append a TimeSeries with a single sample in it.
if ts . Samples [ 0 ] . Timestamp > highest {
highest = ts . Samples [ 0 ] . Timestamp
}
}
2020-11-19 15:23:03 +00:00
2018-09-07 21:26:04 +00:00
req := & prompb . WriteRequest {
Timeseries : samples ,
2020-11-19 15:23:03 +00:00
Metadata : metadata ,
2018-09-07 21:26:04 +00:00
}
data , err := proto . Marshal ( req )
if err != nil {
return nil , highest , err
}
2017-05-10 09:44:13 +00:00
2019-06-27 18:48:21 +00:00
// snappy uses len() to see if it needs to allocate a new slice. Make the
// buffer as long as possible.
if buf != nil {
buf = buf [ 0 : cap ( buf ) ]
}
compressed := snappy . Encode ( buf , data )
2018-09-07 21:26:04 +00:00
return compressed , highest , nil
2017-05-10 09:44:13 +00:00
}
2019-08-12 16:22:02 +00:00
// allocateTimeSeries returns a slice of capacity time series, each of which
// already carries a Samples slice of length one holding a zero-valued sample.
func allocateTimeSeries(capacity int) []prompb.TimeSeries {
	series := make([]prompb.TimeSeries, capacity)
	// We only ever send one sample per timeseries, so preallocate with length one.
	for i := 0; i < len(series); i++ {
		series[i].Samples = make([]prompb.Sample, 1)
	}
	return series
}