commit
27f1b8aac3
2
db.go
2
db.go
|
@ -204,7 +204,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db
|
|||
return nil, errors.Wrap(err, "create leveled compactor")
|
||||
}
|
||||
|
||||
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval)
|
||||
wal, err := OpenSegmentWAL(filepath.Join(dir, "wal"), l, opts.WALFlushInterval, r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
36
wal.go
36
wal.go
|
@ -32,6 +32,7 @@ import (
|
|||
"github.com/go-kit/kit/log/level"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/tsdb/labels"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
// WALEntryType indicates what data a WAL entry contains.
|
||||
|
@ -65,6 +66,26 @@ type SeriesCB func([]RefSeries) error
|
|||
// is only valid until the call returns.
|
||||
type DeletesCB func([]Stone) error
|
||||
|
||||
type walMetrics struct {
|
||||
fsyncDuration prometheus.Summary
|
||||
}
|
||||
|
||||
func newWalMetrics(wal *SegmentWAL, r prometheus.Registerer) *walMetrics {
|
||||
m := &walMetrics{}
|
||||
|
||||
m.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
|
||||
Name: "tsdb_wal_fsync_duration_seconds",
|
||||
Help: "Duration of WAL fsync.",
|
||||
})
|
||||
|
||||
if r != nil {
|
||||
r.MustRegister(
|
||||
m.fsyncDuration,
|
||||
)
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// WAL is a write ahead log that can log new series labels and samples.
|
||||
// It must be completely read before new entries are logged.
|
||||
type WAL interface {
|
||||
|
@ -150,6 +171,7 @@ func newCRC32() hash.Hash32 {
|
|||
// SegmentWAL is a write ahead log for series data.
|
||||
type SegmentWAL struct {
|
||||
mtx sync.Mutex
|
||||
metrics *walMetrics
|
||||
|
||||
dirFile *os.File
|
||||
files []*segmentFile
|
||||
|
@ -169,7 +191,7 @@ type SegmentWAL struct {
|
|||
|
||||
// OpenSegmentWAL opens or creates a write ahead log in the given directory.
|
||||
// The WAL must be read completely before new data is written.
|
||||
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration) (*SegmentWAL, error) {
|
||||
func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration, r prometheus.Registerer) (*SegmentWAL, error) {
|
||||
if err := os.MkdirAll(dir, 0777); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -190,6 +212,7 @@ func OpenSegmentWAL(dir string, logger log.Logger, flushInterval time.Duration)
|
|||
segmentSize: walSegmentSizeBytes,
|
||||
crc32: newCRC32(),
|
||||
}
|
||||
w.metrics = newWalMetrics(w, r)
|
||||
|
||||
fns, err := sequenceFiles(w.dirFile.Name())
|
||||
if err != nil {
|
||||
|
@ -592,7 +615,10 @@ func (w *SegmentWAL) Sync() error {
|
|||
}
|
||||
if head != nil {
|
||||
// But only fsync the head segment after releasing the mutex as it will block on disk I/O.
|
||||
return fileutil.Fdatasync(head.File)
|
||||
start := time.Now()
|
||||
err := fileutil.Fdatasync(head.File)
|
||||
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -604,7 +630,11 @@ func (w *SegmentWAL) sync() error {
|
|||
if w.head() == nil {
|
||||
return nil
|
||||
}
|
||||
return fileutil.Fdatasync(w.head().File)
|
||||
|
||||
start := time.Now()
|
||||
err := fileutil.Fdatasync(w.head().File)
|
||||
w.metrics.fsyncDuration.Observe(time.Since(start).Seconds())
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *SegmentWAL) flush() error {
|
||||
|
|
18
wal_test.go
18
wal_test.go
|
@ -44,7 +44,7 @@ func TestSegmentWAL_Open(t *testing.T) {
|
|||
}
|
||||
|
||||
// Initialize 5 correct segment files.
|
||||
w, err := OpenSegmentWAL(tmpdir, nil, 0)
|
||||
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, 5, len(w.files), "unexpected number of segments loaded")
|
||||
|
@ -74,7 +74,7 @@ func TestSegmentWAL_Open(t *testing.T) {
|
|||
_, err = f.WriteAt([]byte{0}, 4)
|
||||
require.NoError(t, err)
|
||||
|
||||
w, err = OpenSegmentWAL(tmpdir, nil, 0)
|
||||
w, err = OpenSegmentWAL(tmpdir, nil, 0, nil)
|
||||
require.Error(t, err, "open with corrupted segments")
|
||||
}
|
||||
|
||||
|
@ -84,7 +84,7 @@ func TestSegmentWAL_cut(t *testing.T) {
|
|||
defer os.RemoveAll(tmpdir)
|
||||
|
||||
// This calls cut() implicitly the first time without a previous tail.
|
||||
w, err := OpenSegmentWAL(tmpdir, nil, 0)
|
||||
w, err := OpenSegmentWAL(tmpdir, nil, 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, w.write(WALEntrySeries, 1, []byte("Hello World!!")))
|
||||
|
@ -131,7 +131,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
// defer os.RemoveAll(dir)
|
||||
|
||||
w, err := OpenSegmentWAL(dir, nil, 0)
|
||||
w, err := OpenSegmentWAL(dir, nil, 0, nil)
|
||||
require.NoError(t, err)
|
||||
w.segmentSize = 10000
|
||||
|
||||
|
@ -181,7 +181,7 @@ func TestSegmentWAL_Truncate(t *testing.T) {
|
|||
require.NoError(t, w.Close())
|
||||
|
||||
// The same again with a new WAL.
|
||||
w, err = OpenSegmentWAL(dir, nil, 0)
|
||||
w, err = OpenSegmentWAL(dir, nil, 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
var readSeries []RefSeries
|
||||
|
@ -221,7 +221,7 @@ func TestSegmentWAL_Log_Restore(t *testing.T) {
|
|||
// Open WAL a bunch of times, validate all previous data can be read,
|
||||
// write more data to it, close it.
|
||||
for k := 0; k < numMetrics; k += numMetrics / iterations {
|
||||
w, err := OpenSegmentWAL(dir, nil, 0)
|
||||
w, err := OpenSegmentWAL(dir, nil, 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Set smaller segment size so we can actually write several files.
|
||||
|
@ -390,7 +390,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
w, err := OpenSegmentWAL(dir, nil, 0)
|
||||
w, err := OpenSegmentWAL(dir, nil, 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, w.LogSamples([]RefSample{{T: 1, V: 2}}))
|
||||
|
@ -415,7 +415,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
|||
|
||||
logger := log.NewLogfmtLogger(os.Stderr)
|
||||
|
||||
w2, err := OpenSegmentWAL(dir, logger, 0)
|
||||
w2, err := OpenSegmentWAL(dir, logger, 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
r := w2.Reader()
|
||||
|
@ -446,7 +446,7 @@ func TestWALRestoreCorrupted(t *testing.T) {
|
|||
|
||||
// We should see the first valid entry and the new one, everything after
|
||||
// is truncated.
|
||||
w3, err := OpenSegmentWAL(dir, logger, 0)
|
||||
w3, err := OpenSegmentWAL(dir, logger, 0, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
r = w3.Reader()
|
||||
|
|
Loading…
Reference in New Issue