From b8fa0b6448f38313ccb22f0719e928e99f1ed910 Mon Sep 17 00:00:00 2001 From: Li Yu Heng Date: Tue, 21 Jan 2025 14:05:38 +0800 Subject: [PATCH] Refactor procedure framework (simplified StateMachineProcedure, and some other things) (#14683) (#14741) * delete KAWorking * delete ProcedureSuspended * delete "aborted" and suspended * refactor states and delete stateCount * refactor setNextState * delete previousState * spotless * compile * Revert "delete KAWorking" This reverts commit 43176fa105dd2536611ee59f29a7a022c84a4356. * recover KAWorker but rename it to TemporaryWorker * spotless * fix npe * delete useless code * delete useless code * necessary log (cherry picked from commit 41a49e7c1e6db3e88242f0b38f62bb5074fa8810) --- .../procedure/InternalProcedure.java | 5 - .../iotdb/confignode/procedure/Procedure.java | 78 -------- .../procedure/ProcedureExecutor.java | 113 ++++------- .../procedure/impl/StateMachineProcedure.java | 181 ++++++------------ .../impl/sync/StartPipeProcedure.java | 6 - .../procedure/entity/IncProcedure.java | 5 - .../procedure/entity/NoopProcedure.java | 5 - .../procedure/entity/SimpleLockProcedure.java | 5 - .../procedure/entity/SleepProcedure.java | 5 - .../procedure/entity/StuckProcedure.java | 5 - 10 files changed, 96 insertions(+), 312 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java index 817a8e381c38..473234304cb4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/InternalProcedure.java @@ -48,11 +48,6 @@ protected void rollback(Env env) throws IOException, InterruptedException { throw new UnsupportedOperationException(); } - @Override - protected boolean abort(Env env) { - throw new UnsupportedOperationException(); - } - @Override public void deserialize(ByteBuffer byteBuffer) {} } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java index 35fc40217aae..d4f97c1c31b0 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java @@ -70,20 +70,6 @@ public abstract class Procedure implements Comparable> { private int[] stackIndexes = null; - private boolean persist = true; - - public boolean needPersistance() { - return this.persist; - } - - public void resetPersistance() { - this.persist = true; - } - - public final void skipPersistance() { - this.persist = false; - } - public final boolean hasLock() { return locked; } @@ -119,17 +105,6 @@ protected abstract Procedure[] execute(Env env) protected abstract void rollback(Env env) throws IOException, InterruptedException, ProcedureException; - /** - * The abort() call is asynchronous and each procedure must decide how to deal with it, if they - * want to be abortable. The simplest implementation is to have an AtomicBoolean set in the - * abort() method and then the execute() will check if the abort flag is set or not. abort() may - * be called multiple times from the client, so the implementation must be idempotent. - * - *

NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the - * procedure implementor abort. - */ - protected abstract boolean abort(Env env); - public void serialize(DataOutputStream stream) throws IOException { // procid stream.writeLong(this.procId); @@ -263,28 +238,6 @@ public static Class deserializeTypeInfo(ByteBuffer byteBuffer) { return clazz; } - public static Procedure newInstance(ByteBuffer byteBuffer) { - Class procedureClass = deserializeTypeInfo(byteBuffer); - Procedure procedure; - try { - procedure = (Procedure) procedureClass.newInstance(); - } catch (InstantiationException | IllegalAccessException e) { - throw new RuntimeException("Instantiation failed", e); - } - return procedure; - } - - /** - * The {@link #doAcquireLock(Object, IProcedureStore)} will be split into two steps, first, it - * will call us to determine whether we need to wait for initialization, second, it will call - * {@link #acquireLock(Object)} to actually handle the lock for this procedure. - * - * @return true means we need to wait until the environment has been initialized, otherwise true. - */ - protected boolean waitInitialized(Env env) { - return false; - } - /** * Acquire a lock, user should override it if necessary. * @@ -314,34 +267,6 @@ protected boolean holdLock(Env env) { return false; } - /** - * Called before the procedure is recovered and added into the queue. - * - * @param env environment - */ - protected final void beforeRecover(Env env) { - // no op - } - - /** - * Called when the procedure is recovered and added into the queue. - * - * @param env environment - */ - protected final void afterRecover(Env env) { - // no op - } - - /** - * Called when the procedure is completed (success or rollback). The procedure may use this method - * to clean up in-memory states. This operation will not be retried on failure. - * - * @param env environment - */ - protected void completionCleanup(Env env) { - // no op - } - /** * To make executor yield between each execution step to give other procedures a chance to run. * @@ -393,9 +318,6 @@ public void doRollback(Env env) throws IOException, InterruptedException, Proced * @return ProcedureLockState */ public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) { - if (waitInitialized(env)) { - return ProcedureLockState.LOCK_EVENT_WAIT; - } if (lockedWhenLoading) { lockedWhenLoading = false; locked = true; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java index d1bbb3219857..84e2b7f3f3a3 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java @@ -23,7 +23,6 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; -import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException; import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler; import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler; @@ -186,24 +185,14 @@ private void recover() { // executing, we need to set its state to RUNNABLE. procedure.setState(ProcedureState.RUNNABLE); runnableList.add(procedure); - } else { - procedure.afterRecover(environment); } }); restoreLocks(); - waitingTimeoutList.forEach( - procedure -> { - procedure.afterRecover(environment); - timeoutExecutor.add(procedure); - }); + waitingTimeoutList.forEach(timeoutExecutor::add); failedList.forEach(scheduler::addBack); - runnableList.forEach( - procedure -> { - procedure.afterRecover(environment); - scheduler.addBack(procedure); - }); + runnableList.forEach(scheduler::addBack); scheduler.signalAll(); } @@ -418,21 +407,16 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure p "The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc); return; } - boolean suspended = false; boolean reExecute; Procedure[] subprocs = null; do { reExecute = false; - proc.resetPersistance(); try { subprocs = proc.doExecute(this.environment); if (subprocs != null && subprocs.length == 0) { subprocs = null; } - } catch (ProcedureSuspendedException e) { - LOG.debug("Suspend {}", proc); - suspended = true; } catch (ProcedureYieldException e) { LOG.debug("Yield {}", proc); yieldProcedure(proc); @@ -455,22 +439,20 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure p } } else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) { LOG.info("Added into timeoutExecutor {}", proc); - } else if (!suspended) { + } else { proc.setState(ProcedureState.SUCCESS); } } // add procedure into rollback stack. rootProcStack.addRollbackStep(proc); - if (proc.needPersistance()) { - updateStoreOnExecution(rootProcStack, proc, subprocs); - } + updateStoreOnExecution(rootProcStack, proc, subprocs); if (!store.isRunning()) { return; } - if (proc.isRunnable() && !suspended && proc.isYieldAfterExecution(this.environment)) { + if (proc.isRunnable() && proc.isYieldAfterExecution(this.environment)) { yieldProcedure(proc); return; } @@ -481,7 +463,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure p } releaseLock(proc, false); - if (!suspended && proc.isFinished() && proc.hasParent()) { + if (proc.isFinished() && proc.hasParent()) { countDownChildren(rootProcStack, proc); } } @@ -518,6 +500,7 @@ private void submitChildrenProcedures(Procedure[] subprocs) { subproc.updateMetricsOnSubmit(getEnvironment()); procedures.put(subproc.getProcId(), subproc); scheduler.addFront(subproc); + LOG.info("Sub-Procedure pid={} has been submitted", subproc.getProcId()); } } @@ -693,11 +676,6 @@ private void executeCompletionCleanup(Procedure proc) { if (proc.hasLock()) { releaseLock(proc, true); } - try { - proc.completionCleanup(this.environment); - } catch (Throwable e) { - LOG.error("CODE-BUG:Uncaught runtime exception for procedure {}", proc, e); - } } private void rootProcedureCleanup(Procedure proc) { @@ -731,7 +709,7 @@ private class WorkerThread extends StoppableThread { protected long keepAliveTime = -1; public WorkerThread(ThreadGroup threadGroup) { - this(threadGroup, "ProcExecWorker-"); + this(threadGroup, "ProcedureCoreWorker-"); } public WorkerThread(ThreadGroup threadGroup, String prefix) { @@ -751,26 +729,28 @@ public void run() { while (isRunning() && keepAlive(lastUpdated)) { Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (procedure == null) { + Thread.sleep(1000); continue; } this.activeProcedure.set(procedure); - int activeCount = activeExecutorCount.incrementAndGet(); + activeExecutorCount.incrementAndGet(); startTime.set(System.currentTimeMillis()); executeProcedure(procedure); - activeCount = activeExecutorCount.decrementAndGet(); - LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount); + activeExecutorCount.decrementAndGet(); + LOG.trace( + "Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get()); this.activeProcedure.set(null); lastUpdated = System.currentTimeMillis(); startTime.set(lastUpdated); } - } catch (Throwable throwable) { + } catch (Exception e) { if (this.activeProcedure.get() != null) { LOG.warn( - "Procedure Worker {} terminated {}", + "Exception happened when worker {} execute procedure {}", getName(), this.activeProcedure.get(), - throwable); + e); } } finally { LOG.info("Procedure worker {} terminated.", getName()); @@ -796,12 +776,12 @@ public long getCurrentRunTime() { } } - // A worker thread which can be added when core workers are stuck. Will timeout after - // keepAliveTime if there is no procedure to run. - private final class KeepAliveWorkerThread extends WorkerThread { + // A temporary worker thread will be launched when too many core workers are stuck. + // They will timeout after keepAliveTime if there is no procedure to run. + private final class TemporaryWorkerThread extends WorkerThread { - public KeepAliveWorkerThread(ThreadGroup group) { - super(group, "KAProcExecWorker-"); + public TemporaryWorkerThread(ThreadGroup group) { + super(group, "ProcedureTemporaryWorker-"); this.keepAliveTime = TimeUnit.SECONDS.toMillis(10); } @@ -823,22 +803,25 @@ public WorkerMonitor() { updateTimestamp(); } - private int checkForStuckWorkers() { + private int calculateRunningAndStuckWorkers() { // Check if any of the worker is stuck - int stuckCount = 0; + int runningCount = 0, stuckCount = 0; for (WorkerThread worker : workerThreads) { - if (worker.activeProcedure.get() == null - || worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) { + if (worker.activeProcedure.get() == null) { continue; } - + runningCount++; // WARN the worker is stuck - stuckCount++; - LOG.warn( - "Worker stuck {}({}), run time {} ms", - worker, - worker.activeProcedure.get().getProcType(), - worker.getCurrentRunTime()); + if (worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) { + stuckCount++; + LOG.warn( + "Worker stuck {}({}), run time {} ms", + worker, + worker.activeProcedure.get().getProcType(), + worker.getCurrentRunTime()); + } + LOG.info( + "Procedure workers: {} is running, {} is running and stuck", runningCount, stuckCount); } return stuckCount; } @@ -854,7 +837,7 @@ private void checkThreadCount(final int stuckCount) { // Let's add new worker thread more aggressively, as they will timeout finally if there is no // work to do. if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) { - final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup); + final TemporaryWorkerThread worker = new TemporaryWorkerThread(threadGroup); workerThreads.add(worker); worker.start(); LOG.debug("Added new worker thread {}", worker); @@ -863,7 +846,7 @@ private void checkThreadCount(final int stuckCount) { @Override protected void periodicExecute(Env env) { - final int stuckCount = checkForStuckWorkers(); + final int stuckCount = calculateRunningAndStuckWorkers(); checkThreadCount(stuckCount); updateTimestamp(); } @@ -942,28 +925,6 @@ public long submitProcedure(Procedure procedure) { return pushProcedure(procedure); } - /** - * Abort a specified procedure. - * - * @param procId procedure id - * @param force whether abort the running procdure. - * @return true if the procedure exists and has received the abort. - */ - public boolean abort(long procId, boolean force) { - Procedure procedure = procedures.get(procId); - if (procedure != null) { - if (!force && procedure.wasExecuted()) { - return false; - } - return procedure.abort(this.environment); - } - return false; - } - - public boolean abort(long procId) { - return abort(procId, true); - } - public ProcedureScheduler getScheduler() { return scheduler; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java index ed32ca6f0755..1c775c2d2933 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/StateMachineProcedure.java @@ -32,9 +32,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; +import java.util.LinkedList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; /** * Procedure described by a series of steps. @@ -51,24 +50,21 @@ public abstract class StateMachineProcedure extends Procedure private static final int EOF_STATE = Integer.MIN_VALUE; - private final AtomicBoolean aborted = new AtomicBoolean(false); - private Flow stateFlow = Flow.HAS_MORE_STATE; - protected int stateCount = 0; - private int[] states = null; + private final LinkedList states = new LinkedList<>(); - private List> subProcList = null; + private final List> subProcList = new ArrayList<>(); - /** Cycles on same state. Good for figuring if we are stuck. */ + /** Cycles on the same state. Good for figuring if we are stuck. */ private int cycles = 0; - /** Ordinal of the previous state. So we can tell if we are progressing or not. */ - private int previousState; + private static final int NO_NEXT_STATE = -1; + private int nextState = NO_NEXT_STATE; /** Mark whether this procedure is called by a pipe forwarded request. */ protected boolean isGeneratedByPipe; - private boolean stateDeserialized = false; + private boolean isStateDeserialized = false; protected StateMachineProcedure() { this(false); @@ -136,20 +132,6 @@ protected abstract void rollbackState(Env env, TState state) */ protected void setNextState(final TState state) { setNextState(getStateId(state)); - failIfAborted(); - } - - /** - * By default, the executor will try ro run all the steps of the procedure start to finish. Return - * true to make the executor yield between execution steps to give other procedures time to run - * their steps. - * - * @param state the state we are going to execute next. - * @return Return true if the executor should yield before the execution of the specified step. - * Defaults to return false. - */ - protected boolean isYieldBeforeExecuteFromState(Env env, TState state) { - return false; } /** @@ -158,105 +140,84 @@ protected boolean isYieldBeforeExecuteFromState(Env env, TState state) { * @param childProcedure the child procedure */ protected void addChildProcedure(Procedure childProcedure) { - if (childProcedure == null) { - return; - } - if (subProcList == null) { - subProcList = new ArrayList<>(); - } subProcList.add(childProcedure); } @Override - protected Procedure[] execute(final Env env) + protected Procedure[] execute(final Env env) throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { updateTimestamp(); try { - failIfAborted(); - - if (!hasMoreState() || isFailed()) { + if (noMoreState() || isFailed()) { return null; } TState state = getCurrentState(); - if (stateCount == 0) { + if (states.isEmpty()) { setNextState(getStateId(state)); } - LOG.debug("{} {}; cycles={}", state, this, cycles); - // Keep running count of cycles - if (getStateId(state) != this.previousState) { - this.previousState = getStateId(state); - this.cycles = 0; - } else { - this.cycles++; - } - LOG.trace("{}", this); stateFlow = executeFromState(env, state); + addNextStateAndCalculateCycles(); setStateDeserialized(false); - if (!hasMoreState()) { - setNextState(EOF_STATE); - } - if (subProcList != null && !subProcList.isEmpty()) { - Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); - subProcList = null; + if (!subProcList.isEmpty()) { + Procedure[] subProcedures = subProcList.toArray(new Procedure[0]); + subProcList.clear(); return subProcedures; } - return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this}; + return (isWaiting() || isFailed() || noMoreState()) ? null : new Procedure[] {this}; } finally { updateTimestamp(); } } + private void addNextStateAndCalculateCycles() { + int stateToBeAdded = EOF_STATE; + if (Flow.HAS_MORE_STATE == stateFlow) { + if (nextState == NO_NEXT_STATE) { + LOG.error( + "StateMachineProcedure pid={} not set next state, but return HAS_MORE_STATE", + getProcId()); + } else { + stateToBeAdded = nextState; + } + } else { + if (nextState != NO_NEXT_STATE) { + LOG.warn( + "StateMachineProcedure pid={} set next state to {}, but return NO_MORE_STATE", + getProcId(), + nextState); + } + } + if (getStateId(getCurrentState()) == stateToBeAdded) { + cycles++; + } else { + cycles = 0; + } + states.add(stateToBeAdded); + nextState = NO_NEXT_STATE; + } + @Override protected void rollback(final Env env) throws IOException, InterruptedException, ProcedureException { if (isEofState()) { - stateCount--; + states.removeLast(); } try { updateTimestamp(); rollbackState(env, getCurrentState()); } finally { - stateCount--; + states.removeLast(); updateTimestamp(); } } protected boolean isEofState() { - return stateCount > 0 && states[stateCount - 1] == EOF_STATE; - } - - @Override - protected boolean abort(final Env env) { - LOG.debug("Abort requested for {}", this); - if (!hasMoreState()) { - LOG.warn("Ignore abort request on {} because it has already been finished", this); - return false; - } - if (!isRollbackSupported(getCurrentState())) { - LOG.warn("Ignore abort request on {} because it does not support rollback", this); - return false; - } - aborted.set(true); - return true; - } - - /** - * If procedure has more states then abort it otherwise procedure is finished and abort can be - * ignored. - */ - protected final void failIfAborted() { - if (aborted.get()) { - if (hasMoreState()) { - setAbortFailure(getClass().getSimpleName(), "abort requested"); - } else { - LOG.warn("Ignoring abort request on state='{}' for {}", getCurrentState(), this); - } - } + return !states.isEmpty() && states.getLast() == EOF_STATE; } /** @@ -267,50 +228,28 @@ protected boolean isRollbackSupported(final TState state) { return false; } - @Override - protected boolean isYieldAfterExecution(final Env env) { - return isYieldBeforeExecuteFromState(env, getCurrentState()); - } - - private boolean hasMoreState() { - return stateFlow != Flow.NO_MORE_STATE; + private boolean noMoreState() { + return stateFlow == Flow.NO_MORE_STATE; } @Nullable protected TState getCurrentState() { - if (stateCount > 0) { - if (states[stateCount - 1] == EOF_STATE) { + if (!states.isEmpty()) { + if (states.getLast() == EOF_STATE) { return null; } - return getState(states[stateCount - 1]); + return getState(states.getLast()); } return getInitialState(); } - /** - * This method is used from test code as it cannot be assumed that state transition will happen - * sequentially. Some procedures may skip steps/ states, some may add intermediate steps in - * future. - */ - public int getCurrentStateId() { - return getStateId(getCurrentState()); - } - /** * Set the next state for the procedure. * * @param stateId the ordinal() of the state enum (or state id) */ private void setNextState(final int stateId) { - if (states == null || states.length == stateCount) { - int newCapacity = stateCount + 8; - if (states != null) { - states = Arrays.copyOf(states, newCapacity); - } else { - states = new int[newCapacity]; - } - } - states[stateCount++] = stateId; + nextState = stateId; } @Override @@ -324,26 +263,24 @@ protected void toStringState(StringBuilder builder) { @Override public void serialize(DataOutputStream stream) throws IOException { super.serialize(stream); - stream.writeInt(stateCount); - for (int i = 0; i < stateCount; ++i) { - stream.writeInt(states[i]); + stream.writeInt(states.size()); + for (int state : states) { + stream.writeInt(state); } } @Override public void deserialize(ByteBuffer byteBuffer) { super.deserialize(byteBuffer); - stateCount = byteBuffer.getInt(); + int stateCount = byteBuffer.getInt(); + states.clear(); if (stateCount > 0) { - states = new int[stateCount]; for (int i = 0; i < stateCount; ++i) { - states[i] = byteBuffer.getInt(); + states.add(byteBuffer.getInt()); } if (isEofState()) { stateFlow = Flow.NO_MORE_STATE; } - } else { - states = null; } this.setStateDeserialized(true); } @@ -355,10 +292,10 @@ public void deserialize(ByteBuffer byteBuffer) { * the code in this stage, which is the purpose of this variable. */ public boolean isStateDeserialized() { - return stateDeserialized; + return isStateDeserialized; } private void setStateDeserialized(boolean isDeserialized) { - this.stateDeserialized = isDeserialized; + this.isStateDeserialized = isDeserialized; } } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java index 7513d23c4588..d85aba66c5ae 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/sync/StartPipeProcedure.java @@ -21,7 +21,6 @@ import org.apache.iotdb.commons.sync.PipeInfo; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2; import org.apache.iotdb.confignode.procedure.store.ProcedureType; @@ -57,11 +56,6 @@ public StartPipeProcedure(PipeInfo pipeInfo) { this.pipeInfo = pipeInfo; } - @Override - protected boolean abort(ConfigNodeProcedureEnv configNodeProcedureEnv) { - return false; - } - @Override public void serialize(DataOutputStream stream) throws IOException { stream.writeShort(ProcedureType.START_PIPE_PROCEDURE.getTypeCode()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java index 2a86110825d3..d85e40522a00 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/IncProcedure.java @@ -51,11 +51,6 @@ protected void rollback(TestProcEnv testProcEnv) throws IOException, Interrupted testProcEnv.rolledBackCount.getAndIncrement(); } - @Override - protected boolean abort(TestProcEnv testProcEnv) { - return true; - } - @Override public void serialize(DataOutputStream stream) throws IOException { stream.writeInt(TestProcedureFactory.TestProcedureType.INC_PROCEDURE.ordinal()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java index bdaf0401cc8f..159ee6f10775 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/NoopProcedure.java @@ -36,9 +36,4 @@ protected Procedure[] execute(TestProcEnv testProcEnv) @Override protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {} - - @Override - protected boolean abort(TestProcEnv testProcEnv) { - return false; - } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java index 564b98079420..9675df1f0675 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SimpleLockProcedure.java @@ -48,11 +48,6 @@ protected Procedure[] execute(TestProcEnv testProcEnv) @Override protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {} - @Override - protected boolean abort(TestProcEnv testProcEnv) { - return false; - } - @Override protected ProcedureLockState acquireLock(TestProcEnv testProcEnv) { if (testProcEnv.getEnvLock().tryLock()) { diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java index f3b2abf054a5..26a9a9afe61b 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/SleepProcedure.java @@ -40,11 +40,6 @@ protected Procedure[] execute(TestProcEnv testProcEnv) @Override protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {} - @Override - protected boolean abort(TestProcEnv testProcEnv) { - return false; - } - @Override public void serialize(DataOutputStream stream) throws IOException { stream.writeInt(TestProcedureFactory.TestProcedureType.SLEEP_PROCEDURE.ordinal()); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java index 0238b6a33bcb..1351d216beef 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/entity/StuckProcedure.java @@ -53,9 +53,4 @@ protected Procedure[] execute(final TestProcEnv env) { @Override protected void rollback(TestProcEnv testProcEnv) throws IOException, InterruptedException {} - - @Override - protected boolean abort(TestProcEnv testProcEnv) { - return false; - } }