nflog: add gc/snapshotting maintenance, remove delete

This removes the Delete function from the interface as the log
should be append-only and only be reduced by expired entries.
This also adds an argument to configure a background processing routine,
which periodically garbage collects and snapshots.
This commit is contained in:
Fabian Reinartz 2016-08-12 15:15:16 +02:00
parent 80afd502d5
commit 086d581cf8

View File

@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"os"
"sync"
"time"
@ -31,13 +33,13 @@ type Log interface {
// - extend the interface by a `QueryOne` method?
// - return an iterator rather than a materialized list?
Query(p ...QueryParam) ([]*pb.Entry, error)
// Delete log st along the given Parameters. Returns
// the number of deleted st.
Delete(p ...DeleteParam) (int, error)
// Snapshot the current log state and return the number
// of bytes written.
Snapshot(w io.Writer) (int, error)
// GC removes expired entries from the log. It returns
// the total number of deleted entries.
GC() (int, error)
}
// query currently allows filtering by and/or receiver group key.
@ -71,31 +73,16 @@ func QGroupKey(gk []byte) QueryParam {
}
}
// delQuery holds parameters for a deletion query.
// TODO(fabxc): can this be consolidated with regular QueryParams?
type delQuery struct {
// Delete log st that are expired. Does NOT delete
// unexpired st if set to false.
expired bool
}
// DeleteParam is a function that modifies parameters of a delete request.
// Returns an error for invalid of conflicting parameters.
type DeleteParam func(*delQuery) error
// DExpired adds a parameter to delete expired log st.
func DExpired() DeleteParam {
return func(d *delQuery) error {
d.expired = true
return nil
}
}
type nlog struct {
retention time.Duration
now func() time.Time
retention time.Duration
gossip mesh.Gossip // A gossip channel for sharing log state.
runInterval time.Duration
snapf string
stopc chan struct{}
done func()
gossip mesh.Gossip // gossip channel for sharing log state.
// For now we only store the most recently added log entry.
// The key is a serialized concatenation of group key and receiver.
@ -136,9 +123,28 @@ func WithNow(f func() time.Time) Option {
}
}
// WithMaintenance configures the Log to run garbage collection
// and snapshotting to the provided file at the given interval.
// On startup the a snapshot is also loaded from the given file.
//
// The maintenance terminates on receiving from the provided channel.
// The done function is called after the final snapshot was completed.
//
// Providing a 0 duration will not run background processing.
// Providing an empty file name will skip snapshotting.
func WithMaintenance(sf string, d time.Duration, stopc chan struct{}, done func()) Option {
return func(l *nlog) error {
l.runInterval = d
l.stopc = stopc
l.done = done
l.snapf = sf
return nil
}
}
// New creates a new notification log based on the provided options.
// The snapshot is loaded into the Log if it is set.
func New(snapshot io.Reader, opts ...Option) (Log, error) {
func New(opts ...Option) (Log, error) {
l := &nlog{
now: time.Now,
st: map[string]*pb.MeshEntry{},
@ -148,14 +154,65 @@ func New(snapshot io.Reader, opts ...Option) (Log, error) {
return nil, err
}
}
if snapshot != nil {
if err := l.loadSnapshot(snapshot); err != nil {
if l.snapf != "" {
f, err := os.Open(l.snapf)
if err != nil {
return l, err
}
defer f.Close()
if err := l.loadSnapshot(f); err != nil {
return l, err
}
}
go l.run()
return l, nil
}
// run periodic background maintenance.
func (l *nlog) run() {
if l.runInterval == 0 || l.stopc == nil {
return
}
t := time.NewTicker(l.runInterval)
defer t.Stop()
if l.done != nil {
defer l.done()
}
f := func() {
if _, err := l.GC(); err != nil {
// TODO(fabxc): log error instead
panic(err)
}
if l.snapf == "" {
return
}
f, err := openReplace(l.snapf)
if err != nil {
panic(err)
}
if _, err := l.Snapshot(f); err != nil {
panic(err)
}
if err := f.Close(); err != nil {
panic(err)
}
}
for {
select {
case <-l.stopc:
return
case <-t.C:
f()
}
}
f()
}
// LogActive implements the Log interface.
func (l *nlog) LogActive(r *pb.Receiver, key, hash []byte) error {
return l.log(r, key, hash, false)
@ -214,17 +271,8 @@ func (l *nlog) log(r *pb.Receiver, gkey, ghash []byte, resolved bool) error {
return nil
}
// Delete implements the Log interface.
func (l *nlog) Delete(params ...DeleteParam) (int, error) {
var del delQuery
for _, p := range params {
if err := p(&del); err != nil {
return 0, err
}
}
if !del.expired {
return 0, errors.New("only expiration deletion supported")
}
// GC implements the Log interface.
func (l *nlog) GC() (int, error) {
now := l.now()
var n int
@ -273,6 +321,8 @@ func (l *nlog) loadSnapshot(r io.Reader) error {
l.mtx.Lock()
defer l.mtx.Unlock()
st := gossipData{}
for {
var e pb.MeshEntry
if _, err := pbutil.ReadDelimited(r, &e); err != nil {
@ -281,8 +331,9 @@ func (l *nlog) loadSnapshot(r io.Reader) error {
}
return err
}
l.st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
st[stateKey(e.Entry.GroupKey, e.Entry.Receiver)] = &e
}
l.st = st
return nil
}
@ -453,3 +504,35 @@ func (gd gossipData) mergeDelta(od gossipData) gossipData {
}
return delta
}
// replaceFile wraps a file that is moved to another filename on closing.
type replaceFile struct {
*os.File
filename string
}
func (f *replaceFile) Close() error {
if err := f.File.Sync(); err != nil {
return err
}
if err := f.File.Close(); err != nil {
return err
}
return os.Rename(f.File.Name(), f.filename)
}
// openReplace opens a new temporary file that is moved to filename on closing.
func openReplace(filename string) (*replaceFile, error) {
tmpFilename := fmt.Sprintf("%s.%x", filename, uint64(rand.Int63()))
f, err := os.Create(tmpFilename)
if err != nil {
return nil, err
}
rf := &replaceFile{
File: f,
filename: filename,
}
return rf, nil
}