Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.alp.AlpValuesReaderForDouble;
import org.apache.parquet.column.values.alp.AlpValuesReaderForFloat;
import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForDouble;
import org.apache.parquet.column.values.bytestreamsplit.ByteStreamSplitValuesReaderForFLBA;
Expand Down Expand Up @@ -147,6 +149,26 @@ public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valu
}
},

/**
* ALP (Adaptive Lossless floating-Point) encoding for FLOAT and DOUBLE types.
* Works by converting floating-point values to integers using decimal scaling,
* then applying Frame of Reference (FOR) encoding and bit-packing.
*/
ALP {
@Override
public ValuesReader getValuesReader(ColumnDescriptor descriptor, ValuesType valuesType) {
switch (descriptor.getType()) {
case FLOAT:
return new AlpValuesReaderForFloat();
case DOUBLE:
return new AlpValuesReaderForDouble();
default:
throw new ParquetDecodingException(
"ALP encoding is only supported for FLOAT and DOUBLE, not " + descriptor.getType());
}
}
},

/**
* @deprecated This is no longer used, and has been replaced by {@link #RLE}
* which is combination of bit packing and rle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ParquetProperties {
public static final int DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
public static final boolean DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED = false;
public static final boolean DEFAULT_IS_ALP_ENABLED = false;
public static final WriterVersion DEFAULT_WRITER_VERSION = WriterVersion.PARQUET_1_0;
public static final boolean DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK = true;
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
Expand Down Expand Up @@ -132,6 +133,7 @@ public static WriterVersion fromString(String name) {
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private final ColumnProperty<ByteStreamSplitMode> byteStreamSplitEnabled;
private final ColumnProperty<Boolean> alpEnabled;
private final Map<String, String> extraMetaData;
private final ColumnProperty<Boolean> statistics;
private final ColumnProperty<Boolean> sizeStatistics;
Expand Down Expand Up @@ -164,6 +166,7 @@ private ParquetProperties(Builder builder) {
this.pageRowCountLimit = builder.pageRowCountLimit;
this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
this.byteStreamSplitEnabled = builder.byteStreamSplitEnabled.build();
this.alpEnabled = builder.alpEnabled.build();
this.extraMetaData = builder.extraMetaData;
this.statistics = builder.statistics.build();
this.sizeStatistics = builder.sizeStatistics.build();
Expand Down Expand Up @@ -259,6 +262,23 @@ public boolean isByteStreamSplitEnabled(ColumnDescriptor column) {
}
}

/**
* Check if ALP encoding is enabled for the given column.
* ALP encoding is only supported for FLOAT and DOUBLE types.
*
* @param column the column descriptor
* @return true if ALP encoding is enabled for this column
*/
public boolean isAlpEnabled(ColumnDescriptor column) {
switch (column.getPrimitiveType().getPrimitiveTypeName()) {
case FLOAT:
case DOUBLE:
return alpEnabled.getValue(column);
default:
return false;
}
}

  /** @return the configured {@code ByteBufferAllocator} */
  public ByteBufferAllocator getAllocator() {
    return allocator;
  }
Expand Down Expand Up @@ -416,6 +436,7 @@ public static class Builder {
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private final ColumnProperty.Builder<ByteStreamSplitMode> byteStreamSplitEnabled;
private final ColumnProperty.Builder<Boolean> alpEnabled;
private Map<String, String> extraMetaData = new HashMap<>();
private final ColumnProperty.Builder<Boolean> statistics;
private final ColumnProperty.Builder<Boolean> sizeStatistics;
Expand All @@ -427,6 +448,7 @@ private Builder() {
DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED
? ByteStreamSplitMode.FLOATING_POINT
: ByteStreamSplitMode.NONE);
alpEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_IS_ALP_ENABLED);
bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
Expand Down Expand Up @@ -457,6 +479,7 @@ private Builder(ParquetProperties toCopy) {
this.numBloomFilterCandidates = ColumnProperty.builder(toCopy.numBloomFilterCandidates);
this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
this.byteStreamSplitEnabled = ColumnProperty.builder(toCopy.byteStreamSplitEnabled);
this.alpEnabled = ColumnProperty.builder(toCopy.alpEnabled);
this.extraMetaData = toCopy.extraMetaData;
this.statistics = ColumnProperty.builder(toCopy.statistics);
this.sizeStatistics = ColumnProperty.builder(toCopy.sizeStatistics);
Expand Down Expand Up @@ -534,6 +557,29 @@ public Builder withExtendedByteStreamSplitEncoding(boolean enable) {
return this;
}

/**
* Enable or disable ALP encoding for FLOAT and DOUBLE columns.
*
* @param enable whether ALP encoding should be enabled
* @return this builder for method chaining.
*/
public Builder withAlpEncoding(boolean enable) {
this.alpEnabled.withDefaultValue(enable);
return this;
}

/**
* Enable or disable ALP encoding for the specified column.
*
* @param columnPath the path of the column (dot-string)
* @param enable whether ALP encoding should be enabled
* @return this builder for method chaining.
*/
public Builder withAlpEncoding(String columnPath, boolean enable) {
this.alpEnabled.withValue(columnPath, enable);
return this;
}

/**
* Set the Parquet format dictionary page size.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.values.alp;

import org.apache.parquet.Preconditions;

/**
 * Constants shared by the ALP (Adaptive Lossless floating-Point) encoding implementation.
 *
 * <p>ALP rewrites floating-point values as decimal-scaled integers, then compresses those
 * integers with Frame of Reference encoding and bit-packing. Values that do not survive
 * the round trip losslessly are stored separately as exceptions.
 *
 * <p>Based on the paper: "ALP: Adaptive Lossless floating-Point Compression" (SIGMOD 2024)
 *
 * @see <a href="https://dl.acm.org/doi/10.1145/3626717">ALP Paper</a>
 */
public final class AlpConstants {

  private AlpConstants() {
    // Constants holder; never instantiated.
  }

  // --- Page header fields ---
  public static final int ALP_VERSION = 1;
  public static final int ALP_COMPRESSION_MODE = 0;
  public static final int ALP_INTEGER_ENCODING_FOR = 0;
  public static final int ALP_HEADER_SIZE = 8;

  public static final int DEFAULT_VECTOR_SIZE = 1024;
  public static final int DEFAULT_VECTOR_SIZE_LOG = 10;

  // num_exceptions is a uint16, so the vector size is capped at 2^15 = 32768 to keep
  // the count representable even when every value in a vector is an exception.
  static final int MAX_LOG_VECTOR_SIZE = 15;
  static final int MIN_LOG_VECTOR_SIZE = 3;

  static final int FLOAT_MAX_EXPONENT = 10;
  static final int DOUBLE_MAX_EXPONENT = 18;

  // Preset caching: the first SAMPLER_SAMPLE_VECTORS vectors get a full search, after
  // which only the top MAX_PRESET_COMBINATIONS combinations are kept.
  static final int SAMPLER_SAMPLE_VECTORS = 8;
  static final int MAX_PRESET_COMBINATIONS = 5;

  // Magic constants for the fast-rounding trick (see ALP paper, Section 3.2).
  static final float MAGIC_FLOAT = 12_582_912.0f; // 2^22 + 2^23
  static final double MAGIC_DOUBLE = 6_755_399_441_055_744.0; // 2^51 + 2^52

  // --- Per-vector metadata sizes in bytes ---
  public static final int ALP_INFO_SIZE = 4; // exponent(1) + factor(1) + num_exceptions(2)
  public static final int FLOAT_FOR_INFO_SIZE = 5; // frame_of_reference(4) + bit_width(1)
  public static final int DOUBLE_FOR_INFO_SIZE = 9; // frame_of_reference(8) + bit_width(1)

  // Powers of ten, indexed by exponent, covering 0..FLOAT_MAX_EXPONENT.
  static final float[] FLOAT_POW10 = {1e0f, 1e1f, 1e2f, 1e3f, 1e4f, 1e5f, 1e6f, 1e7f, 1e8f, 1e9f, 1e10f};

  // Powers of ten, indexed by exponent, covering 0..DOUBLE_MAX_EXPONENT.
  static final double[] DOUBLE_POW10 = {
    1e0, 1e1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12, 1e13, 1e14, 1e15, 1e16, 1e17, 1e18
  };

  // Raw IEEE-754 bit patterns of -0.0f / -0.0d (sign bit set, all other bits zero).
  static final int FLOAT_NEGATIVE_ZERO_BITS = 0x80000000;
  static final long DOUBLE_NEGATIVE_ZERO_BITS = 0x8000000000000000L;

  /**
   * Checks that {@code vectorSize} is a power of two whose log2 lies in
   * [{@code MIN_LOG_VECTOR_SIZE}, {@code MAX_LOG_VECTOR_SIZE}].
   *
   * @param vectorSize the candidate vector size
   * @return {@code vectorSize} unchanged, when valid
   * @throws IllegalArgumentException if the size is not a power of 2 in the allowed range
   */
  static int validateVectorSize(int vectorSize) {
    // A positive int is a power of two exactly when a single bit is set.
    Preconditions.checkArgument(
        vectorSize > 0 && Integer.bitCount(vectorSize) == 1,
        "Vector size must be a power of 2, got: %s",
        vectorSize);
    // For a power of two, the trailing-zero count is the exact log2.
    int log2 = Integer.numberOfTrailingZeros(vectorSize);
    Preconditions.checkArgument(
        log2 >= MIN_LOG_VECTOR_SIZE && log2 <= MAX_LOG_VECTOR_SIZE,
        "Vector size log2 must be between %s and %s, got: %s (vectorSize=%s)",
        MIN_LOG_VECTOR_SIZE,
        MAX_LOG_VECTOR_SIZE,
        log2,
        vectorSize);
    return vectorSize;
  }
}
Loading