Skip to content

Commit

Permalink
fix error test
Browse files Browse the repository at this point in the history
  • Loading branch information
TaiJuWu committed Feb 4, 2025
1 parent 7137eb8 commit 5ca7653
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,12 @@ public ControllerServer createController(Map<String, String> props) throws IOExc
throw new RuntimeException(String.format("Node %d already exist.", nodeId));
}

props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
Long.toString(TimeUnit.MINUTES.toMillis(10)));
props.putIfAbsent(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller");
props.putIfAbsent(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER");
props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
Long.toString(TimeUnit.MINUTES.toMillis(10)));

boolean isCombined = props.get(KRaftConfigs.PROCESS_ROLES_CONFIG).contains("broker");

ServerSocketFactory serverSocketFactor = socketFactoryManager.getOrCreateSocketFactory(nodeId);
props.compute(SocketServerConfigs.LISTENERS_CONFIG, (key, val) ->
Expand All @@ -431,7 +433,6 @@ public ControllerServer createController(Map<String, String> props) throws IOExc
);

KafkaConfig config = new KafkaConfig(props);
boolean isCombined = config.getString(KRaftConfigs.PROCESS_ROLES_CONFIG).contains("broker");

TestKitNode node = nodes.createControllerNode(config, isCombined);
MetaPropertiesEnsemble metaPropsEnsemble = node.initialMetaPropertiesEnsemble();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,6 @@ public TestKitNodes build() {
private final BootstrapMetadata bootstrapMetadata;
private final SortedMap<Integer, TestKitNode> controllerNodes;
private final SortedMap<Integer, TestKitNode> brokerNodes;
private final SortedMap<Integer, TestKitNode> dynamicControllerNodes;
private final SortedMap<Integer, TestKitNode> dynamicBrokerNodes;
private final ListenerName brokerListenerName;
private final ListenerName controllerListenerName;
private final SecurityProtocol brokerSecurityProtocol;
Expand All @@ -244,10 +242,8 @@ private TestKitNodes(
this.baseDirectory = Objects.requireNonNull(baseDirectory);
this.clusterId = Objects.requireNonNull(clusterId);
this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
this.controllerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(controllerNodes)));
this.controllerNodes = new TreeMap<>(Objects.requireNonNull(controllerNodes));
this.brokerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(brokerNodes)));
this.dynamicControllerNodes = new TreeMap<>();
this.dynamicBrokerNodes = new TreeMap<>();
this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
this.controllerListenerName = Objects.requireNonNull(controllerListenerName);
this.brokerSecurityProtocol = Objects.requireNonNull(brokerSecurityProtocol);
Expand Down Expand Up @@ -304,9 +300,9 @@ public TestKitNode createControllerNode(KafkaConfig config, boolean isCombined)
isCombined,
config.originalsStrings()
);
dynamicControllerNodes.put(newNodeId, controller);
controllerNodes.put(newNodeId, controller);
if (isCombined) {
dynamicBrokerNodes.put(newNodeId, controller);
brokerNodes.put(newNodeId, controller);
}

return controller;
Expand Down

0 comments on commit 5ca7653

Please sign in to comment.