|
137 | 137 |
|
138 | 138 | import re |
139 | 139 | import ast |
| 140 | +import functools |
140 | 141 | import sys |
141 | 142 | import time |
142 | 143 | from concurrent import futures |
@@ -494,86 +495,91 @@ def _cell_magic(line, query): |
494 | 495 | args.use_bqstorage_api or context.use_bqstorage_api, context.credentials |
495 | 496 | ) |
496 | 497 |
|
497 | | - if args.max_results: |
498 | | - max_results = int(args.max_results) |
499 | | - else: |
500 | | - max_results = None |
| 498 | + close_transports = functools.partial(_close_transports, client, bqstorage_client) |
501 | 499 |
|
502 | | - query = query.strip() |
| 500 | + try: |
| 501 | + if args.max_results: |
| 502 | + max_results = int(args.max_results) |
| 503 | + else: |
| 504 | + max_results = None |
| 505 | + |
| 506 | + query = query.strip() |
| 507 | + |
| 508 | + # Any query that does not contain whitespace (aside from leading and trailing whitespace) |
| 509 | + # is assumed to be a table id |
| 510 | + if not re.search(r"\s", query): |
| 511 | + try: |
| 512 | + rows = client.list_rows(query, max_results=max_results) |
| 513 | + except Exception as ex: |
| 514 | + _handle_error(ex, args.destination_var) |
| 515 | + return |
| 516 | + |
| 517 | + result = rows.to_dataframe(bqstorage_client=bqstorage_client) |
| 518 | + if args.destination_var: |
| 519 | + IPython.get_ipython().push({args.destination_var: result}) |
| 520 | + return |
| 521 | + else: |
| 522 | + return result |
| 523 | + |
| 524 | + job_config = bigquery.job.QueryJobConfig() |
| 525 | + job_config.query_parameters = params |
| 526 | + job_config.use_legacy_sql = args.use_legacy_sql |
| 527 | + job_config.dry_run = args.dry_run |
| 528 | + |
| 529 | + if args.destination_table: |
| 530 | + split = args.destination_table.split(".") |
| 531 | + if len(split) != 2: |
| 532 | + raise ValueError( |
| 533 | + "--destination_table should be in a <dataset_id>.<table_id> format." |
| 534 | + ) |
| 535 | + dataset_id, table_id = split |
| 536 | + job_config.allow_large_results = True |
| 537 | + dataset_ref = client.dataset(dataset_id) |
| 538 | + destination_table_ref = dataset_ref.table(table_id) |
| 539 | + job_config.destination = destination_table_ref |
| 540 | + job_config.create_disposition = "CREATE_IF_NEEDED" |
| 541 | + job_config.write_disposition = "WRITE_TRUNCATE" |
| 542 | + _create_dataset_if_necessary(client, dataset_id) |
| 543 | + |
| 544 | + if args.maximum_bytes_billed == "None": |
| 545 | + job_config.maximum_bytes_billed = 0 |
| 546 | + elif args.maximum_bytes_billed is not None: |
| 547 | + value = int(args.maximum_bytes_billed) |
| 548 | + job_config.maximum_bytes_billed = value |
503 | 549 |
|
504 | | - # Any query that does not contain whitespace (aside from leading and trailing whitespace) |
505 | | - # is assumed to be a table id |
506 | | - if not re.search(r"\s", query): |
507 | 550 | try: |
508 | | - rows = client.list_rows(query, max_results=max_results) |
| 551 | + query_job = _run_query(client, query, job_config=job_config) |
509 | 552 | except Exception as ex: |
510 | 553 | _handle_error(ex, args.destination_var) |
511 | 554 | return |
512 | 555 |
|
513 | | - result = rows.to_dataframe(bqstorage_client=bqstorage_client) |
514 | | - if args.destination_var: |
515 | | - IPython.get_ipython().push({args.destination_var: result}) |
516 | | - return |
517 | | - else: |
518 | | - return result |
519 | | - |
520 | | - job_config = bigquery.job.QueryJobConfig() |
521 | | - job_config.query_parameters = params |
522 | | - job_config.use_legacy_sql = args.use_legacy_sql |
523 | | - job_config.dry_run = args.dry_run |
| 556 | + if not args.verbose: |
| 557 | + display.clear_output() |
524 | 558 |
|
525 | | - if args.destination_table: |
526 | | - split = args.destination_table.split(".") |
527 | | - if len(split) != 2: |
528 | | - raise ValueError( |
529 | | - "--destination_table should be in a <dataset_id>.<table_id> format." |
| 559 | + if args.dry_run and args.destination_var: |
| 560 | + IPython.get_ipython().push({args.destination_var: query_job}) |
| 561 | + return |
| 562 | + elif args.dry_run: |
| 563 | + print( |
| 564 | + "Query validated. This query will process {} bytes.".format( |
| 565 | + query_job.total_bytes_processed |
| 566 | + ) |
530 | 567 | ) |
531 | | - dataset_id, table_id = split |
532 | | - job_config.allow_large_results = True |
533 | | - dataset_ref = client.dataset(dataset_id) |
534 | | - destination_table_ref = dataset_ref.table(table_id) |
535 | | - job_config.destination = destination_table_ref |
536 | | - job_config.create_disposition = "CREATE_IF_NEEDED" |
537 | | - job_config.write_disposition = "WRITE_TRUNCATE" |
538 | | - _create_dataset_if_necessary(client, dataset_id) |
539 | | - |
540 | | - if args.maximum_bytes_billed == "None": |
541 | | - job_config.maximum_bytes_billed = 0 |
542 | | - elif args.maximum_bytes_billed is not None: |
543 | | - value = int(args.maximum_bytes_billed) |
544 | | - job_config.maximum_bytes_billed = value |
545 | | - |
546 | | - try: |
547 | | - query_job = _run_query(client, query, job_config=job_config) |
548 | | - except Exception as ex: |
549 | | - _handle_error(ex, args.destination_var) |
550 | | - return |
551 | | - |
552 | | - if not args.verbose: |
553 | | - display.clear_output() |
| 568 | + return query_job |
554 | 569 |
|
555 | | - if args.dry_run and args.destination_var: |
556 | | - IPython.get_ipython().push({args.destination_var: query_job}) |
557 | | - return |
558 | | - elif args.dry_run: |
559 | | - print( |
560 | | - "Query validated. This query will process {} bytes.".format( |
561 | | - query_job.total_bytes_processed |
| 570 | + if max_results: |
| 571 | + result = query_job.result(max_results=max_results).to_dataframe( |
| 572 | + bqstorage_client=bqstorage_client |
562 | 573 | ) |
563 | | - ) |
564 | | - return query_job |
565 | | - |
566 | | - if max_results: |
567 | | - result = query_job.result(max_results=max_results).to_dataframe( |
568 | | - bqstorage_client=bqstorage_client |
569 | | - ) |
570 | | - else: |
571 | | - result = query_job.to_dataframe(bqstorage_client=bqstorage_client) |
| 574 | + else: |
| 575 | + result = query_job.to_dataframe(bqstorage_client=bqstorage_client) |
572 | 576 |
|
573 | | - if args.destination_var: |
574 | | - IPython.get_ipython().push({args.destination_var: result}) |
575 | | - else: |
576 | | - return result |
| 577 | + if args.destination_var: |
| 578 | + IPython.get_ipython().push({args.destination_var: result}) |
| 579 | + else: |
| 580 | + return result |
| 581 | + finally: |
| 582 | + close_transports() |
577 | 583 |
|
578 | 584 |
|
579 | 585 | def _make_bqstorage_client(use_bqstorage_api, credentials): |
@@ -601,3 +607,21 @@ def _make_bqstorage_client(use_bqstorage_api, credentials): |
601 | 607 | credentials=credentials, |
602 | 608 | client_info=gapic_client_info.ClientInfo(user_agent=IPYTHON_USER_AGENT), |
603 | 609 | ) |
| 610 | + |
| 611 | + |
| 612 | +def _close_transports(client, bqstorage_client): |
| 613 | + """Close the given clients' underlying transport channels. |
| 614 | +
|
| 615 | + Closing the transport is needed to release system resources, namely open |
| 616 | + sockets. |
| 617 | +
|
| 618 | + Args: |
| 619 | +        client (:class:`~google.cloud.bigquery.client.Client`): A client for the main BigQuery API, whose HTTP transport will be closed. |
| 620 | + bqstorage_client |
| 621 | + (Optional[:class:`~google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient`]): |
| 622 | + A client for the BigQuery Storage API. |
| 623 | +
|
| 624 | + """ |
| 625 | + client.close() |
| 626 | + if bqstorage_client is not None: |
| 627 | + bqstorage_client.transport.channel.close() |
0 commit comments