diff --git a/.github/workflows/code-quality-checks.yml b/.github/workflows/code-quality-checks.yml
index 80ac94a7c..946374d1f 100644
--- a/.github/workflows/code-quality-checks.yml
+++ b/.github/workflows/code-quality-checks.yml
@@ -4,23 +4,22 @@ on:
     branches:
       - main
   pull_request:
-    branches:
-      - main
+
 jobs:
   run-unit-tests:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9, "3.10", "3.11"]
+        python-version: [3.9, "3.10", "3.11"]
     steps:
       #----------------------------------------------
       #  check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python ${{ matrix.python-version }}
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       #----------------------------------------------
@@ -29,6 +28,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -38,7 +38,7 @@
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
@@ -62,16 +62,16 @@
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9, "3.10"]
+        python-version: [3.9, "3.10", "3.11"]
     steps:
       #----------------------------------------------
       #  check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python ${{ matrix.python-version }}
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       #----------------------------------------------
@@ -80,6 +80,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -89,7 +90,7 @@
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
@@ -114,16 +115,16 @@
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        python-version: [3.8, 3.9, "3.10"]
+        python-version: [3.9, "3.10", "3.11"]
     steps:
       #----------------------------------------------
       #  check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python ${{ matrix.python-version }}
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: ${{ matrix.python-version }}
       #----------------------------------------------
@@ -132,6 +133,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -141,7 +143,7 @@
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index f28c22a85..4d10464bd 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -20,10 +20,10 @@ jobs:
       #  check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
       - name: Set up python
         id: setup-python
-        uses: actions/setup-python@v4
+        uses: actions/setup-python@v5
         with:
           python-version: "3.10"
       #----------------------------------------------
@@ -32,6 +32,7 @@ jobs:
       - name: Install Poetry
        uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -41,7 +42,7 @@
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
diff --git a/.github/workflows/publish-test.yml b/.github/workflows/publish-test.yml
index d95e2e3c6..b7ffee9f4 100644
--- a/.github/workflows/publish-test.yml
+++ b/.github/workflows/publish-test.yml
@@ -9,10 +9,10 @@ jobs:
       #  check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: 3.9
       #----------------------------------------------
@@ -21,6 +21,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -29,7 +30,7 @@
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 324575ff8..c592756b8 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -11,10 +11,10 @@ jobs:
       #  check-out repo and set-up python
       #----------------------------------------------
       - name: Check out repository
-        uses: actions/checkout@v2
+        uses: actions/checkout@v4
       - name: Set up python
         id: setup-python
-        uses: actions/setup-python@v2
+        uses: actions/setup-python@v5
         with:
           python-version: 3.9
       #----------------------------------------------
@@ -23,6 +23,7 @@ jobs:
       - name: Install Poetry
         uses: snok/install-poetry@v1
         with:
+          version: "2.2.1"
           virtualenvs-create: true
           virtualenvs-in-project: true
           installer-parallel: true
@@ -31,7 +32,7 @@
       #----------------------------------------------
       - name: Load cached venv
         id: cached-poetry-dependencies
-        uses: actions/cache@v2
+        uses: actions/cache@v4
         with:
           path: .venv
           key: venv-${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-${{ github.event.repository.name }}-${{ hashFiles('**/poetry.lock') }}
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d426b97e0..c2ffbbe92 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,27 @@
 # Release History
 
+# 3.7.5 (2026-03-02)
+
+- Fix: PyArrow concat now merges with promote_options="default" (databricks/databricks-sql-python#745 by @jprakash-db)
+
+# 3.7.4 (2025-04-21)
+
+- Fix: compatibility with urllib3 versions less than 2.x (databricks/databricks-sql-python#545 by @jprakash-db)
+
+# 3.7.3 (2025-03-28)
+
+- Fix: Unable to poll small results in execute_async function (databricks/databricks-sql-python#515 by @jprakash-db)
+- Updated log messages to show the status code and error messages of requests (databricks/databricks-sql-python#511 by @jprakash-db)
+- Fix: Incorrect metadata was fetched in case of queries with the same alias (databricks/databricks-sql-python#505 by @jprakash-db)
+
+# 3.7.2 (2025-01-31)
+
+- Updated the retry_delay_max and retry_timeout (databricks/databricks-sql-python#497 by @jprakash-db)
+
+# 3.7.1 (2025-01-07)
+
+- Relaxed the number of HTTP retry attempts (databricks/databricks-sql-python#486 by @jprakash-db)
+
 # 3.7.0 (2024-12-23)
 
 - Fix: Incorrect number of rows fetched in inline results when fetching results with FETCH_NEXT orientation (databricks/databricks-sql-python#479 by @jprakash-db)
diff --git a/pyproject.toml b/pyproject.toml
index dc13f283a..7303785e8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "databricks-sql-connector"
-version = "3.7.0"
+version = "3.7.5"
 description = "Databricks SQL Connector for Python"
 authors = ["Databricks <databricks-sql-connector-maintainers@databricks.com>"]
 license = "Apache-2.0"
diff --git a/src/databricks/sql/__init__.py b/src/databricks/sql/__init__.py
index eff1e812d..a33b0e6c5 100644
--- a/src/databricks/sql/__init__.py
+++ b/src/databricks/sql/__init__.py
@@ -68,7 +68,7 @@ def __repr__(self):
 DATE = DBAPITypeObject("date")
 ROWID = DBAPITypeObject()
 
-__version__ = "3.7.0"
+__version__ = "3.7.5"
 USER_AGENT_NAME = "PyDatabricksSqlConnector"
 
 # These two functions are pyhive legacy
diff --git a/src/databricks/sql/auth/retry.py b/src/databricks/sql/auth/retry.py
index 0243d0aa2..f7b6fc9da 100755
--- a/src/databricks/sql/auth/retry.py
+++ b/src/databricks/sql/auth/retry.py
@@ -2,6 +2,7 @@
 import random
 import time
 import typing
+from importlib.metadata import version
 from enum import Enum
 from typing import List, Optional, Tuple, Union
 
@@ -290,8 +291,9 @@ def sleep_for_retry(self, response: BaseHTTPResponse) -> bool:
         else:
             proposed_wait = self.get_backoff_time()
 
-        proposed_wait = min(proposed_wait, self.delay_max)
+        proposed_wait = max(proposed_wait, self.delay_max)
         self.check_proposed_wait(proposed_wait)
+        logger.debug(f"Retrying after {proposed_wait} seconds")
         time.sleep(proposed_wait)
         return True
 
@@ -307,8 +309,11 @@ def get_backoff_time(self) -> float:
         current_attempt = self.stop_after_attempts_count - int(self.total or 0)
         proposed_backoff = (2**current_attempt) * self.delay_min
-        if self.backoff_jitter != 0.0:
-            proposed_backoff += random.random() * self.backoff_jitter
+
+        library_version = version("urllib3")
+        if int(library_version.split(".")[0]) >= 2:
+            if self.backoff_jitter != 0.0:
+                proposed_backoff += random.random() * self.backoff_jitter
 
         proposed_backoff = min(proposed_backoff, self.delay_max)
         self.check_proposed_wait(proposed_backoff)
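Taken together, the two retry.py hunks above change how a wait is computed: the exponential backoff is still capped at delay_max, jitter is applied only under urllib3 2.x (the compatibility fix released as 3.7.4), and the final wait is now floored at delay_max rather than capped by it. A minimal standalone sketch of the combined effect, assuming urllib3 is installed (`proposed_retry_delay` is a hypothetical helper, not connector API):

```python
import random
from importlib.metadata import version
from typing import Optional

def proposed_retry_delay(attempt: int, retry_after: Optional[float],
                         delay_min: float = 1.0, delay_max: float = 60.0,
                         backoff_jitter: float = 1.0) -> float:
    # Exponential backoff; jitter only under urllib3 >= 2.x (illustrative helper)
    backoff = (2 ** attempt) * delay_min
    if int(version("urllib3").split(".")[0]) >= 2 and backoff_jitter != 0.0:
        backoff += random.random() * backoff_jitter
    backoff = min(backoff, delay_max)      # get_backoff_time caps at delay_max ...

    wait = retry_after if retry_after is not None else backoff
    return max(wait, delay_max)            # ... but sleep_for_retry floors at delay_max

print(proposed_retry_delay(attempt=1, retry_after=None, delay_max=3.0))  # 3.0
print(proposed_retry_delay(attempt=1, retry_after=8.0, delay_max=3.0))   # 8.0
```

One consequence worth noting: without a Retry-After header, the capped backoff can never exceed delay_max, so the new floor makes every such wait resolve to exactly delay_max.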
@@ -344,23 +349,24 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
         if a retry would violate the configured policy.
         """
 
+        logger.info(f"Received status code {status_code} for {method} request")
+
         # Request succeeded. Don't retry.
         if status_code == 200:
             return False, "200 codes are not retried"
 
         if status_code == 401:
-            raise NonRecoverableNetworkError(
-                "Received 401 - UNAUTHORIZED. Confirm your authentication credentials."
+            return (
+                False,
+                "Received 401 - UNAUTHORIZED. Confirm your authentication credentials.",
             )
 
         if status_code == 403:
-            raise NonRecoverableNetworkError(
-                "Received 403 - FORBIDDEN. Confirm your authentication credentials."
-            )
+            return False, "403 codes are not retried"
 
         # Request failed and server said NotImplemented. This isn't recoverable. Don't retry.
         if status_code == 501:
-            raise NonRecoverableNetworkError("Received code 501 from server.")
+            return False, "Received code 501 from server."
 
         # Request failed and this method is not retryable. We only retry POST requests.
         if not self._is_method_retryable(method):
@@ -399,8 +405,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
             and status_code not in self.status_forcelist
             and status_code not in self.force_dangerous_codes
         ):
-            raise UnsafeToRetryError(
-                "ExecuteStatement command can only be retried for codes 429 and 503"
+            return (
+                False,
+                "ExecuteStatement command can only be retried for codes 429 and 503",
             )
 
         # Request failed with a dangerous code, was an ExecuteStatement, but user forced retries for this
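With these hunks, should_retry reports every non-retryable status as a (False, reason) tuple instead of raising NonRecoverableNetworkError or UnsafeToRetryError from inside the retry loop, so the request can finish cleanly and surface the reason. A sketch of what consuming that contract looks like (a hypothetical driver loop, not the connector's actual urllib3 integration):

```python
from typing import Callable, Tuple

def drive_request(send: Callable[[], int],
                  should_retry: Callable[[str, int], Tuple[bool, str]],
                  method: str = "POST", max_attempts: int = 30) -> int:
    """Illustrative only: act on (retry, reason) rather than catching exceptions."""
    status = send()
    for _ in range(max_attempts - 1):
        retry, reason = should_retry(method, status)
        if not retry:
            print(f"not retrying: {reason}")  # e.g. "403 codes are not retried"
            break
        status = send()
    return status
```

Note the asymmetry the hunk keeps: 401 still returns the full "confirm your authentication credentials" message, while 403 now gets the terser "403 codes are not retried".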
diff --git a/src/databricks/sql/auth/thrift_http_client.py b/src/databricks/sql/auth/thrift_http_client.py
index 6273ab284..f0daae162 100644
--- a/src/databricks/sql/auth/thrift_http_client.py
+++ b/src/databricks/sql/auth/thrift_http_client.py
@@ -198,6 +198,12 @@ def flush(self):
         self.message = self.__resp.reason
         self.headers = self.__resp.headers
 
+        logger.info(
+            "HTTP Response with status code {}, message: {}".format(
+                self.code, self.message
+            )
+        )
+
     @staticmethod
     def basic_proxy_auth_headers(proxy):
         if proxy is None or not proxy.username:
diff --git a/src/databricks/sql/client.py b/src/databricks/sql/client.py
index aefed1ef0..59c74c6ba 100755
--- a/src/databricks/sql/client.py
+++ b/src/databricks/sql/client.py
@@ -737,6 +737,7 @@ def execute(
         self,
         operation: str,
         parameters: Optional[TParameterCollection] = None,
+        enforce_embedded_schema_correctness=False,
     ) -> "Cursor":
         """
         Execute a query and wait for execution to complete.
@@ -801,6 +802,7 @@ def execute(
             use_cloud_fetch=self.connection.use_cloud_fetch,
             parameters=prepared_params,
             async_op=False,
+            enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
         )
         self.active_result_set = ResultSet(
             self.connection,
@@ -822,6 +824,7 @@ def execute_async(
         self,
         operation: str,
         parameters: Optional[TParameterCollection] = None,
+        enforce_embedded_schema_correctness=False,
     ) -> "Cursor":
         """
 
@@ -862,6 +865,7 @@ def execute_async(
             use_cloud_fetch=self.connection.use_cloud_fetch,
             parameters=prepared_params,
             async_op=True,
+            enforce_embedded_schema_correctness=enforce_embedded_schema_correctness,
         )
 
         return self
@@ -1331,7 +1335,9 @@ def fetchmany_arrow(self, size: int) -> "pyarrow.Table":
         ):
             self._fill_results_buffer()
             partial_results = self.results.next_n_rows(n_remaining_rows)
-            results = pyarrow.concat_tables([results, partial_results])
+            results = pyarrow.concat_tables(
+                [results, partial_results], promote_options="default"
+            )
             n_remaining_rows -= partial_results.num_rows
             self._next_row_index += partial_results.num_rows
 
@@ -1387,7 +1393,9 @@ def fetchall_arrow(self) -> "pyarrow.Table":
         while not self.has_been_closed_server_side and self.has_more_rows:
             self._fill_results_buffer()
             partial_results = self.results.remaining_rows()
-            results = pyarrow.concat_tables([results, partial_results])
+            results = pyarrow.concat_tables(
+                [results, partial_results], promote_options="default"
+            )
             self._next_row_index += partial_results.num_rows
 
         return results
diff --git a/src/databricks/sql/thrift_backend.py b/src/databricks/sql/thrift_backend.py
index 5fbd9f749..972d9a9b2 100644
--- a/src/databricks/sql/thrift_backend.py
+++ b/src/databricks/sql/thrift_backend.py
@@ -66,8 +66,8 @@
 # - 900s attempts-duration lines up w ODBC/JDBC drivers (for cluster startup > 10 mins)
 _retry_policy = {  # (type, default, min, max)
     "_retry_delay_min": (float, 1, 0.1, 60),
-    "_retry_delay_max": (float, 30, 5, 3600),
-    "_retry_stop_after_attempts_count": (int, 5, 1, 60),
+    "_retry_delay_max": (float, 60, 5, 3600),
+    "_retry_stop_after_attempts_count": (int, 30, 1, 60),
     "_retry_stop_after_attempts_duration": (float, 900, 1, 86400),
     "_retry_delay_default": (float, 5, 1, 60),
 }
@@ -883,6 +883,7 @@ def execute_command(
         use_cloud_fetch=True,
         parameters=[],
         async_op=False,
+        enforce_embedded_schema_correctness=False,
     ):
         assert session_handle is not None
 
@@ -898,8 +899,12 @@ def execute_command(
             sessionHandle=session_handle,
             statement=operation,
             runAsync=True,
-            getDirectResults=ttypes.TSparkGetDirectResults(
-                maxRows=max_rows, maxBytes=max_bytes
+            # For async operation we don't want the direct results
+            getDirectResults=None
+            if async_op
+            else ttypes.TSparkGetDirectResults(
+                maxRows=max_rows,
+                maxBytes=max_bytes,
             ),
             canReadArrowResult=True if pyarrow else False,
             canDecompressLZ4Result=lz4_compression,
@@ -910,6 +915,7 @@ def execute_command(
             },
             useArrowNativeTypes=spark_arrow_types,
             parameters=parameters,
+            enforceEmbeddedSchemaCorrectness=enforce_embedded_schema_correctness,
         )
         resp = self.make_request(self._client.ExecuteStatement, req)
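The fetchmany_arrow/fetchall_arrow hunks above, and the queue methods in utils.py below, all pass promote_options="default", which lets pyarrow.concat_tables unify slightly different schemas (missing or null-typed fields are filled with nulls) instead of raising. A small self-contained demonstration, assuming pyarrow >= 14 where promote_options is available:

```python
import pyarrow as pa

t1 = pa.table({"id": pa.array([1, 2], type=pa.int64())})
t2 = pa.table({"id": pa.array([3], type=pa.int64()),
               "name": pa.array(["c"], type=pa.string())})

# Plain concat_tables raises on the schema mismatch; with
# promote_options="default" the missing "name" column in t1 is null-filled.
combined = pa.concat_tables([t1, t2], promote_options="default")
print(combined.schema)    # id: int64, name: string
print(combined.num_rows)  # 3
```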
diff --git a/src/databricks/sql/utils.py b/src/databricks/sql/utils.py
index cd655c4ea..48e0aa77e 100644
--- a/src/databricks/sql/utils.py
+++ b/src/databricks/sql/utils.py
@@ -265,7 +265,9 @@ def next_n_rows(self, num_rows: int) -> "pyarrow.Table":
             # Get remaining of num_rows or the rest of the current table, whichever is smaller
             length = min(num_rows, self.table.num_rows - self.table_row_index)
             table_slice = self.table.slice(self.table_row_index, length)
-            results = pyarrow.concat_tables([results, table_slice])
+            results = pyarrow.concat_tables(
+                [results, table_slice], promote_options="default"
+            )
             self.table_row_index += table_slice.num_rows
 
             # Replace current table with the next table if we are at the end of the current table
@@ -292,7 +294,9 @@ def remaining_rows(self) -> "pyarrow.Table":
             table_slice = self.table.slice(
                 self.table_row_index, self.table.num_rows - self.table_row_index
             )
-            results = pyarrow.concat_tables([results, table_slice])
+            results = pyarrow.concat_tables(
+                [results, table_slice], promote_options="default"
+            )
             self.table_row_index += table_slice.num_rows
             self.table = self._create_next_table()
             self.table_row_index = 0
diff --git a/tests/e2e/common/large_queries_mixin.py b/tests/e2e/common/large_queries_mixin.py
index 41ef029bb..5fe292d91 100644
--- a/tests/e2e/common/large_queries_mixin.py
+++ b/tests/e2e/common/large_queries_mixin.py
@@ -83,18 +83,18 @@ def test_query_with_large_narrow_result_set(self):
             assert row[0] == row_id
 
     def test_long_running_query(self):
-        """Incrementally increase query size until it takes at least 5 minutes,
+        """Incrementally increase query size until it takes at least 2 minutes,
         and asserts that the query completes successfully.
         """
         minutes = 60
-        min_duration = 5 * minutes
+        min_duration = 2 * minutes
 
         duration = -1
         scale0 = 10000
         scale_factor = 1
         with self.cursor() as cursor:
             while duration < min_duration:
-                assert scale_factor < 512, "Detected infinite loop"
+                assert scale_factor < 1024, "Detected infinite loop"
                 start = time.time()
 
                 cursor.execute(
@@ -113,5 +113,5 @@ def test_long_running_query(self):
             duration = time.time() - start
             current_fraction = duration / min_duration
             print("Took {} s with scale factor={}".format(duration, scale_factor))
-            # Extrapolate linearly to reach 5 min and add 50% padding to push over the limit
+            # Extrapolate linearly to reach 2 min and add 50% padding to push over the limit
             scale_factor = math.ceil(1.5 * scale_factor / current_fraction)
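The long-running-query hunks above retarget the loop at 2 minutes and size the next iteration by linear extrapolation plus 50% padding. The arithmetic, worked for one hypothetical iteration:

```python
import math

min_duration = 2 * 60            # new target: at least 2 minutes
scale_factor, duration = 1, 15   # assume the first run took 15 s

current_fraction = duration / min_duration                       # 0.125 of target
scale_factor = math.ceil(1.5 * scale_factor / current_fraction)  # ceil(1.5 * 8)
print(scale_factor)  # 12: linear estimate of 8x, padded by 50%
```

The raised `scale_factor < 1024` bound keeps the infinite-loop guard meaningful now that more doublings may be needed before the shorter 2-minute target is hit.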
diff --git a/tests/e2e/common/retry_test_mixins.py b/tests/e2e/common/retry_test_mixins.py
index 942955cab..b5d01a45d 100755
--- a/tests/e2e/common/retry_test_mixins.py
+++ b/tests/e2e/common/retry_test_mixins.py
@@ -121,9 +121,9 @@ class PySQLRetryTestsMixin:
     # For testing purposes
     _retry_policy = {
         "_retry_delay_min": 0.1,
-        "_retry_delay_max": 5,
+        "_retry_delay_max": 3,
         "_retry_stop_after_attempts_count": 5,
-        "_retry_stop_after_attempts_duration": 10,
+        "_retry_stop_after_attempts_duration": 30,
         "_retry_delay_default": 0.5,
     }
 
@@ -135,7 +135,7 @@ def test_retry_urllib3_settings_are_honored(self):
         urllib3_config = {"connect": 10, "read": 11, "redirect": 12}
         rp = DatabricksRetryPolicy(
             delay_min=0.1,
-            delay_max=10.0,
+            delay_max=3,
             stop_after_attempts_count=10,
             stop_after_attempts_duration=10.0,
             delay_default=1.0,
@@ -174,14 +174,14 @@ def test_retry_max_count_not_exceeded(self):
     def test_retry_exponential_backoff(self):
         """GIVEN the retry policy is configured for reasonable exponential backoff
         WHEN the server sends nothing but 429 responses with retry-afters
-        THEN the connector will use those retry-afters values as delay
+        THEN the connector will use those retry-after values as a floor
         """
         retry_policy = self._retry_policy.copy()
         retry_policy["_retry_delay_min"] = 1
 
         time_start = time.time()
         with mocked_server_response(
-            status=429, headers={"Retry-After": "3"}
+            status=429, headers={"Retry-After": "8"}
         ) as mock_obj:
             with pytest.raises(RequestError) as cm:
                 with self.connection(extra_params=retry_policy) as conn:
@@ -191,14 +191,14 @@ def test_retry_exponential_backoff(self):
         assert isinstance(cm.value.args[1], MaxRetryDurationError)
 
         # With setting delay_min to 1, the expected retry delays should be:
-        # 3, 3, 3, 3
+        # 8, 8, 8, 8
         # The first 3 retries are allowed, the 4th retry puts the total duration over the limit
-        # of 10 seconds
+        # of 30 seconds
         assert mock_obj.return_value.getresponse.call_count == 4
-        assert duration > 6
+        assert duration > 24
 
-        # Should be less than 7, but this is a safe margin for CI/CD slowness
-        assert duration < 10
+        # Should be less than 26, but this is a safe margin for CI/CD slowness
+        assert duration < 30
 
     def test_retry_max_duration_not_exceeded(self):
         """GIVEN the max attempt duration of 10 seconds
diff --git a/tests/e2e/common/staging_ingestion_tests.py b/tests/e2e/common/staging_ingestion_tests.py
index 008055e33..39aea3feb 100644
--- a/tests/e2e/common/staging_ingestion_tests.py
+++ b/tests/e2e/common/staging_ingestion_tests.py
@@ -46,7 +46,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
         ) as conn:
 
             cursor = conn.cursor()
-            query = f"PUT '{temp_path}' INTO 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' OVERWRITE"
+            query = f"PUT '{temp_path}' INTO 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv' OVERWRITE"
             cursor.execute(query)
 
         # GET should succeed
@@ -57,7 +57,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
             extra_params={"staging_allowed_local_path": new_temp_path}
         ) as conn:
             cursor = conn.cursor()
-            query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
+            query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv' TO '{new_temp_path}'"
             cursor.execute(query)
 
         with open(new_fh, "rb") as fp:
@@ -67,7 +67,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
 
         # REMOVE should succeed
 
-        remove_query = f"REMOVE 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv'"
+        remove_query = f"REMOVE 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv'"
 
         with self.connection(extra_params={"staging_allowed_local_path": "/"}) as conn:
             cursor = conn.cursor()
@@ -79,7 +79,7 @@ def test_staging_ingestion_life_cycle(self, ingestion_user):
             Error, match="Staging operation over HTTP was unsuccessful: 404"
         ):
             cursor = conn.cursor()
-            query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/15/file1.csv' TO '{new_temp_path}'"
+            query = f"GET 'stage://tmp/{ingestion_user}/tmp/11/13/file1.csv' TO '{new_temp_path}'"
             cursor.execute(query)
 
     os.remove(temp_path)
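For reference, the lifecycle this test drives, as a standalone sketch (hostname, token, and paths are placeholders; staging_allowed_local_path must cover the local file being uploaded):

```python
from databricks import sql

local_file = "/tmp/file1.csv"  # hypothetical local path
stage_path = "stage://tmp/some_user/tmp/11/13/file1.csv"

with sql.connect(server_hostname="...", http_path="...", access_token="...",
                 staging_allowed_local_path="/tmp") as conn:
    cursor = conn.cursor()
    # PUT uploads the local file into the staging location (OVERWRITE replaces it)
    cursor.execute(f"PUT '{local_file}' INTO '{stage_path}' OVERWRITE")
    # GET downloads it back to a local path
    cursor.execute(f"GET '{stage_path}' TO '/tmp/file1_downloaded.csv'")
    # REMOVE deletes the staged copy; a later GET then fails with a 404
    cursor.execute(f"REMOVE '{stage_path}'")
```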
diff --git a/tests/e2e/test_complex_types.py b/tests/e2e/test_complex_types.py
index 446a6b50a..77d634242 100644
--- a/tests/e2e/test_complex_types.py
+++ b/tests/e2e/test_complex_types.py
@@ -14,7 +14,7 @@ def table_fixture(self, connection_details):
         # Create the table
         cursor.execute(
             """
-            CREATE TABLE IF NOT EXISTS pysql_test_complex_types_table (
+            CREATE TABLE IF NOT EXISTS pysql_test_complex_types_table_deprecated (
                 array_col ARRAY<STRING>,
                 map_col MAP<STRING, INT>,
                 struct_col STRUCT<field1: STRING, field2: INT>
@@ -24,7 +24,7 @@ def table_fixture(self, connection_details):
         # Insert a record
         cursor.execute(
             """
-            INSERT INTO pysql_test_complex_types_table
+            INSERT INTO pysql_test_complex_types_table_deprecated
             VALUES (
                 ARRAY('a', 'b', 'c'),
                 MAP('a', 1, 'b', 2, 'c', 3),
                 NAMED_STRUCT('field1', 'a', 'field2', 1)
             )
             """
         )
         yield
         # Clean up the table after the test
-        cursor.execute("DROP TABLE IF EXISTS pysql_test_complex_types_table")
+        cursor.execute("DROP TABLE IF EXISTS pysql_test_complex_types_table_deprecated")
 
     @pytest.mark.parametrize(
         "field,expected_type",
         [
@@ -45,7 +45,7 @@ def test_read_complex_types_as_arrow(self, field, expected_type, table_fixture):
         with self.cursor() as cursor:
             result = cursor.execute(
-                "SELECT * FROM pysql_test_complex_types_table LIMIT 1"
+                "SELECT * FROM pysql_test_complex_types_table_deprecated LIMIT 1"
             ).fetchone()
 
             assert isinstance(result[field], expected_type)
@@ -57,7 +57,7 @@ def test_read_complex_types_as_string(self, field, table_fixture):
         with self.cursor(
             extra_params={"_use_arrow_native_complex_types": False}
         ) as cursor:
             result = cursor.execute(
-                "SELECT * FROM pysql_test_complex_types_table LIMIT 1"
+                "SELECT * FROM pysql_test_complex_types_table_deprecated LIMIT 1"
             ).fetchone()
 
             assert isinstance(result[field], str)
diff --git a/tests/e2e/test_driver.py b/tests/e2e/test_driver.py
index 2f0881cda..45fea480b 100644
--- a/tests/e2e/test_driver.py
+++ b/tests/e2e/test_driver.py
@@ -177,19 +177,22 @@ def test_cloud_fetch(self):
             for i in range(len(cf_result)):
                 assert cf_result[i] == noop_result[i]
 
-    def test_execute_async(self):
-        def isExecuting(operation_state):
-            return not operation_state or operation_state in [
-                ttypes.TOperationState.RUNNING_STATE,
-                ttypes.TOperationState.PENDING_STATE,
-            ]
+
+class TestPySQLAsyncQueriesSuite(PySQLPytestTestCase):
+    def isExecuting(self, operation_state):
+        return not operation_state or operation_state in [
+            ttypes.TOperationState.RUNNING_STATE,
+            ttypes.TOperationState.PENDING_STATE,
+        ]
+
+    def test_execute_async__long_running(self):
 
         long_running_query = "SELECT COUNT(*) FROM RANGE(10000 * 16) x JOIN RANGE(10000) y ON FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') LIKE '%not%a%date%'"
         with self.cursor() as cursor:
             cursor.execute_async(long_running_query)
 
             ## Polling after every POLLING_INTERVAL seconds
-            while isExecuting(cursor.get_query_state()):
+            while self.isExecuting(cursor.get_query_state()):
                 time.sleep(self.POLLING_INTERVAL)
                 log.info("Polling the status in test_execute_async")
 
@@ -198,6 +201,55 @@ def isExecuting(operation_state):
 
         assert result[0].asDict() == {"count(1)": 0}
 
+    def test_execute_async__small_result(self):
+        small_result_query = "SELECT 1"
+
+        with self.cursor() as cursor:
+            cursor.execute_async(small_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+            assert result[0].asDict() == {"1": 1}
+
+    def test_execute_async__large_result(self):
+        x_dimension = 1000
+        y_dimension = 1000
+        large_result_query = f"""
+        SELECT
+            x.id AS x_id,
+            y.id AS y_id,
+            FROM_UNIXTIME(x.id * y.id, 'yyyy-MM-dd') AS date
+        FROM
+            RANGE({x_dimension}) x
+        JOIN
+            RANGE({y_dimension}) y
+        """
+
+        with self.cursor() as cursor:
+            cursor.execute_async(large_result_query)
+
+            ## Fake sleep for 5 secs
+            time.sleep(5)
+
+            ## Polling after every POLLING_INTERVAL seconds
+            while self.isExecuting(cursor.get_query_state()):
+                time.sleep(self.POLLING_INTERVAL)
+                log.info("Polling the status in test_execute_async")
+
+            cursor.get_async_execution_result()
+            result = cursor.fetchall()
+
+            assert len(result) == x_dimension * y_dimension
+
 
 # Exclude Retry tests because they require specific setups, and LargeQueries too slow for core
 # tests
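The new TestPySQLAsyncQueriesSuite above establishes the polling pattern for execute_async: poll get_query_state until the operation leaves PENDING/RUNNING, then call get_async_execution_result to attach the result set. Condensed usage sketch (connection parameters elided):

```python
import time
from databricks import sql
from databricks.sql.thrift_api.TCLIService import ttypes

def is_executing(state) -> bool:
    # A falsy state means the server has not reported one yet; keep polling
    return not state or state in (
        ttypes.TOperationState.RUNNING_STATE,
        ttypes.TOperationState.PENDING_STATE,
    )

with sql.connect(server_hostname="...", http_path="...", access_token="...") as conn:
    cursor = conn.cursor()
    cursor.execute_async("SELECT 1")
    while is_executing(cursor.get_query_state()):
        time.sleep(1)                       # poll until the server finishes
    cursor.get_async_execution_result()     # attach the completed result set
    print(cursor.fetchall())                # [Row(1=1)]
```

Together with the getDirectResults=None change in thrift_backend.py, this is what makes small results pollable: an async ExecuteStatement no longer returns direct results that close the operation before the first poll.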
diff --git a/tests/e2e/test_parameterized_queries.py b/tests/e2e/test_parameterized_queries.py
index d346ad5c6..ddf8b33fd 100644
--- a/tests/e2e/test_parameterized_queries.py
+++ b/tests/e2e/test_parameterized_queries.py
@@ -123,7 +123,7 @@ def inline_table(self, connection_details):
         """
         query = """
-            CREATE TABLE IF NOT EXISTS pysql_e2e_inline_param_test_table (
+            CREATE TABLE IF NOT EXISTS pysql_e2e_inline_param_test_table_deprecated (
             null_col INT,
             int_col INT,
             bigint_col BIGINT,
@@ -167,9 +167,9 @@ def _inline_roundtrip(self, params: dict, paramstyle: ParamStyle):
         This is a no-op but is included to make the test-code easier to read.
         """
         target_column = self._get_inline_table_column(params.get("p"))
-        INSERT_QUERY = f"INSERT INTO pysql_e2e_inline_param_test_table (`{target_column}`) VALUES (%(p)s)"
-        SELECT_QUERY = f"SELECT {target_column} `col` FROM pysql_e2e_inline_param_test_table LIMIT 1"
-        DELETE_QUERY = "DELETE FROM pysql_e2e_inline_param_test_table"
+        INSERT_QUERY = f"INSERT INTO pysql_e2e_inline_param_test_table_deprecated (`{target_column}`) VALUES (%(p)s)"
+        SELECT_QUERY = f"SELECT {target_column} `col` FROM pysql_e2e_inline_param_test_table_deprecated LIMIT 1"
+        DELETE_QUERY = "DELETE FROM pysql_e2e_inline_param_test_table_deprecated"
 
         with self.connection(extra_params={"use_inline_params": True}) as conn:
             with conn.cursor() as cursor:
diff --git a/tests/unit/test_cloud_fetch_queue.py b/tests/unit/test_cloud_fetch_queue.py
index 01d8a79b9..7f19f5a57 100644
--- a/tests/unit/test_cloud_fetch_queue.py
+++ b/tests/unit/test_cloud_fetch_queue.py
@@ -185,7 +185,7 @@ def test_next_n_rows_more_than_one_table(self, mock_create_next_table):
         assert (
             result
             == pyarrow.concat_tables(
-                [self.make_arrow_table(), self.make_arrow_table()]
+                [self.make_arrow_table(), self.make_arrow_table()], promote_options="default"
             )[:7]
         )
 
@@ -309,7 +309,7 @@ def test_remaining_rows_multiple_tables_fully_returned(
         assert (
             result
             == pyarrow.concat_tables(
-                [self.make_arrow_table(), self.make_arrow_table()]
+                [self.make_arrow_table(), self.make_arrow_table()], promote_options="default"
             )[3:]
         )
diff --git a/tests/unit/test_retry.py b/tests/unit/test_retry.py
index 1e18e1f43..897a1d111 100644
--- a/tests/unit/test_retry.py
+++ b/tests/unit/test_retry.py
@@ -34,8 +34,11 @@ def test_sleep__no_retry_after(self, t_mock, retry_policy, error_history):
         retry_policy.history = [error_history, error_history]
         retry_policy.sleep(HTTPResponse(status=503))
 
-        expected_backoff_time = self.calculate_backoff_time(
-            0, retry_policy.delay_min, retry_policy.delay_max
+        expected_backoff_time = max(
+            self.calculate_backoff_time(
+                0, retry_policy.delay_min, retry_policy.delay_max
+            ),
+            retry_policy.delay_max,
         )
         t_mock.assert_called_with(expected_backoff_time)
 
@@ -54,8 +57,11 @@ def test_sleep__no_retry_after_header__multiple_retries(self, t_mock, retry_policy):
         expected_backoff_times = []
         for attempt in range(num_attempts):
             expected_backoff_times.append(
-                self.calculate_backoff_time(
-                    attempt, retry_policy.delay_min, retry_policy.delay_max
+                max(
+                    self.calculate_backoff_time(
+                        attempt, retry_policy.delay_min, retry_policy.delay_max
+                    ),
+                    retry_policy.delay_max,
                 )
             )
 
@@ -77,10 +83,3 @@ def test_excessive_retry_attempts_error(self, t_mock, retry_policy):
             retry_policy.sleep(HTTPResponse(status=503))
             # Internally urllib3 calls the increment function generating a new instance for every retry
             retry_policy = retry_policy.increment()
-
-    @patch("time.sleep")
-    def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history):
-        retry_policy._retry_start_time = time.time()
-        retry_policy.history = [error_history, error_history, error_history]
-        retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"}))
-        t_mock.assert_called_with(3)
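The updated assertions wrap the test helper in max(..., delay_max), mirroring the new floor in sleep_for_retry, and the deleted test reflects that a Retry-After value is no longer slept verbatim. A plausible shape for calculate_backoff_time, consistent with get_backoff_time above but not shown in this diff (the suite's actual helper may differ):

```python
def calculate_backoff_time(attempt: int, delay_min: float, delay_max: float) -> float:
    """Exponential backoff capped at delay_max (no jitter), per get_backoff_time."""
    return min((2 ** attempt) * delay_min, delay_max)

# With, e.g., delay_min=0.1 and delay_max=3, every expected sleep resolves to
# delay_max: max(calculate_backoff_time(a, 0.1, 3), 3) == 3 for any a >= 0.
```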