2018-05-16 12:02:55 +00:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"bytes"
2019-01-16 18:09:08 +00:00
"fmt"
2018-05-16 12:02:55 +00:00
"io/ioutil"
"math/rand"
"os"
2019-02-18 19:05:07 +00:00
"path/filepath"
2018-05-16 12:02:55 +00:00
"testing"
2019-05-17 08:47:42 +00:00
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
2019-08-14 09:07:02 +00:00
"github.com/prometheus/prometheus/util/testutil"
2018-05-16 12:02:55 +00:00
)
2019-06-12 14:10:37 +00:00
// TestWALRepair_ReadingError ensures that a repair is run for an error
// when reading a record.
func TestWALRepair_ReadingError ( t * testing . T ) {
2018-11-28 13:15:11 +00:00
for name , test := range map [ string ] struct {
corrSgm int // Which segment to corrupt.
corrFunc func ( f * os . File ) // Func that applies the corruption.
intactRecs int // Total expected records left after the repair.
} {
"torn_last_record" : {
2 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize * 2 , 0 )
testutil . Ok ( t , err )
_ , err = f . Write ( [ ] byte { byte ( recFirst ) } )
testutil . Ok ( t , err )
} ,
8 ,
} ,
// Ensures that the page buffer is big enough to fit
// an entire page size without panicing.
2019-08-13 08:34:14 +00:00
// https://github.com/prometheus/prometheus/tsdb/pull/414
2018-11-28 13:15:11 +00:00
"bad_header" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize , 0 )
testutil . Ok ( t , err )
_ , err = f . Write ( [ ] byte { byte ( recPageTerm ) } )
testutil . Ok ( t , err )
} ,
4 ,
2018-11-14 16:43:33 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_fragment_sequence" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize , 0 )
testutil . Ok ( t , err )
_ , err = f . Write ( [ ] byte { byte ( recLast ) } )
testutil . Ok ( t , err )
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_fragment_flag" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize , 0 )
testutil . Ok ( t , err )
_ , err = f . Write ( [ ] byte { 123 } )
testutil . Ok ( t , err )
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_checksum" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize + 4 , 0 )
testutil . Ok ( t , err )
_ , err = f . Write ( [ ] byte { 0 } )
testutil . Ok ( t , err )
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_length" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize + 2 , 0 )
testutil . Ok ( t , err )
_ , err = f . Write ( [ ] byte { 0 } )
testutil . Ok ( t , err )
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
2018-11-28 13:15:11 +00:00
"bad_content" : {
1 ,
func ( f * os . File ) {
_ , err := f . Seek ( pageSize + 100 , 0 )
testutil . Ok ( t , err )
_ , err = f . Write ( [ ] byte ( "beef" ) )
testutil . Ok ( t , err )
} ,
4 ,
2018-05-17 13:00:32 +00:00
} ,
} {
t . Run ( name , func ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "wal_repair" )
testutil . Ok ( t , err )
2019-03-19 13:31:57 +00:00
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
2018-05-17 13:00:32 +00:00
2018-11-28 13:15:11 +00:00
// We create 3 segments with 3 records each and
// then corrupt a given record in a given segment.
// As a result we want a repaired WAL with given intact records.
2019-02-25 10:10:27 +00:00
segSize := 3 * pageSize
2019-06-19 13:46:24 +00:00
w , err := NewSize ( nil , nil , dir , segSize , false )
2018-05-17 13:00:32 +00:00
testutil . Ok ( t , err )
var records [ ] [ ] byte
for i := 1 ; i <= 9 ; i ++ {
b := make ( [ ] byte , pageSize - recordHeaderSize )
b [ 0 ] = byte ( i )
records = append ( records , b )
testutil . Ok ( t , w . Log ( b ) )
}
2019-06-12 14:10:37 +00:00
first , last , err := w . Segments ( )
testutil . Ok ( t , err )
testutil . Equals ( t , 3 , 1 + last - first , "wal creation didn't result in expected number of segments" )
2018-05-17 13:00:32 +00:00
testutil . Ok ( t , w . Close ( ) )
2018-11-28 13:15:11 +00:00
f , err := os . OpenFile ( SegmentName ( dir , test . corrSgm ) , os . O_RDWR , 0666 )
2018-05-17 13:00:32 +00:00
testutil . Ok ( t , err )
// Apply corruption function.
2018-11-28 13:15:11 +00:00
test . corrFunc ( f )
2018-05-17 13:00:32 +00:00
testutil . Ok ( t , f . Close ( ) )
2019-06-19 13:46:24 +00:00
w , err = NewSize ( nil , nil , dir , segSize , false )
2018-05-17 13:00:32 +00:00
testutil . Ok ( t , err )
2019-03-19 13:31:57 +00:00
defer w . Close ( )
2018-05-17 13:00:32 +00:00
2019-06-12 14:10:37 +00:00
first , last , err = w . Segments ( )
2018-05-17 13:00:32 +00:00
testutil . Ok ( t , err )
2019-05-24 18:33:28 +00:00
// Backfill segments from the most recent checkpoint onwards.
for i := first ; i <= last ; i ++ {
s , err := OpenReadSegment ( SegmentName ( w . Dir ( ) , i ) )
testutil . Ok ( t , err )
sr := NewSegmentBufReader ( s )
testutil . Ok ( t , err )
r := NewReader ( sr )
for r . Next ( ) {
}
//Close the segment so we don't break things on Windows.
s . Close ( )
// No corruption in this segment.
if r . Err ( ) == nil {
continue
}
testutil . Ok ( t , w . Repair ( r . Err ( ) ) )
break
2018-05-17 13:00:32 +00:00
}
2019-05-24 18:33:28 +00:00
sr , err := NewSegmentsReader ( dir )
2018-05-17 13:00:32 +00:00
testutil . Ok ( t , err )
2019-03-19 13:31:57 +00:00
defer sr . Close ( )
2019-05-24 18:33:28 +00:00
r := NewReader ( sr )
2018-05-17 13:00:32 +00:00
var result [ ] [ ] byte
for r . Next ( ) {
var b [ ] byte
result = append ( result , append ( b , r . Record ( ) ... ) )
}
testutil . Ok ( t , r . Err ( ) )
2018-11-28 13:15:11 +00:00
testutil . Equals ( t , test . intactRecs , len ( result ) , "Wrong number of intact records" )
2018-05-17 13:00:32 +00:00
for i , r := range result {
if ! bytes . Equal ( records [ i ] , r ) {
t . Fatalf ( "record %d diverges: want %x, got %x" , i , records [ i ] [ : 10 ] , r [ : 10 ] )
}
}
2019-02-25 10:10:27 +00:00
2019-05-24 18:33:28 +00:00
// Make sure there is a new 0 size Segment after the corrupted Segment.
_ , last , err = w . Segments ( )
testutil . Ok ( t , err )
testutil . Equals ( t , test . corrSgm + 1 , last )
fi , err := os . Stat ( SegmentName ( dir , last ) )
2019-02-25 10:10:27 +00:00
testutil . Ok ( t , err )
2019-05-24 18:33:28 +00:00
testutil . Equals ( t , int64 ( 0 ) , fi . Size ( ) )
2018-05-17 13:00:32 +00:00
} )
}
}
2019-02-18 19:05:07 +00:00
// TestCorruptAndCarryOn writes a multi-segment WAL; corrupts the first segment and
// ensures that an error during reading that segment are correctly repaired before
// moving to write more records to the WAL.
func TestCorruptAndCarryOn ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "wal_repair" )
testutil . Ok ( t , err )
2019-03-19 13:31:57 +00:00
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
2019-02-18 19:05:07 +00:00
var (
logger = testutil . NewLogger ( t )
segmentSize = pageSize * 3
recordSize = ( pageSize / 3 ) - recordHeaderSize
)
// Produce a WAL with a two segments of 3 pages with 3 records each,
// so when we truncate the file we're guaranteed to split a record.
{
2019-06-19 13:46:24 +00:00
w , err := NewSize ( logger , nil , dir , segmentSize , false )
2019-02-18 19:05:07 +00:00
testutil . Ok ( t , err )
for i := 0 ; i < 18 ; i ++ {
buf := make ( [ ] byte , recordSize )
_ , err := rand . Read ( buf )
testutil . Ok ( t , err )
err = w . Log ( buf )
testutil . Ok ( t , err )
}
err = w . Close ( )
testutil . Ok ( t , err )
}
// Check all the segments are the correct size.
{
segments , err := listSegments ( dir )
testutil . Ok ( t , err )
for _ , segment := range segments {
f , err := os . OpenFile ( filepath . Join ( dir , fmt . Sprintf ( "%08d" , segment . index ) ) , os . O_RDONLY , 0666 )
testutil . Ok ( t , err )
fi , err := f . Stat ( )
testutil . Ok ( t , err )
t . Log ( "segment" , segment . index , "size" , fi . Size ( ) )
testutil . Equals ( t , int64 ( segmentSize ) , fi . Size ( ) )
err = f . Close ( )
testutil . Ok ( t , err )
}
}
// Truncate the first file, splitting the middle record in the second
// page in half, leaving 4 valid records.
{
f , err := os . OpenFile ( filepath . Join ( dir , fmt . Sprintf ( "%08d" , 0 ) ) , os . O_RDWR , 0666 )
testutil . Ok ( t , err )
fi , err := f . Stat ( )
testutil . Ok ( t , err )
testutil . Equals ( t , int64 ( segmentSize ) , fi . Size ( ) )
err = f . Truncate ( int64 ( segmentSize / 2 ) )
testutil . Ok ( t , err )
err = f . Close ( )
testutil . Ok ( t , err )
}
// Now try and repair this WAL, and write 5 more records to it.
{
sr , err := NewSegmentsReader ( dir )
testutil . Ok ( t , err )
reader := NewReader ( sr )
i := 0
for ; i < 4 && reader . Next ( ) ; i ++ {
testutil . Equals ( t , recordSize , len ( reader . Record ( ) ) )
}
testutil . Equals ( t , 4 , i , "not enough records" )
testutil . Assert ( t , ! reader . Next ( ) , "unexpected record" )
corruptionErr := reader . Err ( )
testutil . Assert ( t , corruptionErr != nil , "expected error" )
err = sr . Close ( )
testutil . Ok ( t , err )
2019-06-19 13:46:24 +00:00
w , err := NewSize ( logger , nil , dir , segmentSize , false )
2019-02-18 19:05:07 +00:00
testutil . Ok ( t , err )
err = w . Repair ( corruptionErr )
testutil . Ok ( t , err )
2019-05-24 18:33:28 +00:00
// Ensure that we have a completely clean slate after reapiring.
testutil . Equals ( t , w . segment . Index ( ) , 1 ) // We corrupted segment 0.
testutil . Equals ( t , w . donePages , 0 )
2019-02-18 19:05:07 +00:00
for i := 0 ; i < 5 ; i ++ {
buf := make ( [ ] byte , recordSize )
_ , err := rand . Read ( buf )
testutil . Ok ( t , err )
err = w . Log ( buf )
testutil . Ok ( t , err )
}
err = w . Close ( )
testutil . Ok ( t , err )
}
// Replay the WAL. Should get 9 records.
{
sr , err := NewSegmentsReader ( dir )
testutil . Ok ( t , err )
reader := NewReader ( sr )
i := 0
for ; i < 9 && reader . Next ( ) ; i ++ {
testutil . Equals ( t , recordSize , len ( reader . Record ( ) ) )
}
testutil . Equals ( t , 9 , i , "wrong number of records" )
testutil . Assert ( t , ! reader . Next ( ) , "unexpected record" )
testutil . Equals ( t , nil , reader . Err ( ) )
2019-03-19 13:31:57 +00:00
sr . Close ( )
2019-02-18 19:05:07 +00:00
}
}
2019-04-30 07:17:07 +00:00
// TestClose ensures that calling Close more than once doesn't panic and doesn't block.
func TestClose ( t * testing . T ) {
dir , err := ioutil . TempDir ( "" , "wal_repair" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
2019-06-19 13:46:24 +00:00
w , err := NewSize ( nil , nil , dir , pageSize , false )
2019-04-30 07:17:07 +00:00
testutil . Ok ( t , err )
testutil . Ok ( t , w . Close ( ) )
testutil . NotOk ( t , w . Close ( ) )
}
2019-05-17 08:47:42 +00:00
func TestSegmentMetric ( t * testing . T ) {
var (
segmentSize = pageSize
recordSize = ( pageSize / 2 ) - recordHeaderSize
)
dir , err := ioutil . TempDir ( "" , "segment_metric" )
testutil . Ok ( t , err )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dir ) )
} ( )
2019-06-19 13:46:24 +00:00
w , err := NewSize ( nil , nil , dir , segmentSize , false )
2019-05-17 08:47:42 +00:00
testutil . Ok ( t , err )
initialSegment := client_testutil . ToFloat64 ( w . currentSegment )
// Write 3 records, each of which is half the segment size, meaning we should rotate to the next segment.
for i := 0 ; i < 3 ; i ++ {
buf := make ( [ ] byte , recordSize )
_ , err := rand . Read ( buf )
testutil . Ok ( t , err )
err = w . Log ( buf )
testutil . Ok ( t , err )
}
testutil . Assert ( t , client_testutil . ToFloat64 ( w . currentSegment ) == initialSegment + 1 , "segment metric did not increment after segment rotation" )
testutil . Ok ( t , w . Close ( ) )
}
2019-06-19 13:46:24 +00:00
func TestCompression ( t * testing . T ) {
boostrap := func ( compressed bool ) string {
const (
segmentSize = pageSize
recordSize = ( pageSize / 2 ) - recordHeaderSize
records = 100
)
dirPath , err := ioutil . TempDir ( "" , fmt . Sprintf ( "TestCompression_%t" , compressed ) )
testutil . Ok ( t , err )
w , err := NewSize ( nil , nil , dirPath , segmentSize , compressed )
testutil . Ok ( t , err )
buf := make ( [ ] byte , recordSize )
for i := 0 ; i < records ; i ++ {
testutil . Ok ( t , w . Log ( buf ) )
}
testutil . Ok ( t , w . Close ( ) )
return dirPath
}
dirCompressed := boostrap ( true )
2019-03-19 13:31:57 +00:00
defer func ( ) {
2019-06-19 13:46:24 +00:00
testutil . Ok ( t , os . RemoveAll ( dirCompressed ) )
} ( )
dirUnCompressed := boostrap ( false )
defer func ( ) {
testutil . Ok ( t , os . RemoveAll ( dirUnCompressed ) )
2019-03-19 13:31:57 +00:00
} ( )
2018-05-16 12:02:55 +00:00
2019-07-23 08:04:48 +00:00
uncompressedSize := testutil . DirSize ( t , dirUnCompressed )
compressedSize := testutil . DirSize ( t , dirCompressed )
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
testutil . Assert ( t , float64 ( uncompressedSize ) * 0.75 > float64 ( compressedSize ) , "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d" , uncompressedSize , compressedSize )
}
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
func BenchmarkWAL_LogBatched ( b * testing . B ) {
for _ , compress := range [ ] bool { true , false } {
b . Run ( fmt . Sprintf ( "compress=%t" , compress ) , func ( b * testing . B ) {
dir , err := ioutil . TempDir ( "" , "bench_logbatch" )
testutil . Ok ( b , err )
defer func ( ) {
testutil . Ok ( b , os . RemoveAll ( dir ) )
} ( )
w , err := New ( nil , nil , dir , compress )
testutil . Ok ( b , err )
defer w . Close ( )
var buf [ 2048 ] byte
var recs [ ] [ ] byte
b . SetBytes ( 2048 )
for i := 0 ; i < b . N ; i ++ {
recs = append ( recs , buf [ : ] )
if len ( recs ) < 1000 {
continue
}
err := w . Log ( recs ... )
testutil . Ok ( b , err )
recs = recs [ : 0 ]
}
// Stop timer to not count fsync time on close.
// If it's counted batched vs. single benchmarks are very similar but
// do not show burst throughput well.
b . StopTimer ( )
} )
2018-05-16 12:02:55 +00:00
}
}
func BenchmarkWAL_Log ( b * testing . B ) {
2019-06-19 13:46:24 +00:00
for _ , compress := range [ ] bool { true , false } {
b . Run ( fmt . Sprintf ( "compress=%t" , compress ) , func ( b * testing . B ) {
dir , err := ioutil . TempDir ( "" , "bench_logsingle" )
testutil . Ok ( b , err )
defer func ( ) {
testutil . Ok ( b , os . RemoveAll ( dir ) )
} ( )
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
w , err := New ( nil , nil , dir , compress )
testutil . Ok ( b , err )
defer w . Close ( )
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
var buf [ 2048 ] byte
b . SetBytes ( 2048 )
2018-05-16 12:02:55 +00:00
2019-06-19 13:46:24 +00:00
for i := 0 ; i < b . N ; i ++ {
err := w . Log ( buf [ : ] )
testutil . Ok ( b , err )
}
// Stop timer to not count fsync time on close.
// If it's counted batched vs. single benchmarks are very similar but
// do not show burst throughput well.
b . StopTimer ( )
} )
2018-05-16 12:02:55 +00:00
}
}