diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java index 14d4f604c22c..ecbbc641e4b9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java @@ -34,8 +34,8 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable { protected final PipeConnectorSubtask subtask; private final UnboundedBlockingPendingQueue pendingQueue; - private int runningTaskCount; - private int registeredTaskCount; + protected int runningTaskCount; + protected int registeredTaskCount; public PipeConnectorSubtaskLifeCycle( PipeConnectorSubtaskExecutor executor, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index e5383bfaae41..7f29b3c55f99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory; import java.util.Objects; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; public class PipeProcessorSubtask extends PipeReportableSubtask { @@ -96,7 +96,7 @@ public PipeProcessorSubtask( @Override public void bindExecutors( final ListeningExecutorService subtaskWorkerThreadPoolExecutor, - final ScheduledExecutorService ignored, + final ExecutorService ignored, final PipeSubtaskScheduler subtaskScheduler) { this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; this.subtaskScheduler = subtaskScheduler; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java index c5b841a8c66f..9b7587381d99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java @@ -20,7 +20,6 @@ package org.apache.iotdb.db.subscription.task.subtask; import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue; -import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask; import org.apache.iotdb.db.subscription.agent.SubscriptionAgent; import org.apache.iotdb.pipe.api.PipeConnector; @@ -31,8 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; - public class SubscriptionConnectorSubtask extends PipeConnectorSubtask { private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtask.class); @@ -76,13 +73,8 @@ public UnboundedBlockingPendingQueue getInputPendingQueue() { @Override protected void registerCallbackHookAfterSubmit(final ListenableFuture future) { - final ListenableFuture nextFuture = - Futures.withTimeout( - future, - Duration.ofSeconds( - SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs()), - subtaskCallbackListeningExecutor); - Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor); + // TODO: Futures.withTimeout + Futures.addCallback(future, this, subtaskCallbackListeningExecutor); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java index 57fb2c7004de..359690fa2729 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java @@ -34,17 +34,11 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtaskLifeCycle.class); - private int runningTaskCount; - private int registeredTaskCount; - public SubscriptionConnectorSubtaskLifeCycle( final PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor final PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask final UnboundedBlockingPendingQueue pendingQueue) { super(executor, subtask, pendingQueue); - - runningTaskCount = 0; - registeredTaskCount = 0; } @Override @@ -69,7 +63,7 @@ public synchronized void register() { } @Override - public synchronized boolean deregister(final String ignored, int regionId) { + public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) { if (registeredTaskCount <= 0) { throw new IllegalStateException("registeredTaskCount <= 0"); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java index c4c96dad3e77..4ea7714962bf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/execution/PipeSubtaskExecutor.java @@ -32,14 +32,14 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; public abstract class PipeSubtaskExecutor { private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class); - private static final ScheduledExecutorService subtaskCallbackListeningExecutor = - IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + private static final ExecutorService subtaskCallbackListeningExecutor = + IoTDBThreadPoolFactory.newSingleThreadExecutor( ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName()); protected final WrappedThreadPoolExecutor underlyingThreadPool; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java index 58cc142713d0..cfd987758e46 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeAbstractConnectorSubtask.java @@ -33,7 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask { @@ -43,7 +43,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask protected PipeConnector outputPipeConnector; // For thread pool to execute callbacks - protected ScheduledExecutorService subtaskCallbackListeningExecutor; + protected ExecutorService subtaskCallbackListeningExecutor; // For controlling subtask submitting, making sure that // a subtask is submitted to only one thread at a time @@ -62,7 +62,7 @@ protected PipeAbstractConnectorSubtask( @Override public void bindExecutors( final ListeningExecutorService subtaskWorkerThreadPoolExecutor, - final ScheduledExecutorService subtaskCallbackListeningExecutor, + final ExecutorService subtaskCallbackListeningExecutor, final PipeSubtaskScheduler subtaskScheduler) { this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor; this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java index 1169711b46d2..2da797c2b3b5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/subtask/PipeSubtask.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.Callable; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -65,7 +65,7 @@ protected PipeSubtask(final String taskID, final long creationTime) { public abstract void bindExecutors( ListeningExecutorService subtaskWorkerThreadPoolExecutor, - ScheduledExecutorService subtaskCallbackListeningExecutor, + ExecutorService subtaskCallbackListeningExecutor, PipeSubtaskScheduler subtaskScheduler); @Override