Skip to content

Commit

Permalink
feat(quota): support broker quota for slow fetch (#2160)
Browse files Browse the repository at this point in the history
* feat(quota): introduce `SLOW_FETCH` broker quota

Signed-off-by: Ning Yu <ningyu@automq.com>

* feat(quota): add slow fetch quota

Signed-off-by: Ning Yu <ningyu@automq.com>

* test(quota): test broker slow fetch quota

Signed-off-by: Ning Yu <ningyu@automq.com>

* test(quota): test zero quota value

Signed-off-by: Ning Yu <ningyu@automq.com>

---------

Signed-off-by: Ning Yu <ningyu@automq.com>
  • Loading branch information
Chillax-0v0 authored Nov 14, 2024
1 parent 54ea5d4 commit 4597b74
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum ClientQuotaType {
FETCH,
REQUEST,
// AutoMQ for Kafka inject start
SLOW_FETCH,
REQUEST_RATE,
// AutoMQ for Kafka inject end
CONTROLLER_MUTATION
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/kafka/server/DelayedFetch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ class DelayedFetch(
error(s"Unexpected error in delayed fetch: $params $fetchInfos ", e)
}
}
ReadHint.clear()
// AutoMQ for Kafka inject end

val fetchPartitionData = logReadResults.map { case (tp, result) =>
Expand All @@ -252,6 +251,11 @@ class DelayedFetch(
}

responseCallback(fetchPartitionData)
// AutoMQ for Kafka inject start
// clear hint after callback as it will be used in the callback
// see {@link ElasticKafkaApis#handleFetchRequest#processResponseCallback}
ReadHint.clear()
// AutoMQ for Kafka inject end
}
}

Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/server/QuotaFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ object QuotaType {
case object Produce extends QuotaType
case object Request extends QuotaType
// AutoMQ for Kafka inject start
/**
* Quota type for slow fetch throughput limiting.
*/
case object SlowFetch extends QuotaType
/**
* Quota type for request rate limiting.
*/
Expand All @@ -51,6 +55,7 @@ object QuotaType {
case QuotaType.Produce => ClientQuotaType.PRODUCE
case QuotaType.Request => ClientQuotaType.REQUEST
// AutoMQ for Kafka inject start
case QuotaType.SlowFetch => ClientQuotaType.SLOW_FETCH
case QuotaType.RequestRate => ClientQuotaType.REQUEST_RATE
// AutoMQ for Kafka inject end
case QuotaType.ControllerMutation => ClientQuotaType.CONTROLLER_MUTATION
Expand All @@ -68,6 +73,10 @@ object QuotaType {
QuotaType.Produce
}

def slowFetch(): QuotaType = {
QuotaType.SlowFetch
}

def requestRate(): QuotaType = {
QuotaType.RequestRate
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,26 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
def maybeRecordAndGetThrottleTimeMs(quotaType: QuotaType, request: RequestChannel.Request, value: Double,
timeMs: Long): Int = {
if (!config.quotaEnabled) {
// Quota is disabled, no need to throttle
return 0
}

if (isInWhiteList(request.session.principal, request.context.clientId(), request.context.listenerName())) {
// Client is in the white list, no need to throttle
return 0
}

maybeRecordAndGetThrottleTimeMs(quotaType, value, timeMs)
}

protected def throttleTime(quotaType: QuotaType, e: QuotaViolationException, timeMs: Long): Long = {
if (quotaType == QuotaType.RequestRate) {
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
} else {
QuotaUtils.throttleTime(e, timeMs)
quotaType match {
case QuotaType.SlowFetch | QuotaType.RequestRate =>
QuotaUtils.boundedThrottleTime(e, maxThrottleTimeMs, timeMs)
case _ =>
QuotaUtils.throttleTime(e, timeMs)
}

}

private def isInWhiteList(principal: KafkaPrincipal, clientId: String, listenerName: String): Boolean = {
Expand Down Expand Up @@ -114,9 +118,11 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
metrics.removeSensor(getQuotaSensorName(QuotaType.RequestRate, metricsTags))
metrics.removeSensor(getQuotaSensorName(QuotaType.Produce, metricsTags))
metrics.removeSensor(getQuotaSensorName(QuotaType.Fetch, metricsTags))
metrics.removeSensor(getQuotaSensorName(QuotaType.SlowFetch, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.RequestRate, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Produce, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.Fetch, metricsTags))
metrics.removeSensor(getThrottleTimeSensorName(QuotaType.SlowFetch, metricsTags))
return
}

Expand All @@ -136,6 +142,11 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
if (fetchMetric != null) {
fetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.Fetch)))
}

val slowFetchMetric = allMetrics.get(clientQuotaMetricName(QuotaType.SlowFetch, metricsTags))
if (slowFetchMetric != null) {
slowFetchMetric.config(getQuotaMetricConfig(quotaLimit(QuotaType.SlowFetch)))
}
}
}

Expand All @@ -145,6 +156,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
case QuotaType.RequestRate => config.requestRateQuota(quota)
case QuotaType.Produce => config.produceQuota(quota)
case QuotaType.Fetch => config.fetchQuota(quota)
case QuotaType.SlowFetch => config.slowFetchQuota(quota)
case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType")
}

Expand Down Expand Up @@ -178,10 +190,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
s"$quotaType-${metricTagsToSensorSuffix(metricTags)}"

private def quotaLimit(quotaType: QuotaType): Double = {
if (quotaType == QuotaType.RequestRate) config.requestRateQuota
else if (quotaType == QuotaType.Produce) config.produceQuota
else if (quotaType == QuotaType.Fetch) config.fetchQuota
else throw new IllegalArgumentException(s"Unknown quota type $quotaType")
quotaType match {
case QuotaType.RequestRate => config.requestRateQuota
case QuotaType.Produce => config.produceQuota
case QuotaType.Fetch => config.fetchQuota
case QuotaType.SlowFetch => config.slowFetchQuota
case _ => throw new IllegalArgumentException(s"Unknown quota type $quotaType")
}
}

protected def clientQuotaMetricName(quotaType: QuotaType, quotaMetricTags: Map[String, String]): MetricName = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
import com.yammer.metrics.core.Histogram
import kafka.automq.zonerouter.{ClientIdMetadata, NoopProduceRouter, ProduceRouter}
import kafka.coordinator.transaction.TransactionCoordinator
import kafka.log.streamaspect.ElasticLogManager
import kafka.log.streamaspect.{ElasticLogManager, ReadHint}
import kafka.metrics.KafkaMetricsUtil
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
Expand Down Expand Up @@ -696,24 +696,32 @@ class ElasticKafkaApis(
val timeMs = time.milliseconds()

// AutoMQ for Kafka inject start
val isSlowRead = !ReadHint.isFastRead

val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs)
val brokerBandwidthThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.Fetch, request, responseSize, timeMs)
val brokerSlowFetchThrottleTimeMs = if (isSlowRead) quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.SlowFetch, request, responseSize, timeMs) else 0
val brokerRequestThrottleTimeMs = quotas.broker.maybeRecordAndGetThrottleTimeMs(QuotaType.RequestRate, request, 1, timeMs)

val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0)
val maxThrottleTimeMs = IntStream.of(bandwidthThrottleTimeMs, requestThrottleTimeMs, brokerBandwidthThrottleTimeMs, brokerSlowFetchThrottleTimeMs, brokerRequestThrottleTimeMs).max().orElse(0)
if (maxThrottleTimeMs > 0) {
request.apiThrottleTimeMs = maxThrottleTimeMs
// Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
// from the fetch quota because we are going to return an empty response.
quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
quotas.broker.unrecordQuotaSensor(QuotaType.Fetch, responseSize, timeMs)
if (isSlowRead) {
quotas.broker.unrecordQuotaSensor(QuotaType.SlowFetch, responseSize, timeMs)
}
if (bandwidthThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(quotas.fetch, request, bandwidthThrottleTimeMs)
} else if (requestThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(quotas.request, request, requestThrottleTimeMs)
} else if (brokerBandwidthThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(QuotaType.Fetch, quotas.broker, request, brokerBandwidthThrottleTimeMs)
} else if (brokerSlowFetchThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(QuotaType.SlowFetch, quotas.broker, request, brokerSlowFetchThrottleTimeMs)
} else if (brokerRequestThrottleTimeMs == maxThrottleTimeMs) {
requestHelper.throttle(QuotaType.RequestRate, quotas.broker, request, brokerRequestThrottleTimeMs)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,24 @@ public void testQuota() {
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.fetch(), request, 500, time + second2millis);
assertEquals(0, result);

// Test request quota
// Test slow fetch quota
properties.put(QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, 0);
properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 100);
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time);
assertEquals(0, result);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time + 10);
assertEquals(0, result);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 100, time + second2millis);
assertTrue(result > 0);

properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 1000);
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 500, time + second2millis);
assertEquals(0, result);

// Test request quota
properties.put(QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, 0);
properties.put(QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, 1);
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
Expand All @@ -122,6 +138,25 @@ public void testQuota() {
assertEquals(0, result);
}

@Test
public void testZeroQuota() {
long result;
long time = this.time.milliseconds();

// enable quota
Properties properties = new Properties();
properties.put(QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, true);
brokerQuotaManager.updateQuotaConfigs(Option.apply(properties));

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 0);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
assertEquals(1000, result);

brokerQuotaManager.updateQuota(QuotaType.slowFetch(), 0);
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.slowFetch(), request, 1, time);
assertEquals(1000, result);
}

@Test
public void testUpdateQuota() {
int result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public class QuotaConfigs {
public static final String BROKER_QUOTA_ENABLED_CONFIG = "broker.quota.enabled";
public static final String BROKER_QUOTA_PRODUCE_BYTES_CONFIG = "broker.quota.produce.bytes";
public static final String BROKER_QUOTA_FETCH_BYTES_CONFIG = "broker.quota.fetch.bytes";
public static final String BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG = "broker.quota.slow.fetch.bytes";
public static final String BROKER_QUOTA_REQUEST_RATE_CONFIG = "broker.quota.request.rate";
public static final String BROKER_QUOTA_WHITE_LIST_USER_CONFIG = "broker.quota.white.list.user";
public static final String BROKER_QUOTA_WHITE_LIST_CLIENT_ID_CONFIG = "broker.quota.white.list.client.id";
Expand All @@ -117,6 +118,7 @@ public class QuotaConfigs {
public static final String BROKER_QUOTA_ENABLED_DOC = "Enable broker quota.";
public static final String BROKER_QUOTA_PRODUCE_BYTES_DOC = "The maximum bytes send by producer in single window.";
public static final String BROKER_QUOTA_FETCH_BYTES_DOC = "The maximum bytes receive by consumer in single window.";
public static final String BROKER_QUOTA_SLOW_FETCH_BYTES_DOC = "The maximum bytes receive by slow fetch consumer in single window.";
public static final String BROKER_QUOTA_REQUEST_RATE_DOC = "The maximum request count send by client in single window.";
public static final String BROKER_QUOTA_WHITE_LIST_USER_DOC = "Broker quota white list for user.";
public static final String BROKER_QUOTA_WHITE_LIST_CLIENT_ID_DOC = "Broker quota white list for client id.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class BrokerQuotaManagerConfig extends ClientQuotaManagerConfig {
private boolean quotaEnabled = false;
private double produceQuota = Double.MAX_VALUE;
private double fetchQuota = Double.MAX_VALUE;
private double slowFetchQuota = Double.MAX_VALUE;
private double requestRateQuota = Double.MAX_VALUE;

private List<String> userWhiteList = List.of();
Expand All @@ -42,6 +43,7 @@ public void update(Properties props) {
quotaEnabled = getBoolean(map, QuotaConfigs.BROKER_QUOTA_ENABLED_CONFIG, quotaEnabled);
produceQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_PRODUCE_BYTES_CONFIG, produceQuota);
fetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_FETCH_BYTES_CONFIG, fetchQuota);
slowFetchQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_SLOW_FETCH_BYTES_CONFIG, slowFetchQuota);
requestRateQuota = getDouble(map, QuotaConfigs.BROKER_QUOTA_REQUEST_RATE_CONFIG, requestRateQuota);

String userWhiteListProp = props.getProperty(QuotaConfigs.BROKER_QUOTA_WHITE_LIST_USER_CONFIG);
Expand Down Expand Up @@ -84,6 +86,14 @@ public void fetchQuota(double fetchQuota) {
this.fetchQuota = fetchQuota;
}

public double slowFetchQuota() {
return slowFetchQuota;
}

public void slowFetchQuota(double slowFetchQuota) {
this.slowFetchQuota = slowFetchQuota;
}

public double requestRateQuota() {
return requestRateQuota;
}
Expand Down

0 comments on commit 4597b74

Please sign in to comment.