From 4597b747abd5f6b0f9835d006a9da5198f6a03f5 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:12:31 +0800 Subject: [PATCH] feat(quota): support broker quota for slow fetch (#2160) * feat(quota): introduce `SLOW_FETCH` broker quota Signed-off-by: Ning Yu * feat(quota): add slow fetch quota Signed-off-by: Ning Yu * test(quota): test broker slow fetch quota Signed-off-by: Ning Yu * test(quota): test zero quota value Signed-off-by: Ning Yu --------- Signed-off-by: Ning Yu --- .../kafka/server/quota/ClientQuotaType.java | 1 + .../scala/kafka/server/DelayedFetch.scala | 6 ++- .../scala/kafka/server/QuotaFactory.scala | 9 +++++ .../streamaspect/BrokerQuotaManager.scala | 31 ++++++++++++---- .../streamaspect/ElasticKafkaApis.scala | 14 +++++-- .../kafka/server/BrokerQuotaManagerTest.java | 37 ++++++++++++++++++- .../kafka/server/config/QuotaConfigs.java | 2 + .../config/BrokerQuotaManagerConfig.java | 10 +++++ 8 files changed, 97 insertions(+), 13 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java index 5b55e3cd76..d5f8b27ced 100644 --- a/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java +++ b/clients/src/main/java/org/apache/kafka/server/quota/ClientQuotaType.java @@ -24,6 +24,7 @@ public enum ClientQuotaType { FETCH, REQUEST, // AutoMQ for Kafka inject start + SLOW_FETCH, REQUEST_RATE, // AutoMQ for Kafka inject end CONTROLLER_MUTATION diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index a3c5e64d93..6bb9d6257c 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -241,7 +241,6 @@ class DelayedFetch( error(s"Unexpected error in delayed fetch: $params $fetchInfos ", e) } } - ReadHint.clear() // AutoMQ for Kafka inject end val fetchPartitionData = logReadResults.map { case (tp, result) => @@ -252,6 +251,11 @@ class DelayedFetch( } responseCallback(fetchPartitionData) + // AutoMQ for Kafka inject start + // clear hint after callback as it will be used in the callback + // see {@link ElasticKafkaApis#handleFetchRequest#processResponseCallback} + ReadHint.clear() + // AutoMQ for Kafka inject end } } diff --git a/core/src/main/scala/kafka/server/QuotaFactory.scala b/core/src/main/scala/kafka/server/QuotaFactory.scala index 0d89b6d7b0..9a13d864f7 100644 --- a/core/src/main/scala/kafka/server/QuotaFactory.scala +++ b/core/src/main/scala/kafka/server/QuotaFactory.scala @@ -33,6 +33,10 @@ object QuotaType { case object Produce extends QuotaType case object Request extends QuotaType // AutoMQ for Kafka inject start + /** + * Quota type for slow fetch throughput limiting. + */ + case object SlowFetch extends QuotaType /** * Quota type for request rate limiting. */ @@ -51,6 +55,7 @@ object QuotaType { case QuotaType.Produce => ClientQuotaType.PRODUCE case QuotaType.Request => ClientQuotaType.REQUEST // AutoMQ for Kafka inject start + case QuotaType.SlowFetch => ClientQuotaType.SLOW_FETCH case QuotaType.RequestRate => ClientQuotaType.REQUEST_RATE // AutoMQ for Kafka inject end case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION @@ -68,6 +73,10 @@ object QuotaType { QuotaType.Produce } + def slowFetch(): QuotaType = { + QuotaType.SlowFetch + } + def requestRate(): QuotaType = { QuotaType.RequestRate } diff --git a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala index b598a2206d..aac66a22d1 100644 --- a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala @@ -56,10 +56,12 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, request: RequestChannel.Request, value: Double, timeMs: Long): Int = { if (!config.quotaEnabled) { + // Quota is disabled, no need to throttle return 0 } if (isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName())) { + // Client is in the white list, no need to throttle return 0 } @@ -67,11 +69,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, } protected def throttleTime(quotaType: QuotaType, e: QuotaViolationException, timeMs: Long): Long = { - if (quotaType == QuotaType.RequestRate) { - QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs) - } else { - QuotaUtils.throttleTime(e, timeMs) + quotaType match { + case QuotaType.SlowFetch | QuotaType.RequestRate => + QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs) + case _ => + QuotaUtils.throttleTime(e, timeMs) } + } private def isInWhiteList(principal: KafkaPrincipal, clientId: String, listenerName: String): Boolean = { @@ -114,9 +118,11 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, metrics.removeSensor(getQuotaSensorName(QuotaType.RequestRate, metricsTags)) metrics.removeSensor(getQuotaSensorName(QuotaType.Produce, metricsTags)) metrics.removeSensor(getQuotaSensorName(QuotaType.Fetch, metricsTags)) + metrics.removeSensor(getQuotaSensorName(QuotaType.SlowFetch, metricsTags)) metrics.removeSensor(getThrottleTimeSensorName(QuotaType.RequestRate, metricsTags)) metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Produce, metricsTags)) metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Fetch, metricsTags)) + metrics.removeSensor(getThrottleTimeSensorName(QuotaType.SlowFetch, metricsTags)) return } @@ -136,6 +142,11 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, if (fetchMetric != null) { fetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch))) } + + val slowFetchMetric = allMetrics.get(clientQuotaMetricName(QuotaType.SlowFetch, metricsTags)) + if (slowFetchMetric != null) { + slowFetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.SlowFetch))) + } } } @@ -145,6 +156,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, case QuotaType.RequestRate => config.requestRateQuota(quota) case QuotaType.Produce => config.produceQuota(quota) case QuotaType.Fetch => config.fetchQuota(quota) + case QuotaType.SlowFetch => config.slowFetchQuota(quota) case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType") } @@ -178,10 +190,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, s"$quotaType-${metricTagsToSensorSuffix(metricTags)}" private def quotaLimit(quotaType: QuotaType): Double = { - if (quotaType == QuotaType.RequestRate) config.requestRateQuota - else if (quotaType == QuotaType.Produce) config.produceQuota - else if (quotaType == QuotaType.Fetch) config.fetchQuota - else throw new IllegalArgumentException(s"Unknown quota type $quotaType") + quotaType match { + case QuotaType.RequestRate => config.requestRateQuota + case QuotaType.Produce => config.produceQuota + case QuotaType.Fetch => config.fetchQuota + case QuotaType.SlowFetch => config.slowFetchQuota + case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType") + } } protected def clientQuotaMetricName(quotaType: QuotaType, quotaMetricTags: Map[String, String]): MetricName = { diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala index 9e4bd7b5da..f4856364db 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala @@ -5,7 +5,7 @@ import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import com.yammer.metrics.core.Histogram import kafka.automq.zonerouter.{ClientIdMetadata, NoopProduceRouter, ProduceRouter} import kafka.coordinator.transaction.TransactionCoordinator -import kafka.log.streamaspect.ElasticLogManager +import kafka.log.streamaspect.{ElasticLogManager, ReadHint} import kafka.metrics.KafkaMetricsUtil import kafka.network.RequestChannel import kafka.server.QuotaFactory.QuotaManagers @@ -696,24 +696,32 @@ class ElasticKafkaApis( val timeMs = time.milliseconds() // AutoMQ for Kafka inject start + val isSlowRead = !ReadHint.isFastRead + val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs) val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs) - val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs) val brokerBandwidthThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Fetch, request, responseSize, timeMs) + val brokerSlowFetchThrottleTimeMs = if (isSlowRead) quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.SlowFetch, request, responseSize, timeMs) else 0 + val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs) - val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0) + val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerSlowFetchThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0) if (maxThrottleTimeMs > 0) { request.apiThrottleTimeMs = maxThrottleTimeMs // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value // from the fetch quota because we are going to return an empty response. quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs) quotas.broker.unrecordQuotaSensor(QuotaType.Fetch, responseSize, timeMs) + if (isSlowRead) { + quotas.broker.unrecordQuotaSensor(QuotaType.SlowFetch, responseSize, timeMs) + } if (bandwidthThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(quotas.fetch, request, bandwidthThrottleTimeMs) } else if (requestThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(quotas.request, request, requestThrottleTimeMs) } else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(QuotaType.Fetch, quotas.broker, request, brokerBandwidthThrottleTimeMs) + } else if (brokerSlowFetchThrottleTimeMs == maxThrottleTimeMs) { + requestHelper.throttle(QuotaType.SlowFetch, quotas.broker, request, brokerSlowFetchThrottleTimeMs) } else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) { requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs) } diff --git a/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java b/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java index dcc0a9dc41..98dc4659f1 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java +++ b/core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java @@ -105,8 +105,24 @@ public void testQuota() { result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.fetch(), request, 500, time + second2millis); assertEquals(0, result); - // Test request quota + // Test slow fetch quota properties.put(QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, 0); + properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 100); + brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time); + assertEquals(0, result); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time + 10); + assertEquals(0, result); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time + second2millis); + assertTrue(result > 0); + + properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 1000); + brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 500, time + second2millis); + assertEquals(0, result); + + // Test request quota + properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 0); properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 1); brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time); @@ -122,6 +138,25 @@ public void testQuota() { assertEquals(0, result); } + @Test + public void testZeroQuota() { + long result; + long time = this.time.milliseconds(); + + // enable quota + Properties properties = new Properties(); + properties.put(QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, true); + brokerQuotaManager.updateQuotaConfigs(Option.apply(properties)); + + brokerQuotaManager.updateQuota(QuotaType.requestRate(), 0); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time); + assertEquals(1000, result); + + brokerQuotaManager.updateQuota(QuotaType.slowFetch(), 0); + result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 1, time); + assertEquals(1000, result); + } + @Test public void testUpdateQuota() { int result; diff --git a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java index db5ecdd872..414a1acff8 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java @@ -107,6 +107,7 @@ public class QuotaConfigs { public static final String BROKER_QUOTA_ENABLED_CONFIG = "broker.quota.enabled"; public static final String BROKER_QUOTA_PRODUCE_BYTES_CONFIG = "broker.quota.produce.bytes"; public static final String BROKER_QUOTA_FETCH_BYTES_CONFIG = "broker.quota.fetch.bytes"; + public static final String BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG = "broker.quota.slow.fetch.bytes"; public static final String BROKER_QUOTA_REQUEST_RATE_CONFIG = "broker.quota.request.rate"; public static final String BROKER_QUOTA_WHITE_LIST_USER_CONFIG = "broker.quota.white.list.user"; public static final String BROKER_QUOTA_WHITE_LIST_CLIENT_ID_CONFIG = "broker.quota.white.list.client.id"; @@ -117,6 +118,7 @@ public class QuotaConfigs { public static final String BROKER_QUOTA_ENABLED_DOC = "Enable broker quota."; public static final String BROKER_QUOTA_PRODUCE_BYTES_DOC = "The maximum bytes send by producer in single window."; public static final String BROKER_QUOTA_FETCH_BYTES_DOC = "The maximum bytes receive by consumer in single window."; + public static final String BROKER_QUOTA_SLOW_FETCH_BYTES_DOC = "The maximum bytes receive by slow fetch consumer in single window."; public static final String BROKER_QUOTA_REQUEST_RATE_DOC = "The maximum request count send by client in single window."; public static final String BROKER_QUOTA_WHITE_LIST_USER_DOC = "Broker quota white list for user."; public static final String BROKER_QUOTA_WHITE_LIST_CLIENT_ID_DOC = "Broker quota white list for client id."; diff --git a/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java index ddd6e071f2..a09b0e0f44 100644 --- a/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/BrokerQuotaManagerConfig.java @@ -26,6 +26,7 @@ public class BrokerQuotaManagerConfig extends ClientQuotaManagerConfig { private boolean quotaEnabled = false; private double produceQuota = Double.MAX_VALUE; private double fetchQuota = Double.MAX_VALUE; + private double slowFetchQuota = Double.MAX_VALUE; private double requestRateQuota = Double.MAX_VALUE; private List userWhiteList = List.of(); @@ -42,6 +43,7 @@ public void update(Properties props) { quotaEnabled = getBoolean(map, QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, quotaEnabled); produceQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_PRODUCE_BYTES_CONFIG, produceQuota); fetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, fetchQuota); + slowFetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, slowFetchQuota); requestRateQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestRateQuota); String userWhiteListProp = props.getProperty(QuotaConfigs.BROKER_QUOTA_WHITE_LIST_USER_CONFIG); @@ -84,6 +86,14 @@ public void fetchQuota(double fetchQuota) { this.fetchQuota = fetchQuota; } + public double slowFetchQuota() { + return slowFetchQuota; + } + + public void slowFetchQuota(double slowFetchQuota) { + this.slowFetchQuota = slowFetchQuota; + } + public double requestRateQuota() { return requestRateQuota; }