From 0d3054d47c4221bc272cf66aa01de9b1d010602e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 24 Apr 2026 10:00:59 +0800 Subject: [PATCH 1/2] fix --- .../db/pipe/resource/memory/PipeMemoryBlock.java | 5 +++++ .../db/pipe/resource/memory/PipeMemoryManager.java | 12 +++++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java index 5f4fcf0dd77e2..ee71c94508cae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -62,6 +63,7 @@ public void setMemoryUsageInBytes(final long memoryUsageInBytes) { public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod) { this.shrinkMethod.set(shrinkMethod); + pipeMemoryManager.addShrinkableBlock(this); return this; } @@ -177,6 +179,9 @@ public void close() { if (lock.tryLock(50, TimeUnit.MICROSECONDS)) { try { pipeMemoryManager.release(this); + if (Objects.nonNull(shrinkMethod.get())) { + pipeMemoryManager.removeShrinkableBlock(this); + } if (isInterrupted) { LOGGER.warn("{} is released after thread interruption.", this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index ada788363d7fa..797d65fe6faf2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -61,6 +61,7 @@ public class PipeMemoryManager { // Only non-zero memory blocks will be added to this set. private final Set allocatedBlocks = new HashSet<>(); + private final Set shrinkableBlocks = new HashSet<>(); public PipeMemoryManager() { PipeDataNodeAgent.runtime() @@ -528,8 +529,9 @@ private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, PipeMemoryBlockTyp return returnedMemoryBlock; } + // Single-threaded logic private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) { - final List shuffledBlocks = new ArrayList<>(allocatedBlocks); + final List shuffledBlocks = new ArrayList<>(shrinkableBlocks); Collections.shuffle(shuffledBlocks); while (true) { @@ -549,6 +551,14 @@ private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) { } } + void addShrinkableBlock(final PipeMemoryBlock block) { + shrinkableBlocks.add(block); + } + + void removeShrinkableBlock(final PipeMemoryBlock block) { + shrinkableBlocks.remove(block); + } + public synchronized void tryExpandAllAndCheckConsistency() { allocatedBlocks.forEach(PipeMemoryBlock::expand); From d725e6c9013f26490abeade4ace63cb7021840fb Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 24 Apr 2026 10:13:58 +0800 Subject: [PATCH 2/2] exp --- .../pipe/resource/memory/PipeMemoryBlock.java | 4 + .../resource/memory/PipeMemoryManager.java | 79 +++++++++++-------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java index ee71c94508cae..4218cc8f754cd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryBlock.java @@ -74,6 +74,7 @@ public PipeMemoryBlock setShrinkCallback(final BiConsumer shrinkCall public PipeMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod) { this.expandMethod.set(extendMethod); + pipeMemoryManager.addExpandableBlock(this); return this; } @@ -182,6 +183,9 @@ public void close() { if (Objects.nonNull(shrinkMethod.get())) { pipeMemoryManager.removeShrinkableBlock(this); } + if (Objects.nonNull(expandMethod.get())) { + pipeMemoryManager.removeExpandableBlock(this); + } if (isInterrupted) { LOGGER.warn("{} is released after thread interruption.", this); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index 797d65fe6faf2..d806a7450dc6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -62,6 +62,7 @@ public class PipeMemoryManager { // Only non-zero memory blocks will be added to this set. private final Set allocatedBlocks = new HashSet<>(); private final Set shrinkableBlocks = new HashSet<>(); + private final Set expandableBlocks = new HashSet<>(); public PipeMemoryManager() { PipeDataNodeAgent.runtime() @@ -560,45 +561,55 @@ void removeShrinkableBlock(final PipeMemoryBlock block) { } public synchronized void tryExpandAllAndCheckConsistency() { - allocatedBlocks.forEach(PipeMemoryBlock::expand); + expandableBlocks.forEach(PipeMemoryBlock::expand); + + if (LOGGER.isDebugEnabled()) { + final long blockSum = + allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum(); + if (blockSum != memoryBlock.getUsedMemoryInBytes()) { + LOGGER.debug( + "tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks," + + " usedMemorySizeInBytes is {} but sum of all blocks is {}", + memoryBlock.getUsedMemoryInBytes(), + blockSum); + } - long blockSum = - allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum(); - if (blockSum != memoryBlock.getUsedMemoryInBytes()) { - LOGGER.warn( - "tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks," - + " usedMemorySizeInBytes is {} but sum of all blocks is {}", - memoryBlock.getUsedMemoryInBytes(), - blockSum); - } + final long tabletBlockSum = + allocatedBlocks.stream() + .filter(PipeTabletMemoryBlock.class::isInstance) + .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) + .sum(); + if (tabletBlockSum != usedMemorySizeInBytesOfTablets) { + LOGGER.debug( + "tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks," + + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}", + usedMemorySizeInBytesOfTablets, + tabletBlockSum); + } - long tabletBlockSum = - allocatedBlocks.stream() - .filter(PipeTabletMemoryBlock.class::isInstance) - .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) - .sum(); - if (tabletBlockSum != usedMemorySizeInBytesOfTablets) { - LOGGER.warn( - "tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks," - + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}", - usedMemorySizeInBytesOfTablets, - tabletBlockSum); - } - - long tsFileBlockSum = - allocatedBlocks.stream() - .filter(PipeTsFileMemoryBlock.class::isInstance) - .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) - .sum(); - if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) { - LOGGER.warn( - "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not consistent with allocated blocks," - + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile blocks is {}", - usedMemorySizeInBytesOfTsFiles, - tsFileBlockSum); + final long tsFileBlockSum = + allocatedBlocks.stream() + .filter(PipeTsFileMemoryBlock.class::isInstance) + .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes) + .sum(); + if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) { + LOGGER.debug( + "tryExpandAllAndCheckConsistency: memory usage of tsfiles is not consistent with allocated blocks," + + " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile blocks is {}", + usedMemorySizeInBytesOfTsFiles, + tsFileBlockSum); + } } } + void addExpandableBlock(final PipeMemoryBlock block) { + expandableBlocks.add(block); + } + + void removeExpandableBlock(final PipeMemoryBlock block) { + expandableBlocks.remove(block); + } + public synchronized void release(PipeMemoryBlock block) { if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null || block.isReleased()) { return;