Skip to content

Commit fe54208

Browse files
authored
feat(dataframe): add limit, distinct, dropColumns, withColumnRenamed (#30)
1 parent 9dacdc8 commit fe54208

3 files changed

Lines changed: 304 additions & 0 deletions

File tree

native/src/lib.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,87 @@ pub extern "system" fn Java_org_apache_datafusion_DataFrame_filterRows<'local>(
247247
})
248248
}
249249

250+
#[no_mangle]
251+
pub extern "system" fn Java_org_apache_datafusion_DataFrame_limitRows<'local>(
252+
mut env: JNIEnv<'local>,
253+
_class: JClass<'local>,
254+
handle: jlong,
255+
skip: jint,
256+
fetch: jint,
257+
) -> jlong {
258+
try_unwrap_or_throw(&mut env, 0, |_env| -> JniResult<jlong> {
259+
if handle == 0 {
260+
return Err("DataFrame handle is null".into());
261+
}
262+
let df = unsafe { &*(handle as *const DataFrame) }.clone();
263+
let new_df = df.limit(skip as usize, Some(fetch as usize))?;
264+
Ok(Box::into_raw(Box::new(new_df)) as jlong)
265+
})
266+
}
267+
268+
#[no_mangle]
269+
pub extern "system" fn Java_org_apache_datafusion_DataFrame_distinctRows<'local>(
270+
mut env: JNIEnv<'local>,
271+
_class: JClass<'local>,
272+
handle: jlong,
273+
) -> jlong {
274+
try_unwrap_or_throw(&mut env, 0, |_env| -> JniResult<jlong> {
275+
if handle == 0 {
276+
return Err("DataFrame handle is null".into());
277+
}
278+
let df = unsafe { &*(handle as *const DataFrame) }.clone();
279+
let new_df = df.distinct()?;
280+
Ok(Box::into_raw(Box::new(new_df)) as jlong)
281+
})
282+
}
283+
284+
#[no_mangle]
285+
pub extern "system" fn Java_org_apache_datafusion_DataFrame_dropColumns<'local>(
286+
mut env: JNIEnv<'local>,
287+
_class: JClass<'local>,
288+
handle: jlong,
289+
column_names: JObjectArray<'local>,
290+
) -> jlong {
291+
try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
292+
if handle == 0 {
293+
return Err("DataFrame handle is null".into());
294+
}
295+
let df = unsafe { &*(handle as *const DataFrame) }.clone();
296+
297+
let len = env.get_array_length(&column_names)?;
298+
let mut owned: Vec<String> = Vec::with_capacity(len as usize);
299+
for i in 0..len {
300+
let elem = env.get_object_array_element(&column_names, i)?;
301+
let jstr: JString = elem.into();
302+
owned.push(env.get_string(&jstr)?.into());
303+
}
304+
let refs: Vec<&str> = owned.iter().map(String::as_str).collect();
305+
306+
let new_df = df.drop_columns(&refs)?;
307+
Ok(Box::into_raw(Box::new(new_df)) as jlong)
308+
})
309+
}
310+
311+
#[no_mangle]
312+
pub extern "system" fn Java_org_apache_datafusion_DataFrame_renameColumn<'local>(
313+
mut env: JNIEnv<'local>,
314+
_class: JClass<'local>,
315+
handle: jlong,
316+
old_name: JString<'local>,
317+
new_name: JString<'local>,
318+
) -> jlong {
319+
try_unwrap_or_throw(&mut env, 0, |env| -> JniResult<jlong> {
320+
if handle == 0 {
321+
return Err("DataFrame handle is null".into());
322+
}
323+
let df = unsafe { &*(handle as *const DataFrame) }.clone();
324+
let old: String = env.get_string(&old_name)?.into();
325+
let new: String = env.get_string(&new_name)?.into();
326+
let new_df = df.with_column_renamed(&old, &new)?;
327+
Ok(Box::into_raw(Box::new(new_df)) as jlong)
328+
})
329+
}
330+
250331
#[no_mangle]
251332
pub extern "system" fn Java_org_apache_datafusion_DataFrame_writeParquetWithOptions<'local>(
252333
mut env: JNIEnv<'local>,

src/main/java/org/apache/datafusion/DataFrame.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,61 @@ public DataFrame filter(String predicate) {
116116
return new DataFrame(filterRows(nativeHandle, predicate));
117117
}
118118

119+
/**
120+
* Take the first {@code fetch} rows. Equivalent to {@link #limit(int, int)} with {@code skip =
121+
* 0}. The receiver remains usable and must still be closed independently.
122+
*/
123+
public DataFrame limit(int fetch) {
124+
return limit(0, fetch);
125+
}
126+
127+
/**
128+
* Skip {@code skip} rows, then take the next {@code fetch} rows. Both arguments must be
129+
* non-negative. The receiver remains usable and must still be closed independently.
130+
*/
131+
public DataFrame limit(int skip, int fetch) {
132+
if (skip < 0) {
133+
throw new IllegalArgumentException("skip must be non-negative, was " + skip);
134+
}
135+
if (fetch < 0) {
136+
throw new IllegalArgumentException("fetch must be non-negative, was " + fetch);
137+
}
138+
if (nativeHandle == 0) {
139+
throw new IllegalStateException("DataFrame is closed or already collected");
140+
}
141+
return new DataFrame(limitRows(nativeHandle, skip, fetch));
142+
}
143+
144+
/**
145+
* Deduplicate rows across all columns. The receiver remains usable and must still be closed
146+
* independently.
147+
*/
148+
public DataFrame distinct() {
149+
if (nativeHandle == 0) {
150+
throw new IllegalStateException("DataFrame is closed or already collected");
151+
}
152+
return new DataFrame(distinctRows(nativeHandle));
153+
}
154+
155+
/**
156+
* Drop the named columns. The inverse of {@link #select(String...)}. The receiver remains usable
157+
* and must still be closed independently.
158+
*/
159+
public DataFrame dropColumns(String... columnNames) {
160+
if (nativeHandle == 0) {
161+
throw new IllegalStateException("DataFrame is closed or already collected");
162+
}
163+
return new DataFrame(dropColumns(nativeHandle, columnNames));
164+
}
165+
166+
/** Rename a column. The receiver remains usable and must still be closed independently. */
167+
public DataFrame withColumnRenamed(String oldName, String newName) {
168+
if (nativeHandle == 0) {
169+
throw new IllegalStateException("DataFrame is closed or already collected");
170+
}
171+
return new DataFrame(renameColumn(nativeHandle, oldName, newName));
172+
}
173+
119174
/**
120175
* Materialize this DataFrame as Parquet at {@code path}. The path is treated as a directory
121176
* unless overridden via {@link ParquetWriteOptions#singleFileOutput(boolean)}. The receiver
@@ -168,6 +223,14 @@ public void close() {
168223

169224
private static native long filterRows(long handle, String predicate);
170225

226+
private static native long limitRows(long handle, int skip, int fetch);
227+
228+
private static native long distinctRows(long handle);
229+
230+
private static native long dropColumns(long handle, String[] columnNames);
231+
232+
private static native long renameColumn(long handle, String oldName, String newName);
233+
171234
private static native void writeParquetWithOptions(
172235
long handle,
173236
String path,

src/test/java/org/apache/datafusion/DataFrameTransformationsTest.java

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ void methodsThrowAfterClose() {
128128
df.close();
129129
assertThrows(IllegalStateException.class, () -> df.select("x"));
130130
assertThrows(IllegalStateException.class, () -> df.filter("x > 0"));
131+
assertThrows(IllegalStateException.class, () -> df.limit(1));
132+
assertThrows(IllegalStateException.class, () -> df.limit(0, 1));
133+
assertThrows(IllegalStateException.class, df::distinct);
134+
assertThrows(IllegalStateException.class, () -> df.dropColumns("x"));
135+
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
131136
assertThrows(IllegalStateException.class, df::count);
132137
assertThrows(IllegalStateException.class, df::show);
133138
assertThrows(IllegalStateException.class, () -> df.show(5));
@@ -144,6 +149,11 @@ void methodsThrowAfterCollect() throws Exception {
144149
}
145150
assertThrows(IllegalStateException.class, () -> df.select("x"));
146151
assertThrows(IllegalStateException.class, () -> df.filter("x > 0"));
152+
assertThrows(IllegalStateException.class, () -> df.limit(1));
153+
assertThrows(IllegalStateException.class, () -> df.limit(0, 1));
154+
assertThrows(IllegalStateException.class, df::distinct);
155+
assertThrows(IllegalStateException.class, () -> df.dropColumns("x"));
156+
assertThrows(IllegalStateException.class, () -> df.withColumnRenamed("x", "y"));
147157
assertThrows(IllegalStateException.class, df::count);
148158
assertThrows(IllegalStateException.class, df::show);
149159
assertThrows(IllegalStateException.class, () -> df.show(5));
@@ -193,4 +203,154 @@ void lineitemFilterCountAgainstSqlBaseline() throws Exception {
193203
assertEquals(viaSql, viaDataFrame);
194204
}
195205
}
206+
207+
@Test
208+
void limitTakesFirstNRows() {
209+
try (SessionContext ctx = new SessionContext();
210+
DataFrame source = ctx.sql("SELECT * FROM (VALUES (1), (2), (3), (4), (5)) AS t(x)");
211+
DataFrame limited = source.limit(2)) {
212+
assertEquals(2L, limited.count());
213+
}
214+
}
215+
216+
@Test
217+
void limitWithSkipDropsLeadingRows() {
218+
try (SessionContext ctx = new SessionContext();
219+
DataFrame source = ctx.sql("SELECT * FROM (VALUES (1), (2), (3), (4), (5)) AS t(x)");
220+
DataFrame limited = source.limit(2, 2)) {
221+
assertEquals(2L, limited.count());
222+
}
223+
}
224+
225+
@Test
226+
void limitIsNonDestructive() {
227+
try (SessionContext ctx = new SessionContext();
228+
DataFrame source = ctx.sql("SELECT * FROM (VALUES (1), (2), (3)) AS t(x)")) {
229+
try (DataFrame limited = source.limit(1)) {
230+
assertEquals(1L, limited.count());
231+
}
232+
assertEquals(3L, source.count());
233+
}
234+
}
235+
236+
@Test
237+
void limitRejectsNegativeArgs() {
238+
try (SessionContext ctx = new SessionContext();
239+
DataFrame df = ctx.sql("SELECT 1 AS x")) {
240+
assertThrows(IllegalArgumentException.class, () -> df.limit(-1));
241+
assertThrows(IllegalArgumentException.class, () -> df.limit(-1, 0));
242+
assertThrows(IllegalArgumentException.class, () -> df.limit(0, -1));
243+
}
244+
}
245+
246+
@Test
247+
void distinctRemovesDuplicates() {
248+
try (SessionContext ctx = new SessionContext();
249+
DataFrame source =
250+
ctx.sql("SELECT * FROM (VALUES (1), (1), (2), (2), (3)) AS t(x)");
251+
DataFrame deduped = source.distinct()) {
252+
assertEquals(3L, deduped.count());
253+
}
254+
}
255+
256+
@Test
257+
void distinctIsNonDestructive() {
258+
try (SessionContext ctx = new SessionContext();
259+
DataFrame source = ctx.sql("SELECT * FROM (VALUES (1), (1), (2)) AS t(x)")) {
260+
try (DataFrame deduped = source.distinct()) {
261+
assertEquals(2L, deduped.count());
262+
}
263+
assertEquals(3L, source.count());
264+
}
265+
}
266+
267+
@Test
268+
void dropColumnsRemovesNamedColumns() throws Exception {
269+
try (BufferAllocator allocator = new RootAllocator();
270+
SessionContext ctx = new SessionContext();
271+
DataFrame source = ctx.sql("SELECT 1 AS a, 2 AS b, 3 AS c");
272+
DataFrame dropped = source.dropColumns("b");
273+
ArrowReader reader = dropped.collect(allocator)) {
274+
assertTrue(reader.loadNextBatch());
275+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
276+
assertArrayEquals(
277+
new String[] {"a", "c"},
278+
root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new));
279+
}
280+
}
281+
282+
@Test
283+
void dropColumnsIsNonDestructive() {
284+
try (SessionContext ctx = new SessionContext();
285+
DataFrame source = ctx.sql("SELECT 1 AS a, 2 AS b")) {
286+
try (DataFrame dropped = source.dropColumns("a")) {
287+
assertEquals(1L, dropped.count());
288+
}
289+
assertEquals(1L, source.count());
290+
}
291+
}
292+
293+
@Test
294+
void dropColumnsSilentlyIgnoresUnknownNames() throws Exception {
295+
try (BufferAllocator allocator = new RootAllocator();
296+
SessionContext ctx = new SessionContext();
297+
DataFrame df = ctx.sql("SELECT 1 AS x");
298+
DataFrame dropped = df.dropColumns("not_a_column");
299+
ArrowReader reader = dropped.collect(allocator)) {
300+
assertTrue(reader.loadNextBatch());
301+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
302+
assertArrayEquals(
303+
new String[] {"x"},
304+
root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new));
305+
}
306+
}
307+
308+
@Test
309+
void withColumnRenamedChangesColumnName() throws Exception {
310+
try (BufferAllocator allocator = new RootAllocator();
311+
SessionContext ctx = new SessionContext();
312+
DataFrame source = ctx.sql("SELECT 1 AS a, 2 AS b");
313+
DataFrame renamed = source.withColumnRenamed("a", "alpha");
314+
ArrowReader reader = renamed.collect(allocator)) {
315+
assertTrue(reader.loadNextBatch());
316+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
317+
assertArrayEquals(
318+
new String[] {"alpha", "b"},
319+
root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new));
320+
}
321+
}
322+
323+
@Test
324+
void withColumnRenamedIsNonDestructive() throws Exception {
325+
try (BufferAllocator allocator = new RootAllocator();
326+
SessionContext ctx = new SessionContext();
327+
DataFrame source = ctx.sql("SELECT 1 AS a, 2 AS b")) {
328+
try (DataFrame renamed = source.withColumnRenamed("a", "alpha")) {
329+
assertEquals(1L, renamed.count());
330+
}
331+
try (DataFrame again = source.select("a");
332+
ArrowReader reader = again.collect(allocator)) {
333+
assertTrue(reader.loadNextBatch());
334+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
335+
assertArrayEquals(
336+
new String[] {"a"},
337+
root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new));
338+
}
339+
}
340+
}
341+
342+
@Test
343+
void withColumnRenamedUnknownColumnIsNoOp() throws Exception {
344+
try (BufferAllocator allocator = new RootAllocator();
345+
SessionContext ctx = new SessionContext();
346+
DataFrame df = ctx.sql("SELECT 1 AS x");
347+
DataFrame renamed = df.withColumnRenamed("not_a_column", "y");
348+
ArrowReader reader = renamed.collect(allocator)) {
349+
assertTrue(reader.loadNextBatch());
350+
VectorSchemaRoot root = reader.getVectorSchemaRoot();
351+
assertArrayEquals(
352+
new String[] {"x"},
353+
root.getSchema().getFields().stream().map(f -> f.getName()).toArray(String[]::new));
354+
}
355+
}
196356
}

0 commit comments

Comments
 (0)