Merge "Build layered indexers."
This commit is contained in:
commit
bd7cd61939
|
@ -400,6 +400,203 @@ type MetricIndexer interface {
|
|||
IndexMetrics(FingerprintMetricMapping) error
|
||||
}
|
||||
|
||||
// IndexObserver listens and receives changes to a given
|
||||
// FingerprintMetricMapping.
|
||||
type IndexerObserver interface {
|
||||
Observe(FingerprintMetricMapping) error
|
||||
}
|
||||
|
||||
// IndexerProxy receives IndexMetric requests and proxies them to the underlying
|
||||
// MetricIndexer. Upon success of the underlying receiver, the registered
|
||||
// IndexObservers are called serially.
|
||||
//
|
||||
// If an error occurs in the underlying MetricIndexer or any of the observers,
|
||||
// this proxy will not work any further and return the offending error in this
|
||||
// call or any subsequent ones.
|
||||
type IndexerProxy struct {
|
||||
err error
|
||||
|
||||
i MetricIndexer
|
||||
observers []IndexerObserver
|
||||
}
|
||||
|
||||
func (p *IndexerProxy) IndexMetrics(b FingerprintMetricMapping) error {
|
||||
if p.err != nil {
|
||||
return p.err
|
||||
}
|
||||
if p.err = p.i.IndexMetrics(b); p.err != nil {
|
||||
return p.err
|
||||
}
|
||||
|
||||
for _, o := range p.observers {
|
||||
if p.err = o.Observe(b); p.err != nil {
|
||||
return p.err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the underlying indexer.
|
||||
func (p *IndexerProxy) Close() error {
|
||||
if p.err != nil {
|
||||
return p.err
|
||||
}
|
||||
if closer, ok := p.i.(io.Closer); ok {
|
||||
p.err = closer.Close()
|
||||
return p.err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close flushes the underlying index requests before closing.
|
||||
func (p *IndexerProxy) Flush() error {
|
||||
if p.err != nil {
|
||||
return p.err
|
||||
}
|
||||
if flusher, ok := p.i.(flusher); ok {
|
||||
p.err = flusher.Flush()
|
||||
return p.err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewIndexerProxy builds an IndexerProxy for the given configuration.
|
||||
func NewIndexerProxy(i MetricIndexer, o ...IndexerObserver) *IndexerProxy {
|
||||
return &IndexerProxy{
|
||||
i: i,
|
||||
observers: o,
|
||||
}
|
||||
}
|
||||
|
||||
// SynchronizedIndexer provides naive locking for any MetricIndexer.
|
||||
type SynchronizedIndexer struct {
|
||||
mu sync.Mutex
|
||||
i MetricIndexer
|
||||
}
|
||||
|
||||
func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
return i.i.IndexMetrics(b)
|
||||
}
|
||||
|
||||
type flusher interface {
|
||||
Flush() error
|
||||
}
|
||||
|
||||
func (i *SynchronizedIndexer) Flush() error {
|
||||
if flusher, ok := i.i.(flusher); ok {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
return flusher.Flush()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *SynchronizedIndexer) Close() error {
|
||||
if closer, ok := i.i.(io.Closer); ok {
|
||||
i.mu.Lock()
|
||||
defer i.mu.Unlock()
|
||||
|
||||
return closer.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewSynchronizedIndexer builds a new MetricIndexer.
|
||||
func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer {
|
||||
return &SynchronizedIndexer{
|
||||
i: i,
|
||||
}
|
||||
}
|
||||
|
||||
// BufferedIndexer provides unsynchronized index buffering.
|
||||
//
|
||||
// If an error occurs in the underlying MetricIndexer or any of the observers,
|
||||
// this proxy will not work any further and return the offending error.
|
||||
type BufferedIndexer struct {
|
||||
i MetricIndexer
|
||||
|
||||
limit int
|
||||
|
||||
buf []FingerprintMetricMapping
|
||||
|
||||
err error
|
||||
}
|
||||
|
||||
func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
|
||||
if i.err != nil {
|
||||
return i.err
|
||||
}
|
||||
|
||||
if len(i.buf) < i.limit {
|
||||
i.buf = append(i.buf, b)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
i.buf = append(i.buf)
|
||||
|
||||
i.err = i.Flush()
|
||||
|
||||
return i.err
|
||||
}
|
||||
|
||||
// Flush writes all pending entries to the index.
|
||||
func (i *BufferedIndexer) Flush() error {
|
||||
if i.err != nil {
|
||||
return i.err
|
||||
}
|
||||
|
||||
if len(i.buf) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
superset := FingerprintMetricMapping{}
|
||||
for _, b := range i.buf {
|
||||
for fp, m := range b {
|
||||
if _, ok := superset[fp]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
superset[fp] = m
|
||||
}
|
||||
}
|
||||
|
||||
i.buf = make([]FingerprintMetricMapping, 0, i.limit)
|
||||
|
||||
i.err = i.i.IndexMetrics(superset)
|
||||
|
||||
return i.err
|
||||
}
|
||||
|
||||
// Close flushes and closes the underlying buffer.
|
||||
func (i *BufferedIndexer) Close() error {
|
||||
if err := i.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if closer, ok := i.i.(io.Closer); ok {
|
||||
return closer.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer {
|
||||
|
||||
return &BufferedIndexer{
|
||||
i: i,
|
||||
limit: limit,
|
||||
buf: make([]FingerprintMetricMapping, 0, limit),
|
||||
}
|
||||
}
|
||||
|
||||
// TotalIndexer is a MetricIndexer that indexes all standard facets of a metric
|
||||
// that a user or the Prometheus subsystem would want to query against:
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue