diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml index cc93cff..268ea1c 100644 --- a/.github/FUNDING.yml +++ b/.github/FUNDING.yml @@ -1,6 +1,6 @@ # These are supported funding model platforms -github: # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2] +github: [pcisar] patreon: # Replace with a single Patreon username open_collective: # Replace with a single Open Collective username ko_fi: # Replace with a single Ko-fi username @@ -12,4 +12,4 @@ lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cl polar: # Replace with a single Polar username buy_me_a_coffee: # Replace with a single Buy Me a Coffee username thanks_dev: # Replace with a single thanks.dev username -custom: https://firebirdsql.org/en/donate/ +custom: # https://firebirdsql.org/en/donate/ diff --git a/CHANGELOG.md b/CHANGELOG.md index a570257..57c5dea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,29 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [2.0.3] - 2026-04-20 + +### Fixed + +- #65: Segmentation fault when close connection or cursor +- #63: Exception ignored in: function Connection.__del__ +- #58: readline_timed() method appending \n to data received +- #56: Variable 'dsn' needs to be initialized in create_database() +- #53: Loss of scale when reading numeric with zero value +- #51: Exception ignored in: function Connection.__del__ ... connection shutdown + +## [2.0.2] - 2025-05-21 + +### Fixed + +- #49: database host configuration not work with version 2 + +## [2.0.1] - 2025-05-05 + +### Fixed + +- #48: AttributeError object has no attribute 'logging_id' in "__del__" methods + ## [2.0.0] - 2025-04-30 ### Fixed diff --git a/README.md b/README.md index dab83da..2f57d17 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,7 @@ [![Hatch project](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch) [![PyPI - Downloads](https://img.shields.io/pypi/dm/firebird-driver)](https://pypi.org/project/firebird-driver) [![Libraries.io SourceRank](https://img.shields.io/librariesio/sourcerank/pypi/firebird-driver)](https://libraries.io/pypi/firebird-driver) +[![Ask DeepWiki](https://deepwiki.com/badge.svg)](https://deepwiki.com/FirebirdSQL/python3-driver) This package provides official Python Database API 2.0-compliant driver for the open source relational database Firebird®. In addition to the minimal feature set of @@ -53,7 +54,7 @@ This project is using [pytest](https://docs.pytest.org/en/stable/) for testing, tests add several options via `tests/conftest.py`. By default, tests are configured to use local Firebird installation via network access. -To use local instllation in `mebedded` mode, comment out the section: +To use local installation in `embedded` mode, comment out the section: ``` [tool.hatch.envs.hatch-test] extra-args = ["--host=localhost"] diff --git a/docs/changelog.txt b/docs/changelog.txt index 78831c0..757ea57 100644 --- a/docs/changelog.txt +++ b/docs/changelog.txt @@ -2,6 +2,26 @@ Changelog ######### +Version 2.0.3 +============= + +- #65: Segmentation fault when close connection or cursor +- #63: Exception ignored in: function Connection.__del__ +- #58: readline_timed() method appending \n to data received +- #56: Variable 'dsn' needs to be initialized in create_database() +- #53: Loss of scale when reading numeric with zero value +- #51: Exception ignored in: function Connection.__del__ ... connection shutdown + +Version 2.0.2 +============= + +- #49: database host configuration not work with version 2 + +Version 2.0.1 +============= + +- #48: AttributeError object has no attribute 'logging_id' in "__del__" methods + Version 2.0.0 ============= diff --git a/docs/index.txt b/docs/index.txt index 1d59c97..3a5e65f 100644 --- a/docs/index.txt +++ b/docs/index.txt @@ -18,7 +18,7 @@ written by Helen Borrie and published by IBPhoenix_. .. seealso:: `firebird-lib`_ package for optional extensions to this driver. -.. note:: Requires Python 3.8+ +.. note:: Requires Python 3.11+ .. tip:: You can download docset for Dash_ (MacOS) or Zeal_ (Windows / Linux) documentation readers from releases_ at github. diff --git a/pyproject.toml b/pyproject.toml index c6e6c2b..b4d010a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -55,11 +55,23 @@ allow-direct-references = true dependencies = [ ] +[tool.hatch.envs.gdocs] +dependencies = [ + "great-docs", +] + +[tool.hatch.envs.gdocs.scripts] +build = "great-docs build" +preview = "great-docs preview" + [tool.hatch.envs.hatch-test] extra-args = ["--host=localhost"] +extra-dependencies = [ + "packaging>=25.0", +] [[tool.hatch.envs.hatch-test.matrix]] -python = ["3.11", "3.12", "3.13"] +python = ["3.11", "3.12", "3.13", "3.14"] [tool.hatch.envs.doc] detached = false @@ -119,7 +131,7 @@ ban-relative-imports = "all" [tool.ruff.lint.per-file-ignores] # Tests can use magic values, assertions, and relative imports -"tests/**/*" = ["PLR2004", "S101", "TID252"] +"test_*" = ["PLR2004", "S101", "TID252"] "fbapi.py" = ["N801", "E501"] "interfaces.py" = ["ARG001", "ARG002", "N801", "N803", "E501", "FBT001"] "hooks.py" = ["F401"] diff --git a/src/firebird/driver/__init__.py b/src/firebird/driver/__init__.py index 7a9e7b9..315b540 100644 --- a/src/firebird/driver/__init__.py +++ b/src/firebird/driver/__init__.py @@ -133,4 +133,4 @@ ) #: Current driver version, SEMVER string. -__VERSION__ = '2.0.0' +__VERSION__ = '2.0.3' diff --git a/src/firebird/driver/core.py b/src/firebird/driver/core.py index 6bf9bdd..211188c 100644 --- a/src/firebird/driver/core.py +++ b/src/firebird/driver/core.py @@ -53,6 +53,7 @@ import sys import threading import weakref +from urllib.parse import urlparse from abc import ABC, abstractmethod from collections.abc import Callable, Mapping, Sequence from ctypes import addressof, byref, create_string_buffer, memmove, memset, pointer, string_at @@ -1795,10 +1796,11 @@ def __init__(self, att: iAttachment, dsn: str, dpb: bytes | None=None, sql_diale self.__FIREBIRD_LIB__ = None def __del__(self): if not self.is_closed(): - warn(f"Connection '{self.logging_id}' disposed without prior close()", ResourceWarning) + warn("Connection disposed without prior close()", ResourceWarning) self._close() self._close_internals() - self._att.detach() + with contextlib.suppress(DatabaseError): + self._att.detach() def __enter__(self) -> Self: return self def __exit__(self, exc_type, exc_value, traceback) -> None: @@ -2179,6 +2181,75 @@ def _connect_helper(dsn: str, host: str, port: str, database: str, protocol: Net dsn += database return dsn +def _is_dsn(value: str) -> bool: + """ + Checks if the given string matches known patterns for Firebird DSNs. + + This function analyzes the string for structures that are typical for + Firebird connection strings, based on how the firebird-driver might + construct them (per _connect_helper) or how the Firebird client + library generally interprets them. + + Args: + value: The string to check. + + Returns: + True if the string matches a DSN pattern, False otherwise. + """ + if not isinstance(value, str) or not value.strip(): + # Empty or whitespace-only strings are not DSNs + return False + + # 1. Protocol-based DSNs (e.g., inet://localhost/employee) + # These are directly produced by _connect_helper if a protocol is specified. + try: + parsed = urlparse(value) + if parsed.scheme in [p.name.lower() for p in NetProtocol]: + # A scheme-based DSN must have a network location (host/port) or a path (database part). + # parsed.path.lstrip('/') handles cases like "xnet:///path/to/db" where netloc is empty. + if parsed.netloc or (parsed.path and parsed.path.lstrip('/')): + return True + except ValueError: + # urlparse can raise ValueError for some malformed inputs (e.g. "::1") + # These are unlikely to be scheme-based DSNs in the firebird context. + pass + + # 2. Windows Named Pipes (e.g., \\server\pipe_name or \\server@port\pipe_name) + # These are indicated by starting with \\. + if value.startswith("\\\\"): + # Basic check: must have some content after \\, and not be just \\ or \\\ + if len(value) > 2 and value[2] not in ['\\', '/']: + return True + + # 3. Classic host:database or host/port:database syntax + # (e.g., localhost:employee, server/3050:/data/db.fdb) + # This pattern should not be confused with "C:\path" or "http://..." + colon_idx = value.find(':') + if colon_idx > 0 and "://" not in value[:colon_idx]: # Colon exists, not at start, and not part of a scheme + host_spec = value[:colon_idx] + # db_spec = value[colon_idx+1:] # Not strictly needed for this check + + # Avoid misinterpreting "C:\path" as "host C, db \path". + # If host_spec is a single letter (like a drive) AND the char after colon is a path separator, + # it's more likely an absolute path. os.path.isabs handles these better. + is_windows_drive_abs_path_candidate = ( + len(host_spec) == 1 and host_spec[0].isalpha() and + len(value) > colon_idx + 1 and value[colon_idx+1] in ('/', '\\') + ) + + if not is_windows_drive_abs_path_candidate: + # host_spec should not contain backslashes if it's a hostname. + # It can contain a forward slash for host/port. + if '\\' not in host_spec: + if '/' in host_spec: # Potential host/port:db + # e.g., "server/3050:dbname" + parts = host_spec.split('/', 1) # Split only on the first / + if len(parts) == 2 and parts[0] and parts[1].isdigit(): # host part and port part (digits) + return True + elif host_spec: # Plain host:db, ensure host_spec is not empty + # e.g., "localhost:dbname", "localhost:/path/to/db", "localhost:C:relative_path_on_C" + return True + return False def __make_connection(dsn: str, utf8filename: bool, dpb: bytes, sql_dialect: int, charset: str, # noqa: FBT001 crypt_callback: iCryptKeyCallbackImpl, *, create: bool) -> Connection: with a.get_api().master.get_dispatcher() as provider: @@ -2244,13 +2315,14 @@ def connect(database: str | Path, *, user: str | None=None, password: str | None if isinstance(database, Path): database = str(database) db_config: DatabaseConfig = driver_config.get_database(database) + dsn: str | None = None if db_config is None: db_config = driver_config.db_defaults - # we'll assume that 'database' is 'dsn' - dsn = database - database = None srv_config = driver_config.server_defaults - srv_config.host.clear() + if _is_dsn(database): + dsn = database + database = None + srv_config.host.clear() else: database = db_config.database.value dsn = db_config.dsn.value @@ -2326,13 +2398,14 @@ def create_database(database: str | Path, *, user: str | None=None, password: st if isinstance(database, Path): database = str(database) db_config: DatabaseConfig = driver_config.get_database(database) + dsn: str | None = None if db_config is None: db_config = driver_config.db_defaults - # we'll assume that 'database' is 'dsn' - dsn = database - database = None srv_config = driver_config.server_defaults - srv_config.host.clear() + if _is_dsn(database): + dsn = database + database = None + srv_config.host.clear() else: database = db_config.database.value dsn = db_config.dsn.value @@ -2537,7 +2610,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: self.close() def __del__(self): if self._tra is not None: - warn(f"Transaction '{self.logging_id}' disposed while active", ResourceWarning) + warn("Transaction disposed while active", ResourceWarning) self._finish() def __dead_con(self, obj: Connection) -> None: # noqa: ARG002 self._connection = None @@ -2947,12 +3020,8 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: self.free() def __del__(self): if self._in_meta or self._out_meta or self._istmt: - warn(f"Statement '{self.logging_id}' disposed without prior free()", ResourceWarning) + warn("Statement disposed without prior free()", ResourceWarning) self.free() - def __str__(self): - return f'{self.logging_id}[{self.sql}]' - def __repr__(self): - return str(self) def __dead_con(self, obj: Connection) -> None: # noqa: ARG002 self._connection = None def __get_plan(self, *, detailed: bool) -> str: @@ -3085,10 +3154,8 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: self.close() def __del__(self): if self._blob is not None: - warn(f"BlobReader '{self.logging_id}' disposed without prior close()", ResourceWarning) + warn("BlobReader disposed without prior close()", ResourceWarning) self.close() - def __repr__(self): - return f'{self.logging_id}[size={self.length}]' def flush(self) -> None: """Does nothing. """ @@ -3292,7 +3359,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: self.close() def __del__(self): if self._result is not None or self._stmt is not None or self.__blob_readers: - warn(f"Cursor '{self.logging_id}' disposed without prior close()", ResourceWarning) + warn("Cursor disposed without prior close()", ResourceWarning) self.close() def __next__(self): if (row := self.fetchone()) is not None: @@ -3327,7 +3394,7 @@ def _extract_db_array_to_list(self, esize: int, dtype: int, subtype: int, elif dtype in (a.blr_short, a.blr_long, a.blr_int64): val = (0).from_bytes(buf[bufpos:bufpos + esize], 'little', signed=True) if subtype or scale: - val = decimal.Decimal(val) / _ten_to[abs(scale)] + val = decimal.Decimal(val).scaleb(-abs(scale)) elif dtype == a.blr_bool: val = (0).from_bytes(buf[bufpos:bufpos + esize], 'little') == 1 elif dtype == a.blr_float: @@ -3734,7 +3801,7 @@ def _unpack_output(self) -> tuple: value = (0).from_bytes(buffer[offset:offset + length], 'little', signed=True) # It's scalled integer? if desc.subtype or desc.scale: - value = decimal.Decimal(value) / _ten_to[abs(desc.scale)] + value = decimal.Decimal(value).scaleb(-abs(desc.scale)) elif datatype == SQLDataType.DATE: value = _util.decode_date(buffer[offset:offset+length]) elif datatype == SQLDataType.TIME: @@ -3899,7 +3966,13 @@ def _execute(self, operation: str | Statement, in_meta.release() def _clear(self) -> None: if self._result is not None: - self._result.close() + if self._stmt is not None and self._stmt._istmt is not None: + self._result.close() + else: + # Statement was already freed; the result set is invalidated + # at the Firebird API level, so we must not call close() on it. + # Also prevent __del__ from calling release() on the invalid interface. + self._result._refcnt = 0 self._result = None self._name = None self._last_fetch_status = None @@ -4213,7 +4286,8 @@ def description(self) -> tuple[DESCRIPTION]: elif meta.datatype == SQLDataType.INT64: vtype = int dispsize = 20 - elif meta.datatype in (SQLDataType.FLOAT, SQLDataType.D_FLOAT, SQLDataType.DOUBLE): + elif meta.datatype in (SQLDataType.FLOAT, SQLDataType.D_FLOAT, SQLDataType.DOUBLE, + SQLDataType.DEC16, SQLDataType.DEC34): # Special case, dialect 1 DOUBLE/FLOAT # could be Fixed point if (self._stmt._dialect < 3) and meta.scale: @@ -4226,13 +4300,13 @@ def description(self) -> tuple[DESCRIPTION]: vtype = str if meta.subtype == 1 else bytes scale = meta.subtype dispsize = 0 - elif meta.datatype == SQLDataType.TIMESTAMP: + elif meta.datatype in (SQLDataType.TIMESTAMP, SQLDataType.TIMESTAMP_TZ, SQLDataType.TIMESTAMP_TZ_EX): vtype = datetime.datetime dispsize = 22 elif meta.datatype == SQLDataType.DATE: vtype = datetime.date dispsize = 10 - elif meta.datatype == SQLDataType.TIME: + elif meta.datatype in (SQLDataType.TIME, SQLDataType.TIME_TZ, SQLDataType.TIME_TZ_EX): vtype = datetime.time dispsize = 11 elif meta.datatype == SQLDataType.ARRAY: @@ -5547,7 +5621,7 @@ def __exit__(self, exc_type, exc_value, traceback) -> None: self.close() def __del__(self): if self._svc is not None: - warn(f"Server '{self.logging_id}' disposed without prior close()", ResourceWarning) + warn("Server disposed without prior close()", ResourceWarning) self.close() def __next__(self): if (line := self.readline()) is not None: @@ -5656,9 +5730,11 @@ def readline_timed(self, timeout: int) -> str | Sentinel | None: data = self.response.read_sized_string(encoding=self.encoding, errors=self.encoding_errors) if self.response.get_tag() == SrvInfoCode.TIMEOUT: return TIMEOUT - if data: - return data + '\n' - return None + # read_sized_string() returns lines ending with '\r ' (CR + space). + # Strip the trailing space to get proper '\r' line endings. + if data and data.endswith('\r '): + data = data[:-1] # Remove space, keep '\r' + return data if data else None def readline(self) -> str | None: """Get next line of textual output from last service query. @@ -5785,11 +5861,11 @@ def connect_server(server: str, *, user: str | None=None, password: str | None=N srv_config = driver_config.get_server(server) if srv_config is None: srv_config = driver_config.server_defaults - host = server or None - port = None - else: - host = srv_config.host.value - port = srv_config.port.value + #host = server or None + #port = None + #else: + host = srv_config.host.value + port = srv_config.port.value if host is None: host = 'service_mgr' if not host.endswith('service_mgr'): diff --git a/tests/conftest.py b/tests/conftest.py index 45776d9..14d72e9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -122,7 +122,7 @@ def pytest_configure(config): client_lib = Path(client_lib) if not client_lib.is_file(): pytest.exit(f"Client library '{client_lib}' not found!") - driver_config.fb_client_library.value = client_lib + driver_config.fb_client_library.value = str(client_lib) # if host := config.getoption('host'): _vars_['host'] = host diff --git a/tests/test_connection.py b/tests/test_connection.py index a6d6e50..b5a035a 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -27,7 +27,8 @@ import firebird.driver as driver from firebird.driver.types import ImpData, ImpDataOld from firebird.driver import (NetProtocol, connect, Isolation, tpb, DefaultAction, - DbInfoCode, DbWriteMode, DbAccessMode, DbSpaceReservation) + DbInfoCode, DbWriteMode, DbAccessMode, DbSpaceReservation, + driver_config) def test_connect_helper(): DB_LINUX_PATH = '/path/to/db/employee.fdb' @@ -376,3 +377,177 @@ def test_db_info(db_connection, fb_vars, db_file): guid = con.info.get_info(DbInfoCode.DB_GUID) assert isinstance(guid, str) assert len(guid) == 38 # Example check for {GUID} format + +def test_connect_with_driver_config_server_defaults_local(driver_cfg, db_file, fb_vars): + """ + Tests connect() using driver_config.server_defaults for a local connection. + The database alias registered for this test will have its 'server' attribute + set to None, which means it should pick up settings from server_defaults. + """ + db_alias = "pytest_cfg_local_db" + db_path_str = str(db_file) + + # Save original server_defaults to restore them, though driver_cfg fixture handles full reset + original_s_host = driver_config.server_defaults.host.value + original_s_port = driver_config.server_defaults.port.value + original_s_user = driver_config.server_defaults.user.value + original_s_password = driver_config.server_defaults.password.value + + # Configure server_defaults for a local connection + driver_config.server_defaults.host.value = None + driver_config.server_defaults.port.value = None # Explicitly None for local + driver_config.server_defaults.user.value = fb_vars['user'] + driver_config.server_defaults.password.value = fb_vars['password'] + + # Ensure the test-specific DB alias is clean if it exists from a prior failed run + if driver_config.get_database(db_alias): + driver_config.databases.value = [db_cfg for db_cfg in driver_config.databases.value if db_cfg.name != db_alias] + + # Register a database alias that will use these server_defaults + test_db_config_entry = driver_config.register_database(db_alias) + test_db_config_entry.database.value = db_path_str + test_db_config_entry.server.value = None # Key: This tells driver to use server_defaults + + # For a local connection (host=None, port=None), DSN is just the database path + expected_dsn = db_path_str + + conn = None + try: + conn = driver.connect(db_alias, charset='UTF8') + assert conn._att is not None, "Connection attachment failed" + assert conn.dsn == expected_dsn, f"Expected DSN '{expected_dsn}', got '{conn.dsn}'" + + # Verify connection is usable with a simple query + with conn.cursor() as cur: + cur.execute("SELECT 1 FROM RDB$DATABASE") + assert cur.fetchone()[0] == 1, "Query failed on the connection" + finally: + if conn and not conn.is_closed(): + conn.close() + # Restore original server_defaults values (driver_cfg also handles full reset) + driver_config.server_defaults.host.value = original_s_host + driver_config.server_defaults.port.value = original_s_port + driver_config.server_defaults.user.value = original_s_user + driver_config.server_defaults.password.value = original_s_password + + +def test_connect_with_driver_config_server_defaults_remote(driver_cfg, db_file, fb_vars): + """ + Tests connect() using driver_config.server_defaults for a remote-like connection. + This test relies on fb_vars providing a host (and optionally port) from conftest.py. + If no host is configured in fb_vars, this test variant is skipped. + """ + db_alias = "pytest_cfg_remote_db" + db_path_str = str(db_file) + + test_host = fb_vars.get('host') + test_port = fb_vars.get('port') # Can be None or empty string + + if not test_host: + pytest.skip("Skipping remote server_defaults test as no host is configured in fb_vars. " + "This test requires a configured host (and optionally port) for execution.") + return + + # Save original server_defaults + original_s_host = driver_config.server_defaults.host.value + original_s_port = driver_config.server_defaults.port.value + original_s_user = driver_config.server_defaults.user.value + original_s_password = driver_config.server_defaults.password.value + + # Configure server_defaults for a "remote" connection + driver_config.server_defaults.host.value = test_host + driver_config.server_defaults.port.value = str(test_port) if test_port else None + driver_config.server_defaults.user.value = fb_vars['user'] + driver_config.server_defaults.password.value = fb_vars['password'] + + # Ensure the test-specific DB alias is clean + if driver_config.get_database(db_alias): + driver_config.databases.value = [db_cfg for db_cfg in driver_config.databases.value if db_cfg.name != db_alias] + + test_db_config_entry = driver_config.register_database(db_alias) + test_db_config_entry.database.value = db_path_str + test_db_config_entry.server.value = None # Use server_defaults + + # Determine expected DSN based on _connect_helper logic for non-protocol DSNs + if test_host.startswith("\\\\"): # Windows Named Pipes + if test_port: + expected_dsn = f"{test_host}@{test_port}\\{db_path_str}" + else: + expected_dsn = f"{test_host}\\{db_path_str}" + elif test_port: # TCP/IP with port + expected_dsn = f"{test_host}/{test_port}:{db_path_str}" + else: # TCP/IP without port (or other local-like with host) + expected_dsn = f"{test_host}:{db_path_str}" + + conn = None + try: + conn = driver.connect(db_alias, charset='UTF8') + assert conn._att is not None, "Connection attachment failed" + assert conn.dsn == expected_dsn, f"Expected DSN '{expected_dsn}', got '{conn.dsn}'" + + with conn.cursor() as cur: + cur.execute("SELECT 1 FROM RDB$DATABASE") + assert cur.fetchone()[0] == 1, "Query failed on the connection" + finally: + if conn and not conn.is_closed(): + conn.close() + # Restore original server_defaults + driver_config.server_defaults.host.value = original_s_host + driver_config.server_defaults.port.value = original_s_port + driver_config.server_defaults.user.value = original_s_user + driver_config.server_defaults.password.value = original_s_password + +def test_connect_with_driver_config_db_defaults_local(driver_cfg, db_file, fb_vars): + """ + Tests connect() when db_defaults provides the database path, and + server_defaults provides local connection info (host=None, port=None). + Here, connect() is called with a DSN-like string that is *not* a registered alias. + """ + db_path_str = str(db_file) # This will be our "DSN" to connect to + + # Save original defaults + original_s_host = driver_config.server_defaults.host.value + original_s_port = driver_config.server_defaults.port.value + original_s_user = driver_config.server_defaults.user.value + original_s_password = driver_config.server_defaults.password.value + original_db_database = driver_config.db_defaults.database.value + original_db_server = driver_config.db_defaults.server.value + + + # Configure server_defaults for local connection + driver_config.server_defaults.host.value = None + driver_config.server_defaults.port.value = None + driver_config.server_defaults.user.value = fb_vars['user'] + driver_config.server_defaults.password.value = fb_vars['password'] + + # Configure db_defaults (it won't be used for database path if DSN is absolute path) + # but it's good to ensure it's set to something known for the test. + # The key here is that if connect(db_path_str) is called and db_path_str is + # an absolute path, it's treated as the DSN. Server info then comes from + # server_defaults IF db_path_str is NOT a full DSN with host/port. + # If db_path_str is an absolute path, it's treated as the direct database target. + driver_config.db_defaults.database.value = "some_default_db_ignore" # Should not be used if DSN is absolute + driver_config.db_defaults.server.value = None # Use server_defaults + + expected_dsn = db_path_str # For local connection with absolute path, DSN is the path + + conn = None + try: + # Connect using the absolute path as the DSN + conn = driver.connect(db_path_str, charset='UTF8') + assert conn._att is not None, "Connection attachment failed" + assert conn.dsn == expected_dsn, f"Expected DSN '{expected_dsn}', got '{conn.dsn}'" + + with conn.cursor() as cur: + cur.execute("SELECT 1 FROM RDB$DATABASE") + assert cur.fetchone()[0] == 1, "Query failed on the connection" + finally: + if conn and not conn.is_closed(): + conn.close() + # Restore originals + driver_config.server_defaults.host.value = original_s_host + driver_config.server_defaults.port.value = original_s_port + driver_config.server_defaults.user.value = original_s_user + driver_config.server_defaults.password.value = original_s_password + driver_config.db_defaults.database.value = original_db_database + driver_config.db_defaults.server.value = original_db_server diff --git a/tests/test_insert_data.py b/tests/test_insert_data.py index d70d51d..336228b 100644 --- a/tests/test_insert_data.py +++ b/tests/test_insert_data.py @@ -49,7 +49,7 @@ def utf8_connection(dsn): def test_insert_integers(db_connection): with db_connection.cursor() as cur: - cur.execute('insert into T2 (C1,C2,C3) values (?,?,?)', [1, 1, 1]) + cur.execute('insert into T2 (C1,C2,C3) values (?,?,?)', ['1', '1', '1']) db_connection.commit() cur.execute('select C1,C2,C3 from T2 where C1 = 1') rows = cur.fetchall() @@ -97,12 +97,12 @@ def test_insert_datetime(db_connection): # Insert from string (driver handles conversion if possible, though explicit types are better) # Note: Microsecond separator might vary based on driver/server locale. Use types. - # cur.execute('insert into T2 (C1,C6,C7,C8) values (?,?,?,?)', [4, '2011-11-13', '15:0:1.200', '2011-11-13 15:0:1.2000']) - # db_connection.commit() - # cur.execute('select C1,C6,C7,C8 from T2 where C1 = 4') - # rows = cur.fetchall() - # assert rows == [(4, datetime.date(2011, 11, 13), datetime.time(15, 0, 1, 200000), - # datetime.datetime(2011, 11, 13, 15, 0, 1, 200000))] + cur.execute('insert into T2 (C1,C6,C7,C8) values (?,?,?,?)', [4, '2011-11-13', '15:0:1.200', '2011-11-13 15:0:1.2000']) + db_connection.commit() + cur.execute('select C1,C6,C7,C8 from T2 where C1 = 4') + rows = cur.fetchall() + assert rows == [(4, datetime.date(2011, 11, 13), datetime.time(15, 0, 1, 200000), + datetime.datetime(2011, 11, 13, 15, 0, 1, 200000))] # encode date before 1859-11-17 produce a negative number diff --git a/tests/test_issues.py b/tests/test_issues.py index c52fbaa..cdfc3f2 100644 --- a/tests/test_issues.py +++ b/tests/test_issues.py @@ -31,3 +31,37 @@ def test_issue_02(db_connection): cur.execute('select C1,C2,C3 from T2 where C1 = 1') rows = cur.fetchall() assert rows == [(1, None, 1)] + +def test_issue_53(db_connection): + with db_connection.cursor() as cur: + cur.execute("select cast('0.00' as numeric(9,2)) from rdb$database") + numeric_val = cur.fetchone()[0] + numeric_val_exponent = numeric_val.as_tuple()[2] + db_connection.commit() + assert numeric_val_exponent == -2 + +def test_issue_65_prepare_ctx_mgr(db_connection): + """Freeing a Statement via context manager must not crash when cursor/connection closes.""" + with db_connection.cursor() as cur: + with cur.prepare('select count(*) from country where 1 < ?') as stmt: + row = cur.execute(stmt, (2,)).fetchone() + assert row is not None + +def test_issue_65_free_then_cursor_close(db_connection): + """Explicit stmt.free() followed by cursor.close() must not crash.""" + cur = db_connection.cursor() + stmt = cur.prepare('select count(*) from country where 1 < ?') + row = cur.execute(stmt, (2,)).fetchone() + assert row is not None + stmt.free() + cur.close() + +def test_issue_65_free_then_conn_close(dsn): + """stmt.free() followed by connection close must not crash.""" + from firebird.driver import connect + with connect(dsn) as conn: + cur = conn.cursor() + stmt = cur.prepare('select count(*) from country where 1 < ?') + row = cur.execute(stmt, (2,)).fetchone() + assert row is not None + stmt.free() diff --git a/tests/test_server.py b/tests/test_server.py index a3c54fa..93e1e7f 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -87,7 +87,7 @@ def test_query(server_connection, fb_vars, dsn, db_file): elif version in SpecifierSet('>=4.0'): assert sec_db.endswith('SECURITY4.FDB') else: # FB30 - assert sec_db.endswith('SECURITY53.FDB') + assert sec_db.endswith('SECURITY3.FDB') assert isinstance(svc.info.lock_directory, str) # Path can vary caps = svc.info.capabilities