**Is your feature request related to a problem? Please describe.**
When using `to_dataframe_iterable` for a large result set (with nested/repeated records) on a system with a fast upstream connection to BigQuery but slow downstream processing, Python can use all the memory on the system and get killed.

I think I have tracked it down to the `worker_queue` that is used to pass data from the worker threads back to the main thread: it does not have a `maxsize` (`python-bigquery/google/cloud/bigquery/_pandas_helpers.py`, line 657 at cc3394f):

```python
worker_queue = queue.Queue()
```

This means that if items are not pulled from the queue fast enough, all the memory on the system can be used.
**Describe the solution you'd like**
I think a reasonable solution would be to set the `maxsize` to 1:

```python
worker_queue = queue.Queue(maxsize=1)
```

This would still give an effective buffer of "number of threads + 1" pages, since each worker thread fetches a page into a local variable and then waits if the queue is full. Making the `maxsize` configurable would also be reasonable.
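To illustrate the backpressure this creates, here is a minimal standalone sketch (not python-bigquery code) showing that with `maxsize=1` a fast producer blocks on `put()` until the slow consumer catches up, so the buffer stays bounded while no items are lost:

```python
import queue
import threading

def producer(q, n_items):
    # Each put() blocks while the queue is full, so at most maxsize
    # items (plus one in-flight item per producer thread) are ever
    # buffered, no matter how fast the producer runs.
    for i in range(n_items):
        q.put(i)
    q.put(None)  # sentinel: producer is done

def consume_all(q):
    # The (slow) consumer drains the queue one item at a time.
    items = []
    while True:
        item = q.get()
        if item is None:
            return items
        items.append(item)

q = queue.Queue(maxsize=1)
t = threading.Thread(target=producer, args=(q, 5))
t.start()
items = consume_all(q)
t.join()
print(items)  # all items still arrive, just with backpressure
```

The same mechanism bounds the memory held by the download worker threads: they stall on `put()` instead of accumulating pages.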
**Describe alternatives you've considered**
I've monkey-patched it for now, and it seems to work, but ideally it wouldn't be necessary:
```python
import queue

# ...

def _monkey_patch_queue_maxsize_1():
    """Temporarily replace queue.Queue with a subclass bounded to 1."""
    OriginalQueue = queue.Queue

    class QueueWithMaxsize1(OriginalQueue):
        def __init__(self):
            super().__init__(maxsize=1)

    def _restore():
        queue.Queue = OriginalQueue

    queue.Queue = QueueWithMaxsize1
    return _restore

# ...

query = bqClient.query(sql)
result_rows = query.result()

ensure_original_queue = _monkey_patch_queue_maxsize_1()
pages = result_rows.to_dataframe_iterable(bqStorageClient)
for page in pages:
    # The internal worker_queue has been created by the time the first
    # page arrives, so the patch can be undone here.
    ensure_original_queue()
    # ... something slow, even time.sleep can do it
```
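The patch mechanics can be checked in isolation, without BigQuery: a queue created while the patch is active is bounded, and one created after `_restore()` is unbounded again (`maxsize == 0` is Python's default, meaning infinite):

```python
import queue

def _monkey_patch_queue_maxsize_1():
    # Same helper as above, repeated here so this sketch is self-contained.
    OriginalQueue = queue.Queue

    class QueueWithMaxsize1(OriginalQueue):
        def __init__(self):
            super().__init__(maxsize=1)

    def _restore():
        queue.Queue = OriginalQueue

    queue.Queue = QueueWithMaxsize1
    return _restore

restore = _monkey_patch_queue_maxsize_1()
patched = queue.Queue()    # created while patched: bounded to 1
restore()
unpatched = queue.Queue()  # created after restore: unbounded again

print(patched.maxsize, unpatched.maxsize)  # 1 0
```

The obvious downside is that the patch is process-global: any other code that happens to construct a `queue.Queue` in that window also gets a bounded queue, which is why a supported `maxsize` would be preferable.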