From 43fa964c4bc679d73eaa21219f01705dfb021cd9 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Wed, 20 Nov 2024 13:44:55 +0530
Subject: [PATCH 01/22] [ PECO - 1768 ] PySQL: adjust HTTP retry logic to align with Go and Nodejs drivers (#467)

* Added the exponential backoff code

* Added the exponential backoff algorithm and refactored the code

* Added jitter and added unit tests

* Reformatted

* Fixed the test_retry_exponential_backoff integration test
---
 src/databricks/sql/auth/retry.py      | 26 +++++++-----
 src/databricks/sql/thrift_backend.py  |  4 +-
 tests/e2e/common/retry_test_mixins.py |  8 ++--
 tests/unit/test_retry.py              | 57 ++++++++++++++++---------
 4 files changed, 61 insertions(+), 34 deletions(-)

diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py
index 0c6547cb4..ec321bed0 100755
--- a/src/databricks/sql/auth/retry.py
+++ b/src/databricks/sql/auth/retry.py
@@ -1,4 +1,5 @@
 import logging
+import random
 import time
 import typing
 from enum import Enum
@@ -285,25 +286,30 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool:
         """
         retry_after = self.get_retry_after(response)
         if retry_after:
-            backoff = self.get_backoff_time()
-            proposed_wait = max(backoff, retry_after)
-            self.check_proposed_wait(proposed_wait)
-            time.sleep(proposed_wait)
-            return True
+            proposed_wait = retry_after
+        else:
+            proposed_wait = self.get_backoff_time()
 
-        return False
+        proposed_wait = min(proposed_wait, self.delay_max)
+        self.check_proposed_wait(proposed_wait)
+        time.sleep(proposed_wait)
+        return True
 
     def get_backoff_time(self) -> float:
-        """Calls urllib3's built-in get_backoff_time.
+        """
+        This method implements the exponential backoff algorithm to calculate the delay between retries.
 
         Never returns a value larger than self.delay_max
         A MaxRetryDurationError will be raised if the calculated backoff would exceed self.max_attempts_duration
 
-        Note: within urllib3, a backoff is only calculated in cases where a Retry-After header is not present
-        in the previous unsuccessful request and `self.respect_retry_after_header` is True (which is always true)
+        :return:
         """
-        proposed_backoff = super().get_backoff_time()
+        current_attempt = self.stop_after_attempts_count - self.total
+        proposed_backoff = (2**current_attempt) * self.delay_min
+        if self.backoff_jitter != 0.0:
+            proposed_backoff += random.random() * self.backoff_jitter
+
         proposed_backoff = min(proposed_backoff, self.delay_max)
 
         self.check_proposed_wait(proposed_backoff)
diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py
index cf5cd906b..29be54820 100644
--- a/src/databricks/sql/thrift_backend.py
+++ b/src/databricks/sql/thrift_backend.py
@@ -64,8 +64,8 @@
 # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins)
 _retry_policy = {  # (type, default, min, max)
     "_retry_delay_min": (float, 1, 0.1, 60),
-    "_retry_delay_max": (float, 60, 5, 3600),
-    "_retry_stop_after_attempts_count": (int, 30, 1, 60),
+    "_retry_delay_max": (float, 30, 5, 3600),
+    "_retry_stop_after_attempts_count": (int, 5, 1, 60),
     "_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
     "_retry_delay_default": (float, 5, 1, 60),
 }
diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py
index 7dd5f7450..942955cab 100755
--- a/tests/e2e/common/retry_test_mixins.py
+++ b/tests/e2e/common/retry_test_mixins.py
@@ -174,7 +174,7 @@ def test_retry_max_count_not_exceeded(self):
     def test_retry_exponential_backoff(self):
         """GIVEN the retry policy is configured for reasonable exponential backoff
         WHEN the server sends nothing but 429 responses with retry-afters
-        THEN the connector will use those retry-afters as a floor
+        THEN the connector will use those retry-after values as the delay
         """
         retry_policy = self._retry_policy.copy()
         retry_policy["_retry_delay_min"] = 1
@@ -191,10 +191,10 @@ def test_retry_exponential_backoff(self):
             assert isinstance(cm.value.args[1], MaxRetryDurationError)
 
         # With setting delay_min to 1, the expected retry delays should be:
-        # 3, 3, 4
-        # The first 2 retries are allowed, the 3rd retry puts the total duration over the limit
+        # 3, 3, 3, 3
+        # The first 3 retries are allowed, the 4th retry puts the total duration over the limit
         # of 10 seconds
-        assert mock_obj.return_value.getresponse.call_count == 3
+        assert mock_obj.return_value.getresponse.call_count == 4
         assert duration > 6
 
         # Should be less than 7, but this is a safe margin for CI/CD slowness
diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py
index 2108af4fa..b7648ffbd 100644
--- a/tests/unit/test_retry.py
+++ b/tests/unit/test_retry.py
@@ -1,11 +1,9 @@
-from os import error
 import time
-from unittest.mock import Mock, patch
+from unittest.mock import patch, call
 import pytest
-from requests import Request
 from urllib3 import HTTPResponse
-from databricks.sql.auth.retry import DatabricksRetryPolicy, RequestHistory
-
+from databricks.sql.auth.retry import DatabricksRetryPolicy, RequestHistory, CommandType
+from urllib3.exceptions import MaxRetryError
 
 class TestRetry:
     @pytest.fixture()
     def retry_policy(self) -> DatabricksRetryPolicy:
@@ -25,32 +23,55 @@ def error_history(self) -> RequestHistory:
             method="POST", url=None, error=None, status=503, redirect_location=None
         )
 
+    def calculate_backoff_time(self, attempt, delay_min, delay_max):
+        exponential_backoff_time = (2**attempt) * delay_min
+        return min(exponential_backoff_time, delay_max)
+
     @patch("time.sleep")
     def test_sleep__no_retry_after(self, t_mock, retry_policy, error_history):
         retry_policy._retry_start_time = time.time()
         retry_policy.history = [error_history, error_history]
         retry_policy.sleep(HTTPResponse(status=503))
-        t_mock.assert_called_with(2)
+
+        expected_backoff_time = self.calculate_backoff_time(0, retry_policy.delay_min, retry_policy.delay_max)
+        t_mock.assert_called_with(expected_backoff_time)
 
     @patch("time.sleep")
-    def test_sleep__retry_after_is_binding(self, t_mock, retry_policy, error_history):
+    def test_sleep__no_retry_after_header__multiple_retries(self, t_mock, retry_policy):
+        num_attempts = retry_policy.stop_after_attempts_count
+
         retry_policy._retry_start_time = time.time()
-        retry_policy.history = [error_history, error_history]
-        retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"}))
-        t_mock.assert_called_with(3)
+        retry_policy.command_type = CommandType.OTHER
+
+        for attempt in range(num_attempts):
+            retry_policy.sleep(HTTPResponse(status=503))
+            # Internally urllib3 calls the increment function generating a new instance for every retry
+            retry_policy = retry_policy.increment()
+
+        expected_backoff_times = []
+        for attempt in range(num_attempts):
+            expected_backoff_times.append(self.calculate_backoff_time(attempt, retry_policy.delay_min, retry_policy.delay_max))
+
+        # Asserts if the sleep value was called in the expected order
+        t_mock.assert_has_calls([call(expected_time) for expected_time in expected_backoff_times])
 
     @patch("time.sleep")
-    def test_sleep__retry_after_present_but_not_binding(
-        self, t_mock, retry_policy, error_history
-    ):
+    def test_excessive_retry_attempts_error(self, t_mock, retry_policy):
+        # Attempting more than stop_after_attempt_count
+        num_attempts = retry_policy.stop_after_attempts_count + 1
+
         retry_policy._retry_start_time = time.time()
-        retry_policy.history = [error_history, error_history]
-        retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "1"}))
-        t_mock.assert_called_with(2)
+        retry_policy.command_type = CommandType.OTHER
+
+        with pytest.raises(MaxRetryError):
+            for attempt in range(num_attempts):
+                retry_policy.sleep(HTTPResponse(status=503))
+                # Internally urllib3 calls the increment function generating a new instance for every retry
+                retry_policy = retry_policy.increment()
 
     @patch("time.sleep")
-    def test_sleep__retry_after_surpassed(self, t_mock, retry_policy, error_history):
+    def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history):
         retry_policy._retry_start_time = time.time()
         retry_policy.history = [error_history, error_history, error_history]
         retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"}))
-        t_mock.assert_called_with(4)
+        t_mock.assert_called_with(3)
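The new get_backoff_time above reduces to a capped exponential with optional jitter. A minimal standalone sketch of the same calculation (illustrative only: the names delay_min, delay_max, and backoff_jitter mirror the policy fields, but this is not the DatabricksRetryPolicy class itself):

    import random

    def backoff_delay(attempt: int, delay_min: float, delay_max: float, backoff_jitter: float) -> float:
        # Exponential growth in the attempt number, scaled by the minimum delay
        delay = (2**attempt) * delay_min
        if backoff_jitter != 0.0:
            # Jitter spreads out clients that would otherwise retry in lockstep
            delay += random.random() * backoff_jitter
        # Never sleep longer than the configured ceiling
        return min(delay, delay_max)

    # With delay_min=1 and delay_max=30, attempts 0..5 yield roughly
    # 1, 2, 4, 8, 16, 30 (capped) seconds, plus jitter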
From 328aeb528f714cb22055e923d27c6d8cf1a9e0fd Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Tue, 26 Nov 2024 21:31:32 +0530
Subject: [PATCH 02/22] [ PECO-2065 ] Create the async execution flow for the PySQL Connector (#463)

* Built the basic flow for the async pipeline - testing is remaining

* Implemented the flow for the get_execution_result, but the problem of invalid operation handle still persists

* Missed adding some files in previous commit

* Working prototype of execute_async, get_query_state and get_execution_result

* Added integration tests for execute_async

* add docs for functions

* Refactored the async code

* Fixed the doc comments

* Reformatted
---
 src/databricks/sql/client.py         | 105 +++++++++++++++++++++++++++
 src/databricks/sql/thrift_backend.py |  76 ++++++++++++++++++-
 tests/e2e/test_driver.py             |  23 ++++++
 3 files changed, 203 insertions(+), 1 deletion(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 4e0ab941b..8ea81e12b 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -1,3 +1,4 @@
+import time
 from typing import Dict, Tuple, List, Optional, Any, Union, Sequence
 
 import pandas
@@ -47,6 +48,7 @@
 
 from databricks.sql.thrift_api.TCLIService.ttypes import (
     TSparkParameter,
+    TOperationState,
 )
 
 
@@ -430,6 +432,8 @@ def __init__(
         self.escaper = ParamEscaper()
         self.lastrowid = None
 
+        self.ASYNC_DEFAULT_POLLING_INTERVAL = 2
+
     # The ideal return type for this method is perhaps Self, but that was not added until 3.11, and we support pre-3.11 pythons, currently.
def __enter__(self) -> "Cursor": return self @@ -796,6 +800,7 @@ def execute( cursor=self, use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, + async_op=False, ) self.active_result_set = ResultSet( self.connection, @@ -812,6 +817,106 @@ def execute( return self + def execute_async( + self, + operation: str, + parameters: Optional[TParameterCollection] = None, + ) -> "Cursor": + """ + + Execute a query and do not wait for it to complete and just move ahead + + :param operation: + :param parameters: + :return: + """ + param_approach = self._determine_parameter_approach(parameters) + if param_approach == ParameterApproach.NONE: + prepared_params = NO_NATIVE_PARAMS + prepared_operation = operation + + elif param_approach == ParameterApproach.INLINE: + prepared_operation, prepared_params = self._prepare_inline_parameters( + operation, parameters + ) + elif param_approach == ParameterApproach.NATIVE: + normalized_parameters = self._normalize_tparametercollection(parameters) + param_structure = self._determine_parameter_structure(normalized_parameters) + transformed_operation = transform_paramstyle( + operation, normalized_parameters, param_structure + ) + prepared_operation, prepared_params = self._prepare_native_parameters( + transformed_operation, normalized_parameters, param_structure + ) + + self._check_not_closed() + self._close_and_clear_active_result_set() + self.thrift_backend.execute_command( + operation=prepared_operation, + session_handle=self.connection._session_handle, + max_rows=self.arraysize, + max_bytes=self.buffer_size_bytes, + lz4_compression=self.connection.lz4_compression, + cursor=self, + use_cloud_fetch=self.connection.use_cloud_fetch, + parameters=prepared_params, + async_op=True, + ) + + return self + + def get_query_state(self) -> "TOperationState": + """ + Get the state of the async executing query or basically poll the status of the query + + :return: + """ + self._check_not_closed() + return self.thrift_backend.get_query_state(self.active_op_handle) + + def get_async_execution_result(self): + """ + + Checks for the status of the async executing query and fetches the result if the query is finished + Otherwise it will keep polling the status of the query till there is a Not pending state + :return: + """ + self._check_not_closed() + + def is_executing(operation_state) -> "bool": + return not operation_state or operation_state in [ + ttypes.TOperationState.RUNNING_STATE, + ttypes.TOperationState.PENDING_STATE, + ] + + while is_executing(self.get_query_state()): + # Poll after some default time + time.sleep(self.ASYNC_DEFAULT_POLLING_INTERVAL) + + operation_state = self.get_query_state() + if operation_state == ttypes.TOperationState.FINISHED_STATE: + execute_response = self.thrift_backend.get_execution_result( + self.active_op_handle, self + ) + self.active_result_set = ResultSet( + self.connection, + execute_response, + self.thrift_backend, + self.buffer_size_bytes, + self.arraysize, + ) + + if execute_response.is_staging_operation: + self._handle_staging_operation( + staging_allowed_local_path=self.thrift_backend.staging_allowed_local_path + ) + + return self + else: + raise Error( + f"get_execution_result failed with Operation status {operation_state}" + ) + def executemany(self, operation, seq_of_parameters): """ Execute the operation once for every set of passed in parameters. 
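Taken together, the additions above give the cursor a poll-based flow: execute_async submits the statement without blocking, get_query_state polls it, and get_async_execution_result attaches the result set once the operation is no longer pending or running. A sketch of how a caller is meant to drive it, modeled on the integration test later in this patch (connection arguments are placeholders):

    import time
    from databricks import sql
    from databricks.sql.thrift_api.TCLIService import ttypes

    # States in which the query is still executing server-side
    EXECUTING = (
        ttypes.TOperationState.PENDING_STATE,
        ttypes.TOperationState.RUNNING_STATE,
    )

    with sql.connect(server_hostname="...", http_path="...", access_token="...") as connection:
        with connection.cursor() as cursor:
            cursor.execute_async("SELECT COUNT(*) FROM RANGE(10000)")
            while cursor.get_query_state() in EXECUTING:
                time.sleep(2)  # poll at a modest interval instead of blocking
            cursor.get_async_execution_result()  # raises if the query did not finish
            print(cursor.fetchall())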
diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 29be54820..8c212c554 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -7,6 +7,8 @@ import threading from typing import List, Union +from databricks.sql.thrift_api.TCLIService.ttypes import TOperationState + try: import pyarrow except ImportError: @@ -769,6 +771,63 @@ def _results_message_to_execute_response(self, resp, operation_state): arrow_schema_bytes=schema_bytes, ) + def get_execution_result(self, op_handle, cursor): + + assert op_handle is not None + + req = ttypes.TFetchResultsReq( + operationHandle=ttypes.TOperationHandle( + op_handle.operationId, + op_handle.operationType, + False, + op_handle.modifiedRowCount, + ), + maxRows=cursor.arraysize, + maxBytes=cursor.buffer_size_bytes, + orientation=ttypes.TFetchOrientation.FETCH_NEXT, + includeResultSetMetadata=True, + ) + + resp = self.make_request(self._client.FetchResults, req) + + t_result_set_metadata_resp = resp.resultSetMetadata + + lz4_compressed = t_result_set_metadata_resp.lz4Compressed + is_staging_operation = t_result_set_metadata_resp.isStagingOperation + has_more_rows = resp.hasMoreRows + description = self._hive_schema_to_description( + t_result_set_metadata_resp.schema + ) + + schema_bytes = ( + t_result_set_metadata_resp.arrowSchema + or self._hive_schema_to_arrow_schema(t_result_set_metadata_resp.schema) + .serialize() + .to_pybytes() + ) + + queue = ResultSetQueueFactory.build_queue( + row_set_type=resp.resultSetMetadata.resultFormat, + t_row_set=resp.results, + arrow_schema_bytes=schema_bytes, + max_download_threads=self.max_download_threads, + lz4_compressed=lz4_compressed, + description=description, + ssl_options=self._ssl_options, + ) + + return ExecuteResponse( + arrow_queue=queue, + status=resp.status, + has_been_closed_server_side=False, + has_more_rows=has_more_rows, + lz4_compressed=lz4_compressed, + is_staging_operation=is_staging_operation, + command_handle=op_handle, + description=description, + arrow_schema_bytes=schema_bytes, + ) + def _wait_until_command_done(self, op_handle, initial_operation_status_resp): if initial_operation_status_resp: self._check_command_not_in_error_or_closed_state( @@ -787,6 +846,12 @@ def _wait_until_command_done(self, op_handle, initial_operation_status_resp): self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) return operation_state + def get_query_state(self, op_handle) -> "TOperationState": + poll_resp = self._poll_for_status(op_handle) + operation_state = poll_resp.operationState + self._check_command_not_in_error_or_closed_state(op_handle, poll_resp) + return operation_state + @staticmethod def _check_direct_results_for_error(t_spark_direct_results): if t_spark_direct_results: @@ -817,6 +882,7 @@ def execute_command( cursor, use_cloud_fetch=True, parameters=[], + async_op=False, ): assert session_handle is not None @@ -846,7 +912,11 @@ def execute_command( parameters=parameters, ) resp = self.make_request(self._client.ExecuteStatement, req) - return self._handle_execute_response(resp, cursor) + + if async_op: + self._handle_execute_response_async(resp, cursor) + else: + return self._handle_execute_response(resp, cursor) def get_catalogs(self, session_handle, max_rows, max_bytes, cursor): assert session_handle is not None @@ -945,6 +1015,10 @@ def _handle_execute_response(self, resp, cursor): return self._results_message_to_execute_response(resp, final_operation_state) + def _handle_execute_response_async(self, 
resp, cursor):
+        cursor.active_op_handle = resp.operationHandle
+        self._check_direct_results_for_error(resp.directResults)
+
     def fetch_results(
         self,
         op_handle,
diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index cfd1e9699..2f0881cda 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -36,6 +36,7 @@
     compare_dbr_versions,
     is_thrift_v5_plus,
 )
+from databricks.sql.thrift_api.TCLIService import ttypes
 from tests.e2e.common.core_tests import CoreTestMixin, SmokeTestMixin
 from tests.e2e.common.large_queries_mixin import LargeQueriesMixin
 from tests.e2e.common.timestamp_tests import TimestampTestsMixin
@@ -78,6 +79,7 @@ class PySQLPytestTestCase:
     }
     arraysize = 1000
     buffer_size_bytes = 104857600
+    POLLING_INTERVAL = 2
 
     @pytest.fixture(autouse=True)
     def get_details(self, connection_details):
@@ -175,6 +177,27 @@ def test_cloud_fetch(self):
         for i in range(len(cf_result)):
             assert cf_result[i] == noop_result[i]
 
+    def test_execute_async(self):
+        def isExecuting(operation_state):
+            return not operation_state or operation_state in [
+                ttypes.TOperationState.RUNNING_STATE,
+                ttypes.TOperationState.PENDING_STATE,
+            ]
+
+        long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
+        with self.cursor() as cursor:
+            cursor.execute_async(long_running_query)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+            assert result[0].asDict() == {"count(1)": 0}
+
 
 # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
 # tests

From 980af886677edcb3fbfd1fd482a187e780d318e9 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Tue, 26 Nov 2024 22:53:06 +0530
Subject: [PATCH 03/22] Fix for check_types github action failing (#472)

Fixed the failing check_types action
---
 src/databricks/sql/auth/retry.py |  2 +-
 tests/unit/test_retry.py         | 15 ++++++++++++---
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py
index ec321bed0..0243d0aa2 100755
--- a/src/databricks/sql/auth/retry.py
+++ b/src/databricks/sql/auth/retry.py
@@ -305,7 +305,7 @@ def get_backoff_time(self) -> float:
 
         :return:
         """
-        current_attempt = self.stop_after_attempts_count - self.total
+        current_attempt = self.stop_after_attempts_count - int(self.total or 0)
         proposed_backoff = (2**current_attempt) * self.delay_min
         if self.backoff_jitter != 0.0:
             proposed_backoff += random.random() * self.backoff_jitter
diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py
index b7648ffbd..1e18e1f43 100644
--- a/tests/unit/test_retry.py
+++ b/tests/unit/test_retry.py
@@ -5,6 +5,7 @@
 from databricks.sql.auth.retry import DatabricksRetryPolicy, RequestHistory, CommandType
 from urllib3.exceptions import MaxRetryError
 
+
 class TestRetry:
     @pytest.fixture()
     def retry_policy(self) -> DatabricksRetryPolicy:
@@ -33,7 +34,9 @@ def test_sleep__no_retry_after(self, t_mock, retry_policy, error_history):
         retry_policy.history = [error_history, error_history]
         retry_policy.sleep(HTTPResponse(status=503))
 
-        expected_backoff_time = self.calculate_backoff_time(0, retry_policy.delay_min, retry_policy.delay_max)
+        expected_backoff_time = self.calculate_backoff_time(
+            0, retry_policy.delay_min, retry_policy.delay_max
+        )
t_mock.assert_called_with(expected_backoff_time) @patch("time.sleep") @@ -50,10 +53,16 @@ def test_sleep__no_retry_after_header__multiple_retries(self, t_mock, retry_poli expected_backoff_times = [] for attempt in range(num_attempts): - expected_backoff_times.append(self.calculate_backoff_time(attempt, retry_policy.delay_min, retry_policy.delay_max)) + expected_backoff_times.append( + self.calculate_backoff_time( + attempt, retry_policy.delay_min, retry_policy.delay_max + ) + ) # Asserts if the sleep value was called in the expected order - t_mock.assert_has_calls([call(expected_time) for expected_time in expected_backoff_times]) + t_mock.assert_has_calls( + [call(expected_time) for expected_time in expected_backoff_times] + ) @patch("time.sleep") def test_excessive_retry_attempts_error(self, t_mock, retry_policy): From d6905164f5df4591f5c311ca39c16ef3b230624d Mon Sep 17 00:00:00 2001 From: arredond Date: Thu, 5 Dec 2024 18:44:15 +0100 Subject: [PATCH 04/22] Remove upper caps on dependencies (#452) * Remove upper caps on numpy and pyarrow versions --- poetry.lock | 2 +- pyproject.toml | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/poetry.lock b/poetry.lock index 9fe49690f..576adbd35 100755 --- a/poetry.lock +++ b/poetry.lock @@ -1202,4 +1202,4 @@ sqlalchemy = ["sqlalchemy"] [metadata] lock-version = "2.0" python-versions = "^3.8.0" -content-hash = "31066a85f646d0009d6fe9ffc833a64fcb4b6923c2e7f2652e7aa8540acba298" +content-hash = "9d8a91369fc79f9ca9f7502e2ed284b66531c954ae59a723e465a76073966998" diff --git a/pyproject.toml b/pyproject.toml index 1747d21b1..23ffed58a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,14 +14,14 @@ thrift = ">=0.16.0,<0.21.0" pandas = [ { version = ">=1.2.5,<2.3.0", python = ">=3.8" } ] -pyarrow = ">=14.0.1,<17" +pyarrow = ">=14.0.1" lz4 = "^4.0.2" requests = "^2.18.1" oauthlib = "^3.1.0" numpy = [ - { version = "^1.16.6", python = ">=3.8,<3.11" }, - { version = "^1.23.4", python = ">=3.11" }, + { version = ">=1.16.6", python = ">=3.8,<3.11" }, + { version = ">=1.23.4", python = ">=3.11" }, ] sqlalchemy = { version = ">=2.0.21", optional = true } openpyxl = "^3.0.10" From 680b3b6df55518bb22939a9c1f052759c1b680df Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 6 Dec 2024 10:15:23 +0530 Subject: [PATCH 05/22] Updated the doc to specify native parameters in PUT operation is not supported from >=3.x connector (#477) Added doc update --- docs/parameters.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/parameters.md b/docs/parameters.md index a538af1a6..f9f4c5ff9 100644 --- a/docs/parameters.md +++ b/docs/parameters.md @@ -17,6 +17,7 @@ See `examples/parameters.py` in this repository for a runnable demo. 
 - A query executed with native parameters can contain at most 255 parameter markers
 - The maximum size of all parameterized values cannot exceed 1MB
+- For volume operations such as PUT, native parameters are not supported
 
 ## SQL Syntax

From ab4b73bb1dfcd6baed43e46e77cbe7db26c7f2a0 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Sun, 22 Dec 2024 22:08:11 +0530
Subject: [PATCH 06/22] Incorrect rows in inline fetch result (#479)

* Raised error when incorrect Row offset is returned

* Changed error type

* grammar fix

* Added unit tests and modified the code

* Updated error message

* Updated the non-retrying to only the inline case

* Updated fix

* Changed the flow

* Minor update

* Updated the retryable condition

* Minor test fix

* Added extra space
---
 src/databricks/sql/client.py         |  4 ++++
 src/databricks/sql/thrift_backend.py | 12 +++++++-----
 tests/unit/test_fetches.py           |  1 +
 3 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 8ea81e12b..aefed1ef0 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -808,6 +808,7 @@ def execute(
             self.thrift_backend,
             self.buffer_size_bytes,
             self.arraysize,
+            self.connection.use_cloud_fetch,
         )
 
         if execute_response.is_staging_operation:
@@ -1202,6 +1203,7 @@ def __init__(
         thrift_backend: ThriftBackend,
         result_buffer_size_bytes: int = DEFAULT_RESULT_BUFFER_SIZE_BYTES,
         arraysize: int = 10000,
+        use_cloud_fetch: bool = True,
     ):
         """
         A ResultSet manages the results of a single command.
@@ -1223,6 +1225,7 @@ def __init__(
         self.description = execute_response.description
         self._arrow_schema_bytes = execute_response.arrow_schema_bytes
         self._next_row_index = 0
+        self._use_cloud_fetch = use_cloud_fetch
 
         if execute_response.arrow_queue:
             # In this case the server has taken the fast path and returned an initial batch of
@@ -1250,6 +1253,7 @@ def _fill_results_buffer(self):
             lz4_compressed=self.lz4_compressed,
             arrow_schema_bytes=self._arrow_schema_bytes,
             description=self.description,
+            use_cloud_fetch=self._use_cloud_fetch,
         )
         self.results = results
         self.has_more_rows = has_more_rows
diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py
index 8c212c554..5fbd9f749 100644
--- a/src/databricks/sql/thrift_backend.py
+++ b/src/databricks/sql/thrift_backend.py
@@ -321,7 +321,7 @@ def _handle_request_error(self, error_info, attempt, elapsed):
 
     # FUTURE: Consider moving to https://github.com/litl/backoff or
     # https://github.com/jd/tenacity for retry logic.
-    def make_request(self, method, request):
+    def make_request(self, method, request, retryable=True):
         """Execute given request, attempting retries when
             1. Receiving HTTP 429/503 from server
            2.
OSError is raised during a GetOperationStatus @@ -460,7 +460,7 @@ def attempt_request(attempt): # return on success # if available: bounded delay and retry # if not: raise error - max_attempts = self._retry_stop_after_attempts_count + max_attempts = self._retry_stop_after_attempts_count if retryable else 1 # use index-1 counting for logging/human consistency for attempt in range(1, max_attempts + 1): @@ -1028,6 +1028,7 @@ def fetch_results( lz4_compressed, arrow_schema_bytes, description, + use_cloud_fetch=True, ): assert op_handle is not None @@ -1044,10 +1045,11 @@ def fetch_results( includeResultSetMetadata=True, ) - resp = self.make_request(self._client.FetchResults, req) + # Fetch results in Inline mode with FETCH_NEXT orientation are not idempotent and hence not retried + resp = self.make_request(self._client.FetchResults, req, use_cloud_fetch) if resp.results.startRowOffset > expected_row_start_offset: - logger.warning( - "Expected results to start from {} but they instead start at {}".format( + raise DataError( + "fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format( expected_row_start_offset, resp.results.startRowOffset ) ) diff --git a/tests/unit/test_fetches.py b/tests/unit/test_fetches.py index e9a58acdd..89cedcfae 100644 --- a/tests/unit/test_fetches.py +++ b/tests/unit/test_fetches.py @@ -66,6 +66,7 @@ def fetch_results( lz4_compressed, arrow_schema_bytes, description, + use_cloud_fetch=True, ): nonlocal batch_index results = FetchTests.make_arrow_queue(batch_list[batch_index]) From f9d6ef12cf9d49298d7d374cacb21a58519c24ce Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 23 Dec 2024 14:12:02 +0530 Subject: [PATCH 07/22] Bumped up to version 3.7.0 (#482) * bumped up version * Updated to version 3.7.0 * Grammar fix * Minor fix --- CHANGELOG.md | 8 ++++++++ pyproject.toml | 2 +- src/databricks/sql/__init__.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1a70f961..d426b97e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Release History +# 3.7.0 (2024-12-23) + +- Fix: Incorrect number of rows fetched in inline results when fetching results with FETCH_NEXT orientation (databricks/databricks-sql-python#479 by @jprakash-db) +- Updated the doc to specify native parameters are not supported in PUT operation (databricks/databricks-sql-python#477 by @jprakash-db) +- Relax `pyarrow` and `numpy` pin (databricks/databricks-sql-python#452 by @arredond) +- Feature: Support for async execute has been added (databricks/databricks-sql-python#463 by @jprakash-db) +- Updated the HTTP retry logic to be similar to the other Databricks drivers (databricks/databricks-sql-python#467 by @jprakash-db) + # 3.6.0 (2024-10-25) - Support encryption headers in the cloud fetch request (https://github.com/databricks/databricks-sql-python/pull/460 by @jackyhu-db) diff --git a/pyproject.toml b/pyproject.toml index 23ffed58a..dc13f283a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databricks-sql-connector" -version = "3.6.0" +version = "3.7.0" description = "Databricks SQL Connector for Python" authors = ["Databricks "] license = "Apache-2.0" diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py index 42167b008..eff1e812d 100644 --- a/src/databricks/sql/__init__.py +++ b/src/databricks/sql/__init__.py @@ -68,7 +68,7 @@ 
def __repr__(self): DATE = DBAPITypeObject("date") ROWID = DBAPITypeObject() -__version__ = "3.6.0" +__version__ = "3.7.0" USER_AGENT_NAME = "PyDatabricksSqlConnector" # These two functions are pyhive legacy From 97b6392bf84cf4f2a70bcd89c20e3bf81e229621 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Tue, 7 Jan 2025 22:49:39 +0530 Subject: [PATCH 08/22] Increased the number of retry attempts allowed (#486) Updated the number of attempts allowed --- src/databricks/sql/thrift_backend.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 5fbd9f749..f76350a21 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -67,7 +67,7 @@ _retry_policy = { # (type, default, min, max) "_retry_delay_min": (float, 1, 0.1, 60), "_retry_delay_max": (float, 30, 5, 3600), - "_retry_stop_after_attempts_count": (int, 5, 1, 60), + "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), "_retry_delay_default": (float, 5, 1, 60), } From 4c62c69178f7ff337123da273abafc82ce012b19 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Tue, 7 Jan 2025 23:03:13 +0530 Subject: [PATCH 09/22] bump version to 3.7.1 (#487) bumped up version --- CHANGELOG.md | 4 ++++ pyproject.toml | 2 +- src/databricks/sql/__init__.py | 2 +- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d426b97e0..5a2b6c885 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Release History +# 3.7.1 (2025-01-07) + +- Relaxed the number of Http retry attempts (databricks/databricks-sql-python#486 by @jprakash-db) + # 3.7.0 (2024-12-23) - Fix: Incorrect number of rows fetched in inline results when fetching results with FETCH_NEXT orientation (databricks/databricks-sql-python#479 by @jprakash-db) diff --git a/pyproject.toml b/pyproject.toml index dc13f283a..0156a5b8c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databricks-sql-connector" -version = "3.7.0" +version = "3.7.1" description = "Databricks SQL Connector for Python" authors = ["Databricks "] license = "Apache-2.0" diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py index eff1e812d..445328847 100644 --- a/src/databricks/sql/__init__.py +++ b/src/databricks/sql/__init__.py @@ -68,7 +68,7 @@ def __repr__(self): DATE = DBAPITypeObject("date") ROWID = DBAPITypeObject() -__version__ = "3.7.0" +__version__ = "3.7.1" USER_AGENT_NAME = "PyDatabricksSqlConnector" # These two functions are pyhive legacy From fc9da22e35a7a2b9615be66dfe7b9ed1e5cb4ba7 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Sat, 1 Feb 2025 02:52:06 +0530 Subject: [PATCH 10/22] Updated retry timeout (#497) * Updated the retry_logic * Added logging for checking retry time --- src/databricks/sql/auth/retry.py | 3 ++- src/databricks/sql/thrift_backend.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index 0243d0aa2..eedcc773f 100755 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -290,8 +290,9 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool: else: proposed_wait = self.get_backoff_time() - proposed_wait = min(proposed_wait, self.delay_max) + proposed_wait = max(proposed_wait, self.delay_max) self.check_proposed_wait(proposed_wait) + logger.debug(f"Retrying after {proposed_wait} 
seconds") time.sleep(proposed_wait) return True diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index f76350a21..9beab0371 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -66,7 +66,7 @@ # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins) _retry_policy = { # (type, default, min, max) "_retry_delay_min": (float, 1, 0.1, 60), - "_retry_delay_max": (float, 30, 5, 3600), + "_retry_delay_max": (float, 60, 5, 3600), "_retry_stop_after_attempts_count": (int, 30, 1, 60), "_retry_stop_after_attempts_duration": (float, 900, 1, 86400), "_retry_delay_default": (float, 5, 1, 60), From accf6ffffe9a2b0dd66b4266739f3d5002131c0f Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 3 Feb 2025 03:38:43 +0530 Subject: [PATCH 11/22] Fixed the retry tests failing (#500) --- tests/e2e/common/retry_test_mixins.py | 20 ++++++++++---------- tests/unit/test_retry.py | 21 ++++++++++----------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py index 942955cab..b5d01a45d 100755 --- a/tests/e2e/common/retry_test_mixins.py +++ b/tests/e2e/common/retry_test_mixins.py @@ -121,9 +121,9 @@ class PySQLRetryTestsMixin: # For testing purposes _retry_policy = { "_retry_delay_min": 0.1, - "_retry_delay_max": 5, + "_retry_delay_max": 3, "_retry_stop_after_attempts_count": 5, - "_retry_stop_after_attempts_duration": 10, + "_retry_stop_after_attempts_duration": 30, "_retry_delay_default": 0.5, } @@ -135,7 +135,7 @@ def test_retry_urllib3_settings_are_honored(self): urllib3_config = {"connect": 10, "read": 11, "redirect": 12} rp = DatabricksRetryPolicy( delay_min=0.1, - delay_max=10.0, + delay_max=3, stop_after_attempts_count=10, stop_after_attempts_duration=10.0, delay_default=1.0, @@ -174,14 +174,14 @@ def test_retry_max_count_not_exceeded(self): def test_retry_exponential_backoff(self): """GIVEN the retry policy is configured for reasonable exponential backoff WHEN the server sends nothing but 429 responses with retry-afters - THEN the connector will use those retry-afters values as delay + THEN the connector will use those retry-afters values as floor """ retry_policy = self._retry_policy.copy() retry_policy["_retry_delay_min"] = 1 time_start = time.time() with mocked_server_response( - status=429, headers={"Retry-After": "3"} + status=429, headers={"Retry-After": "8"} ) as mock_obj: with pytest.raises(RequestError) as cm: with self.connection(extra_params=retry_policy) as conn: @@ -191,14 +191,14 @@ def test_retry_exponential_backoff(self): assert isinstance(cm.value.args[1], MaxRetryDurationError) # With setting delay_min to 1, the expected retry delays should be: - # 3, 3, 3, 3 + # 8, 8, 8, 8 # The first 3 retries are allowed, the 4th retry puts the total duration over the limit - # of 10 seconds + # of 30 seconds assert mock_obj.return_value.getresponse.call_count == 4 - assert duration > 6 + assert duration > 24 - # Should be less than 7, but this is a safe margin for CI/CD slowness - assert duration < 10 + # Should be less than 26, but this is a safe margin for CI/CD slowness + assert duration < 30 def test_retry_max_duration_not_exceeded(self): """GIVEN the max attempt duration of 10 seconds diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py index 1e18e1f43..897a1d111 100644 --- a/tests/unit/test_retry.py +++ b/tests/unit/test_retry.py @@ -34,8 +34,11 @@ def test_sleep__no_retry_after(self, 
From 74585f3e401d0d002f8a1c319165fd72d8daa66f Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Mon, 3 Feb 2025 05:44:25 +0530
Subject: [PATCH 12/22] Bump version to v3.7.2 (#498)

Updated version bump
---
 CHANGELOG.md                   | 4 ++++
 pyproject.toml                 | 2 +-
 src/databricks/sql/__init__.py | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5a2b6c885..94db8bcdc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Release History
 
+# 3.7.2 (2025-01-31)
+
+- Updated the retry_delay_max and retry_timeout (databricks/databricks-sql-python#497 by @jprakash-db)
+
 # 3.7.1 (2025-01-07)
 
 - Relaxed the number of Http retry attempts (databricks/databricks-sql-python#486 by @jprakash-db)
diff --git a/pyproject.toml b/pyproject.toml
index 0156a5b8c..6a38e33b1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "databricks-sql-connector"
-version = "3.7.1"
+version = "3.7.2"
 description = "Databricks SQL Connector for Python"
 authors = ["Databricks "]
 license = "Apache-2.0"
diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py
index 445328847..931e5c0af 100644
--- a/src/databricks/sql/__init__.py
+++ b/src/databricks/sql/__init__.py
@@ -68,7 +68,7 @@ def __repr__(self):
 DATE = DBAPITypeObject("date")
 ROWID = DBAPITypeObject()
 
-__version__ = "3.7.1"
+__version__ = "3.7.2"
 USER_AGENT_NAME = "PyDatabricksSqlConnector"
 
 # These two functions are pyhive legacy

From 31111581730a9a110e354753a1a4393cb5c94b43 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Wed, 19 Feb 2025 17:03:36 +0530
Subject: [PATCH 13/22] Support for enforcing embedded schema (#505)

* Added the enforce schema option

* Renamed the variable
---
 src/databricks/sql/client.py         | 4 ++++
 src/databricks/sql/thrift_backend.py | 2 ++
 2 files changed, 6 insertions(+)

diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index aefed1ef0..7e3aa02a9 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -737,6 +737,7 @@ def execute(
         self,
         operation: str,
         parameters:
Optional[TParameterCollection] = None, + enforce_embedded_schema_correctness=False, ) -> "Cursor": """ Execute a query and wait for execution to complete. @@ -801,6 +802,7 @@ def execute( use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, async_op=False, + enforce_embedded_schema_correctness=enforce_embedded_schema_correctness, ) self.active_result_set = ResultSet( self.connection, @@ -822,6 +824,7 @@ def execute_async( self, operation: str, parameters: Optional[TParameterCollection] = None, + enforce_embedded_schema_correctness=False, ) -> "Cursor": """ @@ -862,6 +865,7 @@ def execute_async( use_cloud_fetch=self.connection.use_cloud_fetch, parameters=prepared_params, async_op=True, + enforce_embedded_schema_correctness=enforce_embedded_schema_correctness, ) return self diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index 9beab0371..aa0f2ede4 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -883,6 +883,7 @@ def execute_command( use_cloud_fetch=True, parameters=[], async_op=False, + enforce_embedded_schema_correctness=False, ): assert session_handle is not None @@ -910,6 +911,7 @@ def execute_command( }, useArrowNativeTypes=spark_arrow_types, parameters=parameters, + enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness, ) resp = self.make_request(self._client.ExecuteStatement, req) From ce456fa585db886882373959f9923904adfe7332 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Wed, 19 Feb 2025 17:03:50 +0530 Subject: [PATCH 14/22] Reduced duration of long running query (#507) --- tests/e2e/common/large_queries_mixin.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py index 41ef029bb..29231103a 100644 --- a/tests/e2e/common/large_queries_mixin.py +++ b/tests/e2e/common/large_queries_mixin.py @@ -83,11 +83,11 @@ def test_query_with_large_narrow_result_set(self): assert row[0] == row_id def test_long_running_query(self): - """Incrementally increase query size until it takes at least 5 minutes, + """Incrementally increase query size until it takes at least 4 minutes, and asserts that the query completes successfully. 
""" minutes = 60 - min_duration = 5 * minutes + min_duration = 4 * minutes duration = -1 scale0 = 10000 @@ -113,5 +113,5 @@ def test_long_running_query(self): duration = time.time() - start current_fraction = duration / min_duration print("Took {} s with scale factor={}".format(duration, scale_factor)) - # Extrapolate linearly to reach 5 min and add 50% padding to push over the limit + # Extrapolate linearly to reach 4 min and add 50% padding to push over the limit scale_factor = math.ceil(1.5 * scale_factor / current_fraction) From 31111581730a9a110e354753a1a4393cb5c94b43 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Sun, 23 Feb 2025 23:43:49 +0530 Subject: [PATCH 15/22] Added info logging for response (#511) * Added some logging support * Logs are based on what the server returns * Added back some of the better errors * Print status if not detected as error * Added logging in the flush step * Reformatted * Removed some unnecessary --- src/databricks/sql/auth/retry.py | 18 ++++++++++-------- src/databricks/sql/auth/thrift_http_client.py | 6 ++++++ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index eedcc773f..57cfeed58 100755 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -345,23 +345,24 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]: if a retry would violate the configured policy. """ + logger.info(f"Received status code {status_code} for {method} request") + # Request succeeded. Don't retry. if status_code == 200: return False, "200 codes are not retried" if status_code == 401: - raise NonRecoverableNetworkError( - "Received 401 - UNAUTHORIZED. Confirm your authentication credentials." + return ( + False, + "Received 401 - UNAUTHORIZED. Confirm your authentication credentials.", ) if status_code == 403: - raise NonRecoverableNetworkError( - "Received 403 - FORBIDDEN. Confirm your authentication credentials." - ) + return False, "403 codes are not retried" # Request failed and server said NotImplemented. This isn't recoverable. Don't retry. if status_code == 501: - raise NonRecoverableNetworkError("Received code 501 from server.") + return False, "Received code 501 from server." # Request failed and this method is not retryable. We only retry POST requests. 
if not self._is_method_retryable(method): @@ -400,8 +401,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]: and status_code not in self.status_forcelist and status_code not in self.force_dangerous_codes ): - raise UnsafeToRetryError( - "ExecuteStatement command can only be retried for codes 429 and 503" + return ( + False, + "ExecuteStatement command can only be retried for codes 429 and 503", ) # Request failed with a dangerous code, was an ExecuteStatement, but user forced retries for this diff --git a/src/databricks/sql/auth/thrift_http_client.py b/src/databricks/sql/auth/thrift_http_client.py index 6273ab284..f0daae162 100644 --- a/src/databricks/sql/auth/thrift_http_client.py +++ b/src/databricks/sql/auth/thrift_http_client.py @@ -198,6 +198,12 @@ def flush(self): self.message = self.__resp.reason self.headers = self.__resp.headers + logger.info( + "HTTP Response with status code {}, message: {}".format( + self.code, self.message + ) + ) + @staticmethod def basic_proxy_auth_headers(proxy): if proxy is None or not proxy.username: From fb3e32c58bdf6ea3c7b66da2c6ecf5eea065ae2c Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 28 Feb 2025 10:45:13 +0530 Subject: [PATCH 16/22] Fix : Unable to poll results in async API (#515) * Fixed the async issue * Added unit tests * Minor change * Changed to 5 * Increased time --- src/databricks/sql/thrift_backend.py | 8 ++- tests/e2e/common/large_queries_mixin.py | 2 +- tests/e2e/test_driver.py | 66 ++++++++++++++++++++++--- 3 files changed, 66 insertions(+), 10 deletions(-) diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py index aa0f2ede4..972d9a9b2 100644 --- a/src/databricks/sql/thrift_backend.py +++ b/src/databricks/sql/thrift_backend.py @@ -899,8 +899,12 @@ def execute_command( sessionHandle=session_handle, statement=operation, runAsync=True, - getDirectResults=ttypes.TSparkGetDirectResults( - maxRows=max_rows, maxBytes=max_bytes + # For async operation we don't want the direct results + getDirectResults=None + if async_op + else ttypes.TSparkGetDirectResults( + maxRows=max_rows, + maxBytes=max_bytes, ), canReadArrowResult=True if pyarrow else False, canDecompressLZ4Result=lz4_compression, diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py index 29231103a..3a35d8014 100644 --- a/tests/e2e/common/large_queries_mixin.py +++ b/tests/e2e/common/large_queries_mixin.py @@ -94,7 +94,7 @@ def test_long_running_query(self): scale_factor = 1 with self.cursor() as cursor: while duration < min_duration: - assert scale_factor < 512, "Detected infinite loop" + assert scale_factor < 1024, "Detected infinite loop" start = time.time() cursor.execute( diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py index 2f0881cda..45fea480b 100644 --- a/tests/e2e/test_driver.py +++ b/tests/e2e/test_driver.py @@ -177,19 +177,22 @@ def test_cloud_fetch(self): for i in range(len(cf_result)): assert cf_result[i] == noop_result[i] - def test_execute_async(self): - def isExecuting(operation_state): - return not operation_state or operation_state in [ - ttypes.TOperationState.RUNNING_STATE, - ttypes.TOperationState.PENDING_STATE, - ] + +class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase): + def isExecuting(self, operation_state): + return not operation_state or operation_state in [ + ttypes.TOperationState.RUNNING_STATE, + ttypes.TOperationState.PENDING_STATE, + ] + + def test_execute_async__long_running(self): long_running_query = "SELECT COUNT(*) FROM 
RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'" with self.cursor() as cursor: cursor.execute_async(long_running_query) ## Polling after every POLLING_INTERVAL seconds - while isExecuting(cursor.get_query_state()): + while self.isExecuting(cursor.get_query_state()): time.sleep(self.POLLING_INTERVAL) log.info("Polling the status in test_execute_async") @@ -198,6 +201,55 @@ def isExecuting(operation_state): assert result[0].asDict() == {"count(1)": 0} + def test_execute_async__small_result(self): + small_result_query = "SELECT 1" + + with self.cursor() as cursor: + cursor.execute_async(small_result_query) + + ## Fake sleep for 5 secs + time.sleep(5) + + ## Polling after every POLLING_INTERVAL seconds + while self.isExecuting(cursor.get_query_state()): + time.sleep(self.POLLING_INTERVAL) + log.info("Polling the status in test_execute_async") + + cursor.get_async_execution_result() + result = cursor.fetchall() + + assert result[0].asDict() == {"1": 1} + + def test_execute_async__large_result(self): + x_dimension = 1000 + y_dimension = 1000 + large_result_query = f""" + SELECT + x.id AS x_id, + y.id AS y_id, + FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date + FROM + RANGE({x_dimension}) x + JOIN + RANGE({y_dimension}) y + """ + + with self.cursor() as cursor: + cursor.execute_async(large_result_query) + + ## Fake sleep for 5 secs + time.sleep(5) + + ## Polling after every POLLING_INTERVAL seconds + while self.isExecuting(cursor.get_query_state()): + time.sleep(self.POLLING_INTERVAL) + log.info("Polling the status in test_execute_async") + + cursor.get_async_execution_result() + result = cursor.fetchall() + + assert len(result) == x_dimension * y_dimension + # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core # tests From eb6d926f3b97fadcb5744176b951c71be0df5082 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Fri, 28 Feb 2025 23:34:38 +0530 Subject: [PATCH 17/22] Bump up version to 3.7.3 (#523) Version bump changes --- CHANGELOG.md | 6 ++++++ pyproject.toml | 2 +- src/databricks/sql/__init__.py | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 94db8bcdc..01547f0aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Release History +# 3.7.3 (2025-03-28) + +- Fix: Unable to poll small results in execute_async function (databricks/databricks-sql-python#515 by @jprakash-db) +- Updated log messages to show the status code and error messages of requests (databricks/databricks-sql-python#511 by @jprakash-db) +- Fix: Incorrect metadata was fetched in case of queries with the same alias (databricks/databricks-sql-python#505 by @jprakash-db) + # 3.7.2 (2025-01-31) - Updated the retry_dela_max and retry_timeout (databricks/databricks-sql-python#497 by @jprakash-db) diff --git a/pyproject.toml b/pyproject.toml index 6a38e33b1..a393c0f2f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "databricks-sql-connector" -version = "3.7.2" +version = "3.7.3" description = "Databricks SQL Connector for Python" authors = ["Databricks "] license = "Apache-2.0" diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py index 931e5c0af..7769d8f42 100644 --- a/src/databricks/sql/__init__.py +++ b/src/databricks/sql/__init__.py @@ -68,7 +68,7 @@ def __repr__(self): DATE = DBAPITypeObject("date") ROWID = DBAPITypeObject() -__version__ = "3.7.2" +__version__ = "3.7.3" USER_AGENT_NAME = 
"PyDatabricksSqlConnector" # These two functions are pyhive legacy From 99be68e891362642ca5140c7de781fa9bbc06c18 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Thu, 17 Apr 2025 15:15:50 +0530 Subject: [PATCH 18/22] Added version check for urllib3 (#545) --- src/databricks/sql/auth/retry.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py index 57cfeed58..f7b6fc9da 100755 --- a/src/databricks/sql/auth/retry.py +++ b/src/databricks/sql/auth/retry.py @@ -2,6 +2,7 @@ import random import time import typing +from importlib.metadata import version from enum import Enum from typing import List, Optional, Tuple, Union @@ -308,8 +309,11 @@ def get_backoff_time(self) -> float: current_attempt = self.stop_after_attempts_count - int(self.total or 0) proposed_backoff = (2**current_attempt) * self.delay_min - if self.backoff_jitter != 0.0: - proposed_backoff += random.random() * self.backoff_jitter + + library_version = version("urllib3") + if int(library_version.split(".")[0]) >= 2: + if self.backoff_jitter != 0.0: + proposed_backoff += random.random() * self.backoff_jitter proposed_backoff = min(proposed_backoff, self.delay_max) self.check_proposed_wait(proposed_backoff) From 62b5aad383bed8411f278905c12b233449a40566 Mon Sep 17 00:00:00 2001 From: Jothi Prakash Date: Mon, 21 Apr 2025 17:28:26 +0530 Subject: [PATCH 19/22] Updating the actions/cache version (#546) Updated the cache version --- .github/workflows/code-quality-checks.yml | 6 +++--- .github/workflows/integration.yml | 2 +- .github/workflows/publish-test.yml | 2 +- .github/workflows/publish.yml | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/.github/workflows/code-quality-checks.yml b/.github/workflows/code-quality-checks.yml index 80ac94a7c..b4d4776e7 100644 --- a/.github/workflows/code-quality-checks.yml +++ b/.github/workflows/code-quality-checks.yml @@ -38,7 +38,7 @@ jobs: #---------------------------------------------- - name: Load cached venv id: cached-poetry-dependencies - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: .venv key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }} @@ -89,7 +89,7 @@ jobs: #---------------------------------------------- - name: Load cached venv id: cached-poetry-dependencies - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: .venv key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }} @@ -141,7 +141,7 @@ jobs: #---------------------------------------------- - name: Load cached venv id: cached-poetry-dependencies - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: .venv key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }} diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml index f28c22a85..c0122f255 100644 --- a/.github/workflows/integration.yml +++ b/.github/workflows/integration.yml @@ -41,7 +41,7 @@ jobs: #---------------------------------------------- - name: Load cached venv id: cached-poetry-dependencies - uses: actions/cache@v2 + uses: actions/cache@v4 with: path: .venv key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }} diff --git 
a/.github/workflows/publish-test.yml b/.github/workflows/publish-test.yml
index d95e2e3c6..203c6571d 100644
--- a/.github/workflows/publish-test.yml
+++ b/.github/workflows/publish-test.yml
@@ -29,7 +29,7 @@ jobs:
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 324575ff8..7960e9ed0 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -31,7 +31,7 @@ jobs:
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
        with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}

From 76a376b94d5a4ebbeffcbbeb1c4fc99659260e74 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Tue, 22 Apr 2025 10:13:19 +0530
Subject: [PATCH 20/22] Bump up version to 3.7.4 (#548)

Version bump 3.7.4
---
 CHANGELOG.md                   | 4 ++++
 pyproject.toml                 | 2 +-
 src/databricks/sql/__init__.py | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 01547f0aa..99173ceae 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Release History
 
+# 3.7.4 (2025-04-21)
+
+- Fix: compatibility with urllib3 versions less than 2.x (databricks/databricks-sql-python#545 by @jprakash-db)
+
 # 3.7.3 (2025-03-28)
 
 - Fix: Unable to poll small results in execute_async function (databricks/databricks-sql-python#515 by @jprakash-db)
diff --git a/pyproject.toml b/pyproject.toml
index a393c0f2f..afe7dd3fc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "databricks-sql-connector"
-version = "3.7.3"
+version = "3.7.4"
 description = "Databricks SQL Connector for Python"
 authors = ["Databricks "]
 license = "Apache-2.0"
diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py
index 7769d8f42..e80bb8376 100644
--- a/src/databricks/sql/__init__.py
+++ b/src/databricks/sql/__init__.py
@@ -68,7 +68,7 @@ def __repr__(self):
 DATE = DBAPITypeObject("date")
 ROWID = DBAPITypeObject()
 
-__version__ = "3.7.3"
+__version__ = "3.7.4"
 USER_AGENT_NAME = "PyDatabricksSqlConnector"
 
 # These two functions are pyhive legacy

From bd7f870f1e4c63ee72998d41322ffd5615715834 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Mon, 2 Mar 2026 20:27:25 +0530
Subject: [PATCH 21/22] Pyarrow concat to now merge with promote options as default (#745)

* Update the Pyarrow concat

* Updated the tests python versions

* Updated github actions

* removed python3.12

* Black formatted

* Fixed tests

* Fixed more tests
---
 .github/workflows/code-quality-checks.yml   | 24 +++++++++++----------
 .github/workflows/integration.yml           |  5 +++--
 .github/workflows/publish-test.yml          |  5 +++--
 .github/workflows/publish.yml               |  5 +++--
 src/databricks/sql/client.py                |  8 +++++--
 src/databricks/sql/utils.py                 |  8 +++++--
 tests/e2e/common/large_queries_mixin.py     |  6 +++---
 tests/e2e/common/staging_ingestion_tests.py |  8 +++----
 tests/e2e/test_complex_types.py             | 10 ++++-----
 tests/e2e/test_parameterized_queries.py     |  8 +++----
 tests/unit/test_cloud_fetch_queue.py        |  4 ++--
 11 files changed, 52 insertions(+), 39 deletions(-)
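Alongside the CI housekeeping, the substance of this patch is in the client.py and utils.py hunks below, which pass promote_options="default" to every pyarrow.concat_tables call so that result batches whose schemas differ only in null-typed columns can still be merged. A self-contained illustration of the difference (not driver code):

    import pyarrow as pa

    # An empty first batch infers its column as the null type
    empty_batch = pa.table({"id": pa.array([], type=pa.null())})
    data_batch = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())})

    # pa.concat_tables([empty_batch, data_batch]) would raise on the schema
    # mismatch; "default" promotes null-typed columns to the richer type instead
    merged = pa.concat_tables([empty_batch, data_batch], promote_options="default")
    print(merged.schema)  # id: int64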
From bd7f870f1e4c63ee72998d41322ffd5615715834 Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Mon, 2 Mar 2026 20:27:25 +0530
Subject: [PATCH 21/22] Pyarrow concat to now merge with promote options as default (#745)

* Update the Pyarrow concat
* Updated the tests python versions
* Updated github actions
* removed python3.12
* Black formatted
* Fixed tests
* Fixed more tests
---
 .github/workflows/code-quality-checks.yml   | 24 +++++++++++----------
 .github/workflows/integration.yml           |  5 +++--
 .github/workflows/publish-test.yml          |  5 +++--
 .github/workflows/publish.yml               |  5 +++--
 src/databricks/sql/client.py                |  8 +++++--
 src/databricks/sql/utils.py                 |  8 +++++--
 tests/e2e/common/large_queries_mixin.py     |  6 +++---
 tests/e2e/common/staging_ingestion_tests.py |  8 +++----
 tests/e2e/test_complex_types.py             | 10 ++++-----
 tests/e2e/test_parameterized_queries.py     |  8 +++----
 tests/unit/test_cloud_fetch_queue.py        |  4 ++--
 11 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/.github/workflows/code-quality-checks.yml b/.github/workflows/code-quality-checks.yml
index b4d4776e7..946374d1f 100644
--- a/.github/workflows/code-quality-checks.yml
+++ b/.github/workflows/code-quality-checks.yml
@@ -4,23 +4,22 @@ on:
     branches:
       - main
   pull_request:
-    branches:
-      - main
+
 
 jobs:
   run-unit-tests:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9, "3.10", "3.11"]
+        python-version: [3.9, "3.10", "3.11"]
     steps:
       #----------------------------------------------
       # check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python ${{ matrix.python-version }}
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       #----------------------------------------------
@@ -29,6 +28,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -62,16 +62,16 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9, "3.10"]
+        python-version: [3.9, "3.10", "3.11"]
     steps:
       #----------------------------------------------
       # check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python ${{ matrix.python-version }}
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       #----------------------------------------------
@@ -80,6 +80,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -114,16 +115,16 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9, "3.10"]
+        python-version: [3.9, "3.10", "3.11"]
     steps:
       #----------------------------------------------
       # check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python ${{ matrix.python-version }}
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       #----------------------------------------------
@@ -132,6 +133,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index c0122f255..4d10464bd 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -20,10 +20,10 @@ jobs:
       # check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
       - name: Set up python
         id: setup-python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: "3.10"
       #----------------------------------------------
@@ -32,6 +32,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
diff --git a/.github/workflows/publish-test.yml b/.github/workflows/publish-test.yml
index 203c6571d..b7ffee9f4 100644
--- a/.github/workflows/publish-test.yml
+++ b/.github/workflows/publish-test.yml
@@ -9,10 +9,10 @@ jobs:
       # check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: 3.9
       #----------------------------------------------
@@ -21,6 +21,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 7960e9ed0..c592756b8 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -11,10 +11,10 @@ jobs:
       # check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: 3.9
       #----------------------------------------------
@@ -23,6 +23,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index 7e3aa02a9..59c74c6ba 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -1335,7 +1335,9 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
         ):
             self._fill_results_buffer()
             partial_results = self.results.next_n_rows(n_remaining_rows)
-            results = pyarrow.concat_tables([results, partial_results])
+            results = pyarrow.concat_tables(
+                [results, partial_results], promote_options="default"
+            )
             n_remaining_rows -= partial_results.num_rows
             self._next_row_index += partial_results.num_rows
 
@@ -1391,7 +1393,9 @@ def fetchall_arrow(self) -> "pyarrow.Table":
         while not self.has_been_closed_server_side and self.has_more_rows:
             self._fill_results_buffer()
             partial_results = self.results.remaining_rows()
-            results = pyarrow.concat_tables([results, partial_results])
+            results = pyarrow.concat_tables(
+                [results, partial_results], promote_options="default"
+            )
             self._next_row_index += partial_results.num_rows
 
         return results
diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py
index cd655c4ea..48e0aa77e 100644
--- a/src/databricks/sql/utils.py
+++ b/src/databricks/sql/utils.py
@@ -265,7 +265,9 @@ def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
             # Get remaining of num_rows or the rest of the current table, whichever is smaller
             length = min(num_rows, self.table.num_rows - self.table_row_index)
             table_slice = self.table.slice(self.table_row_index, length)
-            results = pyarrow.concat_tables([results, table_slice])
+            results = pyarrow.concat_tables(
+                [results, table_slice], promote_options="default"
+            )
             self.table_row_index += table_slice.num_rows
 
             # Replace current table with the next table if we are at the end of the current table
@@ -292,7 +294,9 @@ def remaining_rows(self) -> "pyarrow.Table":
             table_slice = self.table.slice(
                 self.table_row_index, self.table.num_rows - self.table_row_index
             )
-            results = pyarrow.concat_tables([results, table_slice])
+            results = pyarrow.concat_tables(
+                [results, table_slice], promote_options="default"
+            )
             self.table_row_index += table_slice.num_rows
             self.table = self._create_next_table()
             self.table_row_index = 0
diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py
index 3a35d8014..5fe292d91 100644
--- a/tests/e2e/common/large_queries_mixin.py
+++ b/tests/e2e/common/large_queries_mixin.py
@@ -83,11 +83,11 @@ def test_query_with_large_narrow_result_set(self):
             assert row[0] == row_id
 
     def test_long_running_query(self):
-        """Incrementally increase query size until it takes at least 4 minutes,
+        """Incrementally increase query size until it takes at least 2 minutes,
         and asserts that the query completes successfully.
         """
         minutes = 60
-        min_duration = 4 * minutes
+        min_duration = 2 * minutes
 
         duration = -1
         scale0 = 10000
@@ -113,5 +113,5 @@ def test_long_running_query(self):
         duration = time.time() - start
         current_fraction = duration / min_duration
         print("Took {} s with scale factor={}".format(duration, scale_factor))
-        # Extrapolate linearly to reach 4 min and add 50% padding to push over the limit
+        # Extrapolate linearly to reach 2 min and add 50% padding to push over the limit
         scale_factor = math.ceil(1.5 * scale_factor / current_fraction)
diff --git a/tests/e2e/common/staging_ingestion_tests.py b/tests/e2e/common/staging_ingestion_tests.py
index 008055e33..39aea3feb 100644
--- a/tests/e2e/common/staging_ingestion_tests.py
+++ b/tests/e2e/common/staging_ingestion_tests.py
@@ -46,7 +46,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
         ) as conn:
             cursor = conn.cursor()
 
-            query = f"PUT '{temp_path}' INTO 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
+            query = f"PUT '{temp_path}' INTO 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv' OVERWRITE"
             cursor.execute(query)
 
         # GET should succeed
@@ -57,7 +57,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
             extra_params={"staging_allowed_local_path": new_temp_path}
         ) as conn:
             cursor = conn.cursor()
-            query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
+            query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv' TO '{new_temp_path}'"
             cursor.execute(query)
 
         with open(new_fh, "rb") as fp:
@@ -67,7 +67,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
 
         # REMOVE should succeed
 
-        remove_query = f"REMOVE 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv'"
+        remove_query = f"REMOVE 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv'"
 
         with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn:
             cursor = conn.cursor()
@@ -79,7 +79,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
                 Error, match="Staging operation over HTTP was unsuccessful: 404"
             ):
                 cursor = conn.cursor()
-                query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
+                query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv' TO '{new_temp_path}'"
                 cursor.execute(query)
 
         os.remove(temp_path)
diff --git a/tests/e2e/test_complex_types.py b/tests/e2e/test_complex_types.py
index 446a6b50a..77d634242 100644
--- a/tests/e2e/test_complex_types.py
+++ b/tests/e2e/test_complex_types.py
@@ -14,7 +14,7 @@ def table_fixture(self, connection_details):
             # Create the table
             cursor.execute(
                 """
-        CREATE TABLE IF NOT EXISTS pysql_test_complex_types_table (
+        CREATE TABLE IF NOT EXISTS pysql_test_complex_types_table_deprecated (
             array_col ARRAY<STRING>,
             map_col MAP<STRING, INT>,
             struct_col STRUCT
@@ -24,7 +24,7 @@ def table_fixture(self, connection_details):
             # Insert a record
             cursor.execute(
                 """
-        INSERT INTO pysql_test_complex_types_table
+        INSERT INTO pysql_test_complex_types_table_deprecated
         VALUES (
             ARRAY('a', 'b', 'c'),
             MAP('a', 1, 'b', 2, 'c', 3),
@@ -34,7 +34,7 @@ def table_fixture(self, connection_details):
             )
             yield
             # Clean up the table after the test
-            cursor.execute("DROP TABLE IF EXISTS pysql_test_complex_types_table")
+            cursor.execute("DROP TABLE IF EXISTS pysql_test_complex_types_table_deprecated")
 
     @pytest.mark.parametrize(
         "field,expected_type",
@@ -45,7 +45,7 @@ def test_read_complex_types_as_arrow(self, field, expected_type, table_fixture):
 
         with self.cursor() as cursor:
            result = cursor.execute(
-                "SELECT * FROM pysql_test_complex_types_table LIMIT 1"
+                "SELECT * FROM pysql_test_complex_types_table_deprecated LIMIT 1"
             ).fetchone()
 
             assert isinstance(result[field], expected_type)
@@ -57,7 +57,7 @@ def test_read_complex_types_as_string(self, field, table_fixture):
             extra_params={"_use_arrow_native_complex_types": False}
         ) as cursor:
             result = cursor.execute(
-                "SELECT * FROM pysql_test_complex_types_table LIMIT 1"
+                "SELECT * FROM pysql_test_complex_types_table_deprecated LIMIT 1"
             ).fetchone()
 
             assert isinstance(result[field], str)
diff --git a/tests/e2e/test_parameterized_queries.py b/tests/e2e/test_parameterized_queries.py
index d346ad5c6..ddf8b33fd 100644
--- a/tests/e2e/test_parameterized_queries.py
+++ b/tests/e2e/test_parameterized_queries.py
@@ -123,7 +123,7 @@ def inline_table(self, connection_details):
         """
 
         query = """
-        CREATE TABLE IF NOT EXISTS pysql_e2e_inline_param_test_table (
+        CREATE TABLE IF NOT EXISTS pysql_e2e_inline_param_test_table_deprecated (
             null_col INT,
             int_col INT,
             bigint_col BIGINT,
@@ -167,9 +167,9 @@ def _inline_roundtrip(self, params: dict, paramstyle: ParamStyle):
         This is a no-op but is included to make the test-code easier to read.
         """
         target_column = self._get_inline_table_column(params.get("p"))
-        INSERT_QUERY = f"INSERT INTO pysql_e2e_inline_param_test_table (`{target_column}`) VALUES (%(p)s)"
-        SELECT_QUERY = f"SELECT {target_column} `col` FROM pysql_e2e_inline_param_test_table LIMIT 1"
-        DELETE_QUERY = "DELETE FROM pysql_e2e_inline_param_test_table"
+        INSERT_QUERY = f"INSERT INTO pysql_e2e_inline_param_test_table_deprecated (`{target_column}`) VALUES (%(p)s)"
+        SELECT_QUERY = f"SELECT {target_column} `col` FROM pysql_e2e_inline_param_test_table_deprecated LIMIT 1"
+        DELETE_QUERY = "DELETE FROM pysql_e2e_inline_param_test_table_deprecated"
 
         with self.connection(extra_params={"use_inline_params": True}) as conn:
             with conn.cursor() as cursor:
diff --git a/tests/unit/test_cloud_fetch_queue.py b/tests/unit/test_cloud_fetch_queue.py
index 01d8a79b9..7f19f5a57 100644
--- a/tests/unit/test_cloud_fetch_queue.py
+++ b/tests/unit/test_cloud_fetch_queue.py
@@ -185,7 +185,7 @@ def test_next_n_rows_more_than_one_table(self, mock_create_next_table):
         assert (
             result
             == pyarrow.concat_tables(
-                [self.make_arrow_table(), self.make_arrow_table()]
+                [self.make_arrow_table(), self.make_arrow_table()], promote_options="default"
             )[:7]
         )
 
@@ -309,7 +309,7 @@ def test_remaining_rows_multiple_tables_fully_returned(
         assert (
             result
             == pyarrow.concat_tables(
-                [self.make_arrow_table(), self.make_arrow_table()]
+                [self.make_arrow_table(), self.make_arrow_table()], promote_options="default"
             )[3:]
         )
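
For context on the promote_options="default" argument threaded through this patch:
pyarrow.concat_tables raises ArrowInvalid when batch schemas differ, e.g. when a column
arrives entirely null in one batch and is typed as null rather than its concrete type,
while "default" promotion unifies such schemas by widening the null column. A small
self-contained illustration (table contents invented for the demo; assumes a pyarrow
release that supports promote_options, i.e. 14.0 or newer):

    import pyarrow as pa

    # One batch with concrete values, one where the column is entirely null,
    # so its type is inferred as the null type rather than int64.
    t1 = pa.table({"col": pa.array([1, 2], type=pa.int64())})
    t2 = pa.table({"col": pa.array([None, None], type=pa.null())})

    # pa.concat_tables([t1, t2]) would raise ArrowInvalid (schemas differ);
    # promote_options="default" widens the null column to int64 instead.
    merged = pa.concat_tables([t1, t2], promote_options="default")
    print(merged.schema)              # col: int64
    print(merged["col"].to_pylist())  # [1, 2, None, None]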

From c6664895828adc7534b00635fb867698103ad78d Mon Sep 17 00:00:00 2001
From: Jothi Prakash
Date: Mon, 2 Mar 2026 21:07:58 +0530
Subject: [PATCH 22/22] Bumped version to 3.7.5 (#748)

Bumped version
---
 CHANGELOG.md                   | 4 ++++
 pyproject.toml                 | 2 +-
 src/databricks/sql/__init__.py | 2 +-
 3 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 99173ceae..c2ffbbe92 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Release History
 
+# 3.7.5 (2026-03-02)
+
+- Fix: Pyarrow concat to now merge with promote options as default (databricks/databricks-sql-python#745 by @jprakash-db)
+
 # 3.7.4 (2025-04-21)
 
 - Fix: compatibility with urllib3 versions less than 2.x (databricks/databricks-sql-python#545 by @jprakash-db)
diff --git a/pyproject.toml b/pyproject.toml
index afe7dd3fc..7303785e8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "databricks-sql-connector"
-version = "3.7.4"
+version = "3.7.5"
 description = "Databricks SQL Connector for Python"
 authors = ["Databricks "]
 license = "Apache-2.0"
diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py
index e80bb8376..a33b0e6c5 100644
--- a/src/databricks/sql/__init__.py
+++ b/src/databricks/sql/__init__.py
@@ -68,7 +68,7 @@ def __repr__(self):
 DATE = DBAPITypeObject("date")
 ROWID = DBAPITypeObject()
 
-__version__ = "3.7.4"
+__version__ = "3.7.5"
 USER_AGENT_NAME = "PyDatabricksSqlConnector"
 
 # These two functions are pyhive legacy