From 77605649a93a3ab1671470c65edfc232eb3c64ec Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Sat, 1 Oct 2016 16:42:43 +0100 Subject: [PATCH] Add support for remote write relabelling. Switch back to a single remote writer, as we were only ever meant to have one and the relabel semantics are clearer that way. --- config/config.go | 13 +++++----- config/config_test.go | 13 ++++++++++ config/testdata/conf.good.yml | 6 +++++ storage/remote/client.go | 12 +++------ storage/remote/remote.go | 9 +++++++ storage/remote/remote_reloadable.go | 39 +++++++++++++++++------------ 6 files changed, 61 insertions(+), 31 deletions(-) diff --git a/config/config.go b/config/config.go index 6a82ade0e..c83b73109 100644 --- a/config/config.go +++ b/config/config.go @@ -192,7 +192,7 @@ type Config struct { RuleFiles []string `yaml:"rule_files,omitempty"` ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"` - RemoteWriteConfig []RemoteWriteConfig `yaml:"remote_write,omitempty"` + RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` @@ -1075,11 +1075,12 @@ func (re Regexp) MarshalYAML() (interface{}, error) { // RemoteWriteConfig is the configuration for remote storage. type RemoteWriteConfig struct { - URL URL `yaml:"url,omitempty"` - RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` - BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` - TLSConfig TLSConfig `yaml:"tls_config,omitempty"` - ProxyURL URL `yaml:"proxy_url,omitempty"` + URL *URL `yaml:"url,omitempty"` + RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"` + BasicAuth *BasicAuth `yaml:"basic_auth,omitempty"` + TLSConfig TLSConfig `yaml:"tls_config,omitempty"` + ProxyURL URL `yaml:"proxy_url,omitempty"` + WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"` // Catches all undefined fields and must be empty after parsing. XXX map[string]interface{} `yaml:",inline"` diff --git a/config/config_test.go b/config/config_test.go index c9a94a46b..17642f76c 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -44,6 +44,19 @@ var expectedConf = &Config{ "testdata/my/*.rules", }, + RemoteWriteConfig: RemoteWriteConfig{ + RemoteTimeout: model.Duration(30 * time.Second), + WriteRelabelConfigs: []*RelabelConfig{ + { + SourceLabels: model.LabelNames{"__name__"}, + Separator: ";", + Regex: MustNewRegexp("expensive.*"), + Replacement: "$1", + Action: RelabelDrop, + }, + }, + }, + ScrapeConfigs: []*ScrapeConfig{ { JobName: "prometheus", diff --git a/config/testdata/conf.good.yml b/config/testdata/conf.good.yml index ac80a524f..14a8436f8 100644 --- a/config/testdata/conf.good.yml +++ b/config/testdata/conf.good.yml @@ -13,6 +13,12 @@ rule_files: - "/absolute/second.rules" - "my/*.rules" +remote_write: + write_relabel_configs: + - source_labels: [__name__] + regex: expensive.* + action: drop + scrape_configs: - job_name: prometheus diff --git a/storage/remote/client.go b/storage/remote/client.go index 5befc38bd..771ecb696 100644 --- a/storage/remote/client.go +++ b/storage/remote/client.go @@ -31,14 +31,13 @@ import ( // Client allows sending batches of Prometheus samples to an HTTP endpoint. type Client struct { - index int // Used to differentiate metrics url config.URL client *http.Client timeout time.Duration } // NewClient creates a new Client. -func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) { +func NewClient(conf config.RemoteWriteConfig) (*Client, error) { tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig) if err != nil { return nil, err @@ -56,8 +55,7 @@ func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) { } return &Client{ - index: index, - url: conf.URL, + url: *conf.URL, client: httputil.NewClient(rt), timeout: time.Duration(conf.RemoteTimeout), }, nil @@ -117,10 +115,6 @@ func (c *Client) Store(samples model.Samples) error { } // Name identifies the client as a generic client. -// -// TODO: This client is going to be the only one soon - then this method -// will simply be removed in the restructuring and the whole "generic" naming -// will be gone for good. func (c Client) Name() string { - return fmt.Sprintf("generic:%d:%s", c.index, c.url) + return "generic" } diff --git a/storage/remote/remote.go b/storage/remote/remote.go index 784001c1b..4aedb794f 100644 --- a/storage/remote/remote.go +++ b/storage/remote/remote.go @@ -24,6 +24,7 @@ import ( influx "github.com/influxdb/influxdb/client" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/relabel" "github.com/prometheus/prometheus/storage/remote/graphite" "github.com/prometheus/prometheus/storage/remote/influxdb" "github.com/prometheus/prometheus/storage/remote/opentsdb" @@ -33,6 +34,7 @@ import ( type Storage struct { queues []*StorageQueueManager externalLabels model.LabelSet + relabelConfigs []*config.RelabelConfig mtx sync.RWMutex } @@ -42,6 +44,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error { defer s.mtx.Unlock() s.externalLabels = conf.GlobalConfig.ExternalLabels + s.relabelConfigs = conf.RemoteWriteConfig.WriteRelabelConfigs return nil } @@ -116,8 +119,14 @@ func (s *Storage) Append(smpl *model.Sample) error { snew.Metric[ln] = lv } } + snew.Metric = model.Metric( + relabel.Process(model.LabelSet(snew.Metric), s.relabelConfigs...)) s.mtx.RUnlock() + if snew.Metric == nil { + return nil + } + for _, q := range s.queues { q.Append(&snew) } diff --git a/storage/remote/remote_reloadable.go b/storage/remote/remote_reloadable.go index 85fbed6d4..d42a187f3 100644 --- a/storage/remote/remote_reloadable.go +++ b/storage/remote/remote_reloadable.go @@ -19,15 +19,16 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/relabel" ) // Storage collects multiple remote storage queues. type ReloadableStorage struct { mtx sync.RWMutex externalLabels model.LabelSet - conf []config.RemoteWriteConfig + conf config.RemoteWriteConfig - queues []*StorageQueueManager + queue *StorageQueueManager } // New returns a new remote Storage. @@ -42,31 +43,32 @@ func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error { // TODO: we should only stop & recreate queues which have changes, // as this can be quite disruptive. - newQueues := []*StorageQueueManager{} - for i, c := range conf.RemoteWriteConfig { - c, err := NewClient(i, c) + var newQueue *StorageQueueManager + + if conf.RemoteWriteConfig.URL != nil { + c, err := NewClient(conf.RemoteWriteConfig) if err != nil { return err } - newQueues = append(newQueues, NewStorageQueueManager(c, nil)) + newQueue = NewStorageQueueManager(c, nil) } - for _, q := range s.queues { - q.Stop() + if s.queue != nil { + s.queue.Stop() } - s.queues = newQueues - s.externalLabels = conf.GlobalConfig.ExternalLabels + s.queue = newQueue s.conf = conf.RemoteWriteConfig - for _, q := range s.queues { - q.Start() + s.externalLabels = conf.GlobalConfig.ExternalLabels + if s.queue != nil { + s.queue.Start() } return nil } // Stop the background processing of the storage queues. func (s *ReloadableStorage) Stop() { - for _, q := range s.queues { - q.Stop() + if s.queue != nil { + s.queue.Stop() } } @@ -84,9 +86,14 @@ func (s *ReloadableStorage) Append(smpl *model.Sample) error { snew.Metric[ln] = lv } } + snew.Metric = model.Metric( + relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...)) - for _, q := range s.queues { - q.Append(&snew) + if snew.Metric == nil { + return nil + } + if s.queue != nil { + s.queue.Append(&snew) } return nil }