[BEAM-13605] Modify groupby.apply implementation in preparation for pandas 1.4.0 #16706
| return df | ||
| do_apply = lambda gb: pd.concat([add_key_index(k, func(gb.get_group(k), *args, **kwargs)) for k in gb.groups.keys()]) |
This is the critical change: when pandas' transform detection would break us, we override do_apply with a custom implementation that executes func over each group.
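A minimal sketch of the per-group approach described above, runnable outside Beam. The add_key_index body here is a hypothetical stand-in (the PR's actual helper is not shown in this excerpt), and apply_per_group is an illustrative name, not part of the Beam API.

```python
import pandas as pd


def add_key_index(key, result):
    """Prepend the group key as an outer index level (hypothetical helper)."""
    return pd.concat([result], keys=[key])


def apply_per_group(gb, func, *args, **kwargs):
    """Run func on every group and concatenate the keyed results.

    The index of each result is never compared with the index of its input,
    so transform-like functions are treated the same as any other function.
    """
    return pd.concat([
        add_key_index(k, func(gb.get_group(k), *args, **kwargs))
        for k in gb.groups.keys()
    ])


df = pd.DataFrame({
    "style": ["cup", "cup", "pack"],
    "rating": [4.0, 3.5, 5.0],
})

# A transform-like func: the output has the same index as its input group.
out = apply_per_group(df.groupby("style"), lambda g: g[["rating"]] * 2)
```

Because the key is always prepended, the shape of the result does not depend on whether a given group's output happens to share the input's index.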
| requires_partition_by=partitionings.Arbitrary(), | ||
| preserves_partition_by=partitionings.Arbitrary() | ||
| ) | ||
| ) |
It turns out the old implementation was relying on incorrect behavior in apply, so I've updated this to not use apply.
| return self.groupby(by).apply( | ||
| lambda df: pd.DataFrame(df.duplicated(keep=keep, subset=subset), | ||
| columns=[None]))[None] | ||
| columns=[None]))[None].droplevel(by) |
I tested patching this on top of #16590, and verified all
R: @yeandy do you think you could review this?
Codecov Report
@@ Coverage Diff @@
## master #16706 +/- ##
===========================================
+ Coverage 46.43% 83.64% +37.20%
===========================================
Files 201 452 +251
Lines 19787 62161 +42374
===========================================
+ Hits 9189 51992 +42803
- Misses 9612 10169 +557
+ Partials 986 0 -986
yeandy left a comment
LGTM! Just added a few questions to clarify my understanding
| gb = project(gb) | ||
| return gb.apply(func, *args, **kwargs) | ||
On this topic, what's Beam's guidance on flexibility with Python styling? I'm running formatting/linting in the commit hooks; they don't seem too strict, and I don't want to focus on this too much. I suppose everyone will always impart some of their personality to the code over time. 😄
Deleted!
Generally, anything that passes the PythonLint PreCommit (which runs pylint and yapf checkers) is fine. That's not as opinionated as some checkers (e.g. black), so it does leave a decent amount of wiggle room and weird things can slip in like this whitespace change. It's reasonable to point out anything like this that looks odd to you.
SG. I'm used to working in black, but don't really mind as long as we have a checker
| elif isinstance(result, pd.Series): | ||
| if isinstance(fn_input, pd.DataFrame): | ||
| # DataFrameGroupBy | ||
| dtype = pd.Series([result]).dtype | ||
| proxy = pd.DataFrame(columns=result.index, | ||
| dtype=result.dtype, | ||
| index=self._ungrouped.proxy().index) | ||
| elif isinstance(fn_input, pd.Series): | ||
| # SeriesGroupBy | ||
| proxy = pd.Series(dtype=result.dtype, | ||
| name=result.name, | ||
| index=index_to_arrays(self._ungrouped.proxy().index) + | ||
| index_to_arrays(result[:0].index)) |
Can you help me better understand the logic under elif isinstance(result, pd.Series):, for both the if isinstance(fn_input, pd.DataFrame): and elif isinstance(fn_input, pd.Series): cases?
I added some comments to explain both of these cases (and examples in the next comment). Does that help?
| if isinstance(fn_input, pd.DataFrame): | ||
| # DataFrameGroupBy | ||
| dtype = pd.Series([result]).dtype | ||
| proxy = pd.DataFrame(columns=result.index, | ||
| dtype=result.dtype, | ||
| index=self._ungrouped.proxy().index) |
If fn_input is a pd.DataFrame, we still want proxy to be a pd.DataFrame even though the result is a pd.Series?
Yeah, it's a bit surprising. It turns out that in this case pandas transposes the Series: its index values become the columns.
In [3]: df
Out[3]:
brand style rating
0 Yum Yum cup 4.0
1 Yum Yum cup 4.0
2 Indomie cup 3.5
3 Indomie pack 15.0
4 Indomie pack 5.0
In [5]: df.groupby('style').apply(lambda df: df.rating.describe())
Out[5]:
rating count mean std min 25% 50% 75% max
style
cup 3.0 3.833333 0.288675 3.5 3.75 4.0 4.0 4.0
pack 2.0 10.000000 7.071068 5.0 7.50 10.0 12.5 15.0
Compare this to the case where fn_input is a Series. In this case the output is still a Series:
In [6]: df.groupby('style').rating.apply(lambda s: s.describe())
Out[6]:
style
cup count 3.000000
mean 3.833333
std 0.288675
min 3.500000
25% 3.750000
50% 4.000000
75% 4.000000
max 4.000000
pack count 2.000000
mean 10.000000
std 7.071068
min 5.000000
25% 7.500000
50% 10.000000
75% 12.500000
max 15.000000
Name: rating, dtype: float64
I'll add a comment clarifying this
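For anyone who wants to reproduce the two cases, here is a self-contained sketch using the same data. The column selection `[["rating"]]` is an addition not in the session above, made on the assumption that it avoids version-specific handling of the grouping column. The proxy at the end is a simplified illustration of the DataFrameGroupBy branch in the diff, with a plain empty index standing in for self._ungrouped.proxy().index.

```python
import pandas as pd

df = pd.DataFrame({
    "brand": ["Yum Yum", "Yum Yum", "Indomie", "Indomie", "Indomie"],
    "style": ["cup", "cup", "cup", "pack", "pack"],
    "rating": [4.0, 4.0, 3.5, 15.0, 5.0],
})

# DataFrameGroupBy: func receives a DataFrame and returns a Series.
# pandas lays the Series index out as columns, one row per group.
from_frame = df.groupby("style")[["rating"]].apply(
    lambda g: g["rating"].describe())

# SeriesGroupBy: func receives a Series and returns a Series.
# The result stays a Series, with the group key prepended to the index.
from_series = df.groupby("style")["rating"].apply(lambda s: s.describe())

# Empty proxy for the DataFrameGroupBy case: the result's index becomes
# the proxy's columns (assumed stand-in for the ungrouped proxy's index).
result = df["rating"].describe()
proxy = pd.DataFrame(columns=result.index,
                     dtype=result.dtype,
                     index=pd.Index([], name="style"))
```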
Thanks for the example! Now it better paints the picture on how we construct the proxies. Figuring this out must require playing around with a lot of different DF/Series examples 😆
pandas 1.4.0 changes the logic used for transform detection, which breaks our implementation of apply. Specifically, detecting transforms using index equality does not work for Beam DataFrames, since it requires observing the actual index values at pipeline construction time. In addition, the fact that pandas does this breaks some of our internal usages, since some partitions can have equal indexes while other partitions have unequal indexes, leading to results that differ between partitions.

This PR modifies our implementation of apply so that it does not defer to pandas' apply when its transform detection will lead to inconsistent results. It also documents this new divergence from pandas (i.e. our apply implementation will never detect a transform via index equality).
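To illustrate the partition problem, here is a small sketch. The looks_like_transform function is a simplified, hypothetical stand-in for index-equality detection and is not pandas' actual internal check; the two frames play the role of two partitions of one logical DataFrame.

```python
import pandas as pd


def looks_like_transform(group, result):
    """Simplified stand-in: treat func as a transform if indexes are equal."""
    return result.index.equals(group.index)


def sort_by_rating(g):
    """Example func whose output index depends on the data it sees."""
    return g.sort_values("rating")


# Two partitions of the same logical DataFrame.
part_a = pd.DataFrame({"rating": [1.0, 2.0]}, index=[0, 1])  # already sorted
part_b = pd.DataFrame({"rating": [5.0, 3.0]}, index=[2, 3])  # gets reordered

detected = [
    looks_like_transform(part, sort_by_rating(part))
    for part in (part_a, part_b)
]
```

Here the same function would be classified as a transform on one partition and not on the other, so any output layout that depends on that classification would differ between partitions.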