Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -62,6 +63,7 @@ public void setMemoryUsageInBytes(final long memoryUsageInBytes) {

public PipeMemoryBlock setShrinkMethod(final LongUnaryOperator shrinkMethod) {
this.shrinkMethod.set(shrinkMethod);
pipeMemoryManager.addShrinkableBlock(this);
return this;
}

Expand All @@ -72,6 +74,7 @@ public PipeMemoryBlock setShrinkCallback(final BiConsumer<Long, Long> shrinkCall

public PipeMemoryBlock setExpandMethod(final LongUnaryOperator extendMethod) {
this.expandMethod.set(extendMethod);
pipeMemoryManager.addExpandableBlock(this);
return this;
}

Expand Down Expand Up @@ -177,6 +180,12 @@ public void close() {
if (lock.tryLock(50, TimeUnit.MICROSECONDS)) {
try {
pipeMemoryManager.release(this);
if (Objects.nonNull(shrinkMethod.get())) {
pipeMemoryManager.removeShrinkableBlock(this);
}
if (Objects.nonNull(expandMethod.get())) {
pipeMemoryManager.removeExpandableBlock(this);
}
if (isInterrupted) {
LOGGER.warn("{} is released after thread interruption.", this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class PipeMemoryManager {

// Only non-zero memory blocks will be added to this set.
private final Set<PipeMemoryBlock> allocatedBlocks = new HashSet<>();
private final Set<PipeMemoryBlock> shrinkableBlocks = new HashSet<>();
private final Set<PipeMemoryBlock> expandableBlocks = new HashSet<>();

public PipeMemoryManager() {
PipeDataNodeAgent.runtime()
Expand Down Expand Up @@ -528,8 +530,9 @@ private PipeMemoryBlock registerMemoryBlock(long sizeInBytes, PipeMemoryBlockTyp
return returnedMemoryBlock;
}

// Single-threaded logic
private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) {
final List<PipeMemoryBlock> shuffledBlocks = new ArrayList<>(allocatedBlocks);
final List<PipeMemoryBlock> shuffledBlocks = new ArrayList<>(shrinkableBlocks);
Collections.shuffle(shuffledBlocks);

while (true) {
Expand All @@ -549,46 +552,64 @@ private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) {
}
}

void addShrinkableBlock(final PipeMemoryBlock block) {
shrinkableBlocks.add(block);
}

void removeShrinkableBlock(final PipeMemoryBlock block) {
shrinkableBlocks.remove(block);
}

public synchronized void tryExpandAllAndCheckConsistency() {
allocatedBlocks.forEach(PipeMemoryBlock::expand);
expandableBlocks.forEach(PipeMemoryBlock::expand);

if (LOGGER.isDebugEnabled()) {
final long blockSum =
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
if (blockSum != memoryBlock.getUsedMemoryInBytes()) {
LOGGER.debug(
"tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks,"
+ " usedMemorySizeInBytes is {} but sum of all blocks is {}",
memoryBlock.getUsedMemoryInBytes(),
blockSum);
}

long blockSum =
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
if (blockSum != memoryBlock.getUsedMemoryInBytes()) {
LOGGER.warn(
"tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks,"
+ " usedMemorySizeInBytes is {} but sum of all blocks is {}",
memoryBlock.getUsedMemoryInBytes(),
blockSum);
}
final long tabletBlockSum =
allocatedBlocks.stream()
.filter(PipeTabletMemoryBlock.class::isInstance)
.mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
.sum();
if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
LOGGER.debug(
"tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks,"
+ " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}",
usedMemorySizeInBytesOfTablets,
tabletBlockSum);
}

long tabletBlockSum =
allocatedBlocks.stream()
.filter(PipeTabletMemoryBlock.class::isInstance)
.mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
.sum();
if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
LOGGER.warn(
"tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks,"
+ " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}",
usedMemorySizeInBytesOfTablets,
tabletBlockSum);
}

long tsFileBlockSum =
allocatedBlocks.stream()
.filter(PipeTsFileMemoryBlock.class::isInstance)
.mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
.sum();
if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) {
LOGGER.warn(
"tryExpandAllAndCheckConsistency: memory usage of tsfiles is not consistent with allocated blocks,"
+ " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile blocks is {}",
usedMemorySizeInBytesOfTsFiles,
tsFileBlockSum);
final long tsFileBlockSum =
allocatedBlocks.stream()
.filter(PipeTsFileMemoryBlock.class::isInstance)
.mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
.sum();
if (tsFileBlockSum != usedMemorySizeInBytesOfTsFiles) {
LOGGER.debug(
"tryExpandAllAndCheckConsistency: memory usage of tsfiles is not consistent with allocated blocks,"
+ " usedMemorySizeInBytesOfTsFiles is {} but sum of all tsfile blocks is {}",
usedMemorySizeInBytesOfTsFiles,
tsFileBlockSum);
}
}
}

void addExpandableBlock(final PipeMemoryBlock block) {
expandableBlocks.add(block);
}

void removeExpandableBlock(final PipeMemoryBlock block) {
expandableBlocks.remove(block);
}

public synchronized void release(PipeMemoryBlock block) {
if (!PIPE_MEMORY_MANAGEMENT_ENABLED || block == null || block.isReleased()) {
return;
Expand Down
Loading