From 40dd13b07420a044cc1b0ca57f639c572583d9c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=9B=BD=E5=BF=A0?= Date: Wed, 12 Feb 2020 01:19:34 +0800 Subject: [PATCH] Storage concurrently (#6770) * Storage concurrently Signed-off-by: fuling --- storage/fanout.go | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/storage/fanout.go b/storage/fanout.go index d8f87cf5c..7a99d5f6a 100644 --- a/storage/fanout.go +++ b/storage/fanout.go @@ -240,23 +240,39 @@ func (q *mergeQuerier) Select(params *SelectParams, matchers ...*labels.Matcher) func (q *mergeQuerier) SelectSorted(params *SelectParams, matchers ...*labels.Matcher) (SeriesSet, Warnings, error) { seriesSets := make([]SeriesSet, 0, len(q.queriers)) var warnings Warnings + + var priErr error = nil + type queryResult struct { + qr Querier + set SeriesSet + wrn Warnings + selectError error + } + queryResultChan := make(chan *queryResult) for _, querier := range q.queriers { - set, wrn, err := querier.SelectSorted(params, matchers...) - q.setQuerierMap[set] = querier - if wrn != nil { - warnings = append(warnings, wrn...) + go func(qr Querier) { + set, wrn, err := qr.SelectSorted(params, matchers...) + queryResultChan <- &queryResult{qr: qr, set: set, wrn: wrn, selectError: err} + }(querier) + } + for i := 0; i < len(q.queriers); i++ { + qryResult := <-queryResultChan + q.setQuerierMap[qryResult.set] = qryResult.qr + if qryResult.wrn != nil { + warnings = append(warnings, qryResult.wrn...) } - if err != nil { - q.failedQueriers[querier] = struct{}{} + if qryResult.selectError != nil { + q.failedQueriers[qryResult.qr] = struct{}{} // If the error source isn't the primary querier, return the error as a warning and continue. - if querier != q.primaryQuerier { - warnings = append(warnings, err) - continue + if qryResult.qr != q.primaryQuerier { + warnings = append(warnings, qryResult.selectError) } else { - return nil, nil, err + priErr = qryResult.selectError } } - seriesSets = append(seriesSets, set) + } + if priErr != nil { + return nil, nil, priErr } return NewMergeSeriesSet(seriesSets, q), warnings, nil }