Skip to content

Commit

Permalink
feat(backpressure): support dynamic configs (#2204)
Browse files Browse the repository at this point in the history
* feat(backpressure): make back pressure manager configurable

Signed-off-by: Ning Yu <ningyu@automq.com>

* test: test diabled

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor: move backpressure from s3stream to kafka.core

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor: init `BackPressureManager` in `BrokerServer`

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor: introduce `BackPressureConfig`

Signed-off-by: Ning Yu <ningyu@automq.com>

* feat: make `BackPressureManager` reconfigurable

Signed-off-by: Ning Yu <ningyu@automq.com>

* test: test reconfigurable

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor: rename config key

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor: move metric "back_pressure_state" from s3stream to core

Signed-off-by: Ning Yu <ningyu@automq.com>

---------

Signed-off-by: Ning Yu <ningyu@automq.com>
  • Loading branch information
Chillax-0v0 authored Dec 3, 2024
1 parent b470ba4 commit 52eb245
Show file tree
Hide file tree
Showing 16 changed files with 260 additions and 82 deletions.
11 changes: 11 additions & 0 deletions core/src/main/java/kafka/automq/AutoMQConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
Expand Down Expand Up @@ -171,6 +172,14 @@ public class AutoMQConfig {
public static final String CLUSTER_ID_CONFIG = "cluster.id";
public static final String CLUSTER_ID_DOC = "If the cluster.id is set, Kafka will auto format the storage.";

public static final String S3_BACK_PRESSURE_ENABLED_CONFIG = "automq.backpressure.enabled";
public static final String S3_BACK_PRESSURE_ENABLED_DOC = "Whether back pressure is enabled";
public static final boolean S3_BACK_PRESSURE_ENABLED_DEFAULT = true;

public static final String S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG = "automq.backpressure.cooldown.ms";
public static final String S3_BACK_PRESSURE_COOLDOWN_MS_DOC = "The cooldown time in milliseconds to wait between two regulator actions";
public static final long S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT = TimeUnit.SECONDS.toMillis(15);

// Deprecated config start
public static final String S3_ENDPOINT_CONFIG = "s3.endpoint";
public static final String S3_ENDPOINT_DOC = "[DEPRECATED]please use s3.data.buckets. The object storage endpoint, ex. <code>https://s3.us-east-1.amazonaws.com</code>.";
Expand Down Expand Up @@ -255,6 +264,8 @@ public static void define(ConfigDef configDef) {
.define(AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_CONFIG, INT, S3_METRICS_EXPORTER_REPORT_INTERVAL_MS, MEDIUM, AutoMQConfig.S3_TELEMETRY_EXPORTER_REPORT_INTERVAL_MS_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_TELEMETRY_METRICS_EXPORTER_URI_DOC)
.define(AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_CONFIG, STRING, null, MEDIUM, AutoMQConfig.S3_TELEMETRY_METRICS_BASE_LABELS_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG, BOOLEAN, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_DOC)
.define(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, LONG, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DEFAULT, MEDIUM, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_DOC)
// Deprecated config start
.define(AutoMQConfig.S3_ENDPOINT_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_ENDPOINT_DOC)
.define(AutoMQConfig.S3_REGION_CONFIG, STRING, null, HIGH, AutoMQConfig.S3_REGION_DOC)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package kafka.automq.backpressure;

import kafka.automq.AutoMQConfig;
import kafka.server.KafkaConfig;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.ConfigUtils;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class BackPressureConfig {

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG,
AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG
);

private volatile boolean enabled;
/**
* The cooldown time in milliseconds to wait between two regulator actions.
*/
private long cooldownMs;

public static BackPressureConfig from(KafkaConfig config) {
return new BackPressureConfig(config.s3BackPressureEnabled(), config.s3BackPressureCooldownMs());
}

public static BackPressureConfig from(Map<String, ?> raw) {
Map<String, Object> configs = new HashMap<>(raw);
return new BackPressureConfig(
ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG),
ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)
);
}

public BackPressureConfig(boolean enabled, long cooldownMs) {
this.enabled = enabled;
this.cooldownMs = cooldownMs;
}

public static void validate(Map<String, ?> raw) throws ConfigException {
Map<String, Object> configs = new HashMap<>(raw);
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
}
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
validateCooldownMs(ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG));
}
}

public static void validateCooldownMs(long cooldownMs) throws ConfigException {
if (cooldownMs < 0) {
throw new ConfigException(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG, cooldownMs, "The cooldown time must be non-negative.");
}
}

public void update(Map<String, ?> raw) {
Map<String, Object> configs = new HashMap<>(raw);
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG)) {
this.enabled = ConfigUtils.getBoolean(configs, AutoMQConfig.S3_BACK_PRESSURE_ENABLED_CONFIG);
}
if (configs.containsKey(AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG)) {
this.cooldownMs = ConfigUtils.getLong(configs, AutoMQConfig.S3_BACK_PRESSURE_COOLDOWN_MS_CONFIG);
}
}

public boolean enabled() {
return enabled;
}

public long cooldownMs() {
return cooldownMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

import org.apache.kafka.common.Reconfigurable;

/**
* It checks the {@link LoadLevel} of the system and takes actions based on the load level
* to prevent the system from being overwhelmed.
*/
public interface BackPressureManager {
public interface BackPressureManager extends Reconfigurable {

/**
* Start the back pressure manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* A checker to check the load level of the system periodically.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;

Expand All @@ -20,20 +22,18 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DefaultBackPressureManager implements BackPressureManager {
import static kafka.automq.backpressure.BackPressureConfig.RECONFIGURABLE_CONFIGS;

public static final long DEFAULT_COOLDOWN_MS = TimeUnit.SECONDS.toMillis(15);
public class DefaultBackPressureManager implements BackPressureManager {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBackPressureManager.class);

private final BackPressureConfig config;
private final Regulator regulator;
/**
* The cooldown time in milliseconds to wait between two regulator actions.
*/
private final long cooldownMs;

/**
* The scheduler to schedule the checker periodically.
Expand All @@ -55,20 +55,23 @@ public class DefaultBackPressureManager implements BackPressureManager {
* Only used for logging and monitoring.
*/
private LoadLevel lastRegulateLevel = LoadLevel.NORMAL;
/**
* The current state metrics of the system.
* Only used for monitoring.
*
* @see S3StreamKafkaMetricsManager#setBackPressureStateSupplier
*/
private final Map<String, Integer> stateMetrics = new HashMap<>(LoadLevel.values().length);

public DefaultBackPressureManager(Regulator regulator) {
this(regulator, DEFAULT_COOLDOWN_MS);
}

public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
public DefaultBackPressureManager(BackPressureConfig config, Regulator regulator) {
this.config = config;
this.regulator = regulator;
this.cooldownMs = cooldownMs;
}

@Override
public void start() {
this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel);
S3StreamKafkaMetricsManager.setBackPressureStateSupplier(this::stateMetrics);
}

@Override
Expand All @@ -85,6 +88,9 @@ public void shutdown() {
}

private void maybeRegulate() {
if (!config.enabled()) {
return;
}
maybeRegulate(false);
}

Expand All @@ -98,11 +104,11 @@ private void maybeRegulate(boolean isInternal) {
long now = System.currentTimeMillis();
long timeElapsed = now - lastRegulateTime;

if (timeElapsed < cooldownMs) {
if (timeElapsed < config.cooldownMs()) {
// Skip regulating if the cooldown time has not passed.
if (!isInternal) {
// Schedule the next regulate action if it is not an internal call.
checkerScheduler.schedule(() -> maybeRegulate(true), cooldownMs - timeElapsed, TimeUnit.MILLISECONDS);
checkerScheduler.schedule(() -> maybeRegulate(true), config.cooldownMs() - timeElapsed, TimeUnit.MILLISECONDS);
}
return;
}
Expand Down Expand Up @@ -134,4 +140,32 @@ private void regulate(LoadLevel loadLevel, long now) {
lastRegulateTime = now;
lastRegulateLevel = loadLevel;
}

private Map<String, Integer> stateMetrics() {
LoadLevel current = currentLoadLevel();
for (LoadLevel level : LoadLevel.values()) {
int value = level.equals(current) ? 1 : -1;
stateMetrics.put(level.name(), value);
}
return stateMetrics;
}

@Override
public Set<String> reconfigurableConfigs() {
return RECONFIGURABLE_CONFIGS;
}

@Override
public void validateReconfiguration(Map<String, ?> configs) throws ConfigException {
BackPressureConfig.validate(configs);
}

@Override
public void reconfigure(Map<String, ?> configs) {
config.update(configs);
}

@Override
public void configure(Map<String, ?> configs) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* Represents the load level of the system.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.backpressure;
package kafka.automq.backpressure;

/**
* The Regulator class is responsible for controlling and limiting the rate of external requests.
Expand Down
20 changes: 0 additions & 20 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@
import com.automq.stream.s3.Config;
import com.automq.stream.s3.S3Storage;
import com.automq.stream.s3.S3StreamClient;
import com.automq.stream.s3.backpressure.BackPressureManager;
import com.automq.stream.s3.backpressure.DefaultBackPressureManager;
import com.automq.stream.s3.backpressure.Regulator;
import com.automq.stream.s3.cache.S3BlockCache;
import com.automq.stream.s3.cache.blockcache.DefaultObjectReaderFactory;
import com.automq.stream.s3.cache.blockcache.ObjectReaderFactory;
Expand Down Expand Up @@ -89,8 +86,6 @@ public class DefaultS3Client implements Client {

protected CompactionManager compactionManager;

protected BackPressureManager backPressureManager;

protected S3StreamClient streamClient;

protected KVClient kvClient;
Expand Down Expand Up @@ -151,7 +146,6 @@ public void start() {
this.objectManager.setCommitStreamSetObjectHook(localIndexCache::updateIndexFromRequest);
this.blockCache = new StreamReaders(this.config.blockCacheSize(), objectManager, objectStorage, objectReaderFactory);
this.compactionManager = new CompactionManager(this.config, this.objectManager, this.streamManager, compactionobjectStorage);
this.backPressureManager = new DefaultBackPressureManager(backPressureRegulator());
this.writeAheadLog = buildWAL();
StorageFailureHandlerChain storageFailureHandler = new StorageFailureHandlerChain();
this.storage = new S3Storage(this.config, writeAheadLog, streamManager, objectManager, blockCache, objectStorage, storageFailureHandler);
Expand All @@ -168,13 +162,11 @@ public void start() {

this.storage.startup();
this.compactionManager.start();
this.backPressureManager.start();
LOGGER.info("S3Client started");
}

@Override
public void shutdown() {
this.backPressureManager.shutdown();
this.compactionManager.shutdown();
this.streamClient.shutdown();
this.storage.shutdown();
Expand Down Expand Up @@ -235,18 +227,6 @@ protected ObjectManager newObjectManager(int nodeId, long nodeEpoch, boolean fai
this::getAutoMQVersion, failoverMode);
}

protected Regulator backPressureRegulator() {
return new Regulator() {
@Override
public void increase() {
}

@Override
public void decrease() {
}
};
}

protected Failover failover() {
return new Failover(new FailoverFactory() {
@Override
Expand Down
Loading

0 comments on commit 52eb245

Please sign in to comment.