From bfc0e61a4d89b48ca3bdb1ff63620b283fc352f2 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Tue, 26 Nov 2024 10:04:34 +0800 Subject: [PATCH] fix(quota): limit the max throttle time (#2180) Signed-off-by: Ning Yu --- .../server/streamaspect/BrokerQuotaManager.scala | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala index bf94256718..5107008d32 100644 --- a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala @@ -85,14 +85,8 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs) } - protected def throttleTime(quotaType: QuotaType, e: QuotaViolationException, timeMs: Long): Long = { - quotaType match { - case QuotaType.SlowFetch | QuotaType.RequestRate => - QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs) - case _ => - QuotaUtils.throttleTime(e, timeMs) - } - + override protected def throttleTime(e: QuotaViolationException, timeMs: Long): Long = { + QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs) } private def isInternalClient(clientId: String): Boolean = { @@ -119,7 +113,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, 0 } catch { case e: QuotaViolationException => - val throttleTimeMs = throttleTime(quotaType, e, timeMs).toInt + val throttleTimeMs = throttleTime(e, timeMs).toInt debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)") throttleTimeMs }