From 4080f19c5cfd9da10e0b4fd7708cf4aac31d191d Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 30 Dec 2024 15:57:16 +0100 Subject: [PATCH] KAFKA-17615 Remove KafkaServer from tests (#18271) Reviewers: Chia-Ping Tsai --- .../kafka/api/ConsumerBounceTest.scala | 2 +- .../kafka/admin/ReplicationQuotaUtils.scala | 56 ------------------- .../server/DynamicBrokerConfigTest.scala | 38 +------------ .../scala/unit/kafka/utils/TestUtils.scala | 17 ------ 4 files changed, 4 insertions(+), 109 deletions(-) delete mode 100644 core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 45436876fca94..a3825d28afefc 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -341,7 +341,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { for (serverIdx <- brokerServers.indices) { killBroker(serverIdx) val config = newConfigs(serverIdx) - servers(serverIdx) = TestUtils.createServer(config, time = brokerTime(config.brokerId)) + servers(serverIdx) = createBroker(config, time = brokerTime(config.brokerId)) restartDeadBrokers() } diff --git a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala b/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala deleted file mode 100644 index bdea54262df10..0000000000000 --- a/core/src/test/scala/unit/kafka/admin/ReplicationQuotaUtils.scala +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ -package kafka.admin - -import kafka.server.KafkaServer -import kafka.utils.TestUtils -import kafka.zk.AdminZkClient -import org.apache.kafka.server.config.{ConfigType, QuotaConfig} - -import scala.collection.Seq - -object ReplicationQuotaUtils { - - def checkThrottleConfigRemovedFromZK(adminZkClient: AdminZkClient, topic: String, servers: Seq[KafkaServer]): Unit = { - TestUtils.waitUntilTrue(() => { - val hasRateProp = servers.forall { server => - val brokerConfig = adminZkClient.fetchEntityConfig(ConfigType.BROKER, server.config.brokerId.toString) - brokerConfig.contains(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG) || - brokerConfig.contains(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG) - } - val topicConfig = adminZkClient.fetchEntityConfig(ConfigType.TOPIC, topic) - val hasReplicasProp = topicConfig.contains(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG) || - topicConfig.contains(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG) - !hasRateProp && !hasReplicasProp - }, "Throttle limit/replicas was not unset") - } - - def checkThrottleConfigAddedToZK(adminZkClient: AdminZkClient, expectedThrottleRate: Long, servers: Seq[KafkaServer], topic: String, throttledLeaders: Set[String], throttledFollowers: Set[String]): Unit = { - TestUtils.waitUntilTrue(() => { - //Check for limit in ZK - val brokerConfigAvailable = servers.forall { server => - val configInZk = adminZkClient.fetchEntityConfig(ConfigType.BROKER, server.config.brokerId.toString) - val zkLeaderRate = configInZk.getProperty(QuotaConfig.LEADER_REPLICATION_THROTTLED_RATE_CONFIG) - val zkFollowerRate = configInZk.getProperty(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_RATE_CONFIG) - zkLeaderRate != null && expectedThrottleRate == zkLeaderRate.toLong && - zkFollowerRate != null && expectedThrottleRate == zkFollowerRate.toLong - } - //Check replicas assigned - val topicConfig = adminZkClient.fetchEntityConfig(ConfigType.TOPIC, topic) - val leader = topicConfig.getProperty(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG).split(",").toSet - val follower = topicConfig.getProperty(QuotaConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG).split(",").toSet - val topicConfigAvailable = leader == throttledLeaders && follower == throttledFollowers - brokerConfigAvailable && topicConfigAvailable - }, "throttle limit/replicas was not set") - } -} diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index d471114f567f6..9c2d95be45c60 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -21,7 +21,6 @@ import java.{lang, util} import java.util.{Optional, Properties, Map => JMap} import java.util.concurrent.{CompletionStage, TimeUnit} import java.util.concurrent.atomic.AtomicReference -import kafka.controller.KafkaController import kafka.log.LogManager import kafka.log.remote.RemoteLogManager import kafka.network.{DataPlaneAcceptor, SocketServer} @@ -98,37 +97,6 @@ class DynamicBrokerConfigTest { } } - @Test - def testEnableDefaultUncleanLeaderElection(): Unit = { - val origProps = TestUtils.createBrokerConfig(0, null, port = 8181) - origProps.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "false") - - val config = KafkaConfig(origProps) - val serverMock = Mockito.mock(classOf[KafkaServer]) - val controllerMock = Mockito.mock(classOf[KafkaController]) - val logManagerMock = Mockito.mock(classOf[LogManager]) - - Mockito.when(serverMock.config).thenReturn(config) - Mockito.when(serverMock.kafkaController).thenReturn(controllerMock) - Mockito.when(serverMock.logManager).thenReturn(logManagerMock) - Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty) - - val currentDefaultLogConfig = new AtomicReference(new LogConfig(new Properties)) - Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ => currentDefaultLogConfig.get()) - Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig]))) - .thenAnswer(invocation => currentDefaultLogConfig.set(invocation.getArgument(0))) - - config.dynamicConfig.initialize(None, None) - config.dynamicConfig.addBrokerReconfigurable(new DynamicLogConfig(logManagerMock, serverMock)) - - val props = new Properties() - - props.put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true") - config.dynamicConfig.updateDefaultConfig(props) - assertTrue(config.uncleanLeaderElectionEnable) - Mockito.verify(controllerMock).enableDefaultUncleanLeaderElection() - } - @Test def testUpdateDynamicThreadPool(): Unit = { val origProps = TestUtils.createBrokerConfig(0, null, port = 8181) @@ -434,7 +402,7 @@ class DynamicBrokerConfigTest { validProps.foreach { case (k, v) => props.put(k, v) } invalidProps.foreach { case (k, v) => props.put(k, v) } - // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in + // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid. assertThrows(classOf[ConfigException], () => config.dynamicConfig.validate(props, perBrokerConfig = true)) @@ -513,7 +481,7 @@ class DynamicBrokerConfigTest { props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://hostname:9092") new DynamicListenerConfig(kafkaServer).validateReconfiguration(KafkaConfig(props)) - // it is illegal to update non-reconfiguable configs of existent listeners + // it is illegal to update non-reconfigurable configs of existent listeners props.put("listener.name.plaintext.you.should.not.pass", "failure") val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer) assertThrows(classOf[ConfigException], () => dynamicListenerConfig.validateReconfiguration(KafkaConfig(props))) @@ -1114,7 +1082,7 @@ class DynamicBrokerConfigTest { } } -class TestDynamicThreadPool() extends BrokerReconfigurable { +class TestDynamicThreadPool extends BrokerReconfigurable { override def reconfigurableConfigs: Set[String] = { DynamicThreadPool.ReconfigurableConfigs diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 389291f18003c..45027078ca38e 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -47,7 +47,6 @@ import org.apache.kafka.common.resource.ResourcePattern import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol} import org.apache.kafka.common.serialization._ import org.apache.kafka.common.utils.Utils.formatAddress -import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinatorConfig import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.LeaderAndIsr @@ -151,22 +150,6 @@ object TestUtils extends Logging { JTestUtils.tempFile(content) } - /** - * Create a kafka server instance with appropriate test settings - * USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST - * - * @param config The configuration of the server - */ - def createServer(config: KafkaConfig, time: Time = Time.SYSTEM): KafkaServer = { - createServer(config, time, None, startup = true) - } - - def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = { - val server = new KafkaServer(config, time, threadNamePrefix) - if (startup) server.startup() - server - } - /** * Create a test config for the provided parameters. *