Merge pull request #93 from prometheus/feature/storage/compaction

Include nascent curator scaffolding.
This commit is contained in:
Matt T. Proud 2013-03-25 11:44:03 -07:00
commit a3a6434ea9
12 changed files with 648 additions and 10 deletions

7
.gitignore vendored
View File

@ -20,4 +20,9 @@
[568a].out
_cgo_*
core
web/blob/files.go
*-stamp
leveldb-1.7.0*
protobuf-2.4.1*
snappy-1.0.5*
prometheus.build

View File

@ -19,15 +19,15 @@ test: build
go test ./...
build:
./utility/embed-static.sh web/static web/templates | gofmt > web/blob/files.go
$(MAKE) -C model
$(MAKE) -C web
go build ./...
go build -o prometheus.build
clean:
rm -rf web/static/blob/files.go
rm -rf $(TEST_ARTIFACTS)
$(MAKE) -C model clean
$(MAKE) -C web clean
rm -rf $(TEST_ARTIFACTS)
-find . -type f -iname '*~' -exec rm '{}' ';'
-find . -type f -iname '*#' -exec rm '{}' ';'
-find . -type f -iname '.#*' -exec rm '{}' ';'
@ -45,4 +45,3 @@ documentation: search_index
godoc -http=:6060 -index -index_files='search_index'
.PHONY: advice build clean documentation format search_index test

View File

@ -41,7 +41,7 @@ preparation-stamp: build-dependencies
build-dependencies: build-dependencies-stamp
build-dependencies-stamp: bison cc mercurial protoc goprotobuf gorest goskiplist go instrumentation leveldb levigo
build-dependencies-stamp: bison cc go goprotobuf gorest goskiplist instrumentation leveldb levigo mercurial protoc
touch $@
overlay: overlay-stamp
@ -148,7 +148,7 @@ vim-common: vim-common-stamp
vim-common-stamp:
$(APT_GET_INSTALL) vim-common
touch $@
touch $@
test: test-stamp

View File

@ -11,13 +11,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
MAKE_ARTIFACTS = generated-stamp
export PATH := $(PATH):/Users/mtp/Development/go/bin
all: data.proto
all: generated
data.proto:
generated: generated-stamp
generated-stamp: data.proto
protoc --go_out=generated/ data.proto
touch $@
clean:
rm -rf generated/*
-rm -f $(MAKE_ARTIFACTS)
.PHONY: data.proto
.PHONY: generated

39
model/curation.go Normal file
View File

@ -0,0 +1,39 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
dto "github.com/prometheus/prometheus/model/generated"
"time"
)
// CurationRemark is the in-memory counterpart of dto.CurationValue. It exists
// so that curation logic can be phrased through small helper methods rather
// than through raw DTO field access.
type CurationRemark struct {
	// LastCompletionTimestamp is the instant recorded for the last completed
	// curation.
	LastCompletionTimestamp time.Time
}

// OlderThanLimit reports whether the remark's LastCompletionTimestamp lies
// strictly before cutOff. A remark whose timestamp equals cutOff is not
// considered older.
func (c CurationRemark) OlderThanLimit(cutOff time.Time) bool {
	return cutOff.After(c.LastCompletionTimestamp)
}
// NewCurationRemarkFromDTO converts a dto.CurationValue into a CurationRemark.
// The DTO's LastCompletionTimestamp is interpreted as whole seconds since the
// Unix epoch.
//
// Both d and d.LastCompletionTimestamp are dereferenced unconditionally, so
// they must be non-nil.
func NewCurationRemarkFromDTO(d *dto.CurationValue) CurationRemark {
	completedAt := time.Unix(*d.LastCompletionTimestamp, 0)

	return CurationRemark{LastCompletionTimestamp: completedAt}
}

View File

@ -59,3 +59,34 @@ message MembershipIndexValue {
message MetricHighWatermark {
optional int64 timestamp = 1;
}
// CurationKey models the state of curation for a given metric fingerprint and
// its associated samples. The time series database only knows about the
// compaction and resampling behaviors that are explicitly defined in its
// runtime configuration. It therefore never scans on-disk tables for
// CurationKey policies; instead, it looks up the CurationKey tuple to find out
// what the effectuation state for a given metric fingerprint is.
//
// For instance: how far a rule for (Fingerprint A, Samples Older Than B, and
// Grouped Together in Size of C) has been effectuated on-disk.
message CurationKey {
// fingerprint identifies the metric fingerprint the policy applies to.
optional Fingerprint fingerprint = 1;
// older_than is a number of seconds, counted backwards from the moment the
// curation cycle starts, that marks where the curator should stop operating
// on a given metric fingerprint's samples. The curated interval is:
//
// [oldest sample time, curation start time - older_than seconds)
optional int64 older_than = 2;
// minimum_group_size is the minimum number of samples that should be grouped
// together when writing a new SampleValueSeries chunk.
optional uint32 minimum_group_size = 3;
}
// CurationValue models the progress made for a given CurationKey.
message CurationValue {
// last_completion_timestamp is the time, in seconds since the Unix epoch
// (UTC), at which the curator last completed its duty cycle for a given
// metric fingerprint.
optional int64 last_completion_timestamp = 1;
}

33
model/watermark.go Normal file
View File

@ -0,0 +1,33 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package model
import (
dto "github.com/prometheus/prometheus/model/generated"
"time"
)
// Watermark is the in-memory counterpart of dto.MetricHighWatermark. It embeds
// time.Time, so every time.Time method (Before, After, Unix, ...) is available
// directly on a Watermark value.
type Watermark struct {
time.Time
}
// NewWatermarkFromHighWatermarkDTO converts a dto.MetricHighWatermark into a
// Watermark. The DTO's Timestamp is interpreted as whole seconds since the
// Unix epoch.
//
// Both d and d.Timestamp are dereferenced unconditionally, so they must be
// non-nil.
func NewWatermarkFromHighWatermarkDTO(d *dto.MetricHighWatermark) Watermark {
	var watermark Watermark

	watermark.Time = time.Unix(*d.Timestamp, 0)

	return watermark
}

266
storage/metric/curator.go Normal file
View File

@ -0,0 +1,266 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"code.google.com/p/goprotobuf/proto"
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
dto "github.com/prometheus/prometheus/model/generated"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"time"
)
// curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type curator struct {
// stop is the signalling channel for draining. While it holds no value the
// curator is allowed to operate; drain places a value into it, and the
// watermark filter inspects its length to decide whether to stop.
stop chan bool
// samples is the on-disk metric store that is scanned for compaction
// candidates. NOTE(review): run does not consult it yet.
samples raw.Persistence
// watermarks is the on-disk store of per-metric high watermarks; run
// iterates over it.
watermarks raw.Persistence
// cutOff represents the most recent time up to which values will be curated.
// It is handed to the watermark operator as its olderThan limit.
cutOff time.Time
// groupingQuantity represents the number of samples below which encountered
// samples will be split up and reaggregated into larger groups. It is handed
// to the watermark operator as its groupSize.
groupingQuantity uint32
// curationState is the on-disk store where the curation remarks are kept,
// i.e. the record of how much progress has been made.
curationState raw.Persistence
}
// newCurator builds a curator that operates on the given persistence layers.
//
// cutOff is the most recent time up to which values are curated and
// groupingQuantity is the target sample group size. curationState, samples and
// watermarks are the stores for curation remarks, sample values and metric
// high watermarks respectively.
//
// The stop channel is created with a buffer of one. Draining is signalled by
// leaving a value in the channel, and the watermark filter polls len(stop).
// An unbuffered channel always has length zero, so the signal could never be
// observed, and a send on it would block forever because nothing receives
// from the channel.
func newCurator(cutOff time.Time, groupingQuantity uint32, curationState, samples, watermarks raw.Persistence) curator {
	return curator{
		cutOff:           cutOff,
		stop:             make(chan bool, 1),
		samples:          samples,
		curationState:    curationState,
		watermarks:       watermarks,
		groupingQuantity: groupingQuantity,
	}
}
// run facilitates the curation lifecycle: it walks the watermarks store,
// decoding every entry with a watermarkDecoder, selecting candidates with a
// watermarkFilter and handing the accepted ones to a watermarkOperator.
//
// The error reported by the watermarks store's ForEach is returned as is.
func (c curator) run() (err error) {
	var (
		decoder watermarkDecoder
		filter  = watermarkFilter{
			// The filter reads curation remarks from this store; leaving it
			// unset makes the filter call a method on a nil store and panic.
			curationState: c.curationState,
			stop:          c.stop,
		}
		operator = watermarkOperator{
			olderThan:     c.cutOff,
			groupSize:     c.groupingQuantity,
			curationState: c.curationState,
		}
	)

	_, err = c.watermarks.ForEach(decoder, filter, operator)

	return
}
// drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies.
//
// The send never blocks: if the channel cannot accept the value immediately
// (for example because a stop signal is already pending) the call is a no-op.
// A separate length check followed by a plain send could block the caller
// forever, either when two callers race past the check or when nothing is
// receiving from the channel.
func (c curator) drain() {
	select {
	case c.stop <- true:
	default:
	}
}
// watermarkDecoder converts raw (dto.Fingerprint, dto.MetricHighWatermark)
// key/value pairs into (model.Fingerprint, model.Watermark) pairs. It carries
// no state.
type watermarkDecoder struct{}
// DecodeKey unmarshals the raw key into a dto.Fingerprint and converts its
// signature into a model.Fingerprint, which is returned as out.
//
// in must hold a []byte; any other dynamic type makes the type assertion
// panic. A failure to unmarshal is reported through err, in which case out is
// nil, rather than crashing the process: the signature already provides an
// error result for exactly this purpose.
func (w watermarkDecoder) DecodeKey(in interface{}) (out interface{}, err error) {
	key := &dto.Fingerprint{}

	if err = proto.Unmarshal(in.([]byte), key); err != nil {
		return
	}

	out = model.NewFingerprintFromRowKey(*key.Signature)

	return
}
// DecodeValue unmarshals the raw value into a dto.MetricHighWatermark and
// converts it into a model.Watermark, which is returned as out.
//
// in must hold a []byte; any other dynamic type makes the type assertion
// panic. A failure to unmarshal is reported through err, in which case out is
// nil, rather than crashing the process: the signature already provides an
// error result for exactly this purpose.
func (w watermarkDecoder) DecodeValue(in interface{}) (out interface{}, err error) {
	// Deliberately not named "dto": that would shadow the imported package for
	// the remainder of the function.
	highWatermark := &dto.MetricHighWatermark{}

	if err = proto.Unmarshal(in.([]byte), highWatermark); err != nil {
		return
	}

	out = model.NewWatermarkFromHighWatermarkDTO(highWatermark)

	return
}
// watermarkFilter determines whether to include or exclude candidate
// values from the curation process by virtue of how old the high watermark is.
type watermarkFilter struct {
// curationState is the table of CurationKey to CurationValues that remark on
// how far along the curation process has gone for a given metric fingerprint.
curationState raw.Persistence
// stop, when non-empty, instructs the filter to stop operation.
stop chan bool
}
// Filter decides what should happen with one (fingerprint, watermark) pair.
//
// key must hold a model.Fingerprint and value a model.Watermark; other dynamic
// types make the type assertions panic. The result is
//
//   - storage.STOP when a stop signal is pending on w.stop,
//   - storage.ACCEPT when the stored curation remark for the fingerprint is
//     older than the watermark, and
//   - storage.SKIP otherwise.
//
// The method has no error result, so a failure to read or unmarshal the
// curation remark panics.
func (w watermarkFilter) Filter(key, value interface{}) (result storage.FilterResult) {
	var (
		fingerprint = key.(model.Fingerprint)
		watermark   = value.(model.Watermark)
	)

	// A pending stop request takes precedence over everything else. Checking
	// it after the acceptance test would keep handing work to the operator
	// while the curator is supposed to be draining, and would also pay for a
	// store lookup whose outcome is irrelevant.
	if len(w.stop) != 0 {
		return storage.STOP
	}

	var (
		curationKey   = fingerprint.ToDTO()
		curationValue = &dto.CurationValue{}
	)

	rawCurationValue, err := w.curationState.Get(coding.NewProtocolBufferEncoder(curationKey))
	if err != nil {
		panic(err)
	}

	if err = proto.Unmarshal(rawCurationValue, curationValue); err != nil {
		panic(err)
	}

	if model.NewCurationRemarkFromDTO(curationValue).OlderThanLimit(watermark.Time) {
		return storage.ACCEPT
	}

	return storage.SKIP
}
// watermarkOperator scans over the curator.samples table for metrics whose
// high watermark has been determined to be allowable for curation. This type
// is individually responsible for compaction.
//
// The scanning starts from CurationRemark.LastCompletionTimestamp and goes
// forward until the stop point or end of the series is reached.
//
// NOTE(review): the above describes the intended design; the scanning and
// compaction themselves are not implemented yet (see the "curate"
// placeholders in Operate).
type watermarkOperator struct {
// olderThan functions as the cutoff when scanning curator.samples for
// uncurated samples to compact. The operator scans forward in the samples
// until olderThan is reached and then stops operation for samples that occur
// after it.
olderThan time.Time
// groupSize is the target quantity of samples to group together for a given
// to-be-written sample. Observed samples of less than groupSize are combined
// up to groupSize if possible. The protocol does not define the behavior if
// observed chunks are larger than groupSize.
groupSize uint32
// curationState is the table of CurationKey to CurationValues that remark on
// how far along the curation process has gone for a given metric fingerprint.
curationState raw.Persistence
}
// Operate inspects the curation state of one (fingerprint, watermark) pair.
//
// key must hold a model.Fingerprint and value a model.Watermark; other dynamic
// types make the type assertions panic. A failed lookup of the curation state
// is reported as a non-continuable storage.OperatorError. In every other case
// nil is returned; the actual curation work is still to be written.
func (w watermarkOperator) Operate(key, value interface{}) (err *storage.OperatorError) {
	fingerprint := key.(model.Fingerprint)
	watermark := value.(model.Watermark)

	curated, lookupErr := w.hasBeenCurated(fingerprint)
	switch {
	case lookupErr != nil:
		return &storage.OperatorError{lookupErr, false}
	case !curated:
		// Never curated before.
		// curate
		return nil
	}

	consistent, lookupErr := w.curationConsistent(fingerprint, watermark)
	if lookupErr != nil {
		return &storage.OperatorError{lookupErr, false}
	}

	if consistent {
		// The existing curation already covers the watermark.
		return nil
	}

	// curate
	return nil
}
// hasBeenCurated reports whether the curation state store holds an entry for
// the provided Fingerprint under this operator's policy, i.e. under the key
// (fingerprint, olderThan, groupSize). Errors from the store are passed
// through.
//
// NOTE(review): data.proto describes CurationKey.older_than as a relative
// number of seconds, whereas an absolute Unix timestamp is stored here.
// Confirm which of the two is intended.
func (w watermarkOperator) hasBeenCurated(f model.Fingerprint) (curated bool, err error) {
	encodedKey := coding.NewProtocolBufferEncoder(&dto.CurationKey{
		Fingerprint:      f.ToDTO(),
		OlderThan:        proto.Int64(w.olderThan.Unix()),
		MinimumGroupSize: proto.Uint32(w.groupSize),
	})

	return w.curationState.Has(encodedKey)
}
// curationConsistent reports whether the stored curation remark for the given
// metric is up to date with respect to its watermark. The result is true when
// the remark's last completion timestamp is not older than the watermark, and
// false when it is older, meaning the metric needs curation.
//
// The remark is looked up under the key (fingerprint, olderThan, groupSize).
// A failure to read or unmarshal it is returned as err with consistent false.
func (w watermarkOperator) curationConsistent(f model.Fingerprint, watermark model.Watermark) (consistent bool, err error) {
	key := &dto.CurationKey{
		Fingerprint:      f.ToDTO(),
		OlderThan:        proto.Int64(w.olderThan.Unix()),
		MinimumGroupSize: proto.Uint32(w.groupSize),
	}

	rawValue, err := w.curationState.Get(coding.NewProtocolBufferEncoder(key))
	if err != nil {
		return false, err
	}

	value := &dto.CurationValue{}
	if err = proto.Unmarshal(rawValue, value); err != nil {
		return false, err
	}

	remark := model.NewCurationRemarkFromDTO(value)

	return !remark.OlderThanLimit(watermark.Time), nil
}

View File

@ -0,0 +1,213 @@
// Copyright 2013 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric
import (
"github.com/prometheus/prometheus/coding"
"github.com/prometheus/prometheus/model"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/raw"
"sort"
"testing"
"time"
)
// Test doubles and scenario containers for the curator tests.
type (
// keyPair couples a fingerprint with a time; it is the key type of
// fakeSamples.
keyPair struct {
fingerprint model.Fingerprint
time time.Time
}
// fakeCurationStates is a map-backed stand-in for the curation state store,
// keyed by fingerprint.
fakeCurationStates map[model.Fingerprint]time.Time
// fakeSamples is a map-backed stand-in for the samples store.
fakeSamples map[keyPair][]float32
// fakeWatermarks is a map-backed stand-in for the watermarks store, keyed by
// fingerprint.
fakeWatermarks map[model.Fingerprint]time.Time
// in bundles the stores and parameters a scenario starts from.
in struct {
curationStates fakeCurationStates
samples fakeSamples
watermarks fakeWatermarks
cutOff time.Time
grouping uint32
}
// out bundles the stores of a scenario's result; presumably the expected
// end state, though no scenario populates it yet.
out struct {
curationStates fakeCurationStates
samples fakeSamples
watermarks fakeWatermarks
}
)
// The methods below are placeholders on fakeCurationStates: each one panics
// with "unimplemented" when called.

// Has is not implemented and always panics.
func (c fakeCurationStates) Has(_ coding.Encoder) (bool, error) {
panic("unimplemented")
}
// Get is not implemented and always panics.
func (c fakeCurationStates) Get(_ coding.Encoder) ([]byte, error) {
panic("unimplemented")
}
// Drop is not implemented and always panics.
func (c fakeCurationStates) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
// Put is not implemented and always panics.
func (c fakeCurationStates) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
// Close is not implemented and always panics.
func (c fakeCurationStates) Close() error {
panic("unimplemented")
}
// ForEach visits the entries of the fake in ascending fingerprint order so
// that iteration is deterministic despite the random order of Go maps.
//
// Each entry is decoded with d, offered to f and, if accepted, handed to o:
//
//   - an entry whose key or value cannot be decoded is skipped; the decode
//     error stays in err unless a later decode overwrites it,
//   - storage.STOP from the filter ends the iteration,
//   - storage.SKIP moves on to the next entry,
//   - a continuable operator error moves on to the next entry, whereas a
//     non-continuable one ends the iteration.
//
// scannedAll is true only when the loop ran to the end without being cut short
// by the filter or by a non-continuable operator error.
func (c fakeCurationStates) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
	var fingerprints model.Fingerprints

	for fingerprint := range c {
		fingerprints = append(fingerprints, fingerprint)
	}

	sort.Sort(fingerprints)

	for _, k := range fingerprints {
		var decodedKey, decodedValue interface{}

		if decodedKey, err = d.DecodeKey(k); err != nil {
			continue
		}

		if decodedValue, err = d.DecodeValue(c[k]); err != nil {
			continue
		}

		switch f.Filter(decodedKey, decodedValue) {
		case storage.STOP:
			return
		case storage.SKIP:
			continue
		case storage.ACCEPT:
			opErr := o.Operate(decodedKey, decodedValue)
			if opErr != nil && !opErr.Continuable {
				// A bare break here would only leave the switch and the loop
				// would carry on; return to actually end the iteration.
				return
			}
		}
	}

	scannedAll = true

	return
}
// Commit is not implemented and always panics.
func (c fakeCurationStates) Commit(_ raw.Batch) error {
panic("unimplemented")
}
// The methods below are placeholders on fakeSamples: each one panics with
// "unimplemented" when called.

// Has is not implemented and always panics.
func (c fakeSamples) Has(_ coding.Encoder) (bool, error) {
panic("unimplemented")
}
// Get is not implemented and always panics.
func (c fakeSamples) Get(_ coding.Encoder) ([]byte, error) {
panic("unimplemented")
}
// Drop is not implemented and always panics.
func (c fakeSamples) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
// Put is not implemented and always panics.
func (c fakeSamples) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
// Close is not implemented and always panics.
func (c fakeSamples) Close() error {
panic("unimplemented")
}
// ForEach is not implemented and always panics.
func (c fakeSamples) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
panic("unimplemented")
}
// Commit is not implemented and always panics.
func (c fakeSamples) Commit(_ raw.Batch) (err error) {
panic("unimplemented")
}
// The methods below are placeholders on fakeWatermarks: each one panics with
// "unimplemented" when called.

// Has is not implemented and always panics.
func (c fakeWatermarks) Has(_ coding.Encoder) (bool, error) {
panic("unimplemented")
}
// Get is not implemented and always panics.
func (c fakeWatermarks) Get(_ coding.Encoder) ([]byte, error) {
panic("unimplemented")
}
// Drop is not implemented and always panics.
func (c fakeWatermarks) Drop(_ coding.Encoder) error {
panic("unimplemented")
}
// Put is not implemented and always panics.
func (c fakeWatermarks) Put(_, _ coding.Encoder) error {
panic("unimplemented")
}
// Close is not implemented and always panics.
func (c fakeWatermarks) Close() error {
panic("unimplemented")
}
// ForEach is not implemented and always panics.
func (c fakeWatermarks) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (scannedAll bool, err error) {
panic("unimplemented")
}
// Commit is not implemented and always panics.
func (c fakeWatermarks) Commit(_ raw.Batch) (err error) {
panic("unimplemented")
}
// TestCurator builds a curator for every scenario. For now it only verifies
// that construction from the fake stores works; the curator is neither run
// nor checked against the scenario's expected output.
func TestCurator(t *testing.T) {
	type scenario struct {
		in  in
		out out
	}

	scenarios := []scenario{
		{
			in: in{
				curationStates: fakeCurationStates{
					model.NewFingerprintFromRowKey("0-A-10-Z"): testInstant.Add(5 * time.Minute),
					model.NewFingerprintFromRowKey("1-B-10-A"): testInstant.Add(4 * time.Minute),
				},
				watermarks: fakeWatermarks{},
				samples:    fakeSamples{},
				cutOff:     testInstant.Add(5 * time.Minute),
				grouping:   5,
			},
		},
	}

	for _, s := range scenarios {
		input := s.in

		_ = newCurator(input.cutOff, input.grouping, input.curationStates, input.samples, input.watermarks)
	}
}

22
web/Makefile Normal file
View File

@ -0,0 +1,22 @@
# Copyright 2013 Prometheus Team
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Building everything here means building the blob subdirectory.
all: blob
# Delegate to the Makefile in blob/. The target is declared phony below so it
# always runs even though a directory named blob exists.
blob:
$(MAKE) -C blob
# Delegate cleaning to the Makefile in blob/.
clean:
$(MAKE) -C blob clean
.PHONY: blob clean

1
web/blob/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
files.go

22
web/blob/Makefile Normal file
View File

@ -0,0 +1,22 @@
# Copyright 2013 Prometheus Team
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# The only artifact of this directory is the generated files.go.
all: files.go
# Generate files.go by running embed-static.sh over ../static and ../templates
# and formatting its output with gofmt. The target has no prerequisites, so
# make only runs the recipe when files.go does not exist yet.
files.go:
../../utility/embed-static.sh ../static ../templates | gofmt > $@
# Remove the generated file; the leading dash lets make carry on if rm fails.
clean:
-rm files.go
.PHONY: clean