diff --git a/app/src/main/java/org/astraea/app/web/SensorHandler.java b/app/src/main/java/org/astraea/app/web/SensorHandler.java
index 15d793950e..df69a9ebc5 100644
--- a/app/src/main/java/org/astraea/app/web/SensorHandler.java
+++ b/app/src/main/java/org/astraea/app/web/SensorHandler.java
@@ -32,7 +32,8 @@ public class SensorHandler implements Handler {
   private static final Set<String> DEFAULT_COSTS =
       Set.of(
           "org.astraea.common.cost.ReplicaLeaderCost",
-          "org.astraea.common.cost.NetworkIngressCost");
+          "org.astraea.common.cost.NetworkIngressCost",
+          "org.astraea.common.cost.PartitionMigrateTimeCost");
 
   SensorHandler(Sensors sensors) {
     this.sensors = sensors;
diff --git a/app/src/test/java/org/astraea/app/web/SensorHandlerTest.java b/app/src/test/java/org/astraea/app/web/SensorHandlerTest.java
index bce5c14fdd..070266e9f9 100644
--- a/app/src/test/java/org/astraea/app/web/SensorHandlerTest.java
+++ b/app/src/test/java/org/astraea/app/web/SensorHandlerTest.java
@@ -38,7 +38,7 @@ void testBeans() {
         Assertions.assertInstanceOf(
             SensorHandler.Response.class,
             defaultCostHandler.get(Channel.EMPTY).toCompletableFuture().join());
-    Assertions.assertEquals(2, defaultCostResponse.costs.size());
+    Assertions.assertEquals(3, defaultCostResponse.costs.size());
 
     var changedCostResponse =
         Assertions.assertInstanceOf(
diff --git a/common/src/main/java/org/astraea/common/balancer/Balancer.java b/common/src/main/java/org/astraea/common/balancer/Balancer.java
index 3118c76dad..49da6f29bd 100644
--- a/common/src/main/java/org/astraea/common/balancer/Balancer.java
+++ b/common/src/main/java/org/astraea/common/balancer/Balancer.java
@@ -33,28 +33,17 @@ public interface Balancer {
    */
   Optional<Plan> offer(AlgorithmConfig config);
 
+  /**
+   * @param initialClusterCost The {@link ClusterCost} score of the original {@link ClusterInfo}
+   *     when the generation of this plan started.
+   * @param proposalClusterCost The {@link ClusterCost} score of the proposed new allocation.
+   */
   record Plan(
       ClusterBean clusterBean,
       ClusterInfo initialClusterInfo,
       ClusterCost initialClusterCost,
       ClusterInfo proposal,
-      ClusterCost proposalClusterCost) {
-
-    /**
-     * The {@link ClusterCost} score of the original {@link ClusterInfo} when this plan is start
-     * generating.
-     */
-    @Override
-    public ClusterCost initialClusterCost() {
-      return initialClusterCost;
-    }
-
-    /** The {@link ClusterCost} score of the proposed new allocation. */
-    @Override
-    public ClusterCost proposalClusterCost() {
-      return proposalClusterCost;
-    }
-  }
+      ClusterCost proposalClusterCost) {}
 
   /** The official implementation of {@link Balancer}. */
   enum Official implements EnumInfo {
diff --git a/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java b/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java
index 27abca9c9f..0606cfbf63 100644
--- a/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java
+++ b/common/src/main/java/org/astraea/common/balancer/BalancerProblemFormat.java
@@ -42,6 +42,7 @@ public class BalancerProblemFormat {
           "org.astraea.common.cost.RecordSizeCost",
           "org.astraea.common.cost.ReplicaNumberCost",
           "org.astraea.common.cost.ReplicaLeaderSizeCost",
+          "org.astraea.common.cost.PartitionMigrateTimeCost",
           "org.astraea.common.cost.BrokerDiskSpaceCost");
 
   public AlgorithmConfig parse() {
diff --git a/common/src/main/java/org/astraea/common/cost/MigrateTimeCost.java b/common/src/main/java/org/astraea/common/cost/MigrateTimeCost.java
new file mode 100644
index 0000000000..e4c715fd81
--- /dev/null
+++ b/common/src/main/java/org/astraea/common/cost/MigrateTimeCost.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.cost;
+
+import static org.astraea.common.cost.MigrationCost.brokerMaxRate;
+
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import org.astraea.common.Configuration;
+import org.astraea.common.admin.ClusterInfo;
+import org.astraea.common.metrics.BeanObject;
+import org.astraea.common.metrics.ClusterBean;
+import org.astraea.common.metrics.Sensor;
+import org.astraea.common.metrics.broker.HasMaxRate;
+import org.astraea.common.metrics.broker.ServerMetrics;
+import org.astraea.common.metrics.collector.MetricSensor;
+import org.astraea.common.metrics.stats.Max;
+
+/**
+ * MoveCost: estimates the migration time of a plan from the max replication rate recorded for
+ * each broker, and reports an overflow when the slowest broker exceeds the configured limit.
+ * The limit is read from {@value #MAX_MIGRATE_TIME_KEY}; without it no plan overflows.
+ */
+public class MigrateTimeCost implements HasMoveCost {
+  static final String REPLICATION_RATE = "replication_rate";
+  static final String MAX_MIGRATE_TIME_KEY = "max.migrated.time.limit";
+
+  // the longest estimated migration time a plan may take before it is reported as overflow
+  private final Duration maxMigrateTime;
+
+  public MigrateTimeCost(Configuration config) {
+    this.maxMigrateTime =
+        config.duration(MAX_MIGRATE_TIME_KEY).orElse(Duration.ofSeconds(Long.MAX_VALUE));
+  }
+
+  /**
+   * @return a sensor that fetches the one-minute replication in/out rates of a broker and emits
+   *     beans holding the larger of that rate and the max rate already stored in the cluster bean
+   */
+  @Override
+  public MetricSensor metricSensor() {
+    return (client, clusterBean) -> {
+      var oldInRate =
+          brokerMaxRate(
+              client.identity(), clusterBean, MigrateTimeCost.MaxReplicationInRateBean.class);
+      var oldOutRate =
+          brokerMaxRate(
+              client.identity(), clusterBean, MigrateTimeCost.MaxReplicationOutRateBean.class);
+      var newInMetrics = ServerMetrics.BrokerTopic.REPLICATION_BYTES_IN_PER_SEC.fetch(client);
+      var newOutMetrics = ServerMetrics.BrokerTopic.REPLICATION_BYTES_OUT_PER_SEC.fetch(client);
+      var current = System.currentTimeMillis();
+      var maxInRateSensor = Sensor.builder().addStat(REPLICATION_RATE, Max.of()).build();
+      var maxOutRateSensor = Sensor.builder().addStat(REPLICATION_RATE, Max.of()).build();
+      maxInRateSensor.record(newInMetrics.oneMinuteRate());
+      maxOutRateSensor.record(newOutMetrics.oneMinuteRate());
+      var inRate = maxInRateSensor.measure(REPLICATION_RATE);
+      var outRate = maxOutRateSensor.measure(REPLICATION_RATE);
+      return List.of(
+          new MaxReplicationInRateBean(
+              new BeanObject(
+                  newInMetrics.beanObject().domainName(),
+                  newInMetrics.beanObject().properties(),
+                  Map.of(REPLICATION_RATE, Math.max(oldInRate.orElse(0.0), inRate)),
+                  current)),
+          new MaxReplicationOutRateBean(
+              new BeanObject(
+                  newOutMetrics.beanObject().domainName(),
+                  newOutMetrics.beanObject().properties(),
+                  Map.of(REPLICATION_RATE, Math.max(oldOutRate.orElse(0.0), outRate)),
+                  current)));
+    };
+  }
+
+  /**
+   * @return a cost that overflows when the largest per-broker estimate (seconds) is greater than
+   *     the configured limit; with no broker at all the estimate is {@link Long#MAX_VALUE}
+   */
+  @Override
+  public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
+    var brokerMigrateSecond = MigrationCost.brokerMigrationSecond(before, after, clusterBean);
+    var planMigrateSecond =
+        brokerMigrateSecond.values().stream()
+            .max(Comparator.naturalOrder())
+            .orElse(Long.MAX_VALUE);
+    return () -> planMigrateSecond > this.maxMigrateTime.getSeconds();
+  }
+
+  /** Bean holding the max replication-in rate; the record accessor implements beanObject(). */
+  public record MaxReplicationInRateBean(BeanObject beanObject) implements HasMaxRate {}
+
+  /** Bean holding the max replication-out rate; the record accessor implements beanObject(). */
+  public record MaxReplicationOutRateBean(BeanObject beanObject) implements HasMaxRate {}
+}
diff --git a/common/src/main/java/org/astraea/common/cost/MigrationCost.java b/common/src/main/java/org/astraea/common/cost/MigrationCost.java
index 9bb6b92274..1978bccbd8 100644
--- a/common/src/main/java/org/astraea/common/cost/MigrationCost.java
+++ b/common/src/main/java/org/astraea/common/cost/MigrationCost.java
@@ -16,8 +16,10 @@
  */
 package org.astraea.common.cost;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -26,17 +28,19 @@
 import org.astraea.common.admin.ClusterInfo;
 import org.astraea.common.admin.Replica;
 import org.astraea.common.metrics.ClusterBean;
+import org.astraea.common.metrics.HasBeanObject;
+import org.astraea.common.metrics.broker.HasMaxRate;
 
 public class MigrationCost {
   public final String name;
-
   public final Map<Integer, Long> brokerCosts;
 
   public static final String TO_SYNC_BYTES = "record size to sync (bytes)";
   public static final String TO_FETCH_BYTES = "record size to fetch (bytes)";
   public static final String REPLICA_LEADERS_TO_ADDED = "leader number to add";
   public static final String REPLICA_LEADERS_TO_REMOVE = "leader number to remove";
   public static final String CHANGED_REPLICAS = "changed replicas";
+  public static final String
MIGRATION_ELAPSED_TIME = "migration elapsed time";
 
   public static List<MigrationCost> migrationCosts(
       ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
@@ -48,6 +52,8 @@ public static List<MigrationCost> migrationCosts(
     return List.of(
         new MigrationCost(TO_SYNC_BYTES, migrateInBytes),
         new MigrationCost(TO_FETCH_BYTES, migrateOutBytes),
+        new MigrationCost(
+            MIGRATION_ELAPSED_TIME, brokerMigrationSecond(before, after, clusterBean)),
         new MigrationCost(REPLICA_LEADERS_TO_ADDED, migrateInLeader),
         new MigrationCost(REPLICA_LEADERS_TO_REMOVE, migrateOutLeader),
         new MigrationCost(CHANGED_REPLICAS, migrateReplicaNum));
@@ -78,6 +84,61 @@ static Map<Integer, Long> replicaLeaderToRemove(ClusterInfo before, ClusterInfo
     return migratedChanged(before, after, false, Replica::isLeader, ignore -> 1L);
   }
 
+  /**
+   * Estimate, for every broker of either cluster, how long the migration occupies it.
+   *
+   * <p>The time of a broker is the larger of (bytes to sync / max replication-in rate) and
+   * (bytes to fetch / max replication-out rate). A broker with nothing to move costs 0 seconds.
+   * A broker that must move data but has no positive max rate recorded cannot be estimated and
+   * is reported as {@link Long#MAX_VALUE}.
+   *
+   * @param before the ClusterInfo before migrated replicas
+   * @param after the ClusterInfo after migrated replicas
+   * @param clusterBean cluster metrics
+   * @return estimated migrated time required by all brokers (seconds)
+   */
+  public static Map<Integer, Long> brokerMigrationSecond(
+      ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) {
+    var sizeToSync = recordSizeToSync(before, after);
+    var sizeToFetch = recordSizeToFetch(before, after);
+    return Stream.concat(before.brokers().stream(), after.brokers().stream())
+        .map(Broker::id)
+        .distinct()
+        .collect(
+            Collectors.toMap(
+                Function.identity(),
+                id -> {
+                  var inRate =
+                      brokerMaxRate(
+                          id, clusterBean, MigrateTimeCost.MaxReplicationInRateBean.class);
+                  var outRate =
+                      brokerMaxRate(
+                          id, clusterBean, MigrateTimeCost.MaxReplicationOutRateBean.class);
+                  return Math.max(
+                      migrationSecond(sizeToSync.getOrDefault(id, 0L), inRate),
+                      migrationSecond(sizeToFetch.getOrDefault(id, 0L), outRate));
+                }));
+  }
+
+  /** Seconds needed to move {@code bytes} at {@code maxRate}; see brokerMigrationSecond. */
+  private static long migrationSecond(long bytes, Optional<Double> maxRate) {
+    // nothing to move: skip the division so 0 / 0.0 (NaN) cannot hide the other direction's cost
+    if (bytes <= 0) return 0L;
+    return maxRate
+        .filter(rate -> rate > 0)
+        .map(rate -> (long) (bytes / rate))
+        .orElse(Long.MAX_VALUE);
+  }
+
+  static Optional<Double> brokerMaxRate(
+      int identity, ClusterBean clusterBean, Class<? extends HasBeanObject> statisticMetrics) {
+    return clusterBean
+        .brokerMetrics(identity, statisticMetrics)
+        .filter(b -> b instanceof HasMaxRate)
+        .map(b -> ((HasMaxRate) b).maxRate())
+        .max(Comparator.naturalOrder());
+  }
+
   /**
    * @param before the ClusterInfo before migrated replicas
    * @param after the ClusterInfo after migrated replicas
@@ -101,7 +162,8 @@ private static Map<Integer, Long> migratedChanged(
                 p ->
                     dest.replicas(p).stream()
                         .filter(predicate)
-                        .filter(r -> !source.replicas(p).contains(r)))
+                        .filter(
+                            r -> source.replicas(p).stream().noneMatch(x -> checkoutSameTPR(r, x))))
             .map(
                 r -> {
                   if (migrateOut) return dest.replicaLeader(r.topicPartition()).orElse(r);
@@ -109,7 +171,7 @@ private static Map<Integer, Long> migratedChanged(
                 })
             .collect(
                 Collectors.groupingBy(
-                    r -> r.brokerId(),
+                    Replica::brokerId,
                     Collectors.mapping(
                         Function.identity(), Collectors.summingLong(replicaFunction::apply))));
     return Stream.concat(dest.brokers().stream(), source.brokers().stream())
@@ -119,6 +181,10 @@ private static Map<Integer, Long> migratedChanged(
         .collect(Collectors.toMap(Function.identity(), n -> cost.getOrDefault(n, 0L)));
   }
 
+  private static boolean checkoutSameTPR(Replica r1, Replica r2) {
+    return r1.topicPartitionReplica().equals(r2.topicPartitionReplica());
+  }
+
   private static Map<Integer, Long> changedReplicaNumber(ClusterInfo before, ClusterInfo after) {
     return Stream.concat(before.brokers().stream(), after.brokers().stream())
         .map(Broker::id)
diff --git a/common/src/main/java/org/astraea/common/metrics/broker/HasMaxRate.java
b/common/src/main/java/org/astraea/common/metrics/broker/HasMaxRate.java
new file mode 100644
index 0000000000..77e7e3d87b
--- /dev/null
+++ b/common/src/main/java/org/astraea/common/metrics/broker/HasMaxRate.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.metrics.broker;
+
+import org.astraea.common.metrics.HasBeanObject;
+
+/** A bean whose {@code replication_rate} attribute holds a max rate. */
+public interface HasMaxRate extends HasBeanObject {
+  /** @return the numeric value stored under the {@code replication_rate} attribute */
+  default double maxRate() {
+    return ((Number) beanObject().attributes().get("replication_rate")).doubleValue();
+  }
+}
diff --git a/common/src/test/java/org/astraea/common/cost/MigrateTimeCostTest.java b/common/src/test/java/org/astraea/common/cost/MigrateTimeCostTest.java
new file mode 100644
index 0000000000..843a941527
--- /dev/null
+++ b/common/src/test/java/org/astraea/common/cost/MigrateTimeCostTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.astraea.common.cost;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.astraea.common.Configuration;
+import org.astraea.common.admin.Broker;
+import org.astraea.common.admin.ClusterInfo;
+import org.astraea.common.admin.Replica;
+import org.astraea.common.metrics.BeanObject;
+import org.astraea.common.metrics.ClusterBean;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+class MigrateTimeCostTest {
+
+  private static final BeanObject inBean0 =
+      new BeanObject("domain", Map.of(), Map.of(MigrateTimeCost.REPLICATION_RATE, 1000.0));
+  private static final BeanObject outBean0 =
+      new BeanObject("domain", Map.of(), Map.of(MigrateTimeCost.REPLICATION_RATE, 1500.0));
+  private static final BeanObject inBean1 =
+      new BeanObject("domain", Map.of(), Map.of(MigrateTimeCost.REPLICATION_RATE, 2000.0));
+  private static final BeanObject outBean1 =
+      new BeanObject("domain", Map.of(), Map.of(MigrateTimeCost.REPLICATION_RATE, 2500.0));
+  private static final BeanObject inBean2 =
+      new BeanObject("domain", Map.of(), Map.of(MigrateTimeCost.REPLICATION_RATE, 3000.0));
+  private static final BeanObject outBean2 =
+      new BeanObject("domain", Map.of(), Map.of(MigrateTimeCost.REPLICATION_RATE, 3500.0));
+
+  @Test
+  void testMigratedCost() {
+    // before(partition-broker): p10-0, p11-1, p12-2, p12-0
+    // after(partition-broker): p10-1, p11-2, p12-0, p12-1
+    // b0: migrate in: ; migrate out: p10
+    // b1: migrate in: p10,p12 ; migrate out: p11
+    // b2: migrate in: p11; migrate out: p12
+    var before = of(before(), brokers());
+    var after = of(after(), brokers());
+    var migrationCost = MigrationCost.brokerMigrationSecond(before, after, clusterBean());
+    Assertions.assertEquals(Math.max(0, 10000000 / 1500), migrationCost.get(0));
+    Assertions.assertEquals(
+        Math.max((10000000 + 30000000) / 2000, 20000000 / 2500), migrationCost.get(1));
+    Assertions.assertEquals(Math.max(20000000 / 3000, 30000000 / 3500), migrationCost.get(2));
+  }
+
+  private List<Broker> brokers() {
+    return before().stream()
+        .map(Replica::brokerId)
+        .distinct()
+        .map(nodeId -> Broker.of(nodeId, "", -1))
+        .collect(Collectors.toList());
+  }
+
+  @Test
+  void testMoveCost() {
+    var before = of(before(), brokers());
+    var after = of(after(), brokers());
+    var timeLimit = new Configuration(Map.of(MigrateTimeCost.MAX_MIGRATE_TIME_KEY, "20000"));
+    var overFlowTimeLimit =
+        new Configuration(Map.of(MigrateTimeCost.MAX_MIGRATE_TIME_KEY, "19999"));
+    var cf = new MigrateTimeCost(timeLimit);
+    var overFlowCf = new MigrateTimeCost(overFlowTimeLimit);
+    var moveCost = cf.moveCost(before, after, clusterBean());
+    var overflowCost = overFlowCf.moveCost(before, after, clusterBean());
+    Assertions.assertFalse(moveCost.overflow());
+    Assertions.assertTrue(overflowCost.overflow());
+  }
+
+  public static ClusterInfo of(List<Replica> replicas, List<Broker> nodeInfos) {
+    return ClusterInfo.of("fake", nodeInfos, Map.of(), replicas);
+  }
+
+  private static Replica replica(int partition, boolean isLeader, int size, int brokerId) {
+    return Replica.builder()
+        .topic("t")
+        .partition(partition)
+        .isLeader(isLeader)
+        .size(size)
+        .brokerId(brokerId)
+        .build();
+  }
+
+  private List<Replica> after() {
+    return List.of(
+        replica(10, true, 10000000, 1),
+        replica(11, true, 20000000, 2),
+        replica(12, true, 30000000, 0),
+        replica(12, false, 30000000, 1));
+  }
+
+  private List<Replica> before() {
+    return List.of(
+        replica(10, true, 10000000, 0),
+        replica(11, true, 20000000, 1),
+        replica(12, true, 30000000, 2),
+        replica(12, false, 30000000, 0));
+  }
+
+  private static ClusterBean clusterBean() {
+    return ClusterBean.of(
+        Map.of(
+            0,
+            List.of(
+                new MigrateTimeCost.MaxReplicationInRateBean(inBean0),
+                new MigrateTimeCost.MaxReplicationOutRateBean(outBean0)),
+            1,
+            List.of(
+                new MigrateTimeCost.MaxReplicationInRateBean(inBean1),
+                new MigrateTimeCost.MaxReplicationOutRateBean(outBean1)),
+            2,
+            List.of(
+                new MigrateTimeCost.MaxReplicationInRateBean(inBean2),
+                new MigrateTimeCost.MaxReplicationOutRateBean(outBean2))));
+  }
+}
diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md
index 432da98212..7c4278be77 100644
--- a/docs/web_server/web_api_balancer_chinese.md
+++ b/docs/web_server/web_api_balancer_chinese.md
@@ -42,13 +42,14 @@ POST /balancer
 
 costConfig:
 
-| config key | config value | value format |
-| --------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ |
-| max.migrated.size | 設定最大可搬移的資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB |
-| max.migrated.leader.number | 設定最大可搬移的leader 數量 | "`limit number`" ex. 1,2,3,100 |
-| max.migrated.replica.number | 設定最大可搬移的replica 數量 | "`limit number`" ex. 1,2,3,100 |
-| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB |
-| max.broker.total.disk.space | 設定搬移過程中broker最大可以佔用的replica 資料量 | "`broker Id` + `:` + `data size` " ex. "0:1500MB ,1:1000MB ,2:1500MB" |
+| config key | config value | value format |
+| --------------------------- |----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------|
+| max.migrated.size | 設定最大可搬移的資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB |
+| max.migrated.leader.number | 設定最大可搬移的leader 數量 | "`limit number`" ex. 1,2,3,100 |
+| max.migrated.replica.number | 設定最大可搬移的replica 數量 | "`limit number`" ex. 1,2,3,100 |
+| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB |
+| max.migrated.time.limit | 設定最大可接受的partition搬移時間（秒） | "`limit seconds`" ex. 1,2,3,100 |
+| max.broker.total.disk.space | 設定搬移過程中broker最大可以佔用的replica 資料量 | "`broker Id` + `:` + `data size` " ex. "0:1500MB ,1:1000MB ,2:1500MB" |
 | max.broker.path.disk.space | 設定搬移過程中broker上的data folder最大可以佔用的replica 資料量 | "`broker Id` + `-` + `data path` + `:` + `data size` " ex. "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB" |
 
 目前支援的 Cost Function