Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9cdceab
Makefile: Formatting
job-almekinders Jun 19, 2024
dca9c9a
Makefile: Exclude Snowflake tests for postgres offline store tests
job-almekinders Jun 19, 2024
fc65cfc
Bootstrap: Use conninfo
job-almekinders Jun 19, 2024
a3ea80d
Tests: Make connection string compatible with psycopg3
job-almekinders Jun 19, 2024
e53d9e6
Tests: Test connection type pool and singleton
job-almekinders Jun 20, 2024
59cbd10
Global: Replace conn.set_session() calls to be psycopg3 compatible
job-almekinders Jun 20, 2024
0f86e9e
Offline: Use psycopg3
job-almekinders Jun 19, 2024
cd91fdc
Online: Use psycopg3
job-almekinders Jun 19, 2024
3504c77
Online: Restructure online_write_batch
job-almekinders Jun 20, 2024
6e45f8e
Online: Use correct placeholder
job-almekinders Jun 20, 2024
c755fcd
Online: Handle bytes properly in online_read()
job-almekinders Jun 20, 2024
acd4a8f
Online: Whitespace
job-almekinders Jun 20, 2024
36147ef
Online: Open ConnectionPool
job-almekinders Jun 21, 2024
d3fd7e7
Online: Add typehint
job-almekinders Jun 21, 2024
0a9bced
Utils: Use psycopg3
job-almekinders Jun 19, 2024
af136da
Lint: Raise exceptions if cursor returned no columns or rows
job-almekinders Jun 21, 2024
0926a15
Add comment on +psycopg string
job-almekinders Jun 21, 2024
6514987
Docs: Remove mention of psycopg2
job-almekinders Jun 21, 2024
915454c
Lint: Fix
job-almekinders Jun 21, 2024
d8e6619
Default to postgresql+psycopg and log warning
job-almekinders Jun 25, 2024
3328530
Solve merge conflicts
job-almekinders Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Utils: Use psycopg3
Use new ConnectionPool

Pass kwargs as named argument

Use executemany over execute_values

Remove not-required open argument in psycopg.connect

Improve

Use SpooledTemporaryFile

Use max_size and add docstring

Properly write with StringIO

Utils: Use SpooledTemporaryFile over StringIO object

Add replace

Fix df_to_postgres_table

Remove import

Utils

Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
  • Loading branch information
job-almekinders committed Jun 27, 2024
commit 0a9bcedcd4d4b82ab9bebcb45b949b28a6fc9d75
83 changes: 45 additions & 38 deletions sdk/python/feast/infra/utils/postgres/connection_utils.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,59 @@
from typing import Dict
from typing import Any, Dict

import numpy as np
import pandas as pd
import psycopg2
import psycopg2.extras
import psycopg
import pyarrow as pa
from psycopg2.pool import SimpleConnectionPool
from psycopg.connection import Connection
from psycopg_pool import ConnectionPool

from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
from feast.type_map import arrow_to_pg_type


def _get_conn(config: PostgreSQLConfig):
conn = psycopg2.connect(
dbname=config.database,
host=config.host,
port=int(config.port),
user=config.user,
password=config.password,
sslmode=config.sslmode,
sslkey=config.sslkey_path,
sslcert=config.sslcert_path,
sslrootcert=config.sslrootcert_path,
options="-c search_path={}".format(config.db_schema or config.user),
def _get_conn(config: PostgreSQLConfig) -> Connection:
    """Create a psycopg `Connection` from the given PostgreSQL config.

    The connection URI is produced by `_get_conninfo` and the extra keyword
    arguments by `_get_conn_kwargs`; `config.keepalives_idle` is forwarded
    to `psycopg.connect` alongside them.
    """
    conninfo = _get_conninfo(config)
    keepalives_idle = config.keepalives_idle
    extra_kwargs = _get_conn_kwargs(config)
    return psycopg.connect(
        conninfo=conninfo,
        keepalives_idle=keepalives_idle,
        **extra_kwargs,
    )


def _get_connection_pool(config: PostgreSQLConfig):
return SimpleConnectionPool(
config.min_conn,
config.max_conn,
dbname=config.database,
host=config.host,
port=int(config.port),
user=config.user,
password=config.password,
sslmode=config.sslmode,
sslkey=config.sslkey_path,
sslcert=config.sslcert_path,
sslrootcert=config.sslrootcert_path,
options="-c search_path={}".format(config.db_schema or config.user),
def _get_connection_pool(config: PostgreSQLConfig) -> ConnectionPool:
    """Build a psycopg `ConnectionPool` from the given PostgreSQL config.

    The pool is sized by `config.min_conn` / `config.max_conn` and is
    constructed with `open=False`, so this function does not open it.
    The per-connection keyword arguments come from `_get_conn_kwargs`.
    """
    conninfo = _get_conninfo(config)
    min_size, max_size = config.min_conn, config.max_conn
    connect_kwargs = _get_conn_kwargs(config)
    pool = ConnectionPool(
        conninfo=conninfo,
        min_size=min_size,
        max_size=max_size,
        open=False,
        kwargs=connect_kwargs,
    )
    return pool


def _get_conninfo(config: PostgreSQLConfig) -> str:
    """Get the `conninfo` argument required for connection objects.

    Builds a `postgresql://user:password@host:port/database` URI from
    `config`. The user, password and database name are percent-encoded so
    that reserved URI characters inside them (for example `@`, `:`, `/`
    or `#` in a password) are not misread as URI delimiters.

    Args:
        config: Object exposing `user`, `password`, `host`, `port` and
            `database` attributes.

    Returns:
        The connection URI as a string.

    Raises:
        ValueError: If `config.port` cannot be converted with `int()`.
    """
    from urllib.parse import quote

    def _encode(value: object) -> str:
        # `safe=""` also escapes "/", which `quote` leaves alone by default.
        # `str()` keeps the f-string behaviour for non-string values.
        return quote(str(value), safe="")

    # The host is inserted unchanged: quoting it with `safe=""` would also
    # escape the brackets of an IPv6 literal such as `[::1]`.
    return (
        f"postgresql://{_encode(config.user)}"
        f":{_encode(config.password)}"
        f"@{config.host}"
        f":{int(config.port)}"
        f"/{_encode(config.database)}"
    )


def _get_conn_kwargs(config: PostgreSQLConfig) -> Dict[str, Any]:
    """Get the additional `kwargs` required for connection objects.

    Returns the SSL settings (`sslmode`, `sslkey`, `sslcert`,
    `sslrootcert`) plus an `options` entry that sets `search_path` to
    `config.db_schema`, falling back to `config.user` when the schema is
    falsy.
    """
    search_path = config.db_schema or config.user
    conn_kwargs = {
        "sslmode": config.sslmode,
        "sslkey": config.sslkey_path,
        "sslcert": config.sslcert_path,
        "sslrootcert": config.sslrootcert_path,
    }
    conn_kwargs["options"] = "-c search_path={}".format(search_path)
    return conn_kwargs


Comment on lines +35 to +56
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helper functions to avoid duplicating code between `_get_conn` and `_get_connection_pool` above.

def _df_to_create_table_sql(entity_df, table_name) -> str:
pa_table = pa.Table.from_pandas(entity_df)
columns = [
Expand All @@ -63,16 +72,14 @@ def df_to_postgres_table(
"""
Create a table for the data frame, insert all the values, and return the table schema
"""
nr_columns = df.shape[1]
placeholders = ", ".join(["%s"] * nr_columns)
query = f"INSERT INTO {table_name} VALUES ({placeholders})"
values = df.replace({np.NaN: None}).to_numpy().tolist()

with _get_conn(config) as conn, conn.cursor() as cur:
cur.execute(_df_to_create_table_sql(df, table_name))
psycopg2.extras.execute_values(
cur,
f"""
INSERT INTO {table_name}
VALUES %s
""",
df.replace({np.NaN: None}).to_numpy(),
)
cur.executemany(query, values)
Comment on lines +75 to +82
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved the parsing of variables further to the top of the function.

  1. Again, we need to replace execute_values with executemany.
  2. Again, we need to explicitly set the number of placeholders. Since this function must handle a variable number of columns, we build the placeholders string from the DataFrame's column count.

return dict(zip(df.columns, df.dtypes))


Expand Down