Skip to content

Commit 1b6320d

Browse files
committed
Add 'jobs.ExtractTableToStorageJob'.
Inverse of 'jobs.LoadTableFromStorageJob': extracts a table to one or more files in Cloud Storage.
1 parent 1259287 commit 1b6320d

File tree

2 files changed

+479
-0
lines changed

2 files changed

+479
-0
lines changed

gcloud/bigquery/job.py

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -939,3 +939,192 @@ def _build_resource(self):
939939
self._populate_config_resource(configuration)
940940

941941
return resource
942+
943+
944+
class _ExtractConfiguration(object):
945+
"""User-settable configuration options for extract jobs."""
946+
# None -> use server default.
947+
_compression = None
948+
_destination_format = None
949+
_field_delimiter = None
950+
_print_header = None
951+
952+
953+
class Compression(_Enum):
    """Pseudo-enum of the values accepted by ``compression`` properties."""
    GZIP = 'GZIP'
    NONE = 'NONE'
    # Every permitted value; presumably what ``_Enum.validate`` checks
    # against -- confirm in ``_Enum``.
    ALLOWED = (GZIP, NONE)
959+
960+
961+
class DestinationFormat(_Enum):
    """Pseudo-enum of the values accepted by ``destination_format`` properties.
    """
    CSV = 'CSV'
    NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
    AVRO = 'AVRO'
    # Every permitted value; presumably what ``_Enum.validate`` checks
    # against -- confirm in ``_Enum``.
    ALLOWED = (CSV, NEWLINE_DELIMITED_JSON, AVRO)
968+
969+
970+
class ExtractTableToStorageJob(_BaseJob):
    """Asynchronous job: extract data from a BQ table into Cloud Storage.

    :type name: string
    :param name: the name of the job

    :type source: :class:`gcloud.bigquery.table.Table`
    :param source: Table from which data is to be extracted.

    :type destination_uris: list of string
    :param destination_uris: URIs describing Cloud Storage blobs into which
                             extracted data will be written.

    :type client: :class:`gcloud.bigquery.client.Client`
    :param client: A client which holds credentials and project configuration
                   for the dataset (which requires a project).
    """
    def __init__(self, name, source, destination_uris, client):
        super(ExtractTableToStorageJob, self).__init__(name, client)
        self.source = source
        self.destination_uris = destination_uris
        # Optional settings live on a separate holder; each one stays
        # ``None`` (server default) until the user assigns a value.
        self._configuration = _ExtractConfiguration()

    @property
    def compression(self):
        """Compression to apply to destination blobs.

        :rtype: string, or ``NoneType``
        :returns: The value as set by the user, or None (the default).
        """
        return self._configuration._compression

    @compression.setter
    def compression(self, value):
        """Update compression.

        :type value: string
        :param value: allowed value for :class:`Compression`.
        """
        Compression.validate(value)  # raises ValueError if invalid
        self._configuration._compression = value

    @compression.deleter
    def compression(self):
        """Delete compression, reverting to the default (``None``).

        :raises: AttributeError if no value has been set.
        """
        del self._configuration._compression

    @property
    def destination_format(self):
        """Format of the data written to destination blobs.

        :rtype: string, or ``NoneType``
        :returns: The value as set by the user, or None (the default).
        """
        return self._configuration._destination_format

    @destination_format.setter
    def destination_format(self, value):
        """Update destination_format.

        :type value: string
        :param value: allowed value for :class:`DestinationFormat`.
        """
        DestinationFormat.validate(value)  # raises ValueError if invalid
        self._configuration._destination_format = value

    @destination_format.deleter
    def destination_format(self):
        """Delete destination_format, reverting to the default (``None``).

        :raises: AttributeError if no value has been set.
        """
        del self._configuration._destination_format

    @property
    def field_delimiter(self):
        """Delimiter to use between fields in the extracted data.

        :rtype: string, or ``NoneType``
        :returns: The value as set by the user, or None (the default).
        """
        return self._configuration._field_delimiter

    @field_delimiter.setter
    def field_delimiter(self, value):
        """Update field_delimiter.

        :type value: string
        :param value: new field delimiter

        :raises: ValueError for invalid value types.
        """
        if not isinstance(value, six.string_types):
            raise ValueError("Pass a string")
        self._configuration._field_delimiter = value

    @field_delimiter.deleter
    def field_delimiter(self):
        """Delete field_delimiter, reverting to the default (``None``).

        :raises: AttributeError if no value has been set.
        """
        del self._configuration._field_delimiter

    @property
    def print_header(self):
        """Write a header row into destination blobs.

        :rtype: boolean, or ``NoneType``
        :returns: The value as set by the user, or None (the default).
        """
        return self._configuration._print_header

    @print_header.setter
    def print_header(self, value):
        """Update print_header.

        :type value: boolean
        :param value: new print_header

        :raises: ValueError for invalid value types.
        """
        if not isinstance(value, bool):
            raise ValueError("Pass a boolean")
        self._configuration._print_header = value

    @print_header.deleter
    def print_header(self):
        """Delete print_header, reverting to the default (``None``).

        :raises: AttributeError if no value has been set.
        """
        del self._configuration._print_header

    def _populate_config_resource(self, configuration):
        """Copy user-set options into the ``extract`` configuration mapping.

        Options still at ``None`` are omitted so that the server default
        applies.

        :type configuration: dict
        :param configuration: the ``extract`` section of the job resource;
                              updated in place.
        """
        if self.compression is not None:
            configuration['compression'] = self.compression
        if self.destination_format is not None:
            configuration['destinationFormat'] = self.destination_format
        if self.field_delimiter is not None:
            configuration['fieldDelimiter'] = self.field_delimiter
        if self.print_header is not None:
            configuration['printHeader'] = self.print_header

    def _build_resource(self):
        """Generate a resource for ``begin``."""

        source_ref = {
            'projectId': self.source.project,
            'datasetId': self.source.dataset_name,
            'tableId': self.source.name,
        }

        resource = {
            'jobReference': {
                'projectId': self.project,
                'jobId': self.name,
            },
            'configuration': {
                'extract': {
                    'sourceTable': source_ref,
                    'destinationUris': self.destination_uris,
                },
            },
        }
        # Layer the optional, user-set options on top of the required keys.
        configuration = resource['configuration']['extract']
        self._populate_config_resource(configuration)

        return resource

0 commit comments

Comments
 (0)