Skip to content

Commit

Permalink
Add metadata on Kafka sink metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Naireen committed Feb 3, 2025
1 parent 4d644cf commit 67cd9a2
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ message MonitoringInfo {
SPANNER_TABLE_ID = 25 [(label_props) = { name: "SPANNER_TABLE_ID" }];
SPANNER_INSTANCE_ID = 26 [(label_props) = { name: "SPANNER_INSTANCE_ID" }];
SPANNER_QUERY_NAME = 27 [(label_props) = { name: "SPANNER_QUERY_NAME" }];
PER_WORKER_METRIC = 28 [(label_props) = { name: "PER_WORKER_METRIC" }];
}

// A set of key and value labels which define the scope of the metric. For
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,13 @@ public MetricUpdates getUpdates() {
.setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName())
.setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
}

// Based on the namespace, add the per-worker metric label so that the runner can process
// these sink metrics separately.
if (metricName.getNamespace().equals("BigQuerySink")
|| metricName.getNamespace().equals("KafkaSink")) {
builder.setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");
}
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public static final class Labels {
public static final String SPANNER_DATABASE_ID = "SPANNER_DATABASE_ID";
public static final String SPANNER_INSTANCE_ID = "SPANNER_INSTANCE_ID";
public static final String SPANNER_QUERY_NAME = "SPANNER_QUERY_NAME";
public static final String PER_WORKER_METRIC = "PER_WORKER_METRIC";

static {
// Validate that compile time constants match the values stored in the protos.
Expand Down Expand Up @@ -150,6 +151,7 @@ public static final class Labels {
SPANNER_INSTANCE_ID.equals(extractLabel(MonitoringInfoLabels.SPANNER_INSTANCE_ID)));
checkArgument(
SPANNER_QUERY_NAME.equals(extractLabel(MonitoringInfoLabels.SPANNER_QUERY_NAME)));
checkArgument(PER_WORKER_METRIC.equals(extractLabel(MonitoringInfoLabels.PER_WORKER_METRIC)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,54 @@ public void testMonitoringInfosArePopulatedForUserCounters() {
assertThat(actualMonitoringInfos, containsInAnyOrder(builder1.build(), builder2.build()));
}

@Test
public void testMonitoringInfosLabelsArePopulatedForSinkCounter() {
  // One counter per sink namespace, plus one counter in an unrelated namespace.
  MetricsContainerImpl container = new MetricsContainerImpl("step1");
  CounterCell kafkaSinkCounter = container.getCounter(MetricName.named("KafkaSink", "name1"));
  CounterCell bigQuerySinkCounter =
      container.getCounter(MetricName.named("BigQuerySink", "name2"));
  CounterCell otherCounter = container.getCounter(MetricName.named("PS", "name3"));

  kafkaSinkCounter.inc(2L);
  bigQuerySinkCounter.inc(4L);
  otherCounter.inc(5L);

  // Counters in the "KafkaSink" namespace are expected to carry the PER_WORKER_METRIC label.
  SimpleMonitoringInfoBuilder expectedKafkaSink = new SimpleMonitoringInfoBuilder();
  expectedKafkaSink
      .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
      .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "KafkaSink")
      .setLabel(MonitoringInfoConstants.Labels.NAME, "name1")
      .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
      .setInt64SumValue(2)
      .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  // Same expectation for the "BigQuerySink" namespace.
  SimpleMonitoringInfoBuilder expectedBigQuerySink = new SimpleMonitoringInfoBuilder();
  expectedBigQuerySink
      .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
      .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "BigQuerySink")
      .setLabel(MonitoringInfoConstants.Labels.NAME, "name2")
      .setLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true")
      .setInt64SumValue(4)
      .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  // Not in a supported namespace, so the extra label must be absent.
  SimpleMonitoringInfoBuilder expectedOther = new SimpleMonitoringInfoBuilder();
  expectedOther
      .setUrn(MonitoringInfoConstants.Urns.USER_SUM_INT64)
      .setLabel(MonitoringInfoConstants.Labels.NAMESPACE, "PS")
      .setLabel(MonitoringInfoConstants.Labels.NAME, "name3")
      .setInt64SumValue(5)
      .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, "step1");

  // Copy the reported infos into a list so the collection matcher can be applied.
  ArrayList<MonitoringInfo> reportedInfos = new ArrayList<>();
  for (MonitoringInfo reported : container.getMonitoringInfos()) {
    reportedInfos.add(reported);
  }

  assertThat(
      reportedInfos,
      containsInAnyOrder(
          expectedKafkaSink.build(), expectedBigQuerySink.build(), expectedOther.build()));
}

@Test
public void testMonitoringInfosArePopulatedForUserDistributions() {
MetricsContainerImpl testObject = new MetricsContainerImpl("step1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,6 +40,8 @@ public interface KafkaMetrics {

void updateKafkaMetrics();

/**
 * Records the backlog, in bytes, for one partition of a topic.
 *
 * @param topic name of the topic
 * @param partitionId partition of the topic the backlog belongs to
 * @param backlog backlog for that partition
 */
void recordBacklogBytes(String topic, int partitionId, long backlog);

/** No-op implementation of {@code KafkaMetrics}. */
class NoOpKafkaMetrics implements KafkaMetrics {
private NoOpKafkaMetrics() {}
Expand All @@ -52,6 +55,9 @@ public void updateBacklogBytes(String topic, int partitionId, long elapsedTime)
/** Intentionally empty: this no-op implementation does nothing when asked to update. */
@Override
public void updateKafkaMetrics() {}

/**
 * Intentionally empty: this no-op implementation discards the reported backlog.
 *
 * @param topic ignored
 * @param partitionId ignored
 * @param backlog ignored
 */
@Override
public void recordBacklogBytes(String topic, int partitionId, long backlog) {}

// Single shared instance; declared final so the reference can never be reassigned.
private static final NoOpKafkaMetrics singleton = new NoOpKafkaMetrics();

static NoOpKafkaMetrics getInstance() {
Expand Down Expand Up @@ -113,6 +119,8 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
}

/**
* This is for tracking backlog bytes to be added to the Metric Container at a later time.
*
* @param topicName topicName
* @param partitionId partitionId
* @param backlog backlog for the specific partitionID of topicName
Expand Down Expand Up @@ -146,14 +154,31 @@ private void recordRpcLatencyMetrics() {
}
}

private void recordBacklogBytes() {
/**
 * Sets one backlog gauge in the {@code "KafkaSink"} namespace for every entry of {@code
 * perTopicPartitionBacklogs()}, using the entry key as the metric name and the entry value as
 * the gauge value.
 */
private void recordBacklogBytesInternal() {
  perTopicPartitionBacklogs()
      .forEach(
          (gaugeName, backlogBytes) ->
              KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", gaugeName))
                  .set(backlogBytes));
}

/**
 * Records the backlog bytes of a single topic partition as a gauge in the {@code "KafkaSink"}
 * namespace, obtained directly through {@code Metrics.gauge}.
 *
 * <p>NOTE(review): the previous documentation described this as recording "on the current
 * thread" and said that parameters are only part of the metric key when
 * 'supportsMetricsDeletion' is enabled. Neither is visible in this method, which always derives
 * the gauge name from both the topic and the partition. Confirm against {@code Metrics} and
 * {@code KafkaSinkMetrics}.
 *
 * @param topicName name of the topic
 * @param partitionId partition of the topic; passed together with {@code topicName} to {@code
 *     KafkaSinkMetrics.getMetricGaugeName} to derive the gauge name
 * @param backlogBytes value the gauge is set to
 */
@Override
public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
  String gaugeName = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName();
  Gauge perPartition = Metrics.gauge("KafkaSink", gaugeName);
  perPartition.set(backlogBytes);
}

/**
* Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}
* containers. This function will only report metrics once per instance. Subsequent calls to
Expand All @@ -165,7 +190,7 @@ public void updateKafkaMetrics() {
LOG.warn("Updating stale Kafka metrics container");
return;
}
recordBacklogBytes();
recordBacklogBytesInternal();
recordRpcLatencyMetrics();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ public ProcessContinuation processElement(
}
}
}

backlogBytes.set(
(long)
(BigDecimal.valueOf(
Expand All @@ -558,6 +557,17 @@ public ProcessContinuation processElement(
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
kafkaResults.recordBacklogBytes(
kafkaSourceDescriptor.getTopic(),
kafkaSourceDescriptor.getPartition(),
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
}
}
}
Expand Down

0 comments on commit 67cd9a2

Please sign in to comment.