Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ format-python:
cd ${ROOT_DIR}/sdk/python; python -m black --target-version py38 feast tests

lint-python:
cd ${ROOT_DIR}/sdk/python; python -m mypy --exclude=/tests/ --follow-imports=skip feast
cd ${ROOT_DIR}/sdk/python; python -m mypy feast
cd ${ROOT_DIR}/sdk/python; python -m isort feast/ tests/ --check-only
cd ${ROOT_DIR}/sdk/python; python -m flake8 feast/ tests/
cd ${ROOT_DIR}/sdk/python; python -m black --check feast tests
Expand Down
11 changes: 3 additions & 8 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, from_json
from pyspark.sql.streaming import StreamingQuery

from feast.data_format import AvroFormat, JsonFormat
from feast.data_source import KafkaSource, PushMode
Expand Down Expand Up @@ -68,13 +67,10 @@ def __init__(
# data_source type has been checked to be an instance of KafkaSource.
self.data_source: KafkaSource = self.data_source # type: ignore

def ingest_stream_feature_view(
self, to: PushMode = PushMode.ONLINE
) -> StreamingQuery:
def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None:
Comment thread
sudohainguyen marked this conversation as resolved.
Outdated
ingested_stream_df = self._ingest_stream_data()
transformed_df = self._construct_transformation_plan(ingested_stream_df)
online_store_query = self._write_stream_data(transformed_df, to)
return online_store_query
self._write_stream_data(transformed_df, to)

# On line 64 of __init__(), "data_source" is assigned a stream_source, which has to be a KafkaSource (see line 40).
@no_type_check
Expand Down Expand Up @@ -131,7 +127,7 @@ def _ingest_stream_data(self) -> StreamTable:
def _construct_transformation_plan(self, df: StreamTable) -> StreamTable:
    """Run the stream feature view's UDF over ``df`` when one is set.

    Args:
        df: The table to transform.

    Returns:
        The result of calling ``self.sfv.udf`` on ``df``, or ``df`` itself
        when ``self.sfv.udf`` is falsy.
    """
    # Guard clause: without a UDF the input is passed through untouched.
    if not self.sfv.udf:
        return df
    return self.sfv.udf.__call__(df)

def _write_stream_data(self, df: StreamTable, to: PushMode) -> StreamingQuery:
def _write_stream_data(self, df: StreamTable, to: PushMode) -> None:
# Validation occurs at the fs.write_to_online_store() phase against the stream feature view schema.
def batch_write(row: DataFrame, batch_id: int):
rows: pd.DataFrame = row.toPandas()
Expand Down Expand Up @@ -170,4 +166,3 @@ def batch_write(row: DataFrame, batch_id: int):
)

query.awaitTermination(timeout=self.query_timeout)
return query
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Dict, Optional
from typing import Dict, Literal, Optional

import pandas as pd
import pytest
Expand All @@ -12,6 +12,7 @@
PostgreSQLSource,
)
from feast.infra.utils.postgres.connection_utils import df_to_postgres_table
from feast.infra.utils.postgres.postgres_config import PostgreSQLConfig
from tests.integration.feature_repos.universal.data_source_creator import (
DataSourceCreator,
)
Expand All @@ -26,6 +27,10 @@
POSTGRES_DB = "test"


class PostgreSQLOnlineStoreConfig(PostgreSQLConfig):
    """A ``PostgreSQLConfig`` whose ``type`` field is pinned to "postgres"."""

    type: Literal["postgres"] = "postgres"


@pytest.fixture(scope="session")
def postgres_container():
container = (
Expand Down Expand Up @@ -106,17 +111,17 @@ def create_offline_store_config(self) -> PostgreSQLOfflineStoreConfig:
def get_prefixed_table_name(self, suffix: str) -> str:
    """Return ``suffix`` prefixed with ``self.project_name`` and an underscore.

    Args:
        suffix: The part of the name that follows the underscore.

    Returns:
        A string of the form ``<project_name>_<suffix>``.
    """
    prefix = self.project_name
    return f"{prefix}_{suffix}"

def create_online_store(self) -> Dict[str, str]:
def create_online_store(self) -> PostgreSQLOnlineStoreConfig:
assert self.container
return {
"type": "postgres",
"host": "localhost",
"port": self.container.get_exposed_port(5432),
"database": POSTGRES_DB,
"db_schema": "feature_store",
"user": POSTGRES_USER,
"password": POSTGRES_PASSWORD,
}
return PostgreSQLOnlineStoreConfig(
type="postgres",
host="localhost",
port=self.container.get_exposed_port(5432),
database=POSTGRES_DB,
db_schema="feature_store",
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
)

def create_saved_dataset_destination(self):
# FIXME: ...
Expand Down
13 changes: 2 additions & 11 deletions sdk/python/feast/infra/offline_stores/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,18 +470,9 @@ def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame:
def _to_arrow_internal(self, timeout: Optional[int] = None) -> pyarrow.Table:
pa_table = execute_snowflake_statement(
self.snowflake_conn, self.to_sql()
).fetch_arrow_all()
).fetch_arrow_all(force_return_table=True)

if pa_table:
return pa_table
else:
empty_result = execute_snowflake_statement(
self.snowflake_conn, self.to_sql()
)

return pyarrow.Table.from_pandas(
pd.DataFrame(columns=[md.name for md in empty_result.description])
)
return pa_table
Comment thread
bushwhackr marked this conversation as resolved.
Outdated

def to_sql(self) -> str:
"""
Expand Down