[ASSIGNOR] Implement CostAwareAssignor #1524

Open
wants to merge 67 commits into
base: main
Changes from 15 commits
Commits
011ccb3
Implement NetworkIngressAssignor
harryteng9527 Feb 27, 2023
f77a8c7
Add a test for checking greedyAssign
harryteng9527 Feb 27, 2023
34f8492
Add round-robin assign when the cost equals zero
harryteng9527 Feb 27, 2023
17fa82c
spotless
harryteng9527 Feb 27, 2023
2f6c7e7
Merge branch 'main' into impl-assignor
harryteng9527 Feb 27, 2023
ed52d24
add throwing exception when there is no mbeanObjects
harryteng9527 Mar 2, 2023
a9938e8
Merge branch 'main' into impl-assignor
harryteng9527 Mar 2, 2023
9a1f57e
spotless
harryteng9527 Mar 2, 2023
152e5bb
add more condition to verify whether there are sufficient metrics or not
harryteng9527 Mar 2, 2023
1e5cb45
Merge branch 'main' into impl-assignor
harryteng9527 Mar 4, 2023
a3e99b8
Add a parameter to set the waiting time that wait for fetch beanObject
harryteng9527 Mar 4, 2023
18a98ce
mask ClusterInfo with subscribed topics
harryteng9527 Mar 4, 2023
748259e
Add calculating the traffic interval to the score of cost
harryteng9527 Mar 5, 2023
03afbb6
Merge branch 'main' into impl-assignor
harryteng9527 Mar 5, 2023
25f84c8
tweak and add comment
harryteng9527 Mar 6, 2023
29c569c
rename and add comment
harryteng9527 Mar 7, 2023
e92a050
rename and add more comment
harryteng9527 Mar 8, 2023
9d85e8e
Move Kafka configuration to ours
harryteng9527 Mar 8, 2023
c2a8293
Add test
harryteng9527 Mar 8, 2023
9c514d3
Merge branch 'main' into impl-assignor
harryteng9527 Mar 8, 2023
5f389f1
Add ClusterInfo masked
harryteng9527 Mar 8, 2023
25ea0ee
Merge branch 'main' into impl-assignor
harryteng9527 Mar 9, 2023
b1ae99a
remove masked
harryteng9527 Mar 10, 2023
1e0b6bd
add new assign methods
harryteng9527 Mar 11, 2023
f1ec882
add comment and modify greedyAssign
harryteng9527 Mar 11, 2023
3690aa9
Merge branch 'main' into impl-assignor
harryteng9527 Mar 11, 2023
84b6ed4
spotless
harryteng9527 Mar 11, 2023
3e4146d
Add throw exception and change field type
harryteng9527 Mar 12, 2023
e0ccc02
Move the fields to sub-class
harryteng9527 Mar 12, 2023
b1e0acc
Merge branch 'main' into impl-assignor
harryteng9527 Mar 12, 2023
9316267
modify retry machanism
harryteng9527 Mar 15, 2023
574abf3
Fix coding style
harryteng9527 Mar 15, 2023
349da0b
Merge branch 'main' into impl-assignor
harryteng9527 Apr 11, 2023
897d4e8
Merge branch 'main' into impl-assignor
harryteng9527 Apr 14, 2023
c2d73b5
Reference feedback to assign partitions
harryteng9527 Apr 15, 2023
4d4f0b4
Add test for greedyAssign
harryteng9527 Apr 15, 2023
1407b6d
Pass config into NetworkIngressCost
harryteng9527 Apr 15, 2023
52553a5
Replace flatMap to map
harryteng9527 Apr 16, 2023
5f2eb0d
Revise lambda to avoid creating unnecessary object
harryteng9527 Apr 16, 2023
e594ce8
Add comment for greedyAssign
harryteng9527 Apr 16, 2023
38a7d83
Merge branch 'main' into impl-assignor
harryteng9527 Apr 17, 2023
e5aaa8a
Modify object name
harryteng9527 Apr 17, 2023
279128e
Separate assign and check incompatibility
harryteng9527 Apr 23, 2023
489b071
Merge branch 'main' into impl-assignor
harryteng9527 Apr 23, 2023
03517c7
Fix style
harryteng9527 Apr 23, 2023
8406bf6
Add Assign interface to move greedy impl to it
harryteng9527 Apr 23, 2023
534d7cf
Add the interface to reassign based on incompatible
harryteng9527 Apr 23, 2023
cc6582f
Change name
harryteng9527 Apr 23, 2023
10a87de
Fix test
harryteng9527 Apr 23, 2023
8e925cc
Rename interfaces
harryteng9527 Apr 25, 2023
9db68d2
revise shuffle
harryteng9527 May 1, 2023
7462966
Reduce the complexity of shuffle
harryteng9527 May 2, 2023
2fade96
Add test
harryteng9527 May 2, 2023
a6b9b3b
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
74f24cf
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
af35031
Spotless
harryteng9527 May 2, 2023
3e4b57c
Add wait
harryteng9527 May 2, 2023
f842f00
Revise wait
harryteng9527 May 2, 2023
3b0258a
Remove retry and test
harryteng9527 May 2, 2023
353eca7
Merge branch 'origin/main' into impl-assignor
harryteng9527 May 2, 2023
80219e4
Merge branch 'main' into impl-assignor
harryteng9527 May 23, 2023
0d5820e
Use new shuffler
harryteng9527 May 23, 2023
7e5aac6
Add filter to avoid Null pointer and make skewCostLimiter more strict
harryteng9527 May 23, 2023
98be5ee
Modify randomShuffler signature, replace config to shuffleTime
harryteng9527 May 23, 2023
9fb2b11
Add GeneratorTest and modify Hint
harryteng9527 May 23, 2023
43a25f9
Fix Hint
harryteng9527 May 23, 2023
d4f55a8
Merge branch 'main' into impl-assignor
harryteng9527 May 23, 2023
29 changes: 24 additions & 5 deletions common/src/main/java/org/astraea/common/assignor/Assignor.java
@@ -18,9 +18,11 @@

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
@@ -34,18 +36,21 @@
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.consumer.ConsumerConfigs;
import org.astraea.common.cost.HasPartitionCost;
import org.astraea.common.cost.ReplicaLeaderSizeCost;
import org.astraea.common.cost.NetworkIngressCost;
import org.astraea.common.metrics.collector.LocalMetricCollector;
import org.astraea.common.metrics.collector.MetricCollector;
import org.astraea.common.partitioner.PartitionerUtils;

/** Abstract assignor implementation which does some common work (e.g., configuration). */
public abstract class Assignor implements ConsumerPartitionAssignor, Configurable {
public static final String JMX_PORT = "jmx.port";
public static final String MAX_WAIT_BEAN = "max.wait.bean";
public static final String MAX_TRAFFIC_MIB_INTERVAL = "max.traffic.mib.interval";
int maxWaitBean = 3;
int maxTrafficMiBInterval = 10;
Function<Integer, Optional<Integer>> jmxPortGetter = (id) -> Optional.empty();
private String bootstrap;
HasPartitionCost costFunction = HasPartitionCost.EMPTY;
// TODO: metric collector may be configured by user in the future.
// TODO: need to track the performance when using the assignor in large scale consumers, see
// https://github.com/skiptests/astraea/pull/1162#discussion_r1036285677
protected MetricCollector metricCollector = null;
@@ -59,8 +64,6 @@ public abstract class Assignor implements ConsumerPartitionAssignor, Configurabl
*/
protected abstract Map<String, List<TopicPartition>> assign(
Map<String, org.astraea.common.assignor.Subscription> subscriptions, ClusterInfo clusterInfo);
// TODO: replace the topicPartitions by ClusterInfo after Assignor is able to handle Admin
// https://github.com/skiptests/astraea/issues/1409

/**
* Parse config to get JMX port and cost function type.
@@ -71,6 +74,20 @@ protected void configure(Configuration config) {}

// -----------------------[helper]-----------------------//

/**
* Get all topics that the consumers subscribe to.
*
* @param subscriptions the consumers' subscriptions
* @return the set of topics the consumers subscribe to
*/
protected Set<String> topics(
Map<String, org.astraea.common.assignor.Subscription> subscriptions) {
return subscriptions.values().stream()
.map(org.astraea.common.assignor.Subscription::topics)
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());
}

/**
* Check for nodes that have not been registered yet.
*
@@ -141,12 +158,14 @@ public final void configure(Map<String, ?> configs) {
configs.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())));
config.string(ConsumerConfigs.BOOTSTRAP_SERVERS_CONFIG).ifPresent(s -> bootstrap = s);
config.integer(MAX_WAIT_BEAN).ifPresent(value -> this.maxWaitBean = value);
config.integer(MAX_TRAFFIC_MIB_INTERVAL).ifPresent(value -> this.maxTrafficMiBInterval = value);
var costFunctions = Utils.costFunctions(config, HasPartitionCost.class);
var customJMXPort = PartitionerUtils.parseIdJMXPort(config);
var defaultJMXPort = config.integer(JMX_PORT);
this.costFunction =
costFunctions.isEmpty()
- ? HasPartitionCost.of(Map.of(new ReplicaLeaderSizeCost(), 1D))
+ ? HasPartitionCost.of(Map.of(new NetworkIngressCost(), 1D))
: HasPartitionCost.of(costFunctions);
this.jmxPortGetter = id -> Optional.ofNullable(customJMXPort.get(id)).or(() -> defaultJMXPort);
metricCollector =
@@ -0,0 +1,251 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.assignor;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.astraea.common.DataRate;
import org.astraea.common.Utils;
import org.astraea.common.admin.BrokerTopic;
import org.astraea.common.admin.ClusterBean;
import org.astraea.common.admin.ClusterInfo;
import org.astraea.common.admin.Replica;
import org.astraea.common.admin.TopicPartition;
import org.astraea.common.cost.NoSufficientMetricsException;
import org.astraea.common.metrics.HasBeanObject;
import org.astraea.common.metrics.broker.HasRate;
import org.astraea.common.metrics.broker.ServerMetrics;

public class NetworkIngressAssignor extends Assignor {

@Override
protected Map<String, List<TopicPartition>> assign(
Map<String, org.astraea.common.assignor.Subscription> subscriptions,
ClusterInfo clusterInfo) {
var consumers = subscriptions.keySet();
var subscribedTopics = topics(subscriptions);
// 1. check for unregistered nodes; if there are any, register them
registerUnregisterNode(clusterInfo);
// wait until the metric collector has fetched at least one bean
Utils.waitFor(
() -> !metricCollector.clusterBean().all().isEmpty(), Duration.ofSeconds(maxWaitBean));
var clusterBean = metricCollector.clusterBean();

// 2. get the network cost of all subscribed topics
var networkCost =
costFunction
.partitionCost(ClusterInfo.masked(clusterInfo, subscribedTopics::contains), clusterBean)
.value();
var costPerBroker = costPerBroker(clusterInfo, subscribedTopics, networkCost);
var intervalPerBroker = convertTrafficToCost(clusterInfo, clusterBean, costPerBroker);
return greedyAssign(costPerBroker, consumers, intervalPerBroker);
}
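Review note: `assign` blocks on `Utils.waitFor` until the metric collector has at least one bean, bounded by `maxWaitBean` seconds. The sketch below only illustrates the general "poll a condition until a deadline" pattern. `WaitForSketch`, its `pollInterval` parameter, and the boolean return value are assumptions made for this example; the behaviour of Astraea's `Utils.waitFor` on timeout is not shown in this diff.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a polling wait; not Astraea's Utils.waitFor.
final class WaitForSketch {
  /** Returns true once the condition holds, or false if the timeout elapses first. */
  static boolean waitFor(BooleanSupplier done, Duration timeout, Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (done.getAsBoolean()) return true;
      // Compare via subtraction so the check stays correct if nanoTime wraps around.
      if (System.nanoTime() - deadline >= 0) return false;
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

With such a helper the caller decides what a timeout means, for example falling back to a cost-free assignment instead of failing the rebalance.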

/**
* Register any nodes that are not registered yet. Without this step, the assignor would miss the
* beanObjects from those nodes.
*
* @param clusterInfo the current cluster information
*/
private void registerUnregisterNode(ClusterInfo clusterInfo) {
var unregister = checkUnregister(clusterInfo.nodes());
if (!unregister.isEmpty()) registerJMX(unregister);
}

/**
* Perform the assignment algorithm to get a balanced assignment that ensures: 1. the costs
* received by the consumers are as close to each other as possible; 2. partitions with similar
* loads within a node are assigned to the same consumer.
*
* @param costs the topic partitions and their costs, grouped by node
* @param consumers the consumers' names
* @param limitedPerBroker for each node, the largest cost gap allowed between partitions that
*     are assigned to the same consumer
* @return the assignment
*/
Map<String, List<TopicPartition>> greedyAssign(
Map<Integer, Map<TopicPartition, Double>> costs,
Set<String> consumers,
Map<Integer, Double> limitedPerBroker) {
// initialize an empty assignment for every consumer
var assignment = new HashMap<String, List<TopicPartition>>();
for (var consumer : consumers) {
assignment.put(consumer, new ArrayList<>());
}
var costPerConsumer =
assignment.keySet().stream()
.map(c -> Map.entry(c, (double) 0))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
costs
.entrySet()
.forEach(
costPerBroker -> {
if (costPerBroker.getValue().values().stream().mapToDouble(x -> x).sum() == 0) {
// TODO: use logLeaderSize cost to assign when there is no network ingress cost
// if there is no cost, assign round-robin per node
var iter = consumers.iterator();
for (var tp : costPerBroker.getValue().keySet()) {
assignment.get(iter.next()).add(tp);
if (!iter.hasNext()) iter = consumers.iterator();
}
} else {
// sort the networkIngress cost in ascending order
var sortedCost = new LinkedHashMap<TopicPartition, Double>();
costPerBroker.getValue().entrySet().stream()
.sorted(Map.Entry.comparingByValue())
.forEach(entry -> sortedCost.put(entry.getKey(), entry.getValue()));
// keep a temporary copy of each consumer's load
var tmpCostPerConsumer = new HashMap<>(costPerConsumer);
// get the consumer with the largest load
Supplier<String> largestCostConsumer =
() ->
Collections.max(tmpCostPerConsumer.entrySet(), Map.Entry.comparingByValue())
.getKey();
var consumer = largestCostConsumer.get();
var lastValue = Collections.min(sortedCost.values());

for (var e : sortedCost.entrySet()) {
var tp = e.getKey();
var cost = e.getValue();

if (cost - lastValue > limitedPerBroker.get(costPerBroker.getKey())) {
tmpCostPerConsumer.remove(consumer);
consumer = largestCostConsumer.get();
lastValue = cost;
}

assignment.get(consumer).add(tp);
costPerConsumer.computeIfPresent(consumer, (ignore, c) -> c + cost);
}
}
});
return assignment;
}
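Review note: the interval grouping in `greedyAssign` is easier to follow on a single broker with plain strings. The sketch below is a simplified illustration, not the PR's code: `IntervalGroupingSketch`, the `String` partition names, and the tie-breaking by name are all assumptions. It also refills the candidate pool once every consumer has been used, which is an addition of this sketch; the code above calls `Collections.max` on the remaining candidates, and that call throws `NoSuchElementException` if the collection is empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, single-broker illustration of grouping partitions by cost interval.
final class IntervalGroupingSketch {
  static Map<String, List<String>> assign(
      Map<String, Double> partitionCost, Map<String, Double> currentLoad, double interval) {
    var assignment = new TreeMap<String, List<String>>();
    currentLoad.keySet().forEach(c -> assignment.put(c, new ArrayList<>()));
    if (partitionCost.isEmpty() || currentLoad.isEmpty()) return assignment;

    // Ascending by cost; ties are broken by name so the result is deterministic.
    var sorted = new ArrayList<>(partitionCost.entrySet());
    sorted.sort(
        Map.Entry.<String, Double>comparingByValue().thenComparing(Map.Entry.comparingByKey()));

    var candidates = new HashMap<>(currentLoad);
    var consumer = mostLoaded(candidates);
    var groupStart = sorted.get(0).getValue();
    for (var e : sorted) {
      // A gap larger than the interval closes the current group and moves to the next consumer.
      if (e.getValue() - groupStart > interval) {
        candidates.remove(consumer);
        // Assumption of this sketch: start over when every consumer has been used once.
        if (candidates.isEmpty()) candidates.putAll(currentLoad);
        consumer = mostLoaded(candidates);
        groupStart = e.getValue();
      }
      assignment.get(consumer).add(e.getKey());
    }
    return assignment;
  }

  private static String mostLoaded(Map<String, Double> load) {
    return load.entrySet().stream()
        .max(Map.Entry.<String, Double>comparingByValue().thenComparing(Map.Entry.comparingByKey()))
        .orElseThrow()
        .getKey();
  }
}
```

For example, with costs `a=0.1, b=0.15, c=0.5, d=0.55`, loads `c1=1.0, c2=0.0`, and an interval of `0.1`, the two cheap partitions stay together on the more loaded consumer `c1`, and the two expensive ones go to `c2`.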

Map<Integer, Map<TopicPartition, Double>> costPerBroker(
ClusterInfo clusterInfo, Set<String> topics, Map<TopicPartition, Double> cost) {
return clusterInfo
.replicaStream()
.filter(Replica::isLeader)
.filter(Replica::isOnline)
.filter(replica -> topics.contains(replica.topic()))
.collect(Collectors.groupingBy(replica -> replica.nodeInfo().id()))
.entrySet()
.stream()
.map(
e ->
Map.entry(
e.getKey(),
e.getValue().stream()
.map(
replica ->
Map.entry(
replica.topicPartition(), cost.get(replica.topicPartition())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
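Review note: `costPerBroker` groups the leaders by node and then re-streams the entry set to build the inner maps. A downstream collector can do both steps in one pass. The sketch below shows that shape on a hypothetical `Leader` type (broker id plus partition name); it is not the Astraea `Replica` API. It also filters out partitions that have no cost entry, because `Collectors.toMap` throws a `NullPointerException` on a null value; whether a missing cost can occur in the code above is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration of grouping partition costs by broker in a single pass.
final class CostPerBrokerSketch {
  /** Minimal stand-in for a leader replica: broker id plus partition name. */
  static final class Leader {
    final int broker;
    final String partition;

    Leader(int broker, String partition) {
      this.broker = broker;
      this.partition = partition;
    }
  }

  static Map<Integer, Map<String, Double>> costPerBroker(
      List<Leader> leaders, Map<String, Double> cost) {
    return leaders.stream()
        // Skip partitions without a cost: Collectors.toMap rejects null values.
        .filter(l -> cost.containsKey(l.partition))
        .collect(
            Collectors.groupingBy(
                l -> l.broker,
                Collectors.toMap(l -> l.partition, l -> cost.get(l.partition))));
  }
}
```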
// visible for test
/**
* For all nodes, calculate the cost of given traffic. The assignor would use the cost to produce
* the assignment.
*
* @param clusterInfo the clusterInfo
* @param clusterBean the clusterBean
* @param tpCostPerBroker the partition cost of every broker
* @return the Map from broker id to the cost of given traffic
*/
Map<Integer, Double> convertTrafficToCost(
ClusterInfo clusterInfo,
ClusterBean clusterBean,
Map<Integer, Map<TopicPartition, Double>> tpCostPerBroker) {
var interval = DataRate.MiB.of(maxTrafficMiBInterval).perSecond().byteRate();
var partitionsTraffic =
replicaLeaderLocation(clusterInfo).entrySet().stream()
.flatMap(
e -> {
var bt = e.getKey();
var totalReplicaSize = e.getValue().stream().mapToLong(Replica::size).sum();
var totalShare =
(double)
clusterBean
.brokerTopicMetrics(bt, ServerMetrics.Topic.Meter.class)
.filter(
bean -> bean.type().equals(ServerMetrics.Topic.BYTES_IN_PER_SEC))
.max(Comparator.comparingLong(HasBeanObject::createdTimestamp))
.map(HasRate::fifteenMinuteRate)
.orElse(0.0);

if (Double.isNaN(totalShare) || totalShare < 0.0 || totalReplicaSize < 0) {
throw new NoSufficientMetricsException(
costFunction,
Duration.ofSeconds(1),
"not enough metrics to calculate traffic");
}
var calculateShare =
(Function<Replica, Long>)
(replica) ->
totalReplicaSize > 0
? (long) ((totalShare * replica.size()) / totalReplicaSize)
: 0L;
return e.getValue().stream()
.map(r -> Map.entry(r.topicPartition(), calculateShare.apply(r)));
})
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, Map.Entry::getValue));

return tpCostPerBroker.entrySet().stream()
.map(
e -> {
var tpCost =
e.getValue().entrySet().stream()
.filter(entry -> entry.getValue() > 0.0)
.findFirst()
.orElseThrow();
var traffic = partitionsTraffic.get(tpCost.getKey());
var normalizedCost = tpCost.getValue();

var result = normalizedCost / (traffic / interval);
return Map.entry(e.getKey(), result);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
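Review note: the arithmetic in `convertTrafficToCost` has two steps. First, a broker-topic byte rate is split across partitions in proportion to replica size. Second, the normalized cost of one partition is divided by its traffic expressed in units of the interval, which gives the cost that corresponds to one interval of traffic. The sketch below reproduces only that arithmetic. `TrafficToCostSketch` is a hypothetical name, 1 MiB is assumed to be 1024 * 1024 bytes, and the guard against non-positive traffic is an addition of this sketch (the code above would divide by zero and yield infinity in that case).

```java
// Hypothetical illustration of the arithmetic only; no Astraea types are used.
final class TrafficToCostSketch {
  static final double MIB = 1024.0 * 1024.0;

  /** Splits a broker-topic byte rate across partitions in proportion to replica size. */
  static long trafficShare(double brokerTopicBytesPerSec, long replicaSize, long totalReplicaSize) {
    return totalReplicaSize > 0
        ? (long) ((brokerTopicBytesPerSec * replicaSize) / totalReplicaSize)
        : 0L;
  }

  /** Normalized cost that corresponds to one interval (in MiB/s) of traffic. */
  static double costPerInterval(double normalizedCost, long trafficBytesPerSec, int intervalMiB) {
    if (trafficBytesPerSec <= 0) {
      // Assumption of this sketch: refuse to divide by zero traffic.
      throw new IllegalArgumentException("traffic must be positive");
    }
    return normalizedCost / (trafficBytesPerSec / (intervalMiB * MIB));
  }
}
```

As a worked example: a topic receiving 60 MiB/s on a broker, with a partition holding one third of the topic's replica bytes, yields a 20 MiB/s share. If that partition's normalized cost is 0.4 and the interval is 10 MiB/s, the traffic spans two intervals, so one interval corresponds to a cost of 0.2.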

private Map<BrokerTopic, List<Replica>> replicaLeaderLocation(ClusterInfo clusterInfo) {
return clusterInfo
.replicaStream()
.filter(Replica::isLeader)
.filter(Replica::isOnline)
.map(
replica -> Map.entry(BrokerTopic.of(replica.nodeInfo().id(), replica.topic()), replica))
.collect(
Collectors.groupingBy(
Map.Entry::getKey,
Collectors.mapping(Map.Entry::getValue, Collectors.toUnmodifiableList())));
}

@Override
public String name() {
return "networkIngress";
}
}
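Review note: for anyone trying this branch, the sketch below shows how a consumer might be pointed at the assignor. The keys `jmx.port`, `max.wait.bean`, and `max.traffic.mib.interval` come from `Assignor` in this diff, and `partition.assignment.strategy` is the standard Kafka consumer setting. The bootstrap address, group id, port, and the chosen values are placeholders, and `AssignorConfigSketch` is a hypothetical name.

```java
import java.util.Properties;

// Hypothetical configuration sketch; addresses, ids, and values are placeholders.
final class AssignorConfigSketch {
  static Properties consumerProps() {
    var props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
    props.setProperty("group.id", "example-group"); // placeholder group
    // Class and package names as they appear in this diff.
    props.setProperty(
        "partition.assignment.strategy", "org.astraea.common.assignor.NetworkIngressAssignor");
    props.setProperty("jmx.port", "8000"); // placeholder default JMX port for brokers
    props.setProperty("max.wait.bean", "5"); // seconds to wait for beans (default in diff: 3)
    props.setProperty("max.traffic.mib.interval", "20"); // MiB/s per interval (default in diff: 10)
    return props;
  }
}
```

String values are fine here because `configure` converts every value with `toString()` before parsing it.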