feat: Bump psycopg2 to psycopg3 for all Postgres components #4303
Merged
21 commits
9cdceab Makefile: Formatting (job-almekinders)
dca9c9a Makefile: Exclude Snowflake tests for postgres offline store tests (job-almekinders)
fc65cfc Bootstrap: Use conninfo (job-almekinders)
a3ea80d Tests: Make connection string compatible with psycopg3 (job-almekinders)
e53d9e6 Tests: Test connection type pool and singleton (job-almekinders)
59cbd10 Global: Replace conn.set_session() calls to be psycopg3 compatible (job-almekinders)
0f86e9e Offline: Use psycopg3 (job-almekinders)
cd91fdc Online: Use psycopg3 (job-almekinders)
3504c77 Online: Restructure online_write_batch (job-almekinders)
6e45f8e Online: Use correct placeholder (job-almekinders)
c755fcd Online: Handle bytes properly in online_read() (job-almekinders)
acd4a8f Online: Whitespace (job-almekinders)
36147ef Online: Open ConnectionPool (job-almekinders)
d3fd7e7 Online: Add typehint (job-almekinders)
0a9bced Utils: Use psycopg3 (job-almekinders)
af136da Lint: Raise exceptions if cursor returned no columns or rows (job-almekinders)
0926a15 Add comment on +psycopg string (job-almekinders)
6514987 Docs: Remove mention of psycopg2 (job-almekinders)
915454c Lint: Fix (job-almekinders)
d8e6619 Default to postgresql+psycopg and log warning (job-almekinders)
3328530 Solve merge conflicts (job-almekinders)
Utils: Use psycopg3
Use new ConnectionPool
Pass kwargs as named argument
Use executemany over execute_values
Remove not-required open argument in psycopg.connect
Improve
Use SpooledTemporaryFile
Use max_size and add docstring
Properly write with StringIO
Utils: Use SpooledTemporaryFile over StringIO object
Add replace
Fix df_to_postgres_table
Remove import
Utils

Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
commit 0a9bcedcd4d4b82ab9bebcb45b949b28a6fc9d75
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | | `@@ -1,50 +1,59 @@` |
| | | `-from typing import Dict` |
| | | `+from typing import Any, Dict` |
| | | `import numpy as np` |
| | | `import pandas as pd` |
| | | `-import psycopg2` |
| | | `-import psycopg2.extras` |
| | | `+import psycopg` |
| | | `import pyarrow as pa` |
| | | `-from psycopg2.pool import SimpleConnectionPool` |
| | | `+from psycopg.connection import Connection` |
| | | `+from psycopg_pool import ConnectionPool` |
| | | `from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig` |
| | | `from feast.type_map import arrow_to_pg_type` |
| | | `-def _get_conn(config: PostgreSQLConfig):` |
| | | `-    conn = psycopg2.connect(` |
| | | `-        dbname=config.database,` |
| | | `-        host=config.host,` |
| | | `-        port=int(config.port),` |
| | | `-        user=config.user,` |
| | | `-        password=config.password,` |
| | | `-        sslmode=config.sslmode,` |
| | | `-        sslkey=config.sslkey_path,` |
| | | `-        sslcert=config.sslcert_path,` |
| | | `-        sslrootcert=config.sslrootcert_path,` |
| | | `-        options="-c search_path={}".format(config.db_schema or config.user),` |
| | | `+def _get_conn(config: PostgreSQLConfig) -> Connection:` |
| | | `` +    """Get a psycopg `Connection`.""" `` |
| | | `+    conn = psycopg.connect(` |
| | | `+        conninfo=_get_conninfo(config),` |
| | | `+        keepalives_idle=config.keepalives_idle,` |
| | | `+        **_get_conn_kwargs(config),` |
| | | `    )` |
| | | `    return conn` |
| | | `-def _get_connection_pool(config: PostgreSQLConfig):` |
| | | `-    return SimpleConnectionPool(` |
| | | `-        config.min_conn,` |
| | | `-        config.max_conn,` |
| | | `-        dbname=config.database,` |
| | | `-        host=config.host,` |
| | | `-        port=int(config.port),` |
| | | `-        user=config.user,` |
| | | `-        password=config.password,` |
| | | `-        sslmode=config.sslmode,` |
| | | `-        sslkey=config.sslkey_path,` |
| | | `-        sslcert=config.sslcert_path,` |
| | | `-        sslrootcert=config.sslrootcert_path,` |
| | | `-        options="-c search_path={}".format(config.db_schema or config.user),` |
| | | `+def _get_connection_pool(config: PostgreSQLConfig) -> ConnectionPool:` |
| | | `` +    """Get a psycopg `ConnectionPool`.""" `` |
| | | `+    return ConnectionPool(` |
| | | `+        conninfo=_get_conninfo(config),` |
| | | `+        min_size=config.min_conn,` |
| | | `+        max_size=config.max_conn,` |
| | | `+        open=False,` |
| | | `+        kwargs=_get_conn_kwargs(config),` |
| | | `    )` |
| | | `+def _get_conninfo(config: PostgreSQLConfig) -> str:` |
| | | `` +    """Get the `conninfo` argument required for connection objects.""" `` |
| | | `+    return (` |
| | | `+        f"postgresql://{config.user}"` |
| | | `+        f":{config.password}"` |
| | | `+        f"@{config.host}"` |
| | | `+        f":{int(config.port)}"` |
| | | `+        f"/{config.database}"` |
| | | `+    )` |
| | | `+def _get_conn_kwargs(config: PostgreSQLConfig) -> Dict[str, Any]:` |
| | | `` +    """Get the additional `kwargs` required for connection objects.""" `` |
| | | `+    return {` |
| | | `+        "sslmode": config.sslmode,` |
| | | `+        "sslkey": config.sslkey_path,` |
| | | `+        "sslcert": config.sslcert_path,` |
| | | `+        "sslrootcert": config.sslrootcert_path,` |
| | | `+        "options": "-c search_path={}".format(config.db_schema or config.user),` |
| | | `+    }` |
| | | `def _df_to_create_table_sql(entity_df, table_name) -> str:` |
| | | `    pa_table = pa.Table.from_pandas(entity_df)` |
| | | `    columns = [` |
| | | `@@ -63,16 +72,14 @@ def df_to_postgres_table(` |
| | | `    """` |
| | | `    Create a table for the data frame, insert all the values, and return the table schema` |
| | | `    """` |
| | | `+    nr_columns = df.shape[1]` |
| | | `+    placeholders = ", ".join(["%s"] * nr_columns)` |
| | | `+    query = f"INSERT INTO {table_name} VALUES ({placeholders})"` |
| | | `+    values = df.replace({np.NaN: None}).to_numpy().tolist()` |
| | | `    with _get_conn(config) as conn, conn.cursor() as cur:` |
| | | `        cur.execute(_df_to_create_table_sql(df, table_name))` |
| | | `-        psycopg2.extras.execute_values(` |
| | | `-            cur,` |
| | | `-            f"""` |
| | | `-            INSERT INTO {table_name}` |
| | | `-            VALUES %s` |
| | | `-            """,` |
| | | `-            df.replace({np.NaN: None}).to_numpy(),` |
| | | `-        )` |
| | | `+        cur.executemany(query, values)` |

Comment on lines +75 to +82 (Contributor, Author):
Moved the parsing of variables further to the top of the function.

| | | `        return dict(zip(df.columns, df.dtypes))` |
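
The second hunk replaces psycopg2's `execute_values` with a single parameterized `INSERT` passed to `cur.executemany`. Below is a minimal sketch of how such a query and its parameter rows can be built. It uses plain Python lists rather than a DataFrame so that it runs without pandas or a database. `build_insert` and its arguments are hypothetical names for illustration, not part of the PR.

```python
import math
from typing import Any, List, Sequence, Tuple


def build_insert(
    table_name: str, rows: Sequence[Sequence[Any]]
) -> Tuple[str, List[List[Any]]]:
    """Build an INSERT statement with one %s placeholder per column.

    Hypothetical helper: mirrors the placeholder logic shown in the diff,
    but takes plain rows instead of a pandas DataFrame.
    """
    if not rows:
        raise ValueError("at least one row is needed to infer the column count")

    nr_columns = len(rows[0])
    placeholders = ", ".join(["%s"] * nr_columns)
    # The table name is interpolated directly, as in the diff, so it must
    # come from a trusted source.
    query = f"INSERT INTO {table_name} VALUES ({placeholders})"

    # Replace float NaN with None so the driver sends SQL NULL.
    values = [
        [None if isinstance(v, float) and math.isnan(v) else v for v in row]
        for row in rows
    ]
    return query, values


query, values = build_insert("entity_df", [[1, float("nan")], [2, 3.5]])
print(query)
print(values)
```

With a psycopg 3 cursor, the resulting pair would be passed as `cur.executemany(query, values)`, as in the diff.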
Helper functions to prevent code duplication in the above methods.
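
The helpers this comment refers to split the connection settings into a `conninfo` URI and a dict of extra keyword arguments, which both `_get_conn` and `_get_connection_pool` then reuse. Below is a standalone sketch of that split, under stated assumptions: `DemoConfig` is a hypothetical stand-in for `PostgreSQLConfig`, and the percent-encoding of the user and password with `urllib.parse.quote` is an addition that is not in the PR. It is included because connection URIs need percent-encoding when a value contains reserved characters such as `@` or `/`.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional
from urllib.parse import quote


@dataclass
class DemoConfig:
    """Hypothetical stand-in for feast's PostgreSQLConfig."""

    host: str
    port: int
    database: str
    user: str
    password: str
    db_schema: Optional[str] = None
    sslmode: Optional[str] = None


def get_conninfo(config: DemoConfig) -> str:
    """Build a postgresql:// URI; user and password are percent-encoded
    (an assumption added here, not present in the PR)."""
    return (
        f"postgresql://{quote(config.user, safe='')}"
        f":{quote(config.password, safe='')}"
        f"@{config.host}"
        f":{int(config.port)}"
        f"/{config.database}"
    )


def get_conn_kwargs(config: DemoConfig) -> Dict[str, Any]:
    """Collect the settings that are passed as keyword arguments
    rather than inside the URI."""
    return {
        "sslmode": config.sslmode,
        "options": "-c search_path={}".format(config.db_schema or config.user),
    }


cfg = DemoConfig(
    host="localhost", port=5432, database="feast", user="feast", password="p@ss/word"
)
print(get_conninfo(cfg))
print(get_conn_kwargs(cfg))
```

Keeping both helpers as the single source of connection settings means a direct connection and a pool cannot drift apart in SSL or `search_path` configuration.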