From ff3b68ea07cad2bf3febc6ea0175637dd8455732 Mon Sep 17 00:00:00 2001 From: Yu Ning <78631860+Chillax-0v0@users.noreply.github.com> Date: Wed, 20 Nov 2024 15:45:54 +0800 Subject: [PATCH] feat(tools/perf): create topics in batch (#2166) Signed-off-by: Ning Yu --- build.gradle | 1 + .../kafka/tools/automq/perf/TopicService.java | 23 ++++++++++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/build.gradle b/build.gradle index ed7f763691..2e0b99e13d 100644 --- a/build.gradle +++ b/build.gradle @@ -2242,6 +2242,7 @@ project(':tools') { implementation libs.awsSdkAuth implementation libs.hdrHistogram implementation libs.spotbugsAnnotations + implementation libs.guava // for SASL/OAUTHBEARER JWT validation implementation (libs.jose4j){ diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java index a0ed549c0e..85f6f2c45c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java @@ -11,6 +11,7 @@ package org.apache.kafka.tools.automq.perf; +import com.google.common.collect.Lists; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; @@ -35,6 +36,13 @@ public class TopicService implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TopicService.class); + /** + * The maximum number of partitions per batch. + * + * @see org.apache.kafka.controller.ReplicationControlManager + */ + private static final int MAX_PARTITIONS_PER_BATCH = 10_000; + private final Admin admin; public TopicService(String bootstrapServer, Map adminConfigs) { @@ -52,9 +60,18 @@ public List createTopics(TopicsConfig config) { .mapToObj(i -> generateTopicName(config.topicPrefix, config.partitionsPerTopic, i)) .map(name -> new NewTopic(name, config.partitionsPerTopic, (short) 1).configs(config.topicConfigs)) .collect(Collectors.toList()); - CreateTopicsResult topics = admin.createTopics(newTopics); - topics.values().forEach(this::waitTopicCreated); - return topics.values().keySet().stream() + + int topicsPerBatch = MAX_PARTITIONS_PER_BATCH / config.partitionsPerTopic; + List> requests = Lists.partition(newTopics, topicsPerBatch); + + Map> results = requests.stream() + .map(admin::createTopics) + .map(CreateTopicsResult::values) + .flatMap(map -> map.entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + results.forEach(this::waitTopicCreated); + + return results.keySet().stream() .map(name -> new Topic(name, config.partitionsPerTopic)) .collect(Collectors.toList()); }