diff --git a/.github/workflows/ci-hadoop2.yml b/.github/workflows/ci-hadoop2.yml
new file mode 100644
index 0000000000..b824e18456
--- /dev/null
+++ b/.github/workflows/ci-hadoop2.yml
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: CI Hadoop 2
+
+on: [push, pull_request]
+
+jobs:
+  build:
+
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        codes: [ 'uncompressed,brotli', 'gzip,snappy' ]
+    name: Build Parquet with JDK 8 and ${{ matrix.codes }}
+
+    steps:
+    - uses: actions/checkout@master
+    - name: Set up JDK8
+      uses: actions/setup-java@v1
+      with:
+        java-version: 1.8
+    - name: before_install
+      env:
+        CI_TARGET_BRANCH: $GITHUB_HEAD_REF
+      run: |
+        bash dev/ci-before_install.sh
+    - name: install
+      run: |
+        EXTRA_JAVA_TEST_ARGS=$(mvn help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout)
+        export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS"
+        mvn install --batch-mode -P hadoop2 -DskipTests=true -Dmaven.javadoc.skip=true -Dsource.skip=true -Djava.version=1.8
+    - name: verify
+      env:
+        TEST_CODECS: ${{ matrix.codes }}
+        JAVA_VERSION: 1.8
+      run: |
+        EXTRA_JAVA_TEST_ARGS=$(mvn help:evaluate -Dexpression=extraJavaTestArgs -q -DforceStdout)
+        export MAVEN_OPTS="$MAVEN_OPTS $EXTRA_JAVA_TEST_ARGS"
+        mvn verify --batch-mode -P hadoop2 javadoc:javadoc -Pci-test
diff --git a/.github/workflows/test.yml b/.github/workflows/ci-hadoop3.yml
similarity index 99%
rename from .github/workflows/test.yml
rename to .github/workflows/ci-hadoop3.yml
index f96585244c..76c758bd53 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/ci-hadoop3.yml
@@ -15,7 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
-name: Test
+name: CI Hadoop 3
 
 on: [push, pull_request]
 
diff --git a/CHANGES.md b/CHANGES.md
index 7785db5486..6138485868 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,129 @@
 
 # Parquet #
 
+### Version 1.13.0 ###
+
+Release Notes - Parquet - Version 1.13.0
+
+#### New Feature
+
+* [PARQUET-1020](https://issues.apache.org/jira/browse/PARQUET-1020) - Add support for Dynamic Messages in parquet-protobuf
+
+#### Task
+
+* [PARQUET-2230](https://issues.apache.org/jira/browse/PARQUET-2230) - Add a new rewrite command powered by ParquetRewriter
+* [PARQUET-2228](https://issues.apache.org/jira/browse/PARQUET-2228) - ParquetRewriter supports more than one input file
+* [PARQUET-2229](https://issues.apache.org/jira/browse/PARQUET-2229) - ParquetRewriter supports masking and encrypting the same column
+* [PARQUET-2227](https://issues.apache.org/jira/browse/PARQUET-2227) - Refactor different file rewriters to use single implementation
+
+#### Improvement
+
+* [PARQUET-2258](https://issues.apache.org/jira/browse/PARQUET-2258) - Storing toString fields in FilterPredicate instances can lead to memory pressure
+* [PARQUET-2252](https://issues.apache.org/jira/browse/PARQUET-2252) - Make some methods public to allow external projects to implement page skipping
+* [PARQUET-2159](https://issues.apache.org/jira/browse/PARQUET-2159) - Vectorized BytePacker decoder using Java VectorAPI
+* [PARQUET-2246](https://issues.apache.org/jira/browse/PARQUET-2246) - Add short circuit logic to column index filter
+* [PARQUET-2226](https://issues.apache.org/jira/browse/PARQUET-2226) - Support merge Bloom Filters
+* [PARQUET-2224](https://issues.apache.org/jira/browse/PARQUET-2224) - Publish SBOM artifacts
+* [PARQUET-2208](https://issues.apache.org/jira/browse/PARQUET-2208) - Add details to nested column encryption config doc and exception text
+* [PARQUET-2195](https://issues.apache.org/jira/browse/PARQUET-2195) - Add scan command to parquet-cli
+* [PARQUET-2196](https://issues.apache.org/jira/browse/PARQUET-2196) - Support LZ4_RAW codec
+* [PARQUET-2176](https://issues.apache.org/jira/browse/PARQUET-2176) - Column index/statistics truncation in ParquetWriter
+* [PARQUET-2197](https://issues.apache.org/jira/browse/PARQUET-2197) - Document uniform encryption
+* [PARQUET-2191](https://issues.apache.org/jira/browse/PARQUET-2191) - Upgrade Scala to 2.12.17
+* [PARQUET-2169](https://issues.apache.org/jira/browse/PARQUET-2169) - Upgrade Avro to version 1.11.1
+* [PARQUET-2155](https://issues.apache.org/jira/browse/PARQUET-2155) - Upgrade protobuf version to 3.17.3
+* [PARQUET-2158](https://issues.apache.org/jira/browse/PARQUET-2158) - Upgrade Hadoop dependency to version 3.2.0
+* [PARQUET-2138](https://issues.apache.org/jira/browse/PARQUET-2138) - Add ShowBloomFilterCommand to parquet-cli
+* [PARQUET-2157](https://issues.apache.org/jira/browse/PARQUET-2157) - Add BloomFilter fpp config
+
+#### Bug
+
+* [PARQUET-2202](https://issues.apache.org/jira/browse/PARQUET-2202) - Redundant String allocation on the hot path in CapacityByteArrayOutputStream.setByte
+* [PARQUET-2164](https://issues.apache.org/jira/browse/PARQUET-2164) - CapacityByteArrayOutputStream overflow while writing causes negative row group sizes to be written
+* [PARQUET-2103](https://issues.apache.org/jira/browse/PARQUET-2103) - Fix crypto exception in print toPrettyJSON
+* [PARQUET-2251](https://issues.apache.org/jira/browse/PARQUET-2251) - Avoid generating Bloom filter when all pages of a column are encoded by dictionary
+* [PARQUET-2243](https://issues.apache.org/jira/browse/PARQUET-2243) - Support zstd-jni in DirectCodecFactory
+* [PARQUET-2247](https://issues.apache.org/jira/browse/PARQUET-2247) - Fail-fast if CapacityByteArrayOutputStream write overflow
+* [PARQUET-2241](https://issues.apache.org/jira/browse/PARQUET-2241) - Fix ByteStreamSplitValuesReader with nulls
+* [PARQUET-2244](https://issues.apache.org/jira/browse/PARQUET-2244) - Fix notIn for columns with null values
+* [PARQUET-2173](https://issues.apache.org/jira/browse/PARQUET-2173) - Fix parquet build against hadoop 3.3.3+
+* [PARQUET-2219](https://issues.apache.org/jira/browse/PARQUET-2219) - ParquetFileReader skips empty row group
+* [PARQUET-2198](https://issues.apache.org/jira/browse/PARQUET-2198) - Updating jackson data bind version to fix CVEs
+* [PARQUET-2177](https://issues.apache.org/jira/browse/PARQUET-2177) - Fix parquet-cli not to fail showing descriptions
+* [PARQUET-1711](https://issues.apache.org/jira/browse/PARQUET-1711) - Support recursive proto schemas by limiting recursion depth
+* [PARQUET-2142](https://issues.apache.org/jira/browse/PARQUET-2142) - parquet-cli without hadoop throws java.lang.NoSuchMethodError on any parquet file access command
+* [PARQUET-2160](https://issues.apache.org/jira/browse/PARQUET-2160) - Close decompression stream to free off-heap memory in time
+* [PARQUET-2185](https://issues.apache.org/jira/browse/PARQUET-2185) - ParquetReader constructed using builder fails to read encrypted files
+* [PARQUET-2167](https://issues.apache.org/jira/browse/PARQUET-2167) - CLI show footer command fails if Parquet file contains date fields
+* [PARQUET-2134](https://issues.apache.org/jira/browse/PARQUET-2134) - Incorrect type checking in HadoopStreams.wrap
+* [PARQUET-2161](https://issues.apache.org/jira/browse/PARQUET-2161) - Fix row index generation in combination with range filtering
+* [PARQUET-2154](https://issues.apache.org/jira/browse/PARQUET-2154) - ParquetFileReader should close its input stream when filterRowGroups throw Exception in constructor
+
+#### Test
+
+* [PARQUET-2192](https://issues.apache.org/jira/browse/PARQUET-2192) - Add Java 17 build test to GitHub action
+
+### Version 1.12.3 ###
+
+Release Notes - Parquet - Version 1.12.3
+
+#### New Feature
+
+* [PARQUET-2117](https://issues.apache.org/jira/browse/PARQUET-2117) - Add rowPosition API in parquet record readers
+
+#### Task
+
+* [PARQUET-2081](https://issues.apache.org/jira/browse/PARQUET-2081) - Encryption translation tool - Parquet-hadoop
+
+#### Improvement
+
+* [PARQUET-2040](https://issues.apache.org/jira/browse/PARQUET-2040) - Uniform encryption
+* [PARQUET-2076](https://issues.apache.org/jira/browse/PARQUET-2076) - Improve Travis CI build Performance
+* [PARQUET-2105](https://issues.apache.org/jira/browse/PARQUET-2105) - Refactor the test code of creating the test file
+* [PARQUET-2106](https://issues.apache.org/jira/browse/PARQUET-2106) - BinaryComparator should avoid doing ByteBuffer.wrap in the hot-path
+* [PARQUET-2112](https://issues.apache.org/jira/browse/PARQUET-2112) - Fix typo in MessageColumnIO
+* [PARQUET-2121](https://issues.apache.org/jira/browse/PARQUET-2121) - Remove descriptions for the removed modules
+* [PARQUET-2127](https://issues.apache.org/jira/browse/PARQUET-2127) - Security risk in latest parquet-jackson-1.12.2.jar
+* [PARQUET-2128](https://issues.apache.org/jira/browse/PARQUET-2128) - Bump Thrift to 0.16.0
+* [PARQUET-2129](https://issues.apache.org/jira/browse/PARQUET-2129) - Add uncompressedSize to "meta" output
+* [PARQUET-2136](https://issues.apache.org/jira/browse/PARQUET-2136) - File writer construction with encryptor
+
+#### Bug
+
+* [PARQUET-2101](https://issues.apache.org/jira/browse/PARQUET-2101) - Fix wrong descriptions about the default block size
+* [PARQUET-2102](https://issues.apache.org/jira/browse/PARQUET-2102) - Typo in ColumnIndexBase toString
+* [PARQUET-2107](https://issues.apache.org/jira/browse/PARQUET-2107) - Travis failures
+* [PARQUET-2120](https://issues.apache.org/jira/browse/PARQUET-2120) - parquet-cli dictionary command fails on pages without dictionary encoding
+* [PARQUET-2144](https://issues.apache.org/jira/browse/PARQUET-2144) - Fix ColumnIndexBuilder for notIn predicate
+* [PARQUET-2148](https://issues.apache.org/jira/browse/PARQUET-2148) - Enable uniform decryption with plaintext footer
+
+### Version 1.12.2 ###
+
+Release Notes - Parquet - Version 1.12.2
+
+#### Bug
+
+* [PARQUET-2094](https://issues.apache.org/jira/browse/PARQUET-2094) - Handle negative values in page headers
+
+### Version 1.12.1 ###
+
+Release Notes - Parquet - Version 1.12.1
+
+#### Bug
+
+* [PARQUET-1633](https://issues.apache.org/jira/browse/PARQUET-1633) - Fix integer overflow
+* [PARQUET-2022](https://issues.apache.org/jira/browse/PARQUET-2022) - ZstdDecompressorStream should close zstdInputStream
+* [PARQUET-2027](https://issues.apache.org/jira/browse/PARQUET-2027) - Fix calculating directory offset for merge
+* [PARQUET-2052](https://issues.apache.org/jira/browse/PARQUET-2052) - Integer overflow when writing huge binary using dictionary encoding
+* [PARQUET-2054](https://issues.apache.org/jira/browse/PARQUET-2054) - fix TCP leaking when calling ParquetFileWriter.appendFile
+* [PARQUET-2072](https://issues.apache.org/jira/browse/PARQUET-2072) - Do Not Determine Both Min/Max for Binary Stats
+* [PARQUET-2073](https://issues.apache.org/jira/browse/PARQUET-2073) - Fix estimate remaining row count in ColumnWriteStoreBase.
+* [PARQUET-2078](https://issues.apache.org/jira/browse/PARQUET-2078) - Failed to read parquet file after writing with the same parquet version
+
+#### Improvement
+
+* [PARQUET-2064](https://issues.apache.org/jira/browse/PARQUET-2064) - Make Range public accessible in RowRanges
+
 ### Version 1.12.0 ###
 
 Release Notes - Parquet - Version 1.12.0
diff --git a/parquet-arrow/pom.xml b/parquet-arrow/pom.xml
index 81d6e7ab49..2a912a5f2c 100644
--- a/parquet-arrow/pom.xml
+++ b/parquet-arrow/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index c4d12e484f..b95efd53df 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
@@ -104,6 +104,30 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>2.23.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-module-junit4</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-core</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.powermock</groupId>
+      <artifactId>powermock-api-mockito2</artifactId>
+      <version>${powermock.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
index 9d514673eb..94d8167b0a 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroParquetWriter.java
@@ -160,7 +160,7 @@ private static <T> WriteSupport<T> writeSupport(Configuration conf,
   public static class Builder<T> extends ParquetWriter.Builder<T, AvroParquetWriter.Builder<T>> {
 
     private Schema schema = null;
-    private GenericData model = SpecificData.get();
+    private GenericData model = null;
 
     private Builder(Path file) {
       super(file);
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
index eca14413a6..8f268a145a 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroReadSupport.java
@@ -27,6 +27,8 @@
 import org.apache.parquet.hadoop.api.ReadSupport;
 import org.apache.parquet.io.api.RecordMaterializer;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Avro implementation of {@link ReadSupport} for avro generic, specific, and
@@ -37,6 +39,8 @@
  */
 public class AvroReadSupport<T extends IndexedRecord> extends ReadSupport<T> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AvroReadSupport.class);
+
   public static String AVRO_REQUESTED_PROJECTION = "parquet.avro.projection";
 
   private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";
@@ -134,7 +138,7 @@ public RecordMaterializer<T> prepareForRead(
       avroSchema = new AvroSchemaConverter(configuration).convert(parquetSchema);
     }
 
-    GenericData model = getDataModel(configuration);
+    GenericData model = getDataModel(configuration, avroSchema);
     String compatEnabled = metadata.get(AvroReadSupport.AVRO_COMPATIBILITY);
     if (compatEnabled != null && Boolean.valueOf(compatEnabled)) {
       return newCompatMaterializer(parquetSchema, avroSchema, model);
@@ -149,10 +153,26 @@ private static <T> RecordMaterializer<T> newCompatMaterializer(
         parquetSchema, avroSchema, model);
   }
 
-  private GenericData getDataModel(Configuration conf) {
+  private GenericData getDataModel(Configuration conf, Schema schema) {
     if (model != null) {
       return model;
     }
+
+    if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
+      GenericData modelForSchema;
+      try {
+        modelForSchema = AvroRecordConverter.getModelForSchema(schema);
+      } catch (Exception e) {
+        LOG.warn(String.format("Failed to derive data model for Avro schema %s. Parquet will use default " +
+          "SpecificData model for reading from source.", schema), e);
+        modelForSchema = null;
+      }
+
+      if (modelForSchema != null) {
+        return modelForSchema;
+      }
+    }
+
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
     return ReflectionUtils.newInstance(suppClass, conf).get();
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index fee7df7277..cc17df5827 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -30,12 +30,15 @@
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.LinkedHashMap;
+import java.util.Objects;
+
 import org.apache.avro.AvroTypeException;
 import org.apache.avro.Conversion;
 import org.apache.avro.LogicalType;
@@ -57,6 +60,8 @@
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
 import static org.apache.avro.SchemaCompatibility.checkReaderWriterCompatibility;
@@ -73,6 +78,8 @@
  */
 class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AvroRecordConverter.class);
+
   private static final String STRINGABLE_PROP = "avro.java.string";
   private static final String JAVA_CLASS_PROP = "java-class";
   private static final String JAVA_KEY_CLASS_PROP = "java-key-class";
@@ -169,6 +176,77 @@ public void add(Object value) {
     }
   }
 
+  /**
+   * Returns the specific data model for a given SpecificRecord schema by reflecting the underlying
+   * Avro class's `MODEL$` field, or null if the class is not on the classpath or reflection fails.
+   */
+  static SpecificData getModelForSchema(Schema schema) {
+    final Class<?> clazz;
+
+    if (schema != null && (schema.getType() == Schema.Type.RECORD || schema.getType() == Schema.Type.UNION)) {
+      clazz = SpecificData.get().getClass(schema);
+    } else {
+      return null;
+    }
+
+    // If clazz == null, the underlying Avro class for the schema is not on the classpath
+    if (clazz == null) {
+      return null;
+    }
+
+    final SpecificData model;
+    try {
+      final Field modelField = clazz.getDeclaredField("MODEL$");
+      modelField.setAccessible(true);
+
+      model = (SpecificData) modelField.get(null);
+    } catch (NoSuchFieldException e) {
+      LOG.info(String.format(
+        "Generated Avro class %s did not contain a MODEL$ field. Parquet will use default SpecificData model for " +
+          "reading and writing.", clazz));
+      return null;
+    } catch (IllegalAccessException e) {
+      LOG.warn(String.format(
+        "Field `MODEL$` in class %s was inaccessible. Parquet will use default SpecificData model for " +
+          "reading and writing.", clazz), e);
+      return null;
+    }
+
+    final String avroVersion = getRuntimeAvroVersion();
+    // Avro 1.7 and 1.8 don't include conversions in the MODEL$ field by default
+    if (avroVersion != null && (avroVersion.startsWith("1.8.") || avroVersion.startsWith("1.7."))) {
+      final Field conversionsField;
+      try {
+        conversionsField = clazz.getDeclaredField("conversions");
+      } catch (NoSuchFieldException e) {
+        // Avro classes without logical types (denoted by the "conversions" field) can be returned as-is
+        return model;
+      }
+
+      final Conversion<?>[] conversions;
+      try {
+        conversionsField.setAccessible(true);
+        conversions = (Conversion<?>[]) conversionsField.get(null);
+      } catch (IllegalAccessException e) {
+        LOG.warn(String.format("Field `conversions` in class %s was inaccessible. Parquet will use default " +
+          "SpecificData model for reading and writing.", clazz));
+        return null;
+      }
+
+      for (int i = 0; i < conversions.length; i++) {
+        if (conversions[i] != null) {
+          model.addLogicalTypeConversion(conversions[i]);
+        }
+      }
+    }
+
+    return model;
+  }
+
+  static String getRuntimeAvroVersion() {
+    return Schema.Parser.class.getPackage().getImplementationVersion();
+  }
+
   // this was taken from Avro's ReflectData
   private static Map<String, Class<?>> getFieldsByName(Class<?> recordClass, boolean excludeJava) {
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
index 9a7ef6c905..564e745392 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java
@@ -43,6 +43,8 @@
 import org.apache.parquet.schema.Type;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.parquet.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Avro implementation of {@link WriteSupport} for generic, specific, and
@@ -51,6 +53,8 @@
  */
 public class AvroWriteSupport<T> extends WriteSupport<T> {
 
+  private static final Logger LOG = LoggerFactory.getLogger(AvroWriteSupport.class);
+
   public static final String AVRO_DATA_SUPPLIER = "parquet.avro.write.data.supplier";
 
   public static void setAvroDataSupplier(
@@ -131,7 +135,7 @@ public WriteContext init(Configuration configuration) {
     }
 
     if (model == null) {
-      this.model = getDataModel(configuration);
+      this.model = getDataModel(configuration, rootAvroSchema);
     }
 
     boolean writeOldListStructure = configuration.getBoolean(
@@ -400,7 +404,23 @@ private Binary fromAvroString(Object value) {
     return Binary.fromCharSequence(value.toString());
   }
 
-  private static GenericData getDataModel(Configuration conf) {
+  private static GenericData getDataModel(Configuration conf, Schema schema) {
+    if (conf.get(AVRO_DATA_SUPPLIER) == null && schema != null) {
+      GenericData modelForSchema;
+      try {
+        modelForSchema = AvroRecordConverter.getModelForSchema(schema);
+      } catch (Exception e) {
+        LOG.warn(String.format("Failed to derive data model for Avro schema %s. Parquet will use default " +
+          "SpecificData model for writing to sink.", schema), e);
+        modelForSchema = null;
+      }
+
+      if (modelForSchema != null) {
+        return modelForSchema;
+      }
+    }
+
     Class<? extends AvroDataSupplier> suppClass = conf.getClass(
         AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
     return ReflectionUtils.newInstance(suppClass, conf).get();
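The read and write paths above now apply the same model-resolution order. The following sketch is illustrative only — `resolveModel` is not a method in this patch — but it condenses, in one place, the precedence that `AvroReadSupport.getDataModel` and `AvroWriteSupport.getDataModel` each implement (note `AvroRecordConverter.getModelForSchema` is package-private, so real callers live in `org.apache.parquet.avro`):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;

class ModelResolutionSketch {
  // Illustrative summary of the precedence applied by both supports after this patch.
  static GenericData resolveModel(Configuration conf, Schema schema, GenericData explicitModel) {
    if (explicitModel != null) {
      return explicitModel;   // 1. a model handed in explicitly (e.g. via withDataModel) always wins
    }
    if (conf.get(AvroWriteSupport.AVRO_DATA_SUPPLIER) == null && schema != null) {
      // 2. derive the model by reflecting MODEL$ off the generated class
      // (the real implementations also catch and log any reflection failure here)
      SpecificData derived = AvroRecordConverter.getModelForSchema(schema);
      if (derived != null) {
        return derived;
      }
    }
    // 3. fall back to the configured supplier, defaulting to SpecificDataSupplier
    Class<? extends AvroDataSupplier> supplier = conf.getClass(
        AvroWriteSupport.AVRO_DATA_SUPPLIER, SpecificDataSupplier.class, AvroDataSupplier.class);
    return ReflectionUtils.newInstance(supplier, conf).get();
  }
}
```

Because step 2 is skipped whenever `AVRO_DATA_SUPPLIER` is set, existing configurations that rely on a custom supplier keep their old behavior.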
diff --git a/parquet-avro/src/test/avro/logicalType.avsc b/parquet-avro/src/test/avro/logicalType.avsc
new file mode 100644
index 0000000000..fbec10a8d5
--- /dev/null
+++ b/parquet-avro/src/test/avro/logicalType.avsc
@@ -0,0 +1,14 @@
+{
+  "type": "record",
+  "name": "LogicalTypesTest",
+  "namespace": "org.apache.parquet.avro",
+  "doc": "Record for testing logical types",
+  "fields": [
+    {
+      "name": "timestamp",
+      "type": {
+        "type": "long", "logicalType": "timestamp-millis"
+      }
+    }
+  ]
+}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
new file mode 100644
index 0000000000..8339285ba1
--- /dev/null
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroRecordConverter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.avro;
+
+import com.google.common.collect.Lists;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.Schema;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.specific.SpecificData;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.math.BigDecimal;
+import java.time.Instant;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AvroRecordConverter.class)
+public class TestAvroRecordConverter {
+
+  @Before
+  public void setup() {
+    // Default to calling real methods unless overridden in specific test
+    PowerMockito.mockStatic(AvroRecordConverter.class, CALLS_REAL_METHODS);
+  }
+
+  @Test
+  public void testModelForSpecificRecordWithLogicalTypes() {
+    SpecificData model = AvroRecordConverter.getModelForSchema(LogicalTypesTest.SCHEMA$);
+
+    // Test that model is generated correctly
+    Conversion<?> conversion = model.getConversionByClass(Instant.class);
+    assertEquals(TimeConversions.TimestampMillisConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testModelForSpecificRecordWithoutLogicalTypes() {
+    SpecificData model = AvroRecordConverter.getModelForSchema(Car.SCHEMA$);
+
+    assertTrue(model.getConversions().isEmpty());
+  }
+
+  @Test
+  public void testModelForGenericRecord() {
+    SpecificData model = AvroRecordConverter.getModelForSchema(
+        Schema.createRecord(
+            "someSchema",
+            "doc",
+            "some.namespace",
+            false,
+            Lists.newArrayList(new Schema.Field("strField", Schema.create(Schema.Type.STRING)))));
+
+    // There is no class "someSchema" on the classpath, so should return null
+    assertNull(model);
+  }
+
+  // Test logical type support for older Avro versions
+  @Test
+  public void testGetModelAvro1_7() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.7.7");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro17GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testGetModelAvro1_8() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.8.2");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro18GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testGetModelAvro1_9() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.9.2");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro19GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  @Test
+  public void testGetModelAvro1_10() {
+    Mockito.when(AvroRecordConverter.getRuntimeAvroVersion()).thenReturn("1.10.2");
+
+    // Test that model is generated correctly
+    final SpecificData model = AvroRecordConverter.getModelForSchema(Avro110GeneratedClass.SCHEMA$);
+    Conversion<?> conversion = model.getConversionByClass(BigDecimal.class);
+    assertEquals(Conversions.DecimalConversion.class, conversion.getClass());
+  }
+
+  // Test Avro record class stubs, generated using different versions of the Avro compiler
+  public abstract static class Avro110GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro110GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    static {
+      MODEL$.addLogicalTypeConversion(new org.apache.avro.Conversions.DecimalConversion());
+    }
+  }
+
+  public abstract static class Avro19GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro19GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    static {
+      MODEL$.addLogicalTypeConversion(new org.apache.avro.Conversions.DecimalConversion());
+    }
+  }
+
+  public abstract static class Avro18GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro18GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    protected static final org.apache.avro.Conversions.DecimalConversion DECIMAL_CONVERSION = new org.apache.avro.Conversions.DecimalConversion();
+
+    private static final org.apache.avro.Conversion<?>[] conversions =
+        new org.apache.avro.Conversion<?>[] {
+            DECIMAL_CONVERSION,
+            null
+        };
+
+    @Override
+    public org.apache.avro.Conversion<?> getConversion(int field) {
+      return conversions[field];
+    }
+  }
+
+  public abstract static class Avro17GeneratedClass extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
+    private static final long serialVersionUID = 5558880508010468207L;
+    public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Avro17GeneratedClass\",\"namespace\":\"org.apache.parquet.avro.TestAvroRecordConverter\",\"doc\":\"\",\"fields\":[{\"name\":\"decimal\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":4,\"scale\":2}}]}");
+
+    public static org.apache.avro.Schema getClassSchema() {
+      return SCHEMA$;
+    }
+
+    private static SpecificData MODEL$ = new SpecificData();
+
+    protected static final org.apache.avro.Conversions.DecimalConversion DECIMAL_CONVERSION = new org.apache.avro.Conversions.DecimalConversion();
+
+    private static final org.apache.avro.Conversion<?>[] conversions =
+        new org.apache.avro.Conversion<?>[] {
+            DECIMAL_CONVERSION,
+            null
+        };
+
+    @Override
+    public org.apache.avro.Conversion<?> getConversion(int field) {
+      return conversions[field];
+    }
+  }
+}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
index 66f166cfd2..6484ab4a62 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestReadWrite.java
@@ -27,6 +27,8 @@
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -35,7 +37,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+
+import org.apache.avro.Conversion;
 import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -775,6 +780,86 @@ AvroParquetWriter.<GenericRecord>builder(file).withSchema(schema)
     }
   }
 
+  public static class CustomDataModel implements AvroDataSupplier {
+    @Override
+    public GenericData get() {
+      GenericData genericData = new GenericData();
+      genericData.addLogicalTypeConversion(new Conversion<LocalDate>() {
+        private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
+
+        @Override
+        public Class<LocalDate> getConvertedType() {
+          return LocalDate.class;
+        }
+
+        @Override
+        public String getLogicalTypeName() {
+          return "date";
+        }
+
+        public LocalDate fromInt(Integer localDate, Schema schema, LogicalType type) {
+          return LocalDate.parse(String.valueOf(localDate), dateTimeFormatter);
+        }
+
+        public Integer toInt(LocalDate date, Schema schema, LogicalType type) {
+          return Integer.parseInt(dateTimeFormatter.format(date));
+        }
+      });
+      return genericData;
+    }
+  }
+  @Test
+  public void testParsesDataModelFromConf() throws Exception {
+    Schema datetimeSchema = Schema.createRecord("myrecord", null, null, false);
+    Schema date = LogicalTypes.date().addToSchema(
+        Schema.create(Schema.Type.INT));
+    datetimeSchema.setFields(Collections.singletonList(
+        new Schema.Field("date", date, null, null)));
+
+    File file = temp.newFile("datetime.parquet");
+    file.delete();
+    Path path = new Path(file.toString());
+    List<GenericRecord> expected = Lists.newArrayList();
+
+    Configuration conf = new Configuration();
+    AvroWriteSupport.setAvroDataSupplier(conf, CustomDataModel.class);
+
+    // .withDataModel is not set; AvroWriteSupport should parse it from the Configuration
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(path)
+        .withConf(conf)
+        .withSchema(datetimeSchema)
+        .build()) {
+
+      GenericRecordBuilder builder = new GenericRecordBuilder(datetimeSchema);
+      for (int i = 0; i < 100; i += 1) {
+        builder.set("date", LocalDate.now().minusDays(i));
+
+        GenericRecord rec = builder.build();
+        expected.add(rec);
+        writer.write(builder.build());
+      }
+    }
+    List<GenericRecord> records = Lists.newArrayList();
+
+    AvroReadSupport.setAvroDataSupplier(conf, CustomDataModel.class);
+
+    try (ParquetReader<GenericRecord> reader = AvroParquetReader
+        .<GenericRecord>builder(path)
+        .disableCompatibility()
+        .withConf(conf)
+        .build()) {
+      GenericRecord rec;
+      while ((rec = reader.read()) != null) {
+        records.add(rec);
+      }
+    }
+
+    Assert.assertTrue("date field should be a LocalDate instance",
+        records.get(0).get("date") instanceof LocalDate);
+    Assert.assertEquals("Content should match", expected, records);
+  }
+
   private File createTempFile() throws IOException {
     File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
     tmp.deleteOnExit();
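The test added below (`testParsesSpecificDataModel`) exercises the same behavior end to end through a generated class. For reference, this hedged usage sketch shows what the change means for user code, assuming the `LogicalTypesTest` class generated from `logicalType.avsc` above is on the classpath (the file path is illustrative):

```java
import java.io.IOException;
import java.time.Instant;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.LogicalTypesTest;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;

class LogicalTypeRoundTripSketch {
  // No .withDataModel(...) call anywhere: writer and reader now derive the
  // SpecificData model (with its timestamp-millis conversion) from MODEL$.
  static void roundTrip(Path path) throws IOException {
    try (ParquetWriter<LogicalTypesTest> writer = AvroParquetWriter
        .<LogicalTypesTest>builder(path)
        .withSchema(LogicalTypesTest.SCHEMA$)
        .build()) {
      writer.write(LogicalTypesTest.newBuilder().setTimestamp(Instant.now()).build());
    }
    try (ParquetReader<LogicalTypesTest> reader = AvroParquetReader
        .<LogicalTypesTest>builder(path)
        .build()) {
      LogicalTypesTest record = reader.read();  // timestamp materializes as java.time.Instant
    }
  }
}
```

Before this patch, the same read would fail (or fall back to raw longs) unless the caller wired the logical-type conversions in by hand.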
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index 46e2f2c234..49ed27b1b3 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -30,14 +30,19 @@
 import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
+import org.apache.parquet.avro.LogicalTypesTest;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -237,6 +242,43 @@ public void testAvroReadSchema() throws IOException {
     }
   }
 
+  @Test
+  public void testParsesSpecificDataModel() throws IOException {
+    // SpecificRecord contains a logical type and will fail to decode unless its SpecificData model is parsed
+    List<LogicalTypesTest> records = IntStream
+        .range(0, 25)
+        .mapToObj(i -> LogicalTypesTest.newBuilder().setTimestamp(Instant.now()).build())
+        .collect(Collectors.toList());
+
+    // Test that SpecificData model is parsed in AvroParquetWriter
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path path = new Path(tmp.getPath());
+
+    try (
+        ParquetWriter<LogicalTypesTest> writer = AvroParquetWriter.<LogicalTypesTest>builder(path)
+            .withSchema(LogicalTypesTest.SCHEMA$)
+            .withConf(new Configuration(false))
+            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+            .build()
+    ) {
+      for (LogicalTypesTest record : records) {
+        writer.write(record);
+      }
+    }
+
+    // Test that SpecificData model is parsed in AvroParquetReader
+    final List<LogicalTypesTest> output = new ArrayList<>();
+    try (ParquetReader<LogicalTypesTest> reader = new AvroParquetReader<>(testConf, path)) {
+      for (LogicalTypesTest record = reader.read(); record != null; record = reader.read()) {
+        output.add(record);
+      }
+    }
+
+    assertEquals(records, output);
+  }
+
   private Path writeCarsToParquetFile( int num, CompressionCodecName compression, boolean enableDictionary) throws IOException {
     return writeCarsToParquetFile(num, compression, enableDictionary, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
   }
diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml
index 673ff86c6a..1eebceb968 100644
--- a/parquet-benchmarks/pom.xml
+++ b/parquet-benchmarks/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
    <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml
index f819793482..ab4001961c 100644
--- a/parquet-cli/pom.xml
+++ b/parquet-cli/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index 664a6be141..849b5a8076 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml
index 1a0f2f9f5e..3d3042c718 100644
--- a/parquet-common/pom.xml
+++ b/parquet-common/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-encoding/pom.xml b/parquet-encoding/pom.xml
index 2b27c19eaa..4482af8068 100644
--- a/parquet-encoding/pom.xml
+++ b/parquet-encoding/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml
index ce72ed5353..5004eb2b45 100644
--- a/parquet-format-structures/pom.xml
+++ b/parquet-format-structures/pom.xml
@@ -24,7 +24,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <artifactId>parquet-format-structures</artifactId>
diff --git a/parquet-generator/pom.xml b/parquet-generator/pom.xml
index a39370da19..9f713fc082 100644
--- a/parquet-generator/pom.xml
+++ b/parquet-generator/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-hadoop-bundle/pom.xml b/parquet-hadoop-bundle/pom.xml
index d15792f241..f00f3300af 100644
--- a/parquet-hadoop-bundle/pom.xml
+++ b/parquet-hadoop-bundle/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index ce476a15f2..e496befb71 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -21,7 +21,7 @@
     <groupId>org.apache.parquet</groupId>
     <artifactId>parquet</artifactId>
     <relativePath>../pom.xml</relativePath>
-    <version>1.13.0-SNAPSHOT</version>
+    <version>1.13.2-SNAPSHOT</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index a69ba46be6..f206282755 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -171,7 +171,7 @@ public static class Builder {
     protected FilterCompat.Filter recordFilter = null;
     protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
     // the page size parameter isn't used when only using the codec factory to get decompressors
-    protected CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
+    protected CompressionCodecFactory codecFactory = null;
     protected ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT;
     protected Map<String, String> properties = new HashMap<>();
@@ -314,6 +314,10 @@ public Builder copy(ParquetReadOptions options) {
     }
 
     public ParquetReadOptions build() {
+      if (codecFactory == null) {
+        codecFactory = HadoopCodecs.newFactory(0);
+      }
+
       return new ParquetReadOptions(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
         useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter,
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 7fa71cb618..b50149cdb5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -53,6 +53,8 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.zip.CRC32;
 
 import org.apache.hadoop.conf.Configuration;
@@ -99,7 +101,6 @@
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.internal.column.columnindex.ColumnIndex;
 import org.apache.parquet.internal.column.columnindex.OffsetIndex;
@@ -374,17 +375,25 @@ public static List
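The ParquetReadOptions change above moves creation of the default Hadoop codec factory from eager field initialization into build(), so the Hadoop-backed default is only constructed when the caller did not supply a factory of their own. A hedged sketch of the calling pattern, assuming the pre-existing builder() and withCodecFactory methods (customFactory stands in for any CompressionCodecFactory implementation):

```java
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.compression.CompressionCodecFactory;

class ReadOptionsSketch {
  static ParquetReadOptions withCustomCodecs(CompressionCodecFactory customFactory) {
    // After this patch, HadoopCodecs.newFactory(0) runs only inside build(),
    // and only when codecFactory is still null; supplying a factory here
    // means the Hadoop default is never constructed at all.
    return ParquetReadOptions.builder()
        .withCodecFactory(customFactory)
        .build();
  }
}
```

Callers that never touch withCodecFactory still get exactly the old default, so the change is behavior-preserving for existing code while avoiding a throwaway factory allocation for everyone else.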