// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tsdb

import (
	"io/ioutil"
	"math/rand"
	"os"
	"testing"

	"github.com/prometheus/tsdb/chunkenc"
	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/index"
	"github.com/prometheus/tsdb/labels"
	"github.com/prometheus/tsdb/testutil"
	"github.com/prometheus/tsdb/wal"
)

func BenchmarkCreateSeries(b *testing.B) {
	lbls, err := labels.ReadLabels("testdata/20kseries.json", b.N)
	testutil.Ok(b, err)

	h, err := NewHead(nil, nil, nil, 10000)
	if err != nil {
		testutil.Ok(b, err)
	}
	defer h.Close()

	b.ReportAllocs()
	b.ResetTimer()

	for _, l := range lbls {
		h.getOrCreate(l.Hash(), l)
	}
}

func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) {
	var enc RecordEncoder
	for _, r := range recs {
		switch v := r.(type) {
		case []RefSeries:
			testutil.Ok(t, w.Log(enc.Series(v, nil)))
		case []RefSample:
			testutil.Ok(t, w.Log(enc.Samples(v, nil)))
		case []Stone:
			testutil.Ok(t, w.Log(enc.Tombstones(v, nil)))
		}
	}
}

func readTestWAL(t testing.TB, dir string) (recs []interface{}) {
	sr, err := wal.NewSegmentsReader(dir)
	testutil.Ok(t, err)
	defer sr.Close()

	var dec RecordDecoder
	r := wal.NewReader(sr)

	for r.Next() {
		rec := r.Record()

		switch dec.Type(rec) {
		case RecordSeries:
			series, err := dec.Series(rec, nil)
			testutil.Ok(t, err)
			recs = append(recs, series)
		case RecordSamples:
			samples, err := dec.Samples(rec, nil)
			testutil.Ok(t, err)
			recs = append(recs, samples)
		case RecordTombstones:
			tstones, err := dec.Tombstones(rec, nil)
			testutil.Ok(t, err)
			recs = append(recs, tstones)
		default:
			t.Fatalf("unknown record type")
		}
	}
	testutil.Ok(t, r.Err())
	return recs
}

func TestHead_ReadWAL(t *testing.T) {
	entries := []interface{}{
		[]RefSeries{
			{Ref: 10, Labels: labels.FromStrings("a", "1")},
			{Ref: 11, Labels: labels.FromStrings("a", "2")},
			{Ref: 100, Labels: labels.FromStrings("a", "3")},
		},
		[]RefSample{
			{Ref: 0, T: 99, V: 1},
			{Ref: 10, T: 100, V: 2},
			{Ref: 100, T: 100, V: 3},
		},
		[]RefSeries{
			{Ref: 50, Labels: labels.FromStrings("a", "4")},
		},
		[]RefSample{
			{Ref: 10, T: 101, V: 5},
			{Ref: 50, T: 101, V: 6},
		},
	}
	dir, err := ioutil.TempDir("", "test_read_wal")
	testutil.Ok(t, err)
	defer os.RemoveAll(dir)

	w, err := wal.New(nil, nil, dir)
	testutil.Ok(t, err)
	populateTestWAL(t, w, entries)

	head, err := NewHead(nil, nil, w, 1000)
	testutil.Ok(t, err)
	defer head.Close()

	testutil.Ok(t, head.Init())
	testutil.Equals(t, uint64(100), head.lastSeriesID)

	s10 := head.series.getByID(10)
	s11 := head.series.getByID(11)
	s50 := head.series.getByID(50)
	s100 := head.series.getByID(100)

	testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset)
	testutil.Equals(t, labels.FromStrings("a", "2"), s11.lset)
	testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset)
	testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset)

	expandChunk := func(c chunkenc.Iterator) (x []sample) {
		for c.Next() {
			t, v := c.At()
			x = append(x, sample{t: t, v: v})
		}
		testutil.Ok(t, c.Err())
		return x
	}

	testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0)))
	testutil.Equals(t, 0, len(s11.chunks))
	testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0)))
	testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0)))
}

func TestHead_Truncate(t *testing.T) {
	h, err := NewHead(nil, nil, nil, 1000)
	testutil.Ok(t, err)
	defer h.Close()

	h.initTime(0)

	s1, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1"))
	s2, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1"))
	s3, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2"))
	s4, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1"))

	s1.chunks = []*memChunk{
		{minTime: 0, maxTime: 999},
		{minTime: 1000, maxTime: 1999},
		{minTime: 2000, maxTime: 2999},
	}
	s2.chunks = []*memChunk{
		{minTime: 1000, maxTime: 1999},
		{minTime: 2000, maxTime: 2999},
		{minTime: 3000, maxTime: 3999},
	}
	s3.chunks = []*memChunk{
		{minTime: 0, maxTime: 999},
		{minTime: 1000, maxTime: 1999},
	}
	s4.chunks = []*memChunk{}

	// Truncation need not be aligned.
	testutil.Ok(t, h.Truncate(1))

	testutil.Ok(t, h.Truncate(2000))

	testutil.Equals(t, []*memChunk{
		{minTime: 2000, maxTime: 2999},
	}, h.series.getByID(s1.ref).chunks)

	testutil.Equals(t, []*memChunk{
		{minTime: 2000, maxTime: 2999},
		{minTime: 3000, maxTime: 3999},
	}, h.series.getByID(s2.ref).chunks)

	testutil.Assert(t, h.series.getByID(s3.ref) == nil, "")
	testutil.Assert(t, h.series.getByID(s4.ref) == nil, "")

	postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1"))
	postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2"))
	postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1"))
	postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2"))
	postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1"))
	postingsAll, _ := index.ExpandPostings(h.postings.Get("", ""))

	testutil.Equals(t, []uint64{s1.ref}, postingsA1)
	testutil.Equals(t, []uint64{s2.ref}, postingsA2)
	testutil.Equals(t, []uint64{s1.ref, s2.ref}, postingsB1)
	testutil.Equals(t, []uint64{s1.ref, s2.ref}, postingsAll)
	testutil.Assert(t, postingsB2 == nil, "")
	testutil.Assert(t, postingsC1 == nil, "")

	testutil.Equals(t, map[string]struct{}{
		"":  {}, // from 'all' postings list
		"a": {},
		"b": {},
		"1": {},
		"2": {},
	}, h.symbols)

	testutil.Equals(t, map[string]stringset{
		"a": {"1": struct{}{}, "2": struct{}{}},
		"b": {"1": struct{}{}},
		"":  {"": struct{}{}},
	}, h.values)
}

// Validate various behaviors brought on by firstChunkID accounting for
// garbage collected chunks.
func TestMemSeries_truncateChunks(t *testing.T) {
	s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000)

	for i := 0; i < 4000; i += 5 {
		ok, _ := s.append(int64(i), float64(i))
		testutil.Assert(t, ok == true, "sample append failed")
	}

	// Check that truncate removes half of the chunks and afterwards
	// that the ID of the last chunk still gives us the same chunk afterwards.
	countBefore := len(s.chunks)
	lastID := s.chunkID(countBefore - 1)
	lastChunk := s.chunk(lastID)

	testutil.Assert(t, s.chunk(0) != nil, "")
	testutil.Assert(t, lastChunk != nil, "")

	s.truncateChunksBefore(2000)

	testutil.Equals(t, int64(2000), s.chunks[0].minTime)
	testutil.Assert(t, s.chunk(0) == nil, "first chunks not gone")
	testutil.Equals(t, countBefore/2, len(s.chunks))
	testutil.Equals(t, lastChunk, s.chunk(lastID))

	// Validate that the series' sample buffer is applied correctly to the last chunk
	// after truncation.
	it1 := s.iterator(s.chunkID(len(s.chunks) - 1))
	_, ok := it1.(*memSafeIterator)
	testutil.Assert(t, ok == true, "")

	it2 := s.iterator(s.chunkID(len(s.chunks) - 2))
	_, ok = it2.(*memSafeIterator)
	testutil.Assert(t, ok == false, "non-last chunk incorrectly wrapped with sample buffer")
}

func TestHeadDeleteSeriesWithoutSamples(t *testing.T) {
	entries := []interface{}{
		[]RefSeries{
			{Ref: 10, Labels: labels.FromStrings("a", "1")},
		},
		[]RefSample{},
		[]RefSeries{
			{Ref: 50, Labels: labels.FromStrings("a", "2")},
		},
		[]RefSample{
			{Ref: 50, T: 80, V: 1},
			{Ref: 50, T: 90, V: 1},
		},
	}
	dir, err := ioutil.TempDir("", "test_delete_series")
	testutil.Ok(t, err)
	defer os.RemoveAll(dir)

	w, err := wal.New(nil, nil, dir)
	testutil.Ok(t, err)
	populateTestWAL(t, w, entries)

	head, err := NewHead(nil, nil, w, 1000)
	testutil.Ok(t, err)
	defer head.Close()

	testutil.Ok(t, head.Init())

	testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1")))
}

func TestHeadDeleteSimple(t *testing.T) {
	numSamples := int64(10)

	head, err := NewHead(nil, nil, nil, 1000)
	testutil.Ok(t, err)
	defer head.Close()

	app := head.Appender()

	smpls := make([]float64, numSamples)
	for i := int64(0); i < numSamples; i++ {
		smpls[i] = rand.Float64()
		app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
	}

	testutil.Ok(t, app.Commit())
	cases := []struct {
		intervals Intervals
		remaint   []int64
	}{
		{
			intervals: Intervals{{0, 3}},
			remaint:   []int64{4, 5, 6, 7, 8, 9},
		},
		{
			intervals: Intervals{{1, 3}},
			remaint:   []int64{0, 4, 5, 6, 7, 8, 9},
		},
		{
			intervals: Intervals{{1, 3}, {4, 7}},
			remaint:   []int64{0, 8, 9},
		},
		{
			intervals: Intervals{{1, 3}, {4, 700}},
			remaint:   []int64{0},
		},
		{
			intervals: Intervals{{0, 9}},
			remaint:   []int64{},
		},
	}

Outer:
	for _, c := range cases {
		// Reset the tombstones.
		head.tombstones = NewMemTombstones()

		// Delete the ranges.
		for _, r := range c.intervals {
			testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher("a", "b")))
		}

		// Compare the result.
		q, err := NewBlockQuerier(head, head.MinTime(), head.MaxTime())
		testutil.Ok(t, err)
		res, err := q.Select(labels.NewEqualMatcher("a", "b"))
		testutil.Ok(t, err)

		expSamples := make([]sample, 0, len(c.remaint))
		for _, ts := range c.remaint {
			expSamples = append(expSamples, sample{ts, smpls[ts]})
		}

		expss := newListSeriesSet([]Series{
			newSeries(map[string]string{"a": "b"}, expSamples),
		})

		if len(expSamples) == 0 {
			testutil.Assert(t, res.Next() == false, "")
			continue
		}

		for {
			eok, rok := expss.Next(), res.Next()
			testutil.Equals(t, eok, rok)

			if !eok {
				continue Outer
			}
			sexp := expss.At()
			sres := res.At()

			testutil.Equals(t, sexp.Labels(), sres.Labels())

			smplExp, errExp := expandSeriesIterator(sexp.Iterator())
			smplRes, errRes := expandSeriesIterator(sres.Iterator())

			testutil.Equals(t, errExp, errRes)
			testutil.Equals(t, smplExp, smplRes)
		}
	}
}

// func TestDeleteUntilCurMax(t *testing.T) {
// 	numSamples := int64(10)

// 	dir, _ := ioutil.TempDir("", "test")
// 	defer os.RemoveAll(dir)

// 	hb := createTestHead(t, dir, 0, 2*numSamples)
// 	app := hb.Appender()

// 	smpls := make([]float64, numSamples)
// 	for i := int64(0); i < numSamples; i++ {
// 		smpls[i] = rand.Float64()
// 		app.Add(labels.Labels{{"a", "b"}}, i, smpls[i])
// 	}

// 	testutil.Ok(t, app.Commit())
// 	testutil.Ok(t, hb.Delete(0, 10000, labels.NewEqualMatcher("a", "b")))
// 	app = hb.Appender()
// 	_, err := app.Add(labels.Labels{{"a", "b"}}, 11, 1)
// 	testutil.Ok(t, err)
// 	testutil.Ok(t, app.Commit())

// 	q := hb.Querier(0, 100000)
// 	res := q.Select(labels.NewEqualMatcher("a", "b"))

// 	require.True(t, res.Next())
// 	exps := res.At()
// 	it := exps.Iterator()
// 	ressmpls, err := expandSeriesIterator(it)
// 	testutil.Ok(t, err)
// 	testutil.Equals(t, []sample{{11, 1}}, ressmpls)
// }

// func TestDelete_e2e(t *testing.T) {
// 	numDatapoints := 1000
// 	numRanges := 1000
// 	timeInterval := int64(2)
// 	maxTime := int64(2 * 1000)
// 	minTime := int64(200)
// 	// Create 8 series with 1000 data-points of different ranges, delete and run queries.
// 	lbls := [][]labels.Label{
// 		{
// 			{"a", "b"},
// 			{"instance", "localhost:9090"},
// 			{"job", "prometheus"},
// 		},
// 		{
// 			{"a", "b"},
// 			{"instance", "127.0.0.1:9090"},
// 			{"job", "prometheus"},
// 		},
// 		{
// 			{"a", "b"},
// 			{"instance", "127.0.0.1:9090"},
// 			{"job", "prom-k8s"},
// 		},
// 		{
// 			{"a", "b"},
// 			{"instance", "localhost:9090"},
// 			{"job", "prom-k8s"},
// 		},
// 		{
// 			{"a", "c"},
// 			{"instance", "localhost:9090"},
// 			{"job", "prometheus"},
// 		},
// 		{
// 			{"a", "c"},
// 			{"instance", "127.0.0.1:9090"},
// 			{"job", "prometheus"},
// 		},
// 		{
// 			{"a", "c"},
// 			{"instance", "127.0.0.1:9090"},
// 			{"job", "prom-k8s"},
// 		},
// 		{
// 			{"a", "c"},
// 			{"instance", "localhost:9090"},
// 			{"job", "prom-k8s"},
// 		},
// 	}

// 	seriesMap := map[string][]sample{}
// 	for _, l := range lbls {
// 		seriesMap[labels.New(l...).String()] = []sample{}
// 	}

// 	dir, _ := ioutil.TempDir("", "test")
// 	defer os.RemoveAll(dir)

// 	hb := createTestHead(t, dir, minTime, maxTime)
// 	app := hb.Appender()

// 	for _, l := range lbls {
// 		ls := labels.New(l...)
// 		series := []sample{}

// 		ts := rand.Int63n(300)
// 		for i := 0; i < numDatapoints; i++ {
// 			v := rand.Float64()
// 			if ts >= minTime && ts <= maxTime {
// 				series = append(series, sample{ts, v})
// 			}

// 			_, err := app.Add(ls, ts, v)
// 			if ts >= minTime && ts <= maxTime {
// 				testutil.Ok(t, err)
// 			} else {
// 				testutil.EqualsError(t, err, ErrOutOfBounds.Error())
// 			}

// 			ts += rand.Int63n(timeInterval) + 1
// 		}

// 		seriesMap[labels.New(l...).String()] = series
// 	}

// 	testutil.Ok(t, app.Commit())

// 	// Delete a time-range from each-selector.
// 	dels := []struct {
// 		ms     []labels.Matcher
// 		drange Intervals
// 	}{
// 		{
// 			ms:     []labels.Matcher{labels.NewEqualMatcher("a", "b")},
// 			drange: Intervals{{300, 500}, {600, 670}},
// 		},
// 		{
// 			ms: []labels.Matcher{
// 				labels.NewEqualMatcher("a", "b"),
// 				labels.NewEqualMatcher("job", "prom-k8s"),
// 			},
// 			drange: Intervals{{300, 500}, {100, 670}},
// 		},
// 		{
// 			ms: []labels.Matcher{
// 				labels.NewEqualMatcher("a", "c"),
// 				labels.NewEqualMatcher("instance", "localhost:9090"),
// 				labels.NewEqualMatcher("job", "prometheus"),
// 			},
// 			drange: Intervals{{300, 400}, {100, 6700}},
// 		},
// 		// TODO: Add Regexp Matchers.
// 	}

// 	for _, del := range dels {
// 		// Reset the deletes everytime.
// 		writeTombstoneFile(hb.dir, newEmptyTombstoneReader())
// 		hb.tombstones = newEmptyTombstoneReader()

// 		for _, r := range del.drange {
// 			testutil.Ok(t, hb.Delete(r.Mint, r.Maxt, del.ms...))
// 		}

// 		matched := labels.Slice{}
// 		for _, ls := range lbls {
// 			s := labels.Selector(del.ms)
// 			if s.Matches(ls) {
// 				matched = append(matched, ls)
// 			}
// 		}

// 		sort.Sort(matched)

// 		for i := 0; i < numRanges; i++ {
// 			mint := rand.Int63n(200)
// 			maxt := mint + rand.Int63n(timeInterval*int64(numDatapoints))

// 			q := hb.Querier(mint, maxt)
// 			ss := q.Select(del.ms...)

// 			// Build the mockSeriesSet.
// 			matchedSeries := make([]Series, 0, len(matched))
// 			for _, m := range matched {
// 				smpls := boundedSamples(seriesMap[m.String()], mint, maxt)
// 				smpls = deletedSamples(smpls, del.drange)

// 				// Only append those series for which samples exist as mockSeriesSet
// 				// doesn't skip series with no samples.
// 				// TODO: But sometimes SeriesSet returns an empty SeriesIterator
// 				if len(smpls) > 0 {
// 					matchedSeries = append(matchedSeries, newSeries(
// 						m.Map(),
// 						smpls,
// 					))
// 				}
// 			}
// 			expSs := newListSeriesSet(matchedSeries)

// 			// Compare both SeriesSets.
// 			for {
// 				eok, rok := expSs.Next(), ss.Next()

// 				// Skip a series if iterator is empty.
// 				if rok {
// 					for !ss.At().Iterator().Next() {
// 						rok = ss.Next()
// 						if !rok {
// 							break
// 						}
// 					}
// 				}
// 				testutil.Equals(t, eok, rok, "next")

// 				if !eok {
// 					break
// 				}
// 				sexp := expSs.At()
// 				sres := ss.At()

// 				testutil.Equals(t, sexp.Labels(), sres.Labels(), "labels")

// 				smplExp, errExp := expandSeriesIterator(sexp.Iterator())
// 				smplRes, errRes := expandSeriesIterator(sres.Iterator())

// 				testutil.Equals(t, errExp, errRes, "samples error")
// 				testutil.Equals(t, smplExp, smplRes, "samples")
// 			}
// 		}
// 	}

// 	return
// }

func boundedSamples(full []sample, mint, maxt int64) []sample {
	for len(full) > 0 {
		if full[0].t >= mint {
			break
		}
		full = full[1:]
	}
	for i, s := range full {
		// labels.Labelinate on the first sample larger than maxt.
		if s.t > maxt {
			return full[:i]
		}
	}
	// maxt is after highest sample.
	return full
}

func deletedSamples(full []sample, dranges Intervals) []sample {
	ds := make([]sample, 0, len(full))
Outer:
	for _, s := range full {
		for _, r := range dranges {
			if r.inBounds(s.t) {
				continue Outer
			}
		}
		ds = append(ds, s)
	}

	return ds
}

func TestComputeChunkEndTime(t *testing.T) {
	cases := []struct {
		start, cur, max int64
		res             int64
	}{
		{
			start: 0,
			cur:   250,
			max:   1000,
			res:   1000,
		},
		{
			start: 100,
			cur:   200,
			max:   1000,
			res:   550,
		},
		// Case where we fit floored 0 chunks. Must catch division by 0
		// and default to maximum time.
		{
			start: 0,
			cur:   500,
			max:   1000,
			res:   1000,
		},
		// Catch division by zero for cur == start. Strictly not a possible case.
		{
			start: 100,
			cur:   100,
			max:   1000,
			res:   104,
		},
	}

	for _, c := range cases {
		got := computeChunkEndTime(c.start, c.cur, c.max)
		if got != c.res {
			t.Errorf("expected %d for (start: %d, cur: %d, max: %d), got %d", c.res, c.start, c.cur, c.max, got)
		}
	}
}

func TestMemSeries_append(t *testing.T) {
	s := newMemSeries(labels.Labels{}, 1, 500)

	// Add first two samples at the very end of a chunk range and the next two
	// on and after it.
	// New chunk must correctly be cut at 1000.
	ok, chunkCreated := s.append(998, 1)
	testutil.Assert(t, ok, "append failed")
	testutil.Assert(t, chunkCreated, "first sample created chunk")

	ok, chunkCreated = s.append(999, 2)
	testutil.Assert(t, ok, "append failed")
	testutil.Assert(t, !chunkCreated, "second sample should use same chunk")

	ok, chunkCreated = s.append(1000, 3)
	testutil.Assert(t, ok, "append failed")
	testutil.Assert(t, ok, "expected new chunk on boundary")

	ok, chunkCreated = s.append(1001, 4)
	testutil.Assert(t, ok, "append failed")
	testutil.Assert(t, !chunkCreated, "second sample should use same chunk")

	testutil.Assert(t, s.chunks[0].minTime == 998 && s.chunks[0].maxTime == 999, "wrong chunk range")
	testutil.Assert(t, s.chunks[1].minTime == 1000 && s.chunks[1].maxTime == 1001, "wrong chunk range")

	// Fill the range [1000,2000) with many samples. Intermediate chunks should be cut
	// at approximately 120 samples per chunk.
	for i := 1; i < 1000; i++ {
		ok, _ := s.append(1001+int64(i), float64(i))
		testutil.Assert(t, ok, "append failed")
	}

	testutil.Assert(t, len(s.chunks) > 7, "expected intermediate chunks")

	// All chunks but the first and last should now be moderately full.
	for i, c := range s.chunks[1 : len(s.chunks)-1] {
		testutil.Assert(t, c.chunk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, c.chunk.NumSamples())
	}
}

func TestGCChunkAccess(t *testing.T) {
	// Put a chunk, select it. GC it and then access it.
	h, err := NewHead(nil, nil, nil, 1000)
	testutil.Ok(t, err)
	defer h.Close()

	h.initTime(0)

	s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
	s.chunks = []*memChunk{
		{minTime: 0, maxTime: 999},
		{minTime: 1000, maxTime: 1999},
	}

	idx := h.indexRange(0, 1500)
	var (
		lset   labels.Labels
		chunks []chunks.Meta
	)
	testutil.Ok(t, idx.Series(1, &lset, &chunks))

	testutil.Equals(t, labels.Labels{{
		Name: "a", Value: "1",
	}}, lset)
	testutil.Equals(t, 2, len(chunks))

	cr := h.chunksRange(0, 1500)
	_, err = cr.Chunk(chunks[0].Ref)
	testutil.Ok(t, err)
	_, err = cr.Chunk(chunks[1].Ref)
	testutil.Ok(t, err)

	h.Truncate(1500) // Remove a chunk.

	_, err = cr.Chunk(chunks[0].Ref)
	testutil.Equals(t, ErrNotFound, err)
	_, err = cr.Chunk(chunks[1].Ref)
	testutil.Ok(t, err)
}

func TestGCSeriesAccess(t *testing.T) {
	// Put a series, select it. GC it and then access it.
	h, err := NewHead(nil, nil, nil, 1000)
	testutil.Ok(t, err)
	defer h.Close()

	h.initTime(0)

	s, _ := h.getOrCreate(1, labels.FromStrings("a", "1"))
	s.chunks = []*memChunk{
		{minTime: 0, maxTime: 999},
		{minTime: 1000, maxTime: 1999},
	}

	idx := h.indexRange(0, 2000)
	var (
		lset   labels.Labels
		chunks []chunks.Meta
	)
	testutil.Ok(t, idx.Series(1, &lset, &chunks))

	testutil.Equals(t, labels.Labels{{
		Name: "a", Value: "1",
	}}, lset)
	testutil.Equals(t, 2, len(chunks))

	cr := h.chunksRange(0, 2000)
	_, err = cr.Chunk(chunks[0].Ref)
	testutil.Ok(t, err)
	_, err = cr.Chunk(chunks[1].Ref)
	testutil.Ok(t, err)

	h.Truncate(2000) // Remove the series.

	testutil.Equals(t, (*memSeries)(nil), h.series.getByID(1))

	_, err = cr.Chunk(chunks[0].Ref)
	testutil.Equals(t, ErrNotFound, err)
	_, err = cr.Chunk(chunks[1].Ref)
	testutil.Equals(t, ErrNotFound, err)
}

func TestHead_LogRollback(t *testing.T) {
	dir, err := ioutil.TempDir("", "wal_rollback")
	testutil.Ok(t, err)
	defer os.RemoveAll(dir)

	w, err := wal.New(nil, nil, dir)
	testutil.Ok(t, err)
	h, err := NewHead(nil, nil, w, 1000)
	testutil.Ok(t, err)

	app := h.Appender()
	_, err = app.Add(labels.FromStrings("a", "b"), 1, 2)
	testutil.Ok(t, err)

	testutil.Ok(t, app.Rollback())
	recs := readTestWAL(t, w.Dir())

	testutil.Equals(t, 1, len(recs))

	series, ok := recs[0].([]RefSeries)
	testutil.Assert(t, ok, "expected series record but got %+v", recs[0])
	testutil.Equals(t, series, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}})
}