150 lines
4.8 KiB
Go
150 lines
4.8 KiB
Go
// Copyright 2018 Prometheus Team
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package cluster
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-kit/kit/log"
|
|
"github.com/go-kit/kit/log/level"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/hashicorp/memberlist"
|
|
"github.com/prometheus/alertmanager/cluster/clusterpb"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
// Channel allows clients to send messages for a specific state type that will be
|
|
// broadcasted in a best-effort manner.
|
|
type Channel struct {
|
|
key string
|
|
send func([]byte)
|
|
peers func() []*memberlist.Node
|
|
sendOversize func(*memberlist.Node, []byte) error
|
|
|
|
msgc chan []byte
|
|
logger log.Logger
|
|
|
|
oversizeGossipMessageFailureTotal prometheus.Counter
|
|
oversizeGossipMessageDroppedTotal prometheus.Counter
|
|
oversizeGossipMessageSentTotal prometheus.Counter
|
|
oversizeGossipDuration prometheus.Histogram
|
|
}
|
|
|
|
// NewChannel creates a new Channel struct, which handles sending normal and
|
|
// oversize messages to peers.
|
|
func NewChannel(
|
|
key string,
|
|
send func([]byte),
|
|
peers func() []*memberlist.Node,
|
|
sendOversize func(*memberlist.Node, []byte) error,
|
|
logger log.Logger,
|
|
stopc chan struct{},
|
|
reg prometheus.Registerer,
|
|
) *Channel {
|
|
oversizeGossipMessageFailureTotal := prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "alertmanager_oversized_gossip_message_failure_total",
|
|
Help: "Number of oversized gossip message sends that failed.",
|
|
ConstLabels: prometheus.Labels{"key": key},
|
|
})
|
|
oversizeGossipMessageSentTotal := prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "alertmanager_oversized_gossip_message_sent_total",
|
|
Help: "Number of oversized gossip message sent.",
|
|
ConstLabels: prometheus.Labels{"key": key},
|
|
})
|
|
oversizeGossipMessageDroppedTotal := prometheus.NewCounter(prometheus.CounterOpts{
|
|
Name: "alertmanager_oversized_gossip_message_dropped_total",
|
|
Help: "Number of oversized gossip messages that were dropped due to a full message queue.",
|
|
ConstLabels: prometheus.Labels{"key": key},
|
|
})
|
|
oversizeGossipDuration := prometheus.NewHistogram(prometheus.HistogramOpts{
|
|
Name: "alertmanager_oversize_gossip_message_duration_seconds",
|
|
Help: "Duration of oversized gossip message requests.",
|
|
ConstLabels: prometheus.Labels{"key": key},
|
|
})
|
|
|
|
reg.MustRegister(oversizeGossipDuration, oversizeGossipMessageFailureTotal, oversizeGossipMessageDroppedTotal, oversizeGossipMessageSentTotal)
|
|
|
|
c := &Channel{
|
|
key: key,
|
|
send: send,
|
|
peers: peers,
|
|
logger: logger,
|
|
msgc: make(chan []byte, 200),
|
|
sendOversize: sendOversize,
|
|
oversizeGossipMessageFailureTotal: oversizeGossipMessageFailureTotal,
|
|
oversizeGossipMessageDroppedTotal: oversizeGossipMessageDroppedTotal,
|
|
oversizeGossipMessageSentTotal: oversizeGossipMessageSentTotal,
|
|
oversizeGossipDuration: oversizeGossipDuration,
|
|
}
|
|
|
|
go c.handleOverSizedMessages(stopc)
|
|
|
|
return c
|
|
}
|
|
|
|
// handleOverSizedMessages prevents memberlist from opening too many parallel
|
|
// TCP connections to its peers.
|
|
func (c *Channel) handleOverSizedMessages(stopc chan struct{}) {
|
|
var wg sync.WaitGroup
|
|
for {
|
|
select {
|
|
case b := <-c.msgc:
|
|
for _, n := range c.peers() {
|
|
wg.Add(1)
|
|
go func(n *memberlist.Node) {
|
|
defer wg.Done()
|
|
c.oversizeGossipMessageSentTotal.Inc()
|
|
start := time.Now()
|
|
if err := c.sendOversize(n, b); err != nil {
|
|
level.Debug(c.logger).Log("msg", "failed to send reliable", "key", c.key, "node", n, "err", err)
|
|
c.oversizeGossipMessageFailureTotal.Inc()
|
|
return
|
|
}
|
|
c.oversizeGossipDuration.Observe(time.Since(start).Seconds())
|
|
}(n)
|
|
}
|
|
|
|
wg.Wait()
|
|
case <-stopc:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Broadcast enqueues a message for broadcasting.
|
|
func (c *Channel) Broadcast(b []byte) {
|
|
b, err := proto.Marshal(&clusterpb.Part{Key: c.key, Data: b})
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if OversizedMessage(b) {
|
|
select {
|
|
case c.msgc <- b:
|
|
default:
|
|
level.Debug(c.logger).Log("msg", "oversized gossip channel full")
|
|
c.oversizeGossipMessageDroppedTotal.Inc()
|
|
}
|
|
} else {
|
|
c.send(b)
|
|
}
|
|
}
|
|
|
|
// OversizedMessage indicates whether or not the byte payload should be sent
|
|
// via TCP.
|
|
func OversizedMessage(b []byte) bool {
|
|
return len(b) > maxGossipPacketSize/2
|
|
}
|