Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/iotdb into comment
Browse files Browse the repository at this point in the history
  • Loading branch information
Caideyipi committed Jan 21, 2025
2 parents cad5062 + fcb3fac commit 0cb789c
Show file tree
Hide file tree
Showing 23 changed files with 169 additions and 344 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public boolean fetchResults() throws StatementExecutionException, IoTDBConnectio
throw new IoTDBConnectionException("This DataSet is already closed");
}
TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
req.setStatementId(statementId);
req.setTimeout(timeout);
try {
TSFetchResultsResp resp = client.fetchResultsV2(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ protected void rollback(Env env) throws IOException, InterruptedException {
throw new UnsupportedOperationException();
}

@Override
protected boolean abort(Env env) {
throw new UnsupportedOperationException();
}

@Override
public void deserialize(ByteBuffer byteBuffer) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,6 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {

private int[] stackIndexes = null;

private boolean persist = true;

public boolean needPersistance() {
return this.persist;
}

public void resetPersistance() {
this.persist = true;
}

public final void skipPersistance() {
this.persist = false;
}

public final boolean hasLock() {
return locked;
}
Expand Down Expand Up @@ -119,17 +105,6 @@ protected abstract Procedure<Env>[] execute(Env env)
protected abstract void rollback(Env env)
throws IOException, InterruptedException, ProcedureException;

/**
* The abort() call is asynchronous and each procedure must decide how to deal with it, if they
* want to be abortable. The simplest implementation is to have an AtomicBoolean set in the
* abort() method and then the execute() will check if the abort flag is set or not. abort() may
* be called multiple times from the client, so the implementation must be idempotent.
*
* <p>NOTE: abort() is not like Thread.interrupt(). It is just a notification that allows the
* procedure implementor abort.
*/
protected abstract boolean abort(Env env);

public void serialize(DataOutputStream stream) throws IOException {
// procid
stream.writeLong(this.procId);
Expand Down Expand Up @@ -263,28 +238,6 @@ public static Class<?> deserializeTypeInfo(ByteBuffer byteBuffer) {
return clazz;
}

public static Procedure<?> newInstance(ByteBuffer byteBuffer) {
Class<?> procedureClass = deserializeTypeInfo(byteBuffer);
Procedure<?> procedure;
try {
procedure = (Procedure<?>) procedureClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException("Instantiation failed", e);
}
return procedure;
}

/**
* The {@link #doAcquireLock(Object, IProcedureStore)} will be split into two steps, first, it
* will call us to determine whether we need to wait for initialization, second, it will call
* {@link #acquireLock(Object)} to actually handle the lock for this procedure.
*
* @return true means we need to wait until the environment has been initialized, otherwise true.
*/
protected boolean waitInitialized(Env env) {
return false;
}

/**
* Acquire a lock, user should override it if necessary.
*
Expand Down Expand Up @@ -314,34 +267,6 @@ protected boolean holdLock(Env env) {
return false;
}

/**
* Called before the procedure is recovered and added into the queue.
*
* @param env environment
*/
protected final void beforeRecover(Env env) {
// no op
}

/**
* Called when the procedure is recovered and added into the queue.
*
* @param env environment
*/
protected final void afterRecover(Env env) {
// no op
}

/**
* Called when the procedure is completed (success or rollback). The procedure may use this method
* to clean up in-memory states. This operation will not be retried on failure.
*
* @param env environment
*/
protected void completionCleanup(Env env) {
// no op
}

/**
* To make executor yield between each execution step to give other procedures a chance to run.
*
Expand Down Expand Up @@ -393,9 +318,6 @@ public void doRollback(Env env) throws IOException, InterruptedException, Proced
* @return ProcedureLockState
*/
public final ProcedureLockState doAcquireLock(Env env, IProcedureStore store) {
if (waitInitialized(env)) {
return ProcedureLockState.LOCK_EVENT_WAIT;
}
if (lockedWhenLoading) {
lockedWhenLoading = false;
locked = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
Expand Down Expand Up @@ -186,24 +185,14 @@ private void recover() {
// executing, we need to set its state to RUNNABLE.
procedure.setState(ProcedureState.RUNNABLE);
runnableList.add(procedure);
} else {
procedure.afterRecover(environment);
}
});
restoreLocks();

waitingTimeoutList.forEach(
procedure -> {
procedure.afterRecover(environment);
timeoutExecutor.add(procedure);
});
waitingTimeoutList.forEach(timeoutExecutor::add);

failedList.forEach(scheduler::addBack);
runnableList.forEach(
procedure -> {
procedure.afterRecover(environment);
scheduler.addBack(procedure);
});
runnableList.forEach(scheduler::addBack);
scheduler.signalAll();
}

Expand Down Expand Up @@ -418,21 +407,16 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
"The executing procedure should in RUNNABLE state, but it's not. Procedure is {}", proc);
return;
}
boolean suspended = false;
boolean reExecute;

Procedure<Env>[] subprocs = null;
do {
reExecute = false;
proc.resetPersistance();
try {
subprocs = proc.doExecute(this.environment);
if (subprocs != null && subprocs.length == 0) {
subprocs = null;
}
} catch (ProcedureSuspendedException e) {
LOG.debug("Suspend {}", proc);
suspended = true;
} catch (ProcedureYieldException e) {
LOG.debug("Yield {}", proc);
yieldProcedure(proc);
Expand All @@ -455,22 +439,20 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
}
} else if (proc.getState() == ProcedureState.WAITING_TIMEOUT) {
LOG.info("Added into timeoutExecutor {}", proc);
} else if (!suspended) {
} else {
proc.setState(ProcedureState.SUCCESS);
}
}
// add procedure into rollback stack.
rootProcStack.addRollbackStep(proc);

if (proc.needPersistance()) {
updateStoreOnExecution(rootProcStack, proc, subprocs);
}
updateStoreOnExecution(rootProcStack, proc, subprocs);

if (!store.isRunning()) {
return;
}

if (proc.isRunnable() && !suspended && proc.isYieldAfterExecution(this.environment)) {
if (proc.isRunnable() && proc.isYieldAfterExecution(this.environment)) {
yieldProcedure(proc);
return;
}
Expand All @@ -481,7 +463,7 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p
}

releaseLock(proc, false);
if (!suspended && proc.isFinished() && proc.hasParent()) {
if (proc.isFinished() && proc.hasParent()) {
countDownChildren(rootProcStack, proc);
}
}
Expand Down Expand Up @@ -518,6 +500,7 @@ private void submitChildrenProcedures(Procedure<Env>[] subprocs) {
subproc.updateMetricsOnSubmit(getEnvironment());
procedures.put(subproc.getProcId(), subproc);
scheduler.addFront(subproc);
LOG.info("Sub-Procedure pid={} has been submitted", subproc.getProcId());
}
}

Expand Down Expand Up @@ -693,11 +676,6 @@ private void executeCompletionCleanup(Procedure<Env> proc) {
if (proc.hasLock()) {
releaseLock(proc, true);
}
try {
proc.completionCleanup(this.environment);
} catch (Throwable e) {
LOG.error("CODE-BUG:Uncaught runtime exception for procedure {}", proc, e);
}
}

private void rootProcedureCleanup(Procedure<Env> proc) {
Expand Down Expand Up @@ -731,7 +709,7 @@ private class WorkerThread extends StoppableThread {
protected long keepAliveTime = -1;

public WorkerThread(ThreadGroup threadGroup) {
this(threadGroup, "ProcExecWorker-");
this(threadGroup, "ProcedureCoreWorker-");
}

public WorkerThread(ThreadGroup threadGroup, String prefix) {
Expand All @@ -751,26 +729,28 @@ public void run() {
while (isRunning() && keepAlive(lastUpdated)) {
Procedure<Env> procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (procedure == null) {
Thread.sleep(1000);
continue;
}
this.activeProcedure.set(procedure);
int activeCount = activeExecutorCount.incrementAndGet();
activeExecutorCount.incrementAndGet();
startTime.set(System.currentTimeMillis());
executeProcedure(procedure);
activeCount = activeExecutorCount.decrementAndGet();
LOG.trace("Halt pid={}, activeCount={}", procedure.getProcId(), activeCount);
activeExecutorCount.decrementAndGet();
LOG.trace(
"Halt pid={}, activeCount={}", procedure.getProcId(), activeExecutorCount.get());
this.activeProcedure.set(null);
lastUpdated = System.currentTimeMillis();
startTime.set(lastUpdated);
}

} catch (Throwable throwable) {
} catch (Exception e) {
if (this.activeProcedure.get() != null) {
LOG.warn(
"Procedure Worker {} terminated {}",
"Exception happened when worker {} execute procedure {}",
getName(),
this.activeProcedure.get(),
throwable);
e);
}
} finally {
LOG.info("Procedure worker {} terminated.", getName());
Expand All @@ -796,12 +776,12 @@ public long getCurrentRunTime() {
}
}

// A worker thread which can be added when core workers are stuck. Will timeout after
// keepAliveTime if there is no procedure to run.
private final class KeepAliveWorkerThread extends WorkerThread {
// A temporary worker thread will be launched when too many core workers are stuck.
// They will timeout after keepAliveTime if there is no procedure to run.
private final class TemporaryWorkerThread extends WorkerThread {

public KeepAliveWorkerThread(ThreadGroup group) {
super(group, "KAProcExecWorker-");
public TemporaryWorkerThread(ThreadGroup group) {
super(group, "ProcedureTemporaryWorker-");
this.keepAliveTime = TimeUnit.SECONDS.toMillis(10);
}

Expand All @@ -823,22 +803,25 @@ public WorkerMonitor() {
updateTimestamp();
}

private int checkForStuckWorkers() {
private int calculateRunningAndStuckWorkers() {
// Check if any of the worker is stuck
int stuckCount = 0;
int runningCount = 0, stuckCount = 0;
for (WorkerThread worker : workerThreads) {
if (worker.activeProcedure.get() == null
|| worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
if (worker.activeProcedure.get() == null) {
continue;
}

runningCount++;
// WARN the worker is stuck
stuckCount++;
LOG.warn(
"Worker stuck {}({}), run time {} ms",
worker,
worker.activeProcedure.get().getProcType(),
worker.getCurrentRunTime());
if (worker.getCurrentRunTime() < DEFAULT_WORKER_STUCK_THRESHOLD) {
stuckCount++;
LOG.warn(
"Worker stuck {}({}), run time {} ms",
worker,
worker.activeProcedure.get().getProcType(),
worker.getCurrentRunTime());
}
LOG.info(
"Procedure workers: {} is running, {} is running and stuck", runningCount, stuckCount);
}
return stuckCount;
}
Expand All @@ -854,7 +837,7 @@ private void checkThreadCount(final int stuckCount) {
// Let's add new worker thread more aggressively, as they will timeout finally if there is no
// work to do.
if (stuckPerc >= DEFAULT_WORKER_ADD_STUCK_PERCENTAGE && workerThreads.size() < maxPoolSize) {
final KeepAliveWorkerThread worker = new KeepAliveWorkerThread(threadGroup);
final TemporaryWorkerThread worker = new TemporaryWorkerThread(threadGroup);
workerThreads.add(worker);
worker.start();
LOG.debug("Added new worker thread {}", worker);
Expand All @@ -863,7 +846,7 @@ private void checkThreadCount(final int stuckCount) {

@Override
protected void periodicExecute(Env env) {
final int stuckCount = checkForStuckWorkers();
final int stuckCount = calculateRunningAndStuckWorkers();
checkThreadCount(stuckCount);
updateTimestamp();
}
Expand Down Expand Up @@ -942,28 +925,6 @@ public long submitProcedure(Procedure<Env> procedure) {
return pushProcedure(procedure);
}

/**
* Abort a specified procedure.
*
* @param procId procedure id
* @param force whether abort the running procdure.
* @return true if the procedure exists and has received the abort.
*/
public boolean abort(long procId, boolean force) {
Procedure<Env> procedure = procedures.get(procId);
if (procedure != null) {
if (!force && procedure.wasExecuted()) {
return false;
}
return procedure.abort(this.environment);
}
return false;
}

public boolean abort(long procId) {
return abort(procId, true);
}

public ProcedureScheduler getScheduler() {
return scheduler;
}
Expand Down
Loading

0 comments on commit 0cb789c

Please sign in to comment.