Skip to content

Commit

Permalink
feat(quota): support to get current quota metric value... (#2170)
Browse files Browse the repository at this point in the history
* fix: fix logs

Signed-off-by: Ning Yu <ningyu@automq.com>

* feat(quota): support to get current quota metric value

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor(backpressure): remove `Regulator#minimize`

Signed-off-by: Ning Yu <ningyu@automq.com>

* perf(quota): increase the max of broker quota throttle time

Signed-off-by: Ning Yu <ningyu@automq.com>

* perf(backpressure): decrease cooldown time

Signed-off-by: Ning Yu <ningyu@automq.com>

* perf(quota): increase the max of broker quota throttle time

Signed-off-by: Ning Yu <ningyu@automq.com>

* docs: update comments

Signed-off-by: Ning Yu <ningyu@automq.com>

---------

Signed-off-by: Ning Yu <ningyu@automq.com>
  • Loading branch information
Chillax-0v0 authored Nov 25, 2024
1 parent d7b73f0 commit 89f2c6b
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,13 @@ public void config(MetricConfig config) {
this.config = config;
}
}

// AutoMQ inject start
/**
* A public method to expose the {@link #measurableValue} method.
*/
public double measurableValueV2(long timeMs) {
return measurableValue(timeMs);
}
// AutoMQ inject end
}
4 changes: 0 additions & 4 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,6 @@ public void increase() {
@Override
public void decrease() {
}

@Override
public void minimize() {
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
private val quotaCallback: Option[ClientQuotaCallback])
extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix, quotaCallback) {

protected val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
private val exemptMetricName = metrics.metricName("exempt-request-time",
QuotaType.Request.toString, "Tracking exempt-request-time utilization percentage")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.Session
import org.apache.kafka.server.config.BrokerQuotaManagerConfig

import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.{Optional, Properties}
import scala.collection.mutable
import scala.jdk.CollectionConverters._

Expand All @@ -31,6 +32,7 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
private val time: Time,
private val threadNamePrefix: String)
extends ClientRequestQuotaManager(config, metrics, time, threadNamePrefix, None) {
private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds * this.config.numQuotaSamples)
private val metricsTags = Map("domain" -> "broker", "nodeId" -> String.valueOf(config.nodeId()))
private val whiteListCache = mutable.HashMap[String, Boolean]()

Expand All @@ -48,6 +50,16 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,
}
}

/**
* Get the value of the metric for the given quota type at the given time.
* It return empty if the metric is not found, which is possible if the quota is disabled or no request has been
* processed yet.
*/
def getQuotaMetricValue(quotaType: QuotaType, timeMs: Long): Optional[java.lang.Double] = {
Optional.ofNullable(metrics.metric(clientQuotaMetricName(quotaType, metricsTags)))
.map(_.measurableValueV2(timeMs))
}

def recordNoThrottle(quotaType: QuotaType, value: Double): Unit = {
val clientSensors = getOrCreateQuotaSensors(quotaType)
clientSensors.quotaSensor.record(value, time.milliseconds(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,34 +170,43 @@ public void testUpdateQuota() {
brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
// rate = 1 / 2000ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 1 / 2, time);
assertEquals(0, result);
// rate = 2 / 2010ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 10);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 2 / 2.01, time + 10);
assertEquals(0, result);
// rate = 3 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 3 / 2.999, time + 2999);
assertEquals(1, result);

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 2);
// rate = 4 / 2999ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 4 / 2.999, time + 2999);
assertEquals(0, result);
// rate = 5 / 2999ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 5 / 2.999, time + 2999);
assertEquals(0, result);
// rate = 6 / 2999ms > 2
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 6 / 2.999, time + 2999);
assertEquals(1, result);

brokerQuotaManager.updateQuota(QuotaType.requestRate(), 1);
// rate = 5 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 5 / 2.999, time + 2999 + 2999);
assertEquals(1000, result);
// rate = 2 / 2000ms
// rate = 2 / 2001ms
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 1);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 2 / 2.001, time + 2999 + 2999 + 1);
assertEquals(0, result);
// rate = 3 / 2999ms > 1
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.requestRate(), request, 1, time + 2999 + 2999 + 2999);
assertQuotaMetricValue(QuotaType.requestRate(), (double) 3 / 2.999, time + 2999 + 2999 + 2999);
assertEquals(1, result);
}

Expand Down Expand Up @@ -263,4 +272,9 @@ public void testWhiteList() {
result = brokerQuotaManager.maybeRecordAndGetThrottleTimeMs(QuotaType.produce(), request, 1000, time.milliseconds());
assertEquals(0, result);
}

private void assertQuotaMetricValue(QuotaType quotaType, double expected, long timeMs) {
double value = brokerQuotaManager.getQuotaMetricValue(quotaType, timeMs).get();
assertEquals(expected, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class DefaultBackPressureManager implements BackPressureManager {

public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(20);
public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(15);

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class);

Expand Down Expand Up @@ -82,25 +82,15 @@ private void maybeRegulate() {
}

/**
* Regulate the system if necessary, which means
* <ul>
* <li>the system is in a {@link LoadLevel#CRITICAL} state.</li>
* <li>the cooldown time has passed.</li>
* </ul>
* Regulate the system if the cooldown time has passed.
*
* @param isInternal True if it is an internal call, which means it should not schedule the next regulate action.
*/
private void maybeRegulate(boolean isInternal) {
LoadLevel loadLevel = currentLoadLevel();
long now = System.currentTimeMillis();

if (LoadLevel.CRITICAL.equals(loadLevel)) {
// Regulate immediately regardless of the cooldown time.
regulate(loadLevel, now);
return;
}

long timeElapsed = now - lastRegulateTime;

if (timeElapsed < cooldownMs) {
// Skip regulating if the cooldown time has not passed.
if (!isInternal) {
Expand All @@ -109,7 +99,6 @@ private void maybeRegulate(boolean isInternal) {
}
return;
}

regulate(loadLevel, now);
}

Expand All @@ -123,8 +112,10 @@ private LoadLevel currentLoadLevel() {
}

private void regulate(LoadLevel loadLevel, long now) {
if (LoadLevel.NORMAL.equals(loadLevel) && LOGGER.isDebugEnabled()) {
LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels);
if (LoadLevel.NORMAL.equals(loadLevel)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels);
}
} else {
LOGGER.info("The system is in a {} state, checkers: {}", loadLevel, loadLevels);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,6 @@ public void regulate(Regulator regulator) {
public void regulate(Regulator regulator) {
regulator.decrease();
}
},
/**
* The system is in a critical state, and the most severe actions should be taken.
*/
CRITICAL {
@Override
public void regulate(Regulator regulator) {
regulator.minimize();
}
};

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,4 @@ public interface Regulator {
* If the rate is already at the minimum, this method does nothing.
*/
void decrease();

/**
* Minimize the rate of incoming requests.
*/
void minimize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class DefaultBackPressureManagerTest {
Regulator regulator;
int regulatorIncreaseCalled = 0;
int regulatorDecreaseCalled = 0;
int regulatorMinimizeCalled = 0;

ScheduledExecutorService scheduler;
int schedulerScheduleCalled = 0;
Expand All @@ -53,10 +52,6 @@ public void setup() {
regulatorDecreaseCalled++;
return null;
}).when(regulator).decrease();
doAnswer(invocation -> {
regulatorMinimizeCalled++;
return null;
}).when(regulator).minimize();

// Mock the scheduler to run the scheduled task immediately and only once
doAnswer(invocation -> {
Expand All @@ -77,11 +72,10 @@ public void setup() {
public void testPriority1() {
initManager(0);

callChecker(sourceA, LoadLevel.CRITICAL);
callChecker(sourceB, LoadLevel.HIGH);
callChecker(sourceC, LoadLevel.NORMAL);

assertRegulatorCalled(0, 0, 3);
assertRegulatorCalled(0, 2);
}

@Test
Expand All @@ -90,9 +84,8 @@ public void testPriority2() {

callChecker(sourceC, LoadLevel.NORMAL);
callChecker(sourceB, LoadLevel.HIGH);
callChecker(sourceA, LoadLevel.CRITICAL);

assertRegulatorCalled(1, 1, 1);
assertRegulatorCalled(1, 1);
}

@Test
Expand All @@ -101,10 +94,9 @@ public void testOverride() {

callChecker(sourceA, LoadLevel.NORMAL);
callChecker(sourceA, LoadLevel.HIGH);
callChecker(sourceA, LoadLevel.CRITICAL);
callChecker(sourceA, LoadLevel.NORMAL);

assertRegulatorCalled(2, 1, 1);
assertRegulatorCalled(2, 1);
}

@Test
Expand All @@ -114,22 +106,13 @@ public void testCooldown() {

initManager(cooldownMs);

callChecker(sourceA, LoadLevel.CRITICAL);
assertRegulatorCalled(0, 0, 1);
assertSchedulerCalled(0);

callChecker(sourceA, LoadLevel.HIGH);
assertRegulatorCalled(0, 0, 1);
assertRegulatorCalled(0, 0);
assertSchedulerCalled(1);
assertEquals(cooldownMs, schedulerScheduleDelay, tolerance);

callChecker(sourceA, LoadLevel.NORMAL);
assertRegulatorCalled(0, 0, 1);
assertSchedulerCalled(2);
assertEquals(cooldownMs, schedulerScheduleDelay, tolerance);

callChecker(sourceA, LoadLevel.CRITICAL);
assertRegulatorCalled(0, 0, 2);
assertRegulatorCalled(0, 0);
assertSchedulerCalled(2);
assertEquals(cooldownMs, schedulerScheduleDelay, tolerance);
}
Expand Down Expand Up @@ -161,10 +144,9 @@ public long intervalMs() {
});
}

private void assertRegulatorCalled(int increase, int decrease, int minimize) {
private void assertRegulatorCalled(int increase, int decrease) {
assertEquals(increase, regulatorIncreaseCalled);
assertEquals(decrease, regulatorDecreaseCalled);
assertEquals(minimize, regulatorMinimizeCalled);
}

private void assertSchedulerCalled(int times) {
Expand Down

0 comments on commit 89f2c6b

Please sign in to comment.