Skip to content

Commit

Permalink
fix(s3stream): fix available bandwidth metrics (#2119)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh authored Nov 5, 2024
1 parent 04b53f4 commit 4f93b92
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
33 changes: 20 additions & 13 deletions core/src/main/scala/kafka/log/stream/s3/DefaultS3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,6 @@

package kafka.log.stream.s3;

import kafka.log.stream.s3.metadata.StreamMetadataManager;
import kafka.log.stream.s3.network.ControllerRequestSender;
import kafka.log.stream.s3.objects.ControllerObjectManager;
import kafka.log.stream.s3.streams.ControllerStreamManager;
import kafka.server.BrokerServer;

import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.automq.AutoMQVersion;

import com.automq.stream.api.Client;
import com.automq.stream.api.KVClient;
import com.automq.stream.api.StreamClient;
Expand All @@ -39,6 +30,8 @@
import com.automq.stream.s3.failover.HaltStorageFailureHandler;
import com.automq.stream.s3.failover.StorageFailureHandlerChain;
import com.automq.stream.s3.index.LocalStreamRangeIndexCache;
import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.s3.metrics.stats.NetworkStats;
import com.automq.stream.s3.network.AsyncNetworkBandwidthLimiter;
import com.automq.stream.s3.network.GlobalNetworkBandwidthLimiters;
import com.automq.stream.s3.network.NetworkBandwidthLimiter;
Expand All @@ -55,16 +48,24 @@
import com.automq.stream.utils.LogContext;
import com.automq.stream.utils.Time;
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import kafka.autobalancer.metricsreporter.metric.Derivator;
import kafka.log.stream.s3.metadata.StreamMetadataManager;
import kafka.log.stream.s3.network.ControllerRequestSender;
import kafka.log.stream.s3.objects.ControllerObjectManager;
import kafka.log.stream.s3.streams.ControllerStreamManager;
import kafka.server.BrokerServer;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.automq.AutoMQVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultS3Client implements Client {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultS3Client.class);
protected final Config config;
protected final Derivator networkInboundRate = new Derivator();
protected final Derivator networkOutboundRate = new Derivator();
private StreamMetadataManager metadataManager;

protected ControllerRequestSender requestSender;
Expand Down Expand Up @@ -110,9 +111,15 @@ public void start() {
GlobalNetworkBandwidthLimiters.instance().setup(AsyncNetworkBandwidthLimiter.Type.INBOUND,
refillToken, config.refillPeriodMs(), maxToken);
networkInboundLimiter = GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.INBOUND);
S3StreamMetricsManager.registerNetworkAvailableBandwidthSupplier(AsyncNetworkBandwidthLimiter.Type.INBOUND, () ->
config.networkBaselineBandwidth() - (long) networkInboundRate.derive(
TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()), NetworkStats.getInstance().networkInboundUsageTotal().get()));
GlobalNetworkBandwidthLimiters.instance().setup(AsyncNetworkBandwidthLimiter.Type.OUTBOUND,
refillToken, config.refillPeriodMs(), maxToken);
networkOutboundLimiter = GlobalNetworkBandwidthLimiters.instance().get(AsyncNetworkBandwidthLimiter.Type.OUTBOUND);
S3StreamMetricsManager.registerNetworkAvailableBandwidthSupplier(AsyncNetworkBandwidthLimiter.Type.OUTBOUND, () ->
config.networkBaselineBandwidth() - (long) networkOutboundRate.derive(
TimeUnit.NANOSECONDS.toSeconds(System.nanoTime()), NetworkStats.getInstance().networkOutboundUsageTotal().get()));
ObjectStorage objectStorage = ObjectStorageFactory.instance().builder(dataBucket).tagging(config.objectTagging())
.inboundLimiter(networkInboundLimiter).outboundLimiter(networkOutboundLimiter).readWriteIsolate(true)
.threadPrefix("dataflow").build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,21 +475,29 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) {
});
}

public static void registerNetworkLimiterSupplier(AsyncNetworkBandwidthLimiter.Type type,
Supplier<Long> networkAvailableBandwidthSupplier,
public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type,
Supplier<Integer> networkLimiterQueueSizeSupplier) {
switch (type) {
case INBOUND:
S3StreamMetricsManager.networkInboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier;
S3StreamMetricsManager.networkInboundLimiterQueueSizeSupplier = networkLimiterQueueSizeSupplier;
break;
case OUTBOUND:
S3StreamMetricsManager.networkOutboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier;
S3StreamMetricsManager.networkOutboundLimiterQueueSizeSupplier = networkLimiterQueueSizeSupplier;
break;
}
}

public static void registerNetworkAvailableBandwidthSupplier(AsyncNetworkBandwidthLimiter.Type type, Supplier<Long> networkAvailableBandwidthSupplier) {
switch (type) {
case INBOUND:
S3StreamMetricsManager.networkInboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier;
break;
case OUTBOUND:
S3StreamMetricsManager.networkOutboundAvailableBandwidthSupplier = networkAvailableBandwidthSupplier;
break;
}
}

public static void registerDeltaWalOffsetSupplier(Supplier<Long> deltaWalStartOffsetSupplier,
Supplier<Long> deltaWalTrimmedOffsetSupplier) {
S3StreamMetricsManager.deltaWalStartOffsetSupplier = deltaWalStartOffsetSupplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public AsyncNetworkBandwidthLimiter(Type type, long tokenSize, int refillInterva
this.callbackThreadPool = Executors.newFixedThreadPool(1, new DefaultThreadFactory("callback-thread"));
this.callbackThreadPool.execute(this::run);
this.refillThreadPool.scheduleAtFixedRate(this::refillToken, refillIntervalMs, refillIntervalMs, TimeUnit.MILLISECONDS);
S3StreamMetricsManager.registerNetworkLimiterSupplier(type, this::getAvailableTokens, this::getQueueSize);
S3StreamMetricsManager.registerNetworkLimiterQueueSizeSupplier(type, this::getQueueSize);
LOGGER.info("AsyncNetworkBandwidthLimiter initialized, type: {}, tokenSize: {}, maxTokens: {}, refillIntervalMs: {}",
type.getName(), tokenSize, maxTokens, refillIntervalMs);
}
Expand Down

0 comments on commit 4f93b92

Please sign in to comment.