-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Expand file tree
/
Copy pathconvert_test.py
More file actions
209 lines (162 loc) · 7.31 KB
/
convert_test.py
File metadata and controls
209 lines (162 loc) · 7.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import pandas as pd
import apache_beam as beam
from apache_beam.dataframe import convert
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
def equal_to_unordered_series(expected):
  """Return an ``assert_that`` matcher for a PCollection of Series chunks.

  The matcher concatenates all actual ``pd.Series`` chunks and compares the
  combined values against ``expected`` ignoring order.

  Args:
    expected: a ``pd.Series`` (or any iterable of values) holding the values
      the concatenated actual chunks must contain, in any order.

  Returns:
    A callable suitable as the matcher argument to
    ``apache_beam.testing.util.assert_that``.
  """
  def check(actual):
    # Materialize first so an iterator input is only consumed once.
    actual = list(actual)
    # pd.concat raises ValueError on an empty list; an empty PCollection
    # should compare as an empty Series, not crash the matcher.
    actual = pd.concat(actual) if actual else pd.Series([], dtype='float64')
    if sorted(expected) != sorted(actual):
      raise AssertionError('Series not equal: \n%s\n%s\n' % (expected, actual))

  return check
class ConvertTest(unittest.TestCase):
  """Tests for round-tripping between PCollections and deferred DataFrames
  via ``convert.to_dataframe`` / ``convert.to_pcollection``."""

  def test_convert_yield_pandas(self):
    """With yield_elements='pandas' the output PCollection carries whole
    pandas Series chunks rather than unbatched individual elements."""
    with beam.Pipeline() as p:
      a = pd.Series([1, 2, 3])
      b = pd.Series([100, 200, 300])
      # Each PCollection holds one whole Series, so a zero-length proxy
      # (a[:0]) is supplied to describe the schema.
      pc_a = p | 'A' >> beam.Create([a])
      pc_b = p | 'B' >> beam.Create([b])

      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
      df_b = convert.to_dataframe(pc_b, proxy=b[:0])

      df_2a = 2 * df_a
      df_3a = 3 * df_a
      df_ab = df_a * df_b

      # Converting multiple results at a time can be more efficient.
      pc_2a, pc_ab = convert.to_pcollection(df_2a, df_ab,
                                            yield_elements='pandas')
      # But separate conversions can be done as well.
      pc_3a = convert.to_pcollection(df_3a, yield_elements='pandas')

      assert_that(pc_2a, equal_to_unordered_series(2 * a), label='Check2a')
      assert_that(pc_3a, equal_to_unordered_series(3 * a), label='Check3a')
      assert_that(pc_ab, equal_to_unordered_series(a * b), label='Checkab')

  def test_convert(self):
    """Default conversion: elements are created individually and the proxy
    is inferred, so outputs are plain unbatched elements."""
    with beam.Pipeline() as p:
      a = pd.Series([1, 2, 3])
      b = pd.Series([100, 200, 300])
      pc_a = p | 'A' >> beam.Create(a)
      pc_b = p | 'B' >> beam.Create(b)

      df_a = convert.to_dataframe(pc_a)
      df_b = convert.to_dataframe(pc_b)

      df_2a = 2 * df_a
      df_3a = 3 * df_a
      df_ab = df_a * df_b

      # Converting multiple results at a time can be more efficient.
      pc_2a, pc_ab = convert.to_pcollection(df_2a, df_ab)
      # But separate conversions can be done as well.
      pc_3a = convert.to_pcollection(df_3a)

      assert_that(pc_2a, equal_to(list(2 * a)), label='Check2a')
      assert_that(pc_3a, equal_to(list(3 * a)), label='Check3a')
      assert_that(pc_ab, equal_to(list(a * b)), label='Checkab')

  def test_convert_with_none(self):
    # Ensure the logical Any type allows (nullable) None, see BEAM-12587.
    df = pd.DataFrame({'A': ['str', 10, None], 'B': [None, 'str', 20]})
    with beam.Pipeline() as p:
      res = convert.to_pcollection(df, pipeline=p) | beam.Map(tuple)
      assert_that(res, equal_to([(row.A, row.B) for _, row in df.iterrows()]))

  def test_convert_scalar(self):
    """A deferred scalar (Series.sum()) converts to a one-element
    PCollection."""
    with beam.Pipeline() as p:
      pc = p | 'A' >> beam.Create([1, 2, 3])
      s = convert.to_dataframe(pc)
      pc_sum = convert.to_pcollection(s.sum())
      assert_that(pc_sum, equal_to([6]))

  def test_convert_non_deferred(self):
    """Non-deferred (plain pandas) inputs may be mixed with deferred ones;
    the explicit pipeline argument anchors the non-deferred conversion."""
    with beam.Pipeline() as p:
      s1 = pd.Series([1, 2, 3])
      s2 = convert.to_dataframe(p | beam.Create([100, 200, 300]))

      pc1, pc2 = convert.to_pcollection(s1, s2, pipeline=p)
      assert_that(pc1, equal_to([1, 2, 3]), label='CheckNonDeferred')
      assert_that(pc2, equal_to([100, 200, 300]), label='CheckDeferred')

  def test_convert_memoization(self):
    """Repeated to_pcollection calls on the same deferred frame must return
    the identical PCollection object (per yield_elements mode)."""
    with beam.Pipeline() as p:
      a = pd.Series([1, 2, 3])
      b = pd.Series([100, 200, 300])
      pc_a = p | 'A' >> beam.Create([a])
      pc_b = p | 'B' >> beam.Create([b])

      df_a = convert.to_dataframe(pc_a, proxy=a[:0])
      df_b = convert.to_dataframe(pc_b, proxy=b[:0])

      df_2a = 2 * df_a
      df_3a = 3 * df_a
      df_ab = df_a * df_b

      # Two calls to to_pcollection with the same Dataframe should produce the
      # same PCollection(s)
      pc_2a_, pc_ab_ = convert.to_pcollection(df_2a, df_ab)
      pc_3a, pc_2a, pc_ab = convert.to_pcollection(df_3a, df_2a, df_ab)

      self.assertIs(pc_2a, pc_2a_)
      self.assertIs(pc_ab, pc_ab_)
      # Distinct frames still map to distinct PCollections.
      self.assertIsNot(pc_3a, pc_2a)
      self.assertIsNot(pc_3a, pc_ab)

      # The same conversions without the unbatching transform should also cache
      # PCollections
      pc_2a_pandas_, pc_ab_pandas_ = convert.to_pcollection(df_2a, df_ab,
                                                            yield_elements='pandas')
      pc_3a_pandas, pc_2a_pandas, pc_ab_pandas = convert.to_pcollection(df_3a,
                                                                        df_2a,
                                                                        df_ab,
                                                                        yield_elements='pandas')

      self.assertIs(pc_2a_pandas, pc_2a_pandas_)
      self.assertIs(pc_ab_pandas, pc_ab_pandas_)
      self.assertIsNot(pc_3a_pandas, pc_2a_pandas)
      self.assertIsNot(pc_3a_pandas, pc_ab_pandas)

      # .. but the cached PCollections should be different
      self.assertIsNot(pc_2a_pandas, pc_2a)
      self.assertIsNot(pc_ab_pandas, pc_ab)
      self.assertIsNot(pc_3a_pandas, pc_3a)

  def test_convert_memoization_clears_cache(self):
    # This test re-runs the other memoization test, and makes sure that the
    # cache is cleaned up with the pipeline. Otherwise there would be concerns
    # of it growing without bound.
    import gc

    # Make sure cache is clear
    gc.collect()
    self.assertEqual(len(convert.TO_PCOLLECTION_CACHE), 0)

    # Disable GC so it doesn't run pre-emptively, confounding assertions about
    # cache size
    gc.disable()
    # Also disable logging, as some implementations may artificially extend
    # the life of objects.
    import logging
    logging.disable(logging.INFO)
    try:
      self.test_convert_memoization()
      # One cached entry per distinct deferred frame converted above.
      self.assertEqual(len(convert.TO_PCOLLECTION_CACHE), 3)
      gc.collect()
      # PCollections should be removed from cache after pipelines go out of
      # scope and are GC'd
      self.assertEqual(len(convert.TO_PCOLLECTION_CACHE), 0)
    finally:
      # Always re-enable GC and logging
      gc.enable()
      logging.disable(logging.NOTSET)

  def test_auto_convert(self):
    """Applying a schema'd PTransform directly to a deferred DataFrame
    implicitly converts frame -> PCollection -> frame."""
    class MySchemaTransform(beam.PTransform):
      def expand(self, pcoll):
        # Maps (m, n) rows to a Pythagorean-style triple of row fields.
        return pcoll | beam.Map(
            lambda x: beam.Row(
                a=x.n**2 - x.m**2, b=2 * x.m * x.n, c=x.n**2 + x.m**2))

    with beam.Pipeline() as p:
      pc_mn = p | beam.Create([
          (1, 2), (2, 3), (3, 10)
      ]) | beam.MapTuple(lambda m, n: beam.Row(m=m, n=n))

      df_mn = convert.to_dataframe(pc_mn)

      # Apply a transform directly to a dataframe to get another dataframe.
      df_abc = df_mn | MySchemaTransform()

      pc_abc = convert.to_pcollection(df_abc) | beam.Map(tuple)
      assert_that(pc_abc, equal_to([(3, 4, 5), (5, 12, 13), (91, 60, 109)]))
# Run the test suite when this module is executed directly.
if __name__ == '__main__':
  unittest.main()