mirror of
https://github.com/prometheus/alertmanager
synced 2024-12-24 15:12:37 +00:00
Serialize all database accesses
This commit is contained in:
parent
27603290b2
commit
3620d996cf
@ -30,16 +30,21 @@ CREATE INDEX IF NOT EXISTS alerts_end ON alerts (ends_at);
|
||||
CREATE INDEX IF NOT EXISTS alerts_updated ON alerts (updated_at);
|
||||
`
|
||||
|
||||
var dbmtx sync.Mutex
|
||||
|
||||
type Alerts struct {
|
||||
db *sql.DB
|
||||
|
||||
pmtx, mtx sync.RWMutex
|
||||
mtx sync.RWMutex
|
||||
listeners map[int]chan *types.Alert
|
||||
next int
|
||||
insertCh chan *types.Alert
|
||||
}
|
||||
|
||||
func NewAlerts(db *sql.DB) (*Alerts, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -120,6 +125,9 @@ func (a *Alerts) GetPending() provider.AlertIterator {
|
||||
}
|
||||
|
||||
func (a *Alerts) getPending() ([]*types.Alert, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
// Get the last instance for each alert.
|
||||
rows, err := a.db.Query(`
|
||||
SELECT a1.labels, a1.annotations, a1.starts_at, a1.ends_at, a1.updated_at, a1.timeout
|
||||
@ -173,8 +181,8 @@ func (a *Alerts) Get(model.Fingerprint) (*types.Alert, error) {
|
||||
|
||||
// Put implements the Alerts interface.
|
||||
func (a *Alerts) Put(alerts ...*types.Alert) error {
|
||||
a.pmtx.Lock()
|
||||
defer a.pmtx.Unlock()
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
tx, err := a.db.Begin()
|
||||
if err != nil {
|
||||
@ -337,6 +345,9 @@ type Notifies struct {
|
||||
}
|
||||
|
||||
func NewNotifies(db *sql.DB) (*Notifies, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -352,6 +363,9 @@ func NewNotifies(db *sql.DB) (*Notifies, error) {
|
||||
|
||||
// Get implements the Notifies interface.
|
||||
func (n *Notifies) Get(dest string, fps ...model.Fingerprint) ([]*types.NotifyInfo, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
var result []*types.NotifyInfo
|
||||
|
||||
for _, fp := range fps {
|
||||
@ -388,6 +402,9 @@ func (n *Notifies) Get(dest string, fps ...model.Fingerprint) ([]*types.NotifyIn
|
||||
|
||||
// Set implements the Notifies interface.
|
||||
func (n *Notifies) Set(ns ...*types.NotifyInfo) error {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
tx, err := n.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -455,6 +472,9 @@ type Silences struct {
|
||||
|
||||
// NewSilences returns a new Silences based on the provided SQL DB.
|
||||
func NewSilences(db *sql.DB, mk types.Marker) (*Silences, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
tx, err := db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -490,6 +510,9 @@ func (s *Silences) Mutes(lset model.LabelSet) bool {
|
||||
|
||||
// All implements the Silences interface.
|
||||
func (s *Silences) All() ([]*types.Silence, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
rows, err := s.db.Query(`
|
||||
SELECT id, matchers, starts_at, ends_at, created_at, created_by, comment
|
||||
FROM silences
|
||||
@ -535,6 +558,9 @@ func (s *Silences) All() ([]*types.Silence, error) {
|
||||
|
||||
// Set impelements the Silences interface.
|
||||
func (s *Silences) Set(sil *types.Silence) (uint64, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
mb, err := json.Marshal(sil.Silence.Matchers)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
@ -590,6 +616,9 @@ func (s *Silences) Del(sid uint64) error {
|
||||
|
||||
// Get implements the Silences interface.
|
||||
func (s *Silences) Get(sid uint64) (*types.Silence, error) {
|
||||
dbmtx.Lock()
|
||||
defer dbmtx.Unlock()
|
||||
|
||||
row := s.db.QueryRow(`
|
||||
SELECT id, matchers, starts_at, ends_at, created_at, created_by, comment
|
||||
FROM silences
|
||||
|
Loading…
Reference in New Issue
Block a user