Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public void serialize(DataOutputStream stream) throws IOException {
deletion.serializeWithoutFileOffset(stream);
}

public Deletion getModEntry() {
return this.deletion;
}

public static DeletionData deserialize(InputStream stream)
throws IllegalPathException, IOException {
return new DeletionData(Deletion.deserializeWithoutFileOffset(new DataInputStream(stream)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ private void processTimeChunkOrNonAlignedChunk(TsFileSequenceReader reader, byte
long chunkOffset = reader.position();
timeChunkIndexOfCurrentValueColumn = pageIndex2TimesList.size();
consumeAllAlignedChunkData(chunkOffset, pageIndex2ChunkData);
handleModification(offset2Deletions, chunkOffset);

ChunkHeader header = reader.readChunkHeader(marker);
String measurementId = header.getMeasurementID();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.load.LoadFileException;
Expand All @@ -30,10 +31,15 @@
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.splitter.AlignedChunkData;
import org.apache.iotdb.db.storageengine.load.splitter.DeletionData;
import org.apache.iotdb.db.storageengine.load.splitter.TsFileData;
import org.apache.iotdb.db.storageengine.load.splitter.TsFileSplitter;

Expand All @@ -54,7 +60,9 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -233,6 +241,84 @@ public void testCompactionFlushPageAndSplitByTimePartition()
consumeChunkDataAndValidate(targetResource);
}

@Test
public void testDeletionDataShouldOnlyBeGeneratedOnceAtEnd()
throws IOException, MetadataException, LoadFileException, IllegalPathException {
TsFileResource resource = createAlignedMultiDeviceFile();
try (ModificationFile modificationFile = ModificationFile.getNormalMods(resource)) {
modificationFile.write(
new Deletion(new MeasurementPath("root.testsg.d0.s0"), Long.MAX_VALUE, 100));
modificationFile.write(
new Deletion(new MeasurementPath("root.testsg.d0.s1"), Long.MAX_VALUE, 200));
modificationFile.write(
new Deletion(new MeasurementPath("root.testsg.d1.s0"), Long.MAX_VALUE, 300));
modificationFile.write(
new Deletion(new MeasurementPath("root.testsg.d1.s1"), Long.MAX_VALUE, 400));
}

List<Modification> expectedMods =
new ArrayList<>(ModificationFile.getNormalMods(resource).getModifications());
List<Modification> deletionMods = new ArrayList<>();
File actualModsFile = new File(resource.getTsFilePath() + ".mods");
try (ModificationFile actualModificationFile =
new ModificationFile(actualModsFile.getAbsolutePath())) {
TsFileSplitter splitter =
new TsFileSplitter(
resource.getTsFile(),
tsFileData -> {
if (tsFileData instanceof DeletionData) {
deletionMods.add(((DeletionData) tsFileData).getModEntry());
}
return true;
});
splitter.splitTsFileByDataPartition();
}

List<Modification> actualMods;
try (ModificationFile actualModificationFile =
new ModificationFile(actualModsFile.getAbsolutePath())) {
actualMods = new ArrayList<>(actualModificationFile.getModifications());
}
Assert.assertEquals(expectedMods, actualMods);
Files.deleteIfExists(actualModsFile.toPath());
}

private TsFileResource createAlignedMultiDeviceFile() throws IOException {
TsFileResource resource = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
writer.startChunkGroup("d0");
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s0", "s1"),
new TimeRange[][] {
new TimeRange[] {new TimeRange(1, 100), new TimeRange(200, 300)},
new TimeRange[] {
new TimeRange(604799900, 604800020), new TimeRange(604810020, 604820020)
}
},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false));
writer.endChunkGroup();

writer.startChunkGroup("d1");
writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
Arrays.asList("s0", "s1"),
new TimeRange[][] {
new TimeRange[] {new TimeRange(1, 100), new TimeRange(200, 300)},
new TimeRange[] {
new TimeRange(604799900, 604800020), new TimeRange(604810020, 604820020)
}
},
TSEncoding.PLAIN,
CompressionType.LZ4,
Arrays.asList(false, false));
writer.endChunkGroup();

writer.endFile();
}
return resource;
}

private TsFileResource performCompaction()
throws StorageEngineException,
IOException,
Expand Down
Loading