Skip to content

Commit 5fd30d3

Browse files
authored
GH-49376: [Python][Parquet] Add ability to write Bloom filters from pyarrow (#49377)
Fixes #49376 ### Rationale for this change Adds ability to enable the writing of Parquet Bloom filters via pyarrow. ### What changes are included in this PR? Adds `bloom_filter_options` to `parquet.write_table`. ### Are these changes tested? Yes, new tests are added. ### Are there any user-facing changes? Adds an option (defaults to `None`) to `parquet.write_table`. * GitHub Issue: #49376 Authored-by: seidl <seidl2@llnl.gov> Signed-off-by: mwish <maplewish117@gmail.com>
1 parent 8eb2ca1 commit 5fd30d3

5 files changed

Lines changed: 168 additions & 4 deletions

File tree

python/pyarrow/_parquet.pxd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
5656
write_page_checksum=*,
5757
sorting_columns=*,
5858
store_decimal_as_integer=*,
59-
use_content_defined_chunking=*
59+
use_content_defined_chunking=*,
60+
bloom_filter_options=*
6061
) except *
6162

6263

python/pyarrow/_parquet.pyx

Lines changed: 63 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1973,6 +1973,60 @@ cdef vector[CSortingColumn] _convert_sorting_columns(sorting_columns) except *:
19731973

19741974
return c_sorting_columns
19751975

1976+
cdef void _set_bloom_opts_for_column(
        WriterProperties.Builder* props,
        column,
        column_bloom_opts) except *:
    """Set Bloom filter options for a single column.

    Parameters
    ----------
    props : WriterProperties.Builder*
        Builder on which the Bloom filter is enabled or disabled.
    column : str
        Column path the options apply to.
    column_bloom_opts : dict or bool
        Either a dict with optional ``'ndv'`` (positive int) and
        ``'fpp'`` (float in (0.0, 1.0)) keys, or a boolean: ``True``
        enables the filter with default options, ``False`` disables it.

    Raises
    ------
    TypeError
        If the options are neither a dict nor a bool, or ``'ndv'`` /
        ``'fpp'`` have the wrong type.
    ValueError
        If ``'ndv'`` or ``'fpp'`` is out of range.
    """
    cdef:
        # Struct defaults (ndv=1048576, fpp=0.05) apply for any key
        # the caller does not override.
        BloomFilterOptions bloom_opts

    if isinstance(column_bloom_opts, dict):
        if "ndv" in column_bloom_opts:
            ndv = column_bloom_opts["ndv"]
            # Reject bool explicitly: isinstance(True, int) is True in
            # Python, so {'ndv': True} would otherwise silently become
            # ndv=1 instead of raising a TypeError.
            if isinstance(ndv, int) and not isinstance(ndv, bool):
                if ndv <= 0:
                    raise ValueError(
                        f"'bloom_filter_options:ndv' for column '{column}' must be greater than zero, got {ndv}")
                bloom_opts.ndv = ndv
            else:
                raise TypeError(
                    f"'bloom_filter_options:ndv' for column '{column}' must be an int")
        if "fpp" in column_bloom_opts:
            fpp = column_bloom_opts["fpp"]
            if isinstance(fpp, float):
                if fpp <= 0.0 or fpp >= 1.0:
                    raise ValueError(
                        f"'bloom_filter_options:fpp' for column '{column}' must be in (0.0, 1.0), got {fpp}")
                bloom_opts.fpp = fpp
            else:
                raise TypeError(
                    f"'bloom_filter_options:fpp' for column '{column}' must be a float")
    elif isinstance(column_bloom_opts, bool):
        # True -> enable with the struct defaults; False -> disable.
        if not column_bloom_opts:
            props.disable_bloom_filter(tobytes(column))
            return
    else:
        raise TypeError(
            f"'bloom_filter_options:{column}' must be a boolean or a dictionary")

    props.enable_bloom_filter(tobytes(column), bloom_opts)
2015+
2016+
2017+
cdef void _set_bloom_filter_opts(
        WriterProperties.Builder* props,
        bloom_filter_options) except *:
    """Apply per-column Bloom filter settings to the writer properties.

    ``bloom_filter_options`` maps column paths to either a dict of
    ``{'ndv': ..., 'fpp': ...}`` values or a boolean; ``None`` leaves
    the builder untouched.
    """
    if bloom_filter_options is None:
        return
    if not isinstance(bloom_filter_options, dict):
        raise TypeError("'bloom_filter_options' must be a dictionary")
    # Delegate each {path: options} entry to the single-column setter.
    for path, opts in bloom_filter_options.items():
        _set_bloom_opts_for_column(props, path, opts)
2029+
19762030

19772031
cdef shared_ptr[WriterProperties] _create_writer_properties(
19782032
use_dictionary=None,
@@ -1992,7 +2046,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
19922046
write_page_checksum=False,
19932047
sorting_columns=None,
19942048
store_decimal_as_integer=False,
1995-
use_content_defined_chunking=False) except *:
2049+
use_content_defined_chunking=False,
2050+
bloom_filter_options=None) except *:
19962051

19972052
"""General writer properties"""
19982053
cdef:
@@ -2122,6 +2177,9 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
21222177
raise TypeError(
21232178
"'column_encoding' should be a dictionary or a string")
21242179

2180+
# bloom filters
2181+
_set_bloom_filter_opts(&props, bloom_filter_options)
2182+
21252183
# size limits
21262184
if data_page_size is not None:
21272185
props.data_pagesize(data_page_size)
@@ -2317,7 +2375,8 @@ cdef class ParquetWriter(_Weakrefable):
23172375
sorting_columns=None,
23182376
store_decimal_as_integer=False,
23192377
use_content_defined_chunking=False,
2320-
write_time_adjusted_to_utc=False):
2378+
write_time_adjusted_to_utc=False,
2379+
bloom_filter_options=None):
23212380
cdef:
23222381
shared_ptr[WriterProperties] properties
23232382
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -2353,7 +2412,8 @@ cdef class ParquetWriter(_Weakrefable):
23532412
write_page_checksum=write_page_checksum,
23542413
sorting_columns=sorting_columns,
23552414
store_decimal_as_integer=store_decimal_as_integer,
2356-
use_content_defined_chunking=use_content_defined_chunking
2415+
use_content_defined_chunking=use_content_defined_chunking,
2416+
bloom_filter_options=bloom_filter_options
23572417
)
23582418
arrow_properties = _create_arrow_writer_properties(
23592419
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,

python/pyarrow/includes/libparquet.pxd

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,10 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
464464

465465

466466
cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
467+
cdef cppclass BloomFilterOptions:
468+
int32_t ndv
469+
double fpp
470+
467471
cdef cppclass CdcOptions:
468472
int64_t min_chunk_size
469473
int64_t max_chunk_size
@@ -506,6 +510,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
506510
Builder* enable_content_defined_chunking()
507511
Builder* disable_content_defined_chunking()
508512
Builder* content_defined_chunking_options(CdcOptions options)
513+
Builder* disable_bloom_filter(const c_string& path)
514+
Builder* enable_bloom_filter(const c_string& path,
515+
BloomFilterOptions bloom_filter_options)
509516
shared_ptr[WriterProperties] build()
510517

511518
cdef cppclass ArrowWriterProperties:

python/pyarrow/parquet/core.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,32 @@ def _sanitize_table(table, new_schema, flavor):
956956
are expressed in reference to midnight in the UTC timezone.
957957
If False (the default), the TIME columns are assumed to be expressed
958958
in reference to midnight in an unknown, presumably local, timezone.
959+
bloom_filter_options : dict, default None
960+
Create Bloom filters for the columns specified by the provided `dict`.
961+
962+
Bloom filters can be configured with two parameters: number of distinct values
963+
(NDV), and false-positive probability (FPP).
964+
965+
Bloom filters are most effective for high-cardinality columns. A good default
966+
is to set NDV equal to the number of rows. Lower values reduce disk usage but
967+
may not be worthwhile for very small NDVs. Increasing NDV (without increasing FPP)
968+
increases disk and memory usage.
969+
970+
Lower FPP values require more disk and memory space. For a fixed NDV, the
971+
space requirement grows roughly proportional to log(1/FPP). Recommended
972+
values are 0.1, 0.05, or 0.01. Very small values are counterproductive as
973+
the bitset may exceed the size of the actual data. Set NDV appropriately
974+
to minimize space usage.
975+
976+
The keys of the `dict` are column paths. For each path, the value can be either:
977+
978+
- A dictionary, with keys `ndv` and `fpp`. The value for `ndv` must be a positive
979+
integer. If the 'ndv' key is not present, the default value of `1048576` will be
980+
used. The value for `fpp` must be a float between 0.0 and 1.0. If the `fpp` key
981+
is not present, the default value of `0.05` will be used.
982+
- A boolean, with ``True`` indicating that a Bloom filter should be produced with
983+
the above mentioned default values of `ndv=1048576` and `fpp=0.05`. This is
984+
equivalent to passing an empty dict.
959985
"""
960986

961987
_parquet_writer_example_doc = """\
@@ -1985,6 +2011,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
19852011
store_decimal_as_integer=False,
19862012
write_time_adjusted_to_utc=False,
19872013
max_rows_per_page=None,
2014+
bloom_filter_options=None,
19882015
**kwargs):
19892016
# Implementor's note: when adding keywords here / updating defaults, also
19902017
# update it in write_to_dataset and _dataset_parquet.pyx ParquetFileWriteOptions
@@ -2018,6 +2045,7 @@ def write_table(table, where, row_group_size=None, version='2.6',
20182045
store_decimal_as_integer=store_decimal_as_integer,
20192046
write_time_adjusted_to_utc=write_time_adjusted_to_utc,
20202047
max_rows_per_page=max_rows_per_page,
2048+
bloom_filter_options=bloom_filter_options,
20212049
**kwargs) as writer:
20222050
writer.write_table(table, row_group_size=row_group_size)
20232051
except Exception:

python/pyarrow/tests/parquet/test_basic.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import sys
2020
from collections import OrderedDict
2121
import io
22+
import re
2223
import warnings
2324
from shutil import copytree
2425
from decimal import Decimal
@@ -620,6 +621,73 @@ def test_lz4_raw_compression_alias():
620621
_check_roundtrip(table, expected=table, compression="LZ4_RAW")
621622

622623

624+
def test_bloom_filter_options():
    # Two columns: 100 ints in 'a' and 100 binary strings in 'b'.
    col_a = pa.array([i for i in range(100)])
    col_b = pa.array([str(i) for i in range(100)], type=pa.binary())
    table = pa.Table.from_arrays([col_a, col_b], names=['a', 'b'])

    # Every valid configuration should round-trip cleanly.
    valid_options = [
        # bloom filter for one column
        {'a': {'ndv': 100, 'fpp': 0.05}},
        # bloom filter for two columns
        {'a': {'ndv': 100, 'fpp': 0.05}, 'b': {'ndv': 10, 'fpp': 0.1}},
        # bloom filter for one column with default ndv
        {'a': {'fpp': 0.05}},
        # bloom filter for one column with default fpp
        {'a': {'ndv': 100}},
        # bloom filter for one column with default ndv and fpp
        {'a': {}},
        {'a': True},
        # should remain disabled
        {'a': False},
    ]
    for opts in valid_options:
        _check_roundtrip(table, expected=table, bloom_filter_options=opts)

    buf = io.BytesIO()

    # wrong type for ndv
    with pytest.raises(
            TypeError,
            match="'bloom_filter_options:ndv' for column 'a' must be an int"):
        _write_table(table, buf, bloom_filter_options={
            'a': {'ndv': '100', 'fpp': 0.05}})

    # wrong type for fpp
    with pytest.raises(
            TypeError,
            match="'bloom_filter_options:fpp' for column 'a' must be a float"):
        _write_table(table, buf, bloom_filter_options={
            'a': {'ndv': 100, 'fpp': '0.05'}})

    # wrong type for the options mapping itself
    with pytest.raises(TypeError,
                       match="'bloom_filter_options' must be a dictionary"):
        _write_table(table, buf, bloom_filter_options=True)

    # invalid ndv value
    msg = ("'bloom_filter_options:ndv' for column 'a' "
           "must be greater than zero, got -10")
    with pytest.raises(ValueError, match=msg):
        _write_table(table, buf, bloom_filter_options={'a': {'ndv': -10}})

    # invalid fpp values (outside the open interval (0.0, 1.0))
    for bad_fpp in (2.0, -0.5):
        msg = (f"'bloom_filter_options:fpp' for column 'a' "
               f"must be in (0.0, 1.0), got {bad_fpp}")
        with pytest.raises(ValueError, match=re.escape(msg)):
            _write_table(table, buf, bloom_filter_options={
                'a': {'fpp': bad_fpp}})
689+
690+
623691
def test_sanitized_spark_field_names():
624692
a0 = pa.array([0, 1, 2, 3, 4])
625693
name = 'prohib; ,\t{}'

0 commit comments

Comments
 (0)