Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
d372a09
ci: Add bigtable cleanup script
adchia Jul 3, 2023
f6d3caf
fix: Missing Catalog argument in athena connector (#3661)
GyuminJack Jul 3, 2023
d4f9158
ci: Disable flaky lambda materialization test
adchia Jul 3, 2023
4861af0
fix: Broken non-root path with projects-list.json (#3665)
bjfletcher Jul 3, 2023
48e0971
fix: Manage redis pipe's context (#3655)
j1wonpark Jul 4, 2023
315073f
chore: Bump tough-cookie from 4.0.0 to 4.1.3 in /sdk/python/feast/ui …
dependabot[bot] Jul 11, 2023
870762a
chore: Bump tough-cookie from 4.0.0 to 4.1.3 in /ui (#3676)
dependabot[bot] Jul 11, 2023
478caec
fix: For SQL registry, increase max data_source_name length to 255 (#…
radonnachie Jul 13, 2023
1c01035
fix: Optimize bytes processed when retrieving entity df schema to 0 (…
sudohainguyen Jul 13, 2023
ef4ef32
fix: Entityless fv breaks with `KeyError: __dummy` applying feature_s…
wfoschiera Jul 13, 2023
0ad2d62
chore: Bump protobufjs from 7.1.1 to 7.2.4 in /ui (#3674)
dependabot[bot] Jul 17, 2023
e4c0c9b
chore: Bump protobufjs from 7.1.2 to 7.2.4 in /sdk/python/feast/ui (#…
dependabot[bot] Jul 17, 2023
bef5791
chore: Bump semver from 6.3.0 to 6.3.1 in /ui (#3678)
dependabot[bot] Jul 17, 2023
928be7b
chore: Bump semver from 6.3.0 to 6.3.1 in /sdk/python/feast/ui (#3679)
dependabot[bot] Jul 17, 2023
12f57a9
chore: Bump google.golang.org/grpc from 1.47.0 to 1.53.0 (#3670)
dependabot[bot] Jul 17, 2023
9527183
chore(release): release 0.32.0
feast-ci-bot Jul 17, 2023
76270f6
fix: Redshift push ignores schema (#3671)
metavee Jul 24, 2023
c75a01f
fix: Add aws-sts dependency in java sdk so that S3 client acquires IR…
harmeet-singh-discovery Aug 1, 2023
0578b9b
Adding initial update changes
Aug 7, 2023
8487678
Merge branch 'feast-dev:master' into msudhir/add-vector-update-functi…
Manisha4 Aug 7, 2023
5828891
Added formatting changes
Aug 7, 2023
4a29d33
Revert "Merge branch 'feast-dev:master' into msudhir/add-vector-updat…
Aug 7, 2023
e209770
Added more tests and functionality
Aug 8, 2023
ebe1e32
updating tests
Aug 8, 2023
62692e0
updated functionality and added more tests
Aug 9, 2023
0680c94
correcting a test case
Aug 9, 2023
5c5490d
Making formatting corrections and changing log
Aug 9, 2023
cdadb87
Improved tests and added functionality to convert feast schema to mil…
Aug 10, 2023
e1fd230
Added PR Review comments
Aug 11, 2023
d0c4269
Fixed failing test
Aug 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix: Redshift push ignores schema (feast-dev#3671)
* Add fully-qualified-table-name Redshift prop

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* pre-commit

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* Docstring

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* Test fully_qualified_table_name

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* Simplify logic

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* pre-commit

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* pre-commit

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* Test offline_write_batch

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* Bump to trigger CI

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

* another bump for ci

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>

---------

Signed-off-by: Robin Neufeld <metavee@users.noreply.github.com>
  • Loading branch information
metavee authored Jul 24, 2023
commit 76270f66b3d98b0119b70927c06908f9834b6120
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/offline_stores/redshift.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ def offline_write_batch(
s3_resource=s3_resource,
s3_path=f"{config.offline_store.s3_staging_location}/push/{uuid.uuid4()}.parquet",
iam_role=config.offline_store.iam_role,
table_name=redshift_options.table,
table_name=redshift_options.fully_qualified_table_name,
schema=pa_schema,
fail_if_exists=False,
)
Expand Down
37 changes: 36 additions & 1 deletion sdk/python/feast/infra/offline_stores/redshift_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,42 @@ def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):

return redshift_options

@property
def fully_qualified_table_name(self) -> str:
    """
    The fully qualified table name of this Redshift table.

    ``self.table`` may already carry its own qualifiers (``schema.table`` or
    ``database.schema.table``). Qualifiers embedded in ``self.table`` take
    precedence over ``self.schema`` and ``self.database``.

    Returns:
        The most specific name that can be built:
        ``<database>.<schema>.<table>`` when both a database and a schema are
        known, ``<schema>.<table>`` when only a schema is known, and just
        ``<table>`` otherwise. An empty string is returned if the table is
        not set.

    Raises:
        ValueError: If ``self.table`` has more than three dot-separated
            parts, or if any part is empty (e.g. ``"a..c"`` or ``"schema."``).
    """
    if not self.table:
        return ""

    parts = self.table.split(".")
    # Reject over-qualified names and names with empty components up front;
    # otherwise a qualifier would be dropped silently or a malformed name
    # would be handed to Redshift.
    if len(parts) > 3 or not all(parts):
        raise ValueError(
            f"Invalid table name: {self.table} - can't determine database and schema"
        )

    # Read the parts right-to-left as table, schema, database. Whatever the
    # table string does not provide falls back to the configured option.
    table = parts[-1]
    schema = parts[-2] if len(parts) >= 2 else self.schema
    database = parts[-3] if len(parts) == 3 else self.database

    if database and schema:
        return f"{database}.{schema}.{table}"
    if schema:
        return f"{schema}.{table}"
    # Without a schema the database cannot be prepended, so return the bare table.
    return table

def to_proto(self) -> DataSourceProto.RedshiftOptions:
"""
Converts an RedshiftOptionsProto object to its protobuf representation.
Expand Down Expand Up @@ -323,7 +359,6 @@ def __init__(self, table_ref: str):

@staticmethod
def from_proto(storage_proto: SavedDatasetStorageProto) -> SavedDatasetStorage:
    """
    Build a SavedDatasetRedshiftStorage from its protobuf representation.

    Args:
        storage_proto: Saved dataset storage proto whose ``redshift_storage``
            field holds the Redshift options.

    Returns:
        A SavedDatasetRedshiftStorage whose ``table_ref`` is the table named
        in those Redshift options.
    """
    redshift_options = RedshiftOptions.from_proto(storage_proto.redshift_storage)
    return SavedDatasetRedshiftStorage(table_ref=redshift_options.table)
Expand Down
67 changes: 67 additions & 0 deletions sdk/python/tests/unit/infra/offline_stores/test_redshift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from unittest.mock import MagicMock, patch

import pandas as pd
import pyarrow as pa

from feast import FeatureView
from feast.infra.offline_stores import offline_utils
from feast.infra.offline_stores.redshift import (
RedshiftOfflineStore,
RedshiftOfflineStoreConfig,
)
from feast.infra.offline_stores.redshift_source import RedshiftSource
from feast.infra.utils import aws_utils
from feast.repo_config import RepoConfig


@patch.object(aws_utils, "upload_arrow_table_to_redshift")
def test_offline_write_batch(
    mock_upload_arrow_table_to_redshift: MagicMock,
    simple_dataset_1: pd.DataFrame,
):
    """offline_write_batch must upload to the schema-qualified table name."""
    offline_store_config = RedshiftOfflineStoreConfig(
        type="redshift",
        region="us-west-2",
        cluster_id="cluster_id",
        database="database",
        user="user",
        iam_role="abcdef",
        s3_staging_location="s3://bucket/path",
    )
    repo_config = RepoConfig(
        registry="registry",
        project="project",
        provider="local",
        offline_store=offline_store_config,
    )

    # The source names both a table and a schema; the upload must keep the schema.
    feature_view = FeatureView(
        name="test_view",
        source=RedshiftSource(
            name="test_source",
            timestamp_field="ts",
            table="table_name",
            schema="schema_name",
        ),
    )

    pa_dataset = pa.Table.from_pandas(simple_dataset_1)

    # Stub the schema lookup so the function can run; the stub ignores its
    # arguments and reports the dataset's own schema and column names.
    with patch.object(
        offline_utils,
        "get_pyarrow_schema_from_batch_source",
        new=lambda *args, **kwargs: (pa_dataset.schema, pa_dataset.column_names),
    ):
        RedshiftOfflineStore.offline_write_batch(
            repo_config, feature_view, pa_dataset, progress=None
        )

    # Exactly one upload, addressed to the fully qualified table name.
    mock_upload_arrow_table_to_redshift.assert_called_once()
    _, upload_kwargs = mock_upload_arrow_table_to_redshift.call_args
    assert upload_kwargs["table_name"] == "schema_name.table_name"
43 changes: 43 additions & 0 deletions sdk/python/tests/unit/test_data_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,46 @@ def test_column_conflict():
timestamp_field="event_timestamp",
created_timestamp_column="event_timestamp",
)


@pytest.mark.parametrize(
    ("source_kwargs", "expected_name"),
    [
        # database, schema and table all given explicitly
        (
            {
                "database": "test_database",
                "schema": "test_schema",
                "table": "test_table",
            },
            "test_database.test_schema.test_table",
        ),
        # no schema given: the expected name carries a "public" schema
        (
            {"database": "test_database", "table": "test_table"},
            "test_database.public.test_table",
        ),
        ({"table": "test_table"}, "public.test_table"),
        # table string already carries a schema
        ({"database": "test_database", "table": "b.c"}, "test_database.b.c"),
        # table string is already fully qualified and wins over `database`
        ({"database": "test_database", "table": "a.b.c"}, "a.b.c"),
        # query-based source has no table, so the name is empty
        (
            {
                "database": "test_database",
                "schema": "test_schema",
                "query": "select * from abc",
            },
            "",
        ),
    ],
)
def test_redshift_fully_qualified_table_name(source_kwargs, expected_name):
    """Check the fully qualified name for each table/schema/database combination."""
    common_kwargs = {
        "name": "test_source",
        "timestamp_field": "event_timestamp",
        "created_timestamp_column": "created_timestamp",
        "field_mapping": {"foo": "bar"},
        "description": "test description",
        "tags": {"test": "test"},
        "owner": "test@gmail.com",
    }
    redshift_source = RedshiftSource(**common_kwargs, **source_kwargs)

    options = redshift_source.redshift_options
    assert options.fully_qualified_table_name == expected_name