Skip to content

Commit

Permalink
Refactor procedure framework (simplified StateMachineProcedure, and s…
Browse files Browse the repository at this point in the history
…ome other things) (#14683) (#14741)

* delete KAWorking

* delete ProcedureSuspended

* delete "aborted" and suspended

* refactor states and delete stateCount

* refactor setNextState

* delete previousState

* spotless

* compile

* Revert "delete KAWorking"

This reverts commit 43176fa.

* recover KAWorker but rename it to TemporaryWorker

* spotless

* fix npe

* delete useless code

* delete useless code

* necessary log

(cherry picked from commit 41a49e7)
  • Loading branch information
liyuheng55555 authored Jan 21, 2025
1 parent 3c5b2e4 commit b8fa0b6
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ protected void rollback(Env env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}

/**
* Abort hook for this procedure type; aborting is not supported here.
*
* @param env the procedure environment (unused)
* @return never returns normally
* @throws UnsupportedOperationException always
*/
@Override
protected boolean abort(Env env) {
throw new UnsupportedOperationException();
}

/**
* No-op: this implementation reads nothing from {@code byteBuffer}.
*
* @param byteBuffer the serialized form (ignored)
*/
@Override
public void deserialize(ByteBuffer byteBuffer) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,6 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {

private int[] stackIndexes = null;

// Per-procedure "persist" flag; starts out as true and is toggled by the methods below.
private boolean persist = true;

/**
* Returns the current value of the persist flag.
*
* @return {@code true} unless {@link #skipPersistance()} was called since the last {@link
*     #resetPersistance()}
*/
public boolean needPersistance() {
return this.persist;
}

/** Sets the persist flag back to {@code true}. */
public void resetPersistance() {
this.persist = true;
}

/** Clears the persist flag, so that {@link #needPersistance()} returns {@code false}. */
public final void skipPersistance() {
this.persist = false;
}

/**
* Reports whether this procedure is currently marked as holding its lock.
*
* @return the current value of the {@code locked} flag
*/
public final boolean hasLock() {
return locked;
}
Expand Down Expand Up @@ -119,17 +105,6 @@ protected abstract Procedure<Env>[] execute(Env env)
protected abstract void rollback(Env env)
throws IOException, InterruptedException, ProcedureException;

/**
* The abort() call is asynchronous and each procedure must decide how to deal with it, if they
* want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
* abort() method and then the execute() will check if the abort flag is set or not. abort() may
* be called multiple times from the client, so the implementation must be idempotent.
*
* <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
* procedure implementor abort.
*/
protected abstract boolean abort(Env env);

public void serialize(DataOutputStream stream) throws IOException {
// procid
stream.writeLong(this.procId);
Expand Down Expand Up @@ -263,28 +238,6 @@ public static Class<?> deserializeTypeInfo(ByteBuffer byteBuffer) {
return clazz;
}

/**
 * Creates an empty procedure instance of the type recorded in {@code byteBuffer}.
 *
 * <p>The class is resolved with {@link #deserializeTypeInfo(ByteBuffer)} and instantiated through
 * its no-argument constructor. {@code Class#newInstance()} is deprecated because it rethrows
 * checked exceptions from the constructor without declaring them, so the constructor is invoked
 * explicitly and every reflective failure is reported the same way.
 *
 * @param byteBuffer buffer handed to {@link #deserializeTypeInfo(ByteBuffer)} to read the type
 * @return a newly constructed procedure of the recorded type
 * @throws RuntimeException if the class has no usable no-argument constructor, cannot be
 *     instantiated, or its constructor throws; the reflective exception is kept as the cause
 */
public static Procedure<?> newInstance(ByteBuffer byteBuffer) {
  Class<?> procedureClass = deserializeTypeInfo(byteBuffer);
  try {
    return (Procedure<?>) procedureClass.getDeclaredConstructor().newInstance();
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException("Instantiation failed", e);
  }
}

/**
* The {@link #doAcquireLock(Object, IProcedureStore)} will be split into two steps, first, it
* will call us to determine whether we need to wait for initialization, second, it will call
* {@link #acquireLock(Object)} to actually handle the lock for this procedure.
*
* <p>This default implementation never asks to wait.
*
* @param env environment
* @return true means we need to wait until the environment has been initialized, otherwise
*     false.
*/
protected boolean waitInitialized(Env env) {
return false;
}

/**
* Acquire a lock, user should override it if necessary.
*
Expand Down Expand Up @@ -314,34 +267,6 @@ protected boolean holdLock(Env env) {
return false;
}

/**
* Called before the procedure is recovered and added into the queue.
*
* <p>Declared {@code final} with an empty body, so it currently has no effect and cannot be
* overridden.
*
* @param env environment
*/
protected final void beforeRecover(Env env) {
// no op
}

/**
* Called when the procedure is recovered and added into the queue.
*
* <p>Declared {@code final} with an empty body, so it currently has no effect and cannot be
* overridden.
*
* @param env environment
*/
protected final void afterRecover(Env env) {
// no op
}

/**
* Called when the procedure is completed (success or rollback). The procedure may use this method
* to clean up in-memory states. This operation will not be retried on failure.
*
* <p>The default implementation does nothing; subclasses may override it.
*
* @param env environment
*/
protected void completionCleanup(Env env) {
// no op
}

/**
* To make executor yield between each execution step to give other procedures a chance to run.
*
Expand Down Expand Up @@ -393,9 +318,6 @@ public void doRollback(Env env) throws IOException, InterruptedException, Proced
* @return ProcedureLockState
*/
public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
if (waitInitialized(env)) {
return ProcedureLockState.LOCK_EVENT_WAIT;
}
if (lockedWhenLoading) {
lockedWhenLoading = false;
locked = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
Expand Down Expand Up @@ -186,24 +185,14 @@ private void recover() {
// executing, we need to set its state to RUNNABLE.
procedure.setState(ProcedureState.RUNNABLE);
runnableList.add(procedure);
} else {
procedure.afterRecover(environment);
}
});
restoreLocks();

waitingTimeoutList.forEach(
procedure -> {
procedure.afterRecover(environment);
timeoutExecutor.add(procedure);
});
waitingTimeoutList.forEach(timeoutExecutor::add);

failedList.forEach(scheduler::addBack);
runnableList.forEach(
procedure -> {
procedure.afterRecover(environment);
scheduler.addBack(procedure);
});
runnableList.forEach(scheduler::addBack);
scheduler.signalAll();
}

Expand Down Expand Up @@ -418,21 +407,16 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
"The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
return;
}
boolean suspended = false;
boolean reExecute;

Procedure<Env>[] subprocs = null;
do {
reExecute = false;
proc.resetPersistance();
try {
subprocs = proc.doExecute(this.environment);
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
LOG.debug("Suspend {}", proc);
suspended = true;
} catch (ProcedureYieldException e) {
LOG.debug("Yield {}", proc);
yieldProcedure(proc);
Expand All @@ -455,22 +439,20 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
}
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
LOG.info("Added into timeoutExecutor {}", proc);
} else if (!suspended) {
} else {
proc.setState(ProcedureState.SUCCESS);
}
}
// add procedure into rollback stack.
rootProcStack.addRollbackStep(proc);

if (proc.needPersistance()) {
updateStoreOnExecution(rootProcStack, proc, subprocs);
}
updateStoreOnExecution(rootProcStack, proc, subprocs);

if (!store.isRunning()) {
return;
}

if (proc.isRunnable() && !suspended && proc.isYieldAfterExecution(this.environment)) {
if (proc.isRunnable() && proc.isYieldAfterExecution(this.environment)) {
yieldProcedure(proc);
return;
}
Expand All @@ -481,7 +463,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
}

releaseLock(proc, false);
if (!suspended && proc.isFinished() && proc.hasParent()) {
if (proc.isFinished() && proc.hasParent()) {
countDownChildren(rootProcStack, proc);
}
}
Expand Down Expand Up @@ -518,6 +500,7 @@ private void submitChildrenProcedures(Procedure<Env>[] subprocs) {
subproc.updateMetricsOnSubmit(getEnvironment());
procedures.put(subproc.getProcId(), subproc);
scheduler.addFront(subproc);
LOG.info("Sub-Procedure pid={} has been submitted", subproc.getProcId());
}
}

Expand Down Expand Up @@ -693,11 +676,6 @@ private void executeCompletionCleanup(Procedure<Env> proc) {
if (proc.hasLock()) {
releaseLock(proc, true);
}
try {
proc.completionCleanup(this.environment);
} catch (Throwable e) {
LOG.error("CODE-BUG:Uncaught runtime exception for procedure {}", proc, e);
}
}

private void rootProcedureCleanup(Procedure<Env> proc) {
Expand Down Expand Up @@ -731,7 +709,7 @@ private class WorkerThread extends StoppableThread {
protected long keepAliveTime = -1;

public WorkerThread(ThreadGroup threadGroup) {
this(threadGroup, "ProcExecWorker-");
this(threadGroup, "ProcedureCoreWorker-");
}

public WorkerThread(ThreadGroup threadGroup, String prefix) {
Expand All @@ -751,26 +729,28 @@ public void run() {
while (isRunning() && keepAlive(lastUpdated)) {
Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (procedure == null) {
Thread.sleep(1000);
continue;
}
this.activeProcedure.set(procedure);
int activeCount = activeExecutorCount.incrementAndGet();
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
executeProcedure(procedure);
activeCount = activeExecutorCount.decrementAndGet();
LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount);
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}

} catch (Throwable throwable) {
} catch (Exception e) {
if (this.activeProcedure.get() != null) {
LOG.warn(
"Procedure Worker {} terminated {}",
"Exception happened when worker {} execute procedure {}",
getName(),
this.activeProcedure.get(),
throwable);
e);
}
} finally {
LOG.info("Procedure worker {} terminated.", getName());
Expand All @@ -796,12 +776,12 @@ public long getCurrentRunTime() {
}
}

// A worker thread which can be added when core workers are stuck. Will timeout after
// keepAliveTime if there is no procedure to run.
private final class KeepAliveWorkerThread extends WorkerThread {
// Temporary worker threads are launched when too many core workers are stuck.
// They time out after keepAliveTime if there is no procedure to run.
private final class TemporaryWorkerThread extends WorkerThread {

public KeepAliveWorkerThread(ThreadGroup group) {
super(group, "KAProcExecWorker-");
public TemporaryWorkerThread(ThreadGroup group) {
super(group, "ProcedureTemporaryWorker-");
this.keepAliveTime = TimeUnit.SECONDS.toMillis(10);
}

Expand All @@ -823,22 +803,25 @@ public WorkerMonitor() {
updateTimestamp();
}

private int checkForStuckWorkers() {
private int calculateRunningAndStuckWorkers() {
// Check if any of the worker is stuck
int stuckCount = 0;
int runningCount = 0, stuckCount = 0;
for (WorkerThread worker : workerThreads) {
if (worker.activeProcedure.get() == null
|| worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
if (worker.activeProcedure.get() == null) {
continue;
}

runningCount++;
// WARN the worker is stuck
stuckCount++;
LOG.warn(
"Worker stuck {}({}), run time {} ms",
worker,
worker.activeProcedure.get().getProcType(),
worker.getCurrentRunTime());
if (worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
stuckCount++;
LOG.warn(
"Worker stuck {}({}), run time {} ms",
worker,
worker.activeProcedure.get().getProcType(),
worker.getCurrentRunTime());
}
LOG.info(
"Procedure workers: {} is running, {} is running and stuck", runningCount, stuckCount);
}
return stuckCount;
}
Expand All @@ -854,7 +837,7 @@ private void checkThreadCount(final int stuckCount) {
// Add new worker threads more aggressively, as they will eventually time out if there is no
// work to do.
if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) {
final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
final TemporaryWorkerThread worker = new TemporaryWorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
LOG.debug("Added new worker thread {}", worker);
Expand All @@ -863,7 +846,7 @@ private void checkThreadCount(final int stuckCount) {

@Override
protected void periodicExecute(Env env) {
final int stuckCount = checkForStuckWorkers();
final int stuckCount = calculateRunningAndStuckWorkers();
checkThreadCount(stuckCount);
updateTimestamp();
}
Expand Down Expand Up @@ -942,28 +925,6 @@ public long submitProcedure(Procedure<Env> procedure) {
return pushProcedure(procedure);
}

/**
 * Asks the procedure identified by {@code procId} to abort.
 *
 * @param procId id of the procedure to abort
 * @param force when {@code false}, a procedure that has already been executed is left alone
 * @return the result of the procedure's own {@code abort} call; {@code false} when no procedure
 *     with that id is known, or when it was already executed and {@code force} is {@code false}
 */
public boolean abort(long procId, boolean force) {
  final Procedure<Env> target = procedures.get(procId);
  if (target == null) {
    return false;
  }
  // wasExecuted() is only consulted for a non-forced abort.
  final boolean skipBecauseExecuted = !force && target.wasExecuted();
  return !skipBecauseExecuted && target.abort(this.environment);
}

/**
* Aborts the given procedure with {@code force} set to {@code true}, i.e. without checking
* whether it was already executed.
*
* @param procId procedure id
* @return the result of {@link #abort(long, boolean)} called with {@code force = true}
*/
public boolean abort(long procId) {
return abort(procId, true);
}

/**
* Returns the scheduler held by this executor.
*
* @return the {@code scheduler} field
*/
public ProcedureScheduler getScheduler() {
return scheduler;
}
Expand Down
Loading

0 comments on commit b8fa0b6

Please sign in to comment.