From 3e97a3630d15e24a5edcb83906b07f9511d7d10e Mon Sep 17 00:00:00 2001 From: "Matt T. Proud" Date: Thu, 21 Mar 2013 18:29:33 +0100 Subject: [PATCH] Include nascent curator scaffolding. The curator doesn't do anything yet; rather, this is the type definition including the ancillary testing scaffold. Improve Makefile and Git developer experience. The top-level Makefile was a bit overloaded in terms of generation of assets and their management. This has been offloaded into separate Makefiles. The Git developer experience sucked due to lack of .gitignore policies. Also: Fix faulty skiplist naming from old merge. --- .gitignore | 7 +- Makefile | 7 +- Makefile.TRAVIS | 4 +- model/Makefile | 13 +- model/curation.go | 39 +++++ model/data.proto | 31 ++++ model/watermark.go | 33 ++++ storage/metric/curator.go | 266 +++++++++++++++++++++++++++++++++ storage/metric/curator_test.go | 213 ++++++++++++++++++++++++++ web/Makefile | 22 +++ web/blob/.gitignore | 1 + web/blob/Makefile | 22 +++ 12 files changed, 648 insertions(+), 10 deletions(-) create mode 100644 model/curation.go create mode 100644 model/watermark.go create mode 100644 storage/metric/curator.go create mode 100644 storage/metric/curator_test.go create mode 100644 web/Makefile create mode 100644 web/blob/.gitignore create mode 100644 web/blob/Makefile diff --git a/.gitignore b/.gitignore index 281b2bb72..d7eb0a973 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,9 @@ [568a].out _cgo_* core -web/blob/files.go + +*-stamp +leveldb-1.7.0* +protobuf-2.4.1* +snappy-1.0.5* +prometheus.build diff --git a/Makefile b/Makefile index 01ca8f790..9b9582be7 100644 --- a/Makefile +++ b/Makefile @@ -19,15 +19,15 @@ test: build go test ./... build: - ./utility/embed-static.sh web/static web/templates | gofmt > web/blob/files.go $(MAKE) -C model + $(MAKE) -C web go build ./... 
go build -o prometheus.build clean: - rm -rf web/static/blob/files.go - rm -rf $(TEST_ARTIFACTS) $(MAKE) -C model clean + $(MAKE) -C web clean + rm -rf $(TEST_ARTIFACTS) -find . -type f -iname '*~' -exec rm '{}' ';' -find . -type f -iname '*#' -exec rm '{}' ';' -find . -type f -iname '.#*' -exec rm '{}' ';' @@ -45,4 +45,3 @@ documentation: search_index godoc -http=:6060 -index -index_files='search_index' .PHONY: advice build clean documentation format search_index test - diff --git a/Makefile.TRAVIS b/Makefile.TRAVIS index c47a8b47a..74402dcdd 100644 --- a/Makefile.TRAVIS +++ b/Makefile.TRAVIS @@ -41,7 +41,7 @@ preparation-stamp: build-dependencies build-dependencies: build-dependencies-stamp -build-dependencies-stamp: bison cc mercurial protoc goprotobuf gorest goskiplist go instrumentation leveldb levigo +build-dependencies-stamp: bison cc go goprotobuf gorest goskiplist instrumentation leveldb levigo mercurial protoc touch $@ overlay: overlay-stamp @@ -148,7 +148,7 @@ vim-common: vim-common-stamp vim-common-stamp: $(APT_GET_INSTALL) vim-common - touch $@ + touch $@ test: test-stamp diff --git a/model/Makefile b/model/Makefile index 87d95d754..9b2a342dd 100644 --- a/model/Makefile +++ b/model/Makefile @@ -11,13 +11,20 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+MAKE_ARTIFACTS = generated-stamp + export PATH := $(PATH):/Users/mtp/Development/go/bin -all: data.proto +all: generated -data.proto: +generated: generated-stamp + +generated-stamp: data.proto protoc --go_out=generated/ data.proto + touch $@ + clean: rm -rf generated/* + -rm -f $(MAKE_ARTIFACTS) -.PHONY: data.proto +.PHONY: generated diff --git a/model/curation.go b/model/curation.go new file mode 100644 index 000000000..26bcdf8e6 --- /dev/null +++ b/model/curation.go @@ -0,0 +1,39 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + dto "github.com/prometheus/prometheus/model/generated" + "time" +) + +// CurationRemark provides a representation of dto.CurationValue with associated +// business logic methods attached to it to enhance code readability. +type CurationRemark struct { + LastCompletionTimestamp time.Time +} + +// OlderThanLimit answers whether this CurationRemark is older than the provided +// cutOff time. +func (c CurationRemark) OlderThanLimit(cutOff time.Time) bool { + return c.LastCompletionTimestamp.Before(cutOff) +} + +// NewCurationRemarkFromDTO builds CurationRemark from the provided +// dto.CurationValue object. 
+func NewCurationRemarkFromDTO(d *dto.CurationValue) CurationRemark { + return CurationRemark{ + LastCompletionTimestamp: time.Unix(*d.LastCompletionTimestamp, 0), + } +} diff --git a/model/data.proto b/model/data.proto index 4f056c08b..3b6004e3d 100644 --- a/model/data.proto +++ b/model/data.proto @@ -59,3 +59,34 @@ message MembershipIndexValue { message MetricHighWatermark { optional int64 timestamp = 1; } + +// CurationKey models the state of curation for a given metric fingerprint and +// its associated samples. The time series database only knows about compaction +// and resampling behaviors that are explicitly defined to it in its runtime +// configuration, meaning it never scans on-disk tables for CurationKey +// policies; rather, it looks up via the CurationKey tuple to find out what the +// effectuation state for a given metric fingerprint is. +// +// For instance, it records how far along a rule for (Fingerprint A, Samples +// Older Than B, and Grouped Together in Size of C) has been effectuated +// on-disk. +message CurationKey { + // fingerprint identifies the fingerprint for the given policy. + optional Fingerprint fingerprint = 1; + // older_than represents, in seconds into the past relative to when the + // curation cycle starts, the point at which the curator should stop + // operating on a given metric fingerprint's samples: + // + // [Oldest Sample Time, time.Now().Add(-time.Second * older_than)) + optional int64 older_than = 2; + // minimum_group_size identifies how minimally samples should be grouped + // together to write a new SampleValueSeries chunk. + optional uint32 minimum_group_size = 3; +} + +// CurationValue models the progress for a given CurationKey. +message CurationValue { + // last_completion_timestamp represents the seconds since the epoch UTC at + // which the curator last completed its duty cycle for a given metric + // fingerprint. 
+ optional int64 last_completion_timestamp = 1; +} \ No newline at end of file diff --git a/model/watermark.go b/model/watermark.go new file mode 100644 index 000000000..93cf47c91 --- /dev/null +++ b/model/watermark.go @@ -0,0 +1,33 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + dto "github.com/prometheus/prometheus/model/generated" + "time" +) + +// Watermark provides a representation of dto.MetricHighWatermark with +// associated business logic methods attached to it to enhance code readability. +type Watermark struct { + time.Time +} + +// NewWatermarkFromHighWatermarkDTO builds Watermark from the provided +// dto.MetricHighWatermark object. +func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark { + return Watermark{ + time.Unix(*d.Timestamp, 0), + } +} diff --git a/storage/metric/curator.go b/storage/metric/curator.go new file mode 100644 index 000000000..fb3d64646 --- /dev/null +++ b/storage/metric/curator.go @@ -0,0 +1,266 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "code.google.com/p/goprotobuf/proto" + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/model" + dto "github.com/prometheus/prometheus/model/generated" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw" + "time" +) + +// curator is responsible for effectuating a given curation policy across the +// stored samples on-disk. This is useful to compact sparse sample values into +// single sample entities to reduce keyspace load on the datastore. +type curator struct { + // stop functions as a channel that when empty allows the curator to operate. + // The moment a value is ingested inside of it, the curator goes into drain + // mode. + stop chan bool + // samples is the on-disk metric store that is scanned for compaction + // candidates. + samples raw.Persistence + // watermarks is the on-disk store that is scanned for high watermarks for + // given metrics. + watermarks raw.Persistence + // cutOff represents the most recent time up to which values will be curated. + cutOff time.Time + // groupingQuantity represents the number of samples below which encountered + // samples will be dismembered and reaggregated into larger groups. + groupingQuantity uint32 + // curationState is the on-disk store where the curation remarks are made for + // how much progress has been made. + curationState raw.Persistence +} + +// newCurator builds a new curator for the given LevelDB databases. 
+func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator { + return curator{ + cutOff: cutOff, + // stop is buffered with a capacity of one so that drain can deposit its + // signal without a receiver and len(stop) can observe the pending value. + stop: make(chan bool, 1), + samples: samples, + curationState: curationState, + watermarks: watermarks, + groupingQuantity: groupingQuantity, + } +} + +// run facilitates the curation lifecycle: it scans the watermarks table with +// a watermarkDecoder, watermarkFilter and watermarkOperator and returns the +// error reported by that scan, if any. +func (c curator) run() (err error) { + var ( + decoder watermarkDecoder + filter = watermarkFilter{ + // The filter looks up curation remarks, so it needs the table in + // addition to the stop signal. + curationState: c.curationState, + stop: c.stop, + } + operator = watermarkOperator{ + olderThan: c.cutOff, + groupSize: c.groupingQuantity, + curationState: c.curationState, + } + ) + + _, err = c.watermarks.ForEach(decoder, filter, operator) + + return +} + +// drain instructs the curator to stop at the next convenient moment as to not +// introduce data inconsistencies. It never blocks and may be called more than +// once: if a stop signal is already pending, the call is a no-op. +func (c curator) drain() { + // A non-blocking send: if a signal is already pending there is nothing to do. + select { + case c.stop <- true: + default: + } +} + +// watermarkDecoder converts (dto.Fingerprint, dto.MetricHighWatermark) doubles +// into (model.Fingerprint, model.Watermark) doubles. +type watermarkDecoder struct{} + +// DecodeKey unmarshals the raw bytes of a dto.Fingerprint and converts its +// signature into a model.Fingerprint. It panics if the bytes cannot be +// unmarshalled. +func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) { + var ( + key = &dto.Fingerprint{} + bytes = in.([]byte) + ) + + err = proto.Unmarshal(bytes, key) + if err != nil { + panic(err) + } + + out = model.NewFingerprintFromRowKey(*key.Signature) + + return +} + +// DecodeValue unmarshals the raw bytes of a dto.MetricHighWatermark and +// converts it into a model.Watermark. It panics if the bytes cannot be +// unmarshalled. +func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) { + var ( + dto = &dto.MetricHighWatermark{} + bytes = in.([]byte) + ) + + err = proto.Unmarshal(bytes, dto) + if err != nil { + panic(err) + } + + out = model.NewWatermarkFromHighWatermarkDTO(dto) + + return +} + +// watermarkFilter determines whether to include or exclude candidate +// values from the curation process by virtue of how old the high watermark is. +type watermarkFilter struct { + // curationState is the table of CurationKey to CurationValues that remark on + // how far along the curation process has gone for a given metric fingerprint. 
+ curationState raw.Persistence + // stop, when non-empty, instructs the filter to stop operation. + stop chan bool +} + +// Filter decides what the scan should do with the given (model.Fingerprint, +// model.Watermark) pair: +// +// - storage.STOP if a stop signal is pending, +// - storage.ACCEPT if the stored curation remark is older than the watermark, +// - storage.SKIP otherwise. +// +// The stop signal is checked first so that a drain request takes effect +// before any further candidate is accepted or any datastore lookup is made. +// It panics if the curation remark cannot be fetched or decoded. +// +// NOTE(review): the lookup key here is the bare fingerprint DTO, whereas +// watermarkOperator keys curationState by dto.CurationKey; confirm which one +// is intended. +func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) { + if len(w.stop) != 0 { + return storage.STOP + } + + var ( + fingerprint = key.(model.Fingerprint) + watermark = value.(model.Watermark) + curationKey = fingerprint.ToDTO() + rawCurationValue []byte + err error + curationValue = &dto.CurationValue{} + ) + + rawCurationValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + if err != nil { + panic(err) + } + + err = proto.Unmarshal(rawCurationValue, curationValue) + if err != nil { + panic(err) + } + + if model.NewCurationRemarkFromDTO(curationValue).OlderThanLimit(watermark.Time) { + return storage.ACCEPT + } + + return storage.SKIP +} + +// watermarkOperator scans over the curator.samples table for metrics whose +// high watermark has been determined to be allowable for curation. This type +// is individually responsible for compaction. +// +// The scanning starts from CurationRemark.LastCompletionTimestamp and goes +// forward until the stop point or end of the series is reached. +type watermarkOperator struct { + // olderThan functions as the cutoff when scanning curator.samples for + // uncurated samples to compact. The operator scans forward in the samples + // until olderThan is reached and then stops operation for samples that occur + // after it. + olderThan time.Time + // groupSize is the target quantity of samples to group together for a given + // to-be-written sample. Observed samples of less than groupSize are combined + // up to groupSize if possible. The protocol does not define the behavior if + // observed chunks are larger than groupSize. + groupSize uint32 + // curationState is the table of CurationKey to CurationValues that remark on + // how far along the curation process has gone for a given metric fingerprint. 
+ curationState raw.Persistence +} + +// Operate is invoked with a (model.Fingerprint, model.Watermark) pair and +// determines whether the metric would require curation. The curation steps +// themselves are still placeholders. A failed lookup against curationState is +// reported as a storage.OperatorError whose second field is false (presumably +// marking the error as non-continuable; confirm against storage.OperatorError). +func (w watermarkOperator) Operate(key, value interface{}) (err *storage.OperatorError) { + var ( + fingerprint = key.(model.Fingerprint) + watermark = value.(model.Watermark) + queryErr error + hasBeenCurated bool + curationConsistent bool + ) + hasBeenCurated, queryErr = w.hasBeenCurated(fingerprint) + if queryErr != nil { + err = &storage.OperatorError{queryErr, false} + return + } + + if !hasBeenCurated { + // TODO: curate; no curation remark exists yet for this metric and policy. + return + } + + curationConsistent, queryErr = w.curationConsistent(fingerprint, watermark) + if queryErr != nil { + err = &storage.OperatorError{queryErr, false} + return + } + if curationConsistent { + return + } + + // TODO: curate; the stored remark is older than the metric's watermark. + + return +} + +// hasBeenCurated answers true if the provided Fingerprint has been curated in +// the past, that is, if curationState has an entry for the CurationKey built +// from the fingerprint, the olderThan cutoff (as Unix seconds) and groupSize. +// +// NOTE(review): the key embeds the absolute cutoff time, whereas data.proto +// describes older_than as seconds relative to the start of the curation cycle; +// confirm which representation is intended. +func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, err error) { + curationKey := &dto.CurationKey{ + Fingerprint: f.ToDTO(), + OlderThan: proto.Int64(w.olderThan.Unix()), + MinimumGroupSize: proto.Uint32(w.groupSize), + } + + curated, err = w.curationState.Has(coding.NewProtocolBufferEncoder(curationKey)) + + return +} + +// curationConsistent answers true when the curation remark stored for the +// given metric is not older than its high watermark, meaning the metric is not +// in a dirty state and does not need curation. 
+func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) { + var ( + rawValue []byte + curationValue = &dto.CurationValue{} + curationKey = &dto.CurationKey{ + Fingerprint: f.ToDTO(), + OlderThan: proto.Int64(w.olderThan.Unix()), + MinimumGroupSize: proto.Uint32(w.groupSize), + } + ) + + rawValue, err = w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey)) + if err != nil { + return + } + + err = proto.Unmarshal(rawValue, curationValue) + if err != nil { + return + } + + curationRemark := model.NewCurationRemarkFromDTO(curationValue) + if !curationRemark.OlderThanLimit(watermark.Time) { + consistent = true + return + } + + return +} diff --git a/storage/metric/curator_test.go b/storage/metric/curator_test.go new file mode 100644 index 000000000..5adb75c2e --- /dev/null +++ b/storage/metric/curator_test.go @@ -0,0 +1,213 @@ +// Copyright 2013 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package metric + +import ( + "github.com/prometheus/prometheus/coding" + "github.com/prometheus/prometheus/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/storage/raw" + "sort" + "testing" + "time" +) + +type ( + keyPair struct { + fingerprint model.Fingerprint + time time.Time + } + + fakeCurationStates map[model.Fingerprint]time.Time + fakeSamples map[keyPair][]float32 + fakeWatermarks map[model.Fingerprint]time.Time + + in struct { + curationStates fakeCurationStates + samples fakeSamples + watermarks fakeWatermarks + cutOff time.Time + grouping uint32 + } + + out struct { + curationStates fakeCurationStates + samples fakeSamples + watermarks fakeWatermarks + } +) + +func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) { + panic("unimplemented") +} + +func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) { + panic("unimplemented") +} + +func (c fakeCurationStates) Drop(_ coding.Encoder) error { + panic("unimplemented") +} + +func (c fakeCurationStates) Put(_, _ coding.Encoder) error { + panic("unimplemented") +} + +func (c fakeCurationStates) Close() error { + panic("unimplemented") +} + +// ForEach visits the fake's entries in sorted fingerprint order, passing each +// one through the decoder, the filter and, when accepted, the operator. +// scannedAll is true only when the loop ran to the end; it stays false when +// the filter answers storage.STOP or the operator reports a non-continuable +// error. +// +// NOTE(review): the operator's underlying error is not propagated through err +// because the corresponding storage.OperatorError field is not visible here. +func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { + var ( + fingerprints model.Fingerprints + ) + + for fingerprint := range c { + fingerprints = append(fingerprints, fingerprint) + } + + sort.Sort(fingerprints) + + for _, k := range fingerprints { + v := c[k] + + // Decoding failures only skip the affected entry; they are kept in a local + // so that they do not leak into the returned error. + decodedKey, decodeErr := d.DecodeKey(k) + if decodeErr != nil { + continue + } + + decodedValue, decodeErr := d.DecodeValue(v) + if decodeErr != nil { + continue + } + + switch f.Filter(decodedKey, decodedValue) { + case storage.STOP: + return + case storage.SKIP: + continue + case storage.ACCEPT: + opErr := o.Operate(decodedKey, decodedValue) + // A bare break would only leave the switch, so return to end the scan. + if opErr != nil && !opErr.Continuable { + return + } + } + } + + scannedAll = true + + return +} + +func 
(c fakeCurationStates) Commit(_ raw.Batch) error { + panic("unimplemented") +} + +func (c fakeSamples) Has(_ coding.Encoder) (bool, error) { + panic("unimplemented") +} + +func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) { + panic("unimplemented") +} + +func (c fakeSamples) Drop(_ coding.Encoder) error { + panic("unimplemented") +} + +func (c fakeSamples) Put(_, _ coding.Encoder) error { + panic("unimplemented") +} + +func (c fakeSamples) Close() error { + panic("unimplemented") +} + +func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { + panic("unimplemented") +} + +func (c fakeSamples) Commit(_ raw.Batch) (err error) { + panic("unimplemented") +} + +func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) { + panic("unimplemented") +} + +func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, error) { + panic("unimplemented") +} + +func (c fakeWatermarks) Drop(_ coding.Encoder) error { + panic("unimplemented") +} + +func (c fakeWatermarks) Put(_, _ coding.Encoder) error { + panic("unimplemented") +} + +func (c fakeWatermarks) Close() error { + panic("unimplemented") +} + +func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) { + panic("unimplemented") +} + +func (c fakeWatermarks) Commit(_ raw.Batch) (err error) { + panic("unimplemented") +} + +func TestCurator(t *testing.T) { + var ( + scenarios = []struct { + in in + out out + }{ + { + in: in{ + curationStates: fakeCurationStates{ + model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute), + model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute), + }, + watermarks: fakeWatermarks{}, + samples: fakeSamples{}, + cutOff: testInstant.Add(5 * time.Minute), + grouping: 5, + }, + }, + } + ) + + for _, scenario := range scenarios { + var ( + in = scenario.in + + curationStates = in.curationStates + 
samples = in.samples + watermarks = in.watermarks + cutOff = in.cutOff + grouping = in.grouping + ) + + _ = newCurator(cutOff, grouping, curationStates, samples, watermarks) + } +} diff --git a/web/Makefile b/web/Makefile new file mode 100644 index 000000000..4f3cf0980 --- /dev/null +++ b/web/Makefile @@ -0,0 +1,22 @@ +# Copyright 2013 Prometheus Team +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +all: blob + +blob: + $(MAKE) -C blob + +clean: + $(MAKE) -C blob clean + +.PHONY: blob clean diff --git a/web/blob/.gitignore b/web/blob/.gitignore new file mode 100644 index 000000000..ee22a0c61 --- /dev/null +++ b/web/blob/.gitignore @@ -0,0 +1 @@ +files.go \ No newline at end of file diff --git a/web/blob/Makefile b/web/blob/Makefile new file mode 100644 index 000000000..4024a0be7 --- /dev/null +++ b/web/blob/Makefile @@ -0,0 +1,22 @@ +# Copyright 2013 Prometheus Team +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +all: files.go + +files.go: + ../../utility/embed-static.sh ../static ../templates | gofmt > $@ + +clean: + -rm files.go + +.PHONY: clean