diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6d5529675fdf4..d1b5395591c4b 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -66,7 +66,7 @@ import scala.jdk.CollectionConverters._ import scala.util.control.ControlThrowable class SocketServerTest { - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val props = TestUtils.createBrokerConfig(0, null, port = 0) props.put("listeners", "PLAINTEXT://localhost:0") props.put("num.network.threads", "1") props.put("socket.send.buffer.bytes", "300000") @@ -314,73 +314,15 @@ class SocketServerTest { ) } - @Test - def testStagedListenerStartup(): Unit = { - shutdownServerAndMetrics(server) - val testProps = new Properties - testProps ++= props - testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROL_PLANE://localhost:0") - testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROL_PLANE:PLAINTEXT") - testProps.put("control.plane.listener.name", "CONTROL_PLANE") - testProps.put("inter.broker.listener.name", "INTERNAL") - val config = KafkaConfig.fromProps(testProps) - val testableServer = new TestableSocketServer(config) - - val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint => - endpoint.copy(port = testableServer.boundPort(endpoint.listenerName)) - }.map(_.toJava) - - val externalReadyFuture = new CompletableFuture[Void]() - - def controlPlaneListenerStarted() = { - try { - val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost) - sendAndReceiveControllerRequest(socket, testableServer) - true - } catch { - case _: Throwable => false - } - } - - def listenerStarted(listenerName: ListenerName) = { - try { - val socket = connect(testableServer, listenerName, localAddr = InetAddress.getLocalHost) - sendAndReceiveRequest(socket, testableServer) - true - } catch { - case _: Throwable => false - } - } - - try { - val externalListener = new ListenerName("EXTERNAL") - val externalEndpoint = updatedEndPoints.find(e => e.listenerName.get == externalListener.value).get - val controlPlaneListener = new ListenerName("CONTROL_PLANE") - val controlPlaneEndpoint = updatedEndPoints.find(e => e.listenerName.get == controlPlaneListener.value).get - val futures = Map( - externalEndpoint -> externalReadyFuture, - controlPlaneEndpoint -> CompletableFuture.completedFuture[Void](null)) - val requestProcessingFuture = testableServer.enableRequestProcessing(futures) - TestUtils.waitUntilTrue(() => controlPlaneListenerStarted(), "Control plane listener not started") - assertFalse(listenerStarted(config.interBrokerListenerName)) - assertFalse(listenerStarted(externalListener)) - externalReadyFuture.complete(null) - TestUtils.waitUntilTrue(() => listenerStarted(config.interBrokerListenerName), "Inter-broker listener not started") - TestUtils.waitUntilTrue(() => listenerStarted(externalListener), "External listener not started") - requestProcessingFuture.get(1, TimeUnit.MINUTES) - } finally { - shutdownServerAndMetrics(testableServer) - } - } - @Test def testStagedListenerShutdownWhenConnectionQueueIsFull(): Unit = { shutdownServerAndMetrics(server) val testProps = new Properties testProps ++= props - testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0") + testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0") + testProps.put("advertised.listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0") testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT") - testProps.put("control.plane.listener.name", "CONTROLLER") + testProps.put("controller.listener.names", "CONTROLLER") testProps.put("inter.broker.listener.name", "INTERNAL") val config = KafkaConfig.fromProps(testProps) val connectionQueueSize = 1 @@ -825,7 +767,7 @@ class SocketServerTest { @Test def testZeroMaxConnectionsPerIp(): Unit = { - val newProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val newProps = TestUtils.createBrokerConfig(0, null, port = 0) newProps.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG, "0") newProps.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, "%s:%s".format("127.0.0.1", "5")) val server = new SocketServer(KafkaConfig.fromProps(newProps), new Metrics(), @@ -864,7 +806,7 @@ class SocketServerTest { @Test def testMaxConnectionsPerIpOverrides(): Unit = { val overrideNum = server.config.maxConnectionsPerIp + 1 - val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val overrideProps = TestUtils.createBrokerConfig(0, null, port = 0) overrideProps.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$overrideNum") val serverMetrics = new Metrics() val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics, @@ -923,7 +865,7 @@ class SocketServerTest { @Test def testConnectionRatePerIp(): Unit = { val defaultTimeoutMs = 2000 - val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val overrideProps = TestUtils.createBrokerConfig(0, null, port = 0) overrideProps.remove(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG) overrideProps.put(QuotaConfig.NUM_QUOTA_SAMPLES_CONFIG, String.valueOf(2)) val connectionRate = 5 @@ -974,7 +916,7 @@ class SocketServerTest { @Test def testThrottledSocketsClosedOnShutdown(): Unit = { - val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val overrideProps = TestUtils.createBrokerConfig(0, null, port = 0) overrideProps.remove("max.connections.per.ip") overrideProps.put(QuotaConfig.NUM_QUOTA_SAMPLES_CONFIG, String.valueOf(2)) val connectionRate = 5 @@ -1051,6 +993,7 @@ class SocketServerTest { val password = "admin-secret" val reauthMs = 1500 props.setProperty("listeners", "SASL_PLAINTEXT://localhost:0") + props.setProperty("advertised.listeners", "SASL_PLAINTEXT://localhost:0") props.setProperty("security.inter.broker.protocol", "SASL_PLAINTEXT") props.setProperty("listener.name.sasl_plaintext.plain.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required " + @@ -1059,8 +1002,9 @@ class SocketServerTest { props.setProperty("listener.name.sasl_plaintext.sasl.enabled.mechanisms", "PLAIN") props.setProperty("num.network.threads", "1") props.setProperty("connections.max.reauth.ms", reauthMs.toString) - val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, - saslProperties = Some(props), enableSaslPlaintext = true) + props.setProperty("listener.security.protocol.map", "SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT") + + val overrideProps = TestUtils.createBrokerConfig(0, null, saslProperties = Some(props), enableSaslPlaintext = true) val time = new MockTime() val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(overrideProps), time = time) try { @@ -1140,7 +1084,7 @@ class SocketServerTest { } private def checkClientDisconnectionUpdatesRequestMetrics(responseBufferSize: Int): Unit = { - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val props = TestUtils.createBrokerConfig(0, null, port = 0) val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props)) try { @@ -1173,7 +1117,7 @@ class SocketServerTest { def testServerShutdownWithoutEnable(): Unit = { // The harness server has already been enabled, so it's invalid for this test. shutdownServerAndMetrics(server) - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0) + val props = TestUtils.createBrokerConfig(0, null, port = 0) val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props)) overrideServer.shutdown() assertFalse(overrideServer.testableAcceptor.isOpen) @@ -1831,33 +1775,13 @@ class SocketServerTest { } } - - @Test - def testControlPlaneAsPrivilegedListener(): Unit = { - val testProps = new Properties - testProps ++= props - testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") - testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") - testProps.put("control.plane.listener.name", "CONTROLLER") - val config = KafkaConfig.fromProps(testProps) - withTestableServer(config, { testableServer => - val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get, - localAddr = InetAddress.getLocalHost) - val sentRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer) - assertTrue(sentRequest.context.fromPrivilegedListener) - - val plainSocket = connect(testableServer, localAddr = InetAddress.getLocalHost) - val plainRequest = sendAndReceiveRequest(plainSocket, testableServer) - assertFalse(plainRequest.context.fromPrivilegedListener) - }) - } - @Test def testInterBrokerListenerAsPrivilegedListener(): Unit = { val testProps = new Properties testProps ++= props testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0") - testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT") + testProps.put("advertised.listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0") + testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT") testProps.put("inter.broker.listener.name", "INTERNAL") val config = KafkaConfig.fromProps(testProps) withTestableServer(config, { testableServer => @@ -1873,33 +1797,6 @@ class SocketServerTest { }) } - @Test - def testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener(): Unit = { - val testProps = new Properties - testProps ++= props - testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0") - testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT") - testProps.put("control.plane.listener.name", "CONTROLLER") - testProps.put("inter.broker.listener.name", "INTERNAL") - val config = KafkaConfig.fromProps(testProps) - withTestableServer(config, { testableServer => - val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get, - localAddr = InetAddress.getLocalHost) - val controlPlaneRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer) - assertTrue(controlPlaneRequest.context.fromPrivilegedListener) - - val interBrokerSocket = connect(testableServer, config.interBrokerListenerName, - localAddr = InetAddress.getLocalHost) - val interBrokerRequest = sendAndReceiveRequest(interBrokerSocket, testableServer) - assertFalse(interBrokerRequest.context.fromPrivilegedListener) - - val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"), - localAddr = InetAddress.getLocalHost) - val externalRequest = sendAndReceiveRequest(externalSocket, testableServer) - assertFalse(externalRequest.context.fromPrivilegedListener) - }) - } - @Test def testListenBacklogSize(): Unit = { val backlogSize = 128 @@ -2029,9 +1926,10 @@ class SocketServerTest { private def sslServerProps: Properties = { val trustStoreFile = TestUtils.tempFile("truststore", ".jks") - val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL), + val sslProps = TestUtils.createBrokerConfig(0, null, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL), trustStoreFile = Some(trustStoreFile)) sslProps.put(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:0") + sslProps.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "SSL://localhost:0") sslProps.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "1") sslProps } @@ -2053,11 +1951,6 @@ class SocketServerTest { } } - def sendAndReceiveControllerRequest(socket: Socket, server: SocketServer): RequestChannel.Request = { - sendRequest(socket, producerRequestBytes()) - receiveRequest(server.controlPlaneRequestChannelOpt.get) - } - private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = { val selector = testableServer.testableSelector selector.reset() @@ -2203,7 +2096,8 @@ class SocketServerTest { time: Time = Time.SYSTEM, connectionDisconnectListeners: Seq[ConnectionDisconnectListener] = Seq.empty ) extends SocketServer( - config, new Metrics, time, credentialProvider, apiVersionManager, connectionDisconnectListeners = connectionDisconnectListeners + config, new Metrics, time, credentialProvider, apiVersionManager, + connectionDisconnectListeners = connectionDisconnectListeners ) { override def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel) : DataPlaneAcceptor = {