diff --git a/storage/local/chunk.go b/storage/local/chunk.go index cceb1f5cf..b292a378f 100644 --- a/storage/local/chunk.go +++ b/storage/local/chunk.go @@ -97,7 +97,7 @@ type chunkDesc struct { // evictListElement is nil if the chunk is not in the evict list. // evictListElement is _not_ protected by the chunkDesc mutex. - // It must only be touched by the evict list handler in memorySeriesStorage. + // It must only be touched by the evict list handler in MemorySeriesStorage. evictListElement *list.Element } diff --git a/storage/local/instrumentation.go b/storage/local/instrumentation.go index 271d85949..28b218672 100644 --- a/storage/local/instrumentation.go +++ b/storage/local/instrumentation.go @@ -98,7 +98,7 @@ func init() { var ( // Global counter, also used internally, so not implemented as - // metrics. Collected in memorySeriesStorage.Collect. + // metrics. Collected in MemorySeriesStorage.Collect. // TODO(beorn7): As it is used internally, it is actually very bad style // to have it as a global variable. numMemChunks int64 diff --git a/storage/local/preload.go b/storage/local/preload.go index fb6a21f64..8e11de3ea 100644 --- a/storage/local/preload.go +++ b/storage/local/preload.go @@ -19,9 +19,9 @@ import ( "github.com/prometheus/common/model" ) -// memorySeriesPreloader is a Preloader for the memorySeriesStorage. +// memorySeriesPreloader is a Preloader for the MemorySeriesStorage. type memorySeriesPreloader struct { - storage *memorySeriesStorage + storage *MemorySeriesStorage pinnedChunkDescs []*chunkDesc } diff --git a/storage/local/series.go b/storage/local/series.go index 2c49dc9a5..2993cfe08 100644 --- a/storage/local/series.go +++ b/storage/local/series.go @@ -339,7 +339,7 @@ func (s *memorySeries) dropChunks(t model.Time) error { // preloadChunks is an internal helper method. func (s *memorySeries) preloadChunks( - indexes []int, fp model.Fingerprint, mss *memorySeriesStorage, + indexes []int, fp model.Fingerprint, mss *MemorySeriesStorage, ) ([]*chunkDesc, SeriesIterator, error) { loadIndexes := []int{} pinnedChunkDescs := make([]*chunkDesc, 0, len(indexes)) @@ -412,7 +412,7 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc, quarantine fun func (s *memorySeries) preloadChunksForInstant( fp model.Fingerprint, from model.Time, through model.Time, - mss *memorySeriesStorage, + mss *MemorySeriesStorage, ) ([]*chunkDesc, SeriesIterator, error) { // If we have a lastSamplePair in the series, and thas last samplePair // is in the interval, just take it in a singleSampleSeriesIterator. No @@ -437,7 +437,7 @@ func (s *memorySeries) preloadChunksForInstant( func (s *memorySeries) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, - mss *memorySeriesStorage, + mss *MemorySeriesStorage, ) ([]*chunkDesc, SeriesIterator, error) { firstChunkDescTime := model.Latest if len(s.chunkDescs) > 0 { diff --git a/storage/local/storage.go b/storage/local/storage.go index 0fec78870..35e88349a 100644 --- a/storage/local/storage.go +++ b/storage/local/storage.go @@ -128,7 +128,7 @@ const ( // synced or not. It does not need to be goroutine safe. type syncStrategy func() bool -type memorySeriesStorage struct { +type MemorySeriesStorage struct { // archiveHighWatermark and numChunksToPersist have to be aligned for atomic operations. archiveHighWatermark model.Time // No archived series has samples after this time. numChunksToPersist int64 // The number of chunks waiting for persistence. @@ -189,8 +189,8 @@ type MemorySeriesStorageOptions struct { // NewMemorySeriesStorage returns a newly allocated Storage. Storage.Serve still // has to be called to start the storage. -func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage { - s := &memorySeriesStorage{ +func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *MemorySeriesStorage { + s := &MemorySeriesStorage{ fpLocker: newFingerprintLocker(o.NumMutexes), options: o, @@ -303,7 +303,7 @@ func NewMemorySeriesStorage(o *MemorySeriesStorageOptions) *memorySeriesStorage } // Start implements Storage. -func (s *memorySeriesStorage) Start() (err error) { +func (s *MemorySeriesStorage) Start() (err error) { var syncStrategy syncStrategy switch s.options.SyncStrategy { case Never: @@ -360,7 +360,7 @@ func (s *memorySeriesStorage) Start() (err error) { } // Stop implements Storage. -func (s *memorySeriesStorage) Stop() error { +func (s *MemorySeriesStorage) Stop() error { log.Info("Stopping local storage...") log.Info("Stopping maintenance loop...") @@ -391,12 +391,12 @@ func (s *memorySeriesStorage) Stop() error { } // WaitForIndexing implements Storage. -func (s *memorySeriesStorage) WaitForIndexing() { +func (s *MemorySeriesStorage) WaitForIndexing() { s.persistence.waitForIndexing() } // LastSampleForFingerprint implements Storage. -func (s *memorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample { +func (s *MemorySeriesStorage) LastSampleForFingerprint(fp model.Fingerprint) model.Sample { s.fpLocker.Lock(fp) defer s.fpLocker.Unlock(fp) @@ -439,7 +439,7 @@ func (bit *boundedIterator) RangeValues(interval metric.Interval) []model.Sample } // NewPreloader implements Storage. -func (s *memorySeriesStorage) NewPreloader() Preloader { +func (s *MemorySeriesStorage) NewPreloader() Preloader { return &memorySeriesPreloader{ storage: s, } @@ -447,7 +447,7 @@ func (s *memorySeriesStorage) NewPreloader() Preloader { // fingerprintsForLabelPairs returns the set of fingerprints that have the given labels. // This does not work with empty label values. -func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair) map[model.Fingerprint]struct{} { +func (s *MemorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair) map[model.Fingerprint]struct{} { var result map[model.Fingerprint]struct{} for _, pair := range pairs { intersection := map[model.Fingerprint]struct{}{} @@ -469,7 +469,7 @@ func (s *memorySeriesStorage) fingerprintsForLabelPairs(pairs ...model.LabelPair } // MetricsForLabelMatchers implements Storage. -func (s *memorySeriesStorage) MetricsForLabelMatchers( +func (s *MemorySeriesStorage) MetricsForLabelMatchers( from, through model.Time, matchers ...*metric.LabelMatcher, ) map[model.Fingerprint]metric.Metric { @@ -550,7 +550,7 @@ func (s *memorySeriesStorage) MetricsForLabelMatchers( // 'through', it returns (metric, nil, true). // // The caller must have locked the fp. -func (s *memorySeriesStorage) metricForRange( +func (s *MemorySeriesStorage) metricForRange( fp model.Fingerprint, from, through model.Time, ) (model.Metric, *memorySeries, bool) { @@ -589,12 +589,12 @@ func (s *memorySeriesStorage) metricForRange( } // LabelValuesForLabelName implements Storage. -func (s *memorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { +func (s *MemorySeriesStorage) LabelValuesForLabelName(labelName model.LabelName) model.LabelValues { return s.persistence.labelValuesForLabelName(labelName) } // DropMetric implements Storage. -func (s *memorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { +func (s *MemorySeriesStorage) DropMetricsForFingerprints(fps ...model.Fingerprint) { for _, fp := range fps { s.purgeSeries(fp, nil, nil) } @@ -612,7 +612,7 @@ var ( ) // Append implements Storage. -func (s *memorySeriesStorage) Append(sample *model.Sample) error { +func (s *MemorySeriesStorage) Append(sample *model.Sample) error { for ln, lv := range sample.Metric { if len(lv) == 0 { delete(sample.Metric, ln) @@ -666,7 +666,7 @@ func (s *memorySeriesStorage) Append(sample *model.Sample) error { } // NeedsThrottling implements Storage. -func (s *memorySeriesStorage) NeedsThrottling() bool { +func (s *MemorySeriesStorage) NeedsThrottling() bool { if s.getNumChunksToPersist() > s.maxChunksToPersist || float64(atomic.LoadInt64(&numMemChunks)) > float64(s.maxMemoryChunks)*toleranceFactorMemChunks { select { @@ -688,7 +688,7 @@ func (s *memorySeriesStorage) NeedsThrottling() bool { // no signal has arrived for a minute, an Info is logged that the storage is not // throttled anymore. This resets things to the initial state, i.e. once a // signal arrives again, the Error will be logged again. -func (s *memorySeriesStorage) logThrottling() { +func (s *MemorySeriesStorage) logThrottling() { timer := time.NewTimer(time.Minute) timer.Stop() @@ -719,7 +719,7 @@ func (s *memorySeriesStorage) logThrottling() { } } -func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { +func (s *MemorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Metric) (*memorySeries, error) { series, ok := s.fpToSeries.get(fp) if !ok { var cds []*chunkDesc @@ -761,7 +761,7 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me // seriesForRange is a helper method for preloadChunksForRange and preloadChunksForInstant. // // The caller must have locked the fp. -func (s *memorySeriesStorage) seriesForRange( +func (s *MemorySeriesStorage) seriesForRange( fp model.Fingerprint, from model.Time, through model.Time, ) *memorySeries { @@ -776,7 +776,7 @@ func (s *memorySeriesStorage) seriesForRange( return series } -func (s *memorySeriesStorage) preloadChunksForRange( +func (s *MemorySeriesStorage) preloadChunksForRange( fp model.Fingerprint, from model.Time, through model.Time, ) ([]*chunkDesc, SeriesIterator) { @@ -795,7 +795,7 @@ func (s *memorySeriesStorage) preloadChunksForRange( return cds, iter } -func (s *memorySeriesStorage) preloadChunksForInstant( +func (s *MemorySeriesStorage) preloadChunksForInstant( fp model.Fingerprint, from model.Time, through model.Time, ) ([]*chunkDesc, SeriesIterator) { @@ -814,7 +814,7 @@ func (s *memorySeriesStorage) preloadChunksForInstant( return cds, iter } -func (s *memorySeriesStorage) handleEvictList() { +func (s *MemorySeriesStorage) handleEvictList() { ticker := time.NewTicker(maxEvictInterval) count := 0 @@ -859,7 +859,7 @@ func (s *memorySeriesStorage) handleEvictList() { } // maybeEvict is a local helper method. Must only be called by handleEvictList. -func (s *memorySeriesStorage) maybeEvict() { +func (s *MemorySeriesStorage) maybeEvict() { numChunksToEvict := int(atomic.LoadInt64(&numMemChunks)) - s.maxMemoryChunks if numChunksToEvict <= 0 { return @@ -911,7 +911,7 @@ func (s *memorySeriesStorage) maybeEvict() { // // Normally, the method returns true once the wait duration has passed. However, // if s.loopStopped is closed, it will return false immediately. -func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool { +func (s *MemorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFactor float64) bool { d := fpMaxWaitDuration if numberOfFPs != 0 { sweepTime := s.dropAfter / 10 @@ -938,7 +938,7 @@ func (s *memorySeriesStorage) waitForNextFP(numberOfFPs int, maxWaitDurationFact // cycleThroughMemoryFingerprints returns a channel that emits fingerprints for // series in memory in a throttled fashion. It continues to cycle through all // fingerprints in memory until s.loopStopping is closed. -func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint { +func (s *MemorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Fingerprint { memoryFingerprints := make(chan model.Fingerprint) go func() { var fpIter <-chan model.Fingerprint @@ -985,7 +985,7 @@ func (s *memorySeriesStorage) cycleThroughMemoryFingerprints() chan model.Finger // cycleThroughArchivedFingerprints returns a channel that emits fingerprints // for archived series in a throttled fashion. It continues to cycle through all // archived fingerprints until s.loopStopping is closed. -func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint { +func (s *MemorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fingerprint { archivedFingerprints := make(chan model.Fingerprint) go func() { defer close(archivedFingerprints) @@ -1024,7 +1024,7 @@ func (s *memorySeriesStorage) cycleThroughArchivedFingerprints() chan model.Fing return archivedFingerprints } -func (s *memorySeriesStorage) loop() { +func (s *MemorySeriesStorage) loop() { checkpointTimer := time.NewTimer(s.checkpointInterval) dirtySeriesCount := 0 @@ -1114,7 +1114,7 @@ loop: // case, it archives the series and returns true. // // Finally, it evicts chunkDescs if there are too many. -func (s *memorySeriesStorage) maintainMemorySeries( +func (s *MemorySeriesStorage) maintainMemorySeries( fp model.Fingerprint, beforeTime model.Time, ) (becameDirty bool) { defer func(begin time.Time) { @@ -1187,7 +1187,7 @@ func (s *memorySeriesStorage) maintainMemorySeries( // case, the method returns true. // // The caller must have locked the fp. -func (s *memorySeriesStorage) writeMemorySeries( +func (s *MemorySeriesStorage) writeMemorySeries( fp model.Fingerprint, series *memorySeries, beforeTime model.Time, ) bool { var ( @@ -1269,7 +1269,7 @@ func (s *memorySeriesStorage) writeMemorySeries( // maintainArchivedSeries drops chunks older than beforeTime from an archived // series. If the series contains no chunks after that, it is purged entirely. -func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, beforeTime model.Time) { +func (s *MemorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, beforeTime model.Time) { defer func(begin time.Time) { s.maintainSeriesDuration.WithLabelValues(maintainArchived).Observe( float64(time.Since(begin)) / float64(time.Second), @@ -1302,23 +1302,23 @@ func (s *memorySeriesStorage) maintainArchivedSeries(fp model.Fingerprint, befor } // See persistence.loadChunks for detailed explanation. -func (s *memorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { +func (s *MemorySeriesStorage) loadChunks(fp model.Fingerprint, indexes []int, indexOffset int) ([]chunk, error) { return s.persistence.loadChunks(fp, indexes, indexOffset) } // See persistence.loadChunkDescs for detailed explanation. -func (s *memorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { +func (s *MemorySeriesStorage) loadChunkDescs(fp model.Fingerprint, offsetFromEnd int) ([]*chunkDesc, error) { return s.persistence.loadChunkDescs(fp, offsetFromEnd) } // getNumChunksToPersist returns numChunksToPersist in a goroutine-safe way. -func (s *memorySeriesStorage) getNumChunksToPersist() int { +func (s *MemorySeriesStorage) getNumChunksToPersist() int { return int(atomic.LoadInt64(&s.numChunksToPersist)) } // incNumChunksToPersist increments numChunksToPersist in a goroutine-safe way. Use a // negative 'by' to decrement. -func (s *memorySeriesStorage) incNumChunksToPersist(by int) { +func (s *MemorySeriesStorage) incNumChunksToPersist(by int) { atomic.AddInt64(&s.numChunksToPersist, int64(by)) } @@ -1350,7 +1350,7 @@ func (s *memorySeriesStorage) incNumChunksToPersist(by int) { // checkpointing based on dirty-series count should be disabled, and series // files should not by synced anymore provided the user has specified the // adaptive sync strategy. -func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { +func (s *MemorySeriesStorage) calculatePersistenceUrgencyScore() float64 { s.rushedMtx.Lock() defer s.rushedMtx.Unlock() @@ -1416,7 +1416,7 @@ func (s *memorySeriesStorage) calculatePersistenceUrgencyScore() float64 { // and all its traces are removed from indices. Call this method if an // unrecoverable error is detected while dealing with a series, and pass in the // encountered error. It will be saved as a hint in the orphaned directory. -func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) { +func (s *MemorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric model.Metric, err error) { req := quarantineRequest{fp: fp, metric: metric, reason: err} select { case s.quarantineRequests <- req: @@ -1431,7 +1431,7 @@ func (s *memorySeriesStorage) quarantineSeries(fp model.Fingerprint, metric mode } } -func (s *memorySeriesStorage) handleQuarantine() { +func (s *MemorySeriesStorage) handleQuarantine() { for { select { case req := <-s.quarantineRequests: @@ -1454,7 +1454,7 @@ func (s *memorySeriesStorage) handleQuarantine() { // provided, the series file will not be deleted completely, but moved to the // orphaned directory with the reason and the metric in a hint file. The // provided metric might be nil if unknown. -func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) { +func (s *MemorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, quarantineReason error) { s.fpLocker.Lock(fp) var ( @@ -1518,7 +1518,7 @@ func (s *memorySeriesStorage) purgeSeries(fp model.Fingerprint, m model.Metric, } // Describe implements prometheus.Collector. -func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { +func (s *MemorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { s.persistence.Describe(ch) s.mapper.Describe(ch) @@ -1537,7 +1537,7 @@ func (s *memorySeriesStorage) Describe(ch chan<- *prometheus.Desc) { } // Collect implements prometheus.Collector. -func (s *memorySeriesStorage) Collect(ch chan<- prometheus.Metric) { +func (s *MemorySeriesStorage) Collect(ch chan<- prometheus.Metric) { s.persistence.Collect(ch) s.mapper.Collect(ch) diff --git a/storage/local/storage_test.go b/storage/local/storage_test.go index 48eb34184..fe5f4103c 100644 --- a/storage/local/storage_test.go +++ b/storage/local/storage_test.go @@ -1678,7 +1678,7 @@ func createRandomSamples(metricName string, minLen int) model.Samples { return result } -func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool { +func verifyStorageRandom(t testing.TB, s *MemorySeriesStorage, samples model.Samples) bool { s.WaitForIndexing() result := true for _, i := range rand.Perm(len(samples)) { @@ -1709,7 +1709,7 @@ func verifyStorageRandom(t testing.TB, s *memorySeriesStorage, samples model.Sam return result } -func verifyStorageSequential(t testing.TB, s *memorySeriesStorage, samples model.Samples) bool { +func verifyStorageSequential(t testing.TB, s *MemorySeriesStorage, samples model.Samples) bool { s.WaitForIndexing() var ( result = true diff --git a/storage/local/test_helpers.go b/storage/local/test_helpers.go index 3695ea310..7c2a3a8ee 100644 --- a/storage/local/test_helpers.go +++ b/storage/local/test_helpers.go @@ -40,7 +40,7 @@ func (t *testStorageCloser) Close() { // NewTestStorage creates a storage instance backed by files in a temporary // directory. The returned storage is already in serving state. Upon closing the // returned test.Closer, the temporary directory is cleaned up. -func NewTestStorage(t testutil.T, encoding chunkEncoding) (*memorySeriesStorage, testutil.Closer) { +func NewTestStorage(t testutil.T, encoding chunkEncoding) (*MemorySeriesStorage, testutil.Closer) { DefaultChunkEncoding = encoding directory := testutil.NewTemporaryDirectory("test_storage", t) o := &MemorySeriesStorageOptions{