Skip to content

Commit ac71cdf

Browse files
Add schema parameter to RedshiftSource (feast-dev#1847)
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent cdd6659 commit ac71cdf

2 files changed

Lines changed: 86 additions & 30 deletions

File tree

protos/feast/core/DataSource.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ message DataSource {
121121
// SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective
122122
// entity columns
123123
string query = 2;
124+
125+
// Redshift schema name
126+
string schema = 3;
124127
}
125128

126129
// Defines configuration for custom third-party data sources.

sdk/python/feast/infra/offline_stores/redshift_source.py

Lines changed: 83 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,56 @@ def __init__(
1313
self,
1414
event_timestamp_column: Optional[str] = "",
1515
table: Optional[str] = None,
16+
schema: Optional[str] = None,
1617
created_timestamp_column: Optional[str] = "",
1718
field_mapping: Optional[Dict[str, str]] = None,
1819
date_partition_column: Optional[str] = "",
1920
query: Optional[str] = None,
2021
):
22+
"""
23+
Creates a RedshiftSource object.
24+
25+
Args:
26+
event_timestamp_column (optional): Event timestamp column used for point in
27+
time joins of feature values.
28+
table (optional): Redshift table where the features are stored.
29+
schema (optional): Redshift schema in which the table is located.
30+
created_timestamp_column (optional): Timestamp column indicating when the
31+
row was created, used for deduplicating rows.
32+
field_mapping (optional): A dictionary mapping of column names in this data
33+
source to column names in a feature table or view.
34+
date_partition_column (optional): Timestamp column used for partitioning.
35+
query (optional): The query to be executed to obtain the features.
36+
"""
2137
super().__init__(
2238
event_timestamp_column,
2339
created_timestamp_column,
2440
field_mapping,
2541
date_partition_column,
2642
)
2743

28-
self._redshift_options = RedshiftOptions(table=table, query=query)
44+
# The default Redshift schema is named "public".
45+
_schema = "public" if table and not schema else schema
46+
47+
self._redshift_options = RedshiftOptions(
48+
table=table, schema=_schema, query=query
49+
)
2950

3051
@staticmethod
3152
def from_proto(data_source: DataSourceProto):
53+
"""
54+
Creates a RedshiftSource from a protobuf representation of a RedshiftSource.
55+
56+
Args:
57+
data_source: A protobuf representation of a RedshiftSource
58+
59+
Returns:
60+
A RedshiftSource object based on the data_source protobuf.
61+
"""
3262
return RedshiftSource(
3363
field_mapping=dict(data_source.field_mapping),
3464
table=data_source.redshift_options.table,
65+
schema=data_source.redshift_options.schema,
3566
event_timestamp_column=data_source.event_timestamp_column,
3667
created_timestamp_column=data_source.created_timestamp_column,
3768
date_partition_column=data_source.date_partition_column,
@@ -46,6 +77,7 @@ def __eq__(self, other):
4677

4778
return (
4879
self.redshift_options.table == other.redshift_options.table
80+
and self.redshift_options.schema == other.redshift_options.schema
4981
and self.redshift_options.query == other.redshift_options.query
5082
and self.event_timestamp_column == other.event_timestamp_column
5183
and self.created_timestamp_column == other.created_timestamp_column
@@ -54,27 +86,36 @@ def __eq__(self, other):
5486

5587
@property
5688
def table(self):
89+
"""Returns the table of this Redshift source."""
5790
return self._redshift_options.table
5891

92+
@property
93+
def schema(self):
94+
"""Returns the schema of this Redshift source."""
95+
return self._redshift_options.schema
96+
5997
@property
6098
def query(self):
99+
"""Returns the Redshift SQL query of this Redshift source."""
61100
return self._redshift_options.query
62101

63102
@property
64103
def redshift_options(self):
65-
"""
66-
Returns the Redshift options of this data source
67-
"""
104+
"""Returns the Redshift options of this Redshift source."""
68105
return self._redshift_options
69106

70107
@redshift_options.setter
71108
def redshift_options(self, _redshift_options):
72-
"""
73-
Sets the Redshift options of this data source
74-
"""
109+
"""Sets the Redshift options of this Redshift source."""
75110
self._redshift_options = _redshift_options
76111

77112
def to_proto(self) -> DataSourceProto:
113+
"""
114+
Converts a RedshiftSource object to its protobuf representation.
115+
116+
Returns:
117+
A DataSourceProto object.
118+
"""
78119
data_source_proto = DataSourceProto(
79120
type=DataSourceProto.BATCH_REDSHIFT,
80121
field_mapping=self.field_mapping,
@@ -93,9 +134,9 @@ def validate(self, config: RepoConfig):
93134
self.get_table_column_names_and_types(config)
94135

95136
def get_table_query_string(self) -> str:
96-
"""Returns a string that can directly be used to reference this table in SQL"""
137+
"""Returns a string that can directly be used to reference this table in SQL."""
97138
if self.table:
98-
return f'"{self.table}"'
139+
return f'"{self.schema}"."{self.table}"'
99140
else:
100141
return f"({self.query})"
101142

@@ -106,6 +147,12 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
106147
def get_table_column_names_and_types(
107148
self, config: RepoConfig
108149
) -> Iterable[Tuple[str, str]]:
150+
"""
151+
Returns an iterable of (column name, column type) pairs for this Redshift source.
152+
153+
Args:
154+
config: A RepoConfig describing the feature repo
155+
"""
109156
from botocore.exceptions import ClientError
110157

111158
from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
@@ -122,6 +169,7 @@ def get_table_column_names_and_types(
122169
Database=config.offline_store.database,
123170
DbUser=config.offline_store.user,
124171
Table=self.table,
172+
Schema=self.schema,
125173
)
126174
except ClientError as e:
127175
if e.response["Error"]["Code"] == "ValidationException":
@@ -150,55 +198,61 @@ def get_table_column_names_and_types(
150198

151199
class RedshiftOptions:
152200
"""
153-
DataSource Redshift options used to source features from Redshift query
201+
DataSource Redshift options used to source features from a Redshift table or query.
154202
"""
155203

156-
def __init__(self, table: Optional[str], query: Optional[str]):
204+
def __init__(
205+
self, table: Optional[str], schema: Optional[str], query: Optional[str]
206+
):
157207
self._table = table
208+
self._schema = schema
158209
self._query = query
159210

160211
@property
161212
def query(self):
162-
"""
163-
Returns the Redshift SQL query referenced by this source
164-
"""
213+
"""Returns the Redshift SQL query referenced by this source."""
165214
return self._query
166215

167216
@query.setter
168217
def query(self, query):
169-
"""
170-
Sets the Redshift SQL query referenced by this source
171-
"""
218+
"""Sets the Redshift SQL query referenced by this source."""
172219
self._query = query
173220

174221
@property
175222
def table(self):
176-
"""
177-
Returns the table name of this Redshift table
178-
"""
223+
"""Returns the table name of this Redshift table."""
179224
return self._table
180225

181226
@table.setter
182227
def table(self, table_name):
183-
"""
184-
Sets the table ref of this Redshift table
185-
"""
228+
"""Sets the table name of this Redshift table."""
186229
self._table = table_name
187230

231+
@property
232+
def schema(self):
233+
"""Returns the schema name of this Redshift table."""
234+
return self._schema
235+
236+
@schema.setter
237+
def schema(self, schema):
238+
"""Sets the schema of this Redshift table."""
239+
self._schema = schema
240+
188241
@classmethod
189242
def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):
190243
"""
191-
Creates a RedshiftOptions from a protobuf representation of a Redshift option
244+
Creates a RedshiftOptions from a protobuf representation of a Redshift option.
192245
193246
Args:
194247
redshift_options_proto: A protobuf representation of a DataSource's Redshift options
195248
196249
Returns:
197-
Returns a RedshiftOptions object based on the redshift_options protobuf
250+
A RedshiftOptions object based on the redshift_options protobuf.
198251
"""
199-
200252
redshift_options = cls(
201-
table=redshift_options_proto.table, query=redshift_options_proto.query,
253+
table=redshift_options_proto.table,
254+
schema=redshift_options_proto.schema,
255+
query=redshift_options_proto.query,
202256
)
203257

204258
return redshift_options
@@ -208,11 +262,10 @@ def to_proto(self) -> DataSourceProto.RedshiftOptions:
208262
Converts a RedshiftOptions object to its protobuf representation.
209263
210264
Returns:
211-
RedshiftOptionsProto protobuf
265+
A RedshiftOptionsProto protobuf.
212266
"""
213-
214267
redshift_options_proto = DataSourceProto.RedshiftOptions(
215-
table=self.table, query=self.query,
268+
table=self.table, schema=self.schema, query=self.query,
216269
)
217270

218271
return redshift_options_proto

0 commit comments

Comments
 (0)