mirror of
https://github.com/prometheus/prometheus
synced 2025-02-19 14:07:09 +00:00
Remove old appendBatch methods
This commit is contained in:
parent
fde69dab49
commit
22db9c3413
27
db.go
27
db.go
@ -382,33 +382,6 @@ func (a *dbAppender) Commit() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// func (db *DB) appendBatch(samples []hashedSample) error {
|
||||
// if len(samples) == 0 {
|
||||
// return nil
|
||||
// }
|
||||
// db.mtx.RLock()
|
||||
// defer db.mtx.RUnlock()
|
||||
|
||||
// head := db.heads[len(db.heads)-1]
|
||||
|
||||
// // TODO(fabxc): distinguish samples between concurrent heads for
|
||||
// // different time blocks. Those may occurr during transition to still
|
||||
// // allow late samples to arrive for a previous block.
|
||||
// n, err := head.appendBatch(samples)
|
||||
// if err == nil {
|
||||
// db.metrics.samplesAppended.Add(float64(n))
|
||||
// }
|
||||
|
||||
// if head.fullness() > 1.0 {
|
||||
// select {
|
||||
// case db.cutc <- struct{}{}:
|
||||
// default:
|
||||
// }
|
||||
// }
|
||||
|
||||
// return err
|
||||
// }
|
||||
|
||||
func (db *DB) headForDir(dir string) (int, bool) {
|
||||
for i, b := range db.heads {
|
||||
if b.Dir() == dir {
|
||||
|
121
head.go
121
head.go
@ -431,127 +431,6 @@ var (
|
||||
ErrOutOfBounds = errors.New("out of bounds")
|
||||
)
|
||||
|
||||
// func (h *headBlock) appendBatch(samples []hashedSample) (int, error) {
|
||||
// // Find head chunks for all samples and allocate new IDs/refs for
|
||||
// // ones we haven't seen before.
|
||||
|
||||
// var (
|
||||
// newSeries = map[uint64][]*hashedSample{}
|
||||
// newLabels []labels.Labels
|
||||
// )
|
||||
|
||||
// h.mtx.RLock()
|
||||
// defer h.mtx.RUnlock()
|
||||
|
||||
// for i := range samples {
|
||||
// s := &samples[i]
|
||||
|
||||
// ms := h.get(s.hash, s.labels)
|
||||
// if ms != nil {
|
||||
// c := ms.head()
|
||||
|
||||
// if s.t < c.maxTime {
|
||||
// return 0, ErrOutOfOrderSample
|
||||
// }
|
||||
// if c.maxTime == s.t && ms.lastValue != s.v {
|
||||
// return 0, ErrAmendSample
|
||||
// }
|
||||
// // TODO(fabxc): sample refs are only scoped within a block for
|
||||
// // now and we ignore any previously set value
|
||||
// s.ref = ms.ref
|
||||
// continue
|
||||
// }
|
||||
|
||||
// // TODO(fabxc): technically there's still collision probability here.
|
||||
// // Extract the hashmap of the head block and use an instance of it here as well.
|
||||
// newSeries[s.hash] = append(newSeries[s.hash], s)
|
||||
// }
|
||||
|
||||
// // After the samples were successfully written to the WAL, there may
|
||||
// // be no further failures.
|
||||
// if len(newSeries) > 0 {
|
||||
// newLabels = make([]labels.Labels, 0, len(newSeries))
|
||||
// base0 := len(h.series)
|
||||
|
||||
// h.mtx.RUnlock()
|
||||
// h.mtx.Lock()
|
||||
|
||||
// base1 := len(h.series)
|
||||
// i := 0
|
||||
|
||||
// for hash, ser := range newSeries {
|
||||
// lset := ser[0].labels
|
||||
// // We switched locks and have to re-validate that the series were not
|
||||
// // created by another goroutine in the meantime.
|
||||
// if base1 != base0 {
|
||||
// if ms := h.get(hash, lset); ms != nil {
|
||||
// for _, s := range ser {
|
||||
// s.ref = ms.ref
|
||||
// }
|
||||
// continue
|
||||
// }
|
||||
// }
|
||||
// // Series is still new.
|
||||
// newLabels = append(newLabels, lset)
|
||||
|
||||
// h.create(hash, lset)
|
||||
// // Set sample references to the series we just created.
|
||||
// for _, s := range ser {
|
||||
// s.ref = uint32(base1 + i)
|
||||
// }
|
||||
// i++
|
||||
// }
|
||||
|
||||
// h.mtx.Unlock()
|
||||
// h.mtx.RLock()
|
||||
// }
|
||||
// // Write all new series and samples to the WAL and add it to the
|
||||
// // in-mem database on success.
|
||||
// if err := h.wal.Log(newLabels, samples); err != nil {
|
||||
// return 0, err
|
||||
// }
|
||||
|
||||
// var (
|
||||
// total = uint64(len(samples))
|
||||
// mint = int64(math.MaxInt64)
|
||||
// maxt = int64(math.MinInt64)
|
||||
// )
|
||||
// for _, s := range samples {
|
||||
// ser := h.series[s.ref]
|
||||
|
||||
// ser.mtx.Lock()
|
||||
// ok := ser.append(s.t, s.v)
|
||||
// ser.mtx.Unlock()
|
||||
|
||||
// if !ok {
|
||||
// total--
|
||||
// continue
|
||||
// }
|
||||
// if mint > s.t {
|
||||
// mint = s.t
|
||||
// }
|
||||
// if maxt < s.t {
|
||||
// maxt = s.t
|
||||
// }
|
||||
// }
|
||||
|
||||
// h.stats.mtx.Lock()
|
||||
// defer h.stats.mtx.Unlock()
|
||||
|
||||
// h.stats.SampleCount += total
|
||||
// h.stats.SeriesCount += uint64(len(newSeries))
|
||||
// h.stats.ChunkCount += uint64(len(newSeries)) // head block has one chunk/series
|
||||
|
||||
// if mint < h.stats.MinTime {
|
||||
// h.stats.MinTime = mint
|
||||
// }
|
||||
// if maxt > h.stats.MaxTime {
|
||||
// h.stats.MaxTime = maxt
|
||||
// }
|
||||
|
||||
// return int(total), nil
|
||||
// }
|
||||
|
||||
func (h *headBlock) fullness() float64 {
|
||||
h.stats.mtx.RLock()
|
||||
defer h.stats.mtx.RUnlock()
|
||||
|
Loading…
Reference in New Issue
Block a user