Publicize Curator and Processors.

This commit publicizes the curation and processor frameworks for
purposes of making them available in the main processor loop.
This commit is contained in:
Matt T. Proud 2013-05-02 12:37:24 +02:00
parent 30f0239f96
commit 4298bab2b0
3 changed files with 31 additions and 33 deletions

View File

@ -47,7 +47,7 @@ type watermarkFilter struct {
ignoreYoungerThan time.Duration
// processor is the post-processor that performs whatever action is desired on
// the data that is deemed valid to be worked on.
processor processor
processor Processor
// stop functions as the global stop channel for all future operations.
stop chan bool
// stopAt is used to determine the elegibility of series for compaction.
@ -59,7 +59,7 @@ type watermarkFilter struct {
// curator is responsible for effectuating a given curation policy across the
// stored samples on-disk. This is useful to compact sparse sample values into
// single sample entities to reduce keyspace load on the datastore.
type curator struct {
type Curator struct {
// stop functions as a channel that when empty allows the curator to operate.
// The moment a value is ingested inside of it, the curator goes into drain
// mode.
@ -86,7 +86,7 @@ type watermarkOperator struct {
ignoreYoungerThan time.Duration
// processor is responsible for executing a given stategy on the
// to-be-operated-on series.
processor processor
processor Processor
// sampleIterator is a snapshotted iterator for the time series.
sampleIterator leveldb.Iterator
// samples
@ -95,20 +95,13 @@ type watermarkOperator struct {
stopAt time.Time
}
// newCurator builds a new curator for the given LevelDB databases.
func newCurator() curator {
return curator{
stop: make(chan bool),
}
}
// run facilitates the curation lifecycle.
//
// recencyThreshold represents the most recent time up to which values will be
// curated.
// curationState is the on-disk store where the curation remarks are made for
// how much progress has been made.
func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, processor processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) {
func (c Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples, watermarks *leveldb.LevelDBPersistence, status chan CurationState) (err error) {
defer func(t time.Time) {
duration := float64(time.Since(t))
@ -173,7 +166,7 @@ func (c curator) run(ignoreYoungerThan time.Duration, instant time.Time, process
// drain instructs the curator to stop at the next convenient moment as to not
// introduce data inconsistencies.
func (c curator) drain() {
func (c Curator) Drain() {
if len(c.stop) == 0 {
c.stop <- true
}
@ -211,7 +204,7 @@ func (w watermarkFilter) shouldStop() bool {
return len(w.stop) != 0
}
func getCurationRemark(states raw.Persistence, processor processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) {
func getCurationRemark(states raw.Persistence, processor Processor, ignoreYoungerThan time.Duration, fingerprint model.Fingerprint) (remark *model.CurationRemark, err error) {
rawSignature, err := processor.Signature()
if err != nil {
return

View File

@ -26,7 +26,7 @@ import (
// processor models a post-processing agent that performs work given a sample
// corpus.
type processor interface {
type Processor interface {
// Name emits the name of this processor's signature encoder. It must be
// fully-qualified in the sense that it could be used via a Protocol Buffer
// registry to extract the descriptor to reassemble this message.
@ -44,9 +44,9 @@ type processor interface {
Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error)
}
// compactionProcessor combines sparse values in the database together such
// CompactionProcessor combines sparse values in the database together such
// that at least MinimumGroupSize-sized chunks are grouped together.
type compactionProcessor struct {
type CompactionProcessor struct {
// MaximumMutationPoolBatch represents approximately the largest pending
// batch of mutation operations for the database before pausing to
// commit before resumption.
@ -56,16 +56,16 @@ type compactionProcessor struct {
// MinimumGroupSize represents the smallest allowed sample chunk size in the
// database.
MinimumGroupSize int
// signature is the byte representation of the compactionProcessor's settings,
// signature is the byte representation of the CompactionProcessor's settings,
// used for purely memoization purposes across an instance.
signature []byte
}
func (p compactionProcessor) Name() string {
func (p CompactionProcessor) Name() string {
return "io.prometheus.CompactionProcessorDefinition"
}
func (p *compactionProcessor) Signature() (out []byte, err error) {
func (p *CompactionProcessor) Signature() (out []byte, err error) {
if len(p.signature) == 0 {
out, err = proto.Marshal(&dto.CompactionProcessorDefinition{
MinimumGroupSize: proto.Uint32(uint32(p.MinimumGroupSize)),
@ -79,11 +79,11 @@ func (p *compactionProcessor) Signature() (out []byte, err error) {
return
}
func (p compactionProcessor) String() string {
func (p CompactionProcessor) String() string {
return fmt.Sprintf("compactionProcess for minimum group size %d", p.MinimumGroupSize)
}
func (p compactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
func (p CompactionProcessor) Apply(sampleIterator leveldb.Iterator, samples raw.Persistence, stopAt time.Time, fingerprint model.Fingerprint) (lastCurated time.Time, err error) {
var pendingBatch raw.Batch = nil
defer func() {

View File

@ -29,7 +29,7 @@ type curationState struct {
fingerprint string
ignoreYoungerThan time.Duration
lastCurated time.Time
processor processor
processor Processor
}
type watermarkState struct {
@ -48,7 +48,7 @@ type in struct {
sampleGroups fixture.Pairs
ignoreYoungerThan time.Duration
groupSize uint32
processor processor
processor Processor
}
type out struct {
@ -101,7 +101,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
}{
{
in: in{
processor: &compactionProcessor{
processor: &CompactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
@ -112,7 +112,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &compactionProcessor{
processor: &CompactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
@ -121,7 +121,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 1 * time.Hour,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &compactionProcessor{
processor: &CompactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
@ -129,7 +129,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
// This rule should effectively be ignored.
curationState{
fingerprint: "0002-A-2-Z",
processor: &compactionProcessor{
processor: &CompactionProcessor{
MinimumGroupSize: 2,
MaximumMutationPoolBatch: 15,
},
@ -531,7 +531,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
fingerprint: "0001-A-1-Z",
ignoreYoungerThan: time.Hour,
lastCurated: testInstant.Add(-1 * 30 * time.Minute),
processor: &compactionProcessor{
processor: &CompactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
@ -540,7 +540,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: 30 * time.Minute,
lastCurated: testInstant.Add(-1 * 90 * time.Minute),
processor: &compactionProcessor{
processor: &CompactionProcessor{
MinimumGroupSize: 2,
MaximumMutationPoolBatch: 15,
},
@ -549,7 +549,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
fingerprint: "0002-A-2-Z",
ignoreYoungerThan: time.Hour,
lastCurated: testInstant.Add(-1 * 60 * time.Minute),
processor: &compactionProcessor{
processor: &CompactionProcessor{
MinimumGroupSize: 5,
MaximumMutationPoolBatch: 15,
},
@ -848,8 +848,14 @@ func TestCuratorCompactionProcessor(t *testing.T) {
updates := make(chan CurationState, 100)
defer close(updates)
c := newCurator()
err = c.run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
stop := make(chan bool)
defer close(stop)
c := Curator{
stop: stop,
}
err = c.Run(scenario.in.ignoreYoungerThan, testInstant, scenario.in.processor, curatorStates, samples, watermarkStates, updates)
if err != nil {
t.Fatal(err)
}
@ -883,7 +889,6 @@ func TestCuratorCompactionProcessor(t *testing.T) {
curationKey := model.NewCurationKeyFromDTO(curationKeyDto)
actualCurationRemark := model.NewCurationRemarkFromDTO(curationValueDto)
signature, err := expected.processor.Signature()
if err != nil {
t.Fatal(err)