Skip to content

Commit

Permalink
KAFKA-18313 Fix to Kraft or remove tests associate with Zk Broker con…
Browse files Browse the repository at this point in the history
…fig in SocketServerTest and ReplicaFetcherThreadTest (apache#18327)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
TaiJuWu authored Dec 29, 2024
1 parent f2ae20a commit 03f16f3
Showing 1 changed file with 20 additions and 126 deletions.
146 changes: 20 additions & 126 deletions core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ import scala.jdk.CollectionConverters._
import scala.util.control.ControlThrowable

class SocketServerTest {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
val props = TestUtils.createBrokerConfig(0, null, port = 0)
props.put("listeners", "PLAINTEXT://localhost:0")
props.put("num.network.threads", "1")
props.put("socket.send.buffer.bytes", "300000")
Expand Down Expand Up @@ -314,73 +314,15 @@ class SocketServerTest {
)
}

@Test
def testStagedListenerStartup(): Unit = {
shutdownServerAndMetrics(server)
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROL_PLANE://localhost:0")
testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROL_PLANE:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROL_PLANE")
testProps.put("inter.broker.listener.name", "INTERNAL")
val config = KafkaConfig.fromProps(testProps)
val testableServer = new TestableSocketServer(config)

val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint =>
endpoint.copy(port = testableServer.boundPort(endpoint.listenerName))
}.map(_.toJava)

val externalReadyFuture = new CompletableFuture[Void]()

def controlPlaneListenerStarted() = {
try {
val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost)
sendAndReceiveControllerRequest(socket, testableServer)
true
} catch {
case _: Throwable => false
}
}

def listenerStarted(listenerName: ListenerName) = {
try {
val socket = connect(testableServer, listenerName, localAddr = InetAddress.getLocalHost)
sendAndReceiveRequest(socket, testableServer)
true
} catch {
case _: Throwable => false
}
}

try {
val externalListener = new ListenerName("EXTERNAL")
val externalEndpoint = updatedEndPoints.find(e => e.listenerName.get == externalListener.value).get
val controlPlaneListener = new ListenerName("CONTROL_PLANE")
val controlPlaneEndpoint = updatedEndPoints.find(e => e.listenerName.get == controlPlaneListener.value).get
val futures = Map(
externalEndpoint -> externalReadyFuture,
controlPlaneEndpoint -> CompletableFuture.completedFuture[Void](null))
val requestProcessingFuture = testableServer.enableRequestProcessing(futures)
TestUtils.waitUntilTrue(() => controlPlaneListenerStarted(), "Control plane listener not started")
assertFalse(listenerStarted(config.interBrokerListenerName))
assertFalse(listenerStarted(externalListener))
externalReadyFuture.complete(null)
TestUtils.waitUntilTrue(() => listenerStarted(config.interBrokerListenerName), "Inter-broker listener not started")
TestUtils.waitUntilTrue(() => listenerStarted(externalListener), "External listener not started")
requestProcessingFuture.get(1, TimeUnit.MINUTES)
} finally {
shutdownServerAndMetrics(testableServer)
}
}

@Test
def testStagedListenerShutdownWhenConnectionQueueIsFull(): Unit = {
shutdownServerAndMetrics(server)
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0")
testProps.put("advertised.listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0")
testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROLLER")
testProps.put("controller.listener.names", "CONTROLLER")
testProps.put("inter.broker.listener.name", "INTERNAL")
val config = KafkaConfig.fromProps(testProps)
val connectionQueueSize = 1
Expand Down Expand Up @@ -825,7 +767,7 @@ class SocketServerTest {

@Test
def testZeroMaxConnectionsPerIp(): Unit = {
val newProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
val newProps = TestUtils.createBrokerConfig(0, null, port = 0)
newProps.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG, "0")
newProps.setProperty(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, "%s:%s".format("127.0.0.1", "5"))
val server = new SocketServer(KafkaConfig.fromProps(newProps), new Metrics(),
Expand Down Expand Up @@ -864,7 +806,7 @@ class SocketServerTest {
@Test
def testMaxConnectionsPerIpOverrides(): Unit = {
val overrideNum = server.config.maxConnectionsPerIp + 1
val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
val overrideProps = TestUtils.createBrokerConfig(0, null, port = 0)
overrideProps.put(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG, s"localhost:$overrideNum")
val serverMetrics = new Metrics()
val overrideServer = new SocketServer(KafkaConfig.fromProps(overrideProps), serverMetrics,
Expand Down Expand Up @@ -923,7 +865,7 @@ class SocketServerTest {
@Test
def testConnectionRatePerIp(): Unit = {
val defaultTimeoutMs = 2000
val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
val overrideProps = TestUtils.createBrokerConfig(0, null, port = 0)
overrideProps.remove(SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG)
overrideProps.put(QuotaConfig.NUM_QUOTA_SAMPLES_CONFIG, String.valueOf(2))
val connectionRate = 5
Expand Down Expand Up @@ -974,7 +916,7 @@ class SocketServerTest {

@Test
def testThrottledSocketsClosedOnShutdown(): Unit = {
val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
val overrideProps = TestUtils.createBrokerConfig(0, null, port = 0)
overrideProps.remove("max.connections.per.ip")
overrideProps.put(QuotaConfig.NUM_QUOTA_SAMPLES_CONFIG, String.valueOf(2))
val connectionRate = 5
Expand Down Expand Up @@ -1051,6 +993,7 @@ class SocketServerTest {
val password = "admin-secret"
val reauthMs = 1500
props.setProperty("listeners", "SASL_PLAINTEXT://localhost:0")
props.setProperty("advertised.listeners", "SASL_PLAINTEXT://localhost:0")
props.setProperty("security.inter.broker.protocol", "SASL_PLAINTEXT")
props.setProperty("listener.name.sasl_plaintext.plain.sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
Expand All @@ -1059,8 +1002,9 @@ class SocketServerTest {
props.setProperty("listener.name.sasl_plaintext.sasl.enabled.mechanisms", "PLAIN")
props.setProperty("num.network.threads", "1")
props.setProperty("connections.max.reauth.ms", reauthMs.toString)
val overrideProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect,
saslProperties = Some(props), enableSaslPlaintext = true)
props.setProperty("listener.security.protocol.map", "SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT")

val overrideProps = TestUtils.createBrokerConfig(0, null, saslProperties = Some(props), enableSaslPlaintext = true)
val time = new MockTime()
val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(overrideProps), time = time)
try {
Expand Down Expand Up @@ -1140,7 +1084,7 @@ class SocketServerTest {
}

private def checkClientDisconnectionUpdatesRequestMetrics(responseBufferSize: Int): Unit = {
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
val props = TestUtils.createBrokerConfig(0, null, port = 0)
val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props))

try {
Expand Down Expand Up @@ -1173,7 +1117,7 @@ class SocketServerTest {
def testServerShutdownWithoutEnable(): Unit = {
// The harness server has already been enabled, so it's invalid for this test.
shutdownServerAndMetrics(server)
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 0)
val props = TestUtils.createBrokerConfig(0, null, port = 0)
val overrideServer = new TestableSocketServer(KafkaConfig.fromProps(props))
overrideServer.shutdown()
assertFalse(overrideServer.testableAcceptor.isOpen)
Expand Down Expand Up @@ -1831,33 +1775,13 @@ class SocketServerTest {
}
}


@Test
def testControlPlaneAsPrivilegedListener(): Unit = {
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROLLER")
val config = KafkaConfig.fromProps(testProps)
withTestableServer(config, { testableServer =>
val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
localAddr = InetAddress.getLocalHost)
val sentRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
assertTrue(sentRequest.context.fromPrivilegedListener)

val plainSocket = connect(testableServer, localAddr = InetAddress.getLocalHost)
val plainRequest = sendAndReceiveRequest(plainSocket, testableServer)
assertFalse(plainRequest.context.fromPrivilegedListener)
})
}

@Test
def testInterBrokerListenerAsPrivilegedListener(): Unit = {
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0")
testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT")
testProps.put("advertised.listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0")
testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
testProps.put("inter.broker.listener.name", "INTERNAL")
val config = KafkaConfig.fromProps(testProps)
withTestableServer(config, { testableServer =>
Expand All @@ -1873,33 +1797,6 @@ class SocketServerTest {
})
}

@Test
def testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener(): Unit = {
val testProps = new Properties
testProps ++= props
testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0")
testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT")
testProps.put("control.plane.listener.name", "CONTROLLER")
testProps.put("inter.broker.listener.name", "INTERNAL")
val config = KafkaConfig.fromProps(testProps)
withTestableServer(config, { testableServer =>
val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get,
localAddr = InetAddress.getLocalHost)
val controlPlaneRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer)
assertTrue(controlPlaneRequest.context.fromPrivilegedListener)

val interBrokerSocket = connect(testableServer, config.interBrokerListenerName,
localAddr = InetAddress.getLocalHost)
val interBrokerRequest = sendAndReceiveRequest(interBrokerSocket, testableServer)
assertFalse(interBrokerRequest.context.fromPrivilegedListener)

val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"),
localAddr = InetAddress.getLocalHost)
val externalRequest = sendAndReceiveRequest(externalSocket, testableServer)
assertFalse(externalRequest.context.fromPrivilegedListener)
})
}

@Test
def testListenBacklogSize(): Unit = {
val backlogSize = 128
Expand Down Expand Up @@ -2029,9 +1926,10 @@ class SocketServerTest {

private def sslServerProps: Properties = {
val trustStoreFile = TestUtils.tempFile("truststore", ".jks")
val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
val sslProps = TestUtils.createBrokerConfig(0, null, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL),
trustStoreFile = Some(trustStoreFile))
sslProps.put(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:0")
sslProps.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "SSL://localhost:0")
sslProps.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "1")
sslProps
}
Expand All @@ -2053,11 +1951,6 @@ class SocketServerTest {
}
}

def sendAndReceiveControllerRequest(socket: Socket, server: SocketServer): RequestChannel.Request = {
sendRequest(socket, producerRequestBytes())
receiveRequest(server.controlPlaneRequestChannelOpt.get)
}

private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = {
val selector = testableServer.testableSelector
selector.reset()
Expand Down Expand Up @@ -2203,7 +2096,8 @@ class SocketServerTest {
time: Time = Time.SYSTEM,
connectionDisconnectListeners: Seq[ConnectionDisconnectListener] = Seq.empty
) extends SocketServer(
config, new Metrics, time, credentialProvider, apiVersionManager, connectionDisconnectListeners = connectionDisconnectListeners
config, new Metrics, time, credentialProvider, apiVersionManager,
connectionDisconnectListeners = connectionDisconnectListeners
) {

override def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel) : DataPlaneAcceptor = {
Expand Down

0 comments on commit 03f16f3

Please sign in to comment.