Allow writing to InfluxDB/OpenTSDB at the same time.

This commit is contained in:
Julius Volz 2015-04-02 20:20:00 +02:00
parent 61fb688dd9
commit 593e565688
6 changed files with 80 additions and 55 deletions

56
main.go
View File

@ -51,9 +51,8 @@ var (
persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.") persistenceStoragePath = flag.String("storage.local.path", "/tmp/metrics", "Base path for metrics storage.")
remoteStorageType = flag.String("storage.remote.type", "", "The type of remote storage to use. Valid values: 'opentsdb', 'influxdb'. If this flag is left empty, no remote storage is used.") opentsdbURL = flag.String("storage.remote.opentsdb-url", "", "The URL of the remote OpenTSDB server to send samples to. None, if empty.")
opentsdbURL = flag.String("storage.remote.opentsdb-url", "", "The URL of the remote OpenTSDB server to send samples to.") influxdbURL = flag.String("storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to. None, if empty.")
influxdbURL = flag.String("storage.remote.influxdb-url", "", "The URL of the remote InfluxDB server to send samples to.")
remoteStorageTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to the remote storage.") remoteStorageTimeout = flag.Duration("storage.remote.timeout", 30*time.Second, "The timeout to use when sending samples to the remote storage.")
numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.") numMemoryChunks = flag.Int("storage.local.memory-chunks", 1024*1024, "How many chunks to keep in memory. While the size of a chunk is 1kiB, the total memory usage will be significantly higher than this value * 1kiB. Furthermore, for various reasons, more chunks might have to be kept in memory temporarily.")
@ -76,7 +75,7 @@ type prometheus struct {
targetManager retrieval.TargetManager targetManager retrieval.TargetManager
notificationHandler *notification.NotificationHandler notificationHandler *notification.NotificationHandler
storage local.Storage storage local.Storage
remoteStorageQueue *remote.StorageQueueManager remoteStorageQueues []*remote.StorageQueueManager
webService *web.WebService webService *web.WebService
@ -122,26 +121,27 @@ func NewPrometheus() *prometheus {
} }
var sampleAppender storage.SampleAppender var sampleAppender storage.SampleAppender
var remoteStorageQueue *remote.StorageQueueManager var remoteStorageQueues []*remote.StorageQueueManager
if *remoteStorageType == "" { if *opentsdbURL == "" && *influxdbURL == "" {
glog.Warningf("No remote storage implementation selected; not sending any samples to long-term storage") glog.Warningf("No remote storage URLs provided; not sending any samples to long-term storage")
sampleAppender = memStorage sampleAppender = memStorage
} else { } else {
var c remote.StorageClient fanout := storage.Fanout{memStorage}
switch *remoteStorageType {
case "opentsdb": addRemoteStorage := func(c remote.StorageClient) {
c = opentsdb.NewClient(*opentsdbURL, *remoteStorageTimeout) qm := remote.NewStorageQueueManager(c, 100*1024)
case "influxdb": fanout = append(fanout, qm)
c = influxdb.NewClient(*influxdbURL, *remoteStorageTimeout) remoteStorageQueues = append(remoteStorageQueues, qm)
default:
glog.Fatalf("Invalid flag value for 'storage.remote.type': %s", *remoteStorageType)
} }
remoteStorageQueue = remote.NewStorageQueueManager(c, 100*1024) if *opentsdbURL != "" {
sampleAppender = storage.Tee{ addRemoteStorage(opentsdb.NewClient(*opentsdbURL, *remoteStorageTimeout))
Appender1: remoteStorageQueue,
Appender2: memStorage,
} }
if *influxdbURL != "" {
addRemoteStorage(influxdb.NewClient(*influxdbURL, *remoteStorageTimeout))
}
sampleAppender = fanout
} }
targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels()) targetManager := retrieval.NewTargetManager(sampleAppender, conf.GlobalLabels())
@ -196,7 +196,7 @@ func NewPrometheus() *prometheus {
targetManager: targetManager, targetManager: targetManager,
notificationHandler: notificationHandler, notificationHandler: notificationHandler,
storage: memStorage, storage: memStorage,
remoteStorageQueue: remoteStorageQueue, remoteStorageQueues: remoteStorageQueues,
webService: webService, webService: webService,
} }
@ -208,8 +208,8 @@ func NewPrometheus() *prometheus {
// down. The method installs an interrupt handler, allowing to trigger a // down. The method installs an interrupt handler, allowing to trigger a
// shutdown by sending SIGTERM to the process. // shutdown by sending SIGTERM to the process.
func (p *prometheus) Serve() { func (p *prometheus) Serve() {
if p.remoteStorageQueue != nil { for _, q := range p.remoteStorageQueues {
go p.remoteStorageQueue.Run() go q.Run()
} }
go p.ruleManager.Run() go p.ruleManager.Run()
go p.notificationHandler.Run() go p.notificationHandler.Run()
@ -239,8 +239,8 @@ func (p *prometheus) Serve() {
glog.Error("Error stopping local storage: ", err) glog.Error("Error stopping local storage: ", err)
} }
if p.remoteStorageQueue != nil { for _, q := range p.remoteStorageQueues {
p.remoteStorageQueue.Stop() q.Stop()
} }
p.notificationHandler.Stop() p.notificationHandler.Stop()
@ -251,8 +251,8 @@ func (p *prometheus) Serve() {
func (p *prometheus) Describe(ch chan<- *registry.Desc) { func (p *prometheus) Describe(ch chan<- *registry.Desc) {
p.notificationHandler.Describe(ch) p.notificationHandler.Describe(ch)
p.storage.Describe(ch) p.storage.Describe(ch)
if p.remoteStorageQueue != nil { for _, q := range p.remoteStorageQueues {
p.remoteStorageQueue.Describe(ch) q.Describe(ch)
} }
} }
@ -260,8 +260,8 @@ func (p *prometheus) Describe(ch chan<- *registry.Desc) {
func (p *prometheus) Collect(ch chan<- registry.Metric) { func (p *prometheus) Collect(ch chan<- registry.Metric) {
p.notificationHandler.Collect(ch) p.notificationHandler.Collect(ch)
p.storage.Collect(ch) p.storage.Collect(ch)
if p.remoteStorageQueue != nil { for _, q := range p.remoteStorageQueues {
p.remoteStorageQueue.Collect(ch) q.Collect(ch)
} }
} }

View File

@ -157,3 +157,8 @@ func (c *Client) Store(samples clientmodel.Samples) error {
} }
return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"]) return fmt.Errorf("failed to write samples into InfluxDB. Error: %s", r["error"])
} }
// Name identifies the client as an InfluxDB client.
func (c Client) Name() string {
return "influxdb"
}

View File

@ -134,3 +134,8 @@ func (c *Client) Store(samples clientmodel.Samples) error {
} }
return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"]) return fmt.Errorf("failed to write %d samples to OpenTSDB, %d succeeded", r["failed"], r["success"])
} }
// Name identifies the client as an OpenTSDB client.
func (c Client) Name() string {
return "opentsdb"
}

View File

@ -46,7 +46,10 @@ const (
// StorageClient defines an interface for sending a batch of samples to an // StorageClient defines an interface for sending a batch of samples to an
// external timeseries database. // external timeseries database.
type StorageClient interface { type StorageClient interface {
// Store stores the given samples in the remote storage.
Store(clientmodel.Samples) error Store(clientmodel.Samples) error
// Name identifies the remote storage implementation.
Name() string
} }
// StorageQueueManager manages a queue of samples to be sent to the Storage // StorageQueueManager manages a queue of samples to be sent to the Storage
@ -67,6 +70,10 @@ type StorageQueueManager struct {
// NewStorageQueueManager builds a new StorageQueueManager. // NewStorageQueueManager builds a new StorageQueueManager.
func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager { func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueueManager {
constLabels := prometheus.Labels{
"type": tsdb.Name(),
}
return &StorageQueueManager{ return &StorageQueueManager{
tsdb: tsdb, tsdb: tsdb,
queue: make(chan *clientmodel.Sample, queueCapacity), queue: make(chan *clientmodel.Sample, queueCapacity),
@ -75,36 +82,41 @@ func NewStorageQueueManager(tsdb StorageClient, queueCapacity int) *StorageQueue
samplesCount: prometheus.NewCounterVec( samplesCount: prometheus.NewCounterVec(
prometheus.CounterOpts{ prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "sent_samples_total", Name: "sent_samples_total",
Help: "Total number of processed samples to be sent to remote storage.", Help: "Total number of processed samples to be sent to remote storage.",
ConstLabels: constLabels,
}, },
[]string{result}, []string{result},
), ),
sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{ sendLatency: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "sent_latency_milliseconds", Name: "sent_latency_milliseconds",
Help: "Latency quantiles for sending sample batches to the remote storage.", Help: "Latency quantiles for sending sample batches to the remote storage.",
ConstLabels: constLabels,
}), }),
sendErrors: prometheus.NewCounter(prometheus.CounterOpts{ sendErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "sent_errors_total", Name: "sent_errors_total",
Help: "Total number of errors sending sample batches to the remote storage.", Help: "Total number of errors sending sample batches to the remote storage.",
ConstLabels: constLabels,
}), }),
queueLength: prometheus.NewGauge(prometheus.GaugeOpts{ queueLength: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace, Namespace: namespace,
Subsystem: subsystem, Subsystem: subsystem,
Name: "queue_length", Name: "queue_length",
Help: "The number of processed samples queued to be sent to the remote storage.", Help: "The number of processed samples queued to be sent to the remote storage.",
ConstLabels: constLabels,
}), }),
queueCapacity: prometheus.MustNewConstMetric( queueCapacity: prometheus.MustNewConstMetric(
prometheus.NewDesc( prometheus.NewDesc(
prometheus.BuildFQName(namespace, subsystem, "queue_capacity"), prometheus.BuildFQName(namespace, subsystem, "queue_capacity"),
"The capacity of the queue of samples to be sent to the remote storage.", "The capacity of the queue of samples to be sent to the remote storage.",
nil, nil, nil,
constLabels,
), ),
prometheus.GaugeValue, prometheus.GaugeValue,
float64(queueCapacity), float64(queueCapacity),

View File

@ -46,6 +46,10 @@ func (c *TestStorageClient) Store(s clientmodel.Samples) error {
return nil return nil
} }
func (c TestStorageClient) Name() string {
return "teststorageclient"
}
func TestSampleDelivery(t *testing.T) { func TestSampleDelivery(t *testing.T) {
// Let's create an even number of send batches so we don't run into the // Let's create an even number of send batches so we don't run into the
// batch timeout case. // batch timeout case.

View File

@ -23,16 +23,15 @@ type SampleAppender interface {
Append(*clientmodel.Sample) Append(*clientmodel.Sample)
} }
// Tee is a SampleAppender that appends every sample to two other // Fanout is a SampleAppender that appends every sample to a list of other
// SampleAppenders. // SampleAppenders.
type Tee struct { type Fanout []SampleAppender
Appender1, Appender2 SampleAppender
}
// Append implements SampleAppender. It appends the provided sample first // Append implements SampleAppender. It appends the provided sample to all
// to Appender1, then to Appender2, waiting for each to return before // SampleAppenders in the Fanout slice and waits for each append to complete
// proceeding. // before proceeding with the next.
func (t Tee) Append(s *clientmodel.Sample) { func (f Fanout) Append(s *clientmodel.Sample) {
t.Appender1.Append(s) for _, a := range f {
t.Appender2.Append(s) a.Append(s)
}
} }