Skip to content

Commit

Permalink
feat(quota): support to update broker request rate quota (#2158)
Browse files Browse the repository at this point in the history
* refactor(quota): refactor `maybeRecordAndGetThrottleTimeMs`

Signed-off-by: Ning Yu <ningyu@automq.com>

* fix(quota): throttle the produce request whatever the acks is

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor(quota): separate `Request` in `ClientQuotaManager` and `RequestRate` in `BrokerQuotaManager`

Signed-off-by: Ning Yu <ningyu@automq.com>

* sytle: fix lint

Signed-off-by: Ning Yu <ningyu@automq.com>

* feat(quota): support to update broker request rate quota

Signed-off-by: Ning Yu <ningyu@automq.com>

* test(quota): test update quota

Signed-off-by: Ning Yu <ningyu@automq.com>

---------

Signed-off-by: Ning Yu <ningyu@automq.com>
  • Loading branch information
Chillax-0v0 authored Nov 14, 2024
1 parent 5eea51d commit 54ea5d4
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,8 @@ public enum ClientQuotaType {
PRODUCE,
FETCH,
REQUEST,
// AutoMQ for Kafka inject start
REQUEST_RATE,
// AutoMQ for Kafka inject end
CONTROLLER_MUTATION
}
15 changes: 13 additions & 2 deletions core/src/main/scala/kafka/server/QuotaFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ object QuotaType {
case object Fetch extends QuotaType
case object Produce extends QuotaType
case object Request extends QuotaType
// AutoMQ for Kafka inject start
/**
* Quota type for request rate limiting.
*/
case object RequestRate extends QuotaType
// AutoMQ for Kafka inject end
case object ControllerMutation extends QuotaType
case object LeaderReplication extends QuotaType
case object FollowerReplication extends QuotaType
Expand All @@ -44,11 +50,15 @@ object QuotaType {
case QuotaType.Fetch => ClientQuotaType.FETCH
case QuotaType.Produce => ClientQuotaType.PRODUCE
case QuotaType.Request => ClientQuotaType.REQUEST
// AutoMQ for Kafka inject start
case QuotaType.RequestRate => ClientQuotaType.REQUEST_RATE
// AutoMQ for Kafka inject end
case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION
case _ => throw new IllegalArgumentException(s"Not a client quota type: $quotaType")
}
}

// AutoMQ for Kafka inject start
// for test
def fetch(): QuotaType = {
QuotaType.Fetch
Expand All @@ -58,9 +68,10 @@ object QuotaType {
QuotaType.Produce
}

def request(): QuotaType = {
QuotaType.Request
def requestRate(): QuotaType = {
QuotaType.RequestRate
}
// AutoMQ for Kafka inject end
}

sealed trait QuotaType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import kafka.utils.QuotaUtils
import org.apache.kafka.common.MetricName
import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Rate}
import org.apache.kafka.common.metrics.{Metrics, Quota, QuotaViolationException, Sensor}
import org.apache.kafka.common.requests.RequestContext
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
Expand Down Expand Up @@ -60,11 +59,15 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
return 0
}

maybeRecordAndGetThrottleTimeMs(quotaType, request.session, request.context, value, timeMs)
if (isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName())) {
return 0
}

maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs)
}

protected def throttleTime(quotaType: QuotaType, e: QuotaViolationException, timeMs: Long): Long = {
if (quotaType == QuotaType.Request) {
if (quotaType == QuotaType.RequestRate) {
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
} else {
QuotaUtils.throttleTime(e, timeMs)
Expand All @@ -84,11 +87,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
}
}

def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, session: Session, context: RequestContext, value: Double,
timeMs: Long): Int = {
if (isInWhiteList(session.principal, context.clientId(), context.listenerName())) {
return 0
}
def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, value: Double, timeMs: Long): Int = {
val clientSensors = getOrCreateQuotaSensors(quotaType)
try {
clientSensors.quotaSensor.record(value, timeMs, true)
Expand All @@ -112,34 +111,51 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
whiteListCache.clear()

if (!config.quotaEnabled) {
metrics.removeSensor(getQuotaSensorName(QuotaType.Request, metricsTags))
metrics.removeSensor(getQuotaSensorName(QuotaType.RequestRate, metricsTags))
metrics.removeSensor(getQuotaSensorName(QuotaType.Produce, metricsTags))
metrics.removeSensor(getQuotaSensorName(QuotaType.Fetch, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Request, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.RequestRate, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Produce, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Fetch, metricsTags))
return
}

val allMetrics = metrics.metrics()

val requestMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Request, metricsTags))
if (requestMetrics != null) {
requestMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Request)))
val requestRateMetric = allMetrics.get(clientQuotaMetricName(QuotaType.RequestRate, metricsTags))
if (requestRateMetric != null) {
requestRateMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.RequestRate)))
}

val produceMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Produce, metricsTags))
if (produceMetrics != null) {
produceMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Produce)))
val produceMetric = allMetrics.get(clientQuotaMetricName(QuotaType.Produce, metricsTags))
if (produceMetric != null) {
produceMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Produce)))
}

val fetchMetrics = allMetrics.get(clientQuotaMetricName(QuotaType.Fetch, metricsTags))
if (fetchMetrics != null) {
fetchMetrics.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch)))
val fetchMetric = allMetrics.get(clientQuotaMetricName(QuotaType.Fetch, metricsTags))
if (fetchMetric != null) {
fetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch)))
}
}
}

def updateQuota(quotaType: QuotaType, quota: Double): Unit = {
// update the quota in the config first to make sure the new quota will be used if {@link #updateQuotaMetricConfigs} is called
quotaType match {
case QuotaType.RequestRate => config.requestRateQuota(quota)
case QuotaType.Produce => config.produceQuota(quota)
case QuotaType.Fetch => config.fetchQuota(quota)
case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType")
}

// update the metric config
val allMetrics = metrics.metrics()
val metric = allMetrics.get(clientQuotaMetricName(quotaType, metricsTags))
if (metric != null) {
metric.config(getQuotaMetricConfig(quotaLimit(quotaType)))
}
}

def throttle(
quotaType: QuotaType,
throttleCallback: ThrottleCallback,
Expand All @@ -162,7 +178,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
s"$quotaType-${metricTagsToSensorSuffix(metricTags)}"

private def quotaLimit(quotaType: QuotaType): Double = {
if (quotaType == QuotaType.Request) config.requestQuota
if (quotaType == QuotaType.RequestRate) config.requestRateQuota
else if (quotaType == QuotaType.Produce) config.produceQuota
else if (quotaType == QuotaType.Fetch) config.fetchQuota
else throw new IllegalArgumentException(s"Unknown quota type $quotaType")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,7 @@ class ElasticKafkaApis(
val requestThrottleTimeMs =
if (produceRequest.acks == 0) 0
else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
val brokerRequestThrottleTimeMs =
if (produceRequest.acks == 0) 0
else quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Request, request, 1, timeMs)
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs)
val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0)
if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs
Expand All @@ -325,7 +323,7 @@ class ElasticKafkaApis(
} else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(QuotaType.Produce, quotas.broker, request, brokerBandwidthThrottleTimeMs)
} else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(QuotaType.Request, quotas.broker, request, brokerRequestThrottleTimeMs)
requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs)
}
}
// AutoMQ for Kafka inject end
Expand Down Expand Up @@ -700,7 +698,7 @@ class ElasticKafkaApis(
// AutoMQ for Kafka inject start
val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Request, request, 1, timeMs)
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs)
val brokerBandwidthThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Fetch, request, responseSize, timeMs)

val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0)
Expand All @@ -717,7 +715,7 @@ class ElasticKafkaApis(
} else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(QuotaType.Fetch, quotas.broker, request, brokerBandwidthThrottleTimeMs)
} else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(QuotaType.Request, quotas.broker, request, brokerRequestThrottleTimeMs)
requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs)
}
// AutoMQ for Kafka inject end

Expand Down
54 changes: 49 additions & 5 deletions core/src/test/scala/unit/kafka/server/BrokerQuotaManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,23 +109,67 @@ public void testQuota() {
properties.put(QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, 0);
properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 1);
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
assertEquals(0, result);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time + 10);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10);
assertEquals(0, result);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 1, time + second2millis);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + second2millis);
assertTrue(result > 0);

properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 10);
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.request(), request, 0, time + second2millis);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 0, time + second2millis);
assertEquals(0, result);
}

@Test
public void testUpdateQuota() {
int result;
long time = this.time.milliseconds();

// enable quota
Properties properties = new Properties();
properties.put(QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, true);
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
// rate = 1 / 2000ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
assertEquals(0, result);
// rate = 2 / 2010ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10);
assertEquals(0, result);
// rate = 3 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertEquals(1, result);

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 2);
// rate = 4 / 2999ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertEquals(0, result);
// rate = 5 / 2999ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertEquals(0, result);
// rate = 6 / 2999ms > 2
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertEquals(1, result);

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
// rate = 5 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999);
assertEquals(1000, result);
// rate = 2 / 2000ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 1);
assertEquals(0, result);
// rate = 3 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 2999);
assertEquals(1, result);
}

@Test
public void testThrottle() {
AtomicInteger throttleCounter = new AtomicInteger(0);
brokerQuotaManager.throttle(QuotaType.request(), new ThrottleCallback() {
brokerQuotaManager.throttle(QuotaType.requestRate(), new ThrottleCallback() {
@Override
public void startThrottling() {
throttleCounter.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class BrokerQuotaManagerConfig extends ClientQuotaManagerConfig {
private boolean quotaEnabled = false;
private double produceQuota = Double.MAX_VALUE;
private double fetchQuota = Double.MAX_VALUE;
private double requestQuota = Double.MAX_VALUE;
private double requestRateQuota = Double.MAX_VALUE;

private List<String> userWhiteList = List.of();
private List<String> clientIdWhiteList = List.of();
Expand All @@ -42,7 +42,7 @@ public void update(Properties props) {
quotaEnabled = getBoolean(map, QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, quotaEnabled);
produceQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_PRODUCE_BYTES_CONFIG, produceQuota);
fetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, fetchQuota);
requestQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestQuota);
requestRateQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestRateQuota);

String userWhiteListProp = props.getProperty(QuotaConfigs.BROKER_QUOTA_WHITE_LIST_USER_CONFIG);
if (null != userWhiteListProp && !userWhiteListProp.isBlank()) {
Expand Down Expand Up @@ -72,12 +72,24 @@ public double produceQuota() {
return produceQuota;
}

public void produceQuota(double produceQuota) {
this.produceQuota = produceQuota;
}

public double fetchQuota() {
return fetchQuota;
}

public double requestQuota() {
return requestQuota;
public void fetchQuota(double fetchQuota) {
this.fetchQuota = fetchQuota;
}

public double requestRateQuota() {
return requestRateQuota;
}

public void requestRateQuota(double requestRateQuota) {
this.requestRateQuota = requestRateQuota;
}

public List<String> userWhiteList() {
Expand Down

0 comments on commit 54ea5d4

Please sign in to comment.