forked from googleapis/python-bigquery-dataframes
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
98 lines (79 loc) · 3.01 KB
/
__init__.py
File metadata and controls
98 lines (79 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import typing
import bigframes_vendored.pandas.core.window.rolling as vendored_pandas_rolling
from bigframes.core import log_adapter
import bigframes.core as core
import bigframes.core.blocks as blocks
import bigframes.operations.aggregations as agg_ops
@log_adapter.class_logger
class Window(vendored_pandas_rolling.Window):
    # The class documentation is reused verbatim from the vendored pandas
    # class, so no docstring literal is declared here.
    __doc__ = vendored_pandas_rolling.Window.__doc__

    def __init__(
        self,
        block: blocks.Block,
        window_spec: core.WindowSpec,
        value_column_ids: typing.Sequence[str],
        drop_null_groups: bool = True,
        is_series: bool = False,
    ):
        # block: the data the window aggregations are applied to.
        # value_column_ids: ids of the columns in `block` to aggregate.
        self._block = block
        self._value_column_ids = value_column_ids
        # window_spec: forwarded to the block's window operation; its
        # grouping keys also decide whether the result is re-indexed.
        self._window_spec = window_spec
        # drop_null_groups: forwarded as `skip_null_groups`.
        self._drop_null_groups = drop_null_groups
        # is_series: wrap results in a Series (True) or a DataFrame (False).
        self._is_series = is_series

    # Public aggregations. Each one only picks the aggregation op and defers
    # to `_apply_aggregate`. They intentionally carry `#` comments instead of
    # docstring literals so nothing declared here shadows documentation
    # provided for the same method names elsewhere.

    def count(self):
        return self._apply_aggregate(agg_ops.count_op)

    def sum(self):
        return self._apply_aggregate(agg_ops.sum_op)

    def mean(self):
        return self._apply_aggregate(agg_ops.mean_op)

    def var(self):
        return self._apply_aggregate(agg_ops.var_op)

    def std(self):
        return self._apply_aggregate(agg_ops.std_op)

    def max(self):
        return self._apply_aggregate(agg_ops.max_op)

    def min(self):
        return self._apply_aggregate(agg_ops.min_op)

    def _apply_aggregate(
        self,
        op: agg_ops.UnaryAggregateOp,
    ):
        """Run ``op`` as a window aggregation over every value column.

        Args:
            op: Unary aggregation applied to each column id in
                ``self._value_column_ids`` using ``self._window_spec``.

        Returns:
            A ``Series`` when the window was created with ``is_series=True``,
            otherwise a ``DataFrame``. The result columns carry the labels of
            the corresponding input value columns. When the window spec has
            grouping keys, the grouping columns are placed ahead of the
            original index columns in the result index.
        """
        source = self._block
        # Capture the labels up front: the windowed columns get fresh ids and
        # are relabelled with these at the very end.
        value_labels = [
            source.col_id_to_label[column_id]
            for column_id in self._value_column_ids
        ]

        windowed, result_ids = source.multi_apply_window_op(
            self._value_column_ids,
            op,
            self._window_spec,
            skip_null_groups=self._drop_null_groups,
            never_skip_nulls=True,
        )

        grouping_keys = self._window_spec.grouping_keys
        if grouping_keys:
            # Remember the current index columns before they are turned back
            # into regular columns, then rebuild the index as
            # (grouping keys..., original index...).
            previous_index_ids = windowed.index_columns
            windowed = windowed.reset_index(drop=False)
            group_ids = tuple(key.id.name for key in grouping_keys)
            windowed = windowed.set_index(
                col_ids=group_ids + tuple(previous_index_ids)
            )

        result_block = windowed.select_columns(result_ids).with_column_labels(
            value_labels
        )

        # Imported inside the method, as in the original code (presumably to
        # avoid a circular import at module load time -- confirm).
        if self._is_series:
            from bigframes.series import Series

            return Series(result_block)

        from bigframes.dataframe import DataFrame

        return DataFrame(result_block)