Change Suppressor from channel-based to mutex-based, add tests.

Start with the simplest possible locking scheme: lock the object-global
mutex at the beginning of each user-facing method. This is equivalent to
the implicit locking previously provided by the reactor.

The reasoning behind this change is the incredible overhead of the
previous reactor request/response code:

Overhead of the current model for every user-facing method:

- 2 struct type definitions (req/resp)
- 1 channel
  - 1 struct member definition site
  - 1 channel init site
  - 1 struct population site
  - 1 struct servicing site
  - 1 struct closing site
- 1 actual execution method

New lock-based code:

Per object: 1 lock
Per method:
- 1 taking the lock
- 1 actual execution method
This commit is contained in:
Julius Volz 2013-07-22 18:32:45 +02:00
parent 19e1ad7096
commit b49b7bba6f
4 changed files with 245 additions and 171 deletions

View File

@ -25,11 +25,8 @@ import (
func main() {
flag.Parse()
log.Print("Starting event suppressor...")
suppressor := manager.NewSuppressor()
defer suppressor.Close()
go suppressor.Dispatch()
log.Println("Done.")
log.Println("Starting event aggregator...")
aggregator := manager.NewAggregator()

View File

@ -116,7 +116,7 @@ func (d *SummaryDispatcher) Receive(s *EventSummary) RemoteError {
}
func (d *SummaryDispatcher) dispatchSummary(r *summaryDispatchRequest, i IsInhibitedInterrogator) {
if i.IsInhibited(r.Summary.Event) {
if inhibited, _ := i.IsInhibited(r.Summary.Event); inhibited {
r.Response <- &summaryDispatchResponse{
Disposition: SUPPRESSED,
}

View File

@ -14,213 +14,151 @@
package manager
import (
"container/heap"
"fmt"
"log"
"sort"
"sync"
"time"
)
// SuppressionId is the numeric identifier of a Suppression. It is the key type
// of the Suppressor's Suppressions map.
type SuppressionId uint
type Suppression struct {
Id uint
Description string
Filters *Filters
EndsAt time.Time
// The numeric ID of the suppression.
Id SuppressionId
// Name/email of the suppression creator.
CreatedBy string
// When the suppression was first created (Unix timestamp).
CreatedAt time.Time
// When the suppression expires (Unix timestamp).
EndsAt time.Time
// Additional comment about the suppression.
Comment string
// Filters that determine which events are suppressed.
Filters Filters
// Timer used to trigger the deletion of the Suppression after its expiry
// time.
expiryTimer *time.Timer
}
// suppressionRequest carries a Suppression to be registered, together with the
// channel on which the outcome is reported. dispatchSuppression pushes the
// Suppression onto the heap, sends one suppressionResponse on Response and
// then closes the channel.
type suppressionRequest struct {
// The suppression to add.
Suppression Suppression
// Channel on which the single reply is delivered.
Response chan *suppressionResponse
}
// suppressionResponse is the reply sent on a suppressionRequest's Response
// channel. dispatchSuppression always sends it with Err left unset.
type suppressionResponse struct {
Err error
}
// isInhibitedRequest asks whether an Event is currently suppressed. The
// channel-based IsInhibited builds one with a fresh Response channel, sends it
// on isInhibitedReqs and reads a single isInhibitedResponse back.
type isInhibitedRequest struct {
// The event to check against the registered suppressions.
Event *Event
// Channel on which the single reply is delivered.
Response chan *isInhibitedResponse
}
// isInhibitedResponse is the reply to an isInhibitedRequest. queryInhibit sets
// Inhibited and InhibitingSuppression when a suppression's filters handle the
// event.
type isInhibitedResponse struct {
// NOTE(review): not assigned by the queryInhibit code visible here.
Err error
// True when some suppression's filters handle the event.
Inhibited bool
// The suppression that handled the event, when Inhibited is true.
InhibitingSuppression *Suppression
}
// suppressionSummaryResponse is the reply to a suppressionSummaryRequest.
// generateSummary fills Suppressions with every suppression held by the
// Suppressor.
type suppressionSummaryResponse struct {
Err error
Suppressions Suppressions
}
// suppressionSummaryRequest asks for a listing of the current suppressions;
// the reply is delivered on Response by generateSummary.
type suppressionSummaryRequest struct {
// NOTE(review): presumably label name/value pairs to restrict the summary;
// generateSummary as shown does not read this field — confirm.
MatchCandidates map[string]string
// Channel on which the single reply is delivered.
Response chan *suppressionSummaryResponse
}
// Suppressions is a list of Suppression pointers, as returned by
// Suppressor.SuppressionSummary.
type Suppressions []*Suppression
type Suppressor struct {
Suppressions *Suppressions
// Suppressions managed by this Suppressor.
Suppressions map[SuppressionId]*Suppression
// Used to track the next Suppression Id to allocate.
nextId SuppressionId
suppressionReqs chan *suppressionRequest
suppressionSummaryReqs chan *suppressionSummaryRequest
isInhibitedReqs chan *isInhibitedRequest
// Mutex to protect the above.
mu sync.Mutex
}
type IsInhibitedInterrogator interface {
IsInhibited(*Event) bool
IsInhibited(*Event) (bool, *Suppression)
}
func NewSuppressor() *Suppressor {
suppressions := new(Suppressions)
heap.Init(suppressions)
return &Suppressor{
Suppressions: suppressions,
suppressionReqs: make(chan *suppressionRequest),
suppressionSummaryReqs: make(chan *suppressionSummaryRequest),
isInhibitedReqs: make(chan *isInhibitedRequest),
Suppressions: make(map[SuppressionId]*Suppression),
}
}
// Suppressions is a slice of Suppression values. The Len, Less, Swap, Push and
// Pop methods declared on it allow it to be used with container/heap, ordered
// by EndsAt.
type Suppressions []Suppression
func (s Suppressions) Len() int {
return len(s)
// nextSuppressionId advances the Suppressor's ID counter and returns the new
// value. NewSuppressor leaves the counter at its zero value, so the first ID
// handed out is 1. The method does not take s.mu itself; AddSuppression calls
// it while holding the lock.
func (s *Suppressor) nextSuppressionId() SuppressionId {
	// BUG: Build proper ID management. For now, as we are only keeping
	// data in memory anyways, this is enough.
	allocated := s.nextId + 1
	s.nextId = allocated
	return allocated
}
// Less reports whether the suppression at index i expires before the one at
// index j.
func (s Suppressions) Less(i, j int) bool {
	first, second := s[i].EndsAt, s[j].EndsAt
	return first.Before(second)
}
// Swap exchanges the suppressions at indices i and j.
func (s Suppressions) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}
// Push appends v, which must be a Suppression value, to the end of the slice.
// It panics if v has any other dynamic type.
func (s *Suppressions) Push(v interface{}) {
	added := v.(Suppression)
	*s = append(*s, added)
}
// Pop removes the last element of the slice and returns it. It panics on an
// empty slice.
func (s *Suppressions) Pop() interface{} {
	lastIdx := len(*s) - 1
	removed := (*s)[lastIdx]
	*s = (*s)[0:lastIdx]
	return removed
}
// dispatchSuppression services one suppressionRequest: it logs the request,
// pushes the carried Suppression onto the Suppressor's heap, then sends an
// empty suppressionResponse on the request's Response channel and closes it.
func (s *Suppressor) dispatchSuppression(r *suppressionRequest) {
	log.Println("dispatching suppression", r)

	heap.Push(s.Suppressions, r.Suppression)

	// Reply with an empty response, then close the channel.
	reply := &suppressionResponse{}
	r.Response <- reply
	close(r.Response)
}
func (s *Suppressor) reapSuppressions(t time.Time) {
log.Println("reaping suppression...")
i := sort.Search(len(*s.Suppressions), func(i int) bool {
return (*s.Suppressions)[i].EndsAt.After(t)
func (s *Suppressor) setupExpiryTimer(sup *Suppression) {
if sup.expiryTimer != nil {
sup.expiryTimer.Stop()
}
expDuration := sup.EndsAt.Sub(time.Now())
sup.expiryTimer = time.AfterFunc(expDuration, func() {
if err := s.DelSuppression(sup.Id); err != nil {
log.Printf("Failed to delete suppression %d: %s", sup.Id, err)
}
})
*s.Suppressions = (*s.Suppressions)[i:]
// BUG(matt): Validate if strictly necessary.
heap.Init(s.Suppressions)
}
func (s *Suppressor) generateSummary(r *suppressionSummaryRequest) {
log.Println("Generating summary", r)
response := new(suppressionSummaryResponse)
func (s *Suppressor) AddSuppression(sup *Suppression) SuppressionId {
s.mu.Lock()
defer s.mu.Unlock()
for _, suppression := range *s.Suppressions {
response.Suppressions = append(response.Suppressions, suppression)
sup.Id = s.nextSuppressionId()
s.setupExpiryTimer(sup)
s.Suppressions[sup.Id] = sup
return sup.Id
}
func (s *Suppressor) UpdateSuppression(sup *Suppression) error {
s.mu.Lock()
defer s.mu.Unlock()
origSup, ok := s.Suppressions[sup.Id]
if !ok {
return fmt.Errorf("Suppression with ID %d doesn't exist", sup.Id)
}
r.Response <- response
close(r.Response)
}
func (s *Suppressor) IsInhibited(e *Event) bool {
req := &isInhibitedRequest{
Event: e,
Response: make(chan *isInhibitedResponse),
if sup.EndsAt != origSup.EndsAt {
origSup.expiryTimer.Stop()
}
s.isInhibitedReqs <- req
resp := <-req.Response
return resp.Inhibited
*origSup = *sup
s.setupExpiryTimer(origSup)
return nil
}
func (s *Suppressor) queryInhibit(q *isInhibitedRequest) {
response := new(isInhibitedResponse)
func (s *Suppressor) GetSuppression(id SuppressionId) (*Suppression, error) {
s.mu.Lock()
defer s.mu.Unlock()
for _, s := range *s.Suppressions {
if s.Filters.Handles(q.Event) {
response.Inhibited = true
response.InhibitingSuppression = &s
sup, ok := s.Suppressions[id]
if !ok {
return nil, fmt.Errorf("Suppression with ID %d doesn't exist", id)
}
return sup, nil
}
break
// DelSuppression removes the suppression with the given ID from the
// Suppressor and stops its expiry timer, if it has one.
//
// It returns an error if no suppression with that ID exists. The Suppressor's
// mutex is held for the duration of the call.
func (s *Suppressor) DelSuppression(id SuppressionId) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	sup, ok := s.Suppressions[id]
	if !ok {
		return fmt.Errorf("Suppression with ID %d doesn't exist", id)
	}

	// Stop the pending expiry timer so that it does not fire later and attempt
	// a second (failing) delete of an entry that is already gone. Stopping a
	// timer that has already fired, as happens when this method is invoked
	// from the timer callback itself, is harmless.
	if sup.expiryTimer != nil {
		sup.expiryTimer.Stop()
	}

	delete(s.Suppressions, id)
	return nil
}
// SuppressionSummary returns a newly allocated list of all suppressions
// currently held by the Suppressor. The list holds the same *Suppression
// pointers as the Suppressor's map, in map iteration order (which Go does not
// define). The Suppressor's mutex is held while the list is built.
func (s *Suppressor) SuppressionSummary() Suppressions {
	s.mu.Lock()
	defer s.mu.Unlock()

	summary := make(Suppressions, len(s.Suppressions))
	next := 0
	for id := range s.Suppressions {
		summary[next] = s.Suppressions[id]
		next++
	}
	return summary
}
func (s *Suppressor) IsInhibited(e *Event) (bool, *Suppression) {
s.mu.Lock()
defer s.mu.Unlock()
for _, s := range s.Suppressions {
if s.Filters.Handles(e) {
return true, s
}
}
q.Response <- response
close(q.Response)
return false, nil
}
func (s *Suppressor) Close() {
close(s.suppressionReqs)
close(s.suppressionSummaryReqs)
close(s.isInhibitedReqs)
}
s.mu.Lock()
defer s.mu.Unlock()
func (s *Suppressor) Dispatch() {
// BUG: Accomplish this more intelligently by creating a timer for the least-
// likely-to-tenure item.
reaper := time.NewTicker(30 * time.Second)
defer reaper.Stop()
closed := 0
for closed < 2 {
select {
case suppression, open := <-s.suppressionReqs:
s.dispatchSuppression(suppression)
if !open {
closed++
}
case query, open := <-s.isInhibitedReqs:
s.queryInhibit(query)
if !open {
closed++
}
case summary, open := <-s.suppressionSummaryReqs:
s.generateSummary(summary)
if !open {
closed++
}
case time := <-reaper.C:
s.reapSuppressions(time)
for _, sup := range s.Suppressions {
if sup.expiryTimer != nil {
sup.expiryTimer.Stop()
}
}
}

139
manager/suppressor_test.go Normal file
View File

@ -0,0 +1,139 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package manager
import (
"testing"
"time"
)
// testSuppressorScenario describes one table-driven test case for the
// Suppressor.
type testSuppressorScenario struct {
// Suppressions added to a fresh Suppressor before any event is checked.
suppressions Suppressions
// Events for which IsInhibited is expected to report true.
inhibited Events
// Events for which IsInhibited is expected to report false.
uninhibited Events
}
// test runs the scenario against a fresh Suppressor. The index i identifies
// the scenario in failure messages.
//
// It proceeds in four steps:
//  1. add every suppression and check that it can be fetched back by its ID;
//  2. check that every event in sc.inhibited is inhibited, with a non-nil
//     inhibiting suppression;
//  3. check that no event in sc.uninhibited is inhibited;
//  4. delete the suppressions one by one, checking the summary size after
//     each deletion.
func (sc *testSuppressorScenario) test(i int, t *testing.T) {
	s := NewSuppressor()
	// Deferred rather than called at the end so that Close still runs when
	// t.Fatalf aborts the test early.
	defer s.Close()

	for j, sup := range sc.suppressions {
		id := s.AddSuppression(sup)
		retrievedSup, err := s.GetSuppression(id)
		if err != nil {
			t.Fatalf("%d.%d. Error getting suppression: %s", i, j, err)
		}
		if retrievedSup.Id != id {
			t.Fatalf("%d.%d. Expected ID %d, got %d", i, j, id, retrievedSup.Id)
		}
		sup.Id = id
		// Pointer comparison: the Suppressor is expected to hand back the very
		// object that was added.
		if sup != retrievedSup {
			t.Fatalf("%d.%d. Expected suppression %v, got %v", i, j, sup, retrievedSup)
		}
	}

	for j, ev := range sc.inhibited {
		inhibited, sup := s.IsInhibited(ev)
		if !inhibited {
			t.Fatalf("%d.%d. Expected %v to be inhibited", i, j, ev)
		}
		if sup == nil {
			t.Fatalf("%d.%d. Expected non-nil Suppression for inhibited event %v", i, j, ev)
		}
	}

	for j, ev := range sc.uninhibited {
		inhibited, sup := s.IsInhibited(ev)
		if inhibited {
			t.Fatalf("%d.%d. Expected %v to not be inhibited, was inhibited by %v", i, j, ev, sup)
		}
	}

	suppressions := s.SuppressionSummary()
	if len(suppressions) != len(sc.suppressions) {
		t.Fatalf("%d. Expected %d suppressions, got %d", i, len(sc.suppressions), len(suppressions))
	}

	for j, sup := range suppressions {
		if err := s.DelSuppression(sup.Id); err != nil {
			t.Fatalf("%d.%d. Got error while deleting suppression: %s", i, j, err)
		}

		// After deleting j+1 entries, that many fewer must remain. Report the
		// count that is actually being checked for.
		want := len(suppressions) - j - 1
		if got := len(s.SuppressionSummary()); got != want {
			t.Fatalf("%d.%d. Expected %d suppressions, got %d", i, j, want, got)
		}
	}
}
// TestSuppressor runs each scenario below through testSuppressorScenario.test,
// passing the scenario's index for use in failure messages.
func TestSuppressor(t *testing.T) {
	// Scenario 0: no suppressions are registered, so the single event must
	// not be inhibited.
	withoutSuppressions := testSuppressorScenario{
		uninhibited: Events{
			&Event{Labels: map[string]string{"foo": "bar"}},
		},
	}

	// Scenario 1: two suppressions, three events expected to be inhibited and
	// one expected to pass through. Presumably the first two inhibited events
	// are caught by the "service" filter and the third by the "testlabel"
	// filter — this depends on the filter matching semantics defined elsewhere.
	withTwoSuppressions := testSuppressorScenario{
		suppressions: Suppressions{
			&Suppression{
				Filters: Filters{NewFilter("service", "test(-)?service")},
				EndsAt:  time.Now().Add(time.Hour),
			},
			&Suppression{
				Filters: Filters{NewFilter("testlabel", ".*")},
				EndsAt:  time.Now().Add(time.Hour),
			},
		},
		inhibited: Events{
			&Event{Labels: map[string]string{
				"service": "testservice",
				"foo":     "bar",
			}},
			&Event{Labels: map[string]string{
				"service": "test-service",
				"bar":     "baz",
			}},
			&Event{Labels: map[string]string{
				"service":   "bar-service",
				"testlabel": "testvalue",
			}},
		},
		uninhibited: Events{
			&Event{Labels: map[string]string{
				"service": "testservice2",
				"foo":     "bar",
			}},
		},
	}

	scenarios := []testSuppressorScenario{
		withoutSuppressions,
		withTwoSuppressions,
	}
	for idx := range scenarios {
		scenarios[idx].test(idx, t)
	}
}