diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java index 086f2f3ce7..bc95534f22 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java @@ -27,6 +27,8 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.BrokerRegistrationRequestData; import org.apache.kafka.common.message.ControllerRegistrationRequestData; +import org.apache.kafka.common.message.GetKVsRequestData; +import org.apache.kafka.common.message.PutKVsRequestData; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; @@ -39,6 +41,7 @@ import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.controller.stream.KVControlManager; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.metadata.BrokerRegistrationFencingChange; import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange; @@ -59,8 +62,10 @@ import org.slf4j.Logger; +import java.nio.ByteBuffer; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -100,11 +105,17 @@ static class Builder { // AutoMQ for Kafka inject start private List quorumVoters; + private KVControlManager kvControlManager; Builder setQuorumVoters(List quorumVoters) { this.quorumVoters = quorumVoters; return this; } + + Builder setKVControlManager(KVControlManager kvControlManager) { + this.kvControlManager = kvControlManager; + return this; + } // AutoMQ for Kafka inject end Builder setLogContext(LogContext logContext) { @@ -180,7 +191,10 @@ ClusterControlManager build() { featureControl, zkMigrationEnabled, brokerUncleanShutdownHandler, - quorumVoters + // AutoMQ inject start + quorumVoters, + kvControlManager + // AutoMQ inject end ); } } @@ -292,6 +306,12 @@ boolean check() { * The real next available node id is generally one greater than this value. */ private AtomicInteger nextNodeId = new AtomicInteger(-1); + + /** + * A set of node IDs that have been unregistered and can be reused for new node assignments. + */ + private final KVControlManager kvControlManager; + private static final String REUSABLE_NODE_IDS_KEY = "__automq_reusable_node_ids/"; // AutoMQ for Kafka inject end private ClusterControlManager( @@ -304,7 +324,10 @@ private ClusterControlManager( FeatureControlManager featureControl, boolean zkMigrationEnabled, BrokerUncleanShutdownHandler brokerUncleanShutdownHandler, - List quorumVoters + // AutoMQ inject start + List quorumVoters, + KVControlManager kvControlManager + // AutoMQ inject end ) { this.logContext = logContext; this.clusterId = clusterId; @@ -323,6 +346,7 @@ private ClusterControlManager( this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler; // AutoMQ for Kafka inject start this.maxControllerId = QuorumConfig.parseVoterConnections(quorumVoters).keySet().stream().max(Integer::compareTo).orElse(0); + this.kvControlManager = kvControlManager; // AutoMQ for Kafka inject end } @@ -369,16 +393,73 @@ boolean zkRegistrationAllowed() { // AutoMQ for Kafka inject start public ControllerResult getNextNodeId() { - int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0); - int maxNodeId = Math.max(maxBrokerId, maxControllerId); - int nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1); - // Let the broker's nodeId start from 1000 to easily distinguish broker and controller. - nextId = Math.max(nextId, 1000); - UpdateNextNodeIdRecord record = new UpdateNextNodeIdRecord().setNodeId(nextId); + int nextId; + Set reusableNodeIds = getReusableNodeIds(); + if (!reusableNodeIds.isEmpty()) { + Iterator iterator = reusableNodeIds.iterator(); + nextId = iterator.next(); + // we simply remove the id from reusable id set because we're unable to determine if the id + // will finally be used. + iterator.remove(); + return ControllerResult.atomicOf(putReusableNodeIds(reusableNodeIds), nextId); + } else { + int maxBrokerId = brokerRegistrations.keySet().stream().max(Integer::compareTo).orElse(0); + int maxNodeId = Math.max(maxBrokerId, maxControllerId); + nextId = this.nextNodeId.accumulateAndGet(maxNodeId, (x, y) -> Math.max(x, y) + 1); + // Let the broker's nodeId start from 1000 to easily distinguish broker and controller. + nextId = Math.max(nextId, 1000); + UpdateNextNodeIdRecord record = new UpdateNextNodeIdRecord().setNodeId(nextId); - List records = new ArrayList<>(); - records.add(new ApiMessageAndVersion(record, (short) 0)); - return ControllerResult.atomicOf(records, nextId); + List records = new ArrayList<>(); + records.add(new ApiMessageAndVersion(record, (short) 0)); + return ControllerResult.atomicOf(records, nextId); + } + } + + Set getReusableNodeIds() { + return deserializeReusableNodeIds(kvControlManager.getKV( + new GetKVsRequestData.GetKVRequest().setKey(REUSABLE_NODE_IDS_KEY)).value()); + } + + List putReusableNodeIds(Set reusableNodeIds) { + return kvControlManager.putKV(new PutKVsRequestData.PutKVRequest() + .setKey(REUSABLE_NODE_IDS_KEY) + .setValue(serializeReusableNodeIds(reusableNodeIds)) + .setOverwrite(true)) + .records(); + } + + private Set deserializeReusableNodeIds(byte[] value) { + if (value == null) { + return new HashSet<>(); + } + ByteBuffer buffer = ByteBuffer.wrap(value); + Set reusableNodeIds = new HashSet<>(); + while (buffer.hasRemaining()) { + reusableNodeIds.add(buffer.getInt()); + } + return reusableNodeIds; + } + + private byte[] serializeReusableNodeIds(Set reusableNodeIds) { + ByteBuffer buffer = ByteBuffer.allocate(reusableNodeIds.size() * Integer.BYTES); + reusableNodeIds.forEach(buffer::putInt); + return buffer.array(); + } + + public List registerBrokerRecords(int brokerId) { + Set reusableNodeIds = getReusableNodeIds(); + if (reusableNodeIds.contains(brokerId)) { + reusableNodeIds.remove(brokerId); + return putReusableNodeIds(reusableNodeIds); + } + return Collections.emptyList(); + } + + public List unRegisterBrokerRecords(int brokerId) { + Set reusableNodeIds = getReusableNodeIds(); + reusableNodeIds.add(brokerId); + return putReusableNodeIds(reusableNodeIds); } // AutoMQ for Kafka inject end @@ -496,6 +577,10 @@ public ControllerResult registerBroker( } heartbeatManager.register(brokerId, record.fenced()); + // AutoMQ for Kafka inject start + records.addAll(registerBrokerRecords(brokerId)); + // AutoMQ for Kafka inject end + return ControllerResult.atomicOf(records, new BrokerRegistrationReply(record.brokerEpoch())); } @@ -583,6 +668,7 @@ public void replay(RegisterBrokerRecord record, long offset) { if (prevRegistration != null) heartbeatManager.remove(brokerId); heartbeatManager.register(brokerId, record.fenced()); } + if (prevRegistration == null) { log.info("Replayed initial RegisterBrokerRecord for broker {}: {}", record.brokerId(), record); } else if (prevRegistration.incarnationId().equals(record.incarnationId())) { @@ -608,6 +694,7 @@ public void replay(UnregisterBrokerRecord record) { if (heartbeatManager != null) heartbeatManager.remove(brokerId); updateDirectories(brokerId, registration.directories(), null); brokerRegistrations.remove(brokerId); + // AutoMQ injection end log.info("Replayed {}", record); } } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 3f3ef07340..92ca84e486 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2047,6 +2047,9 @@ private QuorumController( this.time = time; this.controllerMetrics = controllerMetrics; this.snapshotRegistry = new SnapshotRegistry(logContext); + // AutoMQ for Kafka inject start + this.kvControlManager = new KVControlManager(snapshotRegistry, logContext); + // AutoMQ for Kafka inject end this.deferredEventQueue = new DeferredEventQueue(logContext); this.deferredUnstableEventQueue = new DeferredEventQueue(logContext); this.offsetControl = new OffsetControlManager.Builder(). @@ -2094,6 +2097,7 @@ private QuorumController( setZkMigrationEnabled(zkMigrationEnabled). // AutoMQ for Kafka inject start setQuorumVoters(quorumVoters). + setKVControlManager(kvControlManager). // AutoMQ for Kafka inject end setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown). build(); @@ -2156,7 +2160,6 @@ private QuorumController( featureControl::autoMQVersion, time); this.streamControlManager = new StreamControlManager(this, snapshotRegistry, logContext, this.s3ObjectControlManager, clusterControl, featureControl, replicationControl); - this.kvControlManager = new KVControlManager(snapshotRegistry, logContext); this.topicDeletionManager = new TopicDeletionManager(snapshotRegistry, this, streamControlManager, kvControlManager); this.nodeControlManager = new NodeControlManager(snapshotRegistry, new DefaultNodeRuntimeInfoGetter(clusterControl, streamControlManager)); this.extension = extension.apply(this); diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java index 7104ea1945..76ebfd9163 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java @@ -1486,6 +1486,7 @@ void handleBrokerUnregistered(int brokerId, long brokerEpoch, (short) 0)); // AutoMQ for Kafka inject start records.add(nodeControlManager.unregisterNodeRecord(brokerId)); + records.addAll(clusterControl.unRegisterBrokerRecords(brokerId)); // AutoMQ for Kafka inject end } diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java index ddc7506b0d..4b9564a1f1 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.message.ControllerRegistrationRequestData; import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord; import org.apache.kafka.common.metadata.FenceBrokerRecord; +import org.apache.kafka.common.metadata.KVRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint; import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection; @@ -36,6 +37,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.controller.stream.KVControlManager; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.metadata.BrokerRegistration; import org.apache.kafka.metadata.BrokerRegistrationFencingChange; @@ -52,6 +54,7 @@ import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -60,12 +63,14 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Stream; import static java.util.Arrays.asList; @@ -718,4 +723,57 @@ public void testReRegistrationAndBrokerEpoch(boolean newIncarnationId) { clusterControl.brokerRegistrations().get(1).epoch()); } } + + @Test + public void testReusableNodeIds() { + MockTime time = new MockTime(0, 0, 0); + SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext()); + KVControlManager kvControl = new KVControlManager(snapshotRegistry, new LogContext()); + FeatureControlManager featureControl = new FeatureControlManager.Builder(). + setSnapshotRegistry(snapshotRegistry). + setQuorumFeatures(new QuorumFeatures(0, + QuorumFeatures.defaultFeatureMap(true), + Collections.singletonList(0))). + setMetadataVersion(MetadataVersion.IBP_3_9_IV0). + build(); + ClusterControlManager clusterControl = new ClusterControlManager.Builder(). + setTime(time). + setSnapshotRegistry(snapshotRegistry). + setSessionTimeoutNs(1000). + setFeatureControlManager(featureControl). + setBrokerUncleanShutdownHandler((brokerId, records) -> { }). + setQuorumVoters(new ArrayList<>()). + setKVControlManager(kvControl). + build(); + clusterControl.activate(); + Set nodeIds = clusterControl.getReusableNodeIds(); + Assertions.assertTrue(nodeIds.isEmpty()); + clusterControl.putReusableNodeIds(Set.of(1, 2, 3)).forEach(r -> { + kvControl.replay((KVRecord) r.message()); + }); + nodeIds = clusterControl.getReusableNodeIds(); + Assertions.assertEquals(Set.of(1, 2, 3), nodeIds); + + clusterControl.unRegisterBrokerRecords(4).forEach(r -> { + kvControl.replay((KVRecord) r.message()); + }); + nodeIds = clusterControl.getReusableNodeIds(); + Assertions.assertEquals(Set.of(1, 2, 3, 4), nodeIds); + + clusterControl.registerBrokerRecords(2).forEach(r -> { + kvControl.replay((KVRecord) r.message()); + }); + nodeIds = clusterControl.getReusableNodeIds(); + Assertions.assertEquals(Set.of(1, 3, 4), nodeIds); + + ControllerResult result = clusterControl.getNextNodeId(); + result.records().forEach(r -> { + if (r.message() instanceof KVRecord) { + kvControl.replay((KVRecord) r.message()); + } + }); + Set remainIds = new HashSet<>(Set.of(1, 3, 4)); + remainIds.remove(result.response()); + Assertions.assertEquals(remainIds, clusterControl.getReusableNodeIds()); + } }