Skip to content

Commit 7563928

Browse files
authored
[BEAM-7372] Remove Python 2 code paths and workarounds from io (#14292)
1 parent 8da11dc commit 7563928

27 files changed

Lines changed: 58 additions & 209 deletions

sdks/python/apache_beam/io/avroio_test.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import unittest
2828
from builtins import range
2929
from typing import List
30-
import sys
3130

3231
# patches unittest.TestCase to be python3 compatible
3332
import future.tests.base # pylint: disable=unused-import
@@ -37,16 +36,10 @@
3736
import avro.datafile
3837
from avro.datafile import DataFileWriter
3938
from avro.io import DatumWriter
39+
from avro.schema import Parse
4040
from fastavro.schema import parse_schema
4141
from fastavro import writer
4242

43-
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
44-
try:
45-
from avro.schema import Parse # avro-python3 library for python3
46-
except ImportError:
47-
from avro.schema import parse as Parse # avro library for python2
48-
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
49-
5043
import apache_beam as beam
5144
from apache_beam import Create
5245
from apache_beam.io import avroio
@@ -102,12 +95,6 @@ def __init__(self, methodName='runTest'):
10295
}
10396
'''
10497

105-
@classmethod
106-
def setUpClass(cls):
107-
# Method has been renamed in Python 3
108-
if sys.version_info[0] < 3:
109-
cls.assertCountEqual = cls.assertItemsEqual
110-
11198
def setUp(self):
11299
# Reducing the size of thread pools. Without this test execution may fail in
113100
# environments with limited amount of resources.
@@ -446,7 +433,7 @@ def test_sink_transform_snappy(self):
446433

447434

448435
@unittest.skipIf(
449-
sys.version_info[0] == 3 and os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
436+
os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1',
450437
'This test requires that Beam depends on avro-python3>=1.9 or newer. '
451438
'See: BEAM-6522.')
452439
class TestAvro(AvroBase, unittest.TestCase):

sdks/python/apache_beam/io/filebasedsink_test.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import logging
2727
import os
2828
import shutil
29-
import sys
3029
import tempfile
3130
import unittest
3231
from builtins import range
@@ -105,12 +104,6 @@ def close(self, file_handle):
105104

106105

107106
class TestFileBasedSink(_TestCaseWithTempDirCleanUp):
108-
@classmethod
109-
def setUpClass(cls):
110-
# Method has been renamed in Python 3
111-
if sys.version_info[0] < 3:
112-
cls.assertCountEqual = cls.assertItemsEqual
113-
114107
def _common_init(self, sink):
115108
# Manually invoke the generic Sink API.
116109
init_token = sink.initialize_write()

sdks/python/apache_beam/io/filebasedsource_test.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import math
2727
import os
2828
import random
29-
import sys
3029
import tempfile
3130
import unittest
3231
from builtins import object
@@ -217,12 +216,6 @@ def read(self, range_tracker):
217216
def estimate_size(self):
218217
return len(self._values) # Assuming each value to be 1 byte.
219218

220-
@classmethod
221-
def setUpClass(cls):
222-
# Method has been renamed in Python 3
223-
if sys.version_info[0] < 3:
224-
cls.assertCountEqual = cls.assertItemsEqual
225-
226219
def setUp(self):
227220
# Reducing the size of thread pools. Without this test execution may fail in
228221
# environments with limited amount of resources.
@@ -266,12 +259,6 @@ def test_estimate_size(self):
266259

267260

268261
class TestFileBasedSource(unittest.TestCase):
269-
@classmethod
270-
def setUpClass(cls):
271-
# Method has been renamed in Python 3
272-
if sys.version_info[0] < 3:
273-
cls.assertCountEqual = cls.assertItemsEqual
274-
275262
def setUp(self):
276263
# Reducing the size of thread pools. Without this test execution may fail in
277264
# environments with limited amount of resources.
@@ -621,12 +608,6 @@ def default_output_coder(self):
621608

622609

623610
class TestSingleFileSource(unittest.TestCase):
624-
@classmethod
625-
def setUpClass(cls):
626-
# Method has been renamed in Python 3
627-
if sys.version_info[0] < 3:
628-
cls.assertCountEqual = cls.assertItemsEqual
629-
630611
def setUp(self):
631612
# Reducing the size of thread pools. Without this test execution may fail in
632613
# environments with limited amount of resources.

sdks/python/apache_beam/io/fileio_test.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import json
2727
import logging
2828
import os
29-
import sys
3029
import unittest
3130
import uuid
3231
import warnings
@@ -57,10 +56,7 @@
5756

5857

5958
def _get_file_reader(readable_file):
60-
if sys.version_info >= (3, 0):
61-
return io.TextIOWrapper(readable_file.open())
62-
else:
63-
return readable_file.open()
59+
return io.TextIOWrapper(readable_file.open())
6460

6561

6662
class MatchTest(_TestCaseWithTempDirCleanUp):

sdks/python/apache_beam/io/filesystems_test.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import logging
2727
import os
2828
import shutil
29-
import sys
3029
import tempfile
3130
import unittest
3231

@@ -48,12 +47,6 @@ def _join(first_path, *paths):
4847

4948

5049
class FileSystemsTest(unittest.TestCase):
51-
@classmethod
52-
def setUpClass(cls):
53-
# Method has been renamed in Python 3
54-
if sys.version_info[0] < 3:
55-
cls.assertCountEqual = cls.assertItemsEqual
56-
5750
def setUp(self):
5851
self.tmpdir = tempfile.mkdtemp()
5952

sdks/python/apache_beam/io/gcp/__init__.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,21 @@
1616
#
1717
from __future__ import absolute_import
1818

19-
import sys
20-
2119
# Important: the MIME library in the Python 3.x standard library used by
2220
# apitools causes uploads containing '\r\n' to be corrupted, unless we
2321
# patch the BytesGenerator class to write contents verbatim.
24-
if sys.version_info[0] == 3:
25-
try:
26-
# pylint: disable=wrong-import-order, wrong-import-position
27-
# pylint: disable=ungrouped-imports
28-
import apitools.base.py.transfer as transfer
29-
import email.generator as email_generator
22+
try:
23+
# pylint: disable=wrong-import-order, wrong-import-position
24+
# pylint: disable=ungrouped-imports
25+
import apitools.base.py.transfer as transfer
26+
import email.generator as email_generator
3027

31-
class _WrapperNamespace(object):
32-
class BytesGenerator(email_generator.BytesGenerator):
33-
def _write_lines(self, lines):
34-
self.write(lines)
28+
class _WrapperNamespace(object):
29+
class BytesGenerator(email_generator.BytesGenerator):
30+
def _write_lines(self, lines):
31+
self.write(lines)
3532

36-
transfer.email_generator = _WrapperNamespace
37-
except ImportError:
38-
# We may not have the GCP dependencies installed, so we pass in this case.
39-
pass
33+
transfer.email_generator = _WrapperNamespace
34+
except ImportError:
35+
# We may not have the GCP dependencies installed, so we pass in this case.
36+
pass

sdks/python/apache_beam/io/gcp/bigquery_avro_tools_test.py

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,13 @@
2222
import unittest
2323

2424
import fastavro
25+
from avro.schema import Parse
2526

2627
from apache_beam.io.gcp import bigquery_avro_tools
2728
from apache_beam.io.gcp import bigquery_tools
2829
from apache_beam.io.gcp.bigquery_test import HttpError
2930
from apache_beam.io.gcp.internal.clients import bigquery
3031

31-
try:
32-
from avro.schema import Parse # avro-python3 library for Python 3
33-
except ImportError:
34-
from avro.schema import parse as Parse # avro library for Python 2
35-
3632

3733
@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
3834
class TestBigQueryToAvroSchema(unittest.TestCase):
@@ -90,30 +86,33 @@ def test_convert_bigquery_schema_to_avro_schema(self):
9086

9187
# Test that schema can be parsed correctly by avro
9288
parsed_schema = Parse(json.dumps(avro_schema))
93-
# Avro RecordSchema provides field_map in py3 and fields_dict in py2
94-
field_map = getattr(parsed_schema, "field_map", None) or \
95-
getattr(parsed_schema, "fields_dict", None)
9689

97-
self.assertEqual(field_map["number"].type, Parse(json.dumps("long")))
9890
self.assertEqual(
99-
field_map["species"].type, Parse(json.dumps(["null", "string"])))
91+
parsed_schema.field_map["number"].type, Parse(json.dumps("long")))
92+
self.assertEqual(
93+
parsed_schema.field_map["species"].type,
94+
Parse(json.dumps(["null", "string"])))
10095
self.assertEqual(
101-
field_map["quality"].type, Parse(json.dumps(["null", "double"])))
96+
parsed_schema.field_map["quality"].type,
97+
Parse(json.dumps(["null", "double"])))
10298
self.assertEqual(
103-
field_map["grade"].type, Parse(json.dumps(["null", "double"])))
99+
parsed_schema.field_map["grade"].type,
100+
Parse(json.dumps(["null", "double"])))
104101
self.assertEqual(
105-
field_map["quantity"].type, Parse(json.dumps(["null", "long"])))
102+
parsed_schema.field_map["quantity"].type,
103+
Parse(json.dumps(["null", "long"])))
106104
self.assertEqual(
107-
field_map["dependents"].type, Parse(json.dumps(["null", "long"])))
105+
parsed_schema.field_map["dependents"].type,
106+
Parse(json.dumps(["null", "long"])))
108107
self.assertEqual(
109-
field_map["birthday"].type,
108+
parsed_schema.field_map["birthday"].type,
110109
Parse(
111110
json.dumps(
112111
["null", {
113112
"type": "long", "logicalType": "timestamp-micros"
114113
}])))
115114
self.assertEqual(
116-
field_map["birthdayMoney"].type,
115+
parsed_schema.field_map["birthdayMoney"].type,
117116
Parse(
118117
json.dumps([
119118
"null",
@@ -125,31 +124,35 @@ def test_convert_bigquery_schema_to_avro_schema(self):
125124
}
126125
])))
127126
self.assertEqual(
128-
field_map["flighted"].type, Parse(json.dumps(["null", "boolean"])))
127+
parsed_schema.field_map["flighted"].type,
128+
Parse(json.dumps(["null", "boolean"])))
129129
self.assertEqual(
130-
field_map["flighted2"].type, Parse(json.dumps(["null", "boolean"])))
130+
parsed_schema.field_map["flighted2"].type,
131+
Parse(json.dumps(["null", "boolean"])))
131132
self.assertEqual(
132-
field_map["sound"].type, Parse(json.dumps(["null", "bytes"])))
133+
parsed_schema.field_map["sound"].type,
134+
Parse(json.dumps(["null", "bytes"])))
133135
self.assertEqual(
134-
field_map["anniversaryDate"].type,
136+
parsed_schema.field_map["anniversaryDate"].type,
135137
Parse(json.dumps(["null", {
136138
"type": "int", "logicalType": "date"
137139
}])))
138140
self.assertEqual(
139-
field_map["anniversaryDatetime"].type,
141+
parsed_schema.field_map["anniversaryDatetime"].type,
140142
Parse(json.dumps(["null", "string"])))
141143
self.assertEqual(
142-
field_map["anniversaryTime"].type,
144+
parsed_schema.field_map["anniversaryTime"].type,
143145
Parse(
144146
json.dumps(["null", {
145147
"type": "long", "logicalType": "time-micros"
146148
}])))
147149
self.assertEqual(
148-
field_map["geoPositions"].type, Parse(json.dumps(["null", "string"])))
150+
parsed_schema.field_map["geoPositions"].type,
151+
Parse(json.dumps(["null", "string"])))
149152

150153
for field in ("scion", "family"):
151154
self.assertEqual(
152-
field_map[field].type,
155+
parsed_schema.field_map[field].type,
153156
Parse(
154157
json.dumps([
155158
"null",
@@ -169,7 +172,7 @@ def test_convert_bigquery_schema_to_avro_schema(self):
169172
])))
170173

171174
self.assertEqual(
172-
field_map["associates"].type,
175+
parsed_schema.field_map["associates"].type,
173176
Parse(
174177
json.dumps({
175178
"type": "array",

sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import logging
2525
import os
2626
import random
27-
import sys
2827
import time
2928
import unittest
3029

@@ -461,7 +460,6 @@ def test_records_traverse_transform_with_mocks(self):
461460

462461
assert_that(jobs, equal_to([job_reference]), label='CheckJobs')
463462

464-
@unittest.skipIf(sys.version_info[0] == 2, 'Mock pickling problems in Py 2')
465463
@mock.patch('time.sleep')
466464
def test_wait_for_job_completion(self, sleep_mock):
467465
job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()]
@@ -497,7 +495,6 @@ def test_wait_for_job_completion(self, sleep_mock):
497495

498496
sleep_mock.assert_called_once()
499497

500-
@unittest.skipIf(sys.version_info[0] == 2, 'Mock pickling problems in Py 2')
501498
@mock.patch('time.sleep')
502499
def test_one_job_failed_after_waiting(self, sleep_mock):
503500
job_references = [bigquery_api.JobReference(), bigquery_api.JobReference()]

sdks/python/apache_beam/io/gcp/bigquery_read_internal.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,7 @@ def _to_decimal(value):
338338

339339
@staticmethod
340340
def _to_bytes(value):
341-
"""Converts value from str to bytes on Python 3.x. Does nothing on
342-
Python 2.7."""
341+
"""Converts value from str to bytes."""
343342
return value.encode('utf-8')
344343

345344
@classmethod

sdks/python/apache_beam/io/gcp/bigquery_tools.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
import json
3636
import logging
3737
import re
38-
import sys
3938
import time
4039
import uuid
4140
from builtins import object
41+
from json.decoder import JSONDecodeError
4242

4343
import fastavro
4444
from future.utils import iteritems
@@ -73,12 +73,6 @@
7373
except ImportError:
7474
pass
7575

76-
try:
77-
# TODO(pabloem): Remove this workaround after Python 2.7 support ends.
78-
from json.decoder import JSONDecodeError
79-
except ImportError:
80-
JSONDecodeError = ValueError
81-
8276
# pylint: enable=wrong-import-order, wrong-import-position
8377

8478
_LOGGER = logging.getLogger(__name__)
@@ -273,7 +267,7 @@ def __init__(self, client=None, temp_dataset_id=None):
273267
self.client = client or bigquery.BigqueryV2(
274268
http=get_new_http(),
275269
credentials=auth.get_service_credentials(),
276-
response_encoding=None if sys.version_info[0] < 3 else 'utf8')
270+
response_encoding='utf8')
277271
self._unique_row_id = 0
278272
# For testing scenarios where we pass in a client we do not want a
279273
# randomized prefix for row IDs.

0 commit comments

Comments
 (0)