Merge pull request #2047 from prometheus/write-relabel
Add support for remote write relabelling.
commit 6e8f87a37f
@@ -192,7 +192,7 @@ type Config struct {
 	RuleFiles     []string        `yaml:"rule_files,omitempty"`
 	ScrapeConfigs []*ScrapeConfig `yaml:"scrape_configs,omitempty"`
 
-	RemoteWriteConfig []RemoteWriteConfig `yaml:"remote_write,omitempty"`
+	RemoteWriteConfig RemoteWriteConfig `yaml:"remote_write,omitempty"`
 
 	// Catches all undefined fields and must be empty after parsing.
 	XXX map[string]interface{} `yaml:",inline"`
@@ -1075,11 +1075,12 @@ func (re Regexp) MarshalYAML() (interface{}, error) {
 
 // RemoteWriteConfig is the configuration for remote storage.
 type RemoteWriteConfig struct {
-	URL           URL            `yaml:"url,omitempty"`
-	RemoteTimeout model.Duration `yaml:"remote_timeout,omitempty"`
-	BasicAuth     *BasicAuth     `yaml:"basic_auth,omitempty"`
-	TLSConfig     TLSConfig      `yaml:"tls_config,omitempty"`
-	ProxyURL      URL            `yaml:"proxy_url,omitempty"`
+	URL                 *URL             `yaml:"url,omitempty"`
+	RemoteTimeout       model.Duration   `yaml:"remote_timeout,omitempty"`
+	BasicAuth           *BasicAuth       `yaml:"basic_auth,omitempty"`
+	TLSConfig           TLSConfig        `yaml:"tls_config,omitempty"`
+	ProxyURL            URL              `yaml:"proxy_url,omitempty"`
+	WriteRelabelConfigs []*RelabelConfig `yaml:"write_relabel_configs,omitempty"`
 
 	// Catches all undefined fields and must be empty after parsing.
 	XXX map[string]interface{} `yaml:",inline"`
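Note (illustrative sketch, not part of this diff): with the struct above, the new field can be read back after parsing a configuration. The sketch assumes config.Load, which parses a YAML string into a *Config; the URL and the rule values are made up for the example.

package main

import (
	"fmt"
	"log"

	"github.com/prometheus/prometheus/config"
)

func main() {
	// Hypothetical configuration; only the remote_write block matters here.
	cfg, err := config.Load(`
remote_write:
  url: http://localhost:9201/receive
  write_relabel_configs:
  - source_labels: [__name__]
    regex: expensive.*
    action: drop
`)
	if err != nil {
		log.Fatal(err)
	}
	// URL is now a pointer, so a nil check shows whether remote write is configured at all.
	if cfg.RemoteWriteConfig.URL != nil {
		fmt.Printf("remote write to %s with %d relabel rule(s)\n",
			cfg.RemoteWriteConfig.URL.String(),
			len(cfg.RemoteWriteConfig.WriteRelabelConfigs))
	}
}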
@@ -44,6 +44,19 @@ var expectedConf = &Config{
 		"testdata/my/*.rules",
 	},
 
+	RemoteWriteConfig: RemoteWriteConfig{
+		RemoteTimeout: model.Duration(30 * time.Second),
+		WriteRelabelConfigs: []*RelabelConfig{
+			{
+				SourceLabels: model.LabelNames{"__name__"},
+				Separator:    ";",
+				Regex:        MustNewRegexp("expensive.*"),
+				Replacement:  "$1",
+				Action:       RelabelDrop,
+			},
+		},
+	},
+
 	ScrapeConfigs: []*ScrapeConfig{
 		{
 			JobName: "prometheus",
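Note (illustrative, not part of this diff): Separator ";" and Replacement "$1" are not set in the YAML fixture below; they are filled in from the relabel defaults when the rule is unmarshalled. The sketch assumes the config package's exported DefaultRelabelConfig variable.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/config"
)

func main() {
	// Values a write_relabel_configs entry leaves unset are taken from these
	// defaults during unmarshalling, which is why they show up in expectedConf above.
	d := config.DefaultRelabelConfig
	fmt.Printf("separator=%q replacement=%q action=%q\n", d.Separator, d.Replacement, d.Action)
}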
@@ -13,6 +13,12 @@ rule_files:
   - "/absolute/second.rules"
   - "my/*.rules"
 
+remote_write:
+  write_relabel_configs:
+  - source_labels: [__name__]
+    regex: expensive.*
+    action: drop
+
 scrape_configs:
 - job_name: prometheus
@@ -31,14 +31,13 @@ import (
 
 // Client allows sending batches of Prometheus samples to an HTTP endpoint.
 type Client struct {
-	index   int // Used to differentiate metrics
 	url     config.URL
 	client  *http.Client
 	timeout time.Duration
 }
 
 // NewClient creates a new Client.
-func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) {
+func NewClient(conf config.RemoteWriteConfig) (*Client, error) {
 	tlsConfig, err := httputil.NewTLSConfig(conf.TLSConfig)
 	if err != nil {
 		return nil, err

@@ -56,8 +55,7 @@ func NewClient(index int, conf config.RemoteWriteConfig) (*Client, error) {
 	}
 
 	return &Client{
-		index:   index,
-		url:     conf.URL,
+		url:     *conf.URL,
 		client:  httputil.NewClient(rt),
 		timeout: time.Duration(conf.RemoteTimeout),
 	}, nil

@@ -117,10 +115,6 @@ func (c *Client) Store(samples model.Samples) error {
 }
 
 // Name identifies the client as a generic client.
-//
-// TODO: This client is going to be the only one soon - then this method
-// will simply be removed in the restructuring and the whole "generic" naming
-// will be gone for good.
 func (c Client) Name() string {
-	return fmt.Sprintf("generic:%d:%s", c.index, c.url)
+	return "generic"
 }
@@ -24,6 +24,7 @@ import (
 	influx "github.com/influxdb/influxdb/client"
 
 	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/relabel"
 	"github.com/prometheus/prometheus/storage/remote/graphite"
 	"github.com/prometheus/prometheus/storage/remote/influxdb"
 	"github.com/prometheus/prometheus/storage/remote/opentsdb"

@@ -33,6 +34,7 @@ import (
 type Storage struct {
 	queues         []*StorageQueueManager
 	externalLabels model.LabelSet
+	relabelConfigs []*config.RelabelConfig
 	mtx            sync.RWMutex
 }
 

@@ -42,6 +44,7 @@ func (s *Storage) ApplyConfig(conf *config.Config) error {
 	defer s.mtx.Unlock()
 
 	s.externalLabels = conf.GlobalConfig.ExternalLabels
+	s.relabelConfigs = conf.RemoteWriteConfig.WriteRelabelConfigs
 	return nil
 }
 

@@ -116,8 +119,14 @@ func (s *Storage) Append(smpl *model.Sample) error {
 			snew.Metric[ln] = lv
 		}
 	}
+	snew.Metric = model.Metric(
+		relabel.Process(model.LabelSet(snew.Metric), s.relabelConfigs...))
 	s.mtx.RUnlock()
 
+	if snew.Metric == nil {
+		return nil
+	}
+
 	for _, q := range s.queues {
 		q.Append(&snew)
 	}
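Note (illustrative, not part of this diff): the nil check after relabelling is what actually drops samples. relabel.Process returns nil when the label set matches a rule with the drop action, so Append returns before anything is queued. A minimal sketch using the same rule as the test fixture:

package main

import (
	"fmt"

	"github.com/prometheus/common/model"

	"github.com/prometheus/prometheus/config"
	"github.com/prometheus/prometheus/relabel"
)

func main() {
	drop := &config.RelabelConfig{
		SourceLabels: model.LabelNames{"__name__"},
		Separator:    ";",
		Regex:        config.MustNewRegexp("expensive.*"),
		Action:       config.RelabelDrop,
	}

	// Matches the rule: Process returns nil, so the sample would not be forwarded.
	if relabel.Process(model.LabelSet{"__name__": "expensive_rpc_durations"}, drop) == nil {
		fmt.Println("dropped: expensive_rpc_durations")
	}
	// Does not match: the label set passes through unchanged.
	fmt.Println("kept:", relabel.Process(model.LabelSet{"__name__": "up"}, drop))
}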
@@ -19,15 +19,16 @@ import (
 	"github.com/prometheus/common/model"
 
 	"github.com/prometheus/prometheus/config"
+	"github.com/prometheus/prometheus/relabel"
 )
 
 // Storage collects multiple remote storage queues.
 type ReloadableStorage struct {
 	mtx            sync.RWMutex
 	externalLabels model.LabelSet
-	conf           []config.RemoteWriteConfig
+	conf           config.RemoteWriteConfig
 
-	queues []*StorageQueueManager
+	queue *StorageQueueManager
 }
 
 // New returns a new remote Storage.

@@ -42,31 +43,32 @@ func (s *ReloadableStorage) ApplyConfig(conf *config.Config) error {
 
 	// TODO: we should only stop & recreate queues which have changes,
 	// as this can be quite disruptive.
-	newQueues := []*StorageQueueManager{}
-	for i, c := range conf.RemoteWriteConfig {
-		c, err := NewClient(i, c)
+	var newQueue *StorageQueueManager
+
+	if conf.RemoteWriteConfig.URL != nil {
+		c, err := NewClient(conf.RemoteWriteConfig)
 		if err != nil {
 			return err
 		}
-		newQueues = append(newQueues, NewStorageQueueManager(c, nil))
+		newQueue = NewStorageQueueManager(c, nil)
 	}
 
-	for _, q := range s.queues {
-		q.Stop()
+	if s.queue != nil {
+		s.queue.Stop()
 	}
-	s.queues = newQueues
-	s.externalLabels = conf.GlobalConfig.ExternalLabels
+	s.queue = newQueue
+	s.conf = conf.RemoteWriteConfig
-	for _, q := range s.queues {
-		q.Start()
+	s.externalLabels = conf.GlobalConfig.ExternalLabels
+	if s.queue != nil {
+		s.queue.Start()
 	}
 	return nil
 }
 
 // Stop the background processing of the storage queues.
 func (s *ReloadableStorage) Stop() {
-	for _, q := range s.queues {
-		q.Stop()
+	if s.queue != nil {
+		s.queue.Stop()
 	}
 }

@@ -84,9 +86,14 @@ func (s *ReloadableStorage) Append(smpl *model.Sample) error {
 			snew.Metric[ln] = lv
 		}
 	}
+	snew.Metric = model.Metric(
+		relabel.Process(model.LabelSet(snew.Metric), s.conf.WriteRelabelConfigs...))
 
+	if snew.Metric == nil {
+		return nil
+	}
-	for _, q := range s.queues {
-		q.Append(&snew)
+	if s.queue != nil {
+		s.queue.Append(&snew)
 	}
 	return nil
 }