|
31 | 31 | from sqlalchemy.pool import QueuePool |
32 | 32 | from sqlalchemy.sql import column |
33 | 33 | from sqlalchemy.sql import literal |
| 34 | +from sqlalchemy.sql.elements import literal_column |
34 | 35 | from sqlalchemy.testing import assert_raises |
35 | 36 | from sqlalchemy.testing import assert_raises_message |
36 | 37 | from sqlalchemy.testing import config |
@@ -1771,6 +1772,56 @@ def before_execute( |
1771 | 1772 | with e1.connect() as conn: |
1772 | 1773 | conn.execute(select(literal("1"))) |
1773 | 1774 |
|
| 1775 | + @testing.only_on("sqlite") |
| 1776 | + def test_dont_modify_statement_driversql(self, connection): |
| 1777 | + m1 = mock.Mock() |
| 1778 | + |
| 1779 | + @event.listens_for(connection, "before_execute", retval=True) |
| 1780 | + def _modify( |
| 1781 | + conn, clauseelement, multiparams, params, execution_options |
| 1782 | + ): |
| 1783 | + m1.run_event() |
| 1784 | + return clauseelement.replace("hi", "there"), multiparams, params |
| 1785 | + |
| 1786 | + # the event does not take effect for the "driver SQL" option |
| 1787 | + eq_(connection.exec_driver_sql("select 'hi'").scalar(), "hi") |
| 1788 | + |
| 1789 | + # event is not called at all |
| 1790 | + eq_(m1.mock_calls, []) |
| 1791 | + |
| 1792 | + @testing.combinations((True,), (False,), argnames="future") |
| 1793 | + @testing.only_on("sqlite") |
| 1794 | + def test_modify_statement_internal_driversql(self, connection, future): |
| 1795 | + m1 = mock.Mock() |
| 1796 | + |
| 1797 | + @event.listens_for(connection, "before_execute", retval=True) |
| 1798 | + def _modify( |
| 1799 | + conn, clauseelement, multiparams, params, execution_options |
| 1800 | + ): |
| 1801 | + m1.run_event() |
| 1802 | + return clauseelement.replace("hi", "there"), multiparams, params |
| 1803 | + |
| 1804 | + eq_( |
| 1805 | + connection._exec_driver_sql( |
| 1806 | + "select 'hi'", [], {}, {}, future=future |
| 1807 | + ).scalar(), |
| 1808 | + "hi" if future else "there", |
| 1809 | + ) |
| 1810 | + |
| 1811 | + if future: |
| 1812 | + eq_(m1.mock_calls, []) |
| 1813 | + else: |
| 1814 | + eq_(m1.mock_calls, [call.run_event()]) |
| 1815 | + |
| 1816 | + def test_modify_statement_clauseelement(self, connection): |
| 1817 | + @event.listens_for(connection, "before_execute", retval=True) |
| 1818 | + def _modify( |
| 1819 | + conn, clauseelement, multiparams, params, execution_options |
| 1820 | + ): |
| 1821 | + return select(literal_column("'there'")), multiparams, params |
| 1822 | + |
| 1823 | + eq_(connection.scalar(select(literal_column("'hi'"))), "there") |
| 1824 | + |
1774 | 1825 | def test_argument_format_execute(self, testing_engine): |
1775 | 1826 | def before_execute( |
1776 | 1827 | conn, clauseelement, multiparams, params, execution_options |
|
0 commit comments