Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
9cdceab
Makefile: Formatting
job-almekinders Jun 19, 2024
dca9c9a
Makefile: Exclude Snowflake tests for postgres offline store tests
job-almekinders Jun 19, 2024
fc65cfc
Bootstrap: Use conninfo
job-almekinders Jun 19, 2024
a3ea80d
Tests: Make connection string compatible with psycopg3
job-almekinders Jun 19, 2024
e53d9e6
Tests: Test connection type pool and singleton
job-almekinders Jun 20, 2024
59cbd10
Global: Replace conn.set_session() calls to be psycopg3 compatible
job-almekinders Jun 20, 2024
0f86e9e
Offline: Use psycopg3
job-almekinders Jun 19, 2024
cd91fdc
Online: Use psycopg3
job-almekinders Jun 19, 2024
3504c77
Online: Restructure online_write_batch
job-almekinders Jun 20, 2024
6e45f8e
Online: Use correct placeholder
job-almekinders Jun 20, 2024
c755fcd
Online: Handle bytes properly in online_read()
job-almekinders Jun 20, 2024
acd4a8f
Online: Whitespace
job-almekinders Jun 20, 2024
36147ef
Online: Open ConnectionPool
job-almekinders Jun 21, 2024
d3fd7e7
Online: Add typehint
job-almekinders Jun 21, 2024
0a9bced
Utils: Use psycopg3
job-almekinders Jun 19, 2024
af136da
Lint: Raise exceptions if cursor returned no columns or rows
job-almekinders Jun 21, 2024
0926a15
Add comment on +psycopg string
job-almekinders Jun 21, 2024
6514987
Docs: Remove mention of psycopg2
job-almekinders Jun 21, 2024
915454c
Lint: Fix
job-almekinders Jun 21, 2024
d8e6619
Default to postgresql+psycopg and log warning
job-almekinders Jun 25, 2024
3328530
Solve merge conflicts
job-almekinders Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Lint: Raise exceptions if cursor returned no columns or rows
Add log statement

Lint: Fix _to_arrow_internal

Lint: Fix _get_entity_df_event_timestamp_range

Update exception

Use ZeroColumnQueryResult

Signed-off-by: Job Almekinders <job.almekinders@teampicnic.com>
  • Loading branch information
job-almekinders committed Jun 27, 2024
commit af136daf4d4abe7794ed016f2d48613b75219d8a
10 changes: 10 additions & 0 deletions sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,3 +389,13 @@ def __init__(self, input_dict: dict):
super().__init__(
f"Failed to serialize the provided dictionary into a pandas DataFrame: {input_dict.keys()}"
)


class ZeroRowsQueryResult(Exception):
    """Signal that an executed query returned zero rows.

    Args:
        query: The SQL text that was executed. It is embedded in the
            exception message and kept on the instance as ``query``.
    """

    def __init__(self, query: str):
        # Keep the raw query so handlers can inspect or log it without
        # having to parse it back out of the formatted message.
        self.query = query
        super().__init__(f"This query returned zero rows:\n{query}")


class ZeroColumnQueryResult(Exception):
    """Signal that an executed query returned zero columns.

    Args:
        query: The SQL text that was executed. It is embedded in the
            exception message and kept on the instance as ``query``.
    """

    def __init__(self, query: str):
        # Keep the raw query so handlers can inspect or log it without
        # having to parse it back out of the formatted message.
        self.query = query
        super().__init__(f"This query returned zero columns:\n{query}")
Comment on lines +394 to +401
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

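These exceptions are raised where psycopg3's stricter type hints allow an empty result (`cur.description` or `cur.fetchone()` may be `None`), so that case is handled explicitly.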

Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import contextlib
import logging
from dataclasses import asdict
from datetime import datetime
from typing import (
Expand All @@ -23,7 +24,7 @@
from pytz import utc

from feast.data_source import DataSource
from feast.errors import InvalidEntityType
from feast.errors import InvalidEntityType, ZeroColumnQueryResult, ZeroRowsQueryResult
from feast.feature_view import DUMMY_ENTITY_ID, DUMMY_ENTITY_VAL, FeatureView
from feast.infra.offline_stores import offline_utils
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
Expand Down Expand Up @@ -276,6 +277,8 @@ def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table:
with _get_conn(self.config.offline_store) as conn, conn.cursor() as cur:
conn.read_only = True
cur.execute(query)
if not cur.description:
raise ZeroColumnQueryResult(query)
Comment thread
job-almekinders marked this conversation as resolved.
fields = [
(c.name, pg_type_code_to_arrow(c.type_code))
for c in cur.description
Expand Down Expand Up @@ -331,16 +334,19 @@ def _get_entity_df_event_timestamp_range(
entity_df_event_timestamp.max().to_pydatetime(),
)
elif isinstance(entity_df, str):
# If the entity_df is a string (SQL query), determine range
# from table
# If the entity_df is a string (SQL query), determine range from table
with _get_conn(config.offline_store) as conn, conn.cursor() as cur:
(
cur.execute(
f"SELECT MIN({entity_df_event_timestamp_col}) AS min, MAX({entity_df_event_timestamp_col}) AS max FROM ({entity_df}) as tmp_alias"
),
)
query = f"""
SELECT
MIN({entity_df_event_timestamp_col}) AS min,
MAX({entity_df_event_timestamp_col}) AS max
FROM ({entity_df}) AS tmp_alias
"""
Comment on lines +338 to +343
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No updates here, only re-formatting the query

cur.execute(query)
res = cur.fetchone()
entity_df_event_timestamp_range = (res[0], res[1])
if not res:
raise ZeroRowsQueryResult(query)
entity_df_event_timestamp_range = (res[0], res[1])
else:
raise InvalidEntityType(type(entity_df))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import json
import logging
from typing import Callable, Dict, Iterable, Optional, Tuple

from typeguard import typechecked

from feast.data_source import DataSource
from feast.errors import DataSourceNoNameException
from feast.errors import DataSourceNoNameException, ZeroColumnQueryResult
from feast.infra.utils.postgres.connection_utils import _get_conn
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.SavedDataset_pb2 import (
Expand Down Expand Up @@ -111,7 +112,11 @@ def get_table_column_names_and_types(
self, config: RepoConfig
) -> Iterable[Tuple[str, str]]:
with _get_conn(config.offline_store) as conn, conn.cursor() as cur:
cur.execute(f"SELECT * FROM {self.get_table_query_string()} AS sub LIMIT 0")
query = f"SELECT * FROM {self.get_table_query_string()} AS sub LIMIT 0"
cur.execute(query)
if not cur.description:
raise ZeroColumnQueryResult(query)

return (
(c.name, pg_type_code_to_pg_type(c.type_code)) for c in cur.description
)
Expand Down