perf: limit the inflight requests (#2100)
* docs: add todos

Signed-off-by: Ning Yu <ningyu@automq.com>

* perf(network): limit the inflight requests by size

Signed-off-by: Ning Yu <ningyu@automq.com>

* perf(ReplicaManager): limit the queue size of the `fetchExecutor`s

Signed-off-by: Ning Yu <ningyu@automq.com>

* perf(KafkaApis): limit the queue size of async request handlers

Signed-off-by: Ning Yu <ningyu@automq.com>

* refactor(network): make "queued.max.requests.size.bytes" configurable

Signed-off-by: Ning Yu <ningyu@automq.com>

* style: fix lint

Signed-off-by: Ning Yu <ningyu@automq.com>

* fix(network): limit the min queued request size per queue

Signed-off-by: Ning Yu <ningyu@automq.com>

---------

Signed-off-by: Ning Yu <ningyu@automq.com>
Chillax-0v0 authored Nov 1, 2024
1 parent 33e0be0 commit a7250cd
Showing 8 changed files with 79 additions and 15 deletions.
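The common thread across these files is a byte-sized semaphore in front of each request queue: a request must acquire permits equal to its size before it is enqueued, and the permits are released when a handler dequeues it. Below is a minimal, self-contained sketch of that pattern, assuming nothing beyond the JDK; the names `SizeBoundedRequestQueue` and `Req` are illustrative and not part of the patch.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Semaphore, TimeUnit}

// Illustrative stand-in for RequestChannel.Request; only the size matters here.
final case class Req(payload: Array[Byte]) {
  def sizeInBytes: Int = payload.length
}

// A queue bounded both by request count and by total in-flight bytes, mirroring the
// existing count limit (queued.max.requests) plus the new size limit
// (queued.max.requests.size.bytes) introduced by this commit.
final class SizeBoundedRequestQueue(maxRequests: Int, maxBytes: Int) {
  private val queue = new ArrayBlockingQueue[Req](maxRequests)
  private val bytes = new Semaphore(maxBytes)

  // Blocks the caller (the network thread) until both capacity limits admit the request.
  def put(req: Req): Unit = {
    bytes.acquire(req.sizeInBytes)
    queue.put(req)
  }

  // Returns null on timeout; releases the byte permits once a handler takes the request.
  def poll(timeoutMs: Long): Req = {
    val req = queue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (req != null) bytes.release(req.sizeInBytes)
    req
  }
}
```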
9 changes: 8 additions & 1 deletion core/src/main/resources/jmx/rules/broker.yaml
@@ -207,6 +207,13 @@ rules:
type: gauge
desc: Size of the request queue

- bean: kafka.network:type=RequestChannel,name=AvailableRequestSize
mapping:
Value:
metric: kafka.available.request.size
type: gauge
desc: Remaining permitted request size in the request queue

- bean: kafka.network:type=RequestChannel,name=ResponseQueueSize
mapping:
Value:
@@ -370,4 +377,4 @@ rules:
connection-accept-throttle-time:
metric: kafka.listener.connection.accept.throttle.time
type: gauge
desc: The average throttle-time per listener
desc: The average throttle-time per listener
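The new gauge can also be read directly over JMX, independent of the YAML mapping above. A small sketch for a broker in the same JVM; the MBean and attribute names come from the rule above, while the in-process read is an assumption (a remote broker would need a JMXConnector):

```scala
import java.lang.management.ManagementFactory
import javax.management.ObjectName

// Read the remaining request-queue byte budget from the running broker's MBean server.
val server = ManagementFactory.getPlatformMBeanServer
val bean   = new ObjectName("kafka.network:type=RequestChannel,name=AvailableRequestSize")
val availableBytes = server.getAttribute(bean, "Value").asInstanceOf[Int]
println(s"available request-queue budget: $availableBytes bytes")
```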
48 changes: 44 additions & 4 deletions core/src/main/scala/kafka/network/RequestChannel.scala
@@ -50,6 +50,10 @@ object RequestChannel extends Logging {
private val ResponseQueueSizeMetric = "ResponseQueueSize"
val ProcessorMetricTag = "processor"

// AutoMQ inject start
private val AvailableRequestSizeMetric = "AvailableRequestSize"
// AutoMQ inject end

private def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled

sealed trait BaseRequest
@@ -347,6 +351,9 @@ object RequestChannel extends Logging {
}

class RequestChannel(val queueSize: Int,
// AutoMQ inject start
val queuedRequestSize: Int,
// AutoMQ inject end
val metricNamePrefix: String,
time: Time,
val metrics: RequestChannel.Metrics) {
@@ -355,12 +362,23 @@ class RequestChannel(val queueSize: Int,
private val metricsGroup = new KafkaMetricsGroup(this.getClass)

private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
// AutoMQ inject start
/**
* Per-handler queues of requests to be handled, each in arrival order.
* Note: Before a request enters one of these queues, it must acquire permits from the corresponding semaphore in {@link multiQueuedRequestSizeSemaphore}.
*/
private val multiRequestQueue = new java.util.ArrayList[ArrayBlockingQueue[BaseRequest]]()

/**
* Semaphores limiting the total size of queued requests, one per queue in {@link multiRequestQueue}.
*/
private val multiQueuedRequestSizeSemaphore = new java.util.ArrayList[Semaphore]()
private val availableRequestSizeMetricName = metricNamePrefix.concat(AvailableRequestSizeMetric)
// AutoMQ inject end
private val processors = new ConcurrentHashMap[Int, Processor]()
private val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
private val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
// AutoMQ inject start
private val multiCallbackQueue = new java.util.ArrayList[ArrayBlockingQueue[BaseRequest]]()
private var notifiedShutdown = false

@@ -371,6 +389,14 @@ class RequestChannel(val queueSize: Int,
requestQueue.size()
}
})
metricsGroup.newGauge(availableRequestSizeMetricName, () => {
multiQueuedRequestSizeSemaphore.stream().mapToInt(s => s.availablePermits()).sum()
})

def this(queueSize: Int, metricNamePrefix: String, time: Time, metrics: RequestChannel.Metrics) {
this(queueSize, Integer.MAX_VALUE, metricNamePrefix, time, metrics)
}
// AutoMQ inject end

metricsGroup.newGauge(responseQueueSizeMetricName, () => {
processors.values.asScala.foldLeft(0) {(total, processor) =>
@@ -386,14 +412,21 @@ class RequestChannel(val queueSize: Int,
Map(ProcessorMetricTag -> processor.id.toString).asJava)
}

def registerNRequestHandler(count: Int): util.List[BlockingQueue[BaseRequest]] = {
// AutoMQ inject start
def registerNRequestHandler(count: Int): Unit = {
val queueSize = math.max(this.queueSize / count, 1)
for (i <- 0 until count) {
// TODO: with the defaults (100 MiB total, 8 request handler threads), maxQueuedRequestSize will be 100 / 8 = 12.5 MiB per queue.
//  However, a request larger than the per-queue budget would block indefinitely at the semaphore.
//  Currently, the default max request size is 1 MiB (max.request.size), so in practice this is not a problem.
val maxQueuedRequestSize = math.max(this.queuedRequestSize / count, 10 * 1024 * 1024)
for (_ <- 0 until count) {
multiRequestQueue.add(new ArrayBlockingQueue[BaseRequest](queueSize))
multiQueuedRequestSizeSemaphore.add(new Semaphore(maxQueuedRequestSize))
multiCallbackQueue.add(new ArrayBlockingQueue[BaseRequest](queueSize))
}
Collections.unmodifiableList(multiRequestQueue)
}
// AutoMQ inject end

def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
@@ -403,6 +436,8 @@ class RequestChannel(val queueSize: Int,
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
def sendRequest(request: RequestChannel.Request): Unit = {
if (multiRequestQueue.size() != 0) {
val requestSizeSemaphore = multiQueuedRequestSizeSemaphore.get(math.abs(request.context.connectionId.hashCode % multiQueuedRequestSizeSemaphore.size()))
requestSizeSemaphore.acquire(request.sizeInBytes)
val requestQueue = multiRequestQueue.get(math.abs(request.context.connectionId.hashCode % multiRequestQueue.size()))
requestQueue.put(request)
} else {
@@ -505,21 +540,26 @@ class RequestChannel(val queueSize: Int,
}
}

// AutoMQ inject start
def receiveRequest(timeout: Long, id: Int): RequestChannel.BaseRequest = {
val callbackQueue = multiCallbackQueue.get(id)
val requestQueue = multiRequestQueue.get(id)
val requestSizeSemaphore = multiQueuedRequestSizeSemaphore.get(id)
val callbackRequest = callbackQueue.poll()
if (callbackRequest != null)
callbackRequest
else {
val request = requestQueue.poll(timeout, TimeUnit.MILLISECONDS)
request match {
case WakeupRequest => callbackQueue.poll()
case request: Request =>
requestSizeSemaphore.release(request.sizeInBytes)
request
case _ => request
}
}
}

// AutoMQ inject end
/** Get the next request or block until there is one */
@Deprecated
def receiveRequest(): RequestChannel.BaseRequest =
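To make the TODO in `registerNRequestHandler` concrete, the per-queue budget follows `max(queuedRequestSize / count, 10 MiB)`. A small worked example using the 100 MiB default defined later in `SocketServerConfigs`:

```scala
// Reproduces the per-queue sizing from RequestChannel.registerNRequestHandler.
val queuedRequestSize = 100 * 1024 * 1024 // queued.max.requests.size.bytes default
val floorBytes        = 10 * 1024 * 1024  // per-queue minimum from this patch

for (count <- Seq(4, 8, 16)) {
  val perQueue = math.max(queuedRequestSize / count, floorBytes)
  println(f"$count handlers -> ${perQueue / 1024.0 / 1024.0}%.1f MiB per queue")
}
// 4 handlers -> 25.0 MiB, 8 handlers -> 12.5 MiB, 16 handlers -> 10.0 MiB (floor applies)
```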
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
@@ -97,7 +97,10 @@ class SocketServer(val config: KafkaConfig,
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
// AutoMQ inject start
// val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, config.queuedMaxRequestSize, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
// AutoMQ inject end
// control-plane
private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -100,7 +100,7 @@ object KafkaConfig {
AutoBalancerControllerConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key))
AutoBalancerMetricsReporterConfig.CONFIG_DEF.configKeys().values().forEach(key => configDef.define(key))
// AutoMQ inject end


def configNames: Seq[String] = configDef.names.asScala.toBuffer.sorted
private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala
@@ -435,6 +435,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val connectionsMaxIdleMs = getLong(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG)
val failedAuthenticationDelayMs = getInt(SocketServerConfigs.FAILED_AUTHENTICATION_DELAY_MS_CONFIG)
val queuedMaxRequests = getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_CONFIG)
// AutoMQ inject start
val queuedMaxRequestSize = getInt(SocketServerConfigs.QUEUED_MAX_REQUESTS_SIZE_BYTES_CONFIG)
// AutoMQ inject end
val queuedMaxBytes = getLong(SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG)
def numNetworkThreads = getInt(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG)

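A hedged example of tuning the new limit alongside the existing count limit; these keys would normally live in `server.properties`, and the 64 MiB value is purely illustrative:

```scala
import java.util.Properties

// Broker overrides; equivalent lines would normally go in server.properties.
val overrides = new Properties()
overrides.put("queued.max.requests", "500")                                  // existing count limit (default 500)
overrides.put("queued.max.requests.size.bytes", (64 * 1024 * 1024).toString) // new total-size limit: 64 MiB
// Values below 1 MiB are rejected by the atLeast(1024 * 1024) bound in SocketServerConfigs.
```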
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -216,7 +216,7 @@ class KafkaRequestHandlerPool(
this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)

var multiRequestQueue = requestChannel.registerNRequestHandler(numThreads)
requestChannel.registerNRequestHandler(numThreads)

for (i <- 0 until numThreads) {
createHandler(i)
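Since `registerNRequestHandler` now returns `Unit`, each handler thread addresses its own queue by id via `receiveRequest(timeout, id)`. A simplified sketch of the per-handler poll loop this enables; the 300 ms timeout is illustrative, and shutdown, wakeup, and callback handling are omitted:

```scala
import kafka.network.RequestChannel

// Sketch of a handler thread pinned to queue `id`. `handleRequest` is a placeholder
// for the real request dispatch; its byte permits were already released on dequeue.
def pollLoop(channel: RequestChannel, id: Int, handleRequest: RequestChannel.Request => Unit): Unit = {
  while (true) {
    channel.receiveRequest(300L, id) match {
      case req: RequestChannel.Request => handleRequest(req)
      case _                           => () // timeout, wakeup, or shutdown signal
    }
  }
}
```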
core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala
@@ -1,6 +1,7 @@
package kafka.server.streamaspect

import com.automq.stream.s3.metrics.TimerUtil
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
import com.yammer.metrics.core.Histogram
import kafka.automq.zonerouter.{ClientIdMetadata, NoopProduceRouter, ProduceRouter}
import kafka.coordinator.transaction.TransactionCoordinator
@@ -29,7 +30,7 @@ import org.apache.kafka.common.requests.s3.AutomqZoneRouterRequest
import org.apache.kafka.common.requests.{AbstractResponse, DeleteTopicsRequest, DeleteTopicsResponse, FetchRequest, FetchResponse, ProduceRequest, ProduceResponse, RequestUtils}
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC, TRANSACTIONAL_ID}
import org.apache.kafka.common.utils.{ThreadUtils, Time}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.server.ClientMetricsManager
import org.apache.kafka.server.authorizer.Authorizer
@@ -40,7 +41,7 @@ import org.apache.kafka.storage.internals.log.{FetchIsolation, FetchParams, Fetc
import java.util
import java.util.{Collections, Optional}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.stream.IntStream
import scala.annotation.nowarn
import scala.collection.{Map, Seq, mutable}
@@ -76,8 +77,8 @@ class ElasticKafkaApis(
tokenManager: DelegationTokenManager,
apiVersionManager: ApiVersionManager,
clientMetricsManager: Option[ClientMetricsManager],
val asyncHandleExecutor: ExecutorService = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("kafka-apis-async-handle-executor-%d", true)),
val listOffsetHandleExecutor: ExecutorService = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("kafka-apis-list-offset-handle-executor-%d", true))
val deleteTopicHandleExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, "kafka-apis-delete-topic-handle-executor", true, 1000),
val listOffsetHandleExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, "kafka-apis-list-offset-handle-executor", true, 1000)
) extends KafkaApis(requestChannel, metadataSupport, replicaManager, groupCoordinator, txnCoordinator,
autoTopicCreationManager, brokerId, config, configRepository, metadataCache, metrics, authorizer, quotas,
fetchManager, brokerTopicStats, clusterId, time, tokenManager, apiVersionManager, clientMetricsManager) {
@@ -131,7 +132,7 @@ class ElasticKafkaApis(
response.asInstanceOf[DeleteTopicsResponse].data().responses().forEach(result => {
if (result.errorCode() == Errors.NONE.code()) {
if (!metadataCache.autoMQVersion().isTopicCleanupByControllerSupported) {
asyncHandleExecutor.submit(new Runnable {
deleteTopicHandleExecutor.submit(new Runnable {
override def run(): Unit = {
topicNameToPartitionEpochsMap.get(result.name()).foreach(partitionEpochs => {
ElasticLogManager.destroyLog(new TopicPartition(result.name(), partitionEpochs._1), result.topicId(), partitionEpochs._2)
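The executor swap above replaces the unbounded `Executors.newSingleThreadExecutor` pools with AutoMQ's monitored, bounded pools. The call below mirrors the arguments used in the diff; the parameter meanings (core size, max size, keep-alive, unit, name prefix, daemon flag, queue capacity) are my reading of those arguments, not documented API:

```scala
import java.util.concurrent.{ExecutorService, TimeUnit}
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor

// Single daemon thread with (assumed) at most 1000 queued tasks, matching the
// deleteTopicHandleExecutor / listOffsetHandleExecutor defaults in this diff.
val deleteTopicExecutor: ExecutorService =
  S3StreamThreadPoolMonitor.createAndMonitor(1, 1, 0L, TimeUnit.MILLISECONDS,
    "kafka-apis-delete-topic-handle-executor", true, 1000)

deleteTopicExecutor.submit(new Runnable {
  override def run(): Unit = {
    // per-topic cleanup work goes here, as in the delete-topic handling above
  }
})
```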
core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala
@@ -2,6 +2,7 @@ package kafka.server.streamaspect

import com.automq.stream.api.exceptions.FastReadFailFastException
import com.automq.stream.utils.FutureUtil
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
import kafka.cluster.Partition
import kafka.log.remote.RemoteLogManager
import kafka.log.streamaspect.{ElasticLogManager, PartitionStatusTracker, ReadHint}
@@ -83,8 +84,8 @@ class ElasticReplicaManager(
brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
private val fastFetchExecutor: ExecutorService = Executors.newFixedThreadPool(4, ThreadUtils.createThreadFactory("kafka-apis-fast-fetch-executor-%d", true)),
private val slowFetchExecutor: ExecutorService = Executors.newFixedThreadPool(12, ThreadUtils.createThreadFactory("kafka-apis-slow-fetch-executor-%d", true)),
private val fastFetchExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(4, 4, 0L, TimeUnit.MILLISECONDS, "kafka-apis-fast-fetch-executor", true, 10000),
private val slowFetchExecutor: ExecutorService = S3StreamThreadPoolMonitor.createAndMonitor(12, 12, 0L, TimeUnit.MILLISECONDS, "kafka-apis-slow-fetch-executor", true, 10000),
private val partitionMetricsCleanerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("kafka-partition-metrics-cleaner", true)),
) extends ReplicaManager(config, metrics, time, scheduler, logManager, remoteLogManager, quotaManagers, metadataCache,
logDirFailureChannel, alterPartitionManager, brokerTopicStats, isShuttingDown, zkClient, delayedProducePurgatoryParam,
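The fetch executors follow the same pattern with larger queues (capacity 10000 in the diff). As a way to reason about the behaviour without the AutoMQ helper, here is a plain-JDK sketch of a bounded fetch executor; the `CallerRunsPolicy` rejection handler is an assumption about one reasonable backpressure strategy, not necessarily what `S3StreamThreadPoolMonitor` does when its queue fills:

```scala
import java.util.concurrent.{ArrayBlockingQueue, ThreadPoolExecutor, TimeUnit}

// 4 fast-fetch threads with a bounded work queue; when the queue fills up,
// CallerRunsPolicy pushes work back onto the submitting thread, providing
// backpressure instead of unbounded memory growth.
val fastFetchExecutor = new ThreadPoolExecutor(
  4, 4, 0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue[Runnable](10000),
  new ThreadPoolExecutor.CallerRunsPolicy())
```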
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java
@@ -158,6 +158,12 @@ public class SocketServerConfigs {
public static final int QUEUED_MAX_REQUESTS_DEFAULT = 500;
public static final String QUEUED_MAX_REQUESTS_DOC = "The number of queued requests allowed for data-plane, before blocking the network threads";

// AutoMQ inject start
public static final String QUEUED_MAX_REQUESTS_SIZE_BYTES_CONFIG = "queued.max.requests.size.bytes";
public static final int QUEUED_MAX_REQUESTS_SIZE_BYTES_DEFAULT = 100 * 1024 * 1024;
public static final String QUEUED_MAX_REQUESTS_SIZE_BYTES_DOC = "The total size in bytes of queued requests allowed for data-plane, before blocking the network threads";
// AutoMQ inject end

public static final String QUEUED_MAX_BYTES_CONFIG = "queued.max.request.bytes";
public static final int QUEUED_MAX_REQUEST_BYTES_DEFAULT = -1;
public static final String QUEUED_MAX_REQUEST_BYTES_DOC = "The number of queued bytes allowed before no more requests are read";
@@ -182,6 +188,9 @@ public class SocketServerConfigs {
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG, LONG, CONNECTIONS_MAX_IDLE_MS_DEFAULT, MEDIUM, CONNECTIONS_MAX_IDLE_MS_DOC)
.define(FAILED_AUTHENTICATION_DELAY_MS_CONFIG, INT, FAILED_AUTHENTICATION_DELAY_MS_DEFAULT, atLeast(0), LOW, FAILED_AUTHENTICATION_DELAY_MS_DOC)
.define(QUEUED_MAX_REQUESTS_CONFIG, INT, QUEUED_MAX_REQUESTS_DEFAULT, atLeast(1), HIGH, QUEUED_MAX_REQUESTS_DOC)
// AutoMQ inject start
.define(QUEUED_MAX_REQUESTS_SIZE_BYTES_CONFIG, INT, QUEUED_MAX_REQUESTS_SIZE_BYTES_DEFAULT, atLeast(1024 * 1024), HIGH, QUEUED_MAX_REQUESTS_SIZE_BYTES_DOC)
// AutoMQ inject end
.define(QUEUED_MAX_BYTES_CONFIG, LONG, QUEUED_MAX_REQUEST_BYTES_DEFAULT, MEDIUM, QUEUED_MAX_REQUEST_BYTES_DOC)
.define(NUM_NETWORK_THREADS_CONFIG, INT, NUM_NETWORK_THREADS_DEFAULT, atLeast(1), HIGH, NUM_NETWORK_THREADS_DOC);
}
