|
| 1 | +<!--- |
| 2 | + Licensed to the Apache Software Foundation (ASF) under one |
| 3 | + or more contributor license agreements. See the NOTICE file |
| 4 | + distributed with this work for additional information |
| 5 | + regarding copyright ownership. The ASF licenses this file |
| 6 | + to you under the Apache License, Version 2.0 (the |
| 7 | + "License"); you may not use this file except in compliance |
| 8 | + with the License. You may obtain a copy of the License at |
| 9 | +
|
| 10 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | +
|
| 12 | + Unless required by applicable law or agreed to in writing, |
| 13 | + software distributed under the License is distributed on an |
| 14 | + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | + KIND, either express or implied. See the License for the |
| 16 | + specific language governing permissions and limitations |
| 17 | + under the License. |
| 18 | +--> |
| 19 | + |
| 20 | +## DataFusion in Python |
| 21 | + |
| 22 | +This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow/tree/master/rust/datafusion). |
| 23 | + |
| 24 | +Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python. |
| 25 | + |
| 26 | +It also allows you to use UDFs and UDAFs for complex operations. |
| 27 | + |
| 28 | +The major advantage of this library over other execution engines is that this library achieves zero-copy between Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart from having to lock the GIL when running those operations. |
| 29 | + |
| 30 | +Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions about thread safety and lack of memory leaks. |
| 31 | + |
| 32 | +Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html). |
| 33 | + |
| 34 | +## How to use it |
| 35 | + |
| 36 | +Simple usage: |
| 37 | + |
| 38 | +```python |
| 39 | +import datafusion |
| 40 | +import pyarrow |
| 41 | + |
| 42 | +# an alias |
| 43 | +f = datafusion.functions |
| 44 | + |
| 45 | +# create a context |
| 46 | +ctx = datafusion.ExecutionContext() |
| 47 | + |
| 48 | +# create a RecordBatch and a new DataFrame from it |
| 49 | +batch = pyarrow.RecordBatch.from_arrays( |
| 50 | + [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])], |
| 51 | + names=["a", "b"], |
| 52 | +) |
| 53 | +df = ctx.create_dataframe([[batch]]) |
| 54 | + |
| 55 | +# create a new statement |
| 56 | +df = df.select( |
| 57 | + f.col("a") + f.col("b"), |
| 58 | + f.col("a") - f.col("b"), |
| 59 | +) |
| 60 | + |
| 61 | +# execute and collect the first (and only) batch |
| 62 | +result = df.collect()[0] |
| 63 | + |
| 64 | +assert result.column(0) == pyarrow.array([5, 7, 9]) |
| 65 | +assert result.column(1) == pyarrow.array([-3, -3, -3]) |
| 66 | +``` |
| 67 | + |
| 68 | +### UDFs |
| 69 | + |
| 70 | +```python |
| 71 | +def is_null(array: pyarrow.Array) -> pyarrow.Array: |
| 72 | + return array.is_null() |
| 73 | + |
| 74 | +udf = f.udf(is_null, [pyarrow.int64()], pyarrow.bool_()) |
| 75 | + |
| 76 | +df = df.select(udf(f.col("a"))) |
| 77 | +``` |
| 78 | + |
| 79 | +### UDAF |
| 80 | + |
| 81 | +```python |
| 82 | +import pyarrow |
| 83 | +import pyarrow.compute |
| 84 | + |
| 85 | + |
| 86 | +class Accumulator: |
| 87 | + """ |
| 88 | + Interface of a user-defined accumulation. |
| 89 | + """ |
| 90 | + def __init__(self): |
| 91 | + self._sum = pyarrow.scalar(0.0) |
| 92 | + |
| 93 | + def to_scalars(self) -> [pyarrow.Scalar]: |
| 94 | + return [self._sum] |
| 95 | + |
| 96 | + def update(self, values: pyarrow.Array) -> None: |
| 97 | + # not nice since pyarrow scalars can't be summed yet. This breaks on `None` |
| 98 | + self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py()) |
| 99 | + |
| 100 | + def merge(self, states: pyarrow.Array) -> None: |
| 101 | + # not nice since pyarrow scalars can't be summed yet. This breaks on `None` |
| 102 | + self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py()) |
| 103 | + |
| 104 | + def evaluate(self) -> pyarrow.Scalar: |
| 105 | + return self._sum |
| 106 | + |
| 107 | + |
| 108 | +df = ... |
| 109 | + |
| 110 | +udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()]) |
| 111 | + |
| 112 | +df = df.aggregate( |
| 113 | + [], |
| 114 | + [udaf(f.col("a"))] |
| 115 | +) |
| 116 | +``` |
| 117 | + |
| 118 | +## How to install |
| 119 | + |
| 120 | +```bash |
| 121 | +pip install datafusion |
| 122 | +``` |
| 123 | + |
| 124 | +## How to develop |
| 125 | + |
| 126 | +This assumes that you have rust and cargo installed. We use the workflow recommended by [pyo3](https://github.com/PyO3/pyo3) and [maturin](https://github.com/PyO3/maturin). |
| 127 | + |
| 128 | +Bootstrap: |
| 129 | + |
| 130 | +```bash |
| 131 | +# fetch this repo |
| 132 | +git clone git@github.com:apache/arrow-datafusion.git |
| 133 | + |
| 134 | +cd arrow-datafusion/python |
| 135 | + |
| 136 | +# prepare development environment (used to build wheel / install in development) |
| 137 | +python3 -m venv venv |
| 138 | +pip install maturin==0.10.4 toml==0.10.1 pyarrow==1.0.0 |
| 139 | +``` |
| 140 | + |
| 141 | +Whenever rust code changes (your changes or via git pull): |
| 142 | + |
| 143 | +```bash |
| 144 | +venv/bin/maturin develop |
| 145 | +venv/bin/python -m unittest discover tests |
| 146 | +``` |
0 commit comments