From acf91f38bd6c9d2dbe0a48d9f6503f754ca2a4b0 Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Tue, 13 Aug 2013 18:08:02 +0200 Subject: [PATCH] Build layered indexers. The indexers will be extracted in a short while and wrapped accordingly with these types. Change-Id: I4d1abda4e46117210babad5aa0d42f9ca1f6594f --- storage/metric/index.go | 197 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 197 insertions(+) diff --git a/storage/metric/index.go b/storage/metric/index.go index 20eee58d8..46e590e04 100644 --- a/storage/metric/index.go +++ b/storage/metric/index.go @@ -400,6 +400,203 @@ type MetricIndexer interface { IndexMetrics(FingerprintMetricMapping) error } +// IndexObserver listens and receives changes to a given +// FingerprintMetricMapping. +type IndexerObserver interface { + Observe(FingerprintMetricMapping) error +} + +// IndexerProxy receives IndexMetric requests and proxies them to the underlying +// MetricIndexer. Upon success of the underlying receiver, the registered +// IndexObservers are called serially. +// +// If an error occurs in the underlying MetricIndexer or any of the observers, +// this proxy will not work any further and return the offending error in this +// call or any subsequent ones. +type IndexerProxy struct { + err error + + i MetricIndexer + observers []IndexerObserver +} + +func (p *IndexerProxy) IndexMetrics(b FingerprintMetricMapping) error { + if p.err != nil { + return p.err + } + if p.err = p.i.IndexMetrics(b); p.err != nil { + return p.err + } + + for _, o := range p.observers { + if p.err = o.Observe(b); p.err != nil { + return p.err + } + } + + return nil +} + +// Close closes the underlying indexer. +func (p *IndexerProxy) Close() error { + if p.err != nil { + return p.err + } + if closer, ok := p.i.(io.Closer); ok { + p.err = closer.Close() + return p.err + } + return nil +} + +// Close flushes the underlying index requests before closing. +func (p *IndexerProxy) Flush() error { + if p.err != nil { + return p.err + } + if flusher, ok := p.i.(flusher); ok { + p.err = flusher.Flush() + return p.err + } + return nil +} + +// NewIndexerProxy builds an IndexerProxy for the given configuration. +func NewIndexerProxy(i MetricIndexer, o ...IndexerObserver) *IndexerProxy { + return &IndexerProxy{ + i: i, + observers: o, + } +} + +// SynchronizedIndexer provides naive locking for any MetricIndexer. +type SynchronizedIndexer struct { + mu sync.Mutex + i MetricIndexer +} + +func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error { + i.mu.Lock() + defer i.mu.Unlock() + + return i.i.IndexMetrics(b) +} + +type flusher interface { + Flush() error +} + +func (i *SynchronizedIndexer) Flush() error { + if flusher, ok := i.i.(flusher); ok { + i.mu.Lock() + defer i.mu.Unlock() + + return flusher.Flush() + } + + return nil +} + +func (i *SynchronizedIndexer) Close() error { + if closer, ok := i.i.(io.Closer); ok { + i.mu.Lock() + defer i.mu.Unlock() + + return closer.Close() + } + + return nil +} + +// NewSynchronizedIndexer builds a new MetricIndexer. +func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer { + return &SynchronizedIndexer{ + i: i, + } +} + +// BufferedIndexer provides unsynchronized index buffering. +// +// If an error occurs in the underlying MetricIndexer or any of the observers, +// this proxy will not work any further and return the offending error. +type BufferedIndexer struct { + i MetricIndexer + + limit int + + buf []FingerprintMetricMapping + + err error +} + +func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error { + if i.err != nil { + return i.err + } + + if len(i.buf) < i.limit { + i.buf = append(i.buf, b) + + return nil + } + + i.buf = append(i.buf) + + i.err = i.Flush() + + return i.err +} + +// Flush writes all pending entries to the index. +func (i *BufferedIndexer) Flush() error { + if i.err != nil { + return i.err + } + + if len(i.buf) == 0 { + return nil + } + + superset := FingerprintMetricMapping{} + for _, b := range i.buf { + for fp, m := range b { + if _, ok := superset[fp]; ok { + continue + } + + superset[fp] = m + } + } + + i.buf = make([]FingerprintMetricMapping, 0, i.limit) + + i.err = i.i.IndexMetrics(superset) + + return i.err +} + +// Close flushes and closes the underlying buffer. +func (i *BufferedIndexer) Close() error { + if err := i.Flush(); err != nil { + return err + } + + if closer, ok := i.i.(io.Closer); ok { + return closer.Close() + } + + return nil +} + +func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer { + + return &BufferedIndexer{ + i: i, + limit: limit, + buf: make([]FingerprintMetricMapping, 0, limit), + } +} + // TotalIndexer is a MetricIndexer that indexes all standard facets of a metric // that a user or the Prometheus subsystem would want to query against: //