@@ -128,6 +128,11 @@ void methodsThrowAfterClose() {
128128 df .close ();
129129 assertThrows (IllegalStateException .class , () -> df .select ("x" ));
130130 assertThrows (IllegalStateException .class , () -> df .filter ("x > 0" ));
131+ assertThrows (IllegalStateException .class , () -> df .limit (1 ));
132+ assertThrows (IllegalStateException .class , () -> df .limit (0 , 1 ));
133+ assertThrows (IllegalStateException .class , df ::distinct );
134+ assertThrows (IllegalStateException .class , () -> df .dropColumns ("x" ));
135+ assertThrows (IllegalStateException .class , () -> df .withColumnRenamed ("x" , "y" ));
131136 assertThrows (IllegalStateException .class , df ::count );
132137 assertThrows (IllegalStateException .class , df ::show );
133138 assertThrows (IllegalStateException .class , () -> df .show (5 ));
@@ -144,6 +149,11 @@ void methodsThrowAfterCollect() throws Exception {
144149 }
145150 assertThrows (IllegalStateException .class , () -> df .select ("x" ));
146151 assertThrows (IllegalStateException .class , () -> df .filter ("x > 0" ));
152+ assertThrows (IllegalStateException .class , () -> df .limit (1 ));
153+ assertThrows (IllegalStateException .class , () -> df .limit (0 , 1 ));
154+ assertThrows (IllegalStateException .class , df ::distinct );
155+ assertThrows (IllegalStateException .class , () -> df .dropColumns ("x" ));
156+ assertThrows (IllegalStateException .class , () -> df .withColumnRenamed ("x" , "y" ));
147157 assertThrows (IllegalStateException .class , df ::count );
148158 assertThrows (IllegalStateException .class , df ::show );
149159 assertThrows (IllegalStateException .class , () -> df .show (5 ));
@@ -193,4 +203,154 @@ void lineitemFilterCountAgainstSqlBaseline() throws Exception {
193203 assertEquals (viaSql , viaDataFrame );
194204 }
195205 }
206+
207+ @ Test
208+ void limitTakesFirstNRows () {
209+ try (SessionContext ctx = new SessionContext ();
210+ DataFrame source = ctx .sql ("SELECT * FROM (VALUES (1), (2), (3), (4), (5)) AS t(x)" );
211+ DataFrame limited = source .limit (2 )) {
212+ assertEquals (2L , limited .count ());
213+ }
214+ }
215+
216+ @ Test
217+ void limitWithSkipDropsLeadingRows () {
218+ try (SessionContext ctx = new SessionContext ();
219+ DataFrame source = ctx .sql ("SELECT * FROM (VALUES (1), (2), (3), (4), (5)) AS t(x)" );
220+ DataFrame limited = source .limit (2 , 2 )) {
221+ assertEquals (2L , limited .count ());
222+ }
223+ }
224+
225+ @ Test
226+ void limitIsNonDestructive () {
227+ try (SessionContext ctx = new SessionContext ();
228+ DataFrame source = ctx .sql ("SELECT * FROM (VALUES (1), (2), (3)) AS t(x)" )) {
229+ try (DataFrame limited = source .limit (1 )) {
230+ assertEquals (1L , limited .count ());
231+ }
232+ assertEquals (3L , source .count ());
233+ }
234+ }
235+
236+ @ Test
237+ void limitRejectsNegativeArgs () {
238+ try (SessionContext ctx = new SessionContext ();
239+ DataFrame df = ctx .sql ("SELECT 1 AS x" )) {
240+ assertThrows (IllegalArgumentException .class , () -> df .limit (-1 ));
241+ assertThrows (IllegalArgumentException .class , () -> df .limit (-1 , 0 ));
242+ assertThrows (IllegalArgumentException .class , () -> df .limit (0 , -1 ));
243+ }
244+ }
245+
246+ @ Test
247+ void distinctRemovesDuplicates () {
248+ try (SessionContext ctx = new SessionContext ();
249+ DataFrame source =
250+ ctx .sql ("SELECT * FROM (VALUES (1), (1), (2), (2), (3)) AS t(x)" );
251+ DataFrame deduped = source .distinct ()) {
252+ assertEquals (3L , deduped .count ());
253+ }
254+ }
255+
256+ @ Test
257+ void distinctIsNonDestructive () {
258+ try (SessionContext ctx = new SessionContext ();
259+ DataFrame source = ctx .sql ("SELECT * FROM (VALUES (1), (1), (2)) AS t(x)" )) {
260+ try (DataFrame deduped = source .distinct ()) {
261+ assertEquals (2L , deduped .count ());
262+ }
263+ assertEquals (3L , source .count ());
264+ }
265+ }
266+
267+ @ Test
268+ void dropColumnsRemovesNamedColumns () throws Exception {
269+ try (BufferAllocator allocator = new RootAllocator ();
270+ SessionContext ctx = new SessionContext ();
271+ DataFrame source = ctx .sql ("SELECT 1 AS a, 2 AS b, 3 AS c" );
272+ DataFrame dropped = source .dropColumns ("b" );
273+ ArrowReader reader = dropped .collect (allocator )) {
274+ assertTrue (reader .loadNextBatch ());
275+ VectorSchemaRoot root = reader .getVectorSchemaRoot ();
276+ assertArrayEquals (
277+ new String [] {"a" , "c" },
278+ root .getSchema ().getFields ().stream ().map (f -> f .getName ()).toArray (String []::new ));
279+ }
280+ }
281+
282+ @ Test
283+ void dropColumnsIsNonDestructive () {
284+ try (SessionContext ctx = new SessionContext ();
285+ DataFrame source = ctx .sql ("SELECT 1 AS a, 2 AS b" )) {
286+ try (DataFrame dropped = source .dropColumns ("a" )) {
287+ assertEquals (1L , dropped .count ());
288+ }
289+ assertEquals (1L , source .count ());
290+ }
291+ }
292+
293+ @ Test
294+ void dropColumnsSilentlyIgnoresUnknownNames () throws Exception {
295+ try (BufferAllocator allocator = new RootAllocator ();
296+ SessionContext ctx = new SessionContext ();
297+ DataFrame df = ctx .sql ("SELECT 1 AS x" );
298+ DataFrame dropped = df .dropColumns ("not_a_column" );
299+ ArrowReader reader = dropped .collect (allocator )) {
300+ assertTrue (reader .loadNextBatch ());
301+ VectorSchemaRoot root = reader .getVectorSchemaRoot ();
302+ assertArrayEquals (
303+ new String [] {"x" },
304+ root .getSchema ().getFields ().stream ().map (f -> f .getName ()).toArray (String []::new ));
305+ }
306+ }
307+
308+ @ Test
309+ void withColumnRenamedChangesColumnName () throws Exception {
310+ try (BufferAllocator allocator = new RootAllocator ();
311+ SessionContext ctx = new SessionContext ();
312+ DataFrame source = ctx .sql ("SELECT 1 AS a, 2 AS b" );
313+ DataFrame renamed = source .withColumnRenamed ("a" , "alpha" );
314+ ArrowReader reader = renamed .collect (allocator )) {
315+ assertTrue (reader .loadNextBatch ());
316+ VectorSchemaRoot root = reader .getVectorSchemaRoot ();
317+ assertArrayEquals (
318+ new String [] {"alpha" , "b" },
319+ root .getSchema ().getFields ().stream ().map (f -> f .getName ()).toArray (String []::new ));
320+ }
321+ }
322+
323+ @ Test
324+ void withColumnRenamedIsNonDestructive () throws Exception {
325+ try (BufferAllocator allocator = new RootAllocator ();
326+ SessionContext ctx = new SessionContext ();
327+ DataFrame source = ctx .sql ("SELECT 1 AS a, 2 AS b" )) {
328+ try (DataFrame renamed = source .withColumnRenamed ("a" , "alpha" )) {
329+ assertEquals (1L , renamed .count ());
330+ }
331+ try (DataFrame again = source .select ("a" );
332+ ArrowReader reader = again .collect (allocator )) {
333+ assertTrue (reader .loadNextBatch ());
334+ VectorSchemaRoot root = reader .getVectorSchemaRoot ();
335+ assertArrayEquals (
336+ new String [] {"a" },
337+ root .getSchema ().getFields ().stream ().map (f -> f .getName ()).toArray (String []::new ));
338+ }
339+ }
340+ }
341+
342+ @ Test
343+ void withColumnRenamedUnknownColumnIsNoOp () throws Exception {
344+ try (BufferAllocator allocator = new RootAllocator ();
345+ SessionContext ctx = new SessionContext ();
346+ DataFrame df = ctx .sql ("SELECT 1 AS x" );
347+ DataFrame renamed = df .withColumnRenamed ("not_a_column" , "y" );
348+ ArrowReader reader = renamed .collect (allocator )) {
349+ assertTrue (reader .loadNextBatch ());
350+ VectorSchemaRoot root = reader .getVectorSchemaRoot ();
351+ assertArrayEquals (
352+ new String [] {"x" },
353+ root .getSchema ().getFields ().stream ().map (f -> f .getName ()).toArray (String []::new ));
354+ }
355+ }
196356}
0 commit comments