Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
e1e210d
Broken state
kevjumba Aug 5, 2022
011d1e0
working state
kevjumba Aug 10, 2022
a6a2fce
Fix the lint issues
kevjumba Aug 10, 2022
57b63bb
Semi working state
kevjumba Aug 10, 2022
ae7ed8a
Fix
kevjumba Aug 10, 2022
421645b
Remove print
kevjumba Aug 10, 2022
07fece5
Fix lint
kevjumba Aug 11, 2022
4062031
Run build-sphinx
kevjumba Aug 11, 2022
cb39329
Add tutorials
kevjumba Aug 11, 2022
554ca1a
Fix
kevjumba Aug 11, 2022
4a969e7
Fix?
kevjumba Aug 11, 2022
116320a
Fix lint
kevjumba Aug 11, 2022
c0b16ef
Fix
kevjumba Aug 11, 2022
44d09d0
Fix lint
kevjumba Aug 12, 2022
b6f0a79
Begin configuring tests
adchia Aug 15, 2022
2b2ff40
Fix
kevjumba Aug 15, 2022
4616366
Working version
kevjumba Aug 16, 2022
c7d9852
Fix
kevjumba Aug 17, 2022
d2e290b
Fix
kevjumba Aug 17, 2022
a726a9a
Fix
kevjumba Aug 17, 2022
32992e3
Fix lint
kevjumba Aug 17, 2022
ebb934b
Fix lint
kevjumba Aug 17, 2022
e456acb
Fix
kevjumba Aug 17, 2022
45f479f
Fix lint
kevjumba Aug 17, 2022
4b8c4a2
Fix
kevjumba Aug 17, 2022
b1bf602
Fix
kevjumba Aug 17, 2022
4586f00
Fix azure
kevjumba Aug 17, 2022
3b88c0b
Fix
kevjumba Aug 17, 2022
9ae8ee3
Fix
kevjumba Aug 17, 2022
1b12e4a
Fix lint and address issues
kevjumba Aug 18, 2022
0ca5048
Fix integration tests
kevjumba Aug 18, 2022
883f314
Fix
kevjumba Aug 18, 2022
ccf8716
Fix lint and address issues
kevjumba Aug 18, 2022
f05288e
Fix
kevjumba Aug 18, 2022
ee30e73
Fix
kevjumba Aug 18, 2022
ab17db9
Fix
kevjumba Aug 18, 2022
be162f5
Revert
kevjumba Aug 18, 2022
f5aa476
Fix
kevjumba Aug 18, 2022
4423dfa
Fix
kevjumba Aug 18, 2022
5806507
Fix
kevjumba Aug 18, 2022
7a4d055
Fix lint
kevjumba Aug 19, 2022
78b74b1
Fix
kevjumba Aug 19, 2022
a9e8119
Fix lint
kevjumba Aug 19, 2022
1341e3e
Fix pyarrow
kevjumba Aug 19, 2022
3d42093
Fix lint
kevjumba Aug 19, 2022
1c591f0
add requirements files
adchia Aug 19, 2022
b4da607
fix name of docs
adchia Aug 19, 2022
c3a0423
fix offline store readme
adchia Aug 19, 2022
576b57e
fix offline store readme
adchia Aug 19, 2022
69940ac
fix
adchia Aug 19, 2022
516ff76
fix
adchia Aug 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Working version
Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
  • Loading branch information
kevjumba committed Aug 19, 2022
commit 4616366bcef7bc074d8368afeafbeb737f954899
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/contrib/azure_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
class AzureProvider(Provider):
def __init__(self, config: RepoConfig):
warnings.warn(
"The azure provider is an experimental feature in alpha development. "
"The azure provider is an experimental feature in alpha development. "
"Some functionality may still be unstable so functionality can change in the future.",
RuntimeWarning,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.mssql import DATETIME2

from feast import FileSource, errors
from feast.data_source import DataSource
Expand Down Expand Up @@ -185,6 +186,7 @@ def get_historical_features(
entity_df_event_timestamp_col = (
offline_utils.infer_event_timestamp_from_entity_df(table_schema)
)

_assert_expected_columns_in_sqlserver(
expected_join_keys,
entity_df_event_timestamp_col,
Expand Down Expand Up @@ -407,7 +409,7 @@ def _upload_entity_df_into_sqlserver_and_get_entity_schema(
raise ValueError(
f"The entity dataframe you have provided must be a SQL Server SQL query,"
f" or a Pandas dataframe. But we found: {type(entity_df)} "
)
)

return entity_schema

Expand Down Expand Up @@ -601,21 +603,16 @@ def _get_entity_df_event_timestamp_range(
The entity_dataframe dataset being our source of truth here.
*/

SELECT entity_dataframe.*
{% for featureview in featureviews %}
{% for feature in featureview.features %}
,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
{% endfor %}
{% endfor %}
SELECT {{ final_output_feature_names | join(', ')}}
FROM entity_dataframe
{% for featureview in featureviews %}
LEFT JOIN (
SELECT
{{featureview.name}}__entity_row_unique_id
{% for feature in featureview.features %}
,{% if full_feature_names %}{{ featureview.name }}__{{feature}}{% else %}{{ feature }}{% endif %}
,{% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}
{% endfor %}
FROM {{ featureview.name }}__cleaned
FROM "{{ featureview.name }}__cleaned"
) {{ featureview.name }}__cleaned
ON
{{ featureview.name }}__cleaned.{{ featureview.name }}__entity_row_unique_id = entity_dataframe.{{ featureview.name }}__entity_row_unique_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pytest
from sqlalchemy import create_engine
from testcontainers.core.container import DockerContainer
from testcontainers.mssql import SqlServerContainer
from testcontainers.core.waiting_utils import wait_for_logs

from feast.data_source import DataSource
Expand All @@ -24,6 +25,7 @@
MSSQL_PASSWORD = "yourStrong(!)Password"


# This is the SQL container to use if your machine doesn't support the official mssql docker container.
@pytest.fixture(scope="session")
Comment thread
adchia marked this conversation as resolved.
def mssql_container():
container = (
Expand All @@ -43,6 +45,7 @@ def mssql_container():

def _df_to_create_table_sql(df: pd.DataFrame, table_name: str) -> str:
pa_table = pa.Table.from_pandas(df)

columns = [f""""{f.name}" {pa_to_mssql_type(f.type)}""" for f in pa_table.schema]
return f"""
CREATE TABLE "{table_name}" (
Expand All @@ -51,6 +54,8 @@ def _df_to_create_table_sql(df: pd.DataFrame, table_name: str) -> str:
"""




class MsSqlDataSourceCreator(DataSourceCreator):
tables: List[str] = []

Expand All @@ -59,8 +64,10 @@ def __init__(
):
super().__init__(project_name)
self.tables_created: List[str] = []
self.container = fixture_request.getfixturevalue("mssql_container")
self.exposed_port = self.container.get_exposed_port("1433")
self.container = SqlServerContainer(user=MSSQL_USER, password=MSSQL_PASSWORD)
#self.container = fixture_request.getfixturevalue("mssql_container")
self.container.start()
#self.exposed_port = self.container.get_exposed_port("1433")
if not self.container:
raise RuntimeError(
"In order to use this data source "
Expand All @@ -70,10 +77,11 @@ def __init__(

def create_offline_store_config(self) -> MsSqlServerOfflineStoreConfig:
return MsSqlServerOfflineStoreConfig(
connection_string=(
f"mssql+pyodbc://{MSSQL_USER}:{MSSQL_PASSWORD}@0.0.0.0:1433/master?"
"driver=ODBC+Driver+17+for+SQL+Server"
)
connection_string=self.container.get_connection_url(),
# connection_string=(
# f"mssql+pyodbc://{MSSQL_USER}:{MSSQL_PASSWORD}@0.0.0.0:1433/master?"
# "driver=ODBC+Driver+17+for+SQL+Server"
# )
)

def create_data_source(
Expand All @@ -85,33 +93,30 @@ def create_data_source(
field_mapping: Dict[str, str] = None,
**kwargs,
) -> DataSource:
if timestamp_field in df:
df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True)
# Make sure the field mapping is correct and convert the datetime datasources.

if field_mapping:
timestamp_mapping = {value: key for key, value in field_mapping.items()}
if (
timestamp_field in timestamp_mapping
and timestamp_mapping[timestamp_field] in df
):
col = timestamp_mapping[timestamp_field]
df[col] = pd.to_datetime(df[col], utc=True)
# if timestamp_field in df:
# df[timestamp_field] = pd.to_datetime(df[timestamp_field], utc=True).fillna(pd.Timestamp.now()) #.dt.tz_localize(None)
# # Make sure the field mapping is correct and convert the datetime datasources.
# if created_timestamp_column in df:
# df[created_timestamp_column] = pd.to_datetime(df[created_timestamp_column], utc=True).fillna(pd.Timestamp.now()) #.dt.tz_localize(None)

connection_string = self.create_offline_store_config().connection_string
engine = create_engine(connection_string)
# Create table

destination_name = self.get_prefixed_table_name(destination_name)
#_df_to_create_table_sql(df, destination_name)
engine.execute(_df_to_create_table_sql(df, destination_name))
# Upload dataframe to azure table
# TODO
df.to_sql(destination_name, engine, index=False, if_exists='append')
#, dtype={timestamp_field: DATETIME2(), created_timestamp_column: DATETIME2()}
self.tables.append(destination_name)
return MsSqlServerSource(
name="ci_mssql_source",
connection_str=connection_string,
table_ref=destination_name,
event_timestamp_column=timestamp_field,
created_timestamp_column=created_timestamp_column,
Comment thread
adchia marked this conversation as resolved.
Outdated
field_mapping=field_mapping,
field_mapping=field_mapping or {"ts_1": "ts"},
)

def create_saved_dataset_destination(self) -> SavedDatasetStorage:
Expand Down