Skip to content

Commit

Permalink
avoid notify
Browse files Browse the repository at this point in the history
  • Loading branch information
SpriCoder committed Feb 5, 2025
1 parent a01495f commit 3e2efb5
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class MemoryControlledWALEntryQueue {

private final BlockingQueue<WALEntry> queue;
private static final Object nonFullCondition = new Object();

public MemoryControlledWALEntryQueue() {
queue = new LinkedBlockingQueue<>();
Expand All @@ -38,19 +39,29 @@ public WALEntry poll(long timeout, TimeUnit unit) throws InterruptedException {
WALEntry e = queue.poll(timeout, unit);
if (e != null) {
SystemInfo.getInstance().getWalBufferQueueMemoryBlock().release(getElementSize(e));
synchronized (nonFullCondition) {
nonFullCondition.notifyAll();
}
}
return e;
}

public void put(WALEntry e) throws InterruptedException {
long elementSize = getElementSize(e);
SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocateUntilAvailable(elementSize);
synchronized (nonFullCondition) {
while (!SystemInfo.getInstance().getWalBufferQueueMemoryBlock().allocate(elementSize)) {
nonFullCondition.wait();
}
}
queue.put(e);
}

public WALEntry take() throws InterruptedException {
WALEntry e = queue.take();
SystemInfo.getInstance().getWalBufferQueueMemoryBlock().release(getElementSize(e));
synchronized (nonFullCondition) {
nonFullCondition.notifyAll();
}
return e;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public void addCompactionMemoryCost(
}
boolean allocateResult =
waitUntilAcquired
? compactionMemoryBlock.allocateUntilAvailable(memoryCost)
? compactionMemoryBlock.allocateUntilAvailable(memoryCost, 100)
: compactionMemoryBlock.allocate(memoryCost);
if (!allocateResult) {
throw new CompactionMemoryNotEnoughException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ public abstract class IMemoryBlock implements AutoCloseable {
*/
public abstract boolean allocateUntilAvailable(final long sizeInByte) throws InterruptedException;

/**
* Allocate memory managed by this memory block until the required memory is available
*
* @param sizeInByte the size of memory to be allocated, should positive
* @param timeInterval the time interval to wait for memory to be available
*/
public abstract boolean allocateUntilAvailable(final long sizeInByte, long timeInterval)
throws InterruptedException;

/**
* Try to record memory managed by this memory block
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,24 @@ public boolean allocateUntilAvailable(long sizeInByte) throws InterruptedExcepti
return true;
}

@Override
public boolean allocateUntilAvailable(long sizeInByte, long timeInterval)
throws InterruptedException {
long originSize = memoryUsageInBytes.get();
while (true) {
boolean canUpdate = originSize + sizeInByte <= maxMemorySizeInByte;
if (canUpdate && memoryUsageInBytes.compareAndSet(originSize, originSize + sizeInByte)) {
break;
}
Thread.sleep(TimeUnit.MILLISECONDS.toMillis(timeInterval));
originSize = memoryUsageInBytes.get();
}
return true;
}

@Override
public void release(long sizeInByte) {
memoryUsageInBytes.addAndGet(-sizeInByte);
synchronized (memoryUsageInBytes) {
memoryUsageInBytes.notifyAll();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ public void TestGetName() {
MemoryManager memoryManager = MemoryManager.global();
Assert.assertEquals("GlobalMemoryManager", memoryManager.getName());
}
}
}

0 comments on commit 3e2efb5

Please sign in to comment.