Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Feb 2, 2025
1 parent c677fe2 commit 3d4ec2f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ private static void setupNodeDirectories(File baseDirectory,
private final TestKitNodes nodes;
private final Map<Integer, ControllerServer> controllers;
private final Map<Integer, BrokerServer> brokers;

private final Map<Integer, BrokerServer> dynamicBrokers = new TreeMap<>();
private final Map<Integer, ControllerServer> dynamicControllers = new TreeMap<>();
private final File baseDirectory;
Expand Down Expand Up @@ -384,6 +383,7 @@ private KafkaClusterTestKit(
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
socketFactoryManager.finalInitial();
}

public void format() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public ServerSocketChannel openServerSocket(
) throws IOException {
ServerSocketChannel socketChannel = getSocketForListenerAndMarkAsUsed(
nodeId,
listenerName,
false);
listenerName);


if (socketChannel != null) {
if (socketChannel.isOpen()) {
Expand Down Expand Up @@ -104,6 +104,10 @@ public ServerSocketChannel openServerSocket(
*/
private final Map<Integer, Map<String, ServerSocketChannel>> dynamicSockets = new HashMap<>();

/**
* After initial flow, this flag will be set as false forever
*/
private boolean initalizating = true;

/**
* Maps node IDs to set of the listeners that were used.
Expand All @@ -119,14 +123,13 @@ public ServerSocketChannel openServerSocket(
*
* @return null if the socket was not found; the socket, otherwise.
*/
public synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
private synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
int nodeId,
String listener,
boolean isDynamic
String listener
) {

Map<Integer, Map<String, ServerSocketChannel>> checkedsockets = sockets;
if (isDynamic) {
if (!initalizating) {
checkedsockets = this.dynamicSockets;
}

Expand All @@ -138,7 +141,7 @@ public synchronized ServerSocketChannel getSocketForListenerAndMarkAsUsed(
if (socket == null) {
return null;
}
if (!isDynamic)
if (initalizating)
usedSockets.computeIfAbsent(nodeId, __ -> new HashSet<>()).add(listener);
return socket;
}
Expand Down Expand Up @@ -194,13 +197,18 @@ public synchronized int getOrCreatePortForListener(
socketsForNode.put(listener, socketChannel);
}
InetSocketAddress socketAddress = (InetSocketAddress) socketChannel.getLocalAddress();
System.err.println("ZZZ get port " + socketAddress);
// for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
// System.err.println(element);
// }

return socketAddress.getPort();
}

/**
* If this method is called, we create a new socket after initial flow.
* This method should be called after initial flow.
*/
public synchronized void finalInitial() {
initalizating = false;
}

@Override
public synchronized void close() throws Exception {
if (closed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,24 @@ public void testCreateOutOfClusterController() throws Exception {
props.put("node.id", "2");
props.put("controller.quorum.bootstrap.servers", "localhost:9000");

// try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
assertDoesNotThrow(() -> cluster.createIsolatedController(props));
}
}

// }
@Test
public void testCreateOutOfClusterControllerWithSameId() throws Exception {
TestKitNodes nodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
.build();

Map<String, String> props = new HashMap<>();
props.put("node.id", "0");
props.put("controller.quorum.bootstrap.servers", "localhost:9000");

KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build();
assertDoesNotThrow(() -> cluster.createIsolatedController(props));
cluster.close();
try (KafkaClusterTestKit cluster = new KafkaClusterTestKit.Builder(nodes).build()) {
assertDoesNotThrow(() -> cluster.createIsolatedController(props));
}
}
}

0 comments on commit 3d4ec2f

Please sign in to comment.