Merge branch 'release-2.2'

Signed-off-by: beorn7 <beorn@soundcloud.com>
This commit is contained in:
beorn7 2018-04-10 16:45:56 +02:00
commit 94ff07b81d
16 changed files with 186 additions and 101 deletions

View File

@ -3,6 +3,14 @@
* [CHANGE] `marathon_sd`: use `auth_token` and `auth_token_file` for token-based authentication instead of `bearer_token` and `bearer_token_file` respectively.
* [ENHANCEMENT] `marathon_sd`: adds support for basic and bearer authentication, plus all other common HTTP client options (TLS config, proxy URL, etc.)
## 2.2.1 / 2018-03-13
* [BUGFIX] Fix data loss in TSDB on compaction
* [BUGFIX] Correctly stop timer in remote-write path
* [BUGFIX] Fix deadlock triggerd by loading targets page
* [BUGFIX] Fix incorrect buffering of samples on range selection queries
* [BUGFIX] Handle large index files on windows properly
## 2.2.0 / 2018-03-08
* [CHANGE] Rename file SD mtime metric.

View File

@ -1 +1 @@
2.2.0
2.2.1

View File

@ -40,18 +40,25 @@ func NewManager(logger log.Logger, app Appendable) *Manager {
scrapeConfigs: make(map[string]*config.ScrapeConfig),
scrapePools: make(map[string]*scrapePool),
graceShut: make(chan struct{}),
targetsAll: make(map[string][]*Target),
}
}
// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups form the discovery manager.
type Manager struct {
logger log.Logger
append Appendable
logger log.Logger
append Appendable
graceShut chan struct{}
mtxTargets sync.Mutex // Guards the fields below.
targetsActive []*Target
targetsDropped []*Target
targetsAll map[string][]*Target
mtxScrape sync.Mutex // Guards the fields below.
scrapeConfigs map[string]*config.ScrapeConfig
scrapePools map[string]*scrapePool
mtx sync.RWMutex
graceShut chan struct{}
}
// Run starts background processing to handle target updates and reload the scraping loops.
@ -68,6 +75,9 @@ func (m *Manager) Run(tsets <-chan map[string][]*targetgroup.Group) error {
// Stop cancels all running scrape pools and blocks until all have exited.
func (m *Manager) Stop() {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
for _, sp := range m.scrapePools {
sp.stop()
}
@ -76,8 +86,9 @@ func (m *Manager) Stop() {
// ApplyConfig resets the manager's target providers and job configurations as defined by the new cfg.
func (m *Manager) ApplyConfig(cfg *config.Config) error {
m.mtx.Lock()
defer m.mtx.Unlock()
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
c := make(map[string]*config.ScrapeConfig)
for _, scfg := range cfg.ScrapeConfigs {
c[scfg.JobName] = scfg
@ -97,71 +108,66 @@ func (m *Manager) ApplyConfig(cfg *config.Config) error {
return nil
}
// TargetMap returns map of active and dropped targets and their corresponding scrape config job name.
func (m *Manager) TargetMap() map[string][]*Target {
m.mtx.Lock()
defer m.mtx.Unlock()
targets := make(map[string][]*Target)
for jobName, sp := range m.scrapePools {
sp.mtx.RLock()
for _, t := range sp.targets {
targets[jobName] = append(targets[jobName], t)
}
targets[jobName] = append(targets[jobName], sp.droppedTargets...)
sp.mtx.RUnlock()
}
return targets
// TargetsAll returns active and dropped targets grouped by job_name.
func (m *Manager) TargetsAll() map[string][]*Target {
m.mtxTargets.Lock()
defer m.mtxTargets.Unlock()
return m.targetsAll
}
// Targets returns the targets currently being scraped.
func (m *Manager) Targets() []*Target {
m.mtx.Lock()
defer m.mtx.Unlock()
var targets []*Target
for _, p := range m.scrapePools {
p.mtx.RLock()
for _, tt := range p.targets {
targets = append(targets, tt)
}
p.mtx.RUnlock()
}
return targets
// TargetsActive returns the active targets currently being scraped.
func (m *Manager) TargetsActive() []*Target {
m.mtxTargets.Lock()
defer m.mtxTargets.Unlock()
return m.targetsActive
}
// DroppedTargets returns the targets dropped during relabelling.
func (m *Manager) DroppedTargets() []*Target {
m.mtx.Lock()
defer m.mtx.Unlock()
var droppedTargets []*Target
for _, p := range m.scrapePools {
p.mtx.RLock()
droppedTargets = append(droppedTargets, p.droppedTargets...)
p.mtx.RUnlock()
// TargetsDropped returns the dropped targets during relabelling.
func (m *Manager) TargetsDropped() []*Target {
m.mtxTargets.Lock()
defer m.mtxTargets.Unlock()
return m.targetsDropped
}
func (m *Manager) targetsUpdate(active, dropped map[string][]*Target) {
m.mtxTargets.Lock()
defer m.mtxTargets.Unlock()
m.targetsAll = make(map[string][]*Target)
m.targetsActive = nil
m.targetsDropped = nil
for jobName, targets := range active {
m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
m.targetsActive = append(m.targetsActive, targets...)
}
for jobName, targets := range dropped {
m.targetsAll[jobName] = append(m.targetsAll[jobName], targets...)
m.targetsDropped = append(m.targetsDropped, targets...)
}
return droppedTargets
}
func (m *Manager) reload(t map[string][]*targetgroup.Group) {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
tDropped := make(map[string][]*Target)
tActive := make(map[string][]*Target)
for tsetName, tgroup := range t {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
continue
}
// Scrape pool doesn't exist so start a new one.
existing, ok := m.scrapePools[tsetName]
if !ok {
sp := newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
var sp *scrapePool
if existing, ok := m.scrapePools[tsetName]; !ok {
scrapeConfig, ok := m.scrapeConfigs[tsetName]
if !ok {
level.Error(m.logger).Log("msg", "error reloading target set", "err", fmt.Sprintf("invalid config id:%v", tsetName))
continue
}
sp = newScrapePool(scrapeConfig, m.append, log.With(m.logger, "scrape_pool", tsetName))
m.scrapePools[tsetName] = sp
sp.Sync(tgroup)
} else {
existing.Sync(tgroup)
sp = existing
}
tActive[tsetName], tDropped[tsetName] = sp.Sync(tgroup)
}
m.targetsUpdate(tActive, tDropped)
}

View File

@ -245,8 +245,8 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) {
}
// Sync converts target groups into actual scrape targets and synchronizes
// the currently running scraper with the resulting set.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
// the currently running scraper with the resulting set and returns all scraped and dropped targets.
func (sp *scrapePool) Sync(tgs []*targetgroup.Group) (tActive []*Target, tDropped []*Target) {
start := time.Now()
var all []*Target
@ -273,6 +273,15 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
time.Since(start).Seconds(),
)
targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
sp.mtx.RLock()
for _, t := range sp.targets {
tActive = append(tActive, t)
}
tDropped = sp.droppedTargets
sp.mtx.RUnlock()
return tActive, tDropped
}
// sync takes a list of potentially duplicated targets, deduplicates them, starts

View File

@ -110,6 +110,8 @@ func (t *Target) Labels() labels.Labels {
// DiscoveredLabels returns a copy of the target's labels before any processing.
func (t *Target) DiscoveredLabels() labels.Labels {
t.mtx.Lock()
defer t.mtx.Unlock()
lset := make(labels.Labels, len(t.discoveredLabels))
copy(lset, t.discoveredLabels)
return lset
@ -117,6 +119,8 @@ func (t *Target) DiscoveredLabels() labels.Labels {
// SetDiscoveredLabels sets new DiscoveredLabels
func (t *Target) SetDiscoveredLabels(l labels.Labels) {
t.mtx.Lock()
defer t.mtx.Unlock()
t.discoveredLabels = l
}

View File

@ -151,16 +151,11 @@ func (c *LeveledCompactor) Plan(dir string) ([]string, error) {
if err != nil {
return nil, err
}
// We do not include the most recently created block. This gives users a window
// of a full block size to piece-wise backup new data without having to care
// about data overlap.
if len(dirs) < 1 {
return nil, nil
}
dirs = dirs[:len(dirs)-1]
var dms []dirMeta
for _, dir := range dirs {
meta, err := readMetaFile(dir)
if err != nil {
@ -176,6 +171,10 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})
// We do not include a recently created block with max(minTime), so the block which was just created from WAL.
// This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap.
dms = dms[:len(dms)-1]
var res []string
for _, dm := range c.selectDirs(dms) {
res = append(res, dm.dir)

View File

@ -0,0 +1,5 @@
// +build windows
package fileutil
const maxMapSize = 0x7FFFFFFF // 2GB

View File

@ -0,0 +1,5 @@
// +build windows
package fileutil
const maxMapSize = 0xFFFFFFFFFFFF // 256TB

View File

@ -19,14 +19,14 @@ import (
"unsafe"
)
func mmap(f *os.File, sz int) ([]byte, error) {
low, high := uint32(sz), uint32(sz>>32)
func mmap(f *os.File, size int) ([]byte, error) {
low, high := uint32(size), uint32(size>>32)
h, errno := syscall.CreateFileMapping(syscall.Handle(f.Fd()), nil, syscall.PAGE_READONLY, high, low, nil)
if h == 0 {
return nil, os.NewSyscallError("CreateFileMapping", errno)
}
addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_READ, 0, 0, uintptr(sz))
addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_READ, 0, 0, uintptr(size))
if addr == 0 {
return nil, os.NewSyscallError("MapViewOfFile", errno)
}
@ -35,7 +35,7 @@ func mmap(f *os.File, sz int) ([]byte, error) {
return nil, os.NewSyscallError("CloseHandle", err)
}
return (*[1 << 30]byte)(unsafe.Pointer(addr))[:sz], nil
return (*[maxMapSize]byte)(unsafe.Pointer(addr))[:size], nil
}
func munmap(b []byte) error {

View File

@ -653,6 +653,11 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
return r, nil
}
// Version returns the file format version of the underlying index.
func (r *Reader) Version() int {
return r.version
}
// Range marks a byte range.
type Range struct {
Start, End int64

View File

@ -61,6 +61,9 @@ func repairBadIndexVersion(logger log.Logger, dir string) error {
if err := repl.Close(); err != nil {
return err
}
if err := broken.Close(); err != nil {
return err
}
if err := renameFile(repl.Name(), broken.Name()); err != nil {
return err
}

30
vendor/vendor.json vendored
View File

@ -826,40 +826,40 @@
"revisionTime": "2016-04-11T19:08:41Z"
},
{
"checksumSHA1": "zVgXlbZ1J8GhBN7tZji7M/SuiAU=",
"checksumSHA1": "vNslgGjRBqauFmVIBTkvEWwvURg=",
"path": "github.com/prometheus/tsdb",
"revision": "16b2bf1b45ce3e3536c78ebec5116ea09a69786e",
"revisionTime": "2018-03-02T11:51:49Z"
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
},
{
"checksumSHA1": "S7F4yWxVLhxQNHMdgoOo6plmOOs=",
"path": "github.com/prometheus/tsdb/chunkenc",
"revision": "494acd307058387ced7646f9996b0f7372eaa558",
"revisionTime": "2018-02-15T11:29:47Z"
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
},
{
"checksumSHA1": "+zsn1i8cqwgZXL8Bg6jDy32xjAo=",
"path": "github.com/prometheus/tsdb/chunks",
"revision": "494acd307058387ced7646f9996b0f7372eaa558",
"revisionTime": "2018-02-15T11:29:47Z"
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
},
{
"checksumSHA1": "h49AAcJ5+iRBwCgbfQf+2T1E1ZE=",
"checksumSHA1": "T7qvg4VhFLklT3g+qPkUWxBo0yw=",
"path": "github.com/prometheus/tsdb/fileutil",
"revision": "494acd307058387ced7646f9996b0f7372eaa558",
"revisionTime": "2018-02-15T11:29:47Z"
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
},
{
"checksumSHA1": "UlvN+ZhTu52S8f9niySQpPC+dvQ=",
"checksumSHA1": "4ebzIE2Jvj6+SG6yGFSXN8scgfo=",
"path": "github.com/prometheus/tsdb/index",
"revision": "494acd307058387ced7646f9996b0f7372eaa558",
"revisionTime": "2018-02-15T11:29:47Z"
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
},
{
"checksumSHA1": "Va8HWvOFTwFeewZFadMAOzNGDps=",
"path": "github.com/prometheus/tsdb/labels",
"revision": "494acd307058387ced7646f9996b0f7372eaa558",
"revisionTime": "2018-02-15T11:29:47Z"
"revision": "659ed644294eec6310cef0685b002a3aed8c8f85",
"revisionTime": "2018-03-14T13:49:50Z"
},
{
"checksumSHA1": "5SYLEhADhdBVZAGPVHWggQl7H8k=",

View File

@ -82,8 +82,8 @@ func (e *apiError) Error() string {
}
type targetRetriever interface {
Targets() []*scrape.Target
DroppedTargets() []*scrape.Target
TargetsActive() []*scrape.Target
TargetsDropped() []*scrape.Target
}
type alertmanagerRetriever interface {
@ -452,11 +452,12 @@ type TargetDiscovery struct {
}
func (api *API) targets(r *http.Request) (interface{}, *apiError) {
targets := api.targetRetriever.Targets()
droppedTargets := api.targetRetriever.DroppedTargets()
res := &TargetDiscovery{ActiveTargets: make([]*Target, len(targets)), DroppedTargets: make([]*DroppedTarget, len(droppedTargets))}
tActive := api.targetRetriever.TargetsActive()
tDropped := api.targetRetriever.TargetsDropped()
res := &TargetDiscovery{ActiveTargets: make([]*Target, len(tActive)), DroppedTargets: make([]*DroppedTarget, len(tDropped))}
for i, t := range tActive {
for i, t := range targets {
lastErrStr := ""
lastErr := t.LastError()
if lastErr != nil {
@ -473,12 +474,11 @@ func (api *API) targets(r *http.Request) (interface{}, *apiError) {
}
}
for i, t := range droppedTargets {
for i, t := range tDropped {
res.DroppedTargets[i] = &DroppedTarget{
DiscoveredLabels: t.DiscoveredLabels().Map(),
}
}
return res, nil
}

View File

@ -45,7 +45,7 @@ import (
type testTargetRetriever struct{}
func (t testTargetRetriever) Targets() []*scrape.Target {
func (t testTargetRetriever) TargetsActive() []*scrape.Target {
return []*scrape.Target{
scrape.NewTarget(
labels.FromMap(map[string]string{
@ -58,7 +58,7 @@ func (t testTargetRetriever) Targets() []*scrape.Target {
),
}
}
func (t testTargetRetriever) DroppedTargets() []*scrape.Target {
func (t testTargetRetriever) TargetsDropped() []*scrape.Target {
return []*scrape.Target{
scrape.NewTarget(
nil,

View File

@ -437,7 +437,7 @@ func (h *Handler) Run(ctx context.Context) error {
h.options.QueryEngine,
h.options.Storage.Querier,
func() []*scrape.Target {
return h.options.ScrapeManager.Targets()
return h.options.ScrapeManager.TargetsActive()
},
func() []*url.URL {
return h.options.Notifier.Alertmanagers()
@ -659,7 +659,7 @@ func (h *Handler) rules(w http.ResponseWriter, r *http.Request) {
func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
var index []string
targets := h.scrapeManager.TargetMap()
targets := h.scrapeManager.TargetsAll()
for job := range targets {
index = append(index, job)
}
@ -677,7 +677,7 @@ func (h *Handler) serviceDiscovery(w http.ResponseWriter, r *http.Request) {
func (h *Handler) targets(w http.ResponseWriter, r *http.Request) {
// Bucket targets by job label
tps := map[string][]*scrape.Target{}
for _, t := range h.scrapeManager.Targets() {
for _, t := range h.scrapeManager.TargetsActive() {
job := t.Labels().Get(model.JobLabel)
tps[job] = append(tps[job], t)
}
@ -733,7 +733,13 @@ func tmplFuncs(consolesPath string, opts *Options) template_text.FuncMap {
return time.Since(t) / time.Millisecond * time.Millisecond
},
"consolesPath": func() string { return consolesPath },
"pathPrefix": func() string { return opts.ExternalURL.Path },
"pathPrefix": func() string {
if opts.RoutePrefix == "/" {
return ""
} else {
return opts.RoutePrefix
}
},
"buildVersion": func() string { return opts.Version.Revision },
"stripLabels": func(lset map[string]string, labels ...string) map[string]string {
for _, ln := range labels {

View File

@ -272,6 +272,41 @@ func TestRoutePrefix(t *testing.T) {
testutil.Equals(t, http.StatusOK, resp.StatusCode)
}
func TestPathPrefix(t *testing.T) {
tests := []struct {
routePrefix string
pathPrefix string
}{
{
routePrefix: "/",
// If we have pathPrefix as "/", URL in UI gets "//"" as prefix,
// hence doesn't remain relative path anymore.
pathPrefix: "",
},
{
routePrefix: "/prometheus",
pathPrefix: "/prometheus",
},
{
routePrefix: "/p1/p2/p3/p4",
pathPrefix: "/p1/p2/p3/p4",
},
}
for _, test := range tests {
opts := &Options{
RoutePrefix: test.routePrefix,
}
pathPrefix := tmplFuncs("", opts)["pathPrefix"].(func() string)
pp := pathPrefix()
testutil.Equals(t, test.pathPrefix, pp)
}
}
func TestDebugHandler(t *testing.T) {
for _, tc := range []struct {
prefix, url string