Skip to content

Commit

Permalink
Add Pipe Memory Manager
Browse files Browse the repository at this point in the history
  • Loading branch information
SpriCoder committed Jan 21, 2025
1 parent 105ddf1 commit ba536e8
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public class IoTDBConfig {
private MemoryManager ConsensusMemoryManager;

/** Memory allocated for the pipe */
private long allocateMemoryForPipe = Runtime.getRuntime().maxMemory() / 10;
private MemoryManager PipeMemoryManager;

/** Ratio of memory allocated for buffered arrays */
private double bufferedArraysMemoryProportion = 0.6;
Expand Down Expand Up @@ -2149,12 +2149,12 @@ void setAllocateMemoryForRead(long allocateMemoryForRead) {
this.allocateMemoryForTimeIndex = allocateMemoryForRead * 200 / 1001;
}

public long getAllocateMemoryForPipe() {
return allocateMemoryForPipe;
public MemoryManager getPipeMemoryManager() {
return PipeMemoryManager;
}

public void setAllocateMemoryForPipe(long allocateMemoryForPipe) {
this.allocateMemoryForPipe = allocateMemoryForPipe;
public void setPipeMemoryManager(MemoryManager pipeMemoryManager) {
PipeMemoryManager = pipeMemoryManager;
}

public boolean isEnablePartialInsert() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2171,6 +2171,7 @@ private void initMemoryAllocate(TrimProperties properties) {

long storageEngineMemorySize = Runtime.getRuntime().maxMemory() * 3 / 10;
long consensusMemorySize = Runtime.getRuntime().maxMemory() / 10;
long pipeMemorySize = Runtime.getRuntime().maxMemory() / 10;
if (memoryAllocateProportion != null) {
String[] proportions = memoryAllocateProportion.split(":");
int proportionSum = 0;
Expand All @@ -2190,16 +2191,16 @@ private void initMemoryAllocate(TrimProperties properties) {
maxMemoryAvailable * Integer.parseInt(proportions[3].trim()) / proportionSum;
// if pipe proportion is set, use it, otherwise use the default value
if (proportions.length >= 6) {
conf.setAllocateMemoryForPipe(
maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum);
pipeMemorySize =
maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) / proportionSum;
} else {
conf.setAllocateMemoryForPipe(
pipeMemorySize =
(maxMemoryAvailable
- (conf.getStorageEngineMemoryManager().getTotalMemorySizeInBytes()
+ conf.getAllocateMemoryForRead()
+ conf.getAllocateMemoryForSchema()
+ conf.getConsensusMemoryManager().getTotalMemorySizeInBytes()))
/ 2);
/ 2;
}
}
}
Expand All @@ -2209,6 +2210,9 @@ private void initMemoryAllocate(TrimProperties properties) {
MemoryManager consensusMemoryManager =
globalMemoryManager.createMemoryManager("Consensus", consensusMemorySize);
conf.setConsensusMemoryManager(consensusMemoryManager);
MemoryManager pipeMemoryManager =
globalMemoryManager.createMemoryManager("Pipe", pipeMemorySize);
conf.setPipeMemoryManager(pipeMemoryManager);

LOGGER.info("initial allocateMemoryForRead = {}", conf.getAllocateMemoryForRead());
LOGGER.info(
Expand All @@ -2218,7 +2222,9 @@ private void initMemoryAllocate(TrimProperties properties) {
LOGGER.info(
"initial allocateMemoryForConsensus = {}",
conf.getConsensusMemoryManager().getTotalMemorySizeInBytes());
LOGGER.info("initial allocateMemoryForPipe = {}", conf.getAllocateMemoryForPipe());
LOGGER.info(
"initial allocateMemoryForPipe = {}",
conf.getPipeMemoryManager().getTotalMemorySizeInBytes());

initSchemaMemoryAllocate(properties);
initStorageEngineAllocate(storageEngineMemoryManager, properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class PipeMemoryManager {
PipeConfig.getInstance().getPipeMemoryAllocateRetryIntervalInMs();

private static final long TOTAL_MEMORY_SIZE_IN_BYTES =
IoTDBDescriptor.getInstance().getConfig().getAllocateMemoryForPipe();
IoTDBDescriptor.getInstance().getConfig().getPipeMemoryManager().getTotalMemorySizeInBytes();
private static final long MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES =
PipeConfig.getInstance().getPipeMemoryAllocateMinSizeInBytes();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void bindTo(AbstractMetricService metricService) {
GlobalMemoryMetrics.ON_HEAP,
Tag.LEVEL.toString(),
GlobalMemoryMetrics.LEVELS[1])
.set(config.getAllocateMemoryForPipe());
.set(config.getPipeMemoryManager().getTotalMemorySizeInBytes());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ private WALInsertNodeCache(final Integer dataRegionId) {
(long)
Math.min(
(double) 2 * CONFIG.getWalFileSizeThresholdInByte(),
CONFIG.getAllocateMemoryForPipe() * 0.8 / 5);
CONFIG.getPipeMemoryManager().getTotalMemorySizeInBytes() * 0.8 / 5);
allocatedMemoryBlock =
PipeDataNodeResourceManager.memory()
.tryAllocate(requestedAllocateSize)
Expand Down

0 comments on commit ba536e8

Please sign in to comment.