Skip to content

Commit

Permalink
Pipe: Introduce a minimum restart interval to optimize the restart st…
Browse files Browse the repository at this point in the history
…rategy to avoid frequent restarts & Stay tsfile extraction mode if the task is currently restarted (#14374)

Co-authored-by: Steve Yurong Su <rong@apache.org>
  • Loading branch information
XNX02 and SteveYurongSu authored Dec 16, 2024
1 parent f6b16ed commit 6a28a07
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand All @@ -103,6 +104,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {

private static final AtomicLong LAST_FORCED_RESTART_TIME =
new AtomicLong(System.currentTimeMillis());
private static final Map<String, AtomicLong> PIPE_NAME_TO_LAST_RESTART_TIME_MAP =
new ConcurrentHashMap<>();

////////////////////////// Pipe Task Management Entry //////////////////////////

Expand Down Expand Up @@ -473,6 +476,8 @@ protected void collectPipeMetaListInternal(
///////////////////////// Restart Logic /////////////////////////

public void restartAllStuckPipes() {
removeOutdatedPipeInfoFromLastRestartTimeMap();

if (!tryWriteLockWithTimeOut(5)) {
return;
}
Expand All @@ -484,6 +489,16 @@ public void restartAllStuckPipes() {
releaseWriteLock();
}

// If the pipe has been restarted recently, skip it.
stuckPipes.removeIf(
pipeMeta -> {
final AtomicLong lastRestartTime =
PIPE_NAME_TO_LAST_RESTART_TIME_MAP.get(pipeMeta.getStaticMeta().getPipeName());
return lastRestartTime != null
&& System.currentTimeMillis() - lastRestartTime.get()
< PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs();
});

// Restart all stuck pipes.
// Note that parallelStream cannot be used here. The method PipeTaskAgent#dropPipe also uses
// parallelStream. If parallelStream is used here, the subtasks generated inside the dropPipe
Expand All @@ -493,6 +508,18 @@ public void restartAllStuckPipes() {
stuckPipes.forEach(this::restartStuckPipe);
}

private void removeOutdatedPipeInfoFromLastRestartTimeMap() {
PIPE_NAME_TO_LAST_RESTART_TIME_MAP
.entrySet()
.removeIf(
entry -> {
final AtomicLong lastRestartTime = entry.getValue();
return lastRestartTime == null
|| PipeConfig.getInstance().getPipeStuckRestartMinIntervalMs()
<= System.currentTimeMillis() - lastRestartTime.get();
});
}

private Set<PipeMeta> findAllStuckPipes() {
final Set<PipeMeta> stuckPipes = new HashSet<>();

Expand Down Expand Up @@ -616,7 +643,13 @@ private void restartStuckPipe(final PipeMeta pipeMeta) {
final long startTime = System.currentTimeMillis();
final PipeMeta originalPipeMeta = pipeMeta.deepCopy4TaskAgent();
handleDropPipe(pipeMeta.getStaticMeta().getPipeName());

final long restartTime = System.currentTimeMillis();
PIPE_NAME_TO_LAST_RESTART_TIME_MAP
.computeIfAbsent(pipeMeta.getStaticMeta().getPipeName(), k -> new AtomicLong(restartTime))
.set(restartTime);
handleSinglePipeMetaChanges(originalPipeMeta);

LOGGER.warn(
"Pipe {} was restarted because of stuck, time cost: {} ms.",
originalPipeMeta.getStaticMeta(),
Expand All @@ -628,6 +661,10 @@ private void restartStuckPipe(final PipeMeta pipeMeta) {
}
}

public boolean isPipeTaskCurrentlyRestarted(final String pipeName) {
return PIPE_NAME_TO_LAST_RESTART_TIME_MAP.containsKey(pipeName);
}

///////////////////////// Terminate Logic /////////////////////////

public void markCompleted(final String pipeName, final int regionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,22 +202,29 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
}

private boolean canNotUseTabletAnyMore() {
// In the following 5 cases, we should not extract any more tablet events. all the data
// In the following 7 cases, we should not extract any more tablet events. all the data
// represented by the tablet events should be carried by the following tsfile event:
// 0. If the pipe task is currently restarted.
// 1. If Wal size > maximum size of wal buffer,
// the write operation will be throttled, so we should not extract any more tablet events.
// 2. The number of pinned memtables has reached the dangerous threshold.
// 3. The number of historical tsFile events to transfer has exceeded the limit.
// 4. The number of realtime tsfile events to transfer has exceeded the limit.
// 5. The number of linked tsfiles has reached the dangerous threshold.
return mayWalSizeReachThrottleThreshold()
// 6. The shallow memory usage of the insert node has reached the dangerous threshold.
return isPipeTaskCurrentlyRestarted()
|| mayWalSizeReachThrottleThreshold()
|| mayMemTablePinnedCountReachDangerousThreshold()
|| isHistoricalTsFileEventCountExceededLimit()
|| isRealtimeTsFileEventCountExceededLimit()
|| mayTsFileLinkedCountReachDangerousThreshold()
|| mayInsertNodeMemoryReachDangerousThreshold();
}

private boolean isPipeTaskCurrentlyRestarted() {
return PipeDataNodeAgent.task().isPipeTaskCurrentlyRestarted(pipeName);
}

private boolean mayWalSizeReachThrottleThreshold() {
return 3 * WALManager.getInstance().getTotalDiskUsage()
> IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public class CommonConfig {
private long pipeMaxAllowedLinkedTsFileCount = 100;
private float pipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage = 0.1F;
private long pipeStuckRestartIntervalSeconds = 120;
private long pipeStuckRestartMinIntervalMs = 30 * 60 * 1000L; // 30 minutes

private int pipeMetaReportMaxLogNumPerRound = 10;
private int pipeMetaReportMaxLogIntervalRounds = 36;
Expand Down Expand Up @@ -1029,10 +1030,18 @@ public long getPipeStuckRestartIntervalSeconds() {
return pipeStuckRestartIntervalSeconds;
}

public long getPipeStuckRestartMinIntervalMs() {
return pipeStuckRestartMinIntervalMs;
}

public void setPipeStuckRestartIntervalSeconds(long pipeStuckRestartIntervalSeconds) {
this.pipeStuckRestartIntervalSeconds = pipeStuckRestartIntervalSeconds;
}

public void setPipeStuckRestartMinIntervalMs(long pipeStuckRestartMinIntervalMs) {
this.pipeStuckRestartMinIntervalMs = pipeStuckRestartMinIntervalMs;
}

public int getPipeMetaReportMaxLogNumPerRound() {
return pipeMetaReportMaxLogNumPerRound;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ private void loadPipeProps(TrimProperties properties) {
properties.getProperty(
"pipe_stuck_restart_interval_seconds",
String.valueOf(config.getPipeStuckRestartIntervalSeconds()))));
config.setPipeStuckRestartMinIntervalMs(
Long.parseLong(
properties.getProperty(
"pipe_stuck_restart_min_interval_ms",
String.valueOf(config.getPipeStuckRestartMinIntervalMs()))));

config.setPipeMetaReportMaxLogNumPerRound(
Integer.parseInt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ public long getPipeStuckRestartIntervalSeconds() {
return COMMON_CONFIG.getPipeStuckRestartIntervalSeconds();
}

public long getPipeStuckRestartMinIntervalMs() {
return COMMON_CONFIG.getPipeStuckRestartMinIntervalMs();
}

/////////////////////////////// Logger ///////////////////////////////

public int getPipeMetaReportMaxLogNumPerRound() {
Expand Down Expand Up @@ -442,6 +446,7 @@ public void printAllConfigs() {
"PipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage: {}",
getPipeMaxAllowedLinkedDeletedTsFileDiskUsagePercentage());
LOGGER.info("PipeStuckRestartIntervalSeconds: {}", getPipeStuckRestartIntervalSeconds());
LOGGER.info("PipeStuckRestartMinIntervalMs: {}", getPipeStuckRestartMinIntervalMs());

LOGGER.info("PipeMetaReportMaxLogNumPerRound: {}", getPipeMetaReportMaxLogNumPerRound());
LOGGER.info("PipeMetaReportMaxLogIntervalRounds: {}", getPipeMetaReportMaxLogIntervalRounds());
Expand Down

0 comments on commit 6a28a07

Please sign in to comment.