Skip to content

Commit

Permalink
setup
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Nov 22, 2024
1 parent a689726 commit 3121c61
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public class PipeConnectorSubtaskLifeCycle implements AutoCloseable {
protected final PipeConnectorSubtask subtask;
private final UnboundedBlockingPendingQueue<Event> pendingQueue;

private int runningTaskCount;
private int registeredTaskCount;
protected int runningTaskCount;
protected int registeredTaskCount;

public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

public class PipeProcessorSubtask extends PipeReportableSubtask {
Expand Down Expand Up @@ -96,7 +96,7 @@ public PipeProcessorSubtask(
@Override
public void bindExecutors(
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
final ScheduledExecutorService ignored,
final ExecutorService ignored,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskScheduler = subtaskScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iotdb.db.subscription.task.subtask;

import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.agent.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.pipe.api.PipeConnector;
Expand All @@ -31,8 +30,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class SubscriptionConnectorSubtask extends PipeConnectorSubtask {

private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConnectorSubtask.class);
Expand Down Expand Up @@ -76,13 +73,8 @@ public UnboundedBlockingPendingQueue<Event> getInputPendingQueue() {

@Override
protected void registerCallbackHookAfterSubmit(final ListenableFuture<Boolean> future) {
final ListenableFuture<Boolean> nextFuture =
Futures.withTimeout(
future,
Duration.ofSeconds(
SubscriptionConfig.getInstance().getSubscriptionDefaultTimeoutInMs()),
subtaskCallbackListeningExecutor);
Futures.addCallback(nextFuture, this, subtaskCallbackListeningExecutor);
// TODO: Futures.withTimeout
Futures.addCallback(future, this, subtaskCallbackListeningExecutor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,11 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConnectorSubtaskLifeCycle.class);

private int runningTaskCount;
private int registeredTaskCount;

public SubscriptionConnectorSubtaskLifeCycle(
final PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor
final PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
final UnboundedBlockingPendingQueue<Event> pendingQueue) {
super(executor, subtask, pendingQueue);

runningTaskCount = 0;
registeredTaskCount = 0;
}

@Override
Expand All @@ -69,7 +63,7 @@ public synchronized void register() {
}

@Override
public synchronized boolean deregister(final String ignored, int regionId) {
public synchronized boolean deregister(final String pipeNameToDeregister, int regionId) {
if (registeredTaskCount <= 0) {
throw new IllegalStateException("registeredTaskCount <= 0");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;

public abstract class PipeSubtaskExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeSubtaskExecutor.class);

private static final ScheduledExecutorService subtaskCallbackListeningExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
private static final ExecutorService subtaskCallbackListeningExecutor =
IoTDBThreadPoolFactory.newSingleThreadExecutor(
ThreadName.PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL.getName());

protected final WrappedThreadPoolExecutor underlyingThreadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;

public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask {

Expand All @@ -43,7 +43,7 @@ public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask
protected PipeConnector outputPipeConnector;

// For thread pool to execute callbacks
protected ScheduledExecutorService subtaskCallbackListeningExecutor;
protected ExecutorService subtaskCallbackListeningExecutor;

// For controlling subtask submitting, making sure that
// a subtask is submitted to only one thread at a time
Expand All @@ -62,7 +62,7 @@ protected PipeAbstractConnectorSubtask(
@Override
public void bindExecutors(
final ListeningExecutorService subtaskWorkerThreadPoolExecutor,
final ScheduledExecutorService subtaskCallbackListeningExecutor,
final ExecutorService subtaskCallbackListeningExecutor,
final PipeSubtaskScheduler subtaskScheduler) {
this.subtaskWorkerThreadPoolExecutor = subtaskWorkerThreadPoolExecutor;
this.subtaskCallbackListeningExecutor = subtaskCallbackListeningExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -65,7 +65,7 @@ protected PipeSubtask(final String taskID, final long creationTime) {

public abstract void bindExecutors(
ListeningExecutorService subtaskWorkerThreadPoolExecutor,
ScheduledExecutorService subtaskCallbackListeningExecutor,
ExecutorService subtaskCallbackListeningExecutor,
PipeSubtaskScheduler subtaskScheduler);

@Override
Expand Down

0 comments on commit 3121c61

Please sign in to comment.