Merge pull request #1439 from prometheus/fabxc/notifier
Rename notification to notifier
This commit is contained in:
commit
5b78fdd6b7
|
@ -26,7 +26,7 @@ import (
|
|||
|
||||
"github.com/asaskevich/govalidator"
|
||||
"github.com/prometheus/common/log"
|
||||
"github.com/prometheus/prometheus/notification"
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage/local"
|
||||
"github.com/prometheus/prometheus/storage/local/index"
|
||||
|
@ -42,11 +42,11 @@ var cfg = struct {
|
|||
printVersion bool
|
||||
configFile string
|
||||
|
||||
storage local.MemorySeriesStorageOptions
|
||||
notification notification.HandlerOptions
|
||||
queryEngine promql.EngineOptions
|
||||
web web.Options
|
||||
remote remote.Options
|
||||
storage local.MemorySeriesStorageOptions
|
||||
notifier notifier.Options
|
||||
queryEngine promql.EngineOptions
|
||||
web web.Options
|
||||
remote remote.Options
|
||||
|
||||
prometheusURL string
|
||||
influxdbURL string
|
||||
|
@ -203,15 +203,15 @@ func init() {
|
|||
|
||||
// Alertmanager.
|
||||
cfg.fs.StringVar(
|
||||
&cfg.notification.AlertmanagerURL, "alertmanager.url", "",
|
||||
&cfg.notifier.AlertmanagerURL, "alertmanager.url", "",
|
||||
"The URL of the alert manager to send notifications to.",
|
||||
)
|
||||
cfg.fs.IntVar(
|
||||
&cfg.notification.QueueCapacity, "alertmanager.notification-queue-capacity", 10000,
|
||||
&cfg.notifier.QueueCapacity, "alertmanager.notification-queue-capacity", 10000,
|
||||
"The capacity of the queue for pending alert manager notifications.",
|
||||
)
|
||||
cfg.fs.DurationVar(
|
||||
&cfg.notification.Timeout, "alertmanager.timeout", 10*time.Second,
|
||||
&cfg.notifier.Timeout, "alertmanager.timeout", 10*time.Second,
|
||||
"Alert manager HTTP API timeout.",
|
||||
)
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ import (
|
|||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/notification"
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/retrieval"
|
||||
"github.com/prometheus/prometheus/rules"
|
||||
|
@ -84,16 +84,16 @@ func Main() int {
|
|||
}
|
||||
|
||||
var (
|
||||
notificationHandler = notification.New(&cfg.notification)
|
||||
targetManager = retrieval.NewTargetManager(sampleAppender)
|
||||
queryEngine = promql.NewEngine(memStorage, &cfg.queryEngine)
|
||||
notifier = notifier.New(&cfg.notifier)
|
||||
targetManager = retrieval.NewTargetManager(sampleAppender)
|
||||
queryEngine = promql.NewEngine(memStorage, &cfg.queryEngine)
|
||||
)
|
||||
|
||||
ruleManager := rules.NewManager(&rules.ManagerOptions{
|
||||
SampleAppender: sampleAppender,
|
||||
NotificationHandler: notificationHandler,
|
||||
QueryEngine: queryEngine,
|
||||
ExternalURL: cfg.web.ExternalURL,
|
||||
SampleAppender: sampleAppender,
|
||||
Notifier: notifier,
|
||||
QueryEngine: queryEngine,
|
||||
ExternalURL: cfg.web.ExternalURL,
|
||||
})
|
||||
|
||||
flags := map[string]string{}
|
||||
|
@ -110,7 +110,7 @@ func Main() int {
|
|||
|
||||
webHandler := web.New(memStorage, queryEngine, ruleManager, status, &cfg.web)
|
||||
|
||||
reloadables = append(reloadables, status, targetManager, ruleManager, webHandler, notificationHandler)
|
||||
reloadables = append(reloadables, status, targetManager, ruleManager, webHandler, notifier)
|
||||
|
||||
if !reloadConfig(cfg.configFile, reloadables...) {
|
||||
return 1
|
||||
|
@ -153,14 +153,14 @@ func Main() int {
|
|||
}
|
||||
// The storage has to be fully initialized before registering.
|
||||
prometheus.MustRegister(memStorage)
|
||||
prometheus.MustRegister(notificationHandler)
|
||||
prometheus.MustRegister(notifier)
|
||||
prometheus.MustRegister(configSuccess)
|
||||
prometheus.MustRegister(configSuccessTime)
|
||||
|
||||
// The notification handler is a dependency of the rule manager. It has to be
|
||||
// The notifieris a dependency of the rule manager. It has to be
|
||||
// started before and torn down afterwards.
|
||||
go notificationHandler.Run()
|
||||
defer notificationHandler.Stop()
|
||||
go notifier.Run()
|
||||
defer notifier.Stop()
|
||||
|
||||
go ruleManager.Run()
|
||||
defer ruleManager.Stop()
|
||||
|
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package notification
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
@ -44,9 +44,9 @@ const (
|
|||
|
||||
// Handler is responsible for dispatching alert notifications to an
|
||||
// alert manager service.
|
||||
type Handler struct {
|
||||
type Notifier struct {
|
||||
queue model.Alerts
|
||||
opts *HandlerOptions
|
||||
opts *Options
|
||||
|
||||
more chan struct{}
|
||||
mtx sync.RWMutex
|
||||
|
@ -62,18 +62,18 @@ type Handler struct {
|
|||
}
|
||||
|
||||
// HandlerOptions are the configurable parameters of a Handler.
|
||||
type HandlerOptions struct {
|
||||
type Options struct {
|
||||
AlertmanagerURL string
|
||||
QueueCapacity int
|
||||
Timeout time.Duration
|
||||
ExternalLabels model.LabelSet
|
||||
}
|
||||
|
||||
// New constructs a new Handler.
|
||||
func New(o *HandlerOptions) *Handler {
|
||||
// New constructs a neww Notifier.
|
||||
func New(o *Options) *Notifier {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &Handler{
|
||||
return &Notifier{
|
||||
queue: make(model.Alerts, 0, o.QueueCapacity),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
|
@ -124,7 +124,7 @@ func New(o *HandlerOptions) *Handler {
|
|||
|
||||
// ApplyConfig updates the status state as the new config requires.
|
||||
// Returns true on success.
|
||||
func (n *Handler) ApplyConfig(conf *config.Config) bool {
|
||||
func (n *Notifier) ApplyConfig(conf *config.Config) bool {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
|
@ -134,14 +134,14 @@ func (n *Handler) ApplyConfig(conf *config.Config) bool {
|
|||
|
||||
const maxBatchSize = 64
|
||||
|
||||
func (n *Handler) queueLen() int {
|
||||
func (n *Notifier) queueLen() int {
|
||||
n.mtx.RLock()
|
||||
defer n.mtx.RUnlock()
|
||||
|
||||
return len(n.queue)
|
||||
}
|
||||
|
||||
func (n *Handler) nextBatch() []*model.Alert {
|
||||
func (n *Notifier) nextBatch() []*model.Alert {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
|
@ -159,7 +159,7 @@ func (n *Handler) nextBatch() []*model.Alert {
|
|||
}
|
||||
|
||||
// Run dispatches notifications continuously.
|
||||
func (n *Handler) Run() {
|
||||
func (n *Notifier) Run() {
|
||||
// Just warn once in the beginning to prevent noisy logs.
|
||||
if n.opts.AlertmanagerURL == "" {
|
||||
log.Warnf("No AlertManager configured, not dispatching any alerts")
|
||||
|
@ -202,7 +202,7 @@ func (n *Handler) Run() {
|
|||
|
||||
// Send queues the given notification requests for processing.
|
||||
// Panics if called on a handler that is not running.
|
||||
func (n *Handler) Send(alerts ...*model.Alert) {
|
||||
func (n *Notifier) Send(alerts ...*model.Alert) {
|
||||
n.mtx.Lock()
|
||||
defer n.mtx.Unlock()
|
||||
|
||||
|
@ -230,7 +230,7 @@ func (n *Handler) Send(alerts ...*model.Alert) {
|
|||
}
|
||||
|
||||
// setMore signals that the alert queue has items.
|
||||
func (n *Handler) setMore() {
|
||||
func (n *Notifier) setMore() {
|
||||
// If we cannot send on the channel, it means the signal already exists
|
||||
// and has not been consumed yet.
|
||||
select {
|
||||
|
@ -239,11 +239,11 @@ func (n *Handler) setMore() {
|
|||
}
|
||||
}
|
||||
|
||||
func (n *Handler) postURL() string {
|
||||
func (n *Notifier) postURL() string {
|
||||
return strings.TrimRight(n.opts.AlertmanagerURL, "/") + alertPushEndpoint
|
||||
}
|
||||
|
||||
func (n *Handler) send(alerts ...*model.Alert) error {
|
||||
func (n *Notifier) send(alerts ...*model.Alert) error {
|
||||
// Attach external labels before sending alerts.
|
||||
for _, a := range alerts {
|
||||
for ln, lv := range n.opts.ExternalLabels {
|
||||
|
@ -272,14 +272,14 @@ func (n *Handler) send(alerts ...*model.Alert) error {
|
|||
}
|
||||
|
||||
// Stop shuts down the notification handler.
|
||||
func (n *Handler) Stop() {
|
||||
func (n *Notifier) Stop() {
|
||||
log.Info("Stopping notification handler...")
|
||||
|
||||
n.cancel()
|
||||
}
|
||||
|
||||
// Describe implements prometheus.Collector.
|
||||
func (n *Handler) Describe(ch chan<- *prometheus.Desc) {
|
||||
func (n *Notifier) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- n.latency.Desc()
|
||||
ch <- n.errors.Desc()
|
||||
ch <- n.sent.Desc()
|
||||
|
@ -289,7 +289,7 @@ func (n *Handler) Describe(ch chan<- *prometheus.Desc) {
|
|||
}
|
||||
|
||||
// Collect implements prometheus.Collector.
|
||||
func (n *Handler) Collect(ch chan<- prometheus.Metric) {
|
||||
func (n *Notifier) Collect(ch chan<- prometheus.Metric) {
|
||||
n.queueLength.Set(float64(n.queueLen()))
|
||||
|
||||
ch <- n.latency
|
|
@ -11,7 +11,7 @@
|
|||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package notification
|
||||
package notifier
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
@ -50,8 +50,8 @@ func TestHandlerPostURL(t *testing.T) {
|
|||
out: "http://localhost:9093/prefix/api/v1/alerts",
|
||||
},
|
||||
}
|
||||
h := &Handler{
|
||||
opts: &HandlerOptions{},
|
||||
h := &Notifier{
|
||||
opts: &Options{},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
|
@ -63,7 +63,7 @@ func TestHandlerPostURL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHandlerNextBatch(t *testing.T) {
|
||||
h := New(&HandlerOptions{})
|
||||
h := New(&Options{})
|
||||
|
||||
for i := range make([]struct{}, 2*maxBatchSize+1) {
|
||||
h.queue = append(h.queue, &model.Alert{
|
||||
|
@ -146,7 +146,7 @@ func TestHandlerSend(t *testing.T) {
|
|||
|
||||
defer server.Close()
|
||||
|
||||
h := New(&HandlerOptions{
|
||||
h := New(&Options{
|
||||
AlertmanagerURL: server.URL,
|
||||
Timeout: time.Minute,
|
||||
ExternalLabels: model.LabelSet{"a": "b"},
|
||||
|
@ -202,7 +202,7 @@ func TestHandlerFull(t *testing.T) {
|
|||
}
|
||||
}))
|
||||
|
||||
h := New(&HandlerOptions{
|
||||
h := New(&Options{
|
||||
AlertmanagerURL: server.URL,
|
||||
Timeout: time.Second,
|
||||
QueueCapacity: 3 * maxBatchSize,
|
|
@ -28,7 +28,7 @@ import (
|
|||
"github.com/prometheus/common/model"
|
||||
|
||||
"github.com/prometheus/prometheus/config"
|
||||
"github.com/prometheus/prometheus/notification"
|
||||
"github.com/prometheus/prometheus/notifier"
|
||||
"github.com/prometheus/prometheus/promql"
|
||||
"github.com/prometheus/prometheus/storage"
|
||||
"github.com/prometheus/prometheus/template"
|
||||
|
@ -343,7 +343,7 @@ func (g *Group) sendAlerts(rule *AlertingRule, timestamp model.Time) error {
|
|||
}
|
||||
|
||||
if len(alerts) > 0 {
|
||||
g.opts.NotificationHandler.Send(alerts...)
|
||||
g.opts.Notifier.Send(alerts...)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -359,10 +359,10 @@ type Manager struct {
|
|||
|
||||
// ManagerOptions bundles options for the Manager.
|
||||
type ManagerOptions struct {
|
||||
ExternalURL *url.URL
|
||||
QueryEngine *promql.Engine
|
||||
NotificationHandler *notification.Handler
|
||||
SampleAppender storage.SampleAppender
|
||||
ExternalURL *url.URL
|
||||
QueryEngine *promql.Engine
|
||||
Notifier *notifier.Notifier
|
||||
SampleAppender storage.SampleAppender
|
||||
}
|
||||
|
||||
// NewManager returns an implementation of Manager, ready to be started
|
||||
|
|
Loading…
Reference in New Issue