Skip to content

Commit

Permalink
feat(tools/perf): run benchmark without consumer (#2135)
Browse files Browse the repository at this point in the history
Signed-off-by: Ning Yu <ningyu@automq.com>
  • Loading branch information
Chillax-0v0 authored Nov 8, 2024
1 parent 73e8901 commit cbaf3a2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 3 deletions.
24 changes: 21 additions & 3 deletions tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ private void run() {
LOGGER.info("Created {} producers, took {} ms", producers, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

LOGGER.info("Waiting for topics to be ready...");
waitTopicsReady();
waitTopicsReady(consumerService.consumerCount() > 0);
LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
Expand Down Expand Up @@ -167,7 +167,16 @@ private void messageReceived(TopicPartition topicPartition, byte[] payload, long
stats.messageReceived(payload.length, sendTimeNanos);
}

private void waitTopicsReady() {
private void waitTopicsReady(boolean hasConsumer) {
if (hasConsumer) {
waitTopicsReadyWithConsumer();
} else {
waitTopicsReadyWithoutConsumer();
}
stats.reset();
}

private void waitTopicsReadyWithConsumer() {
long start = System.nanoTime();
boolean ready = false;
while (System.nanoTime() < start + TOPIC_READY_TIMEOUT_NANOS) {
Expand All @@ -187,7 +196,16 @@ private void waitTopicsReady() {
if (!ready) {
throw new RuntimeException("Timeout waiting for topics to be ready");
}
stats.reset();
}

private void waitTopicsReadyWithoutConsumer() {
producerService.probe();
try {
// If there is no consumer, we can only wait for a fixed time to ensure the topic is ready.
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ public void resetOffset(long startMillis, long intervalMillis) {
}
}

public int consumerCount() {
return groups.stream()
.mapToInt(Group::consumerCount)
.sum();
}

@Override
public void close() {
admin.close();
Expand Down

0 comments on commit cbaf3a2

Please sign in to comment.