diff --git a/parquet-arrow/pom.xml b/parquet-arrow/pom.xml index 320c7a7d9c..5078a8a2c4 100644 --- a/parquet-arrow/pom.xml +++ b/parquet-arrow/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index 27cabb757f..74ce50c0ac 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index 77df2c101d..2364ef3caf 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index 1eab1c162e..64b09fd723 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index 01b5b8e8c6..f43ae3ba9c 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java index 6beff4da93..944cfb58eb 100644 --- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java +++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java @@ -119,6 +119,12 @@ public Optional visit( LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR); } + + @Override + public Optional visit( + LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) { + return of(PrimitiveComparator.SIGNED_INT64_COMPARATOR); + } }) .orElseThrow(() 
-> new ShouldNeverHappenException( "No comparator logic implemented for INT64 logical type: " + logicalType)); @@ -183,6 +189,12 @@ public Optional visit( } return empty(); } + + @Override + public Optional visit( + LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) { + return of(PrimitiveComparator.SIGNED_INT32_COMPARATOR); + } }) .orElseThrow(() -> new ShouldNeverHappenException( "No comparator logic implemented for INT32 logical type: " + logicalType)); @@ -283,6 +295,12 @@ public Optional visit( LogicalTypeAnnotation.GeographyLogicalTypeAnnotation geographyLogicalType) { return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); } + + @Override + public Optional visit( + LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) { + return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); + } }) .orElseThrow(() -> new ShouldNeverHappenException( "No comparator logic implemented for BINARY logical type: " + logicalType)); @@ -417,6 +435,12 @@ public Optional visit( LogicalTypeAnnotation.Float16LogicalTypeAnnotation float16LogicalType) { return of(PrimitiveComparator.BINARY_AS_FLOAT16_COMPARATOR); } + + @Override + public Optional visit( + LogicalTypeAnnotation.UnknownLogicalTypeAnnotation unknownLogicalTypeAnnotation) { + return of(PrimitiveComparator.UNSIGNED_LEXICOGRAPHICAL_BINARY_COMPARATOR); + } }) .orElseThrow(() -> new ShouldNeverHappenException( "No comparator logic implemented for FIXED_LEN_BYTE_ARRAY logical type: " diff --git a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java index d3d1b15bc6..8fb53aca0f 100644 --- a/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java +++ b/parquet-column/src/test/java/org/apache/parquet/schema/TestPrimitiveComparator.java @@ -100,6 +100,29 @@ private void 
testInt32Comparator(PrimitiveComparator comparator, Intege checkThrowingUnsupportedException(comparator, Integer.TYPE); } + @Test + public void testUnknownLogicalTypeComparator() { + PrimitiveType.PrimitiveTypeName[] types = new PrimitiveType.PrimitiveTypeName[] { + PrimitiveType.PrimitiveTypeName.BOOLEAN, + PrimitiveType.PrimitiveTypeName.BINARY, + PrimitiveType.PrimitiveTypeName.INT32, + PrimitiveType.PrimitiveTypeName.INT64, + PrimitiveType.PrimitiveTypeName.FLOAT, + PrimitiveType.PrimitiveTypeName.DOUBLE, + PrimitiveType.PrimitiveTypeName.INT96, + PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY + }; + + for (PrimitiveType.PrimitiveTypeName type : types) { + assertEquals( + new PrimitiveType(Type.Repetition.REQUIRED, type, "vo") + .withLogicalTypeAnnotation(LogicalTypeAnnotation.unknownType()) + .comparator() + .compare(null, null), + 0); + } + } + @Test public void testSignedInt64Comparator() { testInt64Comparator( diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml index 07d322a799..ab9e2fe682 100644 --- a/parquet-common/pom.xml +++ b/parquet-common/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-encoding/pom.xml b/parquet-encoding/pom.xml index eebdde1ef0..f06e50e3f3 100644 --- a/parquet-encoding/pom.xml +++ b/parquet-encoding/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml index a63e1c696e..1e998a5d68 100644 --- a/parquet-format-structures/pom.xml +++ b/parquet-format-structures/pom.xml @@ -24,7 +24,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT parquet-format-structures diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java index d7a4c330c9..776fb45576 100644 --- 
a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java +++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java @@ -45,6 +45,7 @@ import org.apache.parquet.format.event.TypedConsumer.I64Consumer; import org.apache.parquet.format.event.TypedConsumer.StringConsumer; import org.apache.thrift.TBase; +import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; import org.apache.thrift.protocol.TProtocol; @@ -59,6 +60,7 @@ public class Util { private static final int INIT_MEM_ALLOC_ENCR_BUFFER = 100; + private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream to) throws IOException { writeColumnIndex(columnIndex, to, null, null); @@ -156,6 +158,15 @@ public static FileMetaData readFileMetaData(InputStream from, BlockCipher.Decryp return read(from, new FileMetaData(), decryptor, AAD); } + public static FileMetaData readFileMetaData(InputStream from, int maxMessageSize) throws IOException { + return readFileMetaData(from, null, null, maxMessageSize); + } + + public static FileMetaData readFileMetaData( + InputStream from, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) throws IOException { + return read(from, new FileMetaData(), decryptor, AAD, maxMessageSize); + } + public static void writeColumnMetaData( ColumnMetaData columnMetaData, OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException { @@ -190,6 +201,18 @@ public static FileMetaData readFileMetaData( return md; } + public static FileMetaData readFileMetaData( + InputStream from, boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) + throws IOException { + FileMetaData md = new FileMetaData(); + if (skipRowGroups) { + readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups, decryptor, AAD, maxMessageSize); + } else { + 
read(from, md, decryptor, AAD, maxMessageSize); + } + return md; + } + public static void writeFileCryptoMetaData( org.apache.parquet.format.FileCryptoMetaData cryptoMetadata, OutputStream to) throws IOException { write(cryptoMetadata, to, null, null); @@ -293,6 +316,17 @@ public static void readFileMetaData( BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + readFileMetaData(input, consumer, skipRowGroups, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE); + } + + public static void readFileMetaData( + final InputStream input, + final FileMetaDataConsumer consumer, + boolean skipRowGroups, + BlockCipher.Decryptor decryptor, + byte[] AAD, + int maxMessageSize) + throws IOException { try { DelegatingFieldConsumer eventConsumer = fieldConsumer() .onField(VERSION, new I32Consumer() { @@ -358,26 +392,54 @@ public void consume(RowGroup rowGroup) { byte[] plainText = decryptor.decrypt(input, AAD); from = new ByteArrayInputStream(plainText); } - new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer); + new EventBasedThriftReader(protocol(from, maxMessageSize)).readStruct(eventConsumer); } catch (TException e) { throw new IOException("can not read FileMetaData: " + e.getMessage(), e); } } private static TProtocol protocol(OutputStream to) throws TTransportException { - return protocol(new TIOStreamTransport(to)); + return protocol(new TIOStreamTransport(to), DEFAULT_MAX_MESSAGE_SIZE); } private static TProtocol protocol(InputStream from) throws TTransportException { - return protocol(new TIOStreamTransport(from)); + return protocol(new TIOStreamTransport(from), DEFAULT_MAX_MESSAGE_SIZE); + } + + private static TProtocol protocol(InputStream from, int maxMessageSize) throws TTransportException { + return protocol(new TIOStreamTransport(from), maxMessageSize); } - private static InterningProtocol protocol(TIOStreamTransport t) { + private static InterningProtocol protocol(TIOStreamTransport t, int configuredMaxMessageSize) + throws TTransportException, 
NumberFormatException { + int maxMessageSize = configuredMaxMessageSize; + if (configuredMaxMessageSize == -1) { + // -1 is the "not configured" sentinel: fall back to the 100 MB default + maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; + } + if (maxMessageSize <= 0) { + throw new NumberFormatException("Max message size must be positive: " + configuredMaxMessageSize); + } + + TConfiguration config = t.getConfiguration(); + config.setMaxMessageSize(maxMessageSize); + /* + Reset known message size to 0 to force checking against the max message size. + This is necessary when reusing the same transport for multiple reads/writes, + as the known message size may be larger than the max message size. + */ + t.updateKnownMessageSize(0); return new InterningProtocol(new TCompactProtocol(t)); } private static > T read( final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException { + return read(input, tbase, decryptor, AAD, DEFAULT_MAX_MESSAGE_SIZE); + } + + private static > T read( + final InputStream input, T tbase, BlockCipher.Decryptor decryptor, byte[] AAD, int maxMessageSize) + throws IOException { final InputStream from; if (null == decryptor) { from = input; @@ -387,7 +449,7 @@ private static InterningProtocol protocol(TIOStreamTransport t) { } try { - tbase.read(protocol(from)); + tbase.read(protocol(from, maxMessageSize)); return tbase; } catch (TException e) { throw new IOException("can not read " + tbase.getClass() + ": " + e.getMessage(), e); diff --git a/parquet-generator/pom.xml b/parquet-generator/pom.xml index df056ca609..4ed1d6032e 100644 --- a/parquet-generator/pom.xml +++ b/parquet-generator/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-hadoop-bundle/pom.xml b/parquet-hadoop-bundle/pom.xml index 4e5f727b73..77411e3b88 100644 --- a/parquet-hadoop-bundle/pom.xml +++ b/parquet-hadoop-bundle/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 
1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index 687310d9e2..5bdffb50d3 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index d20ac7faeb..a1a256329b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -147,6 +147,15 @@ public class ParquetMetadataConverter { public static final MetadataFilter SKIP_ROW_GROUPS = new SkipMetadataFilter(); public static final long MAX_STATS_SIZE = 4096; // limit stats to 4k + /** + * Configuration property to control the Thrift max message size when reading Parquet metadata. + * This is useful for files with very large metadata. + * The default value is 100 MB. 
+ */ + public static final String PARQUET_THRIFT_STRING_SIZE_LIMIT = "parquet.thrift.string.size.limit"; + + private static final int DEFAULT_MAX_MESSAGE_SIZE = 104857600; // 100 MB + private static final Logger LOG = LoggerFactory.getLogger(ParquetMetadataConverter.class); private static final LogicalTypeConverterVisitor LOGICAL_TYPE_ANNOTATION_VISITOR = new LogicalTypeConverterVisitor(); @@ -154,6 +163,7 @@ public class ParquetMetadataConverter { new ConvertedTypeConverterVisitor(); private final int statisticsTruncateLength; private final boolean useSignedStringMinMax; + private final ParquetReadOptions options; public ParquetMetadataConverter() { this(false); @@ -173,7 +183,7 @@ public ParquetMetadataConverter(Configuration conf) { } public ParquetMetadataConverter(ParquetReadOptions options) { - this(options.useSignedStringMinMax()); + this(options.useSignedStringMinMax(), ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH, options); } private ParquetMetadataConverter(boolean useSignedStringMinMax) { @@ -181,11 +191,30 @@ private ParquetMetadataConverter(boolean useSignedStringMinMax) { } private ParquetMetadataConverter(boolean useSignedStringMinMax, int statisticsTruncateLength) { + this(useSignedStringMinMax, statisticsTruncateLength, null); + } + + private ParquetMetadataConverter( + boolean useSignedStringMinMax, int statisticsTruncateLength, ParquetReadOptions options) { if (statisticsTruncateLength <= 0) { throw new IllegalArgumentException("Truncate length should be greater than 0"); } this.useSignedStringMinMax = useSignedStringMinMax; this.statisticsTruncateLength = statisticsTruncateLength; + this.options = options; + } + + /** + * Gets the configured max message size for Thrift deserialization. + * Reads from ParquetReadOptions configuration, or returns -1 if not available. 
+ * + * @return the max message size in bytes, or -1 to use the default + */ + private int getMaxMessageSize() { + if (options != null && options.getConfiguration() != null) { + return options.getConfiguration().getInt(PARQUET_THRIFT_STRING_SIZE_LIMIT, DEFAULT_MAX_MESSAGE_SIZE); + } + return -1; } // NOTE: this cache is for memory savings, not cpu savings, and is used to de-duplicate @@ -1694,21 +1723,27 @@ public ParquetMetadata readParquetMetadata( filter.accept(new MetadataFilterVisitor() { @Override public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException { - FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + int maxMessageSize = getMaxMessageSize(); + FileMetaData fileMetadata = + readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize); return new FileMetaDataAndRowGroupOffsetInfo( fileMetadata, generateRowGroupOffsets(fileMetadata)); } @Override public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException { - FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD); + int maxMessageSize = getMaxMessageSize(); + FileMetaData fileMetadata = + readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD, maxMessageSize); return new FileMetaDataAndRowGroupOffsetInfo( fileMetadata, generateRowGroupOffsets(fileMetadata)); } @Override public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException { - FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + int maxMessageSize = getMaxMessageSize(); + FileMetaData fileMetadata = + readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize); // We must generate the map *before* filtering because it modifies `fileMetadata`. 
Map rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata); FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter); @@ -1717,7 +1752,9 @@ public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) thro @Override public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException { - FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + int maxMessageSize = getMaxMessageSize(); + FileMetaData fileMetadata = + readFileMetaData(from, footerDecryptor, encryptedFooterAAD, maxMessageSize); // We must generate the map *before* filtering because it modifies `fileMetadata`. Map rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata); FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index f296286800..41b068d01a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -129,6 +129,7 @@ public void close() throws IOException, InterruptedException { if (!closed) { try { if (aborted) { + parquetFileWriter.abort(); return; } flushRowGroupToStore(); @@ -140,6 +141,9 @@ public void close() throws IOException, InterruptedException { } finalMetadata.putAll(finalWriteContext.getExtraMetaData()); parquetFileWriter.end(finalMetadata); + } catch (Exception e) { + parquetFileWriter.abort(); + throw e; } finally { AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter); closed = true; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java 
index 4d17a1d6e4..82f4577b83 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -173,6 +173,7 @@ public static enum Mode { // set when end is called private ParquetMetadata footer = null; + private boolean aborted; private boolean closed; private final CRC32 crc; @@ -335,6 +336,34 @@ public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode, long ro ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED); } + @FunctionalInterface + interface IOCallable { + T call() throws IOException; + } + + private T withAbortOnFailure(IOCallable action) throws IOException { + try { + return action.call(); + } catch (IOException e) { + aborted = true; + throw e; + } + } + + @FunctionalInterface + interface IORunnable { + void run() throws IOException; + } + + private void withAbortOnFailure(IORunnable action) throws IOException { + try { + action.run(); + } catch (IOException e) { + aborted = true; + throw e; + } + } + /** * @param file OutputFile to create or overwrite * @param schema the schema of the data @@ -565,13 +594,15 @@ private ParquetFileWriter( * @throws IOException if there is an error while writing */ public void start() throws IOException { - state = state.start(); - LOG.debug("{}: start", out.getPos()); - byte[] magic = MAGIC; - if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) { - magic = EFMAGIC; - } - out.write(magic); + withAbortOnFailure(() -> { + state = state.start(); + LOG.debug("{}: start", out.getPos()); + byte[] magic = MAGIC; + if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) { + magic = EFMAGIC; + } + out.write(magic); + }); } public InternalFileEncryptor getEncryptor() { @@ -585,19 +616,21 @@ public InternalFileEncryptor getEncryptor() { * @throws IOException if there is an error while writing */ public void startBlock(long recordCount) throws IOException { - state = 
state.startBlock(); - LOG.debug("{}: start block", out.getPos()); - // out.write(MAGIC); // TODO: add a magic delimiter + withAbortOnFailure(() -> { + state = state.startBlock(); + LOG.debug("{}: start block", out.getPos()); + // out.write(MAGIC); // TODO: add a magic delimiter - alignment.alignForRowGroup(out); + alignment.alignForRowGroup(out); - currentBlock = new BlockMetaData(); - currentRecordCount = recordCount; + currentBlock = new BlockMetaData(); + currentRecordCount = recordCount; - currentColumnIndexes = new ArrayList<>(); - currentOffsetIndexes = new ArrayList<>(); + currentColumnIndexes = new ArrayList<>(); + currentOffsetIndexes = new ArrayList<>(); - currentBloomFilters = new HashMap<>(); + currentBloomFilters = new HashMap<>(); + }); } /** @@ -610,28 +643,31 @@ public void startBlock(long recordCount) throws IOException { */ public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName) throws IOException { - state = state.startColumn(); - encodingStatsBuilder.clear(); - currentEncodings = new HashSet(); - currentChunkPath = ColumnPath.get(descriptor.getPath()); - currentChunkType = descriptor.getPrimitiveType(); - currentChunkCodec = compressionCodecName; - currentChunkValueCount = valueCount; - currentChunkFirstDataPage = -1; - compressedLength = 0; - uncompressedLength = 0; - // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one - currentStatistics = null; - currentSizeStatistics = SizeStatistics.newBuilder( - descriptor.getPrimitiveType(), - descriptor.getMaxRepetitionLevel(), - descriptor.getMaxDefinitionLevel()) - .build(); - currentGeospatialStatistics = - GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()).build(); - - columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); - offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + withAbortOnFailure(() -> { + state = 
state.startColumn(); + encodingStatsBuilder.clear(); + currentEncodings = new HashSet(); + currentChunkPath = ColumnPath.get(descriptor.getPath()); + currentChunkType = descriptor.getPrimitiveType(); + currentChunkCodec = compressionCodecName; + currentChunkValueCount = valueCount; + currentChunkFirstDataPage = -1; + compressedLength = 0; + uncompressedLength = 0; + // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed + // one + currentStatistics = null; + currentSizeStatistics = SizeStatistics.newBuilder( + descriptor.getPrimitiveType(), + descriptor.getMaxRepetitionLevel(), + descriptor.getMaxDefinitionLevel()) + .build(); + currentGeospatialStatistics = GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()) + .build(); + + columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength); + offsetIndexBuilder = OffsetIndexBuilder.getBuilder(); + }); } /** @@ -641,45 +677,51 @@ public void startColumn(ColumnDescriptor descriptor, long valueCount, Compressio * @throws IOException if there is an error while writing */ public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException { - writeDictionaryPage(dictionaryPage, null, null); + withAbortOnFailure(() -> { + writeDictionaryPage(dictionaryPage, null, null); + }); } public void writeDictionaryPage( DictionaryPage dictionaryPage, BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException { - state = state.write(); - LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); - currentChunkDictionaryPageOffset = out.getPos(); - int uncompressedSize = dictionaryPage.getUncompressedSize(); - int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size()); - if (pageWriteChecksumEnabled) { - crc.reset(); - crcUpdate(dictionaryPage.getBytes()); - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - 
dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - (int) crc.getValue(), - out, - headerBlockEncryptor, - AAD); - } else { - metadataConverter.writeDictionaryPageHeader( - uncompressedSize, - compressedPageSize, - dictionaryPage.getDictionarySize(), - dictionaryPage.getEncoding(), - out, - headerBlockEncryptor, - AAD); - } - long headerSize = out.getPos() - currentChunkDictionaryPageOffset; - this.uncompressedLength += uncompressedSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize); - dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted - encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); - currentEncodings.add(dictionaryPage.getEncoding()); + withAbortOnFailure(() -> { + state = state.write(); + LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize()); + currentChunkDictionaryPageOffset = out.getPos(); + int uncompressedSize = dictionaryPage.getUncompressedSize(); + int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size()); + if (pageWriteChecksumEnabled) { + crc.reset(); + crcUpdate(dictionaryPage.getBytes()); + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + (int) crc.getValue(), + out, + headerBlockEncryptor, + AAD); + } else { + metadataConverter.writeDictionaryPageHeader( + uncompressedSize, + compressedPageSize, + dictionaryPage.getDictionarySize(), + dictionaryPage.getEncoding(), + out, + headerBlockEncryptor, + AAD); + } + long headerSize = out.getPos() - currentChunkDictionaryPageOffset; + this.uncompressedLength += uncompressedSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write dictionary page content {}", out.getPos(), 
compressedPageSize); + dictionaryPage + .getBytes() + .writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted + encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding()); + currentEncodings.add(dictionaryPage.getEncoding()); + }); } /** @@ -871,22 +913,24 @@ public void writeDataPage( byte[] pageHeaderAAD, SizeStatistics sizeStatistics) throws IOException { - long beforeHeader = out.getPos(); - innerWriteDataPage( - valueCount, - uncompressedPageSize, - bytes, - statistics, - rlEncoding, - dlEncoding, - valuesEncoding, - metadataBlockEncryptor, - pageHeaderAAD, - sizeStatistics); - offsetIndexBuilder.add( - toIntWithCheck(out.getPos() - beforeHeader, "page"), - rowCount, - sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + withAbortOnFailure(() -> { + long beforeHeader = out.getPos(); + innerWriteDataPage( + valueCount, + uncompressedPageSize, + bytes, + statistics, + rlEncoding, + dlEncoding, + valuesEncoding, + metadataBlockEncryptor, + pageHeaderAAD, + sizeStatistics); + offsetIndexBuilder.add( + toIntWithCheck(out.getPos() - beforeHeader, "page"), + rowCount, + sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + }); } private void innerWriteDataPage( @@ -978,51 +1022,53 @@ public void writeDataPage( byte[] pageHeaderAAD, SizeStatistics sizeStatistics) throws IOException { - state = state.write(); - long beforeHeader = out.getPos(); - if (currentChunkFirstDataPage < 0) { - currentChunkFirstDataPage = beforeHeader; - } - LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); - int compressedPageSize = toIntWithCheck(bytes.size(), "page"); - if (pageWriteChecksumEnabled) { - crc.reset(); - crcUpdate(bytes); - metadataConverter.writeDataPageV1Header( - uncompressedPageSize, - compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - (int) crc.getValue(), - out, - metadataBlockEncryptor, - pageHeaderAAD); - } else { - metadataConverter.writeDataPageV1Header( - uncompressedPageSize, - compressedPageSize, - valueCount, - rlEncoding, - dlEncoding, - valuesEncoding, - out, - metadataBlockEncryptor, - pageHeaderAAD); - } - long headerSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedPageSize + headerSize; - this.compressedLength += compressedPageSize + headerSize; - LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); - bytes.writeAllTo(out); + withAbortOnFailure(() -> { + state = state.write(); + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; + } + LOG.debug("{}: write data page: {} values", beforeHeader, valueCount); + int compressedPageSize = toIntWithCheck(bytes.size(), "page"); + if (pageWriteChecksumEnabled) { + crc.reset(); + crcUpdate(bytes); + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, + compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + (int) crc.getValue(), + out, + metadataBlockEncryptor, + pageHeaderAAD); + } else { + metadataConverter.writeDataPageV1Header( + uncompressedPageSize, + 
compressedPageSize, + valueCount, + rlEncoding, + dlEncoding, + valuesEncoding, + out, + metadataBlockEncryptor, + pageHeaderAAD); + } + long headerSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedPageSize + headerSize; + this.compressedLength += compressedPageSize + headerSize; + LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize); + bytes.writeAllTo(out); - mergeColumnStatistics(statistics, sizeStatistics); + mergeColumnStatistics(statistics, sizeStatistics); - encodingStatsBuilder.addDataEncoding(valuesEncoding); - currentEncodings.add(rlEncoding); - currentEncodings.add(dlEncoding); - currentEncodings.add(valuesEncoding); + encodingStatsBuilder.addDataEncoding(valuesEncoding); + currentEncodings.add(rlEncoding); + currentEncodings.add(dlEncoding); + currentEncodings.add(valuesEncoding); + }); } /** @@ -1297,76 +1343,79 @@ public void writeDataPageV2( byte[] pageHeaderAAD, SizeStatistics sizeStatistics) throws IOException { - state = state.write(); - int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels"); - int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); + withAbortOnFailure(() -> { + state = state.write(); + int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels"); + int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); - int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page"); + int compressedSize = + toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page"); - int uncompressedSize = - toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); + int uncompressedSize = + toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); - long beforeHeader = out.getPos(); - if (currentChunkFirstDataPage < 0) { - 
currentChunkFirstDataPage = beforeHeader; - } - - if (pageWriteChecksumEnabled) { - crc.reset(); - if (repetitionLevels.size() > 0) { - crcUpdate(repetitionLevels); - } - if (definitionLevels.size() > 0) { - crcUpdate(definitionLevels); + long beforeHeader = out.getPos(); + if (currentChunkFirstDataPage < 0) { + currentChunkFirstDataPage = beforeHeader; } - if (bytes.size() > 0) { - crcUpdate(bytes); + + if (pageWriteChecksumEnabled) { + crc.reset(); + if (repetitionLevels.size() > 0) { + crcUpdate(repetitionLevels); + } + if (definitionLevels.size() > 0) { + crcUpdate(definitionLevels); + } + if (bytes.size() > 0) { + crcUpdate(bytes); + } + metadataConverter.writeDataPageV2Header( + uncompressedSize, + compressedSize, + valueCount, + nullCount, + rowCount, + dataEncoding, + rlByteLength, + dlByteLength, + compressed, + (int) crc.getValue(), + out, + metadataBlockEncryptor, + pageHeaderAAD); + } else { + metadataConverter.writeDataPageV2Header( + uncompressedSize, + compressedSize, + valueCount, + nullCount, + rowCount, + dataEncoding, + rlByteLength, + dlByteLength, + compressed, + out, + metadataBlockEncryptor, + pageHeaderAAD); } - metadataConverter.writeDataPageV2Header( - uncompressedSize, - compressedSize, - valueCount, - nullCount, - rowCount, - dataEncoding, - rlByteLength, - dlByteLength, - compressed, - (int) crc.getValue(), - out, - metadataBlockEncryptor, - pageHeaderAAD); - } else { - metadataConverter.writeDataPageV2Header( - uncompressedSize, - compressedSize, - valueCount, - nullCount, - rowCount, - dataEncoding, - rlByteLength, - dlByteLength, - compressed, - out, - metadataBlockEncryptor, - pageHeaderAAD); - } - long headersSize = out.getPos() - beforeHeader; - this.uncompressedLength += uncompressedSize + headersSize; - this.compressedLength += compressedSize + headersSize; + long headersSize = out.getPos() - beforeHeader; + this.uncompressedLength += uncompressedSize + headersSize; + this.compressedLength += compressedSize + headersSize; - 
mergeColumnStatistics(statistics, sizeStatistics); + mergeColumnStatistics(statistics, sizeStatistics); - currentEncodings.add(dataEncoding); - encodingStatsBuilder.addDataEncoding(dataEncoding); + currentEncodings.add(dataEncoding); + encodingStatsBuilder.addDataEncoding(dataEncoding); - BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out); + BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out); - offsetIndexBuilder.add( - toIntWithCheck(out.getPos() - beforeHeader, "page"), - rowCount, - sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + offsetIndexBuilder.add( + toIntWithCheck(out.getPos() - beforeHeader, "page"), + rowCount, + sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty()); + }); } private void crcUpdate(BytesInput bytes) { @@ -1457,58 +1506,61 @@ void writeColumnChunk( int columnOrdinal, byte[] fileAAD) throws IOException { - startColumn(descriptor, valueCount, compressionCodecName); - - state = state.write(); - if (dictionaryPage != null) { - byte[] dictonaryPageHeaderAAD = null; - if (null != headerBlockEncryptor) { - dictonaryPageHeaderAAD = AesCipher.createModuleAAD( - fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1); + withAbortOnFailure(() -> { + startColumn(descriptor, valueCount, compressionCodecName); + + state = state.write(); + if (dictionaryPage != null) { + byte[] dictonaryPageHeaderAAD = null; + if (null != headerBlockEncryptor) { + dictonaryPageHeaderAAD = AesCipher.createModuleAAD( + fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1); + } + writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD); } - writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD); - } - if (bloomFilter != null) { - // write bloom filter if one of data pages is not dictionary encoded - boolean isWriteBloomFilter = false; - for 
(Encoding encoding : dataEncodings) { - // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2 - if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { - isWriteBloomFilter = true; - break; + if (bloomFilter != null) { + // write bloom filter if one of data pages is not dictionary encoded + boolean isWriteBloomFilter = false; + for (Encoding encoding : dataEncodings) { + // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in + // parquet v2 + if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) { + isWriteBloomFilter = true; + break; + } + } + if (isWriteBloomFilter) { + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + } else { + LOG.info( + "No need to write bloom filter because column {} data pages are all encoded as dictionary.", + descriptor.getPath()); } } - if (isWriteBloomFilter) { - currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); - } else { - LOG.info( - "No need to write bloom filter because column {} data pages are all encoded as dictionary.", - descriptor.getPath()); + LOG.debug("{}: write data pages", out.getPos()); + long headersSize = bytes.size() - compressedTotalPageSize; + this.uncompressedLength += uncompressedTotalPageSize + headersSize; + this.compressedLength += compressedTotalPageSize + headersSize; + LOG.debug("{}: write data pages content", out.getPos()); + currentChunkFirstDataPage = out.getPos(); + bytes.writeAllTo(out); + encodingStatsBuilder.addDataEncodings(dataEncodings); + if (rlEncodings.isEmpty()) { + encodingStatsBuilder.withV2Pages(); } - } - LOG.debug("{}: write data pages", out.getPos()); - long headersSize = bytes.size() - compressedTotalPageSize; - this.uncompressedLength += uncompressedTotalPageSize + headersSize; - this.compressedLength += compressedTotalPageSize + headersSize; - LOG.debug("{}: write data pages content", 
out.getPos()); - currentChunkFirstDataPage = out.getPos(); - bytes.writeAllTo(out); - encodingStatsBuilder.addDataEncodings(dataEncodings); - if (rlEncodings.isEmpty()) { - encodingStatsBuilder.withV2Pages(); - } - currentEncodings.addAll(rlEncodings); - currentEncodings.addAll(dlEncodings); - currentEncodings.addAll(dataEncodings); - currentStatistics = totalStats; - currentSizeStatistics = totalSizeStats; - currentGeospatialStatistics = totalGeospatialStats; + currentEncodings.addAll(rlEncodings); + currentEncodings.addAll(dlEncodings); + currentEncodings.addAll(dataEncodings); + currentStatistics = totalStats; + currentSizeStatistics = totalSizeStats; + currentGeospatialStatistics = totalGeospatialStats; - this.columnIndexBuilder = columnIndexBuilder; - this.offsetIndexBuilder = offsetIndexBuilder; + this.columnIndexBuilder = columnIndexBuilder; + this.offsetIndexBuilder = offsetIndexBuilder; - endColumn(); + endColumn(); + }); } /** @@ -1530,34 +1582,36 @@ public void invalidateStatistics(Statistics totalStatistics) { * @throws IOException if there is an error while writing */ public void endColumn() throws IOException { - state = state.endColumn(); - LOG.debug("{}: end column", out.getPos()); - if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { - currentColumnIndexes.add(null); - } else { - currentColumnIndexes.add(columnIndexBuilder.build()); - } - currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); - currentBlock.addColumn(ColumnChunkMetaData.get( - currentChunkPath, - currentChunkType, - currentChunkCodec, - encodingStatsBuilder.build(), - currentEncodings, - currentStatistics, - currentChunkFirstDataPage, - currentChunkDictionaryPageOffset, - currentChunkValueCount, - compressedLength, - uncompressedLength, - currentSizeStatistics, - currentGeospatialStatistics)); - this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); - this.uncompressedLength = 0; - 
this.compressedLength = 0; - this.currentChunkDictionaryPageOffset = 0; - columnIndexBuilder = null; - offsetIndexBuilder = null; + withAbortOnFailure(() -> { + state = state.endColumn(); + LOG.debug("{}: end column", out.getPos()); + if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) { + currentColumnIndexes.add(null); + } else { + currentColumnIndexes.add(columnIndexBuilder.build()); + } + currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage)); + currentBlock.addColumn(ColumnChunkMetaData.get( + currentChunkPath, + currentChunkType, + currentChunkCodec, + encodingStatsBuilder.build(), + currentEncodings, + currentStatistics, + currentChunkFirstDataPage, + currentChunkDictionaryPageOffset, + currentChunkValueCount, + compressedLength, + uncompressedLength, + currentSizeStatistics, + currentGeospatialStatistics)); + this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); + this.uncompressedLength = 0; + this.compressedLength = 0; + this.currentChunkDictionaryPageOffset = 0; + columnIndexBuilder = null; + offsetIndexBuilder = null; + }); } /** @@ -1566,22 +1620,24 @@ public void endColumn() throws IOException { * @throws IOException if there is an error while writing */ public void endBlock() throws IOException { - if (currentRecordCount == 0) { - throw new ParquetEncodingException("End block with zero record"); - } + withAbortOnFailure(() -> { + if (currentRecordCount == 0) { + throw new ParquetEncodingException("End block with zero record"); + } - state = state.endBlock(); - LOG.debug("{}: end block", out.getPos()); - currentBlock.setRowCount(currentRecordCount); - currentBlock.setOrdinal(blocks.size()); - blocks.add(currentBlock); - columnIndexes.add(currentColumnIndexes); - offsetIndexes.add(currentOffsetIndexes); - bloomFilters.add(currentBloomFilters); - currentColumnIndexes = null; - currentOffsetIndexes = null; - currentBloomFilters = null; - currentBlock = 
null; + state = state.endBlock(); + LOG.debug("{}: end block", out.getPos()); + currentBlock.setRowCount(currentRecordCount); + currentBlock.setOrdinal(blocks.size()); + blocks.add(currentBlock); + columnIndexes.add(currentColumnIndexes); + offsetIndexes.add(currentOffsetIndexes); + bloomFilters.add(currentBloomFilters); + currentColumnIndexes = null; + currentOffsetIndexes = null; + currentBloomFilters = null; + currentBlock = null; + }); } /** @@ -1598,9 +1654,11 @@ public void appendFile(Configuration conf, Path file) throws IOException { } public void appendFile(InputFile file) throws IOException { - try (ParquetFileReader reader = ParquetFileReader.open(file)) { - reader.appendTo(this); - } + withAbortOnFailure(() -> { + try (ParquetFileReader reader = ParquetFileReader.open(file)) { + reader.appendTo(this); + } + }); } /** @@ -1619,9 +1677,11 @@ public void appendRowGroups(FSDataInputStream file, List rowGroup public void appendRowGroups(SeekableInputStream file, List rowGroups, boolean dropColumns) throws IOException { - for (BlockMetaData block : rowGroups) { - appendRowGroup(file, block, dropColumns); - } + withAbortOnFailure(() -> { + for (BlockMetaData block : rowGroups) { + appendRowGroup(file, block, dropColumns); + } + }); } /** @@ -1639,83 +1699,86 @@ public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup, boole public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns) throws IOException { - startBlock(rowGroup.getRowCount()); - - Map columnsToCopy = new HashMap(); - for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { - columnsToCopy.put(chunk.getPath().toDotString(), chunk); - } + withAbortOnFailure(() -> { + startBlock(rowGroup.getRowCount()); - List columnsInOrder = new ArrayList(); - - for (ColumnDescriptor descriptor : schema.getColumns()) { - String path = ColumnPath.get(descriptor.getPath()).toDotString(); - ColumnChunkMetaData chunk = columnsToCopy.remove(path); - if (chunk != 
null) { - columnsInOrder.add(chunk); - } else { - throw new IllegalArgumentException( - String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup)); + Map columnsToCopy = new HashMap(); + for (ColumnChunkMetaData chunk : rowGroup.getColumns()) { + columnsToCopy.put(chunk.getPath().toDotString(), chunk); } - } - // complain if some columns would be dropped and that's not okay - if (!dropColumns && !columnsToCopy.isEmpty()) { - throw new IllegalArgumentException(String.format( - "Columns cannot be copied (missing from target schema): %s", - String.join(", ", columnsToCopy.keySet()))); - } - - // copy the data for all chunks - long start = -1; - long length = 0; - long blockUncompressedSize = 0L; - for (int i = 0; i < columnsInOrder.size(); i += 1) { - ColumnChunkMetaData chunk = columnsInOrder.get(i); - - // get this chunk's start position in the new file - long newChunkStart = out.getPos() + length; + List columnsInOrder = new ArrayList(); - // add this chunk to be copied with any previous chunks - if (start < 0) { - // no previous chunk included, start at this chunk's starting pos - start = chunk.getStartingPos(); + for (ColumnDescriptor descriptor : schema.getColumns()) { + String path = ColumnPath.get(descriptor.getPath()).toDotString(); + ColumnChunkMetaData chunk = columnsToCopy.remove(path); + if (chunk != null) { + columnsInOrder.add(chunk); + } else { + throw new IllegalArgumentException( + String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup)); + } } - length += chunk.getTotalSize(); - - if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { - // not contiguous. do the copy now. 
- copy(from, out, start, length); - // reset to start at the next column chunk - start = -1; - length = 0; + + // complain if some columns would be dropped and that's not okay + if (!dropColumns && !columnsToCopy.isEmpty()) { + throw new IllegalArgumentException(String.format( + "Columns cannot be copied (missing from target schema): %s", + String.join(", ", columnsToCopy.keySet()))); } - // TODO: column/offset indexes are not copied - // (it would require seeking to the end of the file for each row groups) - currentColumnIndexes.add(null); - currentOffsetIndexes.add(null); + // copy the data for all chunks + long start = -1; + long length = 0; + long blockUncompressedSize = 0L; + for (int i = 0; i < columnsInOrder.size(); i += 1) { + ColumnChunkMetaData chunk = columnsInOrder.get(i); - Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); - currentBlock.addColumn(ColumnChunkMetaData.get( - chunk.getPath(), - chunk.getPrimitiveType(), - chunk.getCodec(), - chunk.getEncodingStats(), - chunk.getEncodings(), - chunk.getStatistics(), - offsets.firstDataPageOffset, - offsets.dictionaryPageOffset, - chunk.getValueCount(), - chunk.getTotalSize(), - chunk.getTotalUncompressedSize())); + // get this chunk's start position in the new file + long newChunkStart = out.getPos() + length; - blockUncompressedSize += chunk.getTotalUncompressedSize(); - } + // add this chunk to be copied with any previous chunks + if (start < 0) { + // no previous chunk included, start at this chunk's starting pos + start = chunk.getStartingPos(); + } + length += chunk.getTotalSize(); + + if ((i + 1) == columnsInOrder.size() + || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) { + // not contiguous. do the copy now. 
+ copy(from, out, start, length); + // reset to start at the next column chunk + start = -1; + length = 0; + } - currentBlock.setTotalByteSize(blockUncompressedSize); + // TODO: column/offset indexes are not copied + // (it would require seeking to the end of the file for each row groups) + currentColumnIndexes.add(null); + currentOffsetIndexes.add(null); + + Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); + currentBlock.addColumn(ColumnChunkMetaData.get( + chunk.getPath(), + chunk.getPrimitiveType(), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + offsets.firstDataPageOffset, + offsets.dictionaryPageOffset, + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize())); + + blockUncompressedSize += chunk.getTotalUncompressedSize(); + } - endBlock(); + currentBlock.setTotalByteSize(blockUncompressedSize); + + endBlock(); + }); } /** @@ -1735,36 +1798,41 @@ public void appendColumnChunk( ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException { - long start = chunk.getStartingPos(); - long length = chunk.getTotalSize(); - long newChunkStart = out.getPos(); + withAbortOnFailure(() -> { + long start = chunk.getStartingPos(); + long length = chunk.getTotalSize(); + long newChunkStart = out.getPos(); - if (offsetIndex != null && newChunkStart != start) { - offsetIndex = - OffsetIndexBuilder.getBuilder().fromOffsetIndex(offsetIndex).build(newChunkStart - start); - } + OffsetIndex effectiveOffsetIndex = offsetIndex; - copy(from, out, start, length); + if (effectiveOffsetIndex != null && newChunkStart != start) { + effectiveOffsetIndex = OffsetIndexBuilder.getBuilder() + .fromOffsetIndex(effectiveOffsetIndex) + .build(newChunkStart - start); + } - currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); - currentColumnIndexes.add(columnIndex); - currentOffsetIndexes.add(offsetIndex); + copy(from, out, start, length); - Offsets offsets = 
Offsets.getOffsets(from, chunk, newChunkStart); - currentBlock.addColumn(ColumnChunkMetaData.get( - chunk.getPath(), - chunk.getPrimitiveType(), - chunk.getCodec(), - chunk.getEncodingStats(), - chunk.getEncodings(), - chunk.getStatistics(), - offsets.firstDataPageOffset, - offsets.dictionaryPageOffset, - chunk.getValueCount(), - chunk.getTotalSize(), - chunk.getTotalUncompressedSize())); + currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter); + currentColumnIndexes.add(columnIndex); + currentOffsetIndexes.add(effectiveOffsetIndex); + + Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart); + currentBlock.addColumn(ColumnChunkMetaData.get( + chunk.getPath(), + chunk.getPrimitiveType(), + chunk.getCodec(), + chunk.getEncodingStats(), + chunk.getEncodings(), + chunk.getStatistics(), + offsets.firstDataPageOffset, + offsets.dictionaryPageOffset, + chunk.getValueCount(), + chunk.getTotalSize(), + chunk.getTotalUncompressedSize())); - currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); + currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize()); + }); } // Buffers for the copy function. 
@@ -1804,17 +1872,25 @@ private static void copy(SeekableInputStream from, PositionOutputStream to, long * @throws IOException if there is an error while writing */ public void end(Map extraMetaData) throws IOException { - try { - state = state.end(); - serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor); - serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor); - serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor); - LOG.debug("{}: end", out.getPos()); - this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); - serializeFooter(footer, out, fileEncryptor, metadataConverter); - } finally { - close(); - } + withAbortOnFailure(() -> { + try { + state = state.end(); + serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor); + serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor); + serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor); + LOG.debug("{}: end", out.getPos()); + this.footer = + new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks); + serializeFooter(footer, out, fileEncryptor, metadataConverter); + } finally { + close(); + } + }); + } + + /* Mark the writer as aborted to avoid flushing incomplete data. 
*/ + public void abort() { + aborted = true; } @Override @@ -1822,8 +1898,13 @@ public void close() throws IOException { if (closed) { return; } - try (PositionOutputStream temp = out) { - temp.flush(); + + try { + if (!aborted) { + try (PositionOutputStream temp = out) { + temp.flush(); + } + } if (crcAllocator != null) { crcAllocator.close(); } @@ -2274,11 +2355,11 @@ static ParquetMetadata mergeFooters( * @throws IOException if there is an error while getting the current stream's position */ public long getPos() throws IOException { - return out.getPos(); + return withAbortOnFailure(() -> out.getPos()); } public long getNextRowGroupSize() throws IOException { - return alignment.nextRowGroupSize(out); + return withAbortOnFailure(() -> alignment.nextRowGroupSize(out)); } /** diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java new file mode 100644 index 0000000000..f9f121b998 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileReaderMaxMessageSize.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import static org.junit.Assert.*; + +import java.io.File; +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.hadoop.util.HadoopOutputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.MessageTypeParser; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class TestParquetFileReaderMaxMessageSize { + + public static Path TEST_FILE; + public MessageType schema; + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + @Before + public void testSetup() throws IOException { + + File testParquetFile = temp.newFile(); + testParquetFile.delete(); + + TEST_FILE = new Path(testParquetFile.toURI()); + // Create a file with many columns + StringBuilder schemaBuilder = new StringBuilder("message test_schema {"); + for (int i = 0; i < 2000; i++) { + schemaBuilder.append("required int64 col_").append(i).append(";"); + } + schemaBuilder.append("}"); + + schema = MessageTypeParser.parseMessageType(schemaBuilder.toString()); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + try (ParquetWriter writer = ExampleParquetWriter.builder(HadoopOutputFile.fromPath(TEST_FILE, conf)) + .withConf(conf) + .withType(schema) + .build()) { + + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + Group group = factory.newGroup(); + for (int col = 0; col 
< 2000; col++) { + group.append("col_" + col, 1L); + } + writer.write(group); + } + } + + /** + * Test reading a file with many columns using custom max message size + */ + @Test + public void testReadFileWithManyColumns() throws IOException { + Configuration readConf = new Configuration(); + readConf.setInt("parquet.thrift.string.size.limit", 200 * 1024 * 1024); + + ParquetReadOptions options = HadoopReadOptions.builder(readConf).build(); + + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) { + + ParquetMetadata metadata = reader.getFooter(); + assertNotNull(metadata); + assertEquals(schema, metadata.getFileMetaData().getSchema()); + assertTrue(metadata.getBlocks().size() > 0); + } + } + + /** + * Test that default configuration works for normal files + */ + @Test + public void testReadNormalFileWithDefaultConfig() throws IOException { + // Read with default configuration (no custom max message size) + Configuration readConf = new Configuration(); + ParquetReadOptions options = HadoopReadOptions.builder(readConf).build(); + + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) { + + ParquetMetadata metadata = reader.getFooter(); + assertNotNull(metadata); + assertEquals(1, metadata.getBlocks().get(0).getRowCount()); + } + } + + /** + * Test that insufficient max message size produces error + */ + @Test + public void testInsufficientMaxMessageSizeError() throws IOException { + // Try to read with very small max message size + Configuration readConf = new Configuration(); + readConf.setInt("parquet.thrift.string.size.limit", 1); // Only 1 byte + + ParquetReadOptions options = HadoopReadOptions.builder(readConf).build(); + + try (ParquetFileReader reader = + ParquetFileReader.open(HadoopInputFile.fromPath(TEST_FILE, readConf), options)) { + fail("Should have thrown Message size exceeds limit due to MaxMessageSize"); + } catch (IOException e) 
{ + e.printStackTrace(); + assertTrue( + "Error should mention TTransportException", + e.getMessage().contains("Message size exceeds limit") + || e.getCause().getMessage().contains("Message size exceeds limit") + || e.getMessage().contains("MaxMessageSize reached") + || e.getCause().getMessage().contains("MaxMessageSize reached")); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 2cd83624f6..38b66d7708 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -44,6 +44,7 @@ import com.google.common.collect.ImmutableMap; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -52,6 +53,7 @@ import net.openhft.hashing.LongHashFunction; import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.HeapByteBufferAllocator; @@ -722,4 +724,42 @@ private void testV2WriteAllNullValues( } } } + + @Test + public void testNoFlushAfterException() throws Exception { + final File testDir = temp.newFile(); + testDir.delete(); + + final Path file = new Path(testDir.getAbsolutePath(), "test.parquet"); + + MessageType schema = Types.buildMessage() + .required(BINARY) + .named("binary_field") + .required(INT32) + .named("int32_field") + .named("test_schema_abort"); + Configuration conf = new Configuration(); + + try (ParquetWriter writer = ExampleParquetWriter.builder(new Path(file.toString())) + .withAllocator(allocator) + .withType(schema) + .build()) { + + SimpleGroupFactory f = new SimpleGroupFactory(schema); + 
writer.write(f.newGroup().append("binary_field", "hello").append("int32_field", 123)); + + Field internalWriterField = ParquetWriter.class.getDeclaredField("writer"); + internalWriterField.setAccessible(true); + Object internalWriter = internalWriterField.get(writer); + + Field abortedField = internalWriter.getClass().getDeclaredField("aborted"); + abortedField.setAccessible(true); + abortedField.setBoolean(internalWriter, true); + writer.close(); + } + + // After closing, check whether file exists or is empty + FileSystem fs = file.getFileSystem(conf); + assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0); + } } diff --git a/parquet-jackson/pom.xml b/parquet-jackson/pom.xml index e61f250f42..b35807212c 100644 --- a/parquet-jackson/pom.xml +++ b/parquet-jackson/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-plugins/parquet-encoding-vector/pom.xml b/parquet-plugins/parquet-encoding-vector/pom.xml index 97a8557ea7..eed9729cf0 100644 --- a/parquet-plugins/parquet-encoding-vector/pom.xml +++ b/parquet-plugins/parquet-encoding-vector/pom.xml @@ -22,7 +22,7 @@ org.apache.parquet parquet - 1.16.0 + 1.16.1-SNAPSHOT ../../pom.xml diff --git a/parquet-plugins/parquet-plugins-benchmarks/pom.xml b/parquet-plugins/parquet-plugins-benchmarks/pom.xml index d6c5186eb2..74d4c59021 100644 --- a/parquet-plugins/parquet-plugins-benchmarks/pom.xml +++ b/parquet-plugins/parquet-plugins-benchmarks/pom.xml @@ -22,7 +22,7 @@ org.apache.parquet parquet - 1.16.0 + 1.16.1-SNAPSHOT ../../pom.xml diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml index bb985f523c..0de3527343 100644 --- a/parquet-protobuf/pom.xml +++ b/parquet-protobuf/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml index 62552f44f1..adf4eb2de7 100644 --- a/parquet-thrift/pom.xml +++ 
b/parquet-thrift/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/parquet-variant/pom.xml b/parquet-variant/pom.xml index 684658b364..156df2bcd7 100644 --- a/parquet-variant/pom.xml +++ b/parquet-variant/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index e98d7684ad..976f3779b1 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ org.apache.parquet parquet - 1.16.0-SNAPSHOT + 1.16.1-SNAPSHOT pom Apache Parquet Java @@ -85,7 +85,7 @@ 3.3.0 2.12.0 - 1.15.1 + 1.16.0 thrift ${thrift.executable} 0.16.0