Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing a memory control mechanism during the query planning stage #12573

Merged
merged 8 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import org.apache.iotdb.db.queryengine.statistics.QueryPlanStatistics;

import org.apache.tsfile.read.filter.basic.Filter;
Expand Down Expand Up @@ -75,6 +76,19 @@ public class MPPQueryContext {

QueryPlanStatistics queryPlanStatistics = null;

// To avoid query front-end from consuming too much memory, it needs to reserve memory when
// constructing some Expression and PlanNode.
private long reservedBytesInTotalForFrontEnd = 0;

private long bytesToBeReservedForFrontEnd = 0;

// To avoid reserving memory too frequently, we choose to do it in batches. This is the lower
// bound
// for each batch.
private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;

private final LocalExecutionPlanner LOCAL_EXECUTION_PLANNER = LocalExecutionPlanner.getInstance();

public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
this.endPointBlackList = new LinkedList<>();
Expand Down Expand Up @@ -113,6 +127,7 @@ public MPPQueryContext(

public void prepareForRetry() {
this.initResultNodeContext();
this.releaseMemoryForFrontEnd();
}

private void initResultNodeContext() {
Expand Down Expand Up @@ -290,4 +305,49 @@ public void setLogicalOptimizationCost(long logicalOptimizeCost) {
}
queryPlanStatistics.setLogicalOptimizationCost(logicalOptimizeCost);
}

// region =========== FE memory related, make sure its not called concurrently ===========

/**
* This method does not require concurrency control because the query plan is generated in a
* single-threaded manner.
*/
public void reserveMemoryForFrontEnd(final long bytes) {
this.bytesToBeReservedForFrontEnd += bytes;
if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
reserveMemoryForFrontEndImmediately();
}
}

public void reserveMemoryForFrontEndImmediately() {
if (bytesToBeReservedForFrontEnd != 0) {
LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId());
this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
this.bytesToBeReservedForFrontEnd = 0;
}
}

public void releaseMemoryForFrontEnd() {
if (reservedBytesInTotalForFrontEnd != 0) {
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
reservedBytesInTotalForFrontEnd = 0;
}
}

public void releaseMemoryForFrontEnd(final long bytes) {
if (bytes != 0) {
long bytesToRelease;
if (bytes <= bytesToBeReservedForFrontEnd) {
bytesToBeReservedForFrontEnd -= bytes;
} else {
bytesToRelease = bytes - bytesToBeReservedForFrontEnd;
bytesToBeReservedForFrontEnd = 0;
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytesToRelease);
reservedBytesInTotalForFrontEnd -= bytesToRelease;
}
}
}

// endregion
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@

package org.apache.iotdb.db.queryengine.exception;

import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.rpc.TSStatusCode;

public class MemoryNotEnoughException extends IoTDBException {
public class MemoryNotEnoughException extends RuntimeException {

public MemoryNotEnoughException(String message) {
super(message, TSStatusCode.QUOTA_MEM_QUERY_NOT_ENOUGH.getStatusCode(), true);
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,10 @@ public static long getEstimatedSizeOfPartialPath(@Nullable final PartialPath par
totalSize += MEASUREMENT_PATH_INSTANCE_SIZE;
MeasurementPath measurementPath = (MeasurementPath) partialPath;
totalSize += RamUsageEstimator.sizeOf(measurementPath.getMeasurementAlias());
totalSize +=
RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId());
if (measurementPath.getMeasurementSchema() != null) {
totalSize +=
RamUsageEstimator.sizeOf(measurementPath.getMeasurementSchema().getMeasurementId());
}
} else {
totalSize += PARTIAL_PATH_INSTANCE_SIZE;
totalSize += RamUsageEstimator.sizeOf(partialPath.getMeasurement());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ private ExecutionResult execution(
}
return result;
} finally {
if (queryContext != null) {
queryContext.releaseMemoryForFrontEnd();
}
if (queryContext != null && !queryContext.getAcquiredLockNumMap().isEmpty()) {
Map<SchemaLockType, Integer> lockMap = queryContext.getAcquiredLockNumMap();
for (Map.Entry<SchemaLockType, Integer> entry : lockMap.entrySet()) {
Expand Down
Loading
Loading