diff --git a/parquet-arrow/pom.xml b/parquet-arrow/pom.xml
index 320c7a7d9c..5078a8a2c4 100644
--- a/parquet-arrow/pom.xml
+++ b/parquet-arrow/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 27cabb757f..74ce50c0ac 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml
index 77df2c101d..2364ef3caf 100644
--- a/parquet-benchmarks/pom.xml
+++ b/parquet-benchmarks/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
index 1eab1c162e..64b09fd723 100644
--- a/parquet-cli/pom.xml
+++ b/parquet-cli/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index 01b5b8e8c6..f43ae3ba9c 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 6beff4da93..944cfb58eb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -119,6 +119,12 @@ public Optional<PrimitiveComparator> visit(
               LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
             return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
           }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(
+              LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) {
+            return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR);
+          }
})
.orElseThrow(() -> new ShouldNeverHappenException(
"No comparator logic implemented for INT64 logical type: " + logicalType));
@@ -183,6 +189,12 @@ public Optional<PrimitiveComparator> visit(
             }
             return empty();
           }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(
+              LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) {
+            return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR);
+          }
})
.orElseThrow(() -> new ShouldNeverHappenException(
"No comparator logic implemented for INT32 logical type: " + logicalType));
@@ -283,6 +295,12 @@ public Optional<PrimitiveComparator> visit(
               LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyLogicalType) {
             return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
           }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(
+              LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
})
.orElseThrow(() -> new ShouldNeverHappenException(
"No comparator logic implemented for BINARY logical type: " + logicalType));
@@ -417,6 +435,12 @@ public Optional<PrimitiveComparator> visit(
               LogicalTypeAnnotation.Float16LogicalTypeAnnotation float16LogicalType) {
             return of(PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR);
           }
+
+          @Override
+          public Optional<PrimitiveComparator> visit(
+              LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) {
+            return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR);
+          }
})
.orElseThrow(() -> new ShouldNeverHappenException(
"No comparator logic implemented for FIXED_LEN_BYTE_ARRAY logical type: "
diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
index d3d1b15bc6..8fb53aca0f 100644
--- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
+++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java
@@ -100,6 +100,29 @@ private void testInt32Comparator(PrimitiveComparator comparator, Intege
checkThrowingUnsupportedException(comparator, Integer.TYPE);
}
+ @Test
+ public void testUnknownLogicalTypeComparator() {
+ PrimitiveType.PrimitiveTypeName[] types = new PrimitiveType.PrimitiveTypeName[] {
+ PrimitiveType.PrimitiveTypeName.BOOLEAN,
+ PrimitiveType.PrimitiveTypeName.BINARY,
+ PrimitiveType.PrimitiveTypeName.INT32,
+ PrimitiveType.PrimitiveTypeName.INT64,
+ PrimitiveType.PrimitiveTypeName.FLOAT,
+ PrimitiveType.PrimitiveTypeName.DOUBLE,
+ PrimitiveType.PrimitiveTypeName.INT96,
+ PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
+ };
+
+ for (PrimitiveType.PrimitiveTypeName type : types) {
+ assertEquals(
+ new PrimitiveType(Type.Repetition.REQUIRED, type, "vo")
+ .withLogicalTypeAnnotation(LogicalTypeAnnotation.unknownType())
+ .comparator()
+ .compare(null, null),
+ 0);
+ }
+ }
+
@Test
public void testSignedInt64Comparator() {
testInt64Comparator(
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index 07d322a799..ab9e2fe682 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-encoding/pom.xml b/parquet-encoding/pom.xml
index eebdde1ef0..f06e50e3f3 100644
--- a/parquet-encoding/pom.xml
+++ b/parquet-encoding/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml
index a63e1c696e..1e998a5d68 100644
--- a/parquet-format-structures/pom.xml
+++ b/parquet-format-structures/pom.xml
@@ -24,7 +24,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <artifactId>parquet-format-structures</artifactId>
diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
index d7a4c330c9..776fb45576 100644
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
@@ -45,6 +45,7 @@
import org.apache.parquet.format.event.TypedConsumer.I64Consumer;
import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
import org.apache.thrift.TBase;
+import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
@@ -59,6 +60,7 @@
public class Util {
private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException {
writeColumnIndex(columnIndex, to, null, null);
@@ -156,6 +158,15 @@ public static FileMetaData readFileMetaData(InputStream from, BlockCipher.Decryp
return read(from, new FileMetaData(), decryptor, AAD);
}
+ public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException {
+ return readFileMetaData(from, null, null, maxMessageSize);
+ }
+
+ public static FileMetaData readFileMetaData(
+ InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException {
+ return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize);
+ }
+
public static void writeColumnMetaData(
ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD)
throws IOException {
@@ -190,6 +201,18 @@ public static FileMetaData readFileMetaData(
return md;
}
+ public static FileMetaData readFileMetaData(
+ InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+ throws IOException {
+ FileMetaData md = new FileMetaData();
+ if (skipRowGroups) {
+ readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize);
+ } else {
+ read(from, md, decryptor, AAD, maxMessageSize);
+ }
+ return md;
+ }
+
public static void writeFileCryptoMetaData(
org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException {
write(cryptoMetadata, to, null, null);
@@ -293,6 +316,17 @@ public static void readFileMetaData(
BlockCipher.Decryptor decryptor,
byte[] AAD)
throws IOException {
+ readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+ }
+
+ public static void readFileMetaData(
+ final InputStream input,
+ final FileMetaDataConsumer consumer,
+ boolean skipRowGroups,
+ BlockCipher.Decryptor decryptor,
+ byte[] AAD,
+ int maxMessageSize)
+ throws IOException {
try {
DelegatingFieldConsumer eventConsumer = fieldConsumer()
.onField(VERSION, new I32Consumer() {
@@ -358,26 +392,54 @@ public void consume(RowGroup rowGroup) {
byte[] plainText = decryptor.decrypt(input, AAD);
from = new ByteArrayInputStream(plainText);
}
- new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+ new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer);
} catch (TException e) {
throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
}
}
private static TProtocol protocol(OutputStream to) throws TTransportException {
- return protocol(new TIOStreamTransport(to));
+ return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE);
}
private static TProtocol protocol(InputStream from) throws TTransportException {
- return protocol(new TIOStreamTransport(from));
+ return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE);
+ }
+
+ private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException {
+ return protocol(new TIOStreamTransport(from), maxMessageSize);
}
- private static InterningProtocol protocol(TIOStreamTransport t) {
+ private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize)
+ throws TTransportException, NumberFormatException {
+    int maxMessageSize = configuredMaxMessageSize;
+    if (configuredMaxMessageSize == -1) {
+      // -1 means "not configured": fall back to the default of 100 MB
+      maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+    } else if (configuredMaxMessageSize <= 0) {
+      // any other non-positive value is invalid
+      throw new NumberFormatException("Max message size must be positive: " + configuredMaxMessageSize);
+    }
+
+ TConfiguration config = t.getConfiguration();
+ config.setMaxMessageSize(maxMessageSize);
+ /*
+ Reset known message size to 0 to force checking against the max message size.
+ This is necessary when reusing the same transport for multiple reads/writes,
+ as the known message size may be larger than the max message size.
+ */
+ t.updateKnownMessageSize(0);
return new InterningProtocol(new TCompactProtocol(t));
}
  private static <T extends TBase<?, ?>> T read(
      final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+    return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE);
+  }
+
+  private static <T extends TBase<?, ?>> T read(
+      final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize)
+      throws IOException {
final InputStream from;
if (null == decryptor) {
from = input;
@@ -387,7 +449,7 @@ private static InterningProtocol protocol(TIOStreamTransport t) {
}
try {
- tbase.read(protocol(from));
+ tbase.read(protocol(from, maxMessageSize));
return tbase;
} catch (TException e) {
throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e);
diff --git a/parquet-generator/pom.xml b/parquet-generator/pom.xml
index df056ca609..4ed1d6032e 100644
--- a/parquet-generator/pom.xml
+++ b/parquet-generator/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-hadoop-bundle/pom.xml b/parquet-hadoop-bundle/pom.xml
index 4e5f727b73..77411e3b88 100644
--- a/parquet-hadoop-bundle/pom.xml
+++ b/parquet-hadoop-bundle/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index 687310d9e2..5bdffb50d3 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index d20ac7faeb..a1a256329b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -147,6 +147,15 @@ public class ParquetMetadataConverter {
public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter();
public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k
+ /**
+   * Configuration property to control the Thrift max message size when reading Parquet metadata.
+   * Raising it is useful for files with very large footers (e.g. many columns or row groups).
+   * Default value is 100 MB.
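+   * <p>Example (sketch, using a Hadoop {@code Configuration} as in the accompanying tests):
+   * {@code conf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024); // 200 MB}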
+ */
+ public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit";
+
+ private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB
+
private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class);
private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR =
new LogicalTypeConverterVisitor();
@@ -154,6 +163,7 @@ public class ParquetMetadataConverter {
new ConvertedTypeConverterVisitor();
private final int statisticsTruncateLength;
private final boolean useSignedStringMinMax;
+ private final ParquetReadOptions options;
public ParquetMetadataConverter() {
this(false);
@@ -173,7 +183,7 @@ public ParquetMetadataConverter(Configuration conf) {
}
public ParquetMetadataConverter(ParquetReadOptions options) {
- this(options.useSignedStringMinMax());
+ this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options);
}
private ParquetMetadataConverter(boolean useSignedStringMinMax) {
@@ -181,11 +191,30 @@ private ParquetMetadataConverter(boolean useSignedStringMinMax) {
}
private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) {
+ this(useSignedStringMinMax, statisticsTruncateLength, null);
+ }
+
+ private ParquetMetadataConverter(
+ boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) {
if (statisticsTruncateLength <= 0) {
throw new IllegalArgumentException("Truncate length should be greater than 0");
}
this.useSignedStringMinMax = useSignedStringMinMax;
this.statisticsTruncateLength = statisticsTruncateLength;
+ this.options = options;
+ }
+
+ /**
+ * Gets the configured max message size for Thrift deserialization.
+ * Reads from ParquetReadOptions configuration, or returns -1 if not available.
+ *
+ * @return the max message size in bytes, or -1 to use the default
+ */
+ private int getMaxMessageSize() {
+ if (options != null && options.getConfiguration() != null) {
+ return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE);
+ }
+ return -1;
}
// NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate
@@ -1694,21 +1723,27 @@ public ParquetMetadata readParquetMetadata(
filter.accept(new MetadataFilterVisitor() {
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize);
return new FileMetaDataAndRowGroupOffsetInfo(
fileMetadata, generateRowGroupOffsets(fileMetadata));
}
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
@@ -1717,7 +1752,9 @@ public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) thro
@Override
public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
- FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+ int maxMessageSize = getMaxMessageSize();
+ FileMetaData fileMetadata =
+ readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize);
// We must generate the map *before* filtering because it modifies `fileMetadata`.
Map rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index f296286800..41b068d01a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -129,6 +129,7 @@ public void close() throws IOException, InterruptedException {
if (!closed) {
try {
if (aborted) {
+ parquetFileWriter.abort();
return;
}
flushRowGroupToStore();
@@ -140,6 +141,9 @@ public void close() throws IOException, InterruptedException {
}
finalMetadata.putAll(finalWriteContext.getExtraMetaData());
parquetFileWriter.end(finalMetadata);
+ } catch (Exception e) {
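+        // Any failure while flushing or finalizing: mark the underlying file writer as aborted so that
+        // close() does not flush an incomplete footer to the output file.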
+ parquetFileWriter.abort();
+ throw e;
} finally {
AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter);
closed = true;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 4d17a1d6e4..82f4577b83 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -173,6 +173,7 @@ public static enum Mode {
// set when end is called
private ParquetMetadata footer = null;
+ private boolean aborted;
private boolean closed;
private final CRC32 crc;
@@ -335,6 +336,34 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, long ro
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}
+  @FunctionalInterface
+  interface IOCallable<T> {
+    T call() throws IOException;
+  }
+
+  private <T> T withAbortOnFailure(IOCallable<T> action) throws IOException {
+ try {
+ return action.call();
+ } catch (IOException e) {
+ aborted = true;
+ throw e;
+ }
+ }
+
+ @FunctionalInterface
+ interface IORunnable {
+ void run() throws IOException;
+ }
+
+ private void withAbortOnFailure(IORunnable action) throws IOException {
+ try {
+ action.run();
+ } catch (IOException e) {
+ aborted = true;
+ throw e;
+ }
+ }
+
/**
* @param file OutputFile to create or overwrite
* @param schema the schema of the data
@@ -565,13 +594,15 @@ private ParquetFileWriter(
* @throws IOException if there is an error while writing
*/
public void start() throws IOException {
- state = state.start();
- LOG.debug("{}: start", out.getPos());
- byte[] magic = MAGIC;
- if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
- magic = EFMAGIC;
- }
- out.write(magic);
+ withAbortOnFailure(() -> {
+ state = state.start();
+ LOG.debug("{}: start", out.getPos());
+ byte[] magic = MAGIC;
+ if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+ magic = EFMAGIC;
+ }
+ out.write(magic);
+ });
}
public InternalFileEncryptor getEncryptor() {
@@ -585,19 +616,21 @@ public InternalFileEncryptor getEncryptor() {
* @throws IOException if there is an error while writing
*/
public void startBlock(long recordCount) throws IOException {
- state = state.startBlock();
- LOG.debug("{}: start block", out.getPos());
- // out.write(MAGIC); // TODO: add a magic delimiter
+ withAbortOnFailure(() -> {
+ state = state.startBlock();
+ LOG.debug("{}: start block", out.getPos());
+ // out.write(MAGIC); // TODO: add a magic delimiter
- alignment.alignForRowGroup(out);
+ alignment.alignForRowGroup(out);
- currentBlock = new BlockMetaData();
- currentRecordCount = recordCount;
+ currentBlock = new BlockMetaData();
+ currentRecordCount = recordCount;
- currentColumnIndexes = new ArrayList<>();
- currentOffsetIndexes = new ArrayList<>();
+ currentColumnIndexes = new ArrayList<>();
+ currentOffsetIndexes = new ArrayList<>();
- currentBloomFilters = new HashMap<>();
+ currentBloomFilters = new HashMap<>();
+ });
}
/**
@@ -610,28 +643,31 @@ public void startBlock(long recordCount) throws IOException {
*/
public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName)
throws IOException {
- state = state.startColumn();
- encodingStatsBuilder.clear();
-    currentEncodings = new HashSet<Encoding>();
- currentChunkPath = ColumnPath.get(descriptor.getPath());
- currentChunkType = descriptor.getPrimitiveType();
- currentChunkCodec = compressionCodecName;
- currentChunkValueCount = valueCount;
- currentChunkFirstDataPage = -1;
- compressedLength = 0;
- uncompressedLength = 0;
- // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
- currentStatistics = null;
- currentSizeStatistics = SizeStatistics.newBuilder(
- descriptor.getPrimitiveType(),
- descriptor.getMaxRepetitionLevel(),
- descriptor.getMaxDefinitionLevel())
- .build();
- currentGeospatialStatistics =
- GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()).build();
-
- columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
- offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+ withAbortOnFailure(() -> {
+ state = state.startColumn();
+ encodingStatsBuilder.clear();
+      currentEncodings = new HashSet<Encoding>();
+ currentChunkPath = ColumnPath.get(descriptor.getPath());
+ currentChunkType = descriptor.getPrimitiveType();
+ currentChunkCodec = compressionCodecName;
+ currentChunkValueCount = valueCount;
+ currentChunkFirstDataPage = -1;
+ compressedLength = 0;
+ uncompressedLength = 0;
+ // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed
+ // one
+ currentStatistics = null;
+ currentSizeStatistics = SizeStatistics.newBuilder(
+ descriptor.getPrimitiveType(),
+ descriptor.getMaxRepetitionLevel(),
+ descriptor.getMaxDefinitionLevel())
+ .build();
+ currentGeospatialStatistics = GeospatialStatistics.newBuilder(descriptor.getPrimitiveType())
+ .build();
+
+ columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
+ offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+ });
}
/**
@@ -641,45 +677,51 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio
* @throws IOException if there is an error while writing
*/
public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- writeDictionaryPage(dictionaryPage, null, null);
+ withAbortOnFailure(() -> {
+ writeDictionaryPage(dictionaryPage, null, null);
+ });
}
public void writeDictionaryPage(
DictionaryPage dictionaryPage, BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
- state = state.write();
- LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
- currentChunkDictionaryPageOffset = out.getPos();
- int uncompressedSize = dictionaryPage.getUncompressedSize();
- int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size());
- if (pageWriteChecksumEnabled) {
- crc.reset();
- crcUpdate(dictionaryPage.getBytes());
- metadataConverter.writeDictionaryPageHeader(
- uncompressedSize,
- compressedPageSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding(),
- (int) crc.getValue(),
- out,
- headerBlockEncryptor,
- AAD);
- } else {
- metadataConverter.writeDictionaryPageHeader(
- uncompressedSize,
- compressedPageSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding(),
- out,
- headerBlockEncryptor,
- AAD);
- }
- long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
- this.uncompressedLength += uncompressedSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
- dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
- encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
- currentEncodings.add(dictionaryPage.getEncoding());
+ withAbortOnFailure(() -> {
+ state = state.write();
+ LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
+ currentChunkDictionaryPageOffset = out.getPos();
+ int uncompressedSize = dictionaryPage.getUncompressedSize();
+ int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size());
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crcUpdate(dictionaryPage.getBytes());
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ (int) crc.getValue(),
+ out,
+ headerBlockEncryptor,
+ AAD);
+ } else {
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ out,
+ headerBlockEncryptor,
+ AAD);
+ }
+ long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+ this.uncompressedLength += uncompressedSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
+ dictionaryPage
+ .getBytes()
+ .writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
+ encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
+ currentEncodings.add(dictionaryPage.getEncoding());
+ });
}
/**
@@ -871,22 +913,24 @@ public void writeDataPage(
byte[] pageHeaderAAD,
SizeStatistics sizeStatistics)
throws IOException {
- long beforeHeader = out.getPos();
- innerWriteDataPage(
- valueCount,
- uncompressedPageSize,
- bytes,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- metadataBlockEncryptor,
- pageHeaderAAD,
- sizeStatistics);
- offsetIndexBuilder.add(
- toIntWithCheck(out.getPos() - beforeHeader, "page"),
- rowCount,
- sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ withAbortOnFailure(() -> {
+ long beforeHeader = out.getPos();
+ innerWriteDataPage(
+ valueCount,
+ uncompressedPageSize,
+ bytes,
+ statistics,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ metadataBlockEncryptor,
+ pageHeaderAAD,
+ sizeStatistics);
+ offsetIndexBuilder.add(
+ toIntWithCheck(out.getPos() - beforeHeader, "page"),
+ rowCount,
+ sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ });
}
private void innerWriteDataPage(
@@ -978,51 +1022,53 @@ public void writeDataPage(
byte[] pageHeaderAAD,
SizeStatistics sizeStatistics)
throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (currentChunkFirstDataPage < 0) {
- currentChunkFirstDataPage = beforeHeader;
- }
- LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
- int compressedPageSize = toIntWithCheck(bytes.size(), "page");
- if (pageWriteChecksumEnabled) {
- crc.reset();
- crcUpdate(bytes);
- metadataConverter.writeDataPageV1Header(
- uncompressedPageSize,
- compressedPageSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- (int) crc.getValue(),
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- } else {
- metadataConverter.writeDataPageV1Header(
- uncompressedPageSize,
- compressedPageSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- }
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
- bytes.writeAllTo(out);
+ withAbortOnFailure(() -> {
+ state = state.write();
+ long beforeHeader = out.getPos();
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
+ }
+ LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+ int compressedPageSize = toIntWithCheck(bytes.size(), "page");
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crcUpdate(bytes);
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize,
+ compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ (int) crc.getValue(),
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ } else {
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize,
+ compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ }
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
+ bytes.writeAllTo(out);
- mergeColumnStatistics(statistics, sizeStatistics);
+ mergeColumnStatistics(statistics, sizeStatistics);
- encodingStatsBuilder.addDataEncoding(valuesEncoding);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
+ encodingStatsBuilder.addDataEncoding(valuesEncoding);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ });
}
/**
@@ -1297,76 +1343,79 @@ public void writeDataPageV2(
byte[] pageHeaderAAD,
SizeStatistics sizeStatistics)
throws IOException {
- state = state.write();
- int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
- int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");
+ withAbortOnFailure(() -> {
+ state = state.write();
+ int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
+ int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");
- int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page");
+ int compressedSize =
+ toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page");
- int uncompressedSize =
- toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
+ int uncompressedSize =
+ toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
- long beforeHeader = out.getPos();
- if (currentChunkFirstDataPage < 0) {
- currentChunkFirstDataPage = beforeHeader;
- }
-
- if (pageWriteChecksumEnabled) {
- crc.reset();
- if (repetitionLevels.size() > 0) {
- crcUpdate(repetitionLevels);
- }
- if (definitionLevels.size() > 0) {
- crcUpdate(definitionLevels);
+ long beforeHeader = out.getPos();
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
}
- if (bytes.size() > 0) {
- crcUpdate(bytes);
+
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ if (repetitionLevels.size() > 0) {
+ crcUpdate(repetitionLevels);
+ }
+ if (definitionLevels.size() > 0) {
+ crcUpdate(definitionLevels);
+ }
+ if (bytes.size() > 0) {
+ crcUpdate(bytes);
+ }
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ compressed,
+ (int) crc.getValue(),
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ } else {
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ compressed,
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
}
- metadataConverter.writeDataPageV2Header(
- uncompressedSize,
- compressedSize,
- valueCount,
- nullCount,
- rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- compressed,
- (int) crc.getValue(),
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- } else {
- metadataConverter.writeDataPageV2Header(
- uncompressedSize,
- compressedSize,
- valueCount,
- nullCount,
- rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- compressed,
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- }
- long headersSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedSize + headersSize;
- this.compressedLength += compressedSize + headersSize;
+ long headersSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedSize + headersSize;
+ this.compressedLength += compressedSize + headersSize;
- mergeColumnStatistics(statistics, sizeStatistics);
+ mergeColumnStatistics(statistics, sizeStatistics);
- currentEncodings.add(dataEncoding);
- encodingStatsBuilder.addDataEncoding(dataEncoding);
+ currentEncodings.add(dataEncoding);
+ encodingStatsBuilder.addDataEncoding(dataEncoding);
- BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out);
+ BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out);
- offsetIndexBuilder.add(
- toIntWithCheck(out.getPos() - beforeHeader, "page"),
- rowCount,
- sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ offsetIndexBuilder.add(
+ toIntWithCheck(out.getPos() - beforeHeader, "page"),
+ rowCount,
+ sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ });
}
private void crcUpdate(BytesInput bytes) {
@@ -1457,58 +1506,61 @@ void writeColumnChunk(
int columnOrdinal,
byte[] fileAAD)
throws IOException {
- startColumn(descriptor, valueCount, compressionCodecName);
-
- state = state.write();
- if (dictionaryPage != null) {
- byte[] dictonaryPageHeaderAAD = null;
- if (null != headerBlockEncryptor) {
- dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
- fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+ withAbortOnFailure(() -> {
+ startColumn(descriptor, valueCount, compressionCodecName);
+
+ state = state.write();
+ if (dictionaryPage != null) {
+ byte[] dictonaryPageHeaderAAD = null;
+ if (null != headerBlockEncryptor) {
+ dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
+ fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+ }
+ writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD);
}
- writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD);
- }
- if (bloomFilter != null) {
- // write bloom filter if one of data pages is not dictionary encoded
- boolean isWriteBloomFilter = false;
- for (Encoding encoding : dataEncodings) {
- // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2
- if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
- isWriteBloomFilter = true;
- break;
+ if (bloomFilter != null) {
+ // write bloom filter if one of data pages is not dictionary encoded
+ boolean isWriteBloomFilter = false;
+ for (Encoding encoding : dataEncodings) {
+ // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in
+ // parquet v2
+ if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
+ isWriteBloomFilter = true;
+ break;
+ }
+ }
+ if (isWriteBloomFilter) {
+ currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+ } else {
+ LOG.info(
+ "No need to write bloom filter because column {} data pages are all encoded as dictionary.",
+ descriptor.getPath());
}
}
- if (isWriteBloomFilter) {
- currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
- } else {
- LOG.info(
- "No need to write bloom filter because column {} data pages are all encoded as dictionary.",
- descriptor.getPath());
+ LOG.debug("{}: write data pages", out.getPos());
+ long headersSize = bytes.size() - compressedTotalPageSize;
+ this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+ this.compressedLength += compressedTotalPageSize + headersSize;
+ LOG.debug("{}: write data pages content", out.getPos());
+ currentChunkFirstDataPage = out.getPos();
+ bytes.writeAllTo(out);
+ encodingStatsBuilder.addDataEncodings(dataEncodings);
+ if (rlEncodings.isEmpty()) {
+ encodingStatsBuilder.withV2Pages();
}
- }
- LOG.debug("{}: write data pages", out.getPos());
- long headersSize = bytes.size() - compressedTotalPageSize;
- this.uncompressedLength += uncompressedTotalPageSize + headersSize;
- this.compressedLength += compressedTotalPageSize + headersSize;
- LOG.debug("{}: write data pages content", out.getPos());
- currentChunkFirstDataPage = out.getPos();
- bytes.writeAllTo(out);
- encodingStatsBuilder.addDataEncodings(dataEncodings);
- if (rlEncodings.isEmpty()) {
- encodingStatsBuilder.withV2Pages();
- }
- currentEncodings.addAll(rlEncodings);
- currentEncodings.addAll(dlEncodings);
- currentEncodings.addAll(dataEncodings);
- currentStatistics = totalStats;
- currentSizeStatistics = totalSizeStats;
- currentGeospatialStatistics = totalGeospatialStats;
+ currentEncodings.addAll(rlEncodings);
+ currentEncodings.addAll(dlEncodings);
+ currentEncodings.addAll(dataEncodings);
+ currentStatistics = totalStats;
+ currentSizeStatistics = totalSizeStats;
+ currentGeospatialStatistics = totalGeospatialStats;
- this.columnIndexBuilder = columnIndexBuilder;
- this.offsetIndexBuilder = offsetIndexBuilder;
+ this.columnIndexBuilder = columnIndexBuilder;
+ this.offsetIndexBuilder = offsetIndexBuilder;
- endColumn();
+ endColumn();
+ });
}
/**
@@ -1530,34 +1582,36 @@ public void invalidateStatistics(Statistics> totalStatistics) {
* @throws IOException if there is an error while writing
*/
public void endColumn() throws IOException {
- state = state.endColumn();
- LOG.debug("{}: end column", out.getPos());
- if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
- currentColumnIndexes.add(null);
- } else {
- currentColumnIndexes.add(columnIndexBuilder.build());
- }
- currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
- currentBlock.addColumn(ColumnChunkMetaData.get(
- currentChunkPath,
- currentChunkType,
- currentChunkCodec,
- encodingStatsBuilder.build(),
- currentEncodings,
- currentStatistics,
- currentChunkFirstDataPage,
- currentChunkDictionaryPageOffset,
- currentChunkValueCount,
- compressedLength,
- uncompressedLength,
- currentSizeStatistics,
- currentGeospatialStatistics));
- this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
- this.uncompressedLength = 0;
- this.compressedLength = 0;
- this.currentChunkDictionaryPageOffset = 0;
- columnIndexBuilder = null;
- offsetIndexBuilder = null;
+ withAbortOnFailure(() -> {
+ state = state.endColumn();
+ LOG.debug("{}: end column", out.getPos());
+ if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
+ currentColumnIndexes.add(null);
+ } else {
+ currentColumnIndexes.add(columnIndexBuilder.build());
+ }
+ currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ currentChunkPath,
+ currentChunkType,
+ currentChunkCodec,
+ encodingStatsBuilder.build(),
+ currentEncodings,
+ currentStatistics,
+ currentChunkFirstDataPage,
+ currentChunkDictionaryPageOffset,
+ currentChunkValueCount,
+ compressedLength,
+ uncompressedLength,
+ currentSizeStatistics,
+ currentGeospatialStatistics));
+ this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+ this.uncompressedLength = 0;
+ this.compressedLength = 0;
+ this.currentChunkDictionaryPageOffset = 0;
+ columnIndexBuilder = null;
+ offsetIndexBuilder = null;
+ });
}
/**
@@ -1566,22 +1620,24 @@ public void endColumn() throws IOException {
* @throws IOException if there is an error while writing
*/
public void endBlock() throws IOException {
- if (currentRecordCount == 0) {
- throw new ParquetEncodingException("End block with zero record");
- }
+ withAbortOnFailure(() -> {
+ if (currentRecordCount == 0) {
+ throw new ParquetEncodingException("End block with zero record");
+ }
- state = state.endBlock();
- LOG.debug("{}: end block", out.getPos());
- currentBlock.setRowCount(currentRecordCount);
- currentBlock.setOrdinal(blocks.size());
- blocks.add(currentBlock);
- columnIndexes.add(currentColumnIndexes);
- offsetIndexes.add(currentOffsetIndexes);
- bloomFilters.add(currentBloomFilters);
- currentColumnIndexes = null;
- currentOffsetIndexes = null;
- currentBloomFilters = null;
- currentBlock = null;
+ state = state.endBlock();
+ LOG.debug("{}: end block", out.getPos());
+ currentBlock.setRowCount(currentRecordCount);
+ currentBlock.setOrdinal(blocks.size());
+ blocks.add(currentBlock);
+ columnIndexes.add(currentColumnIndexes);
+ offsetIndexes.add(currentOffsetIndexes);
+ bloomFilters.add(currentBloomFilters);
+ currentColumnIndexes = null;
+ currentOffsetIndexes = null;
+ currentBloomFilters = null;
+ currentBlock = null;
+ });
}
/**
@@ -1598,9 +1654,11 @@ public void appendFile(Configuration conf, Path file) throws IOException {
}
public void appendFile(InputFile file) throws IOException {
- try (ParquetFileReader reader = ParquetFileReader.open(file)) {
- reader.appendTo(this);
- }
+ withAbortOnFailure(() -> {
+ try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+ reader.appendTo(this);
+ }
+ });
}
/**
@@ -1619,9 +1677,11 @@ public void appendRowGroups(FSDataInputStream file, List rowGroup
public void appendRowGroups(SeekableInputStream file, List rowGroups, boolean dropColumns)
throws IOException {
- for (BlockMetaData block : rowGroups) {
- appendRowGroup(file, block, dropColumns);
- }
+ withAbortOnFailure(() -> {
+ for (BlockMetaData block : rowGroups) {
+ appendRowGroup(file, block, dropColumns);
+ }
+ });
}
/**
@@ -1639,83 +1699,86 @@ public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, boole
public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns)
throws IOException {
- startBlock(rowGroup.getRowCount());
-
-    Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
- for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
- columnsToCopy.put(chunk.getPath().toDotString(), chunk);
- }
+ withAbortOnFailure(() -> {
+ startBlock(rowGroup.getRowCount());
-    List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
-
- for (ColumnDescriptor descriptor : schema.getColumns()) {
- String path = ColumnPath.get(descriptor.getPath()).toDotString();
- ColumnChunkMetaData chunk = columnsToCopy.remove(path);
- if (chunk != null) {
- columnsInOrder.add(chunk);
- } else {
- throw new IllegalArgumentException(
- String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup));
+      Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
+ for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+ columnsToCopy.put(chunk.getPath().toDotString(), chunk);
}
- }
- // complain if some columns would be dropped and that's not okay
- if (!dropColumns && !columnsToCopy.isEmpty()) {
- throw new IllegalArgumentException(String.format(
- "Columns cannot be copied (missing from target schema): %s",
- String.join(", ", columnsToCopy.keySet())));
- }
-
- // copy the data for all chunks
- long start = -1;
- long length = 0;
- long blockUncompressedSize = 0L;
- for (int i = 0; i < columnsInOrder.size(); i += 1) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
-
- // get this chunk's start position in the new file
- long newChunkStart = out.getPos() + length;
+      List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
- // add this chunk to be copied with any previous chunks
- if (start < 0) {
- // no previous chunk included, start at this chunk's starting pos
- start = chunk.getStartingPos();
+ for (ColumnDescriptor descriptor : schema.getColumns()) {
+ String path = ColumnPath.get(descriptor.getPath()).toDotString();
+ ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+ if (chunk != null) {
+ columnsInOrder.add(chunk);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup));
+ }
}
- length += chunk.getTotalSize();
-
- if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
- // not contiguous. do the copy now.
- copy(from, out, start, length);
- // reset to start at the next column chunk
- start = -1;
- length = 0;
+
+ // complain if some columns would be dropped and that's not okay
+ if (!dropColumns && !columnsToCopy.isEmpty()) {
+ throw new IllegalArgumentException(String.format(
+ "Columns cannot be copied (missing from target schema): %s",
+ String.join(", ", columnsToCopy.keySet())));
}
- // TODO: column/offset indexes are not copied
- // (it would require seeking to the end of the file for each row groups)
- currentColumnIndexes.add(null);
- currentOffsetIndexes.add(null);
+ // copy the data for all chunks
+ long start = -1;
+ long length = 0;
+ long blockUncompressedSize = 0L;
+ for (int i = 0; i < columnsInOrder.size(); i += 1) {
+ ColumnChunkMetaData chunk = columnsInOrder.get(i);
- Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
- currentBlock.addColumn(ColumnChunkMetaData.get(
- chunk.getPath(),
- chunk.getPrimitiveType(),
- chunk.getCodec(),
- chunk.getEncodingStats(),
- chunk.getEncodings(),
- chunk.getStatistics(),
- offsets.firstDataPageOffset,
- offsets.dictionaryPageOffset,
- chunk.getValueCount(),
- chunk.getTotalSize(),
- chunk.getTotalUncompressedSize()));
+ // get this chunk's start position in the new file
+ long newChunkStart = out.getPos() + length;
- blockUncompressedSize += chunk.getTotalUncompressedSize();
- }
+ // add this chunk to be copied with any previous chunks
+ if (start < 0) {
+ // no previous chunk included, start at this chunk's starting pos
+ start = chunk.getStartingPos();
+ }
+ length += chunk.getTotalSize();
+
+ if ((i + 1) == columnsInOrder.size()
+ || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+ // not contiguous. do the copy now.
+ copy(from, out, start, length);
+ // reset to start at the next column chunk
+ start = -1;
+ length = 0;
+ }
- currentBlock.setTotalByteSize(blockUncompressedSize);
+ // TODO: column/offset indexes are not copied
+ // (it would require seeking to the end of the file for each row groups)
+ currentColumnIndexes.add(null);
+ currentOffsetIndexes.add(null);
+
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ chunk.getPath(),
+ chunk.getPrimitiveType(),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize()));
+
+ blockUncompressedSize += chunk.getTotalUncompressedSize();
+ }
- endBlock();
+ currentBlock.setTotalByteSize(blockUncompressedSize);
+
+ endBlock();
+ });
}
/**
@@ -1735,36 +1798,41 @@ public void appendColumnChunk(
ColumnIndex columnIndex,
OffsetIndex offsetIndex)
throws IOException {
- long start = chunk.getStartingPos();
- long length = chunk.getTotalSize();
- long newChunkStart = out.getPos();
+ withAbortOnFailure(() -> {
+ long start = chunk.getStartingPos();
+ long length = chunk.getTotalSize();
+ long newChunkStart = out.getPos();
- if (offsetIndex != null && newChunkStart != start) {
- offsetIndex =
- OffsetIndexBuilder.getBuilder().fromOffsetIndex(offsetIndex).build(newChunkStart - start);
- }
+ OffsetIndex effectiveOffsetIndex = offsetIndex;
- copy(from, out, start, length);
+ if (effectiveOffsetIndex != null && newChunkStart != start) {
+ effectiveOffsetIndex = OffsetIndexBuilder.getBuilder()
+ .fromOffsetIndex(effectiveOffsetIndex)
+ .build(newChunkStart - start);
+ }
- currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
- currentColumnIndexes.add(columnIndex);
- currentOffsetIndexes.add(offsetIndex);
+ copy(from, out, start, length);
- Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
- currentBlock.addColumn(ColumnChunkMetaData.get(
- chunk.getPath(),
- chunk.getPrimitiveType(),
- chunk.getCodec(),
- chunk.getEncodingStats(),
- chunk.getEncodings(),
- chunk.getStatistics(),
- offsets.firstDataPageOffset,
- offsets.dictionaryPageOffset,
- chunk.getValueCount(),
- chunk.getTotalSize(),
- chunk.getTotalUncompressedSize()));
+ currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+ currentColumnIndexes.add(columnIndex);
+ currentOffsetIndexes.add(effectiveOffsetIndex);
+
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ chunk.getPath(),
+ chunk.getPrimitiveType(),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize()));
- currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+ currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+ });
}
// Buffers for the copy function.
@@ -1804,17 +1872,25 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long
* @throws IOException if there is an error while writing
*/
public void end(Map extraMetaData) throws IOException {
- try {
- state = state.end();
- serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
- serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
- serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
- LOG.debug("{}: end", out.getPos());
- this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
- serializeFooter(footer, out, fileEncryptor, metadataConverter);
- } finally {
- close();
- }
+ withAbortOnFailure(() -> {
+ try {
+ state = state.end();
+ serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+ serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+ serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+ LOG.debug("{}: end", out.getPos());
+ this.footer =
+ new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+ serializeFooter(footer, out, fileEncryptor, metadataConverter);
+ } finally {
+ close();
+ }
+ });
+ }
+
+  /** Marks the writer as aborted so that {@link #close()} will not flush incomplete data to the output file. */
+ public void abort() {
+ aborted = true;
}
@Override
@@ -1822,8 +1898,13 @@ public void close() throws IOException {
if (closed) {
return;
}
- try (PositionOutputStream temp = out) {
- temp.flush();
+
+ try {
+ if (!aborted) {
+ try (PositionOutputStream temp = out) {
+ temp.flush();
+ }
+ }
if (crcAllocator != null) {
crcAllocator.close();
}
@@ -2274,11 +2355,11 @@ static ParquetMetadata mergeFooters(
* @throws IOException if there is an error while getting the current stream's position
*/
public long getPos() throws IOException {
- return out.getPos();
+ return withAbortOnFailure(() -> out.getPos());
}
public long getNextRowGroupSize() throws IOException {
- return alignment.nextRowGroupSize(out);
+ return withAbortOnFailure(() -> alignment.nextRowGroupSize(out));
}
/**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
new file mode 100644
index 0000000000..f9f121b998
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
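+/**
+ * Tests for the {@code parquet.thrift.string.size.limit} setting, which bounds the Thrift max message
+ * size used when deserializing Parquet footers: a footer with very many columns needs a larger limit,
+ * while a tiny limit should make reading fail.
+ */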
+public class TestParquetFileReaderMaxMessageSize {
+
+ public static Path TEST_FILE;
+ public MessageType schema;
+
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
+
+ @Before
+ public void testSetup() throws IOException {
+
+ File testParquetFile = temp.newFile();
+ testParquetFile.delete();
+
+ TEST_FILE = new Path(testParquetFile.toURI());
+ // Create a file with many columns
+ StringBuilder schemaBuilder = new StringBuilder("message test_schema {");
+ for (int i = 0; i < 2000; i++) {
+ schemaBuilder.append("required int64 col_").append(i).append(";");
+ }
+ schemaBuilder.append("}");
+
+ schema = MessageTypeParser.parseMessageType(schemaBuilder.toString());
+
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf))
+ .withConf(conf)
+ .withType(schema)
+ .build()) {
+
+ SimpleGroupFactory factory = new SimpleGroupFactory(schema);
+ Group group = factory.newGroup();
+ for (int col = 0; col < 2000; col++) {
+ group.append("col_" + col, 1L);
+ }
+ writer.write(group);
+ }
+ }
+
+ /**
+ * Test reading a file with many columns using custom max message size
+ */
+ @Test
+ public void testReadFileWithManyColumns() throws IOException {
+ Configuration readConf = new Configuration();
+ readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024);
+
+ ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+ ParquetMetadata metadata = reader.getFooter();
+ assertNotNull(metadata);
+ assertEquals(schema, metadata.getFileMetaData().getSchema());
+ assertTrue(metadata.getBlocks().size() > 0);
+ }
+ }
+
+ /**
+ * Test that default configuration works for normal files
+ */
+ @Test
+ public void testReadNormalFileWithDefaultConfig() throws IOException {
+ // Read with default configuration (no custom max message size)
+ Configuration readConf = new Configuration();
+ ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+
+ ParquetMetadata metadata = reader.getFooter();
+ assertNotNull(metadata);
+ assertEquals(1, metadata.getBlocks().get(0).getRowCount());
+ }
+ }
+
+ /**
+ * Test that insufficient max message size produces error
+ */
+ @Test
+ public void testInsufficientMaxMessageSizeError() throws IOException {
+ // Try to read with very small max message size
+ Configuration readConf = new Configuration();
+ readConf.setInt("parquet.thrift.string.size.limit", 1); // Only 1 byte
+
+ ParquetReadOptions options = HadoopReadOptions.builder(readConf).build();
+
+ try (ParquetFileReader reader =
+ ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) {
+ fail("Should have thrown Message size exceeds limit due to MaxMessageSize");
+ } catch (IOException e) {
+ e.printStackTrace();
+ assertTrue(
+ "Error should mention TTransportException",
+ e.getMessage().contains("Message size exceeds limit")
+ || e.getCause().getMessage().contains("Message size exceeds limit")
+ || e.getMessage().contains("MaxMessageSize reached")
+ || e.getCause().getMessage().contains("MaxMessageSize reached"));
+ }
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 2cd83624f6..38b66d7708 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -44,6 +44,7 @@
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -52,6 +53,7 @@
import net.openhft.hashing.LongHashFunction;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -722,4 +724,42 @@ private void testV2WriteAllNullValues(
}
}
}
+
+ @Test
+ public void testNoFlushAfterException() throws Exception {
+ final File testDir = temp.newFile();
+ testDir.delete();
+
+ final Path file = new Path(testDir.getAbsolutePath(), "test.parquet");
+
+ MessageType schema = Types.buildMessage()
+ .required(BINARY)
+ .named("binary_field")
+ .required(INT32)
+ .named("int32_field")
+ .named("test_schema_abort");
+ Configuration conf = new Configuration();
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.toString()))
+ .withAllocator(allocator)
+ .withType(schema)
+ .build()) {
+
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+ writer.write(f.newGroup().append("binary_field", "hello").append("int32_field", 123));
+
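+      // Simulate a failed write by flipping the internal writer's "aborted" flag via reflection;
+      // close() should then skip flushing data to the output file.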
+ Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+ internalWriterField.setAccessible(true);
+ Object internalWriter = internalWriterField.get(writer);
+
+ Field abortedField = internalWriter.getClass().getDeclaredField("aborted");
+ abortedField.setAccessible(true);
+ abortedField.setBoolean(internalWriter, true);
+ writer.close();
+ }
+
+ // After closing, check whether file exists or is empty
+ FileSystem fs = file.getFileSystem(conf);
+ assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
+ }
}
diff --git a/parquet-jackson/pom.xml b/parquet-jackson/pom.xml
index e61f250f42..b35807212c 100644
--- a/parquet-jackson/pom.xml
+++ b/parquet-jackson/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-plugins/parquet-encoding-vector/pom.xml b/parquet-plugins/parquet-encoding-vector/pom.xml
index 97a8557ea7..eed9729cf0 100644
--- a/parquet-plugins/parquet-encoding-vector/pom.xml
+++ b/parquet-plugins/parquet-encoding-vector/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
-    <version>1.16.0</version>
+    <version>1.16.1-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/parquet-plugins/parquet-plugins-benchmarks/pom.xml b/parquet-plugins/parquet-plugins-benchmarks/pom.xml
index d6c5186eb2..74d4c59021 100644
--- a/parquet-plugins/parquet-plugins-benchmarks/pom.xml
+++ b/parquet-plugins/parquet-plugins-benchmarks/pom.xml
@@ -22,7 +22,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
-    <version>1.16.0</version>
+    <version>1.16.1-SNAPSHOT</version>
     <relativePath>../../pom.xml</relativePath>
   </parent>
diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml
index bb985f523c..0de3527343 100644
--- a/parquet-protobuf/pom.xml
+++ b/parquet-protobuf/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 62552f44f1..adf4eb2de7 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-variant/pom.xml b/parquet-variant/pom.xml
index 684658b364..156df2bcd7 100644
--- a/parquet-variant/pom.xml
+++ b/parquet-variant/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.16.0-SNAPSHOT</version>
+    <version>1.16.1-SNAPSHOT</version>
   </parent>

   <modelVersion>4.0.0</modelVersion>
diff --git a/pom.xml b/pom.xml
index e98d7684ad..976f3779b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
   <groupId>org.apache.parquet</groupId>
   <artifactId>parquet</artifactId>
-  <version>1.16.0-SNAPSHOT</version>
+  <version>1.16.1-SNAPSHOT</version>
   <packaging>pom</packaging>

   <name>Apache Parquet Java</name>
@@ -85,7 +85,7 @@
3.3.0
2.12.0
- 1.15.1
+ 1.16.0
thrift
${thrift.executable}
0.16.0