Skip to content

Commit 8060ae0

Browse files
authored
Support partitionBy in VortexSparkDataSource (#7218)
Support partitionBy in spark reader/writer --------- Signed-off-by: Robert Kruszewski <github@robertk.io>
1 parent 3ea259e commit 8060ae0

17 files changed

Lines changed: 978 additions & 143 deletions

java/testfiles/Cargo.lock

Lines changed: 18 additions & 18 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

java/vortex-jni/src/test/java/dev/vortex/api/DTypeTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,8 @@ public void testNestedFixedSizeList() {
6060
public void testFixedSizeListInStruct() {
6161
var elementType = DType.newFloat(false);
6262
var fslType = DType.newFixedSizeList(elementType, 3, false);
63-
var structType = DType.newStruct(
64-
new String[] {"id", "embedding"},
65-
new DType[] {DType.newInt(false), fslType},
66-
false);
63+
var structType =
64+
DType.newStruct(new String[] {"id", "embedding"}, new DType[] {DType.newInt(false), fslType}, false);
6765
assertEquals(DType.Variant.STRUCT, structType.getVariant());
6866

6967
var fieldTypes = structType.getFieldTypes();

java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package dev.vortex.jni;
55

6+
import static java.nio.charset.StandardCharsets.UTF_8;
67
import static org.junit.jupiter.api.Assertions.assertEquals;
78
import static org.junit.jupiter.api.Assertions.assertNotNull;
89
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -11,8 +12,6 @@
1112
import dev.vortex.api.ScanOptions;
1213
import dev.vortex.api.VortexWriter;
1314
import dev.vortex.arrow.ArrowAllocation;
14-
import static java.nio.charset.StandardCharsets.UTF_8;
15-
1615
import java.io.IOException;
1716
import java.nio.file.Files;
1817
import java.nio.file.Path;
@@ -81,9 +80,7 @@ public void testWriteBatchFfi() throws IOException {
8180
String writePath = outputPath.toAbsolutePath().toUri().toString();
8281

8382
var writeSchema = DType.newStruct(
84-
new String[] {"name", "age"},
85-
new DType[] {DType.newUtf8(false), DType.newInt(false)},
86-
false);
83+
new String[] {"name", "age"}, new DType[] {DType.newUtf8(false), DType.newInt(false)}, false);
8784

8885
BufferAllocator allocator = ArrowAllocation.rootAllocator();
8986

java/vortex-spark/src/main/java/dev/vortex/spark/VortexDataSourceV2.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,20 @@
1111
import dev.vortex.api.Files;
1212
import dev.vortex.jni.NativeFileMethods;
1313
import dev.vortex.spark.config.HadoopUtils;
14+
import dev.vortex.spark.read.PartitionPathUtils;
1415
import java.util.Map;
1516
import java.util.Objects;
1617
import java.util.Optional;
18+
import java.util.Set;
19+
import java.util.stream.Collectors;
20+
import java.util.stream.Stream;
1721
import org.apache.spark.sql.SparkSession;
1822
import org.apache.spark.sql.connector.catalog.CatalogV2Util;
1923
import org.apache.spark.sql.connector.catalog.Table;
2024
import org.apache.spark.sql.connector.catalog.TableProvider;
2125
import org.apache.spark.sql.connector.expressions.Transform;
2226
import org.apache.spark.sql.sources.DataSourceRegister;
27+
import org.apache.spark.sql.types.DataType;
2328
import org.apache.spark.sql.types.StructType;
2429
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
2530
import scala.Option;
@@ -81,18 +86,31 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
8186
.findFirst();
8287

8388
if (firstFile.isEmpty()) {
84-
// Return empty struct if no files found
85-
// TODO(aduffy): how does Parquet handle this?
8689
return new StructType();
8790
} else {
8891
pathToInfer = firstFile.get();
8992
}
9093
}
9194

95+
StructType dataSchema;
9296
try (File file = Files.open(pathToInfer, formatOptions)) {
9397
var columns = SparkTypes.toColumns(file.getDType());
94-
return CatalogV2Util.v2ColumnsToStructType(columns);
98+
dataSchema = CatalogV2Util.v2ColumnsToStructType(columns);
9599
}
100+
101+
// Discover partition columns from Hive-style directory paths and append them.
102+
Map<String, String> partitionValues = PartitionPathUtils.parsePartitionValues(pathToInfer);
103+
if (!partitionValues.isEmpty()) {
104+
Set<String> dataColumnNames = Stream.of(dataSchema.fieldNames()).collect(Collectors.toSet());
105+
for (Map.Entry<String, String> entry : partitionValues.entrySet()) {
106+
if (!dataColumnNames.contains(entry.getKey())) {
107+
DataType type = PartitionPathUtils.inferPartitionColumnType(entry.getValue());
108+
dataSchema = dataSchema.add(entry.getKey(), type, true);
109+
}
110+
}
111+
}
112+
113+
return dataSchema;
96114
}
97115

98116
/**
@@ -102,16 +120,16 @@ public StructType inferSchema(CaseInsensitiveStringMap options) {
102120
* Vortex files. The partitioning parameter is currently ignored.
103121
*
104122
* @param schema the table schema
105-
* @param _partitioning table partitioning transforms (currently ignored)
123+
* @param partitioning table partitioning transforms
106124
* @param properties the table properties containing file paths and other options
107125
* @return a VortexTable instance for reading and writing data
108126
* @throws RuntimeException if required path properties are missing
109127
*/
110128
@Override
111-
public Table getTable(StructType schema, Transform[] _partitioning, Map<String, String> properties) {
129+
public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
112130
var uncased = new CaseInsensitiveStringMap(properties);
113131
ImmutableList<String> paths = getPaths(uncased);
114-
return new VortexTable(paths, schema, buildDataSourceOptions(properties));
132+
return new VortexTable(paths, schema, buildDataSourceOptions(properties), partitioning);
115133
}
116134

117135
/**

java/vortex-spark/src/main/java/dev/vortex/spark/VortexFilePartition.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,25 @@ public final class VortexFilePartition implements InputPartition, Serializable {
2121
private final String path;
2222
private final ImmutableList<Column> columns;
2323
private final ImmutableMap<String, String> formatOptions;
24+
private final ImmutableMap<String, String> partitionValues;
2425

2526
/**
2627
* Creates a new Vortex file partition.
2728
*
2829
* @param path the file system path to the Vortex file
2930
* @param columns the list of columns to read from the file
31+
* @param formatOptions options for accessing the file (S3/Azure credentials, etc.)
32+
* @param partitionValues Hive-style partition column values extracted from the file path
3033
*/
31-
public VortexFilePartition(String path, ImmutableList<Column> columns, ImmutableMap<String, String> formatOptions) {
34+
public VortexFilePartition(
35+
String path,
36+
ImmutableList<Column> columns,
37+
ImmutableMap<String, String> formatOptions,
38+
ImmutableMap<String, String> partitionValues) {
3239
this.path = path;
3340
this.columns = columns;
3441
this.formatOptions = formatOptions;
42+
this.partitionValues = partitionValues;
3543
}
3644

3745
/**
@@ -55,4 +63,14 @@ public ImmutableList<Column> getColumns() {
5563
public Map<String, String> getFormatOptions() {
5664
return formatOptions;
5765
}
66+
67+
/**
68+
* Returns the partition column values parsed from this file's Hive-style directory path.
69+
* Keys are column names, values are the string-encoded partition values.
70+
*
71+
* @return the partition values, empty if the file is not in a partitioned directory
72+
*/
73+
public ImmutableMap<String, String> getPartitionValues() {
74+
return partitionValues;
75+
}
5876
}

java/vortex-spark/src/main/java/dev/vortex/spark/VortexTable.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.Map;
1212
import java.util.Set;
1313
import org.apache.spark.sql.connector.catalog.*;
14+
import org.apache.spark.sql.connector.expressions.Transform;
1415
import org.apache.spark.sql.connector.read.ScanBuilder;
1516
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
1617
import org.apache.spark.sql.connector.write.WriteBuilder;
@@ -26,14 +27,20 @@ public final class VortexTable implements Table, SupportsRead, SupportsWrite {
2627
private final ImmutableList<String> paths;
2728
private final StructType schema;
2829
private final Map<String, String> formatOptions;
30+
private final Transform[] partitionTransforms;
2931

3032
/**
3133
* Creates a new VortexTable with read/write support.
3234
*/
33-
public VortexTable(ImmutableList<String> paths, StructType schema, Map<String, String> formatOptions) {
35+
public VortexTable(
36+
ImmutableList<String> paths,
37+
StructType schema,
38+
Map<String, String> formatOptions,
39+
Transform[] partitionTransforms) {
3440
this.paths = paths;
3541
this.schema = schema;
3642
this.formatOptions = formatOptions;
43+
this.partitionTransforms = partitionTransforms;
3744
}
3845

3946
/**
@@ -93,7 +100,17 @@ public StructType schema() {
93100
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
94101
// Make sure only one write path was provided.
95102
String writePath = Iterables.getOnlyElement(paths);
96-
return new VortexWriteBuilder(writePath, info, formatOptions);
103+
return new VortexWriteBuilder(writePath, info, formatOptions, partitionTransforms);
104+
}
105+
106+
/**
107+
* Returns the partitioning transforms for this table.
108+
*
109+
* @return an array of partition transforms
110+
*/
111+
@Override
112+
public Transform[] partitioning() {
113+
return partitionTransforms;
97114
}
98115

99116
/**

0 commit comments

Comments (0)