Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
trying
Signed-off-by: Francisco Javier Arceo <farceo@redhat.com>
  • Loading branch information
franciscojavierarceo committed Jun 3, 2025
commit 41088d1a402c1c4c83c1b396f7ed693c62f87e6f
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ postgres = ["psycopg[binary,pool]==3.2.5"]
# psycopg[c] install requires a system with a C compiler, python dev headers, & postgresql client dev headers
# https://www.psycopg.org/psycopg3/docs/basic/install.html#local-installation
postgres-c = ["psycopg[c,pool]==3.2.5"]
pytorch = ["torch>=2.7.0", "torchvision>=0.17.2"]
pytorch = ["torch==2.7.0", "torchvision>=0.17.2"]
qdrant = ["qdrant-client>=1.12.0"]
redis = [
"redis>=4.2.2,<5",
Expand Down
240 changes: 120 additions & 120 deletions sdk/python/feast/infra/utils/postgres/connection_utils.py
Original file line number Diff line number Diff line change
@@ -1,120 +1,120 @@
from typing import Any, Dict
import numpy as np
import pandas as pd
import psycopg
import pyarrow as pa
from psycopg import AsyncConnection, Connection
from psycopg.conninfo import make_conninfo
from psycopg_pool import AsyncConnectionPool, ConnectionPool
from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
from feast.type_map import arrow_to_pg_type
def _get_conn(config: PostgreSQLConfig) -> Connection:
    """Open a synchronous psycopg `Connection` described by ``config``.

    The connection string comes from `_get_conninfo`, the TCP keepalive idle
    setting from ``config.keepalives_idle``, and the remaining keyword
    arguments (SSL settings, search path) from `_get_conn_kwargs`.
    """
    dsn = _get_conninfo(config)
    idle_seconds = config.keepalives_idle
    extra_kwargs = _get_conn_kwargs(config)
    return psycopg.connect(
        conninfo=dsn,
        keepalives_idle=idle_seconds,
        **extra_kwargs,
    )
async def _get_conn_async(config: PostgreSQLConfig) -> AsyncConnection:
    """Open an asynchronous psycopg `AsyncConnection` described by ``config``.

    Uses the same connection string, keepalive setting and extra keyword
    arguments as the synchronous `_get_conn`.
    """
    dsn = _get_conninfo(config)
    idle_seconds = config.keepalives_idle
    extra_kwargs = _get_conn_kwargs(config)
    return await psycopg.AsyncConnection.connect(
        conninfo=dsn,
        keepalives_idle=idle_seconds,
        **extra_kwargs,
    )
def _get_connection_pool(config: PostgreSQLConfig) -> ConnectionPool:
    """Build a psycopg `ConnectionPool` sized from ``config``.

    The pool is constructed with ``open=False``, so it is not opened here;
    presumably the caller opens it before use.
    """
    pool_settings = {
        "conninfo": _get_conninfo(config),
        "min_size": config.min_conn,
        "max_size": config.max_conn,
        "open": False,
        # Forwarded by the pool to every connection it creates.
        "kwargs": _get_conn_kwargs(config),
    }
    return ConnectionPool(**pool_settings)
async def _get_connection_pool_async(config: PostgreSQLConfig) -> AsyncConnectionPool:
    """Build a psycopg `AsyncConnectionPool` sized from ``config``.

    Nothing is awaited in the body, but the function is declared ``async``,
    so callers must still ``await`` it. The pool is constructed with
    ``open=False``; presumably the caller opens it before use.
    """
    pool_settings = {
        "conninfo": _get_conninfo(config),
        "min_size": config.min_conn,
        "max_size": config.max_conn,
        "open": False,
        # Forwarded by the pool to every connection it creates.
        "kwargs": _get_conn_kwargs(config),
    }
    return AsyncConnectionPool(**pool_settings)
def _get_conninfo(config: PostgreSQLConfig) -> str:
    """Assemble the `conninfo` string used by connections and pools.

    Only the user, password, host, port and database name are encoded here;
    the port is coerced to ``int`` before being passed to `make_conninfo`.
    """
    connection_params = {
        "user": config.user,
        "password": config.password,
        "host": config.host,
        "port": int(config.port),
        "dbname": config.database,
    }
    return make_conninfo(conninfo="", **connection_params)
def _get_conn_kwargs(config: PostgreSQLConfig) -> Dict[str, Any]:
    """Collect the extra keyword arguments passed to connection objects.

    Returns the SSL settings from ``config`` plus an ``options`` entry that
    sets ``search_path`` to ``config.db_schema``, falling back to
    ``config.user`` when the schema is falsy.
    """
    conn_kwargs: Dict[str, Any] = {
        "sslmode": config.sslmode,
        "sslkey": config.sslkey_path,
        "sslcert": config.sslcert_path,
        "sslrootcert": config.sslrootcert_path,
    }
    schema = config.db_schema or config.user
    conn_kwargs["options"] = f"-c search_path={schema}"
    return conn_kwargs
def _df_to_create_table_sql(entity_df, table_name) -> str:
    """Render a ``CREATE TABLE`` statement matching the columns of ``entity_df``.

    The data frame is converted to a pyarrow table so that each field's arrow
    type can be mapped to a PostgreSQL type via `arrow_to_pg_type`. Both the
    table name and the column names are emitted inside double quotes.
    """
    arrow_schema = pa.Table.from_pandas(entity_df).schema
    column_defs = []
    for field in arrow_schema:
        column_name = field.name
        pg_type = arrow_to_pg_type(str(field.type))
        column_defs.append(f'"{column_name}" {pg_type}')
    column_list = ", ".join(column_defs)
    return f'\nCREATE TABLE "{table_name}" (\n{column_list}\n);\n'
def df_to_postgres_table(
    config: PostgreSQLConfig, df: pd.DataFrame, table_name: str
) -> Dict[str, np.dtype]:
    """Create ``table_name`` from ``df``, insert every row, and return the schema.

    Returns:
        A mapping of column name to the pandas dtype of that column.
    """
    column_count = len(df.columns)
    placeholders = ", ".join("%s" for _ in range(column_count))
    # NOTE(review): the table name is unquoted here but double-quoted in the
    # CREATE TABLE statement -- confirm mixed-case names are not expected.
    insert_sql = f"INSERT INTO {table_name} VALUES ({placeholders})"
    # NaN is swapped for None before the rows are handed to the driver.
    without_nan = df.replace({np.nan: None})
    rows = without_nan.to_numpy().tolist()

    with _get_conn(config) as conn, conn.cursor() as cur:
        create_sql = _df_to_create_table_sql(df, table_name)
        cur.execute(create_sql)
        cur.executemany(insert_sql, rows)
        return {column: dtype for column, dtype in zip(df.columns, df.dtypes)}
def get_query_schema(config: PostgreSQLConfig, sql_query: str) -> Dict[str, np.dtype]:
    """Return the column-name-to-dtype mapping that ``sql_query`` would produce.

    No rows are fetched and no table is created: the schema is read from the
    empty result of ``SELECT * FROM {sql_query} LIMIT 0``.

    NOTE(review): ``sql_query`` is interpolated directly into the FROM clause,
    so it presumably must be a table name or an aliased subquery from a
    trusted source -- confirm against callers.
    """
    probe_sql = f"SELECT * FROM {sql_query} LIMIT 0"
    with _get_conn(config) as conn:
        # Mark the connection read-only before running the probe.
        conn.read_only = True
        empty_df = pd.read_sql(probe_sql, conn)
        return {column: dtype for column, dtype in zip(empty_df.columns, empty_df.dtypes)}
from typing import Any, Dict

import numpy as np
import pandas as pd
import psycopg
import pyarrow as pa
from psycopg import AsyncConnection, Connection
from psycopg.conninfo import make_conninfo
from psycopg_pool import AsyncConnectionPool, ConnectionPool

from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
from feast.type_map import arrow_to_pg_type


def _get_conn(config: PostgreSQLConfig) -> Connection:
    """Open a synchronous psycopg `Connection` described by ``config``.

    The connection string comes from `_get_conninfo`, the TCP keepalive idle
    setting from ``config.keepalives_idle``, and the remaining keyword
    arguments (SSL settings, search path) from `_get_conn_kwargs`.
    """
    dsn = _get_conninfo(config)
    idle_seconds = config.keepalives_idle
    extra_kwargs = _get_conn_kwargs(config)
    return psycopg.connect(
        conninfo=dsn,
        keepalives_idle=idle_seconds,
        **extra_kwargs,
    )


async def _get_conn_async(config: PostgreSQLConfig) -> AsyncConnection:
    """Open an asynchronous psycopg `AsyncConnection` described by ``config``.

    Uses the same connection string, keepalive setting and extra keyword
    arguments as the synchronous `_get_conn`.
    """
    dsn = _get_conninfo(config)
    idle_seconds = config.keepalives_idle
    extra_kwargs = _get_conn_kwargs(config)
    return await psycopg.AsyncConnection.connect(
        conninfo=dsn,
        keepalives_idle=idle_seconds,
        **extra_kwargs,
    )


def _get_connection_pool(config: PostgreSQLConfig) -> ConnectionPool:
    """Build a psycopg `ConnectionPool` sized from ``config``.

    The pool is constructed with ``open=False``, so it is not opened here;
    presumably the caller opens it before use.
    """
    pool_settings = {
        "conninfo": _get_conninfo(config),
        "min_size": config.min_conn,
        "max_size": config.max_conn,
        "open": False,
        # Forwarded by the pool to every connection it creates.
        "kwargs": _get_conn_kwargs(config),
    }
    return ConnectionPool(**pool_settings)


async def _get_connection_pool_async(config: PostgreSQLConfig) -> AsyncConnectionPool:
    """Build a psycopg `AsyncConnectionPool` sized from ``config``.

    Nothing is awaited in the body, but the function is declared ``async``,
    so callers must still ``await`` it. The pool is constructed with
    ``open=False``; presumably the caller opens it before use.
    """
    pool_settings = {
        "conninfo": _get_conninfo(config),
        "min_size": config.min_conn,
        "max_size": config.max_conn,
        "open": False,
        # Forwarded by the pool to every connection it creates.
        "kwargs": _get_conn_kwargs(config),
    }
    return AsyncConnectionPool(**pool_settings)


def _get_conninfo(config: PostgreSQLConfig) -> str:
    """Assemble the `conninfo` string used by connections and pools.

    Only the user, password, host, port and database name are encoded here;
    the port is coerced to ``int`` before being passed to `make_conninfo`.
    """
    connection_params = {
        "user": config.user,
        "password": config.password,
        "host": config.host,
        "port": int(config.port),
        "dbname": config.database,
    }
    return make_conninfo(conninfo="", **connection_params)


def _get_conn_kwargs(config: PostgreSQLConfig) -> Dict[str, Any]:
    """Collect the extra keyword arguments passed to connection objects.

    Returns the SSL settings from ``config`` plus an ``options`` entry that
    sets ``search_path`` to ``config.db_schema``, falling back to
    ``config.user`` when the schema is falsy.
    """
    conn_kwargs: Dict[str, Any] = {
        "sslmode": config.sslmode,
        "sslkey": config.sslkey_path,
        "sslcert": config.sslcert_path,
        "sslrootcert": config.sslrootcert_path,
    }
    schema = config.db_schema or config.user
    conn_kwargs["options"] = f"-c search_path={schema}"
    return conn_kwargs


def _df_to_create_table_sql(entity_df, table_name) -> str:
    """Render a ``CREATE TABLE`` statement matching the columns of ``entity_df``.

    The data frame is converted to a pyarrow table so that each field's arrow
    type can be mapped to a PostgreSQL type via `arrow_to_pg_type`. Both the
    table name and the column names are emitted inside double quotes.
    """
    arrow_schema = pa.Table.from_pandas(entity_df).schema
    column_defs = []
    for field in arrow_schema:
        column_name = field.name
        pg_type = arrow_to_pg_type(str(field.type))
        column_defs.append(f'"{column_name}" {pg_type}')
    column_list = ", ".join(column_defs)
    return f'\nCREATE TABLE "{table_name}" (\n{column_list}\n);\n'


def df_to_postgres_table(
    config: PostgreSQLConfig, df: pd.DataFrame, table_name: str
) -> Dict[str, np.dtype]:
    """
    Create a table for the data frame, insert all the values, and return the table schema

    Args:
        config: Connection settings used to open the PostgreSQL connection.
        df: Data frame whose columns define the table and whose rows are inserted.
        table_name: Name of the table to create and populate.

    Returns:
        A mapping of column name to the pandas dtype of that column.
    """
    nr_columns = df.shape[1]
    placeholders = ", ".join(["%s"] * nr_columns)
    # NOTE(review): the table name is unquoted here but double-quoted in the
    # CREATE TABLE statement -- confirm mixed-case names are not expected.
    query = f"INSERT INTO {table_name} VALUES ({placeholders})"
    # Use `np.nan`: the `np.NaN` alias was removed in NumPy 2.0 and raises
    # AttributeError there, while `np.nan` exists in every NumPy version.
    # NaN is swapped for None before the rows are handed to the driver.
    values = df.replace({np.nan: None}).to_numpy().tolist()

    with _get_conn(config) as conn, conn.cursor() as cur:
        cur.execute(_df_to_create_table_sql(df, table_name))
        cur.executemany(query, values)
        return dict(zip(df.columns, df.dtypes))


def get_query_schema(config: PostgreSQLConfig, sql_query: str) -> Dict[str, np.dtype]:
    """Return the column-name-to-dtype mapping that ``sql_query`` would produce.

    No rows are fetched and no table is created: the schema is read from the
    empty result of ``SELECT * FROM {sql_query} LIMIT 0``.

    NOTE(review): ``sql_query`` is interpolated directly into the FROM clause,
    so it presumably must be a table name or an aliased subquery from a
    trusted source -- confirm against callers.
    """
    probe_sql = f"SELECT * FROM {sql_query} LIMIT 0"
    with _get_conn(config) as conn:
        # Mark the connection read-only before running the probe.
        conn.read_only = True
        empty_df = pd.read_sql(probe_sql, conn)
        return {column: dtype for column, dtype in zip(empty_df.columns, empty_df.dtypes)}
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,6 @@ tomli==2.2.1 \
# via
# flit-scm
# frozenlist
# hatch-fancy-pypi-readme
# hatchling
# maturin
# meson-python
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@
MILVUS_REQUIRED = ["pymilvus"]

TORCH_REQUIRED = [
"torch>=2.7.0",
"torch==2.7.0",
"torchvision>=0.17.2",
]

Expand Down