Replace direct curation table access with wrapper.

This commit is contained in:
Matt T. Proud 2013-08-06 12:00:31 +02:00
parent 669cfdaeee
commit cc989c68e1
10 changed files with 206 additions and 227 deletions

View File

@ -64,7 +64,7 @@ type Curator struct {
// forward until the stop point or end of the series is reached.
type watermarkScanner struct {
// curationState is the data store for curation remarks.
curationState raw.Persistence
curationState CurationRemarker
// diskFrontier models the available seekable ranges for the provided
// sampleIterator.
diskFrontier *diskFrontier
@ -92,7 +92,7 @@ type watermarkScanner struct {
// curated.
// curationState is the on-disk store where the curation remarks are made for
// how much progress has been made.
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) {
func (c *Curator) Run(ignoreYoungerThan time.Duration, instant time.Time, processor Processor, curationState CurationRemarker, samples *leveldb.LevelDBPersistence, watermarks HighWatermarker, status chan CurationState) (err error) {
defer func(t time.Time) {
duration := float64(time.Since(t) / time.Millisecond)
@ -189,26 +189,6 @@ func (w *watermarkScanner) shouldStop() bool {
return len(w.stop) != 0
}
func (w *watermarkScanner) getCurationRemark(k *curationKey) (r *curationRemark, found bool, err error) {
curationKey := new(dto.CurationKey)
curationValue := new(dto.CurationValue)
k.dump(curationKey)
present, err := w.curationState.Get(curationKey, curationValue)
if err != nil {
return nil, false, err
}
if !present {
return nil, false, nil
}
remark := new(curationRemark)
remark.load(curationValue)
return remark, true, nil
}
func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResult) {
fingerprint := key.(*clientmodel.Fingerprint)
@ -244,18 +224,18 @@ func (w *watermarkScanner) Filter(key, value interface{}) (r storage.FilterResul
IgnoreYoungerThan: w.ignoreYoungerThan,
}
curationRemark, present, err := w.getCurationRemark(k)
curationRemark, present, err := w.curationState.Get(k)
if err != nil {
return
}
if !present {
return storage.ACCEPT
}
if !curationRemark.OlderThan(w.stopAt) {
if !curationRemark.Before(w.stopAt) {
return storage.SKIP
}
watermark := value.(*watermarks)
if !curationRemark.OlderThan(watermark.High) {
if !curationRemark.Before(watermark.High) {
return storage.SKIP
}
curationConsistent, err := w.curationConsistent(fingerprint, watermark)
@ -278,14 +258,14 @@ func (w *watermarkScanner) curationConsistent(f *clientmodel.Fingerprint, waterm
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}
curationRemark, present, err := w.getCurationRemark(k)
curationRemark, present, err := w.curationState.Get(k)
if err != nil {
return false, err
}
if !present {
return false, nil
}
if !curationRemark.OlderThan(watermark.High) {
if !curationRemark.Before(watermark.High) {
return true, nil
}
@ -303,14 +283,13 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: err, Continuable: false}
}
k := &curationKey{
curationState, present, err := w.curationState.Get(&curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}
})
curationState, _, err := w.getCurationRemark(k)
if err != nil {
// An anomaly with the curation remark is likely not fatal in the sense that
// there was a decoding error with the entity and shouldn't be cause to stop
@ -318,10 +297,19 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
// work forward. With an idempotent processor, this is safe.
return &storage.OperatorError{error: err, Continuable: true}
}
var firstSeek time.Time
switch {
case !present, seriesFrontier.After(curationState):
firstSeek = seriesFrontier.firstSupertime
case !seriesFrontier.InSafeSeekRange(curationState):
firstSeek = seriesFrontier.lastSupertime
default:
firstSeek = curationState
}
startKey := &SampleKey{
Fingerprint: fingerprint,
FirstTimestamp: seriesFrontier.optimalStartTime(curationState),
FirstTimestamp: firstSeek,
}
dto := new(dto.SampleKey)
@ -345,7 +333,13 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: err, Continuable: false}
}
err = w.refreshCurationRemark(fingerprint, lastTime)
err = w.curationState.Update(&curationKey{
Fingerprint: fingerprint,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
},
lastTime)
if err != nil {
// Under the assumption that the processors are idempotent, they can be
// re-run; thusly, the commitment of the curation remark is no cause
@ -353,56 +347,7 @@ func (w *watermarkScanner) Operate(key, _ interface{}) (oErr *storage.OperatorEr
return &storage.OperatorError{error: err, Continuable: true}
}
return
}
func (w *watermarkScanner) refreshCurationRemark(f *clientmodel.Fingerprint, finished time.Time) error {
curationKey := curationKey{
Fingerprint: f,
ProcessorMessageRaw: w.processor.Signature(),
ProcessorMessageTypeName: w.processor.Name(),
IgnoreYoungerThan: w.ignoreYoungerThan,
}
k := new(dto.CurationKey)
curationKey.dump(k)
curationValue := curationRemark{
LastCompletionTimestamp: finished,
}
v := new(dto.CurationValue)
curationValue.dump(v)
return w.curationState.Put(k, v)
}
// curationRemark provides a representation of dto.CurationValue with associated
// business logic methods attached to it to enhance code readability.
type curationRemark struct {
LastCompletionTimestamp time.Time
}
// OlderThan answers whether this curationRemark is older than the provided
// cutOff time.
func (c *curationRemark) OlderThan(t time.Time) bool {
return c.LastCompletionTimestamp.Before(t)
}
// Equal answers whether the two curationRemarks are equivalent.
func (c *curationRemark) Equal(o curationRemark) bool {
return c.LastCompletionTimestamp.Equal(o.LastCompletionTimestamp)
}
func (c *curationRemark) String() string {
return fmt.Sprintf("Last curated at %s", c.LastCompletionTimestamp)
}
func (c *curationRemark) load(d *dto.CurationValue) {
c.LastCompletionTimestamp = time.Unix(d.GetLastCompletionTimestamp(), 0).UTC()
}
func (c *curationRemark) dump(d *dto.CurationValue) {
d.Reset()
d.LastCompletionTimestamp = proto.Int64(c.LastCompletionTimestamp.Unix())
return nil
}
// curationKey provides a representation of dto.CurationKey with associated

View File

@ -194,19 +194,3 @@ func (s *seriesFrontier) InSafeSeekRange(t time.Time) (safe bool) {
func (s *seriesFrontier) After(t time.Time) bool {
return s.firstSupertime.After(t)
}
// optimalStartTime indicates what the best start time for a curation operation
// should be given the curation remark.
func (s *seriesFrontier) optimalStartTime(remark *curationRemark) (t time.Time) {
switch {
case remark == nil:
t = s.firstSupertime
case s.After(remark.LastCompletionTimestamp):
t = s.firstSupertime
case !s.InSafeSeekRange(remark.LastCompletionTimestamp):
t = s.lastSupertime
default:
t = remark.LastCompletionTimestamp
}
return
}

View File

@ -14,7 +14,6 @@
package metric
import (
"io"
"sort"
"code.google.com/p/goprotobuf/proto"
@ -31,7 +30,6 @@ import (
type FingerprintMetricMapping map[clientmodel.Fingerprint]clientmodel.Metric
type FingerprintMetricIndex interface {
io.Closer
raw.Pruner
IndexBatch(FingerprintMetricMapping) error
@ -40,7 +38,7 @@ type FingerprintMetricIndex interface {
Size() (s uint64, present bool, err error)
}
type LeveldbFingerprintMetricIndex struct {
type LevelDBFingerprintMetricIndex struct {
p *leveldb.LevelDBPersistence
}
@ -48,22 +46,20 @@ type LevelDBFingerprintMetricIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *LeveldbFingerprintMetricIndex) Close() error {
func (i *LevelDBFingerprintMetricIndex) Close() {
i.p.Close()
return nil
}
func (i *LeveldbFingerprintMetricIndex) State() *raw.DatabaseState {
func (i *LevelDBFingerprintMetricIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LeveldbFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
func (i *LevelDBFingerprintMetricIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
func (i *LevelDBFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapping) error {
b := leveldb.NewBatch()
defer b.Close()
@ -79,7 +75,7 @@ func (i *LeveldbFingerprintMetricIndex) IndexBatch(mapping FingerprintMetricMapp
return i.p.Commit(b)
}
func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
func (i *LevelDBFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m clientmodel.Metric, ok bool, err error) {
k := new(dto.Fingerprint)
dumpFingerprint(k, f)
v := new(dto.Metric)
@ -98,19 +94,19 @@ func (i *LeveldbFingerprintMetricIndex) Lookup(f *clientmodel.Fingerprint) (m cl
return m, true, nil
}
func (i *LeveldbFingerprintMetricIndex) Prune() (bool, error) {
func (i *LevelDBFingerprintMetricIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (FingerprintMetricIndex, error) {
func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (*LevelDBFingerprintMetricIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbFingerprintMetricIndex{
return &LevelDBFingerprintMetricIndex{
p: s,
}, nil
}
@ -118,7 +114,6 @@ func NewLevelDBFingerprintMetricIndex(o *LevelDBFingerprintMetricIndexOptions) (
type LabelNameFingerprintMapping map[clientmodel.LabelName]clientmodel.Fingerprints
type LabelNameFingerprintIndex interface {
io.Closer
raw.Pruner
IndexBatch(LabelNameFingerprintMapping) error
@ -128,11 +123,11 @@ type LabelNameFingerprintIndex interface {
Size() (s uint64, present bool, err error)
}
type LeveldbLabelNameFingerprintIndex struct {
type LevelDBLabelNameFingerprintIndex struct {
p *leveldb.LevelDBPersistence
}
func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
func (i *LevelDBLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -155,7 +150,7 @@ func (i *LeveldbLabelNameFingerprintIndex) IndexBatch(b LabelNameFingerprintMapp
return i.p.Commit(batch)
}
func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
func (i *LevelDBLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps clientmodel.Fingerprints, ok bool, err error) {
k := new(dto.LabelName)
dumpLabelName(k, l)
v := new(dto.FingerprintCollection)
@ -176,30 +171,28 @@ func (i *LeveldbLabelNameFingerprintIndex) Lookup(l clientmodel.LabelName) (fps
return fps, true, nil
}
func (i *LeveldbLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
func (i *LevelDBLabelNameFingerprintIndex) Has(l clientmodel.LabelName) (ok bool, err error) {
return i.p.Has(&dto.LabelName{
Name: proto.String(string(l)),
})
}
func (i *LeveldbLabelNameFingerprintIndex) Prune() (bool, error) {
func (i *LevelDBLabelNameFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbLabelNameFingerprintIndex) Close() error {
func (i *LevelDBLabelNameFingerprintIndex) Close() {
i.p.Close()
return nil
}
func (i *LeveldbLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
func (i *LevelDBLabelNameFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LeveldbLabelNameFingerprintIndex) State() *raw.DatabaseState {
func (i *LevelDBLabelNameFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
}
@ -207,13 +200,13 @@ type LevelDBLabelNameFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (LabelNameFingerprintIndex, error) {
func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOptions) (*LevelDBLabelNameFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbLabelNameFingerprintIndex{
return &LevelDBLabelNameFingerprintIndex{
p: s,
}, nil
}
@ -221,7 +214,6 @@ func NewLevelLabelNameFingerprintIndex(o *LevelDBLabelNameFingerprintIndexOption
type LabelSetFingerprintMapping map[LabelPair]clientmodel.Fingerprints
type LabelSetFingerprintIndex interface {
io.Closer
raw.ForEacher
raw.Pruner
@ -232,7 +224,7 @@ type LabelSetFingerprintIndex interface {
Size() (s uint64, present bool, err error)
}
type LeveldbLabelSetFingerprintIndex struct {
type LevelDBLabelSetFingerprintIndex struct {
p *leveldb.LevelDBPersistence
}
@ -240,7 +232,7 @@ type LevelDBLabelSetFingerprintIndexOptions struct {
leveldb.LevelDBOptions
}
func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error {
func (i *LevelDBLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -264,7 +256,7 @@ func (i *LeveldbLabelSetFingerprintIndex) IndexBatch(m LabelSetFingerprintMappin
return i.p.Commit(batch)
}
func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
func (i *LevelDBLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fingerprints, ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
@ -289,7 +281,7 @@ func (i *LeveldbLabelSetFingerprintIndex) Lookup(p *LabelPair) (m clientmodel.Fi
return m, true, nil
}
func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
func (i *LevelDBLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error) {
k := &dto.LabelPair{
Name: proto.String(string(p.Name)),
Value: proto.String(string(p.Value)),
@ -298,43 +290,40 @@ func (i *LeveldbLabelSetFingerprintIndex) Has(p *LabelPair) (ok bool, err error)
return i.p.Has(k)
}
func (i *LeveldbLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
func (i *LevelDBLabelSetFingerprintIndex) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *LeveldbLabelSetFingerprintIndex) Prune() (bool, error) {
func (i *LevelDBLabelSetFingerprintIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbLabelSetFingerprintIndex) Close() error {
func (i *LevelDBLabelSetFingerprintIndex) Close() {
i.p.Close()
return nil
}
func (i *LeveldbLabelSetFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
func (i *LevelDBLabelSetFingerprintIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LeveldbLabelSetFingerprintIndex) State() *raw.DatabaseState {
func (i *LevelDBLabelSetFingerprintIndex) State() *raw.DatabaseState {
return i.p.State()
}
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (LabelSetFingerprintIndex, error) {
func NewLevelDBLabelSetFingerprintIndex(o *LevelDBLabelSetFingerprintIndexOptions) (*LevelDBLabelSetFingerprintIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbLabelSetFingerprintIndex{
return &LevelDBLabelSetFingerprintIndex{
p: s,
}, nil
}
type MetricMembershipIndex interface {
io.Closer
raw.Pruner
IndexBatch([]clientmodel.Metric) error
@ -343,13 +332,13 @@ type MetricMembershipIndex interface {
Size() (s uint64, present bool, err error)
}
type LeveldbMetricMembershipIndex struct {
type LevelDBMetricMembershipIndex struct {
p *leveldb.LevelDBPersistence
}
var existenceIdentity = new(dto.MembershipIndexValue)
func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error {
func (i *LevelDBMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -362,29 +351,27 @@ func (i *LeveldbMetricMembershipIndex) IndexBatch(ms []clientmodel.Metric) error
return i.p.Commit(batch)
}
func (i *LeveldbMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
func (i *LevelDBMetricMembershipIndex) Has(m clientmodel.Metric) (ok bool, err error) {
k := new(dto.Metric)
dumpMetric(k, m)
return i.p.Has(k)
}
func (i *LeveldbMetricMembershipIndex) Close() error {
func (i *LevelDBMetricMembershipIndex) Close() {
i.p.Close()
return nil
}
func (i *LeveldbMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
func (i *LevelDBMetricMembershipIndex) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LeveldbMetricMembershipIndex) State() *raw.DatabaseState {
func (i *LevelDBMetricMembershipIndex) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LeveldbMetricMembershipIndex) Prune() (bool, error) {
func (i *LevelDBMetricMembershipIndex) Prune() (bool, error) {
i.p.Prune()
return false, nil
@ -394,13 +381,13 @@ type LevelDBMetricMembershipIndexOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (MetricMembershipIndex, error) {
func NewLevelDBMetricMembershipIndex(o *LevelDBMetricMembershipIndexOptions) (*LevelDBMetricMembershipIndex, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbMetricMembershipIndex{
return &LevelDBMetricMembershipIndex{
p: s,
}, nil
}

View File

@ -36,7 +36,7 @@ import (
const sortConcurrency = 2
type LevelDBMetricPersistence struct {
CurationRemarks *leveldb.LevelDBPersistence
CurationRemarks CurationRemarker
fingerprintToMetrics FingerprintMetricIndex
labelNameToFingerprints LabelNameFingerprintIndex
labelSetToFingerprints LabelSetFingerprintIndex
@ -202,13 +202,14 @@ func NewLevelDBMetricPersistence(baseDirectory string) (*LevelDBMetricPersistenc
"Sample Curation Remarks",
func() {
var err error
o := &leveldb.LevelDBOptions{
Name: "Sample Curation Remarks",
Purpose: "Ledger of Progress for Various Curators",
Path: baseDirectory + "/curation_remarks",
CacheSizeBytes: *curationRemarksCacheSize,
}
emission.CurationRemarks, err = leveldb.NewLevelDBPersistence(o)
emission.CurationRemarks, err = NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Name: "Sample Curation Remarks",
Purpose: "Ledger of Progress for Various Curators",
Path: baseDirectory + "/curation_remarks",
CacheSizeBytes: *curationRemarksCacheSize,
},
})
workers.MayFail(err)
},
},
@ -764,7 +765,7 @@ func (l *LevelDBMetricPersistence) GetAllValuesForLabel(labelName clientmodel.La
return
}
// CompactKeyspace compacts each database's keyspace serially.
// Prune compacts each database's keyspace serially.
//
// Beware that it would probably be imprudent to run this on a live user-facing
// server due to latency implications.
@ -778,10 +779,10 @@ func (l *LevelDBMetricPersistence) Prune() {
l.MetricSamples.Prune()
}
func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error) {
func (l *LevelDBMetricPersistence) Sizes() (total uint64, err error) {
size := uint64(0)
if size, err = l.CurationRemarks.ApproximateSize(); err != nil {
if size, _, err = l.CurationRemarks.Size(); err != nil {
return 0, err
}
total += size
@ -811,7 +812,7 @@ func (l *LevelDBMetricPersistence) ApproximateSizes() (total uint64, err error)
}
total += size
if size, err = l.MetricSamples.ApproximateSize(); err != nil {
if size, err = l.MetricSamples.Size(); err != nil {
return 0, err
}
total += size

View File

@ -72,13 +72,10 @@ func (c curationState) Get() (key, value proto.Message) {
k := &dto.CurationKey{}
keyRaw.dump(k)
key = k
valueRaw := curationRemark{
LastCompletionTimestamp: c.lastCurated,
v := &dto.CurationValue{
LastCompletionTimestamp: proto.Int64(c.lastCurated.Unix()),
}
v := &dto.CurationValue{}
valueRaw.dump(v)
return k, v
}
@ -848,8 +845,10 @@ func TestCuratorCompactionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: curatorDirectory.Path(),
curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: curatorDirectory.Path(),
},
})
if err != nil {
t.Fatal(err)
@ -888,7 +887,7 @@ func TestCuratorCompactionProcessor(t *testing.T) {
t.Fatal(err)
}
iterator := curatorStates.NewIterator(true)
iterator := curatorStates.p.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.curationStates {
@ -904,38 +903,35 @@ func TestCuratorCompactionProcessor(t *testing.T) {
}
curationKeyDto := &dto.CurationKey{}
curationValueDto := &dto.CurationValue{}
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
actualKey := &curationKey{}
actualKey := new(curationKey)
actualKey.load(curationKeyDto)
actualCurationRemark := &curationRemark{}
actualCurationRemark.load(curationValueDto)
signature := expected.processor.Signature()
actualValue, present, err := curatorStates.Get(actualKey)
if !present {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, actualKey)
}
if err != nil {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, err)
}
expectedFingerprint := &clientmodel.Fingerprint{}
expectedFingerprint.LoadFromString(expected.fingerprint)
expectedKey := &curationKey{
Fingerprint: expectedFingerprint,
IgnoreYoungerThan: expected.ignoreYoungerThan,
ProcessorMessageRaw: signature,
ProcessorMessageRaw: expected.processor.Signature(),
ProcessorMessageTypeName: expected.processor.Name(),
}
if !actualKey.Equal(expectedKey) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
}
expectedCurationRemark := curationRemark{
LastCompletionTimestamp: expected.lastCurated,
}
if !actualCurationRemark.Equal(expectedCurationRemark) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
if !actualValue.Equal(expected.lastCurated) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.lastCurated, actualValue)
}
}
@ -1374,8 +1370,11 @@ func TestCuratorDeletionProcessor(t *testing.T) {
sampleDirectory := fixture.NewPreparer(t).Prepare("sample", fixture.NewCassetteFactory(scenario.in.sampleGroups))
defer sampleDirectory.Close()
curatorStates, err := leveldb.NewLevelDBPersistence(&leveldb.LevelDBOptions{
Path: curatorDirectory.Path()})
curatorStates, err := NewLevelDBCurationRemarker(&LevelDBCurationRemarkerOptions{
LevelDBOptions: leveldb.LevelDBOptions{
Path: curatorDirectory.Path(),
},
})
if err != nil {
t.Fatal(err)
}
@ -1412,7 +1411,7 @@ func TestCuratorDeletionProcessor(t *testing.T) {
t.Fatal(err)
}
iterator := curatorStates.NewIterator(true)
iterator := curatorStates.p.NewIterator(true)
defer iterator.Close()
for j, expected := range scenario.out.curationStates {
@ -1427,24 +1426,25 @@ func TestCuratorDeletionProcessor(t *testing.T) {
}
}
curationKeyDto := &dto.CurationKey{}
curationValueDto := &dto.CurationValue{}
curationKeyDto := new(dto.CurationKey)
err = proto.Unmarshal(iterator.Key(), curationKeyDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
err = proto.Unmarshal(iterator.Value(), curationValueDto)
if err != nil {
t.Fatalf("%d.%d. could not unmarshal: %s", i, j, err)
}
actualKey := &curationKey{}
actualKey := new(curationKey)
actualKey.load(curationKeyDto)
actualCurationRemark := &curationRemark{}
actualCurationRemark.load(curationValueDto)
signature := expected.processor.Signature()
actualValue, present, err := curatorStates.Get(actualKey)
if !present {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, actualKey)
}
if err != nil {
t.Fatalf("%d.%d. could not get key-value pair %s", i, j, err)
}
expectedFingerprint := &clientmodel.Fingerprint{}
expectedFingerprint.LoadFromString(expected.fingerprint)
expectedKey := &curationKey{
@ -1456,11 +1456,8 @@ func TestCuratorDeletionProcessor(t *testing.T) {
if !actualKey.Equal(expectedKey) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedKey, actualKey)
}
expectedCurationRemark := curationRemark{
LastCompletionTimestamp: expected.lastCurated,
}
if !actualCurationRemark.Equal(expectedCurationRemark) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expectedCurationRemark, actualCurationRemark)
if !actualValue.Equal(expected.lastCurated) {
t.Fatalf("%d.%d. expected %s, got %s", i, j, expected.lastCurated, actualValue)
}
}

View File

@ -15,7 +15,6 @@ package metric
import (
"container/list"
"io"
"sync"
"time"
@ -171,7 +170,6 @@ func (lru *WatermarkCache) checkCapacity() {
type FingerprintHighWatermarkMapping map[clientmodel.Fingerprint]time.Time
type HighWatermarker interface {
io.Closer
raw.ForEacher
raw.Pruner
@ -181,11 +179,11 @@ type HighWatermarker interface {
Size() (uint64, bool, error)
}
type LeveldbHighWatermarker struct {
type LevelDBHighWatermarker struct {
p *leveldb.LevelDBPersistence
}
func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) {
func (w *LevelDBHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, ok bool, err error) {
k := new(dto.Fingerprint)
dumpFingerprint(k, f)
v := new(dto.MetricHighWatermark)
@ -200,7 +198,7 @@ func (w *LeveldbHighWatermarker) Get(f *clientmodel.Fingerprint) (t time.Time, o
return t, true, nil
}
func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
func (w *LevelDBHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping) error {
batch := leveldb.NewBatch()
defer batch.Close()
@ -229,28 +227,26 @@ func (w *LeveldbHighWatermarker) UpdateBatch(m FingerprintHighWatermarkMapping)
return w.p.Commit(batch)
}
func (i *LeveldbHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
func (i *LevelDBHighWatermarker) ForEach(d storage.RecordDecoder, f storage.RecordFilter, o storage.RecordOperator) (bool, error) {
return i.p.ForEach(d, f, o)
}
func (i *LeveldbHighWatermarker) Prune() (bool, error) {
func (i *LevelDBHighWatermarker) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LeveldbHighWatermarker) Close() error {
func (i *LevelDBHighWatermarker) Close() {
i.p.Close()
return nil
}
func (i *LeveldbHighWatermarker) State() *raw.DatabaseState {
func (i *LevelDBHighWatermarker) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LeveldbHighWatermarker) Size() (uint64, bool, error) {
s, err := i.p.ApproximateSize()
func (i *LevelDBHighWatermarker) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
@ -258,13 +254,82 @@ type LevelDBHighWatermarkerOptions struct {
leveldb.LevelDBOptions
}
func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (HighWatermarker, error) {
func NewLevelDBHighWatermarker(o *LevelDBHighWatermarkerOptions) (*LevelDBHighWatermarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LeveldbHighWatermarker{
return &LevelDBHighWatermarker{
p: s,
}, nil
}
type CurationRemarker interface {
raw.Pruner
Update(*curationKey, time.Time) error
Get(*curationKey) (t time.Time, ok bool, err error)
State() *raw.DatabaseState
Size() (uint64, bool, error)
}
type LevelDBCurationRemarker struct {
p *leveldb.LevelDBPersistence
}
type LevelDBCurationRemarkerOptions struct {
leveldb.LevelDBOptions
}
func (i *LevelDBCurationRemarker) State() *raw.DatabaseState {
return i.p.State()
}
func (i *LevelDBCurationRemarker) Size() (uint64, bool, error) {
s, err := i.p.Size()
return s, true, err
}
func (i *LevelDBCurationRemarker) Close() {
i.p.Close()
}
func (i *LevelDBCurationRemarker) Prune() (bool, error) {
i.p.Prune()
return false, nil
}
func (i *LevelDBCurationRemarker) Get(c *curationKey) (t time.Time, ok bool, err error) {
k := new(dto.CurationKey)
c.dump(k)
v := new(dto.CurationValue)
ok, err = i.p.Get(k, v)
if err != nil || !ok {
return t, ok, err
}
return time.Unix(v.GetLastCompletionTimestamp(), 0).UTC(), true, nil
}
func (i *LevelDBCurationRemarker) Update(pair *curationKey, t time.Time) error {
k := new(dto.CurationKey)
pair.dump(k)
return i.p.Put(k, &dto.CurationValue{
LastCompletionTimestamp: proto.Int64(t.Unix()),
})
}
func NewLevelDBCurationRemarker(o *LevelDBCurationRemarkerOptions) (*LevelDBCurationRemarker, error) {
s, err := leveldb.NewLevelDBPersistence(&o.LevelDBOptions)
if err != nil {
return nil, err
}
return &LevelDBCurationRemarker{
p: s,
}, nil
}

View File

@ -71,8 +71,8 @@ func (l *LevelDBMembershipIndex) Prune() {
l.persistence.Prune()
}
func (l *LevelDBMembershipIndex) ApproximateSize() (uint64, error) {
return l.persistence.ApproximateSize()
func (l *LevelDBMembershipIndex) Size() (uint64, error) {
return l.persistence.Size()
}
func (l *LevelDBMembershipIndex) State() *raw.DatabaseState {

View File

@ -332,7 +332,7 @@ func (l *LevelDBPersistence) Prune() {
l.storage.CompactRange(keyspace)
}
func (l *LevelDBPersistence) ApproximateSize() (uint64, error) {
func (l *LevelDBPersistence) Size() (uint64, error) {
iterator := l.NewIterator(false)
defer iterator.Close()

View File

@ -31,7 +31,7 @@ func (l *LevelDBPersistence) State() *raw.DatabaseState {
Supplemental: map[string]string{},
}
if size, err := l.ApproximateSize(); err != nil {
if size, err := l.Size(); err != nil {
databaseState.Supplemental["Errors"] = err.Error()
} else {
databaseState.Size = utility.ByteSize(size)

View File

@ -42,10 +42,10 @@ func main() {
start := time.Now()
log.Printf("Starting compaction...")
size, _ := persistences.ApproximateSizes()
size, _ := persistences.Sizes()
log.Printf("Original Size: %d", size)
persistences.Prune()
log.Printf("Finished in %s", time.Since(start))
size, _ = persistences.ApproximateSizes()
size, _ = persistences.Sizes()
log.Printf("New Size: %d", size)
}