Skip to content

Commit e05f217

Browse files
authored
Improve README and add examples (#137)
1 parent 575367c commit e05f217

10 files changed

Lines changed: 355 additions & 102 deletions

README.md

Lines changed: 40 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -24,139 +24,79 @@
2424

2525
This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).
2626

27-
Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python.
27+
Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
28+
files, run it in a multi-threaded environment, and obtain the result back in Python.
2829

2930
It also allows you to use UDFs and UDAFs for complex operations.
3031

31-
The major advantage of this library over other execution engines is that this library achieves zero-copy between Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart from having to lock the GIL when running those operations.
32+
The major advantage of this library over other execution engines is that this library achieves zero-copy between
33+
Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
34+
from having to lock the GIL when running those operations.
3235

33-
Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks.
36+
Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
37+
about thread safety and lack of memory leaks.
3438

3539
Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).
3640

37-
## How to use it
41+
## Example Usage
3842

39-
Simple usage:
43+
The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results
44+
in a Pandas DataFrame, and then plotting a chart.
4045

41-
```python
42-
import datafusion
43-
from datafusion import col
44-
import pyarrow
45-
46-
# create a context
47-
ctx = datafusion.SessionContext()
48-
49-
# create a RecordBatch and a new DataFrame from it
50-
batch = pyarrow.RecordBatch.from_arrays(
51-
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
52-
names=["a", "b"],
53-
)
54-
df = ctx.create_dataframe([[batch]])
55-
56-
# create a new statement
57-
df = df.select(
58-
col("a") + col("b"),
59-
col("a") - col("b"),
60-
)
61-
62-
# execute and collect the first (and only) batch
63-
result = df.collect()[0]
64-
65-
assert result.column(0) == pyarrow.array([5, 7, 9])
66-
assert result.column(1) == pyarrow.array([-3, -3, -3])
67-
```
68-
69-
### UDFs
70-
71-
```python
72-
import pyarrow
73-
from datafusion import udf
74-
75-
def is_null(array: pyarrow.Array) -> pyarrow.Array:
76-
return array.is_null()
77-
78-
is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')
79-
80-
# create a context
81-
ctx = datafusion.SessionContext()
82-
83-
# create a RecordBatch and a new DataFrame from it
84-
batch = pyarrow.RecordBatch.from_arrays(
85-
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
86-
names=["a", "b"],
87-
)
88-
df = ctx.create_dataframe([[batch]])
89-
90-
df = df.select(is_null_arr(col("a")))
46+
The Parquet file used in this example can be downloaded from the following page:
9147

92-
result = df.collect()[0]
48+
- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
9349

94-
assert result.column(0) == pyarrow.array([False] * 3)
95-
```
96-
97-
### UDAF
50+
See the [examples](examples) directory for more examples.
9851

9952
```python
100-
import pyarrow
101-
import pyarrow.compute
102-
import datafusion
103-
from datafusion import udaf, Accumulator
104-
from datafusion import col
105-
106-
107-
class MyAccumulator(Accumulator):
108-
"""
109-
Interface of a user-defined accumulation.
110-
"""
111-
def __init__(self):
112-
self._sum = pyarrow.scalar(0.0)
113-
114-
def update(self, values: pyarrow.Array) -> None:
115-
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
116-
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())
53+
from datafusion import SessionContext
54+
import pandas as pd
55+
import pyarrow as pa
11756

118-
def merge(self, states: pyarrow.Array) -> None:
119-
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
120-
self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())
57+
# Create a DataFusion context
58+
ctx = SessionContext()
12159

122-
def state(self) -> pyarrow.Array:
123-
return pyarrow.array([self._sum.as_py()])
60+
# Register table with context
61+
ctx.register_parquet('taxi', 'yellow_tripdata_2021-01.parquet')
12462

125-
def evaluate(self) -> pyarrow.Scalar:
126-
return self._sum
63+
# Execute SQL
64+
df = ctx.sql("select passenger_count, count(*) "
65+
"from taxi "
66+
"where passenger_count is not null "
67+
"group by passenger_count "
68+
"order by passenger_count")
12769

128-
# create a context
129-
ctx = datafusion.SessionContext()
70+
# collect as list of pyarrow.RecordBatch
71+
results = df.collect()
13072

131-
# create a RecordBatch and a new DataFrame from it
132-
batch = pyarrow.RecordBatch.from_arrays(
133-
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
134-
names=["a", "b"],
135-
)
136-
df = ctx.create_dataframe([[batch]])
73+
# get first batch
74+
batch = results[0]
13775

138-
my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable')
76+
# convert to Pandas
77+
df = batch.to_pandas()
13978

140-
df = df.aggregate(
141-
[],
142-
[my_udaf(col("a"))]
143-
)
79+
# create a chart
80+
fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
81+
fig.savefig('chart.png')
82+
```
14483

145-
result = df.collect()[0]
84+
This produces the following chart:
14685

147-
assert result.column(0) == pyarrow.array([6.0])
148-
```
86+
![Chart](examples/chart.png)
14987

15088
## How to install (from pip)
15189

15290
### Pip
91+
15392
```bash
15493
pip install datafusion
15594
# or
15695
python -m pip install datafusion
15796
```
15897

15998
### Conda
99+
160100
```bash
161101
conda install -c conda-forge datafusion
162102
```
@@ -169,7 +109,6 @@ You can verify the installation by running:
169109
'0.6.0'
170110
```
171111

172-
173112
## How to develop
174113

175114
This assumes that you have rust and cargo installed. We use the workflow recommended by [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin).

examples/README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<!---
2+
Licensed to the Apache Software Foundation (ASF) under one
3+
or more contributor license agreements. See the NOTICE file
4+
distributed with this work for additional information
5+
regarding copyright ownership. The ASF licenses this file
6+
to you under the Apache License, Version 2.0 (the
7+
"License"); you may not use this file except in compliance
8+
with the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing,
13+
software distributed under the License is distributed on an
14+
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
KIND, either express or implied. See the License for the
16+
specific language governing permissions and limitations
17+
under the License.
18+
-->
19+
20+
# DataFusion Python Examples
21+
22+
- [Query a Parquet file using SQL](./sql-parquet.py)
23+
- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
24+
- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
25+
- [Query PyArrow Data](./query-pyarrow-data.py)
26+
- [Register a Python UDF with DataFusion](./python-udf.py)
27+
- [Register a Python UDAF with DataFusion](./python-udaf.py)

examples/chart.png

18.6 KB
Loading

examples/dataframe-parquet.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from datafusion import SessionContext
19+
from datafusion import functions as f
20+
21+
ctx = SessionContext()
22+
df = ctx.read_parquet(
23+
"/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
24+
).aggregate([f.col("passenger_count")], [f.count_star()])
25+
df.show()

examples/python-udaf.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import pyarrow
19+
import pyarrow.compute
20+
import datafusion
21+
from datafusion import udaf, Accumulator
22+
from datafusion import col
23+
24+
25+
class MyAccumulator(Accumulator):
26+
"""
27+
Interface of a user-defined accumulation.
28+
"""
29+
30+
def __init__(self):
31+
self._sum = pyarrow.scalar(0.0)
32+
33+
def update(self, values: pyarrow.Array) -> None:
34+
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
35+
self._sum = pyarrow.scalar(
36+
self._sum.as_py() + pyarrow.compute.sum(values).as_py()
37+
)
38+
39+
def merge(self, states: pyarrow.Array) -> None:
40+
# not nice since pyarrow scalars can't be summed yet. This breaks on `None`
41+
self._sum = pyarrow.scalar(
42+
self._sum.as_py() + pyarrow.compute.sum(states).as_py()
43+
)
44+
45+
def state(self) -> pyarrow.Array:
46+
return pyarrow.array([self._sum.as_py()])
47+
48+
def evaluate(self) -> pyarrow.Scalar:
49+
return self._sum
50+
51+
52+
# create a context
53+
ctx = datafusion.SessionContext()
54+
55+
# create a RecordBatch and a new DataFrame from it
56+
batch = pyarrow.RecordBatch.from_arrays(
57+
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
58+
names=["a", "b"],
59+
)
60+
df = ctx.create_dataframe([[batch]])
61+
62+
my_udaf = udaf(
63+
MyAccumulator,
64+
pyarrow.float64(),
65+
pyarrow.float64(),
66+
[pyarrow.float64()],
67+
"stable",
68+
)
69+
70+
df = df.aggregate([], [my_udaf(col("a"))])
71+
72+
result = df.collect()[0]
73+
74+
assert result.column(0) == pyarrow.array([6.0])

examples/python-udf.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import pyarrow
19+
from datafusion import udf, SessionContext, functions as f
20+
21+
22+
def is_null(array: pyarrow.Array) -> pyarrow.Array:
23+
return array.is_null()
24+
25+
26+
is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), "stable")
27+
28+
# create a context
29+
ctx = SessionContext()
30+
31+
# create a RecordBatch and a new DataFrame from it
32+
batch = pyarrow.RecordBatch.from_arrays(
33+
[pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
34+
names=["a", "b"],
35+
)
36+
df = ctx.create_dataframe([[batch]])
37+
38+
df = df.select(is_null_arr(f.col("a")))
39+
40+
result = df.collect()[0]
41+
42+
assert result.column(0) == pyarrow.array([False] * 3)

0 commit comments

Comments
 (0)