Skip to content

Commit

Permalink
receiver/prometheusreceiver: allow cumulative resets when using the a…
Browse files Browse the repository at this point in the history
…djuster

Fixes #37717

Prior to this change, when the start time metric adjuster was used, all
points used the same start timestamp, even after a reset. That makes no
sense for a counter, which is not supposed to go down.

Instead, this change makes it so that when a reset is detected, the
reset point's timestamp is used as the next start time.

Signed-off-by: Ridwan Sharif <ridwanmsharif@google.com>
  • Loading branch information
ridwanmsharif committed Feb 5, 2025
1 parent 10a547a commit 6b0261b
Show file tree
Hide file tree
Showing 5 changed files with 281 additions and 5 deletions.
27 changes: 27 additions & 0 deletions .chloggen/metricadjuster-reset.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "bug_fix"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: prometheusreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Start time metric adjuster now handles reset points correctly

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37717]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/internal/appendable.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewAppendable(
if !useStartTimeMetric {
metricAdjuster = NewInitialPointAdjuster(set.Logger, gcInterval, useCreatedMetric)
} else {
metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex)
metricAdjuster = NewStartTimeMetricAdjuster(set.Logger, startTimeMetricRegex, gcInterval)
}

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ReceiverID: set.ID, Transport: transport, ReceiverCreateSettings: set})
Expand Down
21 changes: 21 additions & 0 deletions receiver/prometheusreceiver/internal/metrics_adjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ type initialPointAdjuster struct {
jobsMap *JobsMap
logger *zap.Logger
useCreatedMetric bool
// usePointTimeForReset forces the adjuster to use the timestamp of the
// point instead of the start timestamp when it detects resets. This is
// useful when this adjuster is used after another adjuster that
// pre-populated start times.
usePointTimeForReset bool
}

// NewInitialPointAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on the initial received points.
Expand Down Expand Up @@ -347,6 +352,10 @@ func (a *initialPointAdjuster) adjustMetricHistogram(tsm *timeseriesMap, current
if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum {
// reset re-initialize everything.
tsi.histogram.startTime = currentDist.StartTimestamp()
if a.usePointTimeForReset {
tsi.histogram.startTime = currentDist.Timestamp()
currentDist.SetStartTimestamp(tsi.histogram.startTime)
}
tsi.histogram.previousCount = currentDist.Count()
tsi.histogram.previousSum = currentDist.Sum()
continue
Expand Down Expand Up @@ -395,6 +404,10 @@ func (a *initialPointAdjuster) adjustMetricExponentialHistogram(tsm *timeseriesM
if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum {
// reset re-initialize everything.
tsi.histogram.startTime = currentDist.StartTimestamp()
if a.usePointTimeForReset {
tsi.histogram.startTime = currentDist.Timestamp()
currentDist.SetStartTimestamp(tsi.histogram.startTime)
}
tsi.histogram.previousCount = currentDist.Count()
tsi.histogram.previousSum = currentDist.Sum()
continue
Expand Down Expand Up @@ -436,6 +449,10 @@ func (a *initialPointAdjuster) adjustMetricSum(tsm *timeseriesMap, current pmetr
if currentSum.DoubleValue() < tsi.number.previousValue {
// reset re-initialize everything.
tsi.number.startTime = currentSum.StartTimestamp()
if a.usePointTimeForReset {
tsi.number.startTime = currentSum.Timestamp()
currentSum.SetStartTimestamp(tsi.number.startTime)
}
tsi.number.previousValue = currentSum.DoubleValue()
continue
}
Expand Down Expand Up @@ -482,6 +499,10 @@ func (a *initialPointAdjuster) adjustMetricSummary(tsm *timeseriesMap, current p
currentSummary.Sum() < tsi.summary.previousSum) {
// reset re-initialize everything.
tsi.summary.startTime = currentSummary.StartTimestamp()
if a.usePointTimeForReset {
tsi.summary.startTime = currentSummary.Timestamp()
currentSummary.SetStartTimestamp(tsi.summary.startTime)
}
tsi.summary.previousCount = currentSummary.Count()
tsi.summary.previousSum = currentSummary.Sum()
continue
Expand Down
13 changes: 11 additions & 2 deletions receiver/prometheusreceiver/internal/starttimemetricadjuster.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,21 @@ func init() {

// startTimeMetricAdjuster is the MetricsAdjuster implementation returned by
// NewStartTimeMetricAdjuster. After its own start time adjustment it hands the
// metrics to resetPointAdjuster so that resets are handled as well.
type startTimeMetricAdjuster struct {
// startTimeMetricRegex is the pattern passed to NewStartTimeMetricAdjuster.
// NOTE(review): presumably it selects the metric that carries the process
// start time; the matching logic is not visible here - confirm in getStartTime.
startTimeMetricRegex *regexp.Regexp
// resetPointAdjuster is an initialPointAdjuster configured with
// usePointTimeForReset; AdjustMetrics delegates to it as its final step.
resetPointAdjuster *initialPointAdjuster
// logger is the logger passed to NewStartTimeMetricAdjuster.
logger *zap.Logger
}

// NewStartTimeMetricAdjuster returns a new MetricsAdjuster that adjust metrics' start times based on a start time metric.
func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp) MetricsAdjuster {
func NewStartTimeMetricAdjuster(logger *zap.Logger, startTimeMetricRegex *regexp.Regexp, gcInterval time.Duration) MetricsAdjuster {
	adjuster := &startTimeMetricAdjuster{
		startTimeMetricRegex: startTimeMetricRegex,
		logger:               logger,
	}
	// Resets are delegated to an initial point adjuster that runs after the
	// start times have been populated. usePointTimeForReset makes it restart
	// a series at the reset point's own timestamp; useCreatedMetric is left at
	// its zero value (false).
	adjuster.resetPointAdjuster = &initialPointAdjuster{
		jobsMap:              NewJobsMap(gcInterval),
		logger:               logger,
		usePointTimeForReset: true,
	}
	return adjuster
}
Expand Down Expand Up @@ -110,7 +118,8 @@ func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) erro
}
}

return nil
// Handle resets.
return stma.resetPointAdjuster.AdjustMetrics(metrics)
}

func (stma *startTimeMetricAdjuster) getStartTime(metrics pmetric.Metrics) (float64, error) {
Expand Down
223 changes: 221 additions & 2 deletions receiver/prometheusreceiver/internal/starttimemetricadjuster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
semconv "go.opentelemetry.io/collector/semconv/v1.27.0"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
Expand Down Expand Up @@ -116,11 +117,17 @@ func TestStartTimeMetricMatch(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex)
gcInterval := 10 * time.Millisecond
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, gcInterval)
if tt.expectedErr != nil {
assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr)
return
}

// We need to make sure the job and instance labels are set before the adjuster is used.
pmetrics := tt.inputs
pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, "0")
pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, "job")
assert.NoError(t, stma.AdjustMetrics(tt.inputs))
for i := 0; i < tt.inputs.ResourceMetrics().Len(); i++ {
rm := tt.inputs.ResourceMetrics().At(i)
Expand Down Expand Up @@ -210,7 +217,8 @@ func TestStartTimeMetricFallback(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testutil.SetFeatureGateForTest(t, useCollectorStartTimeFallbackGate, true)
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex)
gcInterval := 10 * time.Millisecond
stma := NewStartTimeMetricAdjuster(zap.NewNop(), tt.startTimeMetricRegex, gcInterval)
if tt.expectedErr != nil {
assert.ErrorIs(t, stma.AdjustMetrics(tt.inputs), tt.expectedErr)
return
Expand All @@ -220,6 +228,10 @@ func TestStartTimeMetricFallback(t *testing.T) {
// directly.
approximateCollectorStartTime = mockStartTime

// We need to make sure the job and instance labels are set before the adjuster is used.
pmetrics := tt.inputs
pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceInstanceID, "0")
pmetrics.ResourceMetrics().At(0).Resource().Attributes().PutStr(semconv.AttributeServiceName, "job")
assert.NoError(t, stma.AdjustMetrics(tt.inputs))
for i := 0; i < tt.inputs.ResourceMetrics().Len(); i++ {
rm := tt.inputs.ResourceMetrics().At(i)
Expand Down Expand Up @@ -250,3 +262,210 @@ func TestStartTimeMetricFallback(t *testing.T) {
})
}
}

// TestFallbackAndReset feeds multi-round scenarios through the start time
// metric adjuster and checks the start timestamp written on every round.
//
// Each scenario asserts that:
//   - the first point of a cumulative series gets the known start time
//     (taken from process_start_time_seconds when the fallback gate is off,
//     or from the mocked collector start time when it is on),
//   - a point whose value drops below the previous one is treated as a reset
//     and gets its own timestamp as start time,
//   - points after a reset keep the reset point's timestamp as start time,
//   - gauges are never adjusted.
func TestFallbackAndReset(t *testing.T) {
	mockStartTime := time.Now().Add(-10 * time.Hour).Truncate(time.Second)
	mockTimestamp := pcommon.NewTimestampFromTime(mockStartTime)
	t1 := pcommon.Timestamp(126 * 1e9)
	t2 := pcommon.NewTimestampFromTime(t1.AsTime().Add(1 * time.Hour))
	t3 := pcommon.NewTimestampFromTime(t2.AsTime().Add(1 * time.Hour))
	t4 := pcommon.NewTimestampFromTime(t3.AsTime().Add(1 * time.Hour))
	t5 := pcommon.NewTimestampFromTime(t4.AsTime().Add(1 * time.Hour))
	tests := []struct {
		name        string
		useFallback bool
		scenario    []*metricsAdjusterTest
	}{
		{
			name:        "sum no fallback and reset",
			useFallback: false,
			scenario: []*metricsAdjusterTest{
				{
					description: "Sum: round 1 - initial instance, start time is established",
					metrics: metrics(
						sumMetric("test_sum", doublePoint(nil, t1, t1, 44)),
						gaugeMetric("process_start_time_seconds", doublePoint(nil, t1, t1, float64(mockTimestamp.AsTime().Unix()))),
					),
					adjusted: metrics(
						sumMetric("test_sum", doublePoint(nil, mockTimestamp, t1, 44)),
						gaugeMetric("process_start_time_seconds", doublePoint(nil, t1, t1, float64(mockTimestamp.AsTime().Unix()))),
					),
				},
				{
					description: "Sum: round 2 - instance reset (value less than previous value), start time is reset",
					metrics: metrics(
						sumMetric("test_sum", doublePoint(nil, t2, t2, 33)),
						gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))),
					),
					adjusted: metrics(
						sumMetric("test_sum", doublePoint(nil, t2, t2, 33)),
						gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))),
					),
				},
				{
					description: "Sum: round 3 - instance adjusted based on round 2",
					metrics: metrics(
						sumMetric("test_sum", doublePoint(nil, t3, t3, 55)),
						gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))),
					),
					adjusted: metrics(
						sumMetric("test_sum", doublePoint(nil, t2, t3, 55)),
						gaugeMetric("process_start_time_seconds", doublePoint(nil, t2, t2, float64(mockTimestamp.AsTime().Unix()))),
					),
				},
			},
		},
		{
			name:        "sum fallback and reset",
			useFallback: true,
			scenario: []*metricsAdjusterTest{
				{
					description: "Sum: round 1 - initial instance, start time is established",
					metrics:     metrics(sumMetric("test_sum", doublePoint(nil, t1, t1, 44))),
					adjusted:    metrics(sumMetric("test_sum", doublePoint(nil, mockTimestamp, t1, 44))),
				},
				{
					description: "Sum: round 2 - instance adjusted based on round 1",
					metrics:     metrics(sumMetric("test_sum", doublePoint(nil, t2, t2, 66))),
					adjusted:    metrics(sumMetric("test_sum", doublePoint(nil, mockTimestamp, t2, 66))),
				},
				{
					description: "Sum: round 3 - instance reset (value less than previous value), start time is reset",
					metrics:     metrics(sumMetric("test_sum", doublePoint(nil, t3, t3, 55))),
					adjusted:    metrics(sumMetric("test_sum", doublePoint(nil, t3, t3, 55))),
				},
				{
					description: "Sum: round 4 - instance adjusted based on round 3",
					metrics:     metrics(sumMetric("test_sum", doublePoint(nil, t4, t4, 72))),
					adjusted:    metrics(sumMetric("test_sum", doublePoint(nil, t3, t4, 72))),
				},
				{
					description: "Sum: round 5 - instance adjusted based on round 4",
					metrics:     metrics(sumMetric("test_sum", doublePoint(nil, t5, t5, 72))),
					adjusted:    metrics(sumMetric("test_sum", doublePoint(nil, t3, t5, 72))),
				},
			},
		},
		{
			name:        "gauge fallback and reset",
			useFallback: true,
			scenario: []*metricsAdjusterTest{
				{
					description: "Gauge: round 1 - gauge not adjusted",
					metrics:     metrics(gaugeMetric("test_gauge", doublePoint(nil, t1, t1, 44))),
					adjusted:    metrics(gaugeMetric("test_gauge", doublePoint(nil, t1, t1, 44))),
				},
				{
					description: "Gauge: round 2 - gauge not adjusted",
					metrics:     metrics(gaugeMetric("test_gauge", doublePoint(nil, t2, t2, 66))),
					adjusted:    metrics(gaugeMetric("test_gauge", doublePoint(nil, t2, t2, 66))),
				},
				{
					description: "Gauge: round 3 - value less than previous value - gauge is not adjusted",
					metrics:     metrics(gaugeMetric("test_gauge", doublePoint(nil, t3, t3, 55))),
					adjusted:    metrics(gaugeMetric("test_gauge", doublePoint(nil, t3, t3, 55))),
				},
			},
		},
		{
			name:        "histogram fallback and reset",
			useFallback: true,
			scenario: []*metricsAdjusterTest{
				{
					description: "Histogram: round 1 - initial instance, start time is established",
					metrics:     metrics(histogramMetric("test_histogram", histogramPoint(nil, t1, t1, bounds0, []uint64{4, 2, 3, 7}))),
					adjusted:    metrics(histogramMetric("test_histogram", histogramPoint(nil, mockTimestamp, t1, bounds0, []uint64{4, 2, 3, 7}))),
				}, {
					description: "Histogram: round 2 - instance adjusted based on round 1",
					metrics:     metrics(histogramMetric("test_histogram", histogramPoint(nil, t2, t2, bounds0, []uint64{6, 3, 4, 8}))),
					adjusted:    metrics(histogramMetric("test_histogram", histogramPoint(nil, mockTimestamp, t2, bounds0, []uint64{6, 3, 4, 8}))),
				}, {
					description: "Histogram: round 3 - instance reset (value less than previous value), start time is reset",
					metrics:     metrics(histogramMetric("test_histogram", histogramPoint(nil, t3, t3, bounds0, []uint64{5, 3, 2, 7}))),
					adjusted:    metrics(histogramMetric("test_histogram", histogramPoint(nil, t3, t3, bounds0, []uint64{5, 3, 2, 7}))),
				}, {
					description: "Histogram: round 4 - instance adjusted based on round 3",
					metrics:     metrics(histogramMetric("test_histogram", histogramPoint(nil, t4, t4, bounds0, []uint64{7, 4, 2, 12}))),
					adjusted:    metrics(histogramMetric("test_histogram", histogramPoint(nil, t3, t4, bounds0, []uint64{7, 4, 2, 12}))),
				},
			},
		},
		{
			name:        "exponential histogram fallback and reset",
			useFallback: true,
			scenario: []*metricsAdjusterTest{
				{
					description: "Exponential Histogram: round 1 - initial instance, start time is established",
					metrics:     metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))),
					adjusted:    metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, mockTimestamp, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))),
				},
				{
					description: "Exponential Histogram: round 2 - instance adjusted based on round 1",
					metrics:     metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t2, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))),
					adjusted:    metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, mockTimestamp, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))),
				}, {
					description: "Exponential Histogram: round 3 - instance reset (value less than previous value), start time is reset",
					metrics:     metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))),
					adjusted:    metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))),
				}, {
					description: "Exponential Histogram: round 4 - instance adjusted based on round 3",
					metrics:     metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t4, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))),
					adjusted:    metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(nil, t3, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))),
				},
			},
		},
		{
			name:        "summary fallback and reset",
			useFallback: true,
			scenario: []*metricsAdjusterTest{
				{
					description: "Summary: round 1 - initial instance, start time is established",
					metrics: metrics(
						summaryMetric("test_summary", summaryPoint(nil, t1, t1, 10, 40, percent0, []float64{1, 5, 8})),
					),
					adjusted: metrics(
						summaryMetric("test_summary", summaryPoint(nil, mockTimestamp, t1, 10, 40, percent0, []float64{1, 5, 8})),
					),
				},
				{
					description: "Summary: round 2 - instance adjusted based on round 1",
					metrics: metrics(
						summaryMetric("test_summary", summaryPoint(nil, t2, t2, 15, 70, percent0, []float64{7, 44, 9})),
					),
					adjusted: metrics(
						summaryMetric("test_summary", summaryPoint(nil, mockTimestamp, t2, 15, 70, percent0, []float64{7, 44, 9})),
					),
				},
				{
					description: "Summary: round 3 - instance reset (count less than previous), start time is reset",
					metrics: metrics(
						summaryMetric("test_summary", summaryPoint(nil, t3, t3, 12, 66, percent0, []float64{3, 22, 5})),
					),
					adjusted: metrics(
						summaryMetric("test_summary", summaryPoint(nil, t3, t3, 12, 66, percent0, []float64{3, 22, 5})),
					),
				},
				{
					description: "Summary: round 4 - instance adjusted based on round 3",
					metrics: metrics(
						summaryMetric("test_summary", summaryPoint(nil, t4, t4, 14, 96, percent0, []float64{9, 47, 8})),
					),
					adjusted: metrics(
						summaryMetric("test_summary", summaryPoint(nil, t3, t4, 14, 96, percent0, []float64{9, 47, 8})),
					),
				},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			testutil.SetFeatureGateForTest(t, useCollectorStartTimeFallbackGate, tt.useFallback)
			gcInterval := 10 * time.Millisecond
			stma := NewStartTimeMetricAdjuster(zap.NewNop(), nil, gcInterval)
			// To test that the adjuster is using the fallback correctly, override the fallback time to use
			// directly. The previous value is put back when the subtest ends so the
			// mocked time does not leak into other tests in the package.
			previousStartTime := approximateCollectorStartTime
			t.Cleanup(func() { approximateCollectorStartTime = previousStartTime })
			approximateCollectorStartTime = mockStartTime
			runScript(t, stma, "job", "0", tt.scenario)
		})
	}
}

0 comments on commit 6b0261b

Please sign in to comment.