From 032412396b9c497fbb5815fbb8590e4262290514 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Fri, 26 May 2023 21:33:10 +0800 Subject: [PATCH 1/5] limit brokerCost to ReplicaLeaderCost --- .../common/cost/ReplicaLeaderCost.java | 23 +++++++++++++- .../common/cost/ReplicaLeaderCostTest.java | 31 ++++++++++++++++++- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index 0ea33d71d2..125ec85cb7 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.stream.Collectors; import org.astraea.common.Configuration; +import org.astraea.common.admin.Broker; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.metrics.ClusterBean; import org.astraea.common.metrics.broker.ServerMetrics; @@ -32,13 +33,24 @@ public class ReplicaLeaderCost implements HasBrokerCost, HasClusterCost, HasMove private final Dispersion dispersion = Dispersion.normalizedStandardDeviation(); private final Configuration config; public static final String MAX_MIGRATE_LEADER_KEY = "max.migrated.leader.number"; + static final String BROKER_COST_LIMIT_KEY = "max.broker.total.leader.number"; + private final Map brokerMoveCostLimit; public ReplicaLeaderCost() { - this.config = new Configuration(Map.of()); + this(Configuration.EMPTY); } public ReplicaLeaderCost(Configuration config) { this.config = config; + this.brokerMoveCostLimit = brokerMoveCostLimit(config); + } + + private Map brokerMoveCostLimit(Configuration configuration) { + return configuration.list(BROKER_COST_LIMIT_KEY, ",").stream() + .collect( + Collectors.toMap( + idAndPath -> Integer.parseInt(idAndPath.split(":")[0]), + idAndPath -> Integer.parseInt(idAndPath.split(":")[1]))); } @Override @@ -79,8 +91,17 @@ public Configuration config() { @Override public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) { var replicaLeaderIn = replicaLeaderToAdd(before, after); + var brokerLeaderNum = + after.brokers().stream() + .collect(Collectors.toMap(Broker::id, b -> after.replicaLeaders(b.id()).size())); var maxMigratedLeader = config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); + var brokerOverflow = + this.brokerMoveCostLimit.entrySet().stream() + .anyMatch( + leaderLimit -> + brokerLeaderNum.getOrDefault(leaderLimit.getKey(), 0) > leaderLimit.getValue()); + if (brokerOverflow) return () -> true; var overflow = maxMigratedLeader < replicaLeaderIn.values().stream().map(Math::abs).mapToLong(s -> s).sum(); diff --git a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java index caa18ea1e7..fd7e688390 100644 --- a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java @@ -33,6 +33,8 @@ public class ReplicaLeaderCostTest { @Test void testLeaderCount() { + + // test sum overflow var baseCluster = ClusterInfo.builder() .addNode(Set.of(1, 2)) @@ -63,9 +65,36 @@ void testLeaderCount() { new ReplicaLeaderCost( new Configuration(Map.of(ReplicaLeaderCost.MAX_MIGRATE_LEADER_KEY, "10"))) .moveCost(sourceCluster, overFlowTargetCluster, ClusterBean.EMPTY); - Assertions.assertTrue(overFlowMoveCost.overflow()); Assertions.assertFalse(noOverFlowMoveCost.overflow()); + + // test broker overflow + var brokerTargetCluster = + ClusterInfo.builder(baseCluster) + .addTopic( + "topic1", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(1)).build()) + .addTopic( + "topic2", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(2)).build()) + .build(); + + var brokerOverFlowTargetCluster = + ClusterInfo.builder(baseCluster) + .addTopic( + "topic1", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(1)).build()) + .addTopic( + "topic2", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(1)).build()) + .build(); + var overFlowMoveCost2 = + new ReplicaLeaderCost( + new Configuration(Map.of(ReplicaLeaderCost.BROKER_COST_LIMIT_KEY, "1:5"))) + .moveCost(sourceCluster, brokerOverFlowTargetCluster, ClusterBean.EMPTY); + + var noOverFlowMoveCost2 = + new ReplicaLeaderCost( + new Configuration(Map.of(ReplicaLeaderCost.BROKER_COST_LIMIT_KEY, "1:3"))) + .moveCost(sourceCluster, brokerTargetCluster, ClusterBean.EMPTY); + Assertions.assertTrue(overFlowMoveCost2.overflow()); + Assertions.assertFalse(noOverFlowMoveCost2.overflow()); } @Test From 79b41b9d9d267f0b6a8f8653f30917dbedd3b7eb Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Mon, 29 May 2023 16:47:17 +0800 Subject: [PATCH 2/5] update docs --- docs/web_server/web_api_balancer_chinese.md | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md index 309f924645..1db0c6b06b 100644 --- a/docs/web_server/web_api_balancer_chinese.md +++ b/docs/web_server/web_api_balancer_chinese.md @@ -33,14 +33,15 @@ POST /balancer costConfig: -| config key | config value | value format | -| --------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | -| max.migrated.size | 設定最大可搬移的資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | -| max.migrated.leader.number | 設定最大可搬移的leader 數量 | "`limit number`" ex. 1,2,3,100 | -| max.migrated.replica.number | 設定最大可搬移的replica 數量 | "`limit number`" ex. 1,2,3,100 | -| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | -| max.broker.total.disk.space | 設定搬移過程中broker最大可以佔用的replica 資料量 | "`broker Id` + `:` + `data size` " ex. "0:1500MB ,1:1000MB ,2:1500MB" | -| max.broker.path.disk.space | 設定搬移過程中broker上的data folder最大可以佔用的replica 資料量 | "`broker Id` + `-` + `data path` + `:` + `data size` " ex. "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB" | +| config key | config value | value format | +| ------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ | +| max.migrated.size | 設定最大可搬移的資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | +| max.migrated.leader.number | 設定最大可搬移的leader 數量 | "`limit number`" ex. 1,2,3,100 | +| max.broker.total.leader.number | 設定一個broker最大可持有的leader數量 | "`broker Id` + `:` + `limit number`" ex. "0:150 ,1:30" | +| max.migrated.replica.number | 設定最大可搬移的replica 數量 | "`limit number`" ex. 1,2,3,100 | +| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | +| max.broker.total.disk.space | 設定搬移過程中broker最大可以佔用的replica 資料量 | "`broker Id` + `:` + `data size` " ex. "0:1500MB ,1:1000MB ,2:1500MB" | +| max.broker.path.disk.space | 設定搬移過程中broker上的data folder最大可以佔用的replica 資料量 | "`broker Id` + `-` + `data path` + `:` + `data size` " ex. "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB" | 目前支援的 Cost Function From e3dc0ef8a6f221e4eea4edf5359197de970dd19d Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Tue, 30 May 2023 01:32:53 +0800 Subject: [PATCH 3/5] update docs --- .../astraea/common/cost/ReplicaLeaderCost.java | 9 +++------ .../common/cost/ReplicaLeaderCostTest.java | 4 ++-- docs/web_server/web_api_balancer_chinese.md | 18 +++++++++--------- 3 files changed, 14 insertions(+), 17 deletions(-) diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index 125ec85cb7..d3d79e0d92 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -22,7 +22,6 @@ import java.util.Map; import java.util.stream.Collectors; import org.astraea.common.Configuration; -import org.astraea.common.admin.Broker; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.metrics.ClusterBean; import org.astraea.common.metrics.broker.ServerMetrics; @@ -33,7 +32,7 @@ public class ReplicaLeaderCost implements HasBrokerCost, HasClusterCost, HasMove private final Dispersion dispersion = Dispersion.normalizedStandardDeviation(); private final Configuration config; public static final String MAX_MIGRATE_LEADER_KEY = "max.migrated.leader.number"; - static final String BROKER_COST_LIMIT_KEY = "max.broker.total.leader.number"; + static final String BROKER_COST_LIMIT_KEY = "max.broker.migrated.leader.number"; private final Map brokerMoveCostLimit; public ReplicaLeaderCost() { @@ -91,16 +90,14 @@ public Configuration config() { @Override public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) { var replicaLeaderIn = replicaLeaderToAdd(before, after); - var brokerLeaderNum = - after.brokers().stream() - .collect(Collectors.toMap(Broker::id, b -> after.replicaLeaders(b.id()).size())); var maxMigratedLeader = config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); var brokerOverflow = this.brokerMoveCostLimit.entrySet().stream() .anyMatch( leaderLimit -> - brokerLeaderNum.getOrDefault(leaderLimit.getKey(), 0) > leaderLimit.getValue()); + replicaLeaderIn.getOrDefault(leaderLimit.getKey(), 0L) + > leaderLimit.getValue()); if (brokerOverflow) return () -> true; var overflow = maxMigratedLeader diff --git a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java index fd7e688390..f922281e2a 100644 --- a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java @@ -86,12 +86,12 @@ void testLeaderCount() { .build(); var overFlowMoveCost2 = new ReplicaLeaderCost( - new Configuration(Map.of(ReplicaLeaderCost.BROKER_COST_LIMIT_KEY, "1:5"))) + new Configuration(Map.of(ReplicaLeaderCost.BROKER_COST_LIMIT_KEY, "2:2"))) .moveCost(sourceCluster, brokerOverFlowTargetCluster, ClusterBean.EMPTY); var noOverFlowMoveCost2 = new ReplicaLeaderCost( - new Configuration(Map.of(ReplicaLeaderCost.BROKER_COST_LIMIT_KEY, "1:3"))) + new Configuration(Map.of(ReplicaLeaderCost.BROKER_COST_LIMIT_KEY, "2:3"))) .moveCost(sourceCluster, brokerTargetCluster, ClusterBean.EMPTY); Assertions.assertTrue(overFlowMoveCost2.overflow()); Assertions.assertFalse(noOverFlowMoveCost2.overflow()); diff --git a/docs/web_server/web_api_balancer_chinese.md b/docs/web_server/web_api_balancer_chinese.md index 1db0c6b06b..af40c5d6d6 100644 --- a/docs/web_server/web_api_balancer_chinese.md +++ b/docs/web_server/web_api_balancer_chinese.md @@ -33,15 +33,15 @@ POST /balancer costConfig: -| config key | config value | value format | -| ------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ | -| max.migrated.size | 設定最大可搬移的資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | -| max.migrated.leader.number | 設定最大可搬移的leader 數量 | "`limit number`" ex. 1,2,3,100 | -| max.broker.total.leader.number | 設定一個broker最大可持有的leader數量 | "`broker Id` + `:` + `limit number`" ex. "0:150 ,1:30" | -| max.migrated.replica.number | 設定最大可搬移的replica 數量 | "`limit number`" ex. 1,2,3,100 | -| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | -| max.broker.total.disk.space | 設定搬移過程中broker最大可以佔用的replica 資料量 | "`broker Id` + `:` + `data size` " ex. "0:1500MB ,1:1000MB ,2:1500MB" | -| max.broker.path.disk.space | 設定搬移過程中broker上的data folder最大可以佔用的replica 資料量 | "`broker Id` + `-` + `data path` + `:` + `data size` " ex. "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB" | +| config key | config value | value format | +| --------------------------------- | ------------------------------------------------------------ | ------------------------------------------------------------ | +| max.migrated.size | 設定最大可搬移的資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | +| max.migrated.leader.number | 設定最大可搬移的leader 數量 | "`limit number`" ex. 1,2,3,100 | +| max.broker.migrated.leader.number | 設定搬移過程中broker最大可搬移的leader數量 | "`broker Id` + `:` + `limit number`" ex. "0:150 ,1:30" | +| max.migrated.replica.number | 設定最大可搬移的replica 數量 | "`limit number`" ex. 1,2,3,100 | +| max.migrated.leader.size | 設定最大可搬移的leader 資料量 | "`data size` + `unit`" ex.100KB, 500MB, 3GB | +| max.broker.total.disk.space | 設定搬移過程中broker最大可以佔用的replica 資料量 | "`broker Id` + `:` + `data size` " ex. "0:1500MB ,1:1000MB ,2:1500MB" | +| max.broker.path.disk.space | 設定搬移過程中broker上的data folder最大可以佔用的replica 資料量 | "`broker Id` + `-` + `data path` + `:` + `data size` " ex. "0-/path0:1500MB,1-/path0:1000MB,2-/path0:1500MB,2-/path1:900MB" | 目前支援的 Cost Function From 3a38b033b7a91ad7a7b4eeaec69260f52dc27ff7 Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Tue, 30 May 2023 01:42:45 +0800 Subject: [PATCH 4/5] fix conflict --- .../astraea/common/cost/ReplicaLeaderCostTest.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java index cb539255d1..03f6cf9b38 100644 --- a/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/ReplicaLeaderCostTest.java @@ -83,18 +83,14 @@ void testLeaderCount() { // test broker overflow var brokerTargetCluster = ClusterInfo.builder(baseCluster) - .addTopic( - "topic1", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(1)).build()) - .addTopic( - "topic2", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(2)).build()) + .addTopic("topic1", 3, (short) 1, r -> Replica.builder(r).brokerId(1).build()) + .addTopic("topic2", 3, (short) 1, r -> Replica.builder(r).brokerId(2).build()) .build(); var brokerOverFlowTargetCluster = ClusterInfo.builder(baseCluster) - .addTopic( - "topic1", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(1)).build()) - .addTopic( - "topic2", 3, (short) 1, r -> Replica.builder(r).broker(baseCluster.node(1)).build()) + .addTopic("topic1", 3, (short) 1, r -> Replica.builder(r).brokerId(1).build()) + .addTopic("topic2", 3, (short) 1, r -> Replica.builder(r).brokerId(1).build()) .build(); var overFlowMoveCost2 = new ReplicaLeaderCost( From 55de744f4e86360d5709101bfe9cc4ea732beb3c Mon Sep 17 00:00:00 2001 From: qoo332001 Date: Tue, 30 May 2023 04:53:48 +0800 Subject: [PATCH 5/5] fix bug --- .../main/java/org/astraea/common/cost/ReplicaLeaderCost.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java index d3d79e0d92..151a537351 100644 --- a/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java +++ b/common/src/main/java/org/astraea/common/cost/ReplicaLeaderCost.java @@ -17,6 +17,7 @@ package org.astraea.common.cost; import static org.astraea.common.cost.MigrationCost.replicaLeaderToAdd; +import static org.astraea.common.cost.MigrationCost.replicaLeaderToRemove; import java.util.List; import java.util.Map; @@ -90,6 +91,7 @@ public Configuration config() { @Override public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clusterBean) { var replicaLeaderIn = replicaLeaderToAdd(before, after); + var replicaLeaderOut = replicaLeaderToRemove(before, after); var maxMigratedLeader = config.string(MAX_MIGRATE_LEADER_KEY).map(Long::parseLong).orElse(Long.MAX_VALUE); var brokerOverflow = @@ -97,6 +99,7 @@ public MoveCost moveCost(ClusterInfo before, ClusterInfo after, ClusterBean clus .anyMatch( leaderLimit -> replicaLeaderIn.getOrDefault(leaderLimit.getKey(), 0L) + + replicaLeaderOut.getOrDefault(leaderLimit.getKey(), 0L) > leaderLimit.getValue()); if (brokerOverflow) return () -> true; var overflow =