diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java index 5f4fcf0dd77e..4218cc8f754c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +63,7 @@ public void setMemoryUsageInBytes(final long memoryUsageInBytes) { public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod) { this.shrinkMethod.set(shrinkMethod); + pipeMemoryManager.addShrinkableBlock(this); return this; } @@ -72,6 +74,7 @@ public PipeMemoryBlock setShrinkCallback(final BiConsumer shrinkCall public PipeMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod) { this.expandMethod.set(extendMethod); + pipeMemoryManager.addExpandableBlock(this); return this; } @@ -177,6 +180,12 @@ public void close() { if (lock.tryLock(50, TimeUnit.MICROSECONDS)) { try { pipeMemoryManager.release(this); + if (Objects.nonNull(shrinkMethod.get())) { + pipeMemoryManager.removeShrinkableBlock(this); + } + if (Objects.nonNull(expandMethod.get())) { + pipeMemoryManager.removeExpandableBlock(this); + } if (isInterrupted) { LOGGER.warn("{} is released after thread interruption.", this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index ada788363d7f..d806a7450dc6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -61,6 +61,8 @@ public class PipeMemoryManager { // Only non-zero memory blocks will be added to this set. private final Set allocatedBlocks = new HashSet<>(); + private final Set shrinkableBlocks = new HashSet<>(); + private final Set expandableBlocks = new HashSet<>(); public PipeMemoryManager() { PipeDataNodeAgent.runtime() @@ -528,8 +530,9 @@ private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, PipeMemoryBlockTyp return returnedMemoryBlock; } + // Single-threaded logic private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) { - final List shuffledBlocks = new ArrayList<>(allocatedBlocks); + final List shuffledBlocks = new ArrayList<>(shrinkableBlocks); Collections.shuffle(shuffledBlocks); while (true) { @@ -549,46 +552,64 @@ private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) { } } + void addShrinkableBlock(final PipeMemoryBlock block) { + shrinkableBlocks.add(block); + } + + void removeShrinkableBlock(final PipeMemoryBlock block) { + shrinkableBlocks.remove(block); + } + public synchronized void tryExpandAllAndCheckConsistency() { - allocatedBlocks.forEach(PipeMemoryBlock::expand); + expandableBlocks.forEach(PipeMemoryBlock::expand); + + if (LOGGER.isDebugEnabled()) { + final long blockSum = + allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum(); + if (blockSum != memoryBlock.getUsedMemoryInBytes()) { + LOGGER.debug( + "tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks," + + " usedMemorySizeInBytes is {} but sum of all blocks is {}", + memoryBlock.getUsedMemoryInBytes(), + blockSum); + } - long blockSum = - allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum(); - if (blockSum != memoryBlock.getUsedMemoryInBytes()) { - LOGGER.warn( - "tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks," - + " usedMemorySizeInBytes is {} but sum of all blocks is {}", - memoryBlock.getUsedMemoryInBytes(), - blockSum); - } + final long tabletBlockSum = + allocatedBlocks.stream() + .filter(PipeTabletMemoryBlock.class::isInstance) + .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) + .sum(); + if (tabletBlockSum != usedMemorySizeInBytesOfTablets) { + LOGGER.debug( + "tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks," + + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}", + usedMemorySizeInBytesOfTablets, + tabletBlockSum); + } - long tabletBlockSum = - allocatedBlocks.stream() - .filter(PipeTabletMemoryBlock.class::isInstance) - .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) - .sum(); - if (tabletBlockSum != usedMemorySizeInBytesOfTablets) { - LOGGER.warn( - "tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks," - + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}", - usedMemorySizeInBytesOfTablets, - tabletBlockSum); - } - - long tsFileBlockSum = - allocatedBlocks.stream() - .filter(PipeTsFileMemoryBlock.class::isInstance) - .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) - .sum(); - if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) { - LOGGER.warn( - "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not consistent with allocated blocks," - + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile blocks is {}", - usedMemorySizeInBytesOfTsFiles, - tsFileBlockSum); + final long tsFileBlockSum = + allocatedBlocks.stream() + .filter(PipeTsFileMemoryBlock.class::isInstance) + .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) + .sum(); + if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) { + LOGGER.debug( + "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not consistent with allocated blocks," + + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile blocks is {}", + usedMemorySizeInBytesOfTsFiles, + tsFileBlockSum); + } } } + void addExpandableBlock(final PipeMemoryBlock block) { + expandableBlocks.add(block); + } + + void removeExpandableBlock(final PipeMemoryBlock block) { + expandableBlocks.remove(block); + } + public synchronized void release(PipeMemoryBlock block) { if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null || block.isReleased()) { return;