diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java index 6551f80b5aaf2..237fe34bbe701 100644 --- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java +++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.KafkaShareConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -44,6 +45,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.ByteArrayDeserializer; @@ -59,10 +61,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import java.time.Duration; import java.util.ArrayList; @@ -91,10 +92,12 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; @Timeout(1200) @Tag("integration") @@ -103,17 +106,12 @@ public class ShareConsumerTest { private final TopicPartition tp = new TopicPartition("topic", 0); private final TopicPartition tp2 = new TopicPartition("topic2", 0); private final TopicPartition warmupTp = new TopicPartition("warmup", 0); - private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister"; - private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister"; + private List sgsTopicPartitions; private Admin adminClient; @BeforeEach public void createCluster(TestInfo testInfo) throws Exception { - String persisterClassName = NO_OP_PERSISTER; - if (testInfo.getDisplayName().contains(".persister=")) { - persisterClassName = testInfo.getDisplayName().split("=")[1]; - } cluster = new KafkaClusterTestKit.Builder( new TestKitNodes.Builder() .setNumBrokerNodes(1) @@ -123,7 +121,6 @@ public void createCluster(TestInfo testInfo) throws Exception { .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share") .setConfigProp("group.share.enable", "true") .setConfigProp("group.share.partition.max.record.locks", "10000") - .setConfigProp("group.share.persister.class.name", persisterClassName) .setConfigProp("group.share.record.lock.duration.ms", "15000") .setConfigProp("offsets.topic.replication.factor", "1") .setConfigProp("share.coordinator.state.topic.min.isr", "1") @@ -140,6 +137,9 @@ public void createCluster(TestInfo testInfo) throws Exception { createTopic("topic"); createTopic("topic2"); adminClient = createAdminClient(); + sgsTopicPartitions = IntStream.range(0, 3) + .mapToObj(part -> new TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part)) + .toList(); warmup(); } @@ -149,9 +149,8 @@ public void destroyCluster() throws Exception { cluster.close(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testPollNoSubscribeFails(String persister) { + @Test + public void testPollNoSubscribeFails() { try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { assertEquals(Collections.emptySet(), shareConsumer.subscription()); // "Consumer is not subscribed to any topics." @@ -159,9 +158,8 @@ public void testPollNoSubscribeFails(String persister) { } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeAndPollNoRecords(String persister) { + @Test + public void testSubscribeAndPollNoRecords() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -169,12 +167,12 @@ public void testSubscribeAndPollNoRecords(String persister) { assertEquals(subscription, shareConsumer.subscription()); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribePollUnsubscribe(String persister) { + @Test + public void testSubscribePollUnsubscribe() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -184,12 +182,12 @@ public void testSubscribePollUnsubscribe(String persister) { shareConsumer.unsubscribe(); assertEquals(Collections.emptySet(), shareConsumer.subscription()); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribePollSubscribe(String persister) { + @Test + public void testSubscribePollSubscribe() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -201,12 +199,12 @@ public void testSubscribePollSubscribe(String persister) { assertEquals(subscription, shareConsumer.subscription()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeUnsubscribePollFails(String persister) { + @Test + public void testSubscribeUnsubscribePollFails() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -218,12 +216,12 @@ public void testSubscribeUnsubscribePollFails(String persister) { // "Consumer is not subscribed to any topics." assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); // due to leader epoch in read } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeSubscribeEmptyPollFails(String persister) { + @Test + public void testSubscribeSubscribeEmptyPollFails() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { Set subscription = Collections.singleton(tp.topic()); @@ -235,12 +233,12 @@ public void testSubscribeSubscribeEmptyPollFails(String persister) { // "Consumer is not subscribed to any topics." assertThrows(IllegalStateException.class, () -> shareConsumer.poll(Duration.ofMillis(500))); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); // due to leader epoch in read } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionAndPoll(String persister) { + @Test + public void testSubscriptionAndPoll() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -251,12 +249,12 @@ public void testSubscriptionAndPoll(String persister) { shareConsumer.subscribe(Collections.singleton(tp.topic())); ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionAndPollMultiple(String persister) { + @Test + public void testSubscriptionAndPollMultiple() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -273,12 +271,12 @@ public void testSubscriptionAndPollMultiple(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementSentOnSubscriptionChange(String persister) throws ExecutionException, InterruptedException { + @Test + public void testAcknowledgementSentOnSubscriptionChange() throws ExecutionException, InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -309,12 +307,12 @@ public void testAcknowledgementSentOnSubscriptionChange(String persister) throws // Verifying if the callback was invoked without exceptions for the partitions for both topics. assertNull(partitionExceptionMap.get(tp)); assertNull(partitionExceptionMap.get(tp2)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) throws Exception { + @Test + public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() throws Exception { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -339,12 +337,12 @@ public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String pe // We expect null exception as the acknowledgment error code is null. assertNull(partitionExceptionMap.get(tp)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackOnClose(String persister) { + @Test + public void testAcknowledgementCommitCallbackOnClose() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -369,13 +367,13 @@ public void testAcknowledgementCommitCallbackOnClose(String persister) { // We expect null exception as the acknowledgment error code is null. assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); + verifyShareGroupStateTopicRecordsProduced(); } } @Flaky("KAFKA-18033") - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackInvalidRecordStateException(String persister) throws Exception { + @Test + public void testAcknowledgementCommitCallbackInvalidRecordStateException() throws Exception { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -428,9 +426,8 @@ public void onComplete(Map> offsetsMap, Exception ex } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testHeaders(String persister) { + @Test + public void testHeaders() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -451,6 +448,7 @@ public void testHeaders(String persister) { if (header != null) assertEquals("headerValue", new String(header.value())); } + verifyShareGroupStateTopicRecordsProduced(); } } @@ -471,15 +469,14 @@ private void testHeadersSerializeDeserialize(Serializer serializer, Dese } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testHeadersSerializerDeserializer(String persister) { + @Test + public void testHeadersSerializerDeserializer() { testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), new BaseConsumerTest.DeserializerImpl()); + verifyShareGroupStateTopicRecordsProduced(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMaxPollRecords(String persister) { + @Test + public void testMaxPollRecords() { int numRecords = 10000; int maxPollRecords = 2; @@ -507,12 +504,12 @@ public void testMaxPollRecords(String persister) { i++; } + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testControlRecordsSkipped(String persister) throws Exception { + @Test + public void testControlRecordsSkipped() throws Exception { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer transactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1"); KafkaProducer nonTransactionalProducer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); @@ -552,12 +549,12 @@ public void testControlRecordsSkipped(String persister) throws Exception { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeSuccess(String persister) { + @Test + public void testExplicitAcknowledgeSuccess() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -573,12 +570,12 @@ public void testExplicitAcknowledgeSuccess(String persister) { producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeCommitSuccess(String persister) { + @Test + public void testExplicitAcknowledgeCommitSuccess() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -596,12 +593,12 @@ public void testExplicitAcknowledgeCommitSuccess(String persister) { assertEquals(1, result.size()); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgementCommitAsync(String persister) throws InterruptedException { + @Test + public void testExplicitAcknowledgementCommitAsync() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -652,12 +649,12 @@ public void testExplicitAcknowledgementCommitAsync(String persister) throws Inte }, 30000, 100L, () -> "Didn't receive call to callback"); assertNull(partitionExceptionMap1.get(tp)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) { + @Test + public void testExplicitAcknowledgementCommitAsyncPartialBatch() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -715,12 +712,12 @@ public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) assertTrue(partitionExceptionMap.containsKey(tp)); assertNull(partitionExceptionMap.get(tp)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeReleasePollAccept(String persister) { + @Test + public void testExplicitAcknowledgeReleasePollAccept() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -738,12 +735,12 @@ public void testExplicitAcknowledgeReleasePollAccept(String persister) { records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeReleaseAccept(String persister) { + @Test + public void testExplicitAcknowledgeReleaseAccept() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -758,12 +755,12 @@ public void testExplicitAcknowledgeReleaseAccept(String persister) { records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT)); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeReleaseClose(String persister) { + @Test + public void testExplicitAcknowledgeReleaseClose() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -776,12 +773,12 @@ public void testExplicitAcknowledgeReleaseClose(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { + @Test + public void testExplicitAcknowledgeThrowsNotInBatch() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -798,12 +795,12 @@ public void testExplicitAcknowledgeThrowsNotInBatch(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testImplicitAcknowledgeFailsExplicit(String persister) { + @Test + public void testImplicitAcknowledgeFailsExplicit() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -819,12 +816,12 @@ public void testImplicitAcknowledgeFailsExplicit(String persister) { records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testImplicitAcknowledgeCommitSync(String persister) { + @Test + public void testImplicitAcknowledgeCommitSync() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -842,12 +839,12 @@ public void testImplicitAcknowledgeCommitSync(String persister) { assertEquals(0, result.size()); records = shareConsumer.poll(Duration.ofMillis(500)); assertEquals(0, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testImplicitAcknowledgementCommitAsync(String persister) throws InterruptedException { + @Test + public void testImplicitAcknowledgementCommitAsync() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -881,12 +878,12 @@ public void testImplicitAcknowledgementCommitAsync(String persister) throws Inte }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit callback did not receive the response yet"); assertNull(partitionExceptionMap1.get(tp)); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) throws Exception { + @Test + public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws Exception { int maxPartitionFetchBytes = 10000; alterShareAutoOffsetReset("group1", "earliest"); @@ -903,12 +900,12 @@ public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) th ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(2, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersWithDifferentGroupIds(String persister) throws InterruptedException { + @Test + public void testMultipleConsumersWithDifferentGroupIds() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); alterShareAutoOffsetReset("group2", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); @@ -954,12 +951,12 @@ public void testMultipleConsumersWithDifferentGroupIds(String persister) throws int records2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count()); return records1 == 3 && records2 == 5; }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for both consumers for the last batch"); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInGroupSequentialConsumption(String persister) { + @Test + public void testMultipleConsumersInGroupSequentialConsumption() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -995,9 +992,8 @@ public void testMultipleConsumersInGroupSequentialConsumption(String persister) } @Flaky("KAFKA-18033") - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInGroupConcurrentConsumption(String persister) + @Test + public void testMultipleConsumersInGroupConcurrentConsumption() throws InterruptedException, ExecutionException, TimeoutException { AtomicInteger totalMessagesConsumed = new AtomicInteger(0); @@ -1030,9 +1026,8 @@ public void testMultipleConsumersInGroupConcurrentConsumption(String persister) assertEquals(producerCount * messagesPerProducer, totalResult); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister) + @Test + public void testMultipleConsumersInMultipleGroupsConcurrentConsumption() throws ExecutionException, InterruptedException, TimeoutException { AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0); AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0); @@ -1091,11 +1086,11 @@ public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String pe assertEquals(totalMessagesSent, totalResult2); assertEquals(totalMessagesSent, totalResult3); assertEquals(totalMessagesSent, actualMessageSent); + verifyShareGroupStateTopicRecordsProduced(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testConsumerCloseInGroupSequential(String persister) { + @Test + public void testConsumerCloseInGroupSequential() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer1 = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -1137,12 +1132,12 @@ public void testConsumerCloseInGroupSequential(String persister) { } shareConsumer2.close(); assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testMultipleConsumersInGroupFailureConcurrentConsumption(String persister) + @Test + public void testMultipleConsumersInGroupFailureConcurrentConsumption() throws InterruptedException, ExecutionException, TimeoutException { AtomicInteger totalMessagesConsumed = new AtomicInteger(0); @@ -1183,11 +1178,11 @@ public void testMultipleConsumersInGroupFailureConcurrentConsumption(String pers int totalSuccessResult = consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum(); assertEquals(producerCount * messagesPerProducer, totalSuccessResult); + verifyShareGroupStateTopicRecordsProduced(); } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcquisitionLockTimeoutOnConsumer(String persister) throws InterruptedException { + @Test + public void testAcquisitionLockTimeoutOnConsumer() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1242,6 +1237,7 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr consumerRecords = shareConsumer.poll(Duration.ofMillis(1000)); assertEquals(0, consumerRecords.count()); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1249,9 +1245,8 @@ public void testAcquisitionLockTimeoutOnConsumer(String persister) throws Interr * Test to verify that the acknowledgement commit callback cannot invoke methods of KafkaShareConsumer. * The exception thrown is verified in {@link TestableAcknowledgementCommitCallbackWithShareConsumer} */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String persister) { + @Test + public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1269,6 +1264,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String // The acknowledgement commit callback will be called and the exception is thrown. // This is verified inside the onComplete() method implementation. shareConsumer.poll(Duration.ofMillis(500)); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1292,9 +1288,8 @@ public void onComplete(Map> offsetsMap, Exception ex * Test to verify that the acknowledgement commit callback can invoke KafkaShareConsumer.wakeup() and it * wakes up the enclosing poll. */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String persister) throws InterruptedException { + @Test + public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1324,6 +1319,7 @@ public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String per } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1344,9 +1340,8 @@ public void onComplete(Map> offsetsMap, Exception ex * Test to verify that the acknowledgement commit callback can throw an exception, and it is propagated * to the caller of poll(). */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testAcknowledgementCommitCallbackThrowsException(String persister) throws InterruptedException { + @Test + public void testAcknowledgementCommitCallbackThrowsException() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1370,6 +1365,7 @@ public void testAcknowledgementCommitCallbackThrowsException(String persister) t } return exceptionThrown.get(); }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected exception"); + verifyShareGroupStateTopicRecordsProduced(); } } @@ -1384,9 +1380,8 @@ public void onComplete(Map> offsetsMap, Exception ex * Test to verify that calling Thread.interrupt() before KafkaShareConsumer.poll(Duration) * causes it to throw InterruptException */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { + @Test + public void testPollThrowsInterruptExceptionIfInterrupted() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1409,9 +1404,8 @@ public void testPollThrowsInterruptExceptionIfInterrupted(String persister) { * Test to verify that InvalidTopicException is thrown if the consumer subscribes * to an invalid topic. */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persister) { + @Test + public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1427,9 +1421,8 @@ public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persis * Test to ensure that a wakeup when records are buffered doesn't prevent the records * being returned on the next poll. */ - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testWakeupWithFetchedRecordsAvailable(String persister) { + @Test + public void testWakeupWithFetchedRecordsAvailable() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1445,12 +1438,12 @@ public void testWakeupWithFetchedRecordsAvailable(String persister) { ConsumerRecords records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionFollowedByTopicCreation(String persister) throws InterruptedException { + @Test + public void testSubscriptionFollowedByTopicCreation() throws InterruptedException { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer()); KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) { @@ -1474,12 +1467,12 @@ public void testSubscriptionFollowedByTopicCreation(String persister) throws Int producer.send(record); records = shareConsumer.poll(Duration.ofMillis(5000)); assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) throws InterruptedException, ExecutionException { + @Test + public void testSubscriptionAndPollFollowedByTopicDeletion() throws InterruptedException, ExecutionException { String topic1 = "bar"; String topic2 = "baz"; createTopic(topic1); @@ -1516,12 +1509,12 @@ public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) thr producer.send(recordTopic2).get(); TestUtils.waitForCondition(() -> shareConsumer.poll(Duration.ofMillis(2000)).count() == 1, DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of records"); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testLsoMovementByRecordsDeletion(String persister) { + @Test + public void testLsoMovementByRecordsDeletion() { String groupId = "group1"; alterShareAutoOffsetReset(groupId, "earliest"); @@ -1558,12 +1551,12 @@ public void testLsoMovementByRecordsDeletion(String persister) { messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 1, 5, true); assertEquals(0, messageCount); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetDefaultValue(String persister) { + @Test + public void testShareAutoOffsetResetDefaultValue() { try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { @@ -1584,12 +1577,12 @@ public void testShareAutoOffsetResetDefaultValue(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // Now the next record should be consumed successfully assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetEarliest(String persister) { + @Test + public void testShareAutoOffsetResetEarliest() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { @@ -1609,12 +1602,12 @@ public void testShareAutoOffsetResetEarliest(String persister) { records = shareConsumer.poll(Duration.ofMillis(5000)); // The next records should also be consumed successfully assertEquals(1, records.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) { + @Test + public void testShareAutoOffsetResetEarliestAfterLsoMovement() { alterShareAutoOffsetReset("group1", "earliest"); try (KafkaShareConsumer shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); KafkaProducer producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer())) { @@ -1633,12 +1626,12 @@ public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) { int consumedMessageCount = consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true); // The records returned belong to offsets 5-9. assertEquals(5, consumedMessageCount); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) { + @Test + public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue() { alterShareAutoOffsetReset("group1", "earliest"); alterShareAutoOffsetReset("group2", "latest"); try (KafkaShareConsumer shareConsumerEarliest = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1"); @@ -1673,12 +1666,12 @@ public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String pers records2 = shareConsumerLatest.poll(Duration.ofMillis(5000)); // The next record should also be consumed successfully by group2 assertEquals(1, records2.count()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetByDuration(String persister) throws Exception { + @Test + public void testShareAutoOffsetResetByDuration() throws Exception { // Set auto offset reset to 1 hour before current time alterShareAutoOffsetReset("group1", "by_duration:PT1H"); @@ -1724,12 +1717,12 @@ public void testShareAutoOffsetResetByDuration(String persister) throws Exceptio shareConsumer.subscribe(Collections.singleton(tp.topic())); List> records = consumeRecords(shareConsumer, 3); assertEquals(3, records.size()); + verifyShareGroupStateTopicRecordsProduced(); } } - @ParameterizedTest(name = "{displayName}.persister={0}") - @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER}) - public void testShareAutoOffsetResetByDurationInvalidFormat(String persister) throws Exception { + @Test + public void testShareAutoOffsetResetByDurationInvalidFormat() throws Exception { // Test invalid duration format ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, "group1"); Map> alterEntries = new HashMap<>(); @@ -1739,14 +1732,14 @@ public void testShareAutoOffsetResetByDurationInvalidFormat(String persister) th GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"), AlterConfigOp.OpType.SET))); ExecutionException e1 = assertThrows(ExecutionException.class, () -> adminClient.incrementalAlterConfigs(alterEntries).all().get()); - assertTrue(e1.getCause() instanceof InvalidConfigurationException); + assertInstanceOf(InvalidConfigurationException.class, e1.getCause()); // Test negative duration alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry( GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT1H"), AlterConfigOp.OpType.SET))); ExecutionException e2 = assertThrows(ExecutionException.class, () -> adminClient.incrementalAlterConfigs(alterEntries).all().get()); - assertTrue(e2.getCause() instanceof InvalidConfigurationException); + assertInstanceOf(InvalidConfigurationException.class, e2.getCause()); } private int produceMessages(int messageCount) { @@ -1904,9 +1897,7 @@ private KafkaShareConsumer createShareConsumer(Deserializer keyD private void warmup() throws InterruptedException { createTopic(warmupTp.topic()); - TestUtils.waitForCondition(() -> - !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), - DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); + waitForMetadataCache(); ProducerRecord record = new ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null, "key".getBytes(), "value".getBytes()); Set subscription = Collections.singleton(warmupTp.topic()); alterShareAutoOffsetReset("warmupgroup1", "earliest"); @@ -1922,6 +1913,40 @@ private void warmup() throws InterruptedException { } } + private void waitForMetadataCache() throws InterruptedException { + TestUtils.waitForCondition(() -> + !cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new ListenerName("EXTERNAL")).isEmpty(), + DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet"); + } + + private void verifyShareGroupStateTopicRecordsProduced() { + try { + Map consumerConfigs = new HashMap<>(); + consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + + try (KafkaConsumer consumer = new KafkaConsumer<>(consumerConfigs)) { + consumer.assign(sgsTopicPartitions); + consumer.seekToBeginning(sgsTopicPartitions); + Set> records = new HashSet<>(); + TestUtils.waitForCondition(() -> { + ConsumerRecords msgs = consumer.poll(Duration.ofMillis(5000L)); + if (msgs.count() > 0) { + msgs.records(Topic.SHARE_GROUP_STATE_TOPIC_NAME).forEach(records::add); + } + return records.size() > 2; // +2 because of extra warmup records + }, + 30000L, + 200L, + () -> "no records produced" + ); + } + } catch (InterruptedException e) { + fail(e); + } + } + private void alterShareAutoOffsetReset(String groupId, String newValue) { ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId); Map> alterEntries = new HashMap<>();