diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java index 874c99fded..757d65e906 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java @@ -29,6 +29,8 @@ import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble; +import org.apache.parquet.column.values.alp.AlpValuesReaderForFloat; import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble; import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA; @@ -147,6 +149,26 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu } }, + /** + * ALP (Adaptive Lossless floating-Point) encoding for FLOAT and DOUBLE types. + * Works by converting floating-point values to integers using decimal scaling, + * then applying Frame of Reference (FOR) encoding and bit-packing. + */ + ALP { + @Override + public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) { + switch (descriptor.getType()) { + case FLOAT: + return new AlpValuesReaderForFloat(); + case DOUBLE: + return new AlpValuesReaderForDouble(); + default: + throw new ParquetDecodingException( + "ALP encoding is only supported for FLOAT and DOUBLE, not " + descriptor.getType()); + } + } + }, + /** * @deprecated This is no longer used, and has been replaced by {@link #RLE} * which is combination of bit packing and rle diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index f29214b458..c5f9d87750 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -50,6 +50,7 @@ public class ParquetProperties { public static final int DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE; public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true; public static final boolean DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED = false; + public static final boolean DEFAULT_IS_ALP_ENABLED = false; public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0; public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true; public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100; @@ -132,6 +133,7 @@ public static WriterVersion fromString(String name) { private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; private final ColumnProperty byteStreamSplitEnabled; + private final ColumnProperty alpEnabled; private final Map extraMetaData; private final ColumnProperty statistics; private final ColumnProperty sizeStatistics; @@ -164,6 +166,7 @@ private ParquetProperties(Builder builder) { this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build(); + this.alpEnabled = builder.alpEnabled.build(); this.extraMetaData = builder.extraMetaData; this.statistics = builder.statistics.build(); this.sizeStatistics = 
builder.sizeStatistics.build(); @@ -259,6 +262,23 @@ public boolean isByteStreamSplitEnabled(ColumnDescriptor column) { } } + /** + * Check if ALP encoding is enabled for the given column. + * ALP encoding is only supported for FLOAT and DOUBLE types. + * + * @param column the column descriptor + * @return true if ALP encoding is enabled for this column + */ + public boolean isAlpEnabled(ColumnDescriptor column) { + switch (column.getPrimitiveType().getPrimitiveTypeName()) { + case FLOAT: + case DOUBLE: + return alpEnabled.getValue(column); + default: + return false; + } + } + public ByteBufferAllocator getAllocator() { return allocator; } @@ -416,6 +436,7 @@ public static class Builder { private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; private final ColumnProperty.Builder byteStreamSplitEnabled; + private final ColumnProperty.Builder alpEnabled; private Map extraMetaData = new HashMap<>(); private final ColumnProperty.Builder statistics; private final ColumnProperty.Builder sizeStatistics; @@ -427,6 +448,7 @@ private Builder() { DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED ? ByteStreamSplitMode.FLOATING_POINT : ByteStreamSplitMode.NONE); + alpEnabled = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_ALP_ENABLED); bloomFilterEnabled = ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED); bloomFilterNDVs = ColumnProperty.builder().withDefaultValue(null); bloomFilterFPPs = ColumnProperty.builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP); @@ -457,6 +479,7 @@ private Builder(ParquetProperties toCopy) { this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates); this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes; this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled); + this.alpEnabled = ColumnProperty.builder(toCopy.alpEnabled); this.extraMetaData = toCopy.extraMetaData; this.statistics = ColumnProperty.builder(toCopy.statistics); this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics); @@ -534,6 +557,29 @@ public Builder withExtendedByteStreamSplitEncoding(boolean enable) { return this; } + /** + * Enable or disable ALP encoding for FLOAT and DOUBLE columns. + * + * @param enable whether ALP encoding should be enabled + * @return this builder for method chaining. + */ + public Builder withAlpEncoding(boolean enable) { + this.alpEnabled.withDefaultValue(enable); + return this; + } + + /** + * Enable or disable ALP encoding for the specified column. + * + * @param columnPath the path of the column (dot-string) + * @param enable whether ALP encoding should be enabled + * @return this builder for method chaining. + */ + public Builder withAlpEncoding(String columnPath, boolean enable) { + this.alpEnabled.withValue(columnPath, enable); + return this; + } + /** * Set the Parquet format dictionary page size. * diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java new file mode 100644 index 0000000000..b7b93e1faa --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpConstants.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import org.apache.parquet.Preconditions; + +/** + * Constants for the ALP (Adaptive Lossless floating-Point) encoding. + * + *
<p>ALP encoding converts floating-point values to integers using decimal scaling, + * then applies Frame of Reference encoding and bit-packing. + * Values that cannot be losslessly converted are stored as exceptions. + *
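+ * <p>For example, with exponent=2 and factor=0, the value 1.23 is stored as + * round(1.23 * 100) = 123, and decoding computes 123 / 100 = 1.23. + *
+ * <p>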
Based on the paper: "ALP: Adaptive Lossless floating-Point Compression" (SIGMOD 2024) + * + * @see ALP Paper + */ +public final class AlpConstants { + + private AlpConstants() { + // Utility class + } + + // Page header fields + public static final int ALP_VERSION = 1; + public static final int ALP_COMPRESSION_MODE = 0; + public static final int ALP_INTEGER_ENCODING_FOR = 0; + public static final int ALP_HEADER_SIZE = 8; + + public static final int DEFAULT_VECTOR_SIZE = 1024; + public static final int DEFAULT_VECTOR_SIZE_LOG = 10; + + // Capped at 15 (vectorSize=32768) because num_exceptions is uint16, + // so vectorSize must not exceed 65535 to avoid overflow when all values are exceptions. + static final int MAX_LOG_VECTOR_SIZE = 15; + static final int MIN_LOG_VECTOR_SIZE = 3; + + static final int FLOAT_MAX_EXPONENT = 10; + static final int DOUBLE_MAX_EXPONENT = 18; + + // Preset caching: full search for the first N vectors, then lock in the top combos + static final int SAMPLER_SAMPLE_VECTORS = 8; + static final int MAX_PRESET_COMBINATIONS = 5; + + // Magic numbers for the fast-rounding trick (see ALP paper, Section 3.2) + static final float MAGIC_FLOAT = 12_582_912.0f; // 2^22 + 2^23 + static final double MAGIC_DOUBLE = 6_755_399_441_055_744.0; // 2^51 + 2^52 + + // Per-vector metadata sizes in bytes + public static final int ALP_INFO_SIZE = 4; // exponent(1) + factor(1) + num_exceptions(2) + public static final int FLOAT_FOR_INFO_SIZE = 5; // frame_of_reference(4) + bit_width(1) + public static final int DOUBLE_FOR_INFO_SIZE = 9; // frame_of_reference(8) + bit_width(1) + + static final float[] FLOAT_POW10 = {1e0f, 1e1f, 1e2f, 1e3f, 1e4f, 1e5f, 1e6f, 1e7f, 1e8f, 1e9f, 1e10f}; + + static final double[] DOUBLE_POW10 = { + 1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18 + }; + + static final int FLOAT_NEGATIVE_ZERO_BITS = 0x80000000; + static final long DOUBLE_NEGATIVE_ZERO_BITS = 0x8000000000000000L; + + /** Validates vector size: must be a power of 2 in [2^MIN_LOG .. 2^MAX_LOG]. */ + static int validateVectorSize(int vectorSize) { + Preconditions.checkArgument( + vectorSize > 0 && (vectorSize & (vectorSize - 1)) == 0, + "Vector size must be a power of 2, got: %s", + vectorSize); + int logSize = Integer.numberOfTrailingZeros(vectorSize); + Preconditions.checkArgument( + logSize >= MIN_LOG_VECTOR_SIZE && logSize <= MAX_LOG_VECTOR_SIZE, + "Vector size log2 must be between %s and %s, got: %s (vectorSize=%s)", + MIN_LOG_VECTOR_SIZE, + MAX_LOG_VECTOR_SIZE, + logSize, + vectorSize); + return vectorSize; + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java new file mode 100644 index 0000000000..45a0056fd6 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpEncoderDecoder.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +/** + * Core ALP (Adaptive Lossless floating-Point) encoding and decoding logic. + * + *
<p>ALP works by converting floating-point values to integers using decimal scaling, + * then applying Frame of Reference encoding and bit-packing. + * Values that cannot be losslessly converted are stored as exceptions. + *
+ * <p>Encoding formula: encoded = round(value * 10^(exponent - factor)) + *
+ * <p>Decoding formula: value = encoded / 10^(exponent - factor) + *
+ * <p>Exception conditions: + * <ul>
+ *   <li>NaN values</li>
+ *   <li>Infinity values</li>
+ *   <li>Negative zero (-0.0)</li>
+ *   <li>Out of integer range</li>
+ *   <li>Round-trip failure (decode(encode(v)) != v)</li>
+ * </ul>
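+ *
+ * <p>For example, 1.5f with exponent=1, factor=0 encodes to round(1.5 * 10) = 15
+ * and decodes to 15 / 10.0f = 1.5f, an exact round-trip; a value whose round-trip
+ * is inexact is recorded as an exception and stored verbatim.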
+ */ +final class AlpEncoderDecoder { + + private AlpEncoderDecoder() { + // Utility class + } + + static float getFloatMultiplier(int exponent, int factor) { + float multiplier = FLOAT_POW10[exponent]; + if (factor > 0) { + multiplier /= FLOAT_POW10[factor]; + } + return multiplier; + } + + static double getDoubleMultiplier(int exponent, int factor) { + double multiplier = DOUBLE_POW10[exponent]; + if (factor > 0) { + multiplier /= DOUBLE_POW10[factor]; + } + return multiplier; + } + + /** NaN, Inf, and -0.0 can never be encoded regardless of exponent/factor. */ + static boolean isFloatException(float value) { + if (Float.isNaN(value)) { + return true; + } + if (Float.isInfinite(value)) { + return true; + } + return Float.floatToRawIntBits(value) == FLOAT_NEGATIVE_ZERO_BITS; + } + + /** Check round-trip: encode then decode, and see if we get the same bits back. */ + static boolean isFloatException(float value, int exponent, int factor) { + if (isFloatException(value)) { + return true; + } + float multiplier = getFloatMultiplier(exponent, factor); + float scaled = value * multiplier; + if (scaled > Integer.MAX_VALUE || scaled < Integer.MIN_VALUE) { + return true; + } + int encoded = encodeFloat(value, exponent, factor); + float decoded = decodeFloat(encoded, exponent, factor); + return Float.floatToRawIntBits(value) != Float.floatToRawIntBits(decoded); + } + + /** Encode: round(value * 10^exponent / 10^factor) */ + static int encodeFloat(float value, int exponent, int factor) { + return fastRoundFloat(value * getFloatMultiplier(exponent, factor)); + } + + /** Decode: encoded / 10^exponent * 10^factor */ + static float decodeFloat(int encoded, int exponent, int factor) { + return encoded / getFloatMultiplier(exponent, factor); + } + + // Uses the 2^22+2^23 magic-number trick to round without branching on the FPU. + static int fastRoundFloat(float value) { + if (value >= 0) { + return (int) ((value + MAGIC_FLOAT) - MAGIC_FLOAT); + } else { + return (int) ((value - MAGIC_FLOAT) + MAGIC_FLOAT); + } + } + + static boolean isDoubleException(double value) { + if (Double.isNaN(value)) { + return true; + } + if (Double.isInfinite(value)) { + return true; + } + return Double.doubleToRawLongBits(value) == DOUBLE_NEGATIVE_ZERO_BITS; + } + + static boolean isDoubleException(double value, int exponent, int factor) { + if (isDoubleException(value)) { + return true; + } + double multiplier = getDoubleMultiplier(exponent, factor); + double scaled = value * multiplier; + if (scaled > Long.MAX_VALUE || scaled < Long.MIN_VALUE) { + return true; + } + long encoded = encodeDouble(value, exponent, factor); + double decoded = decodeDouble(encoded, exponent, factor); + return Double.doubleToRawLongBits(value) != Double.doubleToRawLongBits(decoded); + } + + static long encodeDouble(double value, int exponent, int factor) { + return fastRoundDouble(value * getDoubleMultiplier(exponent, factor)); + } + + static double decodeDouble(long encoded, int exponent, int factor) { + return encoded / getDoubleMultiplier(exponent, factor); + } + + // Same trick but with 2^51+2^52 for double precision. + static long fastRoundDouble(double value) { + if (value >= 0) { + return (long) ((value + MAGIC_DOUBLE) - MAGIC_DOUBLE); + } else { + return (long) ((value - MAGIC_DOUBLE) + MAGIC_DOUBLE); + } + } + + /** Number of bits needed to represent maxDelta as an unsigned value. 
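For example, maxDelta = 5 (binary 101) needs 3 bits, and maxDelta = 0 needs none.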
*/ + static int bitWidthForInt(int maxDelta) { + if (maxDelta == 0) { + return 0; + } + return Integer.SIZE - Integer.numberOfLeadingZeros(maxDelta); + } + + static int bitWidthForLong(long maxDelta) { + if (maxDelta == 0) { + return 0; + } + return Long.SIZE - Long.numberOfLeadingZeros(maxDelta); + } + + public static class EncodingParams { + public final int exponent; + public final int factor; + public final int numExceptions; + + EncodingParams(int exponent, int factor, int numExceptions) { + this.exponent = exponent; + this.factor = factor; + this.numExceptions = numExceptions; + } + } + + /** Try all (exponent, factor) combos and pick the one with fewest exceptions. */ + static EncodingParams findBestFloatParams(float[] values, int offset, int length) { + int bestExponent = 0; + int bestFactor = 0; + int bestExceptions = length; + + for (int e = 0; e <= FLOAT_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isFloatException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + + /** Same as findBestFloatParams but only tries the cached preset combos. */ + static EncodingParams findBestFloatParamsWithPresets(float[] values, int offset, int length, int[][] presets) { + int bestExponent = presets[0][0]; + int bestFactor = presets[0][1]; + int bestExceptions = length; + + for (int[] preset : presets) { + int e = preset[0]; + int f = preset[1]; + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isFloatException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + + static EncodingParams findBestDoubleParams(double[] values, int offset, int length) { + int bestExponent = 0; + int bestFactor = 0; + int bestExceptions = length; + + for (int e = 0; e <= DOUBLE_MAX_EXPONENT; e++) { + for (int f = 0; f <= e; f++) { + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isDoubleException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + } + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + + static EncodingParams findBestDoubleParamsWithPresets(double[] values, int offset, int length, int[][] presets) { + int bestExponent = presets[0][0]; + int bestFactor = presets[0][1]; + int bestExceptions = length; + + for (int[] preset : presets) { + int e = preset[0]; + int f = preset[1]; + int exceptions = 0; + for (int i = 0; i < length; i++) { + if (isDoubleException(values[offset + i], e, f)) { + exceptions++; + } + } + if (exceptions < bestExceptions) { + bestExponent = e; + bestFactor = f; + bestExceptions = exceptions; + if (bestExceptions == 0) { + return new EncodingParams(bestExponent, bestFactor, bestExceptions); + } + } + } + return new EncodingParams(bestExponent, bestFactor, 
bestExceptions); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java new file mode 100644 index 0000000000..7dd3fc7960 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReader.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.column.values.ValuesReader; +import org.apache.parquet.io.ParquetDecodingException; + +/** + * Abstract base class for ALP values readers with lazy per-vector decoding. + * + *
<p>Reads ALP-encoded values from the interleaved page layout: + *
+ * <pre>
+ * ┌─────────┬──────────────────────┬──────────────┬──────────────┬─────┐
+ * │ Header  │ Offset Array         │ Vector 0     │ Vector 1     │ ... │
+ * │ 8 bytes │ 4B × numVectors      │ (interleaved)│ (interleaved)│     │
+ * └─────────┴──────────────────────┴──────────────┴──────────────┴─────┘
+ * </pre>
+ *
+ * <p>
Each vector is decoded lazily on first access. Skipping values does not + * trigger decoding of intermediate vectors. + */ +abstract class AlpValuesReader extends ValuesReader { + + protected int vectorSize; + protected int totalCount; + protected int numVectors; + protected int currentIndex; + protected int currentVectorIndex; + + protected int[] vectorOffsets; + protected ByteBuffer vectorsData; + protected int offsetArraySize; + + AlpValuesReader() { + this.currentIndex = 0; + this.totalCount = 0; + this.currentVectorIndex = -1; + } + + @Override + public void initFromPage(int valuesCount, ByteBufferInputStream stream) + throws ParquetDecodingException, IOException { + ByteBuffer headerBuf = stream.slice(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + int version = headerBuf.get() & 0xFF; + int compressionMode = headerBuf.get() & 0xFF; + int integerEncoding = headerBuf.get() & 0xFF; + int logVectorSize = headerBuf.get() & 0xFF; + int numElements = headerBuf.getInt(); + + if (version != ALP_VERSION) { + throw new ParquetDecodingException("Unsupported ALP version: " + version + ", expected " + ALP_VERSION); + } + if (compressionMode != ALP_COMPRESSION_MODE) { + throw new ParquetDecodingException("Unsupported ALP compression mode: " + compressionMode); + } + if (integerEncoding != ALP_INTEGER_ENCODING_FOR) { + throw new ParquetDecodingException("Unsupported ALP integer encoding: " + integerEncoding); + } + if (logVectorSize < MIN_LOG_VECTOR_SIZE || logVectorSize > MAX_LOG_VECTOR_SIZE) { + throw new ParquetDecodingException("Invalid ALP log vector size: " + logVectorSize + ", must be between " + + MIN_LOG_VECTOR_SIZE + " and " + MAX_LOG_VECTOR_SIZE); + } + if (numElements < 0) { + throw new ParquetDecodingException("Invalid ALP element count: " + numElements); + } + + this.vectorSize = 1 << logVectorSize; + this.totalCount = numElements; + this.numVectors = (numElements + vectorSize - 1) / vectorSize; + this.currentIndex = 0; + this.currentVectorIndex = -1; + + this.offsetArraySize = numVectors * Integer.BYTES; + ByteBuffer offsetBuf = stream.slice(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); + this.vectorOffsets = new int[numVectors]; + for (int v = 0; v < numVectors; v++) { + vectorOffsets[v] = offsetBuf.getInt(); + } + + // Slice remaining bytes into a 0-based view so decodeVector can use + // absolute get methods (vectorsData.get(pos)) directly. + int remainingBytes = (int) stream.available(); + ByteBuffer rawSlice = stream.slice(remainingBytes); + this.vectorsData = rawSlice.slice().order(ByteOrder.LITTLE_ENDIAN); + + allocateDecodedBuffer(vectorSize); + } + + protected int getVectorLength(int vectorIdx) { + if (vectorIdx < numVectors - 1) { + return vectorSize; + } + // Last vector may be partial + int lastVectorLen = totalCount % vectorSize; + return lastVectorLen == 0 ? vectorSize : lastVectorLen; + } + + // Offsets in the page are relative to the compression body (after header), + // but vectorsData starts after the offset array, so adjust. + protected int getVectorDataPosition(int vectorIdx) { + return vectorOffsets[vectorIdx] - offsetArraySize; + } + + @Override + public void skip() { + skip(1); + } + + @Override + public void skip(int n) { + if (n < 0 || currentIndex + n > totalCount) { + throw new ParquetDecodingException(String.format( + "Cannot skip this many elements. Current index: %d. Skip %d. 
Total count: %d", + currentIndex, n, totalCount)); + } + currentIndex += n; + } + + protected void ensureVectorDecoded() { + int vectorIdx = currentIndex / vectorSize; + if (vectorIdx != currentVectorIndex) { + decodeVector(vectorIdx); + currentVectorIndex = vectorIdx; + } + } + + protected abstract void allocateDecodedBuffer(int capacity); + + protected abstract void decodeVector(int vectorIdx); +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java new file mode 100644 index 0000000000..21da7bc714 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForDouble.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.nio.ByteBuffer; +import org.apache.parquet.column.values.bitpacking.BytePackerForLong; +import org.apache.parquet.column.values.bitpacking.Packer; +import org.apache.parquet.io.ParquetDecodingException; + +/** + * ALP values reader for DOUBLE type with lazy per-vector decoding. + * + *
<p>
Reads ALP-encoded double values from the interleaved page layout. + * Each vector is decoded on first access using BytePackerForLong-based unpacking. + */ +public class AlpValuesReaderForDouble extends AlpValuesReader { + + private double[] decodedValues; + + public AlpValuesReaderForDouble() { + super(); + } + + @Override + protected void allocateDecodedBuffer(int capacity) { + this.decodedValues = new double[capacity]; + } + + @Override + public double readDouble() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("ALP double data was already exhausted."); + } + ensureVectorDecoded(); + int indexInVector = currentIndex % vectorSize; + currentIndex++; + return decodedValues[indexInVector]; + } + + @Override + protected void decodeVector(int vectorIdx) { + int vectorLen = getVectorLength(vectorIdx); + int pos = getVectorDataPosition(vectorIdx); + + int exponent = vectorsData.get(pos) & 0xFF; + int factor = vectorsData.get(pos + 1) & 0xFF; + int numExceptions = getShortLE(vectorsData, pos + 2) & 0xFFFF; + pos += ALP_INFO_SIZE; + + long frameOfReference = getLongLE(vectorsData, pos); + int bitWidth = vectorsData.get(pos + 8) & 0xFF; + pos += DOUBLE_FOR_INFO_SIZE; + + long[] deltas = new long[vectorLen]; + if (bitWidth > 0) { + pos = unpackLongsWithBytePacker(vectorsData, pos, deltas, vectorLen, bitWidth); + } + + for (int i = 0; i < vectorLen; i++) { + long encoded = deltas[i] + frameOfReference; + decodedValues[i] = AlpEncoderDecoder.decodeDouble(encoded, exponent, factor); + } + + if (numExceptions > 0) { + int[] excPositions = new int[numExceptions]; + for (int e = 0; e < numExceptions; e++) { + excPositions[e] = getShortLE(vectorsData, pos) & 0xFFFF; + pos += Short.BYTES; + } + for (int e = 0; e < numExceptions; e++) { + decodedValues[excPositions[e]] = getDoubleLE(vectorsData, pos); + pos += Double.BYTES; + } + } + } + + private int unpackLongsWithBytePacker(ByteBuffer buf, int pos, long[] output, int count, int bitWidth) { + BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + + for (int g = 0; g < numFullGroups; g++) { + packer.unpack8Values(buf, pos, output, g * 8); + pos += bitWidth; + } + + // Last group might have fewer than 8 values; zero-pad and unpack, + // but only advance pos by the actual bytes in the page. 
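+ // For example, count=11 at bitWidth=5: one full group consumes 5 bytes, and the
+ // 3 trailing values occupy ceil(11 * 5 / 8) - 5 = 2 more bytes.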
+ if (remaining > 0) { + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyRead = numFullGroups * bitWidth; + int partialBytes = totalPackedBytes - alreadyRead; + + byte[] padded = new byte[bitWidth]; + for (int i = 0; i < partialBytes; i++) { + padded[i] = buf.get(pos + i); + } + + long[] temp = new long[8]; + packer.unpack8Values(padded, 0, temp, 0); + System.arraycopy(temp, 0, output, numFullGroups * 8, remaining); + pos += partialBytes; + } + + return pos; + } + + private static int getShortLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) | ((buf.get(pos + 1) & 0xFF) << 8); + } + + private static long getLongLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFFL) + | ((buf.get(pos + 1) & 0xFFL) << 8) + | ((buf.get(pos + 2) & 0xFFL) << 16) + | ((buf.get(pos + 3) & 0xFFL) << 24) + | ((buf.get(pos + 4) & 0xFFL) << 32) + | ((buf.get(pos + 5) & 0xFFL) << 40) + | ((buf.get(pos + 6) & 0xFFL) << 48) + | ((buf.get(pos + 7) & 0xFFL) << 56); + } + + private static double getDoubleLE(ByteBuffer buf, int pos) { + return Double.longBitsToDouble(getLongLE(buf, pos)); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java new file mode 100644 index 0000000000..cf4aa5ded7 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesReaderForFloat.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.nio.ByteBuffer; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.Packer; +import org.apache.parquet.io.ParquetDecodingException; + +/** + * ALP values reader for FLOAT type with lazy per-vector decoding. + * + *
<p>
Reads ALP-encoded float values from the interleaved page layout. + * Each vector is decoded on first access using BytePacker-based unpacking. + */ +public class AlpValuesReaderForFloat extends AlpValuesReader { + + private float[] decodedValues; + + public AlpValuesReaderForFloat() { + super(); + } + + @Override + protected void allocateDecodedBuffer(int capacity) { + this.decodedValues = new float[capacity]; + } + + @Override + public float readFloat() { + if (currentIndex >= totalCount) { + throw new ParquetDecodingException("ALP float data was already exhausted."); + } + ensureVectorDecoded(); + int indexInVector = currentIndex % vectorSize; + currentIndex++; + return decodedValues[indexInVector]; + } + + @Override + protected void decodeVector(int vectorIdx) { + int vectorLen = getVectorLength(vectorIdx); + int pos = getVectorDataPosition(vectorIdx); + + int exponent = vectorsData.get(pos) & 0xFF; + int factor = vectorsData.get(pos + 1) & 0xFF; + int numExceptions = getShortLE(vectorsData, pos + 2) & 0xFFFF; + pos += ALP_INFO_SIZE; + + int frameOfReference = getIntLE(vectorsData, pos); + int bitWidth = vectorsData.get(pos + 4) & 0xFF; + pos += FLOAT_FOR_INFO_SIZE; + + int[] deltas = new int[vectorLen]; + if (bitWidth > 0) { + pos = unpackIntsWithBytePacker(vectorsData, pos, deltas, vectorLen, bitWidth); + } + + // Reverse the frame-of-reference subtraction, then decimal-decode + for (int i = 0; i < vectorLen; i++) { + int encoded = deltas[i] + frameOfReference; + decodedValues[i] = AlpEncoderDecoder.decodeFloat(encoded, exponent, factor); + } + + // Overwrite exception slots with their original float values + if (numExceptions > 0) { + int[] excPositions = new int[numExceptions]; + for (int e = 0; e < numExceptions; e++) { + excPositions[e] = getShortLE(vectorsData, pos) & 0xFFFF; + pos += Short.BYTES; + } + for (int e = 0; e < numExceptions; e++) { + decodedValues[excPositions[e]] = getFloatLE(vectorsData, pos); + pos += Float.BYTES; + } + } + } + + /** Unpack bit-packed ints in groups of 8, returns position after packed data. */ + private int unpackIntsWithBytePacker(ByteBuffer buf, int pos, int[] output, int count, int bitWidth) { + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + + for (int g = 0; g < numFullGroups; g++) { + packer.unpack8Values(buf, pos, output, g * 8); + pos += bitWidth; + } + + if (remaining > 0) { + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyRead = numFullGroups * bitWidth; + int partialBytes = totalPackedBytes - alreadyRead; + + byte[] padded = new byte[bitWidth]; + for (int i = 0; i < partialBytes; i++) { + padded[i] = buf.get(pos + i); + } + + int[] temp = new int[8]; + packer.unpack8Values(padded, 0, temp, 0); + System.arraycopy(temp, 0, output, numFullGroups * 8, remaining); + pos += partialBytes; + } + + return pos; + } + + private static int getShortLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) | ((buf.get(pos + 1) & 0xFF) << 8); + } + + // Explicit LE reads instead of relying on ByteBuffer order, since + // we use absolute get() which ignores the buffer's byte order. 
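+ // (Absolute single-byte get(pos) is endianness-neutral, so multi-byte values
+ // are assembled by hand.)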
+ private static int getIntLE(ByteBuffer buf, int pos) { + return (buf.get(pos) & 0xFF) + | ((buf.get(pos + 1) & 0xFF) << 8) + | ((buf.get(pos + 2) & 0xFF) << 16) + | ((buf.get(pos + 3) & 0xFF) << 24); + } + + private static float getFloatLE(ByteBuffer buf, int pos) { + return Float.intBitsToFloat(getIntLE(buf, pos)); + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java new file mode 100644 index 0000000000..0e3e82a06e --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/alp/AlpValuesWriter.java @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.apache.parquet.column.values.alp.AlpConstants.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.parquet.bytes.ByteBufferAllocator; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.CapacityByteArrayOutputStream; +import org.apache.parquet.column.Encoding; +import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.BytePackerForLong; +import org.apache.parquet.column.values.bitpacking.Packer; + +/** + * ALP (Adaptive Lossless floating-Point) values writer. + * + *
<p>ALP encoding converts floating-point values to integers using decimal scaling, + * then applies Frame of Reference encoding and bit-packing. + * Values that cannot be losslessly converted are stored as exceptions. + *
+ * <p>Writing is incremental: values are buffered in a fixed-size vector buffer, + * and each full vector is encoded and flushed to the output stream immediately. + * On {@link #getBytes()}, any remaining partial vector is flushed, and the + * final page bytes are assembled. + *
+ * <p>Interleaved Page Layout:
+ * <pre>
+ * ┌─────────┬──────────────────────┬──────────────┬──────────────┬─────┐
+ * │ Header  │ Offset Array         │ Vector 0     │ Vector 1     │ ... │
+ * │ 8 bytes │ 4B × numVectors      │ (interleaved)│ (interleaved)│     │
+ * └─────────┴──────────────────────┴──────────────┴──────────────┴─────┘
+ * </pre>
+ *
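+ * <p>Writers are normally wired up through {@link org.apache.parquet.column.ParquetProperties}
+ * (via withAlpEncoding) rather than constructed directly. A minimal round-trip sketch,
+ * mirroring the unit tests (sizes and allocator are illustrative only):
+ * <pre>{@code
+ * FloatAlpValuesWriter writer = new FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator());
+ * writer.writeFloat(1.0f);
+ * writer.writeFloat(2.0f);
+ * BytesInput page = writer.getBytes();
+ * AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat();
+ * reader.initFromPage(2, ByteBufferInputStream.wrap(page.toByteBuffer()));
+ * float first = reader.readFloat(); // 1.0f
+ * }</pre>
+ *
+ * <p>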
Each vector contains interleaved: + * AlpInfo(4B) + ForInfo(5B/9B) + PackedValues + ExceptionPositions + ExceptionValues + */ +public abstract class AlpValuesWriter extends ValuesWriter { + + protected final int initialCapacity; + protected final int pageSize; + protected final ByteBufferAllocator allocator; + protected final int vectorSize; + protected final int logVectorSize; + + AlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator, int vectorSize) { + AlpConstants.validateVectorSize(vectorSize); + this.initialCapacity = initialCapacity; + this.pageSize = pageSize; + this.allocator = allocator; + this.vectorSize = vectorSize; + this.logVectorSize = Integer.numberOfTrailingZeros(vectorSize); + } + + @Override + public Encoding getEncoding() { + return Encoding.ALP; + } + + /** Float writer. Buffers one vector at a time, encodes and flushes when full. */ + public static class FloatAlpValuesWriter extends AlpValuesWriter { + private final float[] vectorBuffer; + private int bufferCount; + private int totalCount; + private CapacityByteArrayOutputStream encodedVectors; + private final List vectorByteSizes; + + // Preset caching + private int vectorsProcessed; + private int[][] cachedPresets; + private final Map presetCounts; + + public FloatAlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + this(initialCapacity, pageSize, allocator, DEFAULT_VECTOR_SIZE); + } + + public FloatAlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator, int vectorSize) { + super(initialCapacity, pageSize, allocator, vectorSize); + this.vectorBuffer = new float[vectorSize]; + this.bufferCount = 0; + this.totalCount = 0; + this.encodedVectors = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); + this.vectorByteSizes = new ArrayList<>(); + this.vectorsProcessed = 0; + this.cachedPresets = null; + this.presetCounts = new HashMap<>(); + } + + @Override + public void writeFloat(float v) { + vectorBuffer[bufferCount++] = v; + totalCount++; + if (bufferCount == vectorSize) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + } + + private void encodeAndFlushVector(int vectorLen) { + // Use cached presets after the sampling phase, full search before + AlpEncoderDecoder.EncodingParams params; + if (cachedPresets != null) { + params = AlpEncoderDecoder.findBestFloatParamsWithPresets(vectorBuffer, 0, vectorLen, cachedPresets); + } else { + params = AlpEncoderDecoder.findBestFloatParams(vectorBuffer, 0, vectorLen); + long key = ((long) params.exponent << Integer.SIZE) | params.factor; + presetCounts.merge(key, 1, Integer::sum); + } + + vectorsProcessed++; + if (cachedPresets == null && vectorsProcessed >= SAMPLER_SAMPLE_VECTORS) { + buildPresetCache(); + } + + int[] encoded = new int[vectorLen]; + short[] excPositions = new short[params.numExceptions]; + float[] excValues = new float[params.numExceptions]; + int excIdx = 0; + + // We need a valid encoded value to fill exception slots (placeholder). + // Any non-exception value works; it gets overwritten on decode. 
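+ // If every value in the vector is an exception, placeholder stays 0; all slots
+ // are then overwritten from the exception list on decode.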
+ int placeholder = 0; + for (int i = 0; i < vectorLen; i++) { + if (!AlpEncoderDecoder.isFloatException(vectorBuffer[i], params.exponent, params.factor)) { + placeholder = AlpEncoderDecoder.encodeFloat(vectorBuffer[i], params.exponent, params.factor); + break; + } + } + + int minValue = Integer.MAX_VALUE; + for (int i = 0; i < vectorLen; i++) { + if (AlpEncoderDecoder.isFloatException(vectorBuffer[i], params.exponent, params.factor)) { + excPositions[excIdx] = (short) i; + excValues[excIdx] = vectorBuffer[i]; + excIdx++; + encoded[i] = placeholder; + } else { + encoded[i] = AlpEncoderDecoder.encodeFloat(vectorBuffer[i], params.exponent, params.factor); + } + if (encoded[i] < minValue) { + minValue = encoded[i]; + } + } + + // Subtract min so deltas start at 0, reducing bit width. + // The subtraction may wrap for large ranges but unsigned bits stay correct. + int maxDelta = 0; + for (int i = 0; i < vectorLen; i++) { + encoded[i] = encoded[i] - minValue; + if (Integer.compareUnsigned(encoded[i], maxDelta) > 0) { + maxDelta = encoded[i]; + } + } + + int bitWidth = AlpEncoderDecoder.bitWidthForInt(maxDelta); + + long startSize = encodedVectors.size(); + + // AlpInfo: exponent(1) + factor(1) + numExceptions(2) + ByteBuffer alpInfo = ByteBuffer.allocate(ALP_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); + alpInfo.put((byte) params.exponent); + alpInfo.put((byte) params.factor); + alpInfo.putShort((short) params.numExceptions); + encodedVectors.write(alpInfo.array(), 0, ALP_INFO_SIZE); + + // ForInfo: frameOfReference(4) + bitWidth(1) + ByteBuffer forInfo = ByteBuffer.allocate(FLOAT_FOR_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); + forInfo.putInt(minValue); + forInfo.put((byte) bitWidth); + encodedVectors.write(forInfo.array(), 0, FLOAT_FOR_INFO_SIZE); + + if (bitWidth > 0) { + packIntsWithBytePacker(encoded, vectorLen, bitWidth); + } + + // Exception positions then values, written as separate blocks + if (params.numExceptions > 0) { + ByteBuffer excPosBuf = + ByteBuffer.allocate(params.numExceptions * Short.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < params.numExceptions; i++) { + excPosBuf.putShort(excPositions[i]); + } + encodedVectors.write(excPosBuf.array(), 0, params.numExceptions * Short.BYTES); + + ByteBuffer excValBuf = + ByteBuffer.allocate(params.numExceptions * Float.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < params.numExceptions; i++) { + excValBuf.putFloat(excValues[i]); + } + encodedVectors.write(excValBuf.array(), 0, params.numExceptions * Float.BYTES); + } + + vectorByteSizes.add((int) (encodedVectors.size() - startSize)); + } + + private void packIntsWithBytePacker(int[] values, int count, int bitWidth) { + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + byte[] packed = new byte[bitWidth]; + + for (int g = 0; g < numFullGroups; g++) { + packer.pack8Values(values, g * 8, packed, 0); + encodedVectors.write(packed, 0, bitWidth); + } + + // Partial last group: pack 8 values (zero-padded), but only write + // ceil(count * bitWidth / 8) - alreadyWritten bytes per spec. 
+ if (remaining > 0) { + int[] padded = new int[8]; + System.arraycopy(values, numFullGroups * 8, padded, 0, remaining); + packer.pack8Values(padded, 0, packed, 0); + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyWritten = numFullGroups * bitWidth; + encodedVectors.write(packed, 0, totalPackedBytes - alreadyWritten); + } + } + + private void buildPresetCache() { + List> sorted = new ArrayList<>(presetCounts.entrySet()); + sorted.sort(Comparator., Integer>comparing(Map.Entry::getValue) + .reversed()); + int numPresets = Math.min(sorted.size(), MAX_PRESET_COMBINATIONS); + cachedPresets = new int[numPresets][2]; + for (int i = 0; i < numPresets; i++) { + long key = sorted.get(i).getKey(); + cachedPresets[i][0] = (int) (key >>> Integer.SIZE); // exponent + cachedPresets[i][1] = (int) key; // factor + } + } + + @Override + public long getBufferedSize() { + // Encoded vectors so far + estimate for buffered values + return encodedVectors.size() + (long) bufferCount * 3; + } + + @Override + public BytesInput getBytes() { + if (totalCount == 0) { + return BytesInput.empty(); + } + + if (bufferCount > 0) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + + int numVectors = vectorByteSizes.size(); + + // Assemble: [header 8B] [offset array 4B*N] [vector data...] + ByteBuffer header = ByteBuffer.allocate(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + header.put((byte) ALP_VERSION); + header.put((byte) ALP_COMPRESSION_MODE); + header.put((byte) ALP_INTEGER_ENCODING_FOR); + header.put((byte) logVectorSize); + header.putInt(totalCount); + + int offsetArraySize = numVectors * Integer.BYTES; + ByteBuffer offsets = ByteBuffer.allocate(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); + int currentOffset = offsetArraySize; + for (int v = 0; v < numVectors; v++) { + offsets.putInt(currentOffset); + currentOffset += vectorByteSizes.get(v); + } + + return BytesInput.concat( + BytesInput.from(header.array()), BytesInput.from(offsets.array()), BytesInput.from(encodedVectors)); + } + + @Override + public void reset() { + bufferCount = 0; + totalCount = 0; + encodedVectors.reset(); + vectorByteSizes.clear(); + vectorsProcessed = 0; + cachedPresets = null; + presetCounts.clear(); + } + + @Override + public void close() { + encodedVectors.close(); + } + + @Override + public long getAllocatedSize() { + return (long) vectorBuffer.length * Float.BYTES + encodedVectors.getCapacity(); + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s FloatAlpValuesWriter %d values, %d bytes allocated", prefix, totalCount, getAllocatedSize()); + } + } + + /** Double writer. Same structure as FloatAlpValuesWriter but uses longs. 
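The per-vector ForInfo grows from 5 to 9 bytes to hold the 8-byte frame of reference.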
*/ + public static class DoubleAlpValuesWriter extends AlpValuesWriter { + private final double[] vectorBuffer; + private int bufferCount; + private int totalCount; + private CapacityByteArrayOutputStream encodedVectors; + private final List vectorByteSizes; + + // Preset caching + private int vectorsProcessed; + private int[][] cachedPresets; + private final Map presetCounts; + + public DoubleAlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { + this(initialCapacity, pageSize, allocator, DEFAULT_VECTOR_SIZE); + } + + public DoubleAlpValuesWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator, int vectorSize) { + super(initialCapacity, pageSize, allocator, vectorSize); + this.vectorBuffer = new double[vectorSize]; + this.bufferCount = 0; + this.totalCount = 0; + this.encodedVectors = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); + this.vectorByteSizes = new ArrayList<>(); + this.vectorsProcessed = 0; + this.cachedPresets = null; + this.presetCounts = new HashMap<>(); + } + + @Override + public void writeDouble(double v) { + vectorBuffer[bufferCount++] = v; + totalCount++; + if (bufferCount == vectorSize) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + } + + private void encodeAndFlushVector(int vectorLen) { + AlpEncoderDecoder.EncodingParams params; + if (cachedPresets != null) { + params = AlpEncoderDecoder.findBestDoubleParamsWithPresets(vectorBuffer, 0, vectorLen, cachedPresets); + } else { + params = AlpEncoderDecoder.findBestDoubleParams(vectorBuffer, 0, vectorLen); + long key = ((long) params.exponent << Integer.SIZE) | params.factor; + presetCounts.merge(key, 1, Integer::sum); + } + + vectorsProcessed++; + if (cachedPresets == null && vectorsProcessed >= SAMPLER_SAMPLE_VECTORS) { + buildPresetCache(); + } + + long[] encoded = new long[vectorLen]; + short[] excPositions = new short[params.numExceptions]; + double[] excValues = new double[params.numExceptions]; + int excIdx = 0; + long placeholder = 0; + for (int i = 0; i < vectorLen; i++) { + if (!AlpEncoderDecoder.isDoubleException(vectorBuffer[i], params.exponent, params.factor)) { + placeholder = AlpEncoderDecoder.encodeDouble(vectorBuffer[i], params.exponent, params.factor); + break; + } + } + + long minValue = Long.MAX_VALUE; + for (int i = 0; i < vectorLen; i++) { + if (AlpEncoderDecoder.isDoubleException(vectorBuffer[i], params.exponent, params.factor)) { + excPositions[excIdx] = (short) i; + excValues[excIdx] = vectorBuffer[i]; + excIdx++; + encoded[i] = placeholder; + } else { + encoded[i] = AlpEncoderDecoder.encodeDouble(vectorBuffer[i], params.exponent, params.factor); + } + if (encoded[i] < minValue) { + minValue = encoded[i]; + } + } + + long maxDelta = 0; + for (int i = 0; i < vectorLen; i++) { + encoded[i] = encoded[i] - minValue; + if (Long.compareUnsigned(encoded[i], maxDelta) > 0) { + maxDelta = encoded[i]; + } + } + + int bitWidth = AlpEncoderDecoder.bitWidthForLong(maxDelta); + + long startSize = encodedVectors.size(); + + ByteBuffer alpInfo = ByteBuffer.allocate(ALP_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); + alpInfo.put((byte) params.exponent); + alpInfo.put((byte) params.factor); + alpInfo.putShort((short) params.numExceptions); + encodedVectors.write(alpInfo.array(), 0, ALP_INFO_SIZE); + + // ForInfo is 9 bytes for double (vs 5 for float) due to 8-byte frame of reference + ByteBuffer forInfo = ByteBuffer.allocate(DOUBLE_FOR_INFO_SIZE).order(ByteOrder.LITTLE_ENDIAN); + forInfo.putLong(minValue); + forInfo.put((byte) 
bitWidth); + encodedVectors.write(forInfo.array(), 0, DOUBLE_FOR_INFO_SIZE); + + if (bitWidth > 0) { + packLongsWithBytePacker(encoded, vectorLen, bitWidth); + } + + if (params.numExceptions > 0) { + ByteBuffer excPosBuf = + ByteBuffer.allocate(params.numExceptions * Short.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < params.numExceptions; i++) { + excPosBuf.putShort(excPositions[i]); + } + encodedVectors.write(excPosBuf.array(), 0, params.numExceptions * Short.BYTES); + + ByteBuffer excValBuf = + ByteBuffer.allocate(params.numExceptions * Double.BYTES).order(ByteOrder.LITTLE_ENDIAN); + for (int i = 0; i < params.numExceptions; i++) { + excValBuf.putDouble(excValues[i]); + } + encodedVectors.write(excValBuf.array(), 0, params.numExceptions * Double.BYTES); + } + + vectorByteSizes.add((int) (encodedVectors.size() - startSize)); + } + + private void packLongsWithBytePacker(long[] values, int count, int bitWidth) { + BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + int numFullGroups = count / 8; + int remaining = count % 8; + byte[] packed = new byte[bitWidth]; + + for (int g = 0; g < numFullGroups; g++) { + packer.pack8Values(values, g * 8, packed, 0); + encodedVectors.write(packed, 0, bitWidth); + } + + if (remaining > 0) { + long[] padded = new long[8]; + System.arraycopy(values, numFullGroups * 8, padded, 0, remaining); + packer.pack8Values(padded, 0, packed, 0); + int totalPackedBytes = (count * bitWidth + 7) / 8; + int alreadyWritten = numFullGroups * bitWidth; + encodedVectors.write(packed, 0, totalPackedBytes - alreadyWritten); + } + } + + private void buildPresetCache() { + List> sorted = new ArrayList<>(presetCounts.entrySet()); + sorted.sort(Comparator., Integer>comparing(Map.Entry::getValue) + .reversed()); + int numPresets = Math.min(sorted.size(), MAX_PRESET_COMBINATIONS); + cachedPresets = new int[numPresets][2]; + for (int i = 0; i < numPresets; i++) { + long key = sorted.get(i).getKey(); + cachedPresets[i][0] = (int) (key >>> Integer.SIZE); + cachedPresets[i][1] = (int) key; + } + } + + @Override + public long getBufferedSize() { + return encodedVectors.size() + (long) bufferCount * 5; + } + + @Override + public BytesInput getBytes() { + if (totalCount == 0) { + return BytesInput.empty(); + } + + if (bufferCount > 0) { + encodeAndFlushVector(bufferCount); + bufferCount = 0; + } + + int numVectors = vectorByteSizes.size(); + + ByteBuffer header = ByteBuffer.allocate(ALP_HEADER_SIZE).order(ByteOrder.LITTLE_ENDIAN); + header.put((byte) ALP_VERSION); + header.put((byte) ALP_COMPRESSION_MODE); + header.put((byte) ALP_INTEGER_ENCODING_FOR); + header.put((byte) logVectorSize); + header.putInt(totalCount); + + // Offset array lets the reader jump to any vector in O(1) + int offsetArraySize = numVectors * Integer.BYTES; + ByteBuffer offsets = ByteBuffer.allocate(offsetArraySize).order(ByteOrder.LITTLE_ENDIAN); + int currentOffset = offsetArraySize; + for (int v = 0; v < numVectors; v++) { + offsets.putInt(currentOffset); + currentOffset += vectorByteSizes.get(v); + } + + return BytesInput.concat( + BytesInput.from(header.array()), BytesInput.from(offsets.array()), BytesInput.from(encodedVectors)); + } + + @Override + public void reset() { + bufferCount = 0; + totalCount = 0; + encodedVectors.reset(); + vectorByteSizes.clear(); + vectorsProcessed = 0; + cachedPresets = null; + presetCounts.clear(); + } + + @Override + public void close() { + encodedVectors.close(); + } + + @Override + public long getAllocatedSize() { + return (long) 
vectorBuffer.length * Double.BYTES + encodedVectors.getCapacity(); + } + + @Override + public String memUsageString(String prefix) { + return String.format( + "%s DoubleAlpValuesWriter %d values, %d bytes allocated", prefix, totalCount, getAllocatedSize()); + } + } +} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java index c50b4e49c5..b72343f4ff 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java @@ -25,6 +25,7 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.values.ValuesWriter; +import org.apache.parquet.column.values.alp.AlpValuesWriter; import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong; @@ -159,7 +160,12 @@ private ValuesWriter getInt96ValuesWriter(ColumnDescriptor path) { private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) { final ValuesWriter fallbackWriter; - if (this.parquetProperties.isByteStreamSplitEnabled(path)) { + if (this.parquetProperties.isAlpEnabled(path)) { + fallbackWriter = new AlpValuesWriter.DoubleAlpValuesWriter( + parquetProperties.getInitialSlabSize(), + parquetProperties.getPageSizeThreshold(), + parquetProperties.getAllocator()); + } else if (this.parquetProperties.isByteStreamSplitEnabled(path)) { fallbackWriter = new ByteStreamSplitValuesWriter.DoubleByteStreamSplitValuesWriter( parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), @@ -176,7 +182,12 @@ private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) { private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) { final ValuesWriter fallbackWriter; - if (this.parquetProperties.isByteStreamSplitEnabled(path)) { + if (this.parquetProperties.isAlpEnabled(path)) { + fallbackWriter = new AlpValuesWriter.FloatAlpValuesWriter( + parquetProperties.getInitialSlabSize(), + parquetProperties.getPageSizeThreshold(), + parquetProperties.getAllocator()); + } else if (this.parquetProperties.isByteStreamSplitEnabled(path)) { fallbackWriter = new ByteStreamSplitValuesWriter.FloatByteStreamSplitValuesWriter( parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpBitPackingTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpBitPackingTest.java new file mode 100644 index 0000000000..88fe68ed18 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpBitPackingTest.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.BytePackerForLong; +import org.apache.parquet.column.values.bitpacking.Packer; +import org.junit.Test; + +/** + * Tests for bit-packing behavior in the ALP encoding pipeline. + */ +public class AlpBitPackingTest { + + @Test + public void testBytePackerIntRoundTrip() { + // Verify BytePacker pack/unpack round-trip for various bit widths + for (int bitWidth = 1; bitWidth <= 31; bitWidth++) { + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth); + int maxVal = (int) Math.min((1L << bitWidth) - 1, Integer.MAX_VALUE); + + int[] input = new int[8]; + for (int i = 0; i < 8; i++) { + input[i] = (maxVal / 8) * i; + } + + byte[] packed = new byte[bitWidth]; + packer.pack8Values(input, 0, packed, 0); + + int[] unpacked = new int[8]; + ByteBuffer buf = ByteBuffer.wrap(packed); + packer.unpack8Values(buf, 0, unpacked, 0); + + for (int i = 0; i < 8; i++) { + assertEquals("BitWidth=" + bitWidth + " index=" + i, input[i], unpacked[i]); + } + } + } + + @Test + public void testBytePackerForLongRoundTrip() { + // Verify BytePackerForLong pack/unpack round-trip for various bit widths + for (int bitWidth = 1; bitWidth <= 63; bitWidth++) { + BytePackerForLong packer = Packer.LITTLE_ENDIAN.newBytePackerForLong(bitWidth); + long maxVal = (bitWidth == 63) ? 
Long.MAX_VALUE : (1L << bitWidth) - 1; + + long[] input = new long[8]; + for (int i = 0; i < 8; i++) { + input[i] = (maxVal / 8) * i; + } + + byte[] packed = new byte[bitWidth]; + packer.pack8Values(input, 0, packed, 0); + + long[] unpacked = new long[8]; + ByteBuffer buf = ByteBuffer.wrap(packed); + packer.unpack8Values(buf, 0, unpacked, 0); + + for (int i = 0; i < 8; i++) { + assertEquals("BitWidth=" + bitWidth + " index=" + i, input[i], unpacked[i]); + } + } + } + + @Test + public void testSimpleTwoFloats() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(2, ByteBufferInputStream.wrap(input.toByteBuffer())); + + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(reader.readFloat())); + assertEquals(Float.floatToRawIntBits(2.0f), Float.floatToRawIntBits(reader.readFloat())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testThreeFloatsWithNegative() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + writer.writeFloat(1.0f); + writer.writeFloat(-1.0f); + writer.writeFloat(2.0f); + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(3, ByteBufferInputStream.wrap(input.toByteBuffer())); + + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(reader.readFloat())); + assertEquals(Float.floatToRawIntBits(-1.0f), Float.floatToRawIntBits(reader.readFloat())); + assertEquals(Float.floatToRawIntBits(2.0f), Float.floatToRawIntBits(reader.readFloat())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testEncoderDecoderDirectly() { + float value = 1.0f; + int exponent = 2; + int factor = 0; + + assertFalse(AlpEncoderDecoder.isFloatException(value, exponent, factor)); + int encoded = AlpEncoderDecoder.encodeFloat(value, exponent, factor); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, exponent, factor); + assertEquals(Float.floatToRawIntBits(value), Float.floatToRawIntBits(decoded)); + } + + @Test + public void testHeaderFormatValidation() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + + BytesInput input = writer.getBytes(); + byte[] bytes = input.toByteArray(); + + // Parse and validate header + ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + int version = buf.get() & 0xFF; + int compressionMode = buf.get() & 0xFF; + int integerEncoding = buf.get() & 0xFF; + int logVectorSize = buf.get() & 0xFF; + int numElements = buf.getInt(); + + assertEquals(AlpConstants.ALP_VERSION, version); + assertEquals(AlpConstants.ALP_COMPRESSION_MODE, compressionMode); + assertEquals(AlpConstants.ALP_INTEGER_ENCODING_FOR, integerEncoding); + assertEquals(AlpConstants.DEFAULT_VECTOR_SIZE_LOG, logVectorSize); + assertEquals(2, numElements); + + // Verify offset array follows header (1 vector = 1 offset entry) + int offset0 = buf.getInt(); + // 
First vector starts right after the offset array (1 vector * 4 bytes = 4) + assertEquals(Integer.BYTES, offset0); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testPartialGroupPacking() throws Exception { + // Test with fewer than 8 values to exercise partial group packing/unpacking + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(reader.readFloat())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testBitWidthZeroPacking() throws Exception { + // All identical values should result in bitWidth=0 (no packed data) + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + for (int i = 0; i < 10; i++) { + writer.writeFloat(5.0f); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(10, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < 10; i++) { + assertEquals(Float.floatToRawIntBits(5.0f), Float.floatToRawIntBits(reader.readFloat())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoublePackingRoundTrip() throws Exception { + // Verify double packing round-trip with varying values + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + double[] values = {1.0, -1.0, 2.0, 100.5, 0.0, 3.14, 42.0, 99.99, 0.001, 7.77}; + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (double expected : values) { + assertEquals(Double.doubleToRawLongBits(expected), Double.doubleToRawLongBits(reader.readDouble())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testExactEightValues() throws Exception { + // Exactly 8 values = one full packing group, no partial group + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(reader.readFloat())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } +} diff --git 
a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java new file mode 100644 index 0000000000..58267fdb93 --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpEncoderDecoderTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import org.junit.Test; + +/** + * Tests for the core ALP encoder/decoder logic. + */ +public class AlpEncoderDecoderTest { + + // ========== Float Encoding/Decoding Tests ========== + + @Test + public void testFloatRoundTrip() { + float[] testValues = {0.0f, 1.0f, -1.0f, 3.14159f, 100.5f, 0.001f, 1234567.0f}; + + for (float value : testValues) { + for (int exponent = 0; exponent <= AlpConstants.FLOAT_MAX_EXPONENT; exponent++) { + for (int factor = 0; factor <= exponent; factor++) { + if (!AlpEncoderDecoder.isFloatException(value, exponent, factor)) { + int encoded = AlpEncoderDecoder.encodeFloat(value, exponent, factor); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, exponent, factor); + assertEquals( + "Round-trip failed for value=" + value + ", exponent=" + exponent + ", factor=" + + factor, + Float.floatToRawIntBits(value), + Float.floatToRawIntBits(decoded)); + } + } + } + } + } + + @Test + public void testFloatExceptionDetection() { + assertTrue("NaN should be an exception", AlpEncoderDecoder.isFloatException(Float.NaN)); + assertTrue( + "Positive infinity should be an exception", + AlpEncoderDecoder.isFloatException(Float.POSITIVE_INFINITY)); + assertTrue( + "Negative infinity should be an exception", + AlpEncoderDecoder.isFloatException(Float.NEGATIVE_INFINITY)); + assertTrue("Negative zero should be an exception", AlpEncoderDecoder.isFloatException(-0.0f)); + + assertFalse("1.0f should not be a basic exception", AlpEncoderDecoder.isFloatException(1.0f)); + assertFalse("0.0f should not be a basic exception", AlpEncoderDecoder.isFloatException(0.0f)); + } + + @Test + public void testFloatEncoding() { + assertEquals(123, AlpEncoderDecoder.encodeFloat(1.23f, 2, 0)); + assertEquals(123, AlpEncoderDecoder.encodeFloat(12.3f, 2, 1)); + assertEquals(0, AlpEncoderDecoder.encodeFloat(0.0f, 5, 0)); + } + + @Test + public void testFloatDecoding() { + assertEquals(1.23f, AlpEncoderDecoder.decodeFloat(123, 2, 0), 1e-6f); + assertEquals(12.3f, AlpEncoderDecoder.decodeFloat(123, 2, 1), 1e-6f); + assertEquals(0.0f, AlpEncoderDecoder.decodeFloat(0, 5, 0), 0.0f); + } + + @Test + public void testFastRoundFloat() { + assertEquals(5, AlpEncoderDecoder.fastRoundFloat(5.4f)); + assertEquals(6, AlpEncoderDecoder.fastRoundFloat(5.5f)); + 
assertEquals(6, AlpEncoderDecoder.fastRoundFloat(5.6f)); + assertEquals(-5, AlpEncoderDecoder.fastRoundFloat(-5.4f)); + assertEquals(-6, AlpEncoderDecoder.fastRoundFloat(-5.5f)); + assertEquals(-6, AlpEncoderDecoder.fastRoundFloat(-5.6f)); + assertEquals(0, AlpEncoderDecoder.fastRoundFloat(0.0f)); + } + + @Test + public void testFloatMultiplier() { + assertEquals(1.0f, AlpEncoderDecoder.getFloatMultiplier(0, 0), 0.0f); + assertEquals(100.0f, AlpEncoderDecoder.getFloatMultiplier(2, 0), 0.0f); + assertEquals(10.0f, AlpEncoderDecoder.getFloatMultiplier(2, 1), 1e-6f); + assertEquals(1.0f, AlpEncoderDecoder.getFloatMultiplier(2, 2), 1e-6f); + } + + // ========== Double Encoding/Decoding Tests ========== + + @Test + public void testDoubleRoundTrip() { + double[] testValues = {0.0, 1.0, -1.0, 3.14159265358979, 100.5, 0.001, 12345678901234.0}; + + for (double value : testValues) { + for (int exponent = 0; exponent <= Math.min(AlpConstants.DOUBLE_MAX_EXPONENT, 10); exponent++) { + for (int factor = 0; factor <= exponent; factor++) { + if (!AlpEncoderDecoder.isDoubleException(value, exponent, factor)) { + long encoded = AlpEncoderDecoder.encodeDouble(value, exponent, factor); + double decoded = AlpEncoderDecoder.decodeDouble(encoded, exponent, factor); + assertEquals( + "Round-trip failed for value=" + value + ", exponent=" + exponent + ", factor=" + + factor, + Double.doubleToRawLongBits(value), + Double.doubleToRawLongBits(decoded)); + } + } + } + } + } + + @Test + public void testDoubleExceptionDetection() { + assertTrue("NaN should be an exception", AlpEncoderDecoder.isDoubleException(Double.NaN)); + assertTrue( + "Positive infinity should be an exception", + AlpEncoderDecoder.isDoubleException(Double.POSITIVE_INFINITY)); + assertTrue( + "Negative infinity should be an exception", + AlpEncoderDecoder.isDoubleException(Double.NEGATIVE_INFINITY)); + assertTrue("Negative zero should be an exception", AlpEncoderDecoder.isDoubleException(-0.0)); + + assertFalse("1.0 should not be a basic exception", AlpEncoderDecoder.isDoubleException(1.0)); + assertFalse("0.0 should not be a basic exception", AlpEncoderDecoder.isDoubleException(0.0)); + } + + @Test + public void testDoubleEncoding() { + assertEquals(123L, AlpEncoderDecoder.encodeDouble(1.23, 2, 0)); + assertEquals(123L, AlpEncoderDecoder.encodeDouble(12.3, 2, 1)); + assertEquals(0L, AlpEncoderDecoder.encodeDouble(0.0, 5, 0)); + } + + @Test + public void testDoubleDecoding() { + assertEquals(1.23, AlpEncoderDecoder.decodeDouble(123, 2, 0), 1e-10); + assertEquals(12.3, AlpEncoderDecoder.decodeDouble(123, 2, 1), 1e-10); + assertEquals(0.0, AlpEncoderDecoder.decodeDouble(0, 5, 0), 0.0); + } + + @Test + public void testFastRoundDouble() { + assertEquals(5L, AlpEncoderDecoder.fastRoundDouble(5.4)); + assertEquals(6L, AlpEncoderDecoder.fastRoundDouble(5.5)); + assertEquals(6L, AlpEncoderDecoder.fastRoundDouble(5.6)); + assertEquals(-5L, AlpEncoderDecoder.fastRoundDouble(-5.4)); + assertEquals(-6L, AlpEncoderDecoder.fastRoundDouble(-5.5)); + assertEquals(-6L, AlpEncoderDecoder.fastRoundDouble(-5.6)); + assertEquals(0L, AlpEncoderDecoder.fastRoundDouble(0.0)); + } + + @Test + public void testDoubleMultiplier() { + assertEquals(1.0, AlpEncoderDecoder.getDoubleMultiplier(0, 0), 0.0); + assertEquals(100.0, AlpEncoderDecoder.getDoubleMultiplier(2, 0), 0.0); + assertEquals(10.0, AlpEncoderDecoder.getDoubleMultiplier(2, 1), 1e-10); + assertEquals(1.0, AlpEncoderDecoder.getDoubleMultiplier(2, 2), 1e-10); + } + + // ========== Bit Width Tests (renamed methods) 
========== + + @Test + public void testBitWidthForInt() { + assertEquals(0, AlpEncoderDecoder.bitWidthForInt(0)); + assertEquals(1, AlpEncoderDecoder.bitWidthForInt(1)); + assertEquals(2, AlpEncoderDecoder.bitWidthForInt(2)); + assertEquals(2, AlpEncoderDecoder.bitWidthForInt(3)); + assertEquals(3, AlpEncoderDecoder.bitWidthForInt(4)); + assertEquals(8, AlpEncoderDecoder.bitWidthForInt(255)); + assertEquals(9, AlpEncoderDecoder.bitWidthForInt(256)); + assertEquals(16, AlpEncoderDecoder.bitWidthForInt(65535)); + assertEquals(31, AlpEncoderDecoder.bitWidthForInt(Integer.MAX_VALUE)); + } + + @Test + public void testBitWidthForLong() { + assertEquals(0, AlpEncoderDecoder.bitWidthForLong(0L)); + assertEquals(1, AlpEncoderDecoder.bitWidthForLong(1L)); + assertEquals(2, AlpEncoderDecoder.bitWidthForLong(2L)); + assertEquals(2, AlpEncoderDecoder.bitWidthForLong(3L)); + assertEquals(3, AlpEncoderDecoder.bitWidthForLong(4L)); + assertEquals(8, AlpEncoderDecoder.bitWidthForLong(255L)); + assertEquals(9, AlpEncoderDecoder.bitWidthForLong(256L)); + assertEquals(16, AlpEncoderDecoder.bitWidthForLong(65535L)); + assertEquals(31, AlpEncoderDecoder.bitWidthForLong((long) Integer.MAX_VALUE)); + assertEquals(63, AlpEncoderDecoder.bitWidthForLong(Long.MAX_VALUE)); + } + + // ========== Best Parameters Tests ========== + + @Test + public void testFindBestFloatParams() { + float[] values = {1.23f, 4.56f, 7.89f, 10.11f, 12.13f}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + + assertNotNull(params); + assertTrue(params.exponent >= 0 && params.exponent <= AlpConstants.FLOAT_MAX_EXPONENT); + assertTrue(params.factor >= 0 && params.factor <= params.exponent); + + for (float v : values) { + if (!AlpEncoderDecoder.isFloatException(v, params.exponent, params.factor)) { + int encoded = AlpEncoderDecoder.encodeFloat(v, params.exponent, params.factor); + float decoded = AlpEncoderDecoder.decodeFloat(encoded, params.exponent, params.factor); + assertEquals(Float.floatToRawIntBits(v), Float.floatToRawIntBits(decoded)); + } + } + } + + @Test + public void testFindBestFloatParamsWithAllExceptions() { + float[] values = {Float.NaN, Float.NaN, Float.NaN}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + + assertNotNull(params); + assertEquals(values.length, params.numExceptions); + } + + @Test + public void testFindBestDoubleParams() { + double[] values = {1.23, 4.56, 7.89, 10.11, 12.13}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestDoubleParams(values, 0, values.length); + + assertNotNull(params); + assertTrue(params.exponent >= 0 && params.exponent <= AlpConstants.DOUBLE_MAX_EXPONENT); + assertTrue(params.factor >= 0 && params.factor <= params.exponent); + + for (double v : values) { + if (!AlpEncoderDecoder.isDoubleException(v, params.exponent, params.factor)) { + long encoded = AlpEncoderDecoder.encodeDouble(v, params.exponent, params.factor); + double decoded = AlpEncoderDecoder.decodeDouble(encoded, params.exponent, params.factor); + assertEquals(Double.doubleToRawLongBits(v), Double.doubleToRawLongBits(decoded)); + } + } + } + + @Test + public void testFindBestDoubleParamsWithAllExceptions() { + double[] values = {Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestDoubleParams(values, 0, values.length); + + assertNotNull(params); + assertEquals(values.length, params.numExceptions); + } + + 
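+
+    /**
+     * Worked example, added for illustration: it ties the decimal-scaling step to
+     * the FOR/bit-width arithmetic the writer applies afterwards. The concrete
+     * numbers are hand-computed here, not constants taken from the implementation.
+     */
+    @Test
+    public void testWorkedExampleScalingThenForBitWidth() {
+        int exponent = 2;
+        int factor = 0;
+        // multiplier = 10^(2-0) = 100, so {1.0f, 2.0f, 3.0f} scale to {100, 200, 300}
+        float[] values = {1.0f, 2.0f, 3.0f};
+        int[] encoded = new int[values.length];
+        for (int i = 0; i < values.length; i++) {
+            assertFalse(AlpEncoderDecoder.isFloatException(values[i], exponent, factor));
+            encoded[i] = AlpEncoderDecoder.encodeFloat(values[i], exponent, factor);
+            // Non-exceptions must decode back to the exact bit pattern
+            float decoded = AlpEncoderDecoder.decodeFloat(encoded[i], exponent, factor);
+            assertEquals(Float.floatToRawIntBits(values[i]), Float.floatToRawIntBits(decoded));
+        }
+        assertArrayEquals(new int[] {100, 200, 300}, encoded);
+        // FOR step: subtract the minimum (100); the largest delta, 200, needs 8 bits
+        assertEquals(8, AlpEncoderDecoder.bitWidthForInt(300 - 100));
+    }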
@Test + public void testFindBestParamsWithOffset() { + float[] values = {Float.NaN, Float.NaN, 1.23f, 4.56f, 7.89f, Float.NaN}; + AlpEncoderDecoder.EncodingParams params = AlpEncoderDecoder.findBestFloatParams(values, 2, 3); + + assertNotNull(params); + assertEquals(0, params.numExceptions); + } + + // ========== Preset-Based Parameter Search Tests ========== + + @Test + public void testFindBestFloatParamsWithPresets() { + float[] values = {1.23f, 4.56f, 7.89f, 10.11f, 12.13f}; + int[][] presets = {{2, 0}, {3, 0}, {4, 1}}; + AlpEncoderDecoder.EncodingParams params = + AlpEncoderDecoder.findBestFloatParamsWithPresets(values, 0, values.length, presets); + + assertNotNull(params); + // Should select one of the preset combinations + boolean foundMatch = false; + for (int[] preset : presets) { + if (params.exponent == preset[0] && params.factor == preset[1]) { + foundMatch = true; + break; + } + } + assertTrue("Result should be one of the preset combinations", foundMatch); + } + + @Test + public void testFindBestDoubleParamsWithPresets() { + double[] values = {1.23, 4.56, 7.89, 10.11, 12.13}; + int[][] presets = {{2, 0}, {3, 0}, {4, 1}}; + AlpEncoderDecoder.EncodingParams params = + AlpEncoderDecoder.findBestDoubleParamsWithPresets(values, 0, values.length, presets); + + assertNotNull(params); + boolean foundMatch = false; + for (int[] preset : presets) { + if (params.exponent == preset[0] && params.factor == preset[1]) { + foundMatch = true; + break; + } + } + assertTrue("Result should be one of the preset combinations", foundMatch); + } + + @Test + public void testPresetsProduceSameResultAsFullSearch() { + float[] values = {1.23f, 4.56f, 7.89f}; + AlpEncoderDecoder.EncodingParams fullResult = AlpEncoderDecoder.findBestFloatParams(values, 0, values.length); + + // Include the best params in presets + int[][] presets = {{fullResult.exponent, fullResult.factor}, {0, 0}, {1, 0}}; + AlpEncoderDecoder.EncodingParams presetResult = + AlpEncoderDecoder.findBestFloatParamsWithPresets(values, 0, values.length, presets); + + assertTrue( + "Preset result should be at least as good as full search", + presetResult.numExceptions <= fullResult.numExceptions); + } +} diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesEndToEndTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesEndToEndTest.java new file mode 100644 index 0000000000..feaf54bdfb --- /dev/null +++ b/parquet-column/src/test/java/org/apache/parquet/column/values/alp/AlpValuesEndToEndTest.java @@ -0,0 +1,1625 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.column.values.alp; + +import static org.junit.Assert.*; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Random; +import org.apache.parquet.bytes.ByteBufferInputStream; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.bytes.DirectByteBufferAllocator; +import org.apache.parquet.column.values.bitpacking.BytePacker; +import org.apache.parquet.column.values.bitpacking.Packer; +import org.apache.parquet.io.ParquetDecodingException; +import org.junit.Test; + +/** + * End-to-end tests for ALP encoding and decoding pipeline. + */ +public class AlpValuesEndToEndTest { + + private static final int DEFAULT_VECTOR_SIZE = AlpConstants.DEFAULT_VECTOR_SIZE; + + // ========== Helper Methods ========== + + private void roundTripFloat(float[] values) throws Exception { + roundTripFloat(values, DEFAULT_VECTOR_SIZE); + } + + private void roundTripFloat(float[] values, int vectorSize) throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + int capacity = Math.max(256, values.length * 8); + writer = new AlpValuesWriter.FloatAlpValuesWriter( + capacity, capacity, new DirectByteBufferAllocator(), vectorSize); + + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + float expected = values[i]; + float actual = reader.readFloat(); + + if (Float.isNaN(expected)) { + assertTrue("Expected NaN at index " + i, Float.isNaN(actual)); + } else { + assertEquals( + "Value mismatch at index " + i + " for " + expected, + Float.floatToRawIntBits(expected), + Float.floatToRawIntBits(actual)); + } + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + private void roundTripDouble(double[] values) throws Exception { + roundTripDouble(values, DEFAULT_VECTOR_SIZE); + } + + private void roundTripDouble(double[] values, int vectorSize) throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + int capacity = Math.max(512, values.length * 16); + writer = new AlpValuesWriter.DoubleAlpValuesWriter( + capacity, capacity, new DirectByteBufferAllocator(), vectorSize); + + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + double expected = values[i]; + double actual = reader.readDouble(); + + if (Double.isNaN(expected)) { + assertTrue("Expected NaN at index " + i, Double.isNaN(actual)); + } else { + assertEquals( + "Value mismatch at index " + i + " for " + expected, + Double.doubleToRawLongBits(expected), + Double.doubleToRawLongBits(actual)); + } + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Float Pipeline Tests ========== + + @Test + public void testFloatSingleValue() throws Exception { + roundTripFloat(new float[] {1.23f}); + } + + @Test + public void testFloatSmallBatch() throws Exception { + roundTripFloat(new float[] {0.0f, 1.0f, -1.0f, 3.14f, 100.5f, 0.001f, 1234567.0f}); + } + + @Test + public void testFloatRandomData() throws Exception { + Random rand = new Random(42); + float[] values = new float[1024]; + for 
(int i = 0; i < values.length; i++) { + values[i] = rand.nextFloat() * 10000.0f - 5000.0f; + } + roundTripFloat(values); + } + + @Test + public void testFloatWithExceptions() throws Exception { + roundTripFloat(new float[] { + 1.0f, Float.NaN, 2.0f, Float.POSITIVE_INFINITY, 3.0f, Float.NEGATIVE_INFINITY, 4.0f, -0.0f, 5.0f + }); + } + + @Test + public void testFloatAllExceptions() throws Exception { + roundTripFloat(new float[] {Float.NaN, Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY, -0.0f}); + } + + @Test + public void testFloatMultipleVectors() throws Exception { + Random rand = new Random(42); + float[] values = new float[3000]; + for (int i = 0; i < values.length; i++) { + values[i] = rand.nextFloat() * 1000.0f; + } + roundTripFloat(values); + } + + @Test + public void testFloatZeroValues() throws Exception { + roundTripFloat(new float[] {0.0f, 0.0f, 0.0f, 0.0f}); + } + + @Test + public void testFloatIdenticalValues() throws Exception { + float[] values = new float[100]; + for (int i = 0; i < values.length; i++) { + values[i] = 3.14159f; + } + roundTripFloat(values); + } + + // ========== Float: Vector Size / Boundary Tests ========== + + @Test + public void testFloatExactOneVector() throws Exception { + float[] values = new float[DEFAULT_VECTOR_SIZE]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01f; + } + roundTripFloat(values); + } + + @Test + public void testFloatExactTwoVectors() throws Exception { + float[] values = new float[DEFAULT_VECTOR_SIZE * 2]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01f; + } + roundTripFloat(values); + } + + @Test + public void testFloatExactThreeVectors() throws Exception { + float[] values = new float[DEFAULT_VECTOR_SIZE * 3]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01f; + } + roundTripFloat(values); + } + + @Test + public void testFloatPartialLastVectorPlusOne() throws Exception { + float[] values = new float[DEFAULT_VECTOR_SIZE + 1]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01f; + } + roundTripFloat(values); + } + + @Test + public void testFloatPartialLastVectorMinusOne() throws Exception { + float[] values = new float[DEFAULT_VECTOR_SIZE * 2 - 1]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01f; + } + roundTripFloat(values); + } + + @Test + public void testFloatCustomVectorSize512() throws Exception { + float[] values = new float[512 + 100]; // partial second vector + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.1f; + } + roundTripFloat(values, 512); + } + + @Test + public void testFloatCustomVectorSize2048() throws Exception { + float[] values = new float[2048 * 2 + 500]; // partial third vector + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.1f; + } + roundTripFloat(values, 2048); + } + + @Test + public void testFloatCustomVectorSizeSmall() throws Exception { + // Minimum vector size = 8 (2^3) + float[] values = new float[25]; // 3 full vectors + 1 partial + for (int i = 0; i < values.length; i++) { + values[i] = i * 1.5f; + } + roundTripFloat(values, 8); + } + + // ========== Float: Large Page ========== + + @Test + public void testFloatLargePage() throws Exception { + Random rand = new Random(42); + float[] values = new float[15000]; + for (int i = 0; i < values.length; i++) { + values[i] = rand.nextFloat() * 100000.0f - 50000.0f; + } + roundTripFloat(values); + } + + // ========== Float: Monetary Data ========== + + @Test + public void testFloatMonetaryData() throws Exception { + float[] values = { + 
19.99f, 29.95f, 9.99f, 49.99f, 99.95f, 14.50f, 7.99f, 24.99f, 39.95f, 59.99f, 119.99f, 4.99f, 0.99f, 1.50f, + 2.99f, 3.49f + }; + roundTripFloat(values); + } + + // ========== Float: All Identical (bit_width = 0) ========== + + @Test + public void testFloatAllIdenticalBitWidthZero() throws Exception { + float[] values = new float[2000]; + for (int i = 0; i < values.length; i++) { + values[i] = 42.0f; + } + roundTripFloat(values); + } + + // ========== Float: Mixed Exceptions at Various Positions ========== + + @Test + public void testFloatExceptionsAtBoundaries() throws Exception { + // Exceptions at vector start and end + float[] values = new float[20]; + values[0] = Float.NaN; + for (int i = 1; i < values.length - 1; i++) { + values[i] = i * 1.1f; + } + values[values.length - 1] = Float.POSITIVE_INFINITY; + roundTripFloat(values); + } + + // ========== Float: Skip Tests ========== + + @Test + public void testFloatSkip() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + float[] values = {1.0f, 2.0f, 3.0f, 4.0f, 5.0f, 6.0f, 7.0f, 8.0f, 9.0f, 10.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(reader.readFloat())); + reader.skip(3); + assertEquals(Float.floatToRawIntBits(5.0f), Float.floatToRawIntBits(reader.readFloat())); + reader.skip(2); + assertEquals(Float.floatToRawIntBits(8.0f), Float.floatToRawIntBits(reader.readFloat())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testFloatSkipAcrossVectorBoundaries() throws Exception { + int vectorSize = 8; // small vector for boundary testing + int totalValues = vectorSize * 3 + 4; // 3 full vectors + partial + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter( + totalValues * 8, totalValues * 8, new DirectByteBufferAllocator(), vectorSize); + + float[] values = new float[totalValues]; + for (int i = 0; i < totalValues; i++) { + values[i] = (i + 1) * 1.0f; + writer.writeFloat(values[i]); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(totalValues, ByteBufferInputStream.wrap(input.toByteBuffer())); + + // Read first value from vector 0 + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(reader.readFloat())); + + // Skip past vector 0 and vector 1, into vector 2 + reader.skip(vectorSize * 2 - 1); // skip to index vectorSize*2 + + // Read first value of vector 2 + float expected = (vectorSize * 2 + 1) * 1.0f; + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(reader.readFloat())); + + // Skip into the partial last vector + reader.skip(vectorSize - 1); // skip to index vectorSize*3 + + expected = (vectorSize * 3 + 1) * 1.0f; + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(reader.readFloat())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Float: Writer Reset ========== + + @Test + public void testFloatWriterReset() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new 
AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + writer.reset(); + assertEquals(0, writer.getBufferedSize()); + + float[] values = {3.0f, 4.0f, 5.0f}; + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (float expected : values) { + assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(reader.readFloat())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Double Pipeline Tests ========== + + @Test + public void testDoubleSingleValue() throws Exception { + roundTripDouble(new double[] {1.23456789}); + } + + @Test + public void testDoubleSmallBatch() throws Exception { + roundTripDouble(new double[] {0.0, 1.0, -1.0, 3.14159265358979, 100.5, 0.001, 12345678901234.0}); + } + + @Test + public void testDoubleRandomData() throws Exception { + Random rand = new Random(42); + double[] values = new double[1024]; + for (int i = 0; i < values.length; i++) { + values[i] = rand.nextDouble() * 1000000.0 - 500000.0; + } + roundTripDouble(values); + } + + @Test + public void testDoubleWithExceptions() throws Exception { + roundTripDouble(new double[] { + 1.0, Double.NaN, 2.0, Double.POSITIVE_INFINITY, 3.0, Double.NEGATIVE_INFINITY, 4.0, -0.0, 5.0 + }); + } + + @Test + public void testDoubleAllExceptions() throws Exception { + roundTripDouble(new double[] {Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, -0.0}); + } + + @Test + public void testDoubleMultipleVectors() throws Exception { + Random rand = new Random(42); + double[] values = new double[3000]; + for (int i = 0; i < values.length; i++) { + values[i] = rand.nextDouble() * 100000.0; + } + roundTripDouble(values); + } + + @Test + public void testDoubleZeroValues() throws Exception { + roundTripDouble(new double[] {0.0, 0.0, 0.0, 0.0}); + } + + @Test + public void testDoubleIdenticalValues() throws Exception { + double[] values = new double[100]; + for (int i = 0; i < values.length; i++) { + values[i] = 3.14159265358979; + } + roundTripDouble(values); + } + + // ========== Double: Vector Size / Boundary Tests ========== + + @Test + public void testDoubleExactOneVector() throws Exception { + double[] values = new double[DEFAULT_VECTOR_SIZE]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01; + } + roundTripDouble(values); + } + + @Test + public void testDoubleExactTwoVectors() throws Exception { + double[] values = new double[DEFAULT_VECTOR_SIZE * 2]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01; + } + roundTripDouble(values); + } + + @Test + public void testDoublePartialLastVectorPlusOne() throws Exception { + double[] values = new double[DEFAULT_VECTOR_SIZE + 1]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01; + } + roundTripDouble(values); + } + + @Test + public void testDoublePartialLastVectorMinusOne() throws Exception { + double[] values = new double[DEFAULT_VECTOR_SIZE * 2 - 1]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.01; + } + roundTripDouble(values); + } + + @Test + public void testDoubleCustomVectorSize512() throws Exception { + double[] values = new double[512 + 100]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.1; + } + roundTripDouble(values, 512); + } + + 
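+
+    /**
+     * Worked example, added for illustration: with vectorSize=64, 150 values split
+     * into two full vectors plus a partial vector of 22. The data i * 0.25 is exactly
+     * representable in binary floating point, and the check is independent of which
+     * exponent/factor the writer picks: anything it cannot encode losslessly must be
+     * stored as an exception, so the round-trip has to be bit-exact either way.
+     */
+    @Test
+    public void testDoubleCustomVectorSize64PartialLast() throws Exception {
+        double[] values = new double[150]; // 2 * 64 + 22
+        for (int i = 0; i < values.length; i++) {
+            values[i] = i * 0.25;
+        }
+        roundTripDouble(values, 64);
+    }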
@Test + public void testDoubleCustomVectorSize2048() throws Exception { + double[] values = new double[2048 * 2 + 500]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 0.1; + } + roundTripDouble(values, 2048); + } + + @Test + public void testDoubleCustomVectorSizeSmall() throws Exception { + double[] values = new double[25]; + for (int i = 0; i < values.length; i++) { + values[i] = i * 1.5; + } + roundTripDouble(values, 8); + } + + // ========== Double: Large Page ========== + + @Test + public void testDoubleLargePage() throws Exception { + Random rand = new Random(42); + double[] values = new double[15000]; + for (int i = 0; i < values.length; i++) { + values[i] = rand.nextDouble() * 100000.0 - 50000.0; + } + roundTripDouble(values); + } + + // ========== Double: Monetary Data ========== + + @Test + public void testDoubleMonetaryData() throws Exception { + double[] values = { + 19.99, 29.95, 9.99, 49.99, 99.95, 14.50, 7.99, 24.99, 39.95, 59.99, 119.99, 4.99, 0.99, 1.50, 2.99, 3.49 + }; + roundTripDouble(values); + } + + // ========== Double: All Identical (bit_width = 0) ========== + + @Test + public void testDoubleAllIdenticalBitWidthZero() throws Exception { + double[] values = new double[2000]; + for (int i = 0; i < values.length; i++) { + values[i] = 42.0; + } + roundTripDouble(values); + } + + // ========== Double: Mixed Exceptions at Boundaries ========== + + @Test + public void testDoubleExceptionsAtBoundaries() throws Exception { + double[] values = new double[20]; + values[0] = Double.NaN; + for (int i = 1; i < values.length - 1; i++) { + values[i] = i * 1.1; + } + values[values.length - 1] = Double.POSITIVE_INFINITY; + roundTripDouble(values); + } + + // ========== Double: Skip Tests ========== + + @Test + public void testDoubleSkip() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + double[] values = {1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0}; + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + assertEquals(Double.doubleToRawLongBits(1.0), Double.doubleToRawLongBits(reader.readDouble())); + reader.skip(3); + assertEquals(Double.doubleToRawLongBits(5.0), Double.doubleToRawLongBits(reader.readDouble())); + reader.skip(2); + assertEquals(Double.doubleToRawLongBits(8.0), Double.doubleToRawLongBits(reader.readDouble())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test + public void testDoubleSkipAcrossVectorBoundaries() throws Exception { + int vectorSize = 8; + int totalValues = vectorSize * 3 + 4; + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter( + totalValues * 16, totalValues * 16, new DirectByteBufferAllocator(), vectorSize); + + double[] values = new double[totalValues]; + for (int i = 0; i < totalValues; i++) { + values[i] = (i + 1) * 1.0; + writer.writeDouble(values[i]); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(totalValues, ByteBufferInputStream.wrap(input.toByteBuffer())); + + assertEquals(Double.doubleToRawLongBits(1.0), Double.doubleToRawLongBits(reader.readDouble())); + reader.skip(vectorSize * 2 - 1); + + 
double expected = (vectorSize * 2 + 1) * 1.0; + assertEquals(Double.doubleToRawLongBits(expected), Double.doubleToRawLongBits(reader.readDouble())); + + reader.skip(vectorSize - 1); + expected = (vectorSize * 3 + 1) * 1.0; + assertEquals(Double.doubleToRawLongBits(expected), Double.doubleToRawLongBits(reader.readDouble())); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Double: Writer Reset ========== + + @Test + public void testDoubleWriterReset() throws Exception { + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + + writer.writeDouble(1.0); + writer.writeDouble(2.0); + writer.reset(); + assertEquals(0, writer.getBufferedSize()); + + double[] values = {3.0, 4.0, 5.0}; + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (double expected : values) { + assertEquals(Double.doubleToRawLongBits(expected), Double.doubleToRawLongBits(reader.readDouble())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Packed Size Verification ========== + + @Test + public void testPackedSizeSpecFormula() throws Exception { + // Exercise the ceil(n * bitWidth / 8) packed-size formula by round-tripping + // partial vectors with various remainders mod 8: counts not divisible by 8 + // take the zero-padded partial-group path, and a size mismatch would corrupt + // the trailing values and fail the round-trip. + for (int count : new int[] {1, 2, 3, 5, 7, 9, 11, 13, 15}) { + float[] values = new float[count]; + for (int i = 0; i < count; i++) { + values[i] = (i + 1) * 1.0f; + } + // Use a vector size larger than every count so the partial vector IS the only vector + roundTripFloat(values, 1024); + } + } + + @Test + public void testPartialVectorAllRemaindersMod8() throws Exception { + // Test every possible remainder (1..7) to exercise the partial group path. + // With vectorSize=16: 16 is 2 full groups of 8, no partial group. + // So use vectorSize=16 and write 16+R values where R in {1..7} + // to get a partial last vector of size R.
+ for (int remainder = 1; remainder <= 7; remainder++) { + int count = 16 + remainder; + float[] values = new float[count]; + for (int i = 0; i < count; i++) { + values[i] = (i + 1) * 0.5f; + } + roundTripFloat(values, 16); + } + // Also test for doubles + for (int remainder = 1; remainder <= 7; remainder++) { + int count = 16 + remainder; + double[] values = new double[count]; + for (int i = 0; i < count; i++) { + values[i] = (i + 1) * 0.5; + } + roundTripDouble(values, 16); + } + } + + // ========== Negative Encoded Values / Wide FOR Range ========== + + @Test + public void testFloatNegativeValues() throws Exception { + // Values that produce negative encoded integers, testing FOR with negative min + float[] values = new float[100]; + for (int i = 0; i < values.length; i++) { + values[i] = -50.0f + i * 1.0f; // range [-50, 49] + } + roundTripFloat(values); + } + + @Test + public void testDoubleNegativeValues() throws Exception { + double[] values = new double[100]; + for (int i = 0; i < values.length; i++) { + values[i] = -50.0 + i * 1.0; + } + roundTripDouble(values); + } + + @Test + public void testFloatWideRange() throws Exception { + // Mix of small and large positive values - exercises higher bit widths + float[] values = {0.01f, 0.02f, 1000.0f, 2000.0f, 0.05f, 50000.0f, 0.99f, 99999.0f}; + roundTripFloat(values); + } + + @Test + public void testDoubleWideRange() throws Exception { + double[] values = {0.01, 0.02, 100000.0, 200000.0, 0.05, 5000000.0, 0.99, 99999999.0}; + roundTripDouble(values); + } + + // ========== Binary Format Verification ========== + + @Test + public void testBinaryFormatExactBytes() throws Exception { + // Write a known small dataset with vectorSize=8, then verify the + // exact binary layout byte-by-byte + int vectorSize = 8; + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator(), vectorSize); + + // Write 10 identical values: 1.0f + // With e=0, f=0: encoded=1, FOR min=1, all deltas=0, bitWidth=0 + for (int i = 0; i < 10; i++) { + writer.writeFloat(1.0f); + } + + BytesInput input = writer.getBytes(); + byte[] bytes = input.toByteArray(); + java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(bytes).order(java.nio.ByteOrder.LITTLE_ENDIAN); + + // Header (8 bytes) + assertEquals("version", 1, buf.get() & 0xFF); + assertEquals("compression_mode", 0, buf.get() & 0xFF); + assertEquals("integer_encoding", 0, buf.get() & 0xFF); + assertEquals("log_vector_size", 3, buf.get() & 0xFF); // log2(8)=3 + assertEquals("num_elements", 10, buf.getInt()); + + // Offset array (2 vectors * 4 bytes = 8 bytes) + int offset0 = buf.getInt(); // first vector offset + int offset1 = buf.getInt(); // second vector offset + assertEquals("offset0 = past offset array", 8, offset0); + + // Vector 0: 8 identical values, bitWidth=0, no exceptions + // AlpInfo (4 bytes) + int v0Exp = buf.get() & 0xFF; + int v0Fac = buf.get() & 0xFF; + int v0Exc = buf.getShort() & 0xFFFF; + assertEquals("v0 exceptions", 0, v0Exc); + + // ForInfo (5 bytes) + int v0For = buf.getInt(); + int v0Bw = buf.get() & 0xFF; + assertEquals("v0 bitWidth", 0, v0Bw); + // No packed bytes (bitWidth=0) + // No exceptions + + // Vector 0 should be exactly 4 + 5 = 9 bytes + assertEquals("offset1 should be offset0 + 9", offset0 + 9, offset1); + + // Vector 1: 2 identical values, bitWidth=0, no exceptions + int v1Exp = buf.get() & 0xFF; + int v1Fac = buf.get() & 0xFF; + assertEquals("same exponent both vectors", v0Exp, v1Exp); + 
assertEquals("same factor both vectors", v0Fac, v1Fac); + int v1Exc = buf.getShort() & 0xFFFF; + assertEquals("v1 exceptions", 0, v1Exc); + int v1For = buf.getInt(); + int v1Bw = buf.get() & 0xFF; + assertEquals("v1 bitWidth", 0, v1Bw); + + // Should have consumed all bytes + assertEquals("all bytes consumed", bytes.length, buf.position()); + + // Verify round-trip + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(10, ByteBufferInputStream.wrap(input.toByteBuffer())); + for (int i = 0; i < 10; i++) { + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(reader.readFloat())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Exception-Heavy Vectors ========== + + @Test + public void testFloatManyExceptionsInVector() throws Exception { + // Vector with more exceptions than normal values + float[] values = new float[10]; + values[0] = 1.0f; // normal + for (int i = 1; i < 10; i++) { + values[i] = Float.NaN; // all exceptions except first + } + roundTripFloat(values, 16); + } + + @Test + public void testDoubleManyExceptionsInVector() throws Exception { + double[] values = new double[10]; + values[0] = 1.0; + for (int i = 1; i < 10; i++) { + values[i] = Double.NaN; + } + roundTripDouble(values, 16); + } + + @Test + public void testFloatExceptionsAcrossMultipleVectors() throws Exception { + // Exceptions in different vectors + int vectorSize = 8; + float[] values = new float[vectorSize * 3]; + for (int i = 0; i < values.length; i++) { + values[i] = (i + 1) * 0.5f; + } + // Place exceptions at specific positions in different vectors + values[0] = Float.NaN; // vector 0, position 0 + values[vectorSize - 1] = Float.POSITIVE_INFINITY; // vector 0, last position + values[vectorSize] = -0.0f; // vector 1, position 0 + values[vectorSize * 2 + 3] = Float.NaN; // vector 2, position 3 + roundTripFloat(values, vectorSize); + } + + // ========== Read After Skip to End ========== + + @Test(expected = ParquetDecodingException.class) + public void testFloatReadPastEnd() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + writer.writeFloat(1.0f); + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(1, ByteBufferInputStream.wrap(input.toByteBuffer())); + + reader.readFloat(); // read the one value + reader.readFloat(); // should throw + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + @Test(expected = ParquetDecodingException.class) + public void testFloatSkipPastEnd() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(2, ByteBufferInputStream.wrap(input.toByteBuffer())); + + reader.skip(3); // should throw - only 2 values + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Hand-Crafted Binary → Reader (independent of writer) ========== + + /** + * Construct ALP-encoded bytes manually per the spec and verify the reader + * decodes them correctly. 
This catches symmetric bugs where writer and reader + * both implement the same wrong thing. + * + *
<p>
Test vector: 3 float values {100.0f, 200.0f, 300.0f} with vectorSize=8. + * With exponent=0, factor=0: encoded = {100, 200, 300}. + * FOR min = 100, deltas = {0, 100, 200}, maxDelta = 200, bitWidth = 8. + * Packed: 3 values in 1 partial group of 8, padded with zeros. + * Packed size = ceil(3 * 8 / 8) = 3 bytes. + */ + @Test + public void testReaderWithHandCraftedBytes() throws Exception { + // Manually build ALP page for {100.0f, 200.0f, 300.0f}, vectorSize=8 + int vectorSize = 8; + int logVectorSize = 3; + int numElements = 3; + int numVectors = 1; + + // Encoding: e=0, f=0 → multiplier=1.0 + // encoded = {100, 200, 300} + // FOR min = 100 + // deltas = {0, 100, 200} + // maxDelta = 200, bitWidth = 8 + int exponent = 0; + int factor = 0; + int numExceptions = 0; + int frameOfReference = 100; + int bitWidth = 8; + + // Bit-pack deltas using the same BytePacker the reader will use + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(8); + int[] deltas = {0, 100, 200, 0, 0, 0, 0, 0}; // padded to 8 + byte[] packed = new byte[8]; // bitWidth bytes for one group of 8 + packer.pack8Values(deltas, 0, packed, 0); + int packedSize = (3 * 8 + 7) / 8; // = 3 bytes for 3 values at 8 bits + + // Build the page + int vectorDataSize = 4 + 5 + packedSize; // AlpInfo + ForInfo + packed + int offsetArraySize = numVectors * 4; + + ByteBuffer page = + ByteBuffer.allocate(8 + offsetArraySize + vectorDataSize).order(ByteOrder.LITTLE_ENDIAN); + + // Header (8 bytes) + page.put((byte) 1); // version + page.put((byte) 0); // compression_mode + page.put((byte) 0); // integer_encoding + page.put((byte) logVectorSize); + page.putInt(numElements); + + // Offset array (1 vector) + page.putInt(offsetArraySize); // offset₀ = past offset array + + // Vector 0: AlpInfo (4 bytes) + page.put((byte) exponent); + page.put((byte) factor); + page.putShort((short) numExceptions); + + // Vector 0: ForInfo (5 bytes) + page.putInt(frameOfReference); + page.put((byte) bitWidth); + + // Vector 0: Packed data (3 bytes) + page.put(packed, 0, packedSize); + + page.flip(); + + // Now read it back + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(page)); + + assertEquals(Float.floatToRawIntBits(100.0f), Float.floatToRawIntBits(reader.readFloat())); + assertEquals(Float.floatToRawIntBits(200.0f), Float.floatToRawIntBits(reader.readFloat())); + assertEquals(Float.floatToRawIntBits(300.0f), Float.floatToRawIntBits(reader.readFloat())); + } + + /** + * Hand-craft bytes with exceptions to verify reader handles them independently. + * Values: {1.0f, NaN, 3.0f} with vectorSize=8. + * e=0, f=0: encoded={1, placeholder=1, 3}, 1 exception at position 1. + * FOR min=1, deltas={0, 0, 2}, maxDelta=2, bitWidth=2. 
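+ *
+ * <p>Byte budget implied by those numbers (added here for illustration; it only
+ * restates the sizes used in the body): header(8) + offsets(4) + AlpInfo(4)
+ * + ForInfo(5) + packed(1) + excPos(2) + excVal(4) = 28 bytes, exactly the
+ * buffer allocated below.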
+ */ + @Test + public void testReaderWithHandCraftedExceptions() throws Exception { + int vectorSize = 8; + int logVectorSize = 3; + int numElements = 3; + + int exponent = 0; + int factor = 0; + int numExceptions = 1; + int placeholder = 1; // first non-exception encoded value + int frameOfReference = 1; // min of {1, 1, 3} + int bitWidth = 2; // ceil(log2(2+1)) = 2 + + // deltas = {1-1, 1-1, 3-1} = {0, 0, 2}, padded to 8: {0,0,2,0,0,0,0,0} + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(2); + int[] deltas = {0, 0, 2, 0, 0, 0, 0, 0}; + byte[] packed = new byte[2]; // bitWidth=2 bytes for a group of 8 + packer.pack8Values(deltas, 0, packed, 0); + int packedSize = (3 * 2 + 7) / 8; // = 1 byte + + // Vector data: AlpInfo(4) + ForInfo(5) + packed(1) + excPos(2) + excVal(4) = 16 + int vectorDataSize = 4 + 5 + packedSize + 2 + 4; + int offsetArraySize = 1 * 4; + + ByteBuffer page = + ByteBuffer.allocate(8 + offsetArraySize + vectorDataSize).order(ByteOrder.LITTLE_ENDIAN); + + // Header + page.put((byte) 1); + page.put((byte) 0); + page.put((byte) 0); + page.put((byte) logVectorSize); + page.putInt(numElements); + + // Offset array + page.putInt(offsetArraySize); + + // AlpInfo + page.put((byte) exponent); + page.put((byte) factor); + page.putShort((short) numExceptions); + + // ForInfo + page.putInt(frameOfReference); + page.put((byte) bitWidth); + + // Packed data + page.put(packed, 0, packedSize); + + // Exception positions (uint16 LE) + page.putShort((short) 1); // position 1 + + // Exception values (float32 LE) + page.putFloat(Float.NaN); + + page.flip(); + + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(page)); + + assertEquals(Float.floatToRawIntBits(1.0f), Float.floatToRawIntBits(reader.readFloat())); + float val1 = reader.readFloat(); + assertTrue("Expected NaN at position 1", Float.isNaN(val1)); + assertEquals(Float.floatToRawIntBits(3.0f), Float.floatToRawIntBits(reader.readFloat())); + } + + /** + * Hand-craft double ALP bytes and verify reader independence. + * Values: {10.0, 20.0, 30.0}, vectorSize=8, e=0, f=0. + * encoded={10,20,30}, FOR min=10, deltas={0,10,20}, maxDelta=20, bitWidth=5. 
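+ *
+ * <p>Byte budget implied by those numbers (added here for illustration): header(8)
+ * + offsets(4) + AlpInfo(4) + DoubleForInfo(9) + packed(2) = 27 bytes, matching
+ * the allocation below.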
+ */ + @Test + public void testDoubleReaderWithHandCraftedBytes() throws Exception { + int numElements = 3; + int exponent = 0; + int factor = 0; + long frameOfReference = 10; + int bitWidth = 5; // ceil(log2(20+1)) = 5 + + org.apache.parquet.column.values.bitpacking.BytePackerForLong packer = + Packer.LITTLE_ENDIAN.newBytePackerForLong(5); + long[] deltas = {0, 10, 20, 0, 0, 0, 0, 0}; + byte[] packed = new byte[5]; // bitWidth bytes for group of 8 + packer.pack8Values(deltas, 0, packed, 0); + int packedSize = (3 * 5 + 7) / 8; // = 2 bytes + + int vectorDataSize = 4 + 9 + packedSize; // AlpInfo + DoubleForInfo + packed + int offsetArraySize = 4; + + ByteBuffer page = + ByteBuffer.allocate(8 + offsetArraySize + vectorDataSize).order(ByteOrder.LITTLE_ENDIAN); + + // Header + page.put((byte) 1); + page.put((byte) 0); + page.put((byte) 0); + page.put((byte) 3); // log2(8) + page.putInt(numElements); + + // Offset array + page.putInt(offsetArraySize); + + // AlpInfo + page.put((byte) exponent); + page.put((byte) factor); + page.putShort((short) 0); // no exceptions + + // ForInfo (9 bytes for double) + page.putLong(frameOfReference); + page.put((byte) bitWidth); + + // Packed data + page.put(packed, 0, packedSize); + + page.flip(); + + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(page)); + + assertEquals(Double.doubleToRawLongBits(10.0), Double.doubleToRawLongBits(reader.readDouble())); + assertEquals(Double.doubleToRawLongBits(20.0), Double.doubleToRawLongBits(reader.readDouble())); + assertEquals(Double.doubleToRawLongBits(30.0), Double.doubleToRawLongBits(reader.readDouble())); + } + + // ========== Writer Output → Hand-Verified Bytes ========== + + /** + * Write known values, then verify the exact binary output against hand computation. + * This verifies the writer independently of the reader. + * + *
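The serialized layout exercised here (matching the hand-crafted reader tests above) is: + * <pre> + * header(8) | offsets(4 per vector) | per vector: AlpInfo(4) ForInfo(5 for float, 9 for double) packed [excPos excVal] + * </pre> + *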
Input: {1.0f, 2.0f, 3.0f} with vectorSize=8. + * Expected: e=0, f=0, encoded={1,2,3}, FOR min=1, deltas={0,1,2}, + * maxDelta=2, bitWidth=2. Packed size = ceil(3*2/8) = 1 byte. + */ + @Test + public void testWriterOutputExactBytes() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator(), 8); + writer.writeFloat(1.0f); + writer.writeFloat(2.0f); + writer.writeFloat(3.0f); + + byte[] bytes = writer.getBytes().toByteArray(); + ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN); + + // Header (8 bytes) + assertEquals(1, buf.get() & 0xFF); // version + assertEquals(0, buf.get() & 0xFF); // compression_mode + assertEquals(0, buf.get() & 0xFF); // integer_encoding + assertEquals(3, buf.get() & 0xFF); // log2(8) = 3 + assertEquals(3, buf.getInt()); // num_elements + + // Offset array (1 vector) + int offset0 = buf.getInt(); + assertEquals(4, offset0); // offset array is 1 entry * 4 bytes; vector 0 starts right after it + + // AlpInfo + int exp = buf.get() & 0xFF; + int fac = buf.get() & 0xFF; + int numExc = buf.getShort() & 0xFFFF; + assertEquals("decimal multiplier must be 1.0 for whole numbers (e == f)", exp, fac); + assertEquals(0, numExc); + + // ForInfo + int forRef = buf.getInt(); + int bw = buf.get() & 0xFF; + + // For any (e, f) with e == f the multiplier is 1.0, so + // encoded = {1, 2, 3}, min = 1, deltas = {0, 1, 2}, bitWidth = 2 + assertEquals(1, forRef); // frame of reference = 1 + assertEquals(2, bw); // bitWidth for max delta 2 + + // Packed data: ceil(3 * 2 / 8) = 1 byte + // Use BytePacker to compute the expected packed byte for deltas {0, 1, 2} at bitWidth=2 + BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(2); + int[] expectedDeltas = {0, 1, 2, 0, 0, 0, 0, 0}; + byte[] expectedPacked = new byte[2]; // pack8Values always emits bitWidth (= 2) bytes + packer.pack8Values(expectedDeltas, 0, expectedPacked, 0); + + byte actualPackedByte = buf.get(); + assertEquals("packed byte", expectedPacked[0], actualPackedByte); + + // Should have consumed all bytes + assertEquals("all bytes consumed", bytes.length, buf.position()); + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== NaN Bit Pattern Preservation ========== + + /** + * Verify that different NaN bit patterns survive the round-trip exactly. + * Java allows multiple NaN representations. Our exception handling must + * preserve the exact bit pattern, not normalize to Float.NaN.
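+ * Note that {@code ==} and {@code equals} are both unsuitable here: {@code NaN != NaN}, and + * {@code Float.equals} treats every NaN as the same value. The assertions therefore compare raw bits: + * <pre>{@code + * assertEquals(Float.floatToRawIntBits(expected), Float.floatToRawIntBits(actual)); + * }</pre>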
+ */ + @Test + public void testNaNBitPatternPreservation() throws Exception { + // Standard NaN + float standardNaN = Float.NaN; // 0x7FC00000 + // Signaling NaN (different bit pattern) + float signalingNaN = Float.intBitsToFloat(0x7F800001); + // Negative NaN + float negativeNaN = Float.intBitsToFloat(0xFFC00000); + // Another NaN variant + float customNaN = Float.intBitsToFloat(0x7FFFFFFF); + + assertTrue(Float.isNaN(standardNaN)); + assertTrue(Float.isNaN(signalingNaN)); + assertTrue(Float.isNaN(negativeNaN)); + assertTrue(Float.isNaN(customNaN)); + + float[] values = {1.0f, standardNaN, 2.0f, signalingNaN, 3.0f, negativeNaN, 4.0f, customNaN}; + + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + float actual = reader.readFloat(); + assertEquals( + "Bit pattern mismatch at index " + i + " (0x" + + Integer.toHexString(Float.floatToRawIntBits(values[i])) + " vs 0x" + + Integer.toHexString(Float.floatToRawIntBits(actual)) + ")", + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + /** + * Same test for double NaN bit patterns. + */ + @Test + public void testDoubleNaNBitPatternPreservation() throws Exception { + double standardNaN = Double.NaN; + double signalingNaN = Double.longBitsToDouble(0x7FF0000000000001L); + double negativeNaN = Double.longBitsToDouble(0xFFF8000000000000L); + double customNaN = Double.longBitsToDouble(0x7FFFFFFFFFFFFFFFL); + + double[] values = {1.0, standardNaN, 2.0, signalingNaN, 3.0, negativeNaN, 4.0, customNaN}; + + AlpValuesWriter.DoubleAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.DoubleAlpValuesWriter(512, 512, new DirectByteBufferAllocator()); + for (double v : values) { + writer.writeDouble(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForDouble reader = new AlpValuesReaderForDouble(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + double actual = reader.readDouble(); + assertEquals( + "Bit pattern mismatch at index " + i + " (0x" + + Long.toHexString(Double.doubleToRawLongBits(values[i])) + " vs 0x" + + Long.toHexString(Double.doubleToRawLongBits(actual)) + ")", + Double.doubleToRawLongBits(values[i]), + Double.doubleToRawLongBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Negative Zero Bit-Exact Roundtrip ========== + + /** + * Verify that -0.0f roundtrips as -0.0f (bit pattern 0x80000000), + * not as +0.0f (bit pattern 0x00000000). 
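+ * A plain {@code ==} check cannot catch a lost sign bit (sketch): + * <pre>{@code + * assert -0.0f == 0.0f;                                // true: == ignores the sign of zero + * assert Float.floatToRawIntBits(-0.0f) == 0x80000000; // only the raw bits expose it + * }</pre>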
+ */ + @Test + public void testNegativeZeroBitExact() throws Exception { + float negZero = -0.0f; + float posZero = 0.0f; + + // Sanity: they are == but have different bit patterns + assertTrue(negZero == posZero); + assertNotEquals(Float.floatToRawIntBits(negZero), Float.floatToRawIntBits(posZero)); + + float[] values = {posZero, negZero, 1.0f, negZero}; + + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator()); + for (float v : values) { + writer.writeFloat(v); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(values.length, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < values.length; i++) { + float actual = reader.readFloat(); + assertEquals( + "Bit pattern at index " + i, + Float.floatToRawIntBits(values[i]), + Float.floatToRawIntBits(actual)); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Extreme Float Values ========== + + /** + * Test subnormal floats, Float.MIN_VALUE, Float.MAX_VALUE, Float.MIN_NORMAL. + * These should all either encode losslessly or be stored as exceptions. + */ + @Test + public void testExtremeFloatValues() throws Exception { + float[] values = { + Float.MIN_VALUE, // smallest positive subnormal: 1.4e-45 + Float.MIN_NORMAL, // smallest positive normal: 1.17549435e-38 + Float.MAX_VALUE, // 3.4028235e38 + -Float.MAX_VALUE, // most negative + 1.17549435e-38f, // near MIN_NORMAL + 3.4028234e38f, // near MAX_VALUE + 1.0e-10f, // very small positive + 1.0e10f, // large positive + }; + roundTripFloat(values); + } + + /** + * Test extreme double values. + */ + @Test + public void testExtremeDoubleValues() throws Exception { + double[] values = { + Double.MIN_VALUE, // smallest positive subnormal + Double.MIN_NORMAL, // smallest positive normal + Double.MAX_VALUE, // largest positive + -Double.MAX_VALUE, // most negative + 1.0e-100, // very small + 1.0e100, // very large + }; + roundTripDouble(values); + } + + // ========== Preset Caching Under Distribution Change ========== + + /** + * Write >8 vectors where the optimal (e,f) changes after the sampling phase. + * First 8 vectors: monetary data (e=2, f=0 optimal). + * Remaining vectors: large integers (e=0, f=0 optimal). + * Verify all values survive round-trip despite the distribution change. + */ + @Test + public void testPresetCachingWithDistributionChange() throws Exception { + int vectorSize = 8; // small vectors to get to 8 quickly + int samplingVectors = 8; + int postSamplingVectors = 4; + int totalValues = vectorSize * (samplingVectors + postSamplingVectors); + + float[] values = new float[totalValues]; + + // First 8 vectors: decimal monetary values (best with e=2, f=0) + for (int i = 0; i < vectorSize * samplingVectors; i++) { + values[i] = 10.00f + (i % 100) * 0.01f; // 10.00, 10.01, 10.02, ... + } + + // Last 4 vectors: whole numbers (best with e=0, f=0) + for (int i = vectorSize * samplingVectors; i < totalValues; i++) { + values[i] = (float) (i * 1000); + } + + roundTripFloat(values, vectorSize); + } + + /** + * Same test for doubles. 
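+ * As with the float variant, the (e, f) preset cached during the sampling phase no longer fits + * the tail of the data; whatever fallback the writer uses (re-sampling or exceptions), the + * round-trip asserted below must remain bit-exact.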
+ */ + @Test + public void testDoublePresetCachingWithDistributionChange() throws Exception { + int vectorSize = 8; + int totalValues = vectorSize * 12; // 8 sampling + 4 post + + double[] values = new double[totalValues]; + + for (int i = 0; i < vectorSize * 8; i++) { + values[i] = 10.00 + (i % 100) * 0.01; + } + for (int i = vectorSize * 8; i < totalValues; i++) { + values[i] = (double) (i * 1000); + } + + roundTripDouble(values, vectorSize); + } + + // ========== Multi-Vector Hand-Crafted Binary ========== + + /** + * Hand-craft a 2-vector page to verify offset array navigation works. + * 10 values with vectorSize=8: vector 0 has values 1-8, vector 1 has 9-10. + */ + @Test + public void testMultiVectorHandCrafted() throws Exception { + int vectorSize = 8; + int numElements = 10; + int numVectors = 2; + + // Vector 0: {1,2,3,4,5,6,7,8} → e=0,f=0, encoded={1..8}, FOR min=1, deltas={0..7} + // maxDelta=7, bitWidth=3. Packed: 8 values at 3 bits = 3 bytes exactly. + int v0BitWidth = 3; + BytePacker packer3 = Packer.LITTLE_ENDIAN.newBytePacker(3); + int[] v0Deltas = {0, 1, 2, 3, 4, 5, 6, 7}; + byte[] v0Packed = new byte[3]; + packer3.pack8Values(v0Deltas, 0, v0Packed, 0); + int v0PackedSize = 3; // (8*3+7)/8 = 3 + + // Vector 1: {9, 10} → e=0,f=0, encoded={9,10}, FOR min=9, deltas={0,1} + // maxDelta=1, bitWidth=1. Packed: ceil(2*1/8) = 1 byte. + int v1BitWidth = 1; + BytePacker packer1 = Packer.LITTLE_ENDIAN.newBytePacker(1); + int[] v1Deltas = {0, 1, 0, 0, 0, 0, 0, 0}; + byte[] v1Packed = new byte[1]; + packer1.pack8Values(v1Deltas, 0, v1Packed, 0); + int v1PackedSize = 1; // (2*1+7)/8 = 1 + + int v0DataSize = 4 + 5 + v0PackedSize; // 12 + int v1DataSize = 4 + 5 + v1PackedSize; // 10 + int offsetArraySize = numVectors * 4; // 8 + + ByteBuffer page = + ByteBuffer.allocate(8 + offsetArraySize + v0DataSize + v1DataSize) + .order(ByteOrder.LITTLE_ENDIAN); + + // Header + page.put((byte) 1); + page.put((byte) 0); + page.put((byte) 0); + page.put((byte) 3); // log2(8) + page.putInt(numElements); + + // Offset array + page.putInt(offsetArraySize); // vector 0 starts at 8 + page.putInt(offsetArraySize + v0DataSize); // vector 1 starts at 8+12=20 + + // Vector 0 + page.put((byte) 0); // exponent + page.put((byte) 0); // factor + page.putShort((short) 0); // exceptions + page.putInt(1); // FOR = 1 + page.put((byte) v0BitWidth); + page.put(v0Packed, 0, v0PackedSize); + + // Vector 1 + page.put((byte) 0); // exponent + page.put((byte) 0); // factor + page.putShort((short) 0); // exceptions + page.putInt(9); // FOR = 9 + page.put((byte) v1BitWidth); + page.put(v1Packed, 0, v1PackedSize); + + page.flip(); + + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(page)); + + for (int i = 0; i < numElements; i++) { + float expected = (float) (i + 1); + float actual = reader.readFloat(); + assertEquals( + "Value mismatch at index " + i, + Float.floatToRawIntBits(expected), + Float.floatToRawIntBits(actual)); + } + } + + // ========== Skip Over Entire Vector Without Decoding ========== + + /** + * Verify that skipping an entire vector and reading the next one works. + * Uses hand-crafted bytes to ensure independence from writer. 
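+ * The index arithmetic a reader can use to skip without decoding (a sketch, assuming the + * power-of-two vector size from the header): + * <pre>{@code + * int vectorIndex   = valueIndex >>> logVectorSize;  // which vector holds the value + * int indexInVector = valueIndex & (vectorSize - 1); // position inside that vector + * }</pre>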
+ */ + @Test + public void testSkipEntireVectorHandCrafted() throws Exception { + // Reuse the multi-vector setup: {1..8} in vector 0, {9, 10} in vector 1 + int vectorSize = 8; + int numElements = 10; + int numVectors = 2; + + BytePacker packer3 = Packer.LITTLE_ENDIAN.newBytePacker(3); + int[] v0Deltas = {0, 1, 2, 3, 4, 5, 6, 7}; + byte[] v0Packed = new byte[3]; + packer3.pack8Values(v0Deltas, 0, v0Packed, 0); + + BytePacker packer1 = Packer.LITTLE_ENDIAN.newBytePacker(1); + int[] v1Deltas = {0, 1, 0, 0, 0, 0, 0, 0}; + byte[] v1Packed = new byte[1]; + packer1.pack8Values(v1Deltas, 0, v1Packed, 0); + + int v0DataSize = 4 + 5 + 3; + int v1DataSize = 4 + 5 + 1; + int offsetArraySize = numVectors * 4; + + ByteBuffer page = + ByteBuffer.allocate(8 + offsetArraySize + v0DataSize + v1DataSize) + .order(ByteOrder.LITTLE_ENDIAN); + + page.put((byte) 1).put((byte) 0).put((byte) 0).put((byte) 3); + page.putInt(numElements); + page.putInt(offsetArraySize); + page.putInt(offsetArraySize + v0DataSize); + + // Vector 0 + page.put((byte) 0).put((byte) 0).putShort((short) 0); + page.putInt(1).put((byte) 3); + page.put(v0Packed, 0, 3); + + // Vector 1 + page.put((byte) 0).put((byte) 0).putShort((short) 0); + page.putInt(9).put((byte) 1); + page.put(v1Packed, 0, 1); + + page.flip(); + + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(numElements, ByteBufferInputStream.wrap(page)); + + // Skip entire vector 0 (8 values) + reader.skip(8); + + // Read from vector 1 + assertEquals(Float.floatToRawIntBits(9.0f), Float.floatToRawIntBits(reader.readFloat())); + assertEquals(Float.floatToRawIntBits(10.0f), Float.floatToRawIntBits(reader.readFloat())); + } + + // ========== Vector Size Validation ========== + + /** + * Verify that vectorSize=65536 is rejected because num_exceptions (uint16) + * cannot represent 65536, which would cause silent data corruption if all + * values in a vector are exceptions. + */ + @Test(expected = IllegalArgumentException.class) + public void testVectorSize65536Rejected() throws Exception { + new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator(), 65536); + } + + /** + * Verify that vectorSize=32768 (max allowed) works correctly. + */ + @Test + public void testVectorSize32768Works() throws Exception { + AlpValuesWriter.FloatAlpValuesWriter writer = null; + try { + writer = new AlpValuesWriter.FloatAlpValuesWriter(256, 256, new DirectByteBufferAllocator(), 32768); + // Write a small number of values (partial vector) + for (int i = 0; i < 10; i++) { + writer.writeFloat(i * 1.0f); + } + + BytesInput input = writer.getBytes(); + AlpValuesReaderForFloat reader = new AlpValuesReaderForFloat(); + reader.initFromPage(10, ByteBufferInputStream.wrap(input.toByteBuffer())); + + for (int i = 0; i < 10; i++) { + assertEquals( + Float.floatToRawIntBits(i * 1.0f), + Float.floatToRawIntBits(reader.readFloat())); + } + } finally { + if (writer != null) { + writer.reset(); + writer.close(); + } + } + } + + // ========== Verify Encoder Produces Expected Values ========== + + /** + * Manually verify that the encoder produces the exact integer values + * we expect for known inputs. This is independent of the writer/reader. 
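+ * The expected integers can be reproduced with the fast-rounding trick from the ALP paper + * (a sketch of the idea; the production helper may differ): + * <pre>{@code + * double scaled = 1.23 * 100.0;                      // 122.99999999999999 + * double magic = (double) ((1L << 51) + (1L << 52)); // adding then subtracting rounds to nearest + * long encoded = (long) (scaled + magic - magic);    // = 123 + * }</pre>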
+ */ + @Test + public void testEncoderProducesExpectedValues() { + // 1.23f * 100 = 123 + assertEquals(123, AlpEncoderDecoder.encodeFloat(1.23f, 2, 0)); + // 19.99f * 100 = 1999 + assertEquals(1999, AlpEncoderDecoder.encodeFloat(19.99f, 2, 0)); + // -5.0f * 10 = -50 + assertEquals(-50, AlpEncoderDecoder.encodeFloat(-5.0f, 1, 0)); + // 0.0f * anything = 0 + assertEquals(0, AlpEncoderDecoder.encodeFloat(0.0f, 5, 0)); + // 42.0f * 1 = 42 + assertEquals(42, AlpEncoderDecoder.encodeFloat(42.0f, 0, 0)); + // 1.5f * 10 = 15 + assertEquals(15, AlpEncoderDecoder.encodeFloat(1.5f, 1, 0)); + + // Double path + assertEquals(123L, AlpEncoderDecoder.encodeDouble(1.23, 2, 0)); + assertEquals(1999L, AlpEncoderDecoder.encodeDouble(19.99, 2, 0)); + assertEquals(-50L, AlpEncoderDecoder.encodeDouble(-5.0, 1, 0)); + assertEquals(0L, AlpEncoderDecoder.encodeDouble(0.0, 5, 0)); + assertEquals(42L, AlpEncoderDecoder.encodeDouble(42.0, 0, 0)); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 60150439a6..3816526f18 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -748,6 +748,10 @@ public org.apache.parquet.column.Encoding getEncoding(Encoding encoding) { } public Encoding getEncoding(org.apache.parquet.column.Encoding encoding) { + // ALP encoding is not yet part of the parquet-format specification + if (encoding == org.apache.parquet.column.Encoding.ALP) { + throw new IllegalArgumentException("ALP encoding is not yet supported in the parquet-format specification"); + } return Encoding.valueOf(encoding.name()); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java index 264017a1f0..70692e6735 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java @@ -471,6 +471,10 @@ public void testLogicalToConvertedTypeConversion() { public void testEnumEquivalence() { ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter(); for (org.apache.parquet.column.Encoding encoding : org.apache.parquet.column.Encoding.values()) { + // Skip ALP encoding as it's not yet in the parquet-format specification + if (encoding == org.apache.parquet.column.Encoding.ALP) { + continue; + } assertEquals( encoding, parquetMetadataConverter.getEncoding(parquetMetadataConverter.getEncoding(encoding))); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadAlp.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadAlp.java new file mode 100644 index 0000000000..4787769de0 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestInterOpReadAlp.java @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.hadoop; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cross-compatibility test for ALP (Adaptive Lossless floating-Point) encoding. + * + *
This test reads ALP-encoded parquet files generated by Arrow C++ to verify + * that the Java implementation can correctly decode them. + * + *
To run this test with local files generated by Arrow C++, set the environment variable + * or system property: + *
<ul> + *   <li>ALP_TEST_FILE - Path to an ALP-encoded parquet file</li> + * </ul> + *
Once parquet-testing PR #100 is merged, this test will also support downloading + * test files from the parquet-testing repository. + * + * @see Arrow C++ ALP PR + * @see parquet-testing ALP PR + */ +public class TestInterOpReadAlp { + private static final Logger LOG = LoggerFactory.getLogger(TestInterOpReadAlp.class); + + // TODO: Update these once parquet-testing PR #100 is merged + // private static final String ALP_FLOAT_FILE = "alp_float.parquet"; + // private static final String ALP_DOUBLE_FILE = "alp_double.parquet"; + // private static final String CHANGESET = "TBD"; + // private final InterOpTester interop = new InterOpTester(); + + /** + * Get the path to a local ALP test file. + * + * @return Path to the test file, or null if not configured + */ + private Path getLocalAlpTestFile() { + String filePath = System.getProperty("ALP_TEST_FILE"); + if (filePath == null) { + filePath = System.getenv("ALP_TEST_FILE"); + } + if (filePath != null && new File(filePath).exists()) { + return new Path(filePath); + } + return null; + } + + /** + * Test reading an ALP-encoded parquet file from a local path. + * + *
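For example, with a locally generated file (hypothetical path): + * <pre>{@code + * ALP_TEST_FILE=/tmp/alp_float.parquet mvn test -Dtest=TestInterOpReadAlp + * }</pre> + *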
This test is skipped if no local file is configured. + * Set ALP_TEST_FILE environment variable or system property to run. + */ + @Test + public void testReadLocalAlpFile() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Reading ALP test file: {}", alpFile); + + int rowCount = 0; + try (ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + // Log first few rows for debugging + if (rowCount <= 5) { + LOG.info("Row {}: {}", rowCount, group); + } + } + } + + LOG.info("Successfully read {} rows from ALP-encoded file", rowCount); + assertTrue("Expected at least one row", rowCount > 0); + } + + /** + * Test reading ALP-encoded floats and comparing with PLAIN-encoded values. + * + *
This test expects a parquet file with columns: + *
<ul> + *   <li>float_plain - PLAIN encoded float column</li> + *   <li>float_alp - ALP encoded float column</li> + * </ul> + *
Set ALP_TEST_FILE to a file with these columns to run this test. + */ + @Test + public void testReadAlpFloatCompareWithPlain() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Comparing ALP vs PLAIN float encoding from: {}", alpFile); + + int rowCount = 0; + int mismatchCount = 0; + try (ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + try { + // Try to read both columns - may not exist depending on file format + float plainVal = group.getFloat("float_plain", 0); + float alpVal = group.getFloat("float_alp", 0); + + // Compare bit-exact values + if (Float.floatToRawIntBits(plainVal) != Float.floatToRawIntBits(alpVal)) { + mismatchCount++; + if (mismatchCount <= 5) { + LOG.warn( + "Float mismatch at row {}: plain={} (bits={}), alp={} (bits={})", + rowCount, + plainVal, + Float.floatToRawIntBits(plainVal), + alpVal, + Float.floatToRawIntBits(alpVal)); + } + } + } catch (RuntimeException e) { + // Columns may not exist in this file format + if (rowCount == 1) { + LOG.info("Could not compare float columns: {}", e.getMessage()); + } + break; + } + } + } + + if (mismatchCount > 0) { + LOG.error("Found {} float mismatches out of {} rows", mismatchCount, rowCount); + } + assertEquals("Float values should match between PLAIN and ALP encoding", 0, mismatchCount); + } + + /** + * Test reading ALP-encoded doubles and comparing with PLAIN-encoded values. + * + *
This test expects a parquet file with columns: + *
<ul> + *   <li>double_plain - PLAIN encoded double column</li> + *   <li>double_alp - ALP encoded double column</li> + * </ul> + *
Set ALP_TEST_FILE to a file with these columns to run this test. + */ + @Test + public void testReadAlpDoubleCompareWithPlain() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Comparing ALP vs PLAIN double encoding from: {}", alpFile); + + int rowCount = 0; + int mismatchCount = 0; + try (ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + try { + // Try to read both columns - may not exist depending on file format + double plainVal = group.getDouble("double_plain", 0); + double alpVal = group.getDouble("double_alp", 0); + + // Compare bit-exact values + if (Double.doubleToRawLongBits(plainVal) != Double.doubleToRawLongBits(alpVal)) { + mismatchCount++; + if (mismatchCount <= 5) { + LOG.warn( + "Double mismatch at row {}: plain={} (bits={}), alp={} (bits={})", + rowCount, + plainVal, + Double.doubleToRawLongBits(plainVal), + alpVal, + Double.doubleToRawLongBits(alpVal)); + } + } + } catch (RuntimeException e) { + // Columns may not exist in this file format + if (rowCount == 1) { + LOG.info("Could not compare double columns: {}", e.getMessage()); + } + break; + } + } + } + + if (mismatchCount > 0) { + LOG.error("Found {} double mismatches out of {} rows", mismatchCount, rowCount); + } + assertEquals("Double values should match between PLAIN and ALP encoding", 0, mismatchCount); + } + + /** + * Test reading any ALP-encoded file and verify basic functionality. + * + *
This test reads all float and double columns from the file + * and verifies the values are valid (not corrupted). + */ + @Test + public void testReadAlpVerifyValues() throws IOException { + Path alpFile = getLocalAlpTestFile(); + assumeTrue("ALP_TEST_FILE not set or file does not exist, skipping test", alpFile != null); + + LOG.info("Verifying ALP values from: {}", alpFile); + + int rowCount = 0; + int floatCount = 0; + int doubleCount = 0; + int nanCount = 0; + int infCount = 0; + + try (ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), alpFile).build()) { + Group group; + while ((group = reader.read()) != null) { + rowCount++; + + // Inspect every float and double column in the row + for (int i = 0; i < group.getType().getFieldCount(); i++) { + String fieldName = group.getType().getFieldName(i); + try { + if (group.getType().getType(i).asPrimitiveType().getPrimitiveTypeName() + == org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT) { + float val = group.getFloat(fieldName, 0); + floatCount++; + if (Float.isNaN(val)) nanCount++; + if (Float.isInfinite(val)) infCount++; + } else if (group.getType().getType(i).asPrimitiveType().getPrimitiveTypeName() + == org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE) { + double val = group.getDouble(fieldName, 0); + doubleCount++; + if (Double.isNaN(val)) nanCount++; + if (Double.isInfinite(val)) infCount++; + } + } catch (Exception e) { + // Skip fields that can't be read (e.g. non-primitive or empty fields) + } + } + } + } + + LOG.info( + "Read {} rows, {} floats, {} doubles, {} NaNs, {} Infs", + rowCount, + floatCount, + doubleCount, + nanCount, + infCount); + assertTrue("Expected at least one row", rowCount > 0); + } + + // TODO: Uncomment and update once parquet-testing PR #100 is merged + /* + @Test + public void testReadAlpFromParquetTesting() throws IOException { + Path alpFloatFile = interop.GetInterOpFile(ALP_FLOAT_FILE, CHANGESET); + // Test implementation here + } + */ +}