[ENHANCEMENT] Promtool: Adding labels to time series while creating tsdb blocks (#14403)

* feat: #14402 - Adding labels to time series while creating tsdb blocks

Signed-off-by: Suraj Patil <patilsuraj767@gmail.com>
This commit is contained in:
Suraj Patil 2024-08-28 07:42:24 +05:30 committed by GitHub
parent 9a813d5d0f
commit 7757794bb3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 71 additions and 9 deletions

View File

@ -85,7 +85,7 @@ func getCompatibleBlockDuration(maxBlockDuration int64) int64 {
return blockDuration return blockDuration
} }
func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool) (returnErr error) { func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesInAppender int, outputDir string, humanReadable, quiet bool, customLabels map[string]string) (returnErr error) {
blockDuration := getCompatibleBlockDuration(maxBlockDuration) blockDuration := getCompatibleBlockDuration(maxBlockDuration)
mint = blockDuration * (mint / blockDuration) mint = blockDuration * (mint / blockDuration)
@ -102,6 +102,8 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
nextSampleTs int64 = math.MaxInt64 nextSampleTs int64 = math.MaxInt64
) )
lb := labels.NewBuilder(labels.EmptyLabels())
for t := mint; t <= maxt; t += blockDuration { for t := mint; t <= maxt; t += blockDuration {
tsUpper := t + blockDuration tsUpper := t + blockDuration
if nextSampleTs != math.MaxInt64 && nextSampleTs >= tsUpper { if nextSampleTs != math.MaxInt64 && nextSampleTs >= tsUpper {
@ -162,7 +164,13 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
l := labels.Labels{} l := labels.Labels{}
p.Metric(&l) p.Metric(&l)
if _, err := app.Append(0, l, *ts, v); err != nil { lb.Reset(l)
for name, value := range customLabels {
lb.Set(name, value)
}
lbls := lb.Labels()
if _, err := app.Append(0, lbls, *ts, v); err != nil {
return fmt.Errorf("add sample: %w", err) return fmt.Errorf("add sample: %w", err)
} }
@ -221,13 +229,13 @@ func createBlocks(input []byte, mint, maxt, maxBlockDuration int64, maxSamplesIn
return nil return nil
} }
func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration) (err error) { func backfill(maxSamplesInAppender int, input []byte, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) (err error) {
p := textparse.NewOpenMetricsParser(input, nil) // Don't need a SymbolTable to get max and min timestamps. p := textparse.NewOpenMetricsParser(input, nil) // Don't need a SymbolTable to get max and min timestamps.
maxt, mint, err := getMinAndMaxTimestamps(p) maxt, mint, err := getMinAndMaxTimestamps(p)
if err != nil { if err != nil {
return fmt.Errorf("getting min and max timestamp: %w", err) return fmt.Errorf("getting min and max timestamp: %w", err)
} }
if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet); err != nil { if err = createBlocks(input, mint, maxt, int64(maxBlockDuration/time.Millisecond), maxSamplesInAppender, outputDir, humanReadable, quiet, customLabels); err != nil {
return fmt.Errorf("block creation: %w", err) return fmt.Errorf("block creation: %w", err)
} }
return nil return nil

View File

@ -92,6 +92,7 @@ func TestBackfill(t *testing.T) {
Description string Description string
MaxSamplesInAppender int MaxSamplesInAppender int
MaxBlockDuration time.Duration MaxBlockDuration time.Duration
Labels map[string]string
Expected struct { Expected struct {
MinTime int64 MinTime int64
MaxTime int64 MaxTime int64
@ -636,6 +637,49 @@ http_requests_total{code="400"} 1024 7199
}, },
}, },
}, },
{
ToParse: `# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{code="200"} 1 1624463088.000
http_requests_total{code="200"} 2 1629503088.000
http_requests_total{code="200"} 3 1629863088.000
# EOF
`,
IsOk: true,
Description: "Sample with external labels.",
MaxSamplesInAppender: 5000,
MaxBlockDuration: 2048 * time.Hour,
Labels: map[string]string{"cluster_id": "123", "org_id": "999"},
Expected: struct {
MinTime int64
MaxTime int64
NumBlocks int
BlockDuration int64
Samples []backfillSample
}{
MinTime: 1624463088000,
MaxTime: 1629863088000,
NumBlocks: 2,
BlockDuration: int64(1458 * time.Hour / time.Millisecond),
Samples: []backfillSample{
{
Timestamp: 1624463088000,
Value: 1,
Labels: labels.FromStrings("__name__", "http_requests_total", "code", "200", "cluster_id", "123", "org_id", "999"),
},
{
Timestamp: 1629503088000,
Value: 2,
Labels: labels.FromStrings("__name__", "http_requests_total", "code", "200", "cluster_id", "123", "org_id", "999"),
},
{
Timestamp: 1629863088000,
Value: 3,
Labels: labels.FromStrings("__name__", "http_requests_total", "code", "200", "cluster_id", "123", "org_id", "999"),
},
},
},
},
{ {
ToParse: `# HELP rpc_duration_seconds A summary of the RPC duration in seconds. ToParse: `# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
# TYPE rpc_duration_seconds summary # TYPE rpc_duration_seconds summary
@ -689,7 +733,7 @@ after_eof 1 2
outputDir := t.TempDir() outputDir := t.TempDir()
err := backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration) err := backfill(test.MaxSamplesInAppender, []byte(test.ToParse), outputDir, false, false, test.MaxBlockDuration, test.Labels)
if !test.IsOk { if !test.IsOk {
require.Error(t, err, test.Description) require.Error(t, err, test.Description)

View File

@ -253,6 +253,7 @@ func main() {
importQuiet := importCmd.Flag("quiet", "Do not print created blocks.").Short('q').Bool() importQuiet := importCmd.Flag("quiet", "Do not print created blocks.").Short('q').Bool()
maxBlockDuration := importCmd.Flag("max-block-duration", "Maximum duration created blocks may span. Anything less than 2h is ignored.").Hidden().PlaceHolder("<duration>").Duration() maxBlockDuration := importCmd.Flag("max-block-duration", "Maximum duration created blocks may span. Anything less than 2h is ignored.").Hidden().PlaceHolder("<duration>").Duration()
openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input and produce TSDB blocks. Please refer to the storage docs for more details.") openMetricsImportCmd := importCmd.Command("openmetrics", "Import samples from OpenMetrics input and produce TSDB blocks. Please refer to the storage docs for more details.")
openMetricsLabels := openMetricsImportCmd.Flag("label", "Label to attach to metrics. Can be specified multiple times. Example --label=label_name=label_value").StringMap()
importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String() importFilePath := openMetricsImportCmd.Arg("input file", "OpenMetrics file to read samples from.").Required().String()
importDBPath := openMetricsImportCmd.Arg("output directory", "Output directory for generated blocks.").Default(defaultDBPath).String() importDBPath := openMetricsImportCmd.Arg("output directory", "Output directory for generated blocks.").Default(defaultDBPath).String()
importRulesCmd := importCmd.Command("rules", "Create blocks of data for new recording rules.") importRulesCmd := importCmd.Command("rules", "Create blocks of data for new recording rules.")
@ -408,7 +409,7 @@ func main() {
os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics))) os.Exit(checkErr(dumpSamples(ctx, *dumpOpenMetricsPath, *dumpOpenMetricsSandboxDirRoot, *dumpOpenMetricsMinTime, *dumpOpenMetricsMaxTime, *dumpOpenMetricsMatch, formatSeriesSetOpenMetrics)))
// TODO(aSquare14): Work on adding support for custom block size. // TODO(aSquare14): Work on adding support for custom block size.
case openMetricsImportCmd.FullCommand(): case openMetricsImportCmd.FullCommand():
os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration)) os.Exit(backfillOpenMetrics(*importFilePath, *importDBPath, *importHumanReadable, *importQuiet, *maxBlockDuration, *openMetricsLabels))
case importRulesCmd.FullCommand(): case importRulesCmd.FullCommand():
os.Exit(checkErr(importRules(serverURL, httpRoundTripper, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *maxBlockDuration, *importRulesFiles...))) os.Exit(checkErr(importRules(serverURL, httpRoundTripper, *importRulesStart, *importRulesEnd, *importRulesOutputDir, *importRulesEvalInterval, *maxBlockDuration, *importRulesFiles...)))

View File

@ -823,7 +823,7 @@ func checkErr(err error) int {
return 0 return 0
} }
func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration) int { func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxBlockDuration time.Duration, customLabels map[string]string) int {
inputFile, err := fileutil.OpenMmapFile(path) inputFile, err := fileutil.OpenMmapFile(path)
if err != nil { if err != nil {
return checkErr(err) return checkErr(err)
@ -834,7 +834,7 @@ func backfillOpenMetrics(path, outputDir string, humanReadable, quiet bool, maxB
return checkErr(fmt.Errorf("create output dir: %w", err)) return checkErr(fmt.Errorf("create output dir: %w", err))
} }
return checkErr(backfill(5000, inputFile.Bytes(), outputDir, humanReadable, quiet, maxBlockDuration)) return checkErr(backfill(5000, inputFile.Bytes(), outputDir, humanReadable, quiet, maxBlockDuration, customLabels))
} }
func displayHistogram(dataType string, datas []int, total int) { func displayHistogram(dataType string, datas []int, total int) {

View File

@ -186,7 +186,7 @@ func TestTSDBDumpOpenMetricsRoundTrip(t *testing.T) {
dbDir := t.TempDir() dbDir := t.TempDir()
// Import samples from OM format // Import samples from OM format
err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour) err = backfill(5000, initialMetrics, dbDir, false, false, 2*time.Hour, map[string]string{})
require.NoError(t, err) require.NoError(t, err)
db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil) db, err := tsdb.Open(dbDir, nil, nil, tsdb.DefaultOptions(), nil)
require.NoError(t, err) require.NoError(t, err)

View File

@ -641,6 +641,15 @@ Import samples from OpenMetrics input and produce TSDB blocks. Please refer to t
###### Flags
| Flag | Description |
| --- | --- |
| <code class="text-nowrap">--label</code> | Label to attach to metrics. Can be specified multiple times. Example --label=label_name=label_value |
###### Arguments ###### Arguments
| Argument | Description | Default | Required | | Argument | Description | Default | Required |