Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leak while inserting using sql #14733

Merged
merged 2 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ public boolean fetchResults() throws StatementExecutionException, IoTDBConnectio
throw new IoTDBConnectionException("This DataSet is already closed");
}
TSFetchResultsReq req = new TSFetchResultsReq(sessionId, sql, fetchSize, queryId, true);
req.setStatementId(statementId);
req.setTimeout(timeout);
try {
TSFetchResultsResp resp = client.fetchResultsV2(req);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,23 @@ public void addQueryId(Long statementId, long queryId) {

@Override
public void removeQueryId(Long statementId, Long queryId) {
Set<Long> queryIds = statementIdToQueryId.get(statementId);
if (queryIds != null) {
queryIds.remove(queryId);
removeQueryId(statementIdToQueryId, statementId, queryId);
}

public static void removeQueryId(
Map<Long, Set<Long>> statementIdToQueryId, Long statementId, Long queryId) {
if (statementId == null) {
statementIdToQueryId.forEach(
(k, v) -> {
if (v != null) {
v.remove(queryId);
}
});
} else {
Set<Long> queryIds = statementIdToQueryId.get(statementId);
if (queryIds != null) {
queryIds.remove(queryId);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public TSConnectionInfo convertToTSConnectionInfo() {

public abstract void addQueryId(Long statementId, long queryId);

// statementId could be null
public abstract void removeQueryId(Long statementId, Long queryId);

public SqlDialect getSqlDialect() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ public void addQueryId(Long statementId, long queryId) {

@Override
public void removeQueryId(Long statementId, Long queryId) {
Set<Long> queryIds = statementIdToQueryId.get(statementId);
if (queryIds != null) {
queryIds.remove(queryId);
}
ClientSession.removeQueryId(statementIdToQueryId, statementId, queryId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ public ClientRPCServiceImpl() {
private TSExecuteStatementResp executeStatementInternal(
TSExecuteStatementReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
String statement = req.getStatement();
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
Expand Down Expand Up @@ -362,10 +363,6 @@ private TSExecuteStatementResp executeStatementInternal(
TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported"));
}

// TODO: permission check

// TODO audit log, quota, StatementType

queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);

result =
Expand Down Expand Up @@ -439,7 +436,7 @@ private TSExecuteStatementResp executeStatementInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
statementType, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
Expand All @@ -448,9 +445,21 @@ private TSExecuteStatementResp executeStatementInternal(
}
}

private void clearUp(
IClientSession clientSession,
Long statementId,
Long queryId,
org.apache.thrift.TBase<?, ?> req,
Throwable t) {
COORDINATOR.cleanupQueryExecution(queryId, req, t);
// clear up queryId Map in clientSession
clientSession.removeQueryId(statementId, queryId);
}

private TSExecuteStatementResp executeRawDataQueryInternal(
TSRawDataQueryReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -530,7 +539,7 @@ private TSExecuteStatementResp executeRawDataQueryInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand All @@ -543,6 +552,7 @@ private TSExecuteStatementResp executeRawDataQueryInternal(
private TSExecuteStatementResp executeLastDataQueryInternal(
TSLastDataQueryReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -623,7 +633,7 @@ private TSExecuteStatementResp executeLastDataQueryInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand All @@ -636,6 +646,7 @@ private TSExecuteStatementResp executeLastDataQueryInternal(
private TSExecuteStatementResp executeAggregationQueryInternal(
TSAggregationQueryReq req, SelectResult setResult) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -713,7 +724,7 @@ private TSExecuteStatementResp executeAggregationQueryInternal(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand Down Expand Up @@ -870,6 +881,7 @@ public TSExecuteStatementResp executeLastDataQueryV2(TSLastDataQueryReq req) {
public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
TSFastLastDataQueryForOneDeviceReq req) {
boolean finished = false;
Long statementId = req.getStatementId();
long queryId = Long.MIN_VALUE;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
OperationQuota quota = null;
Expand Down Expand Up @@ -1061,7 +1073,7 @@ public TSExecuteStatementResp executeFastLastDataQueryForOneDeviceV2(
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(queryId, req, t);
clearUp(clientSession, statementId, queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
Expand Down Expand Up @@ -1178,8 +1190,9 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
String statementType = null;
Throwable t = null;
IQueryExecution queryExecution = null;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
Long statementId = req.isSetStatementId() ? req.getStatementId() : null;
try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
finished = true;
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
Expand Down Expand Up @@ -1230,7 +1243,7 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
clearUp(clientSession, statementId, req.queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand Down Expand Up @@ -1786,8 +1799,9 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
String statementType = null;
Throwable t = null;
IQueryExecution queryExecution = null;
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
Long statementId = req.isSetStatementId() ? req.getStatementId() : null;
try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
finished = true;
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
Expand Down Expand Up @@ -1838,7 +1852,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
CommonUtils.addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost);
COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
clearUp(clientSession, statementId, req.queryId, req, t);
}

SESSION_MANAGER.updateIdleTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,6 @@ public TSStatus executeCQ(TExecuteCQ req) {
return result.status;
}
} catch (Exception e) {
// TODO call the coordinator to release query resource
return onQueryException(e, "\"" + executedSQL + "\". " + OperationType.EXECUTE_STATEMENT);
} finally {
SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,6 @@ public int getQueryExecutionMapSize() {
return queryExecutionMap.size();
}

// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ExecutorService getQueryExecutor() {
int coordinatorReadExecutorSize = CONFIG.getCoordinatorReadExecutorSize();
return IoTDBThreadPoolFactory.newFixedThreadPool(
Expand All @@ -472,7 +471,6 @@ private ExecutorService getWriteExecutor() {
coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName());
}

// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
private ScheduledExecutorService getScheduledExecutor() {
return IoTDBThreadPoolFactory.newScheduledThreadPool(
COORDINATOR_SCHEDULED_EXECUTOR_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ public QueryExecution(IPlanner planner, MPPQueryContext context, ExecutorService
if (!state.isDone()) {
return;
}
// TODO: (xingtanzjr) If the query is in abnormal state, the releaseResource() should be
// invoked
if (state == QueryState.FAILED
|| state == QueryState.ABORTED
|| state == QueryState.CANCELED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,20 +149,16 @@ public void start() {
// QueryState to Running
stateMachine.transitionToRunning();

// TODO: (xingtanzjr) start the stateFetcher/heartbeat for each fragment instance
this.stateTracker.start();
logger.debug("state tracker starts");
}

@Override
public void stop(Throwable t) {
// TODO: It seems that it is unnecessary to check whether they are null or not. Is it a best
// practice ?
dispatcher.abort();
if (stateTracker != null) {
stateTracker.abort();
}
// TODO: (xingtanzjr) handle the exception when the termination cannot succeed
if (queryTerminator != null) {
queryTerminator.terminate(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ struct TSFetchResultsReq{
4: required i64 queryId
5: required bool isAlign
6: optional i64 timeout
7: optional i64 statementId
}

struct TSFetchResultsResp{
Expand Down
Loading