1616
1717import json
1818from typing import Optional
19+ import warnings
1920
2021from google .cloud import bigquery
2122
2425
2526def to_bigtable (
2627 query : str ,
28+ * ,
2729 instance : str ,
2830 table : str ,
29- bq_client : Optional [bigquery .Client ] = None ,
31+ service_account_email : Optional [str ] = None ,
32+ session : Optional [bigframes .Session ] = None ,
3033 app_profile : Optional [str ] = None ,
3134 truncate : bool = False ,
3235 overwrite : bool = False ,
@@ -53,10 +56,15 @@ def to_bigtable(
5356 The name of the bigtable instance to export to.
5457 table (str):
5558 The name of the bigtable table to export to.
56- bq_client (str, default None):
57- The Client object to use for the query. This determines
59+ service_account_email (str, default None):
60+ Full name of the service account to run the continuous query.
61+ Example: accountname@projectname.iam.gserviceaccount.com
62+ If not provided, the user account will be used, but this
63+ limits the lifetime of the continuous query.
64+ session (bigframes.Session, default None):
65+ The session object to use for the query. This determines
5866 the project id and location of the query. If None, will
59- default to the bigframes global session default client .
67+ default to the bigframes global session.
6068 app_profile (str, default None):
6169 The bigtable app profile to export to. If None, no app
6270 profile will be used.
@@ -90,9 +98,16 @@ def to_bigtable(
9098 For example, the job can be cancelled or its error status
9199 can be examined.
92100 """
101+ warnings .warn (
102+ "The bigframes.streaming module is a preview feature, and subject to change." ,
103+ stacklevel = 1 ,
104+ category = bigframes .exceptions .PreviewWarning ,
105+ )
106+
93107 # get default client if not passed
94- if bq_client is None :
95- bq_client = bigframes .get_global_session ().bqclient
108+ if session is None :
109+ session = bigframes .get_global_session ()
110+ bq_client = session .bqclient
96111
97112 # build export string from parameters
98113 project = bq_client .project
@@ -123,7 +138,117 @@ def to_bigtable(
123138
124139 # override continuous http parameter
125140 job_config = bigquery .job .QueryJobConfig ()
126- job_config_filled = job_config .from_api_repr ({"query" : {"continuous" : True }})
141+
142+ job_config_dict : dict = {"query" : {"continuous" : True }}
143+ if service_account_email is not None :
144+ job_config_dict ["query" ]["connectionProperties" ] = {
145+ "key" : "service_account" ,
146+ "value" : service_account_email ,
147+ }
148+ job_config_filled = job_config .from_api_repr (job_config_dict )
149+ job_config_filled .labels = {"bigframes-api" : "streaming_to_bigtable" }
150+
151+ # begin the query job
152+ query_job = bq_client .query (
153+ sql ,
154+ job_config = job_config_filled , # type:ignore
155+ # typing error above is in bq client library
156+ # (should accept abstract job_config, only takes concrete)
157+ job_id = job_id ,
158+ job_id_prefix = job_id_prefix ,
159+ )
160+
161+ # return the query job to the user for lifetime management
162+ return query_job
163+
164+
165+ def to_pubsub (
166+ query : str ,
167+ * ,
168+ topic : str ,
169+ service_account_email : str ,
170+ session : Optional [bigframes .Session ] = None ,
171+ job_id : Optional [str ] = None ,
172+ job_id_prefix : Optional [str ] = None ,
173+ ) -> bigquery .QueryJob :
174+ """Launches a BigQuery continuous query and returns a
175+ QueryJob object for some management functionality.
176+
177+ This method requires an existing pubsub topic. For instructions
178+ on creating a pubsub topic, see
179+ https://cloud.google.com/pubsub/docs/samples/pubsub-quickstart-create-topic?hl=en
180+
181+ Note that a service account is a requirement for continuous queries
182+ exporting to pubsub.
183+
184+ Args:
185+ query (str):
186+ The sql statement to execute as a continuous function.
187+ For example: "SELECT * FROM dataset.table"
188+ This will be wrapped in an EXPORT DATA statement to
189+ launch a continuous query writing to pubsub.
190+ topic (str):
191+ The name of the pubsub topic to export to.
192+ For example: "taxi-rides"
193+ service_account_email (str):
194+ Full name of the service account to run the continuous query.
195+ Example: accountname@projectname.iam.gserviceaccount.com
196+ session (bigframes.Session, default None):
197+ The session object to use for the query. This determines
198+ the project id and location of the query. If None, will
199+ default to the bigframes global session.
200+ job_id (str, default None):
201+ If specified, replace the default job id for the query,
202+ see job_id parameter of
203+ https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
204+ job_id_prefix (str, default None):
205+ If specified, a job id prefix for the query, see
206+ job_id_prefix parameter of
207+ https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.client.Client#google_cloud_bigquery_client_Client_query
208+
209+ Returns:
210+ google.cloud.bigquery.QueryJob:
211+ See https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob
212+ The ongoing query job can be managed using this object.
213+ For example, the job can be cancelled or its error status
214+ can be examined.
215+ """
216+ warnings .warn (
217+ "The bigframes.streaming module is a preview feature, and subject to change." ,
218+ stacklevel = 1 ,
219+ category = bigframes .exceptions .PreviewWarning ,
220+ )
221+
222+ # get default client if not passed
223+ if session is None :
224+ session = bigframes .get_global_session ()
225+ bq_client = session .bqclient
226+
227+ # build export string from parameters
228+ sql = (
229+ "EXPORT DATA\n "
230+ "OPTIONS (\n "
231+ "format = 'CLOUD_PUBSUB',\n "
232+ f'uri = "https://pubsub.googleapis.com/projects/{ bq_client .project } /topics/{ topic } "\n '
233+ ")\n "
234+ "AS (\n "
235+ f"{ query } );"
236+ )
237+
238+ # override continuous http parameter
239+ job_config = bigquery .job .QueryJobConfig ()
240+ job_config_filled = job_config .from_api_repr (
241+ {
242+ "query" : {
243+ "continuous" : True ,
244+ "connectionProperties" : {
245+ "key" : "service_account" ,
246+ "value" : service_account_email ,
247+ },
248+ }
249+ }
250+ )
251+ job_config_filled .labels = {"bigframes-api" : "streaming_to_pubsub" }
127252
128253 # begin the query job
129254 query_job = bq_client .query (
0 commit comments