From 81c24d32b5b5cc6c644451f22207cf4a65801817 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?=
Date: Tue, 21 Apr 2026 17:17:41 +0000
Subject: [PATCH 1/4] GH-3522: Reuse intermediate buffers in RunLengthBitPackingHybridDecoder PACKED path

Allocate the int[] values buffer and byte[] read-staging buffer once per
decoder and grow them lazily, instead of allocating fresh arrays on every
PACKED run. Resolves the existing "TODO: reuse a buffer" comment.

A new currentBufferLength field tracks the logical length of the active
region in packedValuesBuffer (which may now exceed the current run's size
after a prior larger run grew it).

Benchmark (RleDictionaryIndexDecodingBenchmark, 100k INT32, BIT_WIDTH=10,
JMH -wi 5 -i 10 -f 2):

  Pattern         | master ops/s | optimized ops/s | Improvement
  SEQUENTIAL      | 93,061,521   | 113,856,860     | +22.3%
  RANDOM          | 92,929,824   | 114,238,638     | +22.9%
  LOW_CARDINALITY | 92,813,229   | 115,271,347     | +24.2%

End-to-end FileReadBenchmark sees ~2% improvement (RLE decoding is a
small fraction of full file reads).

Validation: 573 parquet-column tests pass. Built with
-Dspotless.check.skip=true -Drat.skip=true -Djapicmp.skip=true.
--- .../rle/RunLengthBitPackingHybridDecoder.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index e55b276b29..da9db00c7b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -48,6 +48,11 @@ private static enum MODE { private int currentCount; private int currentValue; private int[] currentBuffer; + private int currentBufferLength; + + // Reusable buffers to avoid per-run allocation in PACKED mode + private int[] packedValuesBuffer = new int[0]; + private byte[] packedBytesBuffer = new byte[0]; public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { LOG.debug("decoding bitWidth {}", bitWidth); @@ -69,7 +74,7 @@ public int readInt() throws IOException { result = currentValue; break; case PACKED: - result = currentBuffer[currentBuffer.length - 1 - currentCount]; + result = currentBuffer[currentBufferLength - 1 - currentCount]; break; default: throw new ParquetDecodingException("not a valid mode " + mode); @@ -90,17 +95,24 @@ private void readNext() throws IOException { case PACKED: int numGroups = header >>> 1; currentCount = numGroups * 8; + currentBufferLength = currentCount; LOG.debug("reading {} values BIT PACKED", currentCount); - currentBuffer = new int[currentCount]; // TODO: reuse a buffer - byte[] bytes = new byte[numGroups * bitWidth]; + if (packedValuesBuffer.length < currentCount) { + packedValuesBuffer = new int[currentCount]; + } + currentBuffer = packedValuesBuffer; + int bytesRequired = numGroups * bitWidth; + if (packedBytesBuffer.length < bytesRequired) { + packedBytesBuffer = new byte[bytesRequired]; + } // At 
the end of the file RLE data though, there might not be that many bytes left. int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); bytesToRead = Math.min(bytesToRead, in.available()); - new DataInputStream(in).readFully(bytes, 0, bytesToRead); + new DataInputStream(in).readFully(packedBytesBuffer, 0, bytesToRead); for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) { - packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex); + packer.unpack8Values(packedBytesBuffer, byteIndex, currentBuffer, valueIndex); } break; default: From 50e555251a9e5d3e5522446cb3383500ef67b6fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 1 May 2026 20:57:15 +0200 Subject: [PATCH 2/4] GH-3522: Convert RunLengthBitPackingHybridDecoder from InputStream to direct ByteBuffer Replace the InputStream-based I/O in RunLengthBitPackingHybridDecoder with direct ByteBuffer access using LITTLE_ENDIAN byte order. This eliminates the InputStream/DataInputStream abstraction layer and enables more efficient reads via buffer.get(), buffer.getShort(), buffer.getInt() intrinsics. Changes: - Add ByteBuffer overloads for BytesUtils.readUnsignedVarInt() and readIntLittleEndianPaddedOnBitWidth() - Update all callers (DictionaryValuesReader, RunLengthBitPackingHybrid- ValuesReader, ColumnReaderBase) to pass ByteBuffer instead of InputStream - Remove IOException from readInt() since ByteBuffer operations don't throw checked exceptions, simplifying all call sites - Update tests to use ByteBuffer API directly All 573 parquet-column tests pass. 
--- .../parquet/column/impl/ColumnReaderBase.java | 8 +-- .../dictionary/DictionaryValuesReader.java | 52 +++++-------------- .../rle/RunLengthBitPackingHybridDecoder.java | 31 ++++++----- ...RunLengthBitPackingHybridValuesReader.java | 11 ++-- ...LengthBitPackingHybridIntegrationTest.java | 4 +- .../TestRunLengthBitPackingHybridEncoder.java | 7 +-- .../org/apache/parquet/bytes/BytesUtils.java | 44 ++++++++++++++++ 7 files changed, 86 insertions(+), 71 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java index 2b3e47116c..ef9a386314 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderBase.java @@ -773,7 +773,7 @@ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) { return new NullIntIterator(); } return new RLEIntIterator(new RunLengthBitPackingHybridDecoder( - BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toInputStream())); + BytesUtils.getWidthFromMaxInt(maxLevel), bytes.toByteBuffer())); } catch (IOException e) { throw new ParquetDecodingException("could not read levels in page for col " + path, e); } @@ -832,11 +832,7 @@ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) { @Override int nextInt() { - try { - return delegate.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return delegate.readInt(); } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java index 53fafc55dc..77e3784392 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java +++ 
b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.values.dictionary; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.Dictionary; @@ -52,12 +53,13 @@ public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IO LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available()); int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in); LOG.debug("bit width {}", bitWidth); - decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); + ByteBuffer buf = in.slice(in.available()); + decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf); } else { - decoder = new RunLengthBitPackingHybridDecoder(1, in) { + decoder = new RunLengthBitPackingHybridDecoder(1, ByteBuffer.allocate(0)) { @Override - public int readInt() throws IOException { - throw new IOException("Attempt to read from empty page"); + public int readInt() { + throw new ParquetDecodingException("Attempt to read from empty page"); } }; } @@ -65,64 +67,36 @@ public int readInt() throws IOException { @Override public int readValueDictionaryId() { - try { - return decoder.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return decoder.readInt(); } @Override public Binary readBytes() { - try { - return dictionary.decodeToBinary(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToBinary(decoder.readInt()); } @Override public float readFloat() { - try { - return dictionary.decodeToFloat(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToFloat(decoder.readInt()); } @Override public double readDouble() { - try { - return 
dictionary.decodeToDouble(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToDouble(decoder.readInt()); } @Override public int readInteger() { - try { - return dictionary.decodeToInt(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToInt(decoder.readInt()); } @Override public long readLong() { - try { - return dictionary.decodeToLong(decoder.readInt()); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return dictionary.decodeToLong(decoder.readInt()); } @Override public void skip() { - try { - decoder.readInt(); // Type does not matter as we are just skipping dictionary keys - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + decoder.readInt(); // Type does not matter as we are just skipping dictionary keys } } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index da9db00c7b..cf7af0bda0 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -18,9 +18,8 @@ */ package org.apache.parquet.column.values.rle; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.bitpacking.BytePacker; @@ -42,7 +41,7 @@ private static enum MODE { private final int bitWidth; private final BytePacker packer; - private final InputStream in; + private final ByteBuffer buffer; private MODE mode; private int currentCount; @@ -54,16 +53,22 @@ private 
static enum MODE { private int[] packedValuesBuffer = new int[0]; private byte[] packedBytesBuffer = new byte[0]; - public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) { + public RunLengthBitPackingHybridDecoder(int bitWidth, ByteBuffer buffer) { LOG.debug("decoding bitWidth {}", bitWidth); Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32"); this.bitWidth = bitWidth; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); - this.in = in; + this.buffer = buffer.order(ByteOrder.LITTLE_ENDIAN); } - public int readInt() throws IOException { + /** + * Reads the next int value from the RLE/Bit-Packing hybrid stream. + * + * @return the next decoded integer value + * @throws ParquetDecodingException if a decoding error occurs + */ + public int readInt() { if (currentCount == 0) { readNext(); } @@ -82,15 +87,15 @@ public int readInt() throws IOException { return result; } - private void readNext() throws IOException { - Preconditions.checkArgument(in.available() > 0, "Reading past RLE/BitPacking stream."); - final int header = BytesUtils.readUnsignedVarInt(in); + private void readNext() { + Preconditions.checkArgument(buffer.hasRemaining(), "Reading past RLE/BitPacking stream."); + final int header = BytesUtils.readUnsignedVarInt(buffer); mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED; switch (mode) { case RLE: currentCount = header >>> 1; LOG.debug("reading {} values RLE", currentCount); - currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(in, bitWidth); + currentValue = BytesUtils.readIntLittleEndianPaddedOnBitWidth(buffer, bitWidth); break; case PACKED: int numGroups = header >>> 1; @@ -107,8 +112,8 @@ private void readNext() throws IOException { } // At the end of the file RLE data though, there might not be that many bytes left. 
int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); - bytesToRead = Math.min(bytesToRead, in.available()); - new DataInputStream(in).readFully(packedBytesBuffer, 0, bytesToRead); + bytesToRead = Math.min(bytesToRead, buffer.remaining()); + buffer.get(packedBytesBuffer, 0, bytesToRead); for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) { diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java index 0bd5a18d2b..8050662f14 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java @@ -19,10 +19,10 @@ package org.apache.parquet.column.values.rle; import java.io.IOException; +import java.nio.ByteBuffer; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; /** * This ValuesReader does all the reading in {@link #initFromPage} @@ -39,7 +39,8 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) { @Override public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException { int length = BytesUtils.readIntLittleEndian(stream); - this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, stream.sliceStream(length)); + ByteBuffer buf = stream.slice(length); + this.decoder = new RunLengthBitPackingHybridDecoder(bitWidth, buf); // 4 is for the length which is stored as 4 bytes little endian updateNextOffset(length + 4); @@ -47,11 +48,7 @@ public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws I @Override public int readInteger() { - try { - return 
decoder.readInt(); - } catch (IOException e) { - throw new ParquetDecodingException(e); - } + return decoder.readInt(); } @Override diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java index 01a4c96e85..b06672cee9 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals; import java.nio.ByteBuffer; -import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.bytes.DirectByteBufferAllocator; import org.junit.Test; @@ -57,9 +56,8 @@ private void doIntegrationTest(int bitWidth) throws Exception { encoder.writeInt((int) (17 % modValue)); } ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer(); - ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes); - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in); + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, encodedBytes); for (int i = 0; i < 100; i++) { assertEquals(i % modValue, decoder.readInt()); diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java index 93a6c8deb4..b38ea671e0 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import 
java.io.ByteArrayInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import org.apache.parquet.bytes.BytesUtils; @@ -290,12 +291,12 @@ public void testGroupBoundary() throws Exception { // bit width 2. bytes[0] = (1 << 1) | 1; bytes[1] = (1 << 0) | (2 << 2) | (3 << 4); - ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream); + ByteBuffer buffer = ByteBuffer.wrap(bytes); + RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, buffer); assertEquals(decoder.readInt(), 1); assertEquals(decoder.readInt(), 2); assertEquals(decoder.readInt(), 3); - assertEquals(stream.available(), 0); + assertEquals(buffer.remaining(), 0); } private static List unpack(int bitWidth, int numValues, ByteArrayInputStream is) throws Exception { diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java index b8373a898d..b791eb3f32 100644 --- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java +++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java @@ -141,6 +141,33 @@ public static int readIntLittleEndianPaddedOnBitWidth(InputStream in, int bitWid } } + /** + * Reads a little-endian int padded to the byte count for the given bit width from a ByteBuffer. + * The buffer must be in {@link java.nio.ByteOrder#LITTLE_ENDIAN} order for 2-byte and 4-byte reads. 
+ * + * @param in a ByteBuffer in LITTLE_ENDIAN order + * @param bitWidth the bit width determining how many bytes to read + * @return the value read + */ + public static int readIntLittleEndianPaddedOnBitWidth(ByteBuffer in, int bitWidth) { + int bytesWidth = paddedByteCountFromBits(bitWidth); + switch (bytesWidth) { + case 0: + return 0; + case 1: + return in.get() & 0xFF; + case 2: + return in.getShort() & 0xFFFF; + case 3: + return (in.get() & 0xFF) | ((in.get() & 0xFF) << 8) | ((in.get() & 0xFF) << 16); + case 4: + return in.getInt(); + default: + throw new IllegalArgumentException( + String.format("Encountered bitWidth (%d) that requires more than 4 bytes", bitWidth)); + } + } + public static void writeIntLittleEndianOnOneByte(OutputStream out, int v) throws IOException { out.write((v >>> 0) & 0xFF); } @@ -210,6 +237,23 @@ public static int readUnsignedVarInt(InputStream in) throws IOException { return value | (b << i); } + /** + * Reads an unsigned variable-length integer (varint) from a ByteBuffer. + * + * @param in a ByteBuffer + * @return the unsigned varint value + */ + public static int readUnsignedVarInt(ByteBuffer in) { + int value = 0; + int i = 0; + int b; + while (((b = in.get() & 0xFF) & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + } + return value | (b << i); + } + /** * uses a trick mentioned in https://developers.google.com/protocol-buffers/docs/encoding to read zigZag encoded data * From dc23dbc2a6b62dad51a46a36dbabfebdb69a02a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 1 May 2026 20:59:10 +0200 Subject: [PATCH 3/4] GH-3522: Use pack32Values fast path in RLE hybrid encoder Buffer four 8-value groups (32 values) before packing, then use the packer's pack32Values entry point instead of calling pack8Values four separate times. This reduces per-group overhead and enables the packer's optimized 32-value code path. 
When fewer than 32 values remain at run end, flushBitPackedValues falls back to per-8 packing via pack8Values. All 573 parquet-column tests pass. --- .../rle/RunLengthBitPackingHybridEncoder.java | 46 +++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java index e33824bff1..7dfeb28088 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java @@ -74,6 +74,11 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable { */ private final byte[] packBuffer; + /** + * Buffer four 8-value groups so we can use the packer's 32-value fast path. + */ + private final int[] bitPackedValuesBuffer; + /** * Previous value written, used to detect repeated values */ @@ -98,6 +103,8 @@ public class RunLengthBitPackingHybridEncoder implements AutoCloseable { */ private int bitPackedGroupCount; + private int numBitPackedValues; + /** * A "pointer" to a single byte in baos, * which we use as our bit-packed-header. 
It's really @@ -125,7 +132,8 @@ public RunLengthBitPackingHybridEncoder( this.bitWidth = bitWidth; this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); - this.packBuffer = new byte[bitWidth]; + this.packBuffer = new byte[bitWidth * 4]; + this.bitPackedValuesBuffer = new int[32]; this.bufferedValues = new int[8]; this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); reset(false); @@ -139,6 +147,7 @@ private void reset(boolean resetBaos) { this.numBufferedValues = 0; this.repeatCount = 0; this.bitPackedGroupCount = 0; + this.numBitPackedValues = 0; this.bitPackedRunHeaderPointer = -1; this.toBytesCalled = false; } @@ -196,8 +205,9 @@ private void writeOrAppendBitPackedRun() throws IOException { bitPackedRunHeaderPointer = baos.getCurrentIndex(); } - packer.pack8Values(bufferedValues, 0, packBuffer, 0); - baos.write(packBuffer); + System.arraycopy(bufferedValues, 0, bitPackedValuesBuffer, numBitPackedValues, 8); + numBitPackedValues += 8; + flushBitPackedValuesIfFull(); // empty the buffer, they've all been written numBufferedValues = 0; @@ -209,6 +219,34 @@ private void writeOrAppendBitPackedRun() throws IOException { ++bitPackedGroupCount; } + private void flushBitPackedValuesIfFull() { + if (numBitPackedValues == bitPackedValuesBuffer.length) { + packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0); + baos.write(packBuffer, 0, bitWidth * 4); + numBitPackedValues = 0; + } + } + + private void flushBitPackedValues() { + if (numBitPackedValues == 0) { + return; + } + + if (numBitPackedValues == bitPackedValuesBuffer.length) { + packer.pack32Values(bitPackedValuesBuffer, 0, packBuffer, 0); + baos.write(packBuffer, 0, bitWidth * 4); + } else { + int outPos = 0; + for (int inPos = 0; inPos < numBitPackedValues; inPos += 8) { + packer.pack8Values(bitPackedValuesBuffer, inPos, packBuffer, outPos); + outPos += bitWidth; + } + baos.write(packBuffer, 0, outPos); + } + + numBitPackedValues = 0; + } + /** * If we are currently 
writing a bit-packed-run, update the * bit-packed-header and consider this run to be over @@ -221,6 +259,8 @@ private void endPreviousBitPackedRun() { return; } + flushBitPackedValues(); + // create bit-packed-header, which needs to fit in 1 byte byte bitPackHeader = (byte) ((bitPackedGroupCount << 1) | 1); From af02aefceb8271bfe35a4de2f863f7d4f21c5a55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Isma=C3=ABl=20Mej=C3=ADa?= Date: Fri, 1 May 2026 21:00:21 +0200 Subject: [PATCH 4/4] GH-3522: Use unpack32Values fast path in RLE hybrid decoder PACKED branch Symmetric to the encoder's pack32Values fast path, the decoder's PACKED branch now batches 4 groups (32 values) into a single unpack32Values call instead of looping unpack8Values four times. Falls back to unpack8Values for residual <4-group tails. This benefits long PACKED runs (>=32 values) by reducing loop overhead and enabling the packer's optimized 32-value code path. Combined benchmark for the full par9 branch (all 4 commits: buffer reuse + ByteBuffer conversion + pack32Values encoder + unpack32Values decoder): RleDictionaryIndexDecodingBenchmark (100k dictionary IDs, JMH -wi 3 -i 5 -f 1): Pattern Before (ops/s) After (ops/s) Improvement SEQUENTIAL 603,445,362 698,066,810 +16% (1.16x) RANDOM 613,691,096 681,685,407 +11% (1.11x) LOW_CARDINALITY 611,963,736 686,200,341 +12% (1.12x) IntEncodingBenchmark.decodeDictionary (100k INT32 values, full dictionary decode path including RLE index decode): Pattern Before (ops/s) After (ops/s) Improvement SEQUENTIAL 418,357,276 539,458,940 +29% (1.29x) RANDOM 417,041,197 527,231,831 +26% (1.26x) LOW_CARDINALITY 605,354,083 628,283,691 +4% HIGH_CARDINALITY 416,731,808 535,763,242 +29% (1.29x) All 573 parquet-column tests pass. 
--- .../rle/RunLengthBitPackingHybridDecoder.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java index cf7af0bda0..407cd154e1 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java @@ -114,10 +114,18 @@ private void readNext() { int bytesToRead = (int) Math.ceil(currentCount * bitWidth / 8.0); bytesToRead = Math.min(bytesToRead, buffer.remaining()); buffer.get(packedBytesBuffer, 0, bytesToRead); - for (int valueIndex = 0, byteIndex = 0; - valueIndex < currentCount; - valueIndex += 8, byteIndex += bitWidth) { - packer.unpack8Values(packedBytesBuffer, byteIndex, currentBuffer, valueIndex); + // Unpack 32 values (4 groups) at a time when possible — symmetric to the encoder's + // pack32Values fast path. Falls back to unpack8Values for any residual groups. + int groupIdx = 0; + int byteIndex = 0; + final int step32 = bitWidth * 4; + while (groupIdx + 4 <= numGroups) { + packer.unpack32Values(packedBytesBuffer, byteIndex, currentBuffer, groupIdx * 8); + groupIdx += 4; + byteIndex += step32; + } + for (; groupIdx < numGroups; groupIdx++, byteIndex += bitWidth) { + packer.unpack8Values(packedBytesBuffer, byteIndex, currentBuffer, groupIdx * 8); } break; default: