[ASSIGNOR] Implement CostAwareAssignor #1524

Open
wants to merge 67 commits into
base: main
67 commits
011ccb3
Implement NetworkIngressAssignor
harryteng9527 Feb 27, 2023
f77a8c7
Add a test for checking greedyAssign
harryteng9527 Feb 27, 2023
34f8492
Add round-robin assign when the cost equals zero
harryteng9527 Feb 27, 2023
17fa82c
spotless
harryteng9527 Feb 27, 2023
2f6c7e7
Merge branch 'main' into impl-assignor
harryteng9527 Feb 27, 2023
ed52d24
add throwing exception when there is no mbeanObjects
harryteng9527 Mar 2, 2023
a9938e8
Merge branch 'main' into impl-assignor
harryteng9527 Mar 2, 2023
9a1f57e
spotless
harryteng9527 Mar 2, 2023
152e5bb
add more condition to verify whether there are sufficient metrics or not
harryteng9527 Mar 2, 2023
1e5cb45
Merge branch 'main' into impl-assignor
harryteng9527 Mar 4, 2023
a3e99b8
Add a parameter to set the waiting time that wait for fetch beanObject
harryteng9527 Mar 4, 2023
18a98ce
mask ClusterInfo with subscribed topics
harryteng9527 Mar 4, 2023
748259e
Add calculating the traffic interval to the score of cost
harryteng9527 Mar 5, 2023
03afbb6
Merge branch 'main' into impl-assignor
harryteng9527 Mar 5, 2023
25f84c8
tweak and add comment
harryteng9527 Mar 6, 2023
29c569c
rename and add comment
harryteng9527 Mar 7, 2023
e92a050
rename and add more comment
harryteng9527 Mar 8, 2023
9d85e8e
Move Kafka configuration to ours
harryteng9527 Mar 8, 2023
c2a8293
Add test
harryteng9527 Mar 8, 2023
9c514d3
Merge branch 'main' into impl-assignor
harryteng9527 Mar 8, 2023
5f389f1
Add ClusterInfo masked
harryteng9527 Mar 8, 2023
25ea0ee
Merge branch 'main' into impl-assignor
harryteng9527 Mar 9, 2023
b1ae99a
remove masked
harryteng9527 Mar 10, 2023
1e0b6bd
add new assign methods
harryteng9527 Mar 11, 2023
f1ec882
add comment and modify greedyAssign
harryteng9527 Mar 11, 2023
3690aa9
Merge branch 'main' into impl-assignor
harryteng9527 Mar 11, 2023
84b6ed4
spotless
harryteng9527 Mar 11, 2023
3e4146d
Add throw exception and change field type
harryteng9527 Mar 12, 2023
e0ccc02
Move the fields to sub-class
harryteng9527 Mar 12, 2023
b1e0acc
Merge branch 'main' into impl-assignor
harryteng9527 Mar 12, 2023
9316267
modify retry machanism
harryteng9527 Mar 15, 2023
574abf3
Fix coding style
harryteng9527 Mar 15, 2023
349da0b
Merge branch 'main' into impl-assignor
harryteng9527 Apr 11, 2023
897d4e8
Merge branch 'main' into impl-assignor
harryteng9527 Apr 14, 2023
c2d73b5
Reference feedback to assign partitions
harryteng9527 Apr 15, 2023
4d4f0b4
Add test for greedyAssign
harryteng9527 Apr 15, 2023
1407b6d
Pass config into NetworkIngressCost
harryteng9527 Apr 15, 2023
52553a5
Replace flatMap to map
harryteng9527 Apr 16, 2023
5f2eb0d
Revise lambda to avoid creating unnecessary object
harryteng9527 Apr 16, 2023
e594ce8
Add comment for greedyAssign
harryteng9527 Apr 16, 2023
38a7d83
Merge branch 'main' into impl-assignor
harryteng9527 Apr 17, 2023
e5aaa8a
Modify object name
harryteng9527 Apr 17, 2023
279128e
Separate assign and check incompatibility
harryteng9527 Apr 23, 2023
489b071
Merge branch 'main' into impl-assignor
harryteng9527 Apr 23, 2023
03517c7
Fix style
harryteng9527 Apr 23, 2023
8406bf6
Add Assign interface to move greedy impl to it
harryteng9527 Apr 23, 2023
534d7cf
Add the interface to reassign based on incompatible
harryteng9527 Apr 23, 2023
cc6582f
Change name
harryteng9527 Apr 23, 2023
10a87de
Fix test
harryteng9527 Apr 23, 2023
8e925cc
Rename interfaces
harryteng9527 Apr 25, 2023
9db68d2
revise shuffle
harryteng9527 May 1, 2023
7462966
Reduce the complexity of shuffle
harryteng9527 May 2, 2023
2fade96
Add test
harryteng9527 May 2, 2023
a6b9b3b
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
74f24cf
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
af35031
Spotless
harryteng9527 May 2, 2023
3e4b57c
Add wait
harryteng9527 May 2, 2023
f842f00
Revise wait
harryteng9527 May 2, 2023
3b0258a
Remove retry and test
harryteng9527 May 2, 2023
353eca7
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
80219e4
Merge branch 'main' into impl-assignor
harryteng9527 May 23, 2023
0d5820e
Use new shuffler
harryteng9527 May 23, 2023
7e5aac6
Add filter to avoid Null pointer and make skewCostLimiter more strict
harryteng9527 May 23, 2023
98be5ee
Modify randomShuffler signature, replace config to shuffleTime
harryteng9527 May 23, 2023
9fb2b11
Add GeneratorTest and modify Hint
harryteng9527 May 23, 2023
43a25f9
Fix Hint
harryteng9527 May 23, 2023
d4f55a8
Merge branch 'main' into impl-assignor
harryteng9527 May 23, 2023
@@ -36,23 +36,22 @@
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.cost.HasPartitionCost;
-import org.astraea.common.cost.ReplicaLeaderSizeCost;
+import org.astraea.common.cost.NetworkIngressCost;
import org.astraea.common.metrics.JndiClient;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.collector.MetricStore;
import org.astraea.common.partitioner.PartitionerUtils;

/** Abstract assignor implementation which does some common work (e.g., configuration). */
public abstract class Assignor implements ConsumerPartitionAssignor, Configurable {
-private Configuration config;
+protected Configuration config;
public static final String COST_PREFIX = "assignor.cost";
public static final String JMX_PORT = "jmx.port";
Function<Integer, Integer> jmxPortGetter =
(id) -> {
throw new NoSuchElementException("must define either broker.x.jmx.port or jmx.port");
};
HasPartitionCost costFunction = HasPartitionCost.EMPTY;
// TODO: metric collector may be configured by user in the future.
// TODO: need to track the performance when using the assignor in large scale consumers, see
// https://github.com/skiptests/astraea/pull/1162#discussion_r1036285677
protected MetricStore metricStore = null;
@@ -68,8 +67,6 @@ public abstract class Assignor implements ConsumerPartitionAssignor, Configurabl
*/
protected abstract Map<String, List<TopicPartition>> assign(
Map<String, SubscriptionInfo> subscriptions, ClusterInfo clusterInfo);
-// TODO: replace the topicPartitions by ClusterInfo after Assignor is able to handle Admin
-// https://github.com/skiptests/astraea/issues/1409

/**
* Parse config to get JMX port and cost function type.
@@ -171,7 +168,7 @@ public final void configure(Map<String, ?> configs) {
var defaultJMXPort = config.integer(JMX_PORT);
this.costFunction =
costFunctions.isEmpty()
-? HasPartitionCost.of(Map.of(new ReplicaLeaderSizeCost(), 1D))
+? HasPartitionCost.of(Map.of(new NetworkIngressCost(config), 1D))
: HasPartitionCost.of(costFunctions);
this.jmxPortGetter =
id ->
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.assignor;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.astraea.common.Configuration;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.TopicPartition;

/**
* This assignor scores partitions with the cost function(s) that the user provides. Each cost
* function evaluates the cost of the partitions on each node from the metrics that function
* requires. The default cost function, {@link org.astraea.common.cost.NetworkIngressCost}, ranks
* partitions that are on the same node.
*
* <p>The important config is the JMX port. Most cost functions need JMX metrics to score
* partitions. Normally, all brokers use the same JMX port, so you can just define
* `jmx.port=12345`. If one of the brokers uses a different JMX client port, you can define
* `broker.1001.jmx.port=3456` (`1001` is the broker id) to replace the value of `jmx.port`. If the
* JMX port is undefined, only a local mbean client is created for each cost function.
*/
public class CostAwareAssignor extends Assignor {
protected static final String MAX_RETRY_TIME = "max.retry.time";
protected static final String SHUFFLE_TIME = "shuffle.time";
Duration maxRetryTime = Duration.ofSeconds(30);
Duration shuffleTime = Duration.ofSeconds(5);

@Override
protected Map<String, List<TopicPartition>> assign(
Map<String, SubscriptionInfo> subscriptions, ClusterInfo clusterInfo) {
var subscribedTopics =
subscriptions.values().stream()
.map(SubscriptionInfo::topics)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());

metricStore.wait(
(clusterBean) ->
costFunction.partitionCost(clusterInfo, clusterBean).value().values().stream()
.noneMatch(v -> Double.isNaN(v)),
shuffleTime);

var clusterBean = metricStore.clusterBean();
var partitionCost = costFunction.partitionCost(clusterInfo, clusterBean);
var cost =
partitionCost.value().entrySet().stream()
.filter(e -> subscribedTopics.contains(e.getKey().topic()))
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));
var incompatiblePartition = partitionCost.incompatibility();

var shuffler =
Shuffler.randomShuffler(
subscriptions,
cost,
incompatiblePartition,
config.duration(SHUFFLE_TIME).orElse(shuffleTime).toMillis());
return shuffler.shuffle();
}

@Override
protected void configure(Configuration config) {
config.duration(MAX_RETRY_TIME).ifPresent(v -> this.maxRetryTime = v);
config.duration(SHUFFLE_TIME).ifPresent(v -> this.shuffleTime = v);
}

@Override
public String name() {
return "costAware";
}
}
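
The Javadoc above describes a lookup order for the JMX port: a broker-specific `broker.<id>.jmx.port` overrides the shared `jmx.port`, and when neither is defined only a local mbean client is used. A minimal sketch of that lookup order follows. `JmxPortSketch` and `resolveJmxPort` are hypothetical names introduced here for illustration; they are not part of this PR, and the real `Assignor.configure` may resolve the port differently.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for illustration only; it is not part of this PR. */
class JmxPortSketch {
  static final String JMX_PORT = "jmx.port";

  /** Prefers broker.<id>.jmx.port, then jmx.port; empty means no remote JMX port is defined. */
  static Optional<Integer> resolveJmxPort(Map<String, String> configs, int brokerId) {
    var perBroker = configs.get("broker." + brokerId + "." + JMX_PORT);
    if (perBroker != null) return Optional.of(Integer.parseInt(perBroker));
    return Optional.ofNullable(configs.get(JMX_PORT)).map(Integer::parseInt);
  }

  public static void main(String[] args) {
    Map<String, String> configs = Map.of("jmx.port", "12345", "broker.1001.jmx.port", "3456");
    System.out.println(resolveJmxPort(configs, 1001)); // broker-specific override
    System.out.println(resolveJmxPort(configs, 1002)); // falls back to jmx.port
    System.out.println(resolveJmxPort(Map.of(), 1001)); // nothing defined
  }
}
```

Returning an empty `Optional` rather than throwing keeps the "local mbean client only" case explicit for the caller.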
24 changes: 19 additions & 5 deletions common/src/main/java/org/astraea/common/assignor/Hint.java
@@ -37,16 +37,28 @@ static Hint of(Set<Hint> hints) {
static Hint lowCostHint(
Map<String, SubscriptionInfo> subscriptions, Map<TopicPartition, Double> partitionCost) {
return (currentAssignment, tp) -> {
-var candidates =
+var consumerPerCost =
currentAssignment.entrySet().stream()
.filter(e -> subscriptions.get(e.getKey()).topics().contains(tp.topic()))
.map(
e ->
Map.entry(
e.getKey(), e.getValue().stream().mapToDouble(partitionCost::get).sum()))
-.sorted(Map.Entry.comparingByValue())
-.map(Map.Entry::getKey)
-.toList();
+.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+List<String> candidates;
+
+if (consumerPerCost.containsValue(0.0))
+candidates =
+consumerPerCost.entrySet().stream()
+.filter(e -> e.getValue() == 0.0)
+.map(Map.Entry::getKey)
+.toList();
+else
+candidates =
+consumerPerCost.entrySet().stream()
+.sorted(Map.Entry.comparingByValue())
+.map(Map.Entry::getKey)
+.toList();

return candidates.stream().limit((long) Math.ceil(candidates.size() / 2.0)).toList();
};
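
The selection rule in `lowCostHint` can be read as: sum the cost already assigned to each subscribed consumer, prefer consumers whose sum is exactly zero, otherwise order consumers by ascending cost, and keep the first half (rounded up). The sketch below restates that rule with plain strings standing in for `TopicPartition` and `SubscriptionInfo`, so it runs without the Astraea classes. `LowCostHintSketch` is a hypothetical name, and the extra `sorted()` on consumer names in the zero-cost branch is an addition made only to keep the output deterministic.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Illustrative restatement of the rule; the types are simplified stand-ins. */
class LowCostHintSketch {
  static List<String> candidates(
      Map<String, List<String>> currentAssignment,
      Map<String, Set<String>> subscriptions,
      Map<String, Double> partitionCost,
      String topic) {
    // Total cost already assigned to each consumer that subscribes to the topic.
    var costPerConsumer =
        currentAssignment.entrySet().stream()
            .filter(e -> subscriptions.get(e.getKey()).contains(topic))
            .collect(
                Collectors.toMap(
                    Map.Entry::getKey,
                    e -> e.getValue().stream().mapToDouble(partitionCost::get).sum()));

    List<String> ordered;
    if (costPerConsumer.containsValue(0.0)) {
      // Consumers that hold no cost yet are the only candidates.
      ordered =
          costPerConsumer.entrySet().stream()
              .filter(e -> e.getValue() == 0.0)
              .map(Map.Entry::getKey)
              .sorted() // assumption: added here for deterministic output only
              .toList();
    } else {
      // Otherwise order consumers from the lowest to the highest cost.
      ordered =
          costPerConsumer.entrySet().stream()
              .sorted(Map.Entry.comparingByValue())
              .map(Map.Entry::getKey)
              .toList();
    }
    // Keep the first half of the ordered list, rounded up.
    return ordered.stream().limit((long) Math.ceil(ordered.size() / 2.0)).toList();
  }

  public static void main(String[] args) {
    Map<String, Set<String>> subs = Map.of("c1", Set.of("t1"), "c2", Set.of("t1"), "c3", Set.of("t1"));
    Map<String, Double> cost = Map.of("t1-0", 0.4, "t1-1", 0.2, "t1-2", 0.3);
    Map<String, List<String>> current = Map.of("c1", List.of("t1-0"), "c2", List.of(), "c3", List.of());
    System.out.println(candidates(current, subs, cost, "t1"));
  }
}
```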
@@ -61,7 +73,8 @@ static Hint incompatibleHint(
.filter(e -> e.getValue().topics().contains(tp.topic()))
.map(Map.Entry::getKey)
.toList();
-if (incompatibilities.get(tp).isEmpty()) return subscriber;
+if (incompatibilities.containsKey(tp) && incompatibilities.get(tp).isEmpty())
+return subscriber;

var candidates =
currentAssignment.entrySet().stream()
@@ -71,6 +84,7 @@ static Hint incompatibleHint(
Map.entry(
e.getKey(),
e.getValue().stream()
+.filter(incompatibilities::containsKey)
.filter(p -> incompatibilities.get(p).contains(tp))
.count()))
.collect(
19 changes: 2 additions & 17 deletions common/src/main/java/org/astraea/common/assignor/Limiter.java
@@ -53,29 +53,14 @@ static Limiter incompatibleLimiter(Map<TopicPartition, Set<TopicPartition>> inco
== 0;
}

-static Limiter skewCostLimiter(
-Map<TopicPartition, Double> partitionCost, Map<String, SubscriptionInfo> subscriptions) {
-var tmpConsumerCost =
-subscriptions.keySet().stream()
-.map(c -> Map.entry(c, 0.0))
-.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-partitionCost.entrySet().stream()
-.sorted(Map.Entry.comparingByValue())
-.map(Map.Entry::getKey)
-.forEach(
-tp -> {
-var minCostConsumer =
-tmpConsumerCost.entrySet().stream().min(Map.Entry.comparingByValue()).get();
-minCostConsumer.setValue(minCostConsumer.getValue() + partitionCost.get(tp));
-});
+static Limiter skewCostLimiter(Map<TopicPartition, Double> partitionCost) {
var standardDeviation =
(Function<Collection<Double>, Double>)
(vs) -> {
var average = vs.stream().mapToDouble(c -> c).average().getAsDouble();
return Math.sqrt(
vs.stream().mapToDouble(v -> Math.pow(v - average, 2)).average().getAsDouble());
};
-var limit = standardDeviation.apply(tmpConsumerCost.values());

return (combinator) -> {
var sd =
@@ -84,7 +69,7 @@ static Limiter skewCostLimiter(
.map(tps -> tps.stream().mapToDouble(partitionCost::get).sum())
.collect(Collectors.toSet()));

-return sd < limit;
+return sd == 0.0;
};
}
}
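
After this change, `skewCostLimiter` accepts an assignment only when the population standard deviation of the per-consumer cost sums is exactly zero, that is, when every consumer carries the same total cost. A minimal sketch of that check follows, with strings standing in for `TopicPartition`. `SkewCostSketch` and its method names are hypothetical, and the sketch keeps the sums in a list where the PR collects them into a set.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Illustrative restatement of the check; the types are simplified stand-ins. */
class SkewCostSketch {
  /** Population standard deviation: sqrt(mean((v - mean)^2)). */
  static double standardDeviation(Collection<Double> values) {
    var average = values.stream().mapToDouble(v -> v).average().orElseThrow();
    return Math.sqrt(
        values.stream().mapToDouble(v -> Math.pow(v - average, 2)).average().orElseThrow());
  }

  /** Sums the cost of the partitions held by each consumer. */
  static List<Double> costPerConsumer(
      Map<String, List<String>> assignment, Map<String, Double> partitionCost) {
    return assignment.values().stream()
        .map(tps -> tps.stream().mapToDouble(partitionCost::get).sum())
        .toList();
  }

  /** Accepts only perfectly balanced assignments, mirroring `sd == 0.0`. */
  static boolean accept(Map<String, List<String>> assignment, Map<String, Double> partitionCost) {
    return standardDeviation(costPerConsumer(assignment, partitionCost)) == 0.0;
  }

  public static void main(String[] args) {
    Map<String, Double> cost = Map.of("p0", 1.0, "p1", 3.0, "p2", 2.0, "p3", 2.0);
    Map<String, List<String>> balanced = Map.of("c1", List.of("p0", "p1"), "c2", List.of("p2", "p3"));
    Map<String, List<String>> skewed = Map.of("c1", List.of("p0", "p2"), "c2", List.of("p1", "p3"));
    System.out.println(accept(balanced, cost));
    System.out.println(accept(skewed, cost));
  }
}
```

Because the comparison is exact equality on doubles, sums that differ only by floating-point rounding are also rejected; such assignments can then only be returned through the shuffler's fallback over rejected combinators.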
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
-import org.astraea.common.Configuration;
import org.astraea.common.admin.TopicPartition;

public interface Shuffler {
@@ -32,19 +31,17 @@ static Shuffler randomShuffler(
Map<String, SubscriptionInfo> subscriptions,
Map<TopicPartition, Double> partitionCost,
Map<TopicPartition, Set<TopicPartition>> incompatible,
-Configuration config) {
+long shuffleTime) {
var limiters =
Limiter.of(
Set.of(
-Limiter.skewCostLimiter(partitionCost, subscriptions),
-Limiter.incompatibleLimiter(incompatible)));
+Limiter.skewCostLimiter(partitionCost), Limiter.incompatibleLimiter(incompatible)));
var hints =
Hint.of(
Set.of(
Hint.lowCostHint(subscriptions, partitionCost),
Hint.incompatibleHint(subscriptions, incompatible)));
var generator = Generator.randomGenerator(subscriptions, partitionCost, hints);
-var shuffleTime = config.duration("shuffle.time").get().toMillis();
var standardDeviation =
(Function<Map<String, List<TopicPartition>>, Double>)
(combinator) -> {
@@ -79,7 +76,6 @@ static Shuffler randomShuffler(
}
rejectedCombinators.add(combinator);
}

return result == null
? rejectedCombinators.stream()
.map(c -> Map.entry(c, standardDeviation.apply(c)))
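
The visible parts of `randomShuffler` suggest a time-bounded search: generate candidate assignments until one passes the limiters or `shuffleTime` milliseconds elapse, and otherwise fall back to one of the rejected candidates ranked by standard deviation. The loop itself and the end of the fallback expression are not shown in this diff, so the sketch below is an assumption about their shape, not the PR's code. `ShuffleSketch`, the generic signature, the choice of the minimum score, and the injectable `clock` (added so the example is deterministic) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;

/** Assumed shape of a time-bounded shuffle with a best-rejected fallback. */
class ShuffleSketch {
  static <T> T shuffle(
      Supplier<T> generator,
      Predicate<T> limiter,
      ToDoubleFunction<T> score,
      long shuffleTimeMs,
      LongSupplier clock) {
    var rejected = new ArrayList<T>();
    var deadline = clock.getAsLong() + shuffleTimeMs;
    do {
      var candidate = generator.get();
      // The first candidate that satisfies the limiter wins immediately.
      if (limiter.test(candidate)) return candidate;
      rejected.add(candidate);
    } while (clock.getAsLong() < deadline);
    // Assumption: fall back to the rejected candidate with the lowest score.
    return rejected.stream().min(Comparator.comparingDouble(score)).orElseThrow();
  }

  public static void main(String[] args) {
    var candidates = List.of(5, 3, 8).iterator();
    var fakeTime = new long[] {0};
    // Fake clock advancing by 1 ms per call, so exactly three candidates are tried.
    var best = shuffle(candidates::next, x -> false, Integer::doubleValue, 3, () -> fakeTime[0]++);
    System.out.println(best);
  }
}
```

In production code the clock would simply be `System::currentTimeMillis`; the do-while form guarantees at least one candidate is generated even when the time budget is zero.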
130 changes: 130 additions & 0 deletions common/src/test/java/org/astraea/common/assignor/GeneratorTest.java
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.assignor;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.astraea.common.admin.TopicPartition;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class GeneratorTest {
@Test
void testCombinator() {
var checkSubscriptionAndAssignment =
(BiFunction<Map<String, List<TopicPartition>>, Map<String, SubscriptionInfo>, Boolean>)
(assignment, subscription) ->
assignment.entrySet().stream()
.allMatch(
e ->
e.getValue().stream()
.allMatch(
tp ->
subscription
.get(e.getKey())
.topics()
.contains(tp.topic())));

// test same subscription
var subscription =
Map.of(
"c1",
new SubscriptionInfo(List.of("t1", "t2", "t3"), null),
"c2",
new SubscriptionInfo(List.of("t1", "t2", "t3"), null));
var cost =
Map.of(
TopicPartition.of("t1", 0),
0.2,
TopicPartition.of("t1", 1),
0.4,
TopicPartition.of("t2", 0),
0.2,
TopicPartition.of("t3", 0),
0.4);
var randomGenerator =
Generator.randomGenerator(subscription, cost, Hint.lowCostHint(subscription, cost));
var result = randomGenerator.get();
var partitionsInResult = result.values().stream().flatMap(Collection::stream).toList();
var partitionInCluster = cost.keySet();
Assertions.assertEquals(partitionInCluster.size(), partitionsInResult.size());
Assertions.assertTrue(partitionsInResult.containsAll(partitionInCluster));
Assertions.assertTrue(checkSubscriptionAndAssignment.apply(result, subscription));
result.forEach((c, tps) -> Assertions.assertFalse(tps.isEmpty()));

// Validate that unsubscribed partitions are not assigned to the consumer.
subscription =
Map.of(
"c1",
new SubscriptionInfo(List.of("t1"), null),
"c2",
new SubscriptionInfo(List.of("t1", "t2", "t3"), null));
randomGenerator =
Generator.randomGenerator(subscription, cost, Hint.lowCostHint(subscription, cost));
result = randomGenerator.get();
partitionsInResult = result.values().stream().flatMap(Collection::stream).toList();

Assertions.assertEquals(partitionInCluster.size(), partitionsInResult.size());
Assertions.assertTrue(partitionsInResult.containsAll(partitionInCluster));
Assertions.assertTrue(checkSubscriptionAndAssignment.apply(result, subscription));

// Every consumer gets at least one partition
subscription =
Map.of(
"c1",
new SubscriptionInfo(List.of("t1", "t2", "t3"), null),
"c2",
new SubscriptionInfo(List.of("t1", "t2", "t3"), null),
"c3",
new SubscriptionInfo(List.of("t1", "t2", "t3"), null),
"c4",
new SubscriptionInfo(List.of("t1", "t2", "t3"), null));
randomGenerator =
Generator.randomGenerator(subscription, cost, Hint.lowCostHint(subscription, cost));
result = randomGenerator.get();
result.forEach((c, tps) -> Assertions.assertEquals(1, tps.size()));

// Validate that no consumer is assigned more than one partition
cost =
Map.of(
TopicPartition.of("t1", 0),
0.2,
TopicPartition.of("t2", 0),
0.2,
TopicPartition.of("t3", 0),
0.4);
randomGenerator =
Generator.randomGenerator(subscription, cost, Hint.lowCostHint(subscription, cost));
result = randomGenerator.get();
result.forEach((c, tps) -> Assertions.assertTrue(tps.size() <= 1));

subscription =
Map.of(
"c1",
new SubscriptionInfo(List.of("t1"), null),
"c2",
new SubscriptionInfo(List.of("t2"), null),
"c3",
new SubscriptionInfo(List.of("t3"), null));
randomGenerator =
Generator.randomGenerator(subscription, cost, Hint.lowCostHint(subscription, cost));
result = randomGenerator.get();
Assertions.assertTrue(checkSubscriptionAndAssignment.apply(result, subscription));
}
}