@@ -13,25 +13,56 @@ def __init__(
1313 self ,
1414 event_timestamp_column : Optional [str ] = "" ,
1515 table : Optional [str ] = None ,
16+ schema : Optional [str ] = None ,
1617 created_timestamp_column : Optional [str ] = "" ,
1718 field_mapping : Optional [Dict [str , str ]] = None ,
1819 date_partition_column : Optional [str ] = "" ,
1920 query : Optional [str ] = None ,
2021 ):
22+ """
23+ Creates a RedshiftSource object.
24+
25+ Args:
26+ event_timestamp_column (optional): Event timestamp column used for point in
27+ time joins of feature values.
28+ table (optional): Redshift table where the features are stored.
29+ schema (optional): Redshift schema in which the table is located.
30+ created_timestamp_column (optional): Timestamp column indicating when the
31+ row was created, used for deduplicating rows.
32+ field_mapping (optional): A dictionary mapping of column names in this data
33+ source to column names in a feature table or view.
34+ date_partition_column (optional): Timestamp column used for partitioning.
35+ query (optional): The query to be executed to obtain the features.
36+ """
2137 super ().__init__ (
2238 event_timestamp_column ,
2339 created_timestamp_column ,
2440 field_mapping ,
2541 date_partition_column ,
2642 )
2743
28- self ._redshift_options = RedshiftOptions (table = table , query = query )
44+ # The default Redshift schema is named "public".
45+ _schema = "public" if table and not schema else schema
46+
47+ self ._redshift_options = RedshiftOptions (
48+ table = table , schema = _schema , query = query
49+ )
2950
3051 @staticmethod
3152 def from_proto (data_source : DataSourceProto ):
53+ """
54+ Creates a RedshiftSource from a protobuf representation of a RedshiftSource.
55+
56+ Args:
57+ data_source: A protobuf representation of a RedshiftSource
58+
59+ Returns:
60+ A RedshiftSource object based on the data_source protobuf.
61+ """
3262 return RedshiftSource (
3363 field_mapping = dict (data_source .field_mapping ),
3464 table = data_source .redshift_options .table ,
65+ schema = data_source .redshift_options .schema ,
3566 event_timestamp_column = data_source .event_timestamp_column ,
3667 created_timestamp_column = data_source .created_timestamp_column ,
3768 date_partition_column = data_source .date_partition_column ,
@@ -46,6 +77,7 @@ def __eq__(self, other):
4677
4778 return (
4879 self .redshift_options .table == other .redshift_options .table
80+ and self .redshift_options .schema == other .redshift_options .schema
4981 and self .redshift_options .query == other .redshift_options .query
5082 and self .event_timestamp_column == other .event_timestamp_column
5183 and self .created_timestamp_column == other .created_timestamp_column
@@ -54,27 +86,36 @@ def __eq__(self, other):
5486
5587 @property
5688 def table (self ):
89+ """Returns the table of this Redshift source."""
5790 return self ._redshift_options .table
5891
92+ @property
93+ def schema (self ):
94+ """Returns the schema of this Redshift source."""
95+ return self ._redshift_options .schema
96+
5997 @property
6098 def query (self ):
99+ """Returns the Redshift options of this Redshift source."""
61100 return self ._redshift_options .query
62101
63102 @property
64103 def redshift_options (self ):
65- """
66- Returns the Redshift options of this data source
67- """
104+ """Returns the Redshift options of this Redshift source."""
68105 return self ._redshift_options
69106
70107 @redshift_options .setter
71108 def redshift_options (self , _redshift_options ):
72- """
73- Sets the Redshift options of this data source
74- """
109+ """Sets the Redshift options of this Redshift source."""
75110 self ._redshift_options = _redshift_options
76111
77112 def to_proto (self ) -> DataSourceProto :
113+ """
114+ Converts a RedshiftSource object to its protobuf representation.
115+
116+ Returns:
117+ A DataSourceProto object.
118+ """
78119 data_source_proto = DataSourceProto (
79120 type = DataSourceProto .BATCH_REDSHIFT ,
80121 field_mapping = self .field_mapping ,
@@ -93,9 +134,9 @@ def validate(self, config: RepoConfig):
93134 self .get_table_column_names_and_types (config )
94135
95136 def get_table_query_string (self ) -> str :
96- """Returns a string that can directly be used to reference this table in SQL"""
137+ """Returns a string that can directly be used to reference this table in SQL. """
97138 if self .table :
98- return f'"{ self .table } "'
139+ return f'"{ self .schema } "." { self . table } "'
99140 else :
100141 return f"({ self .query } )"
101142
@@ -106,6 +147,12 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
106147 def get_table_column_names_and_types (
107148 self , config : RepoConfig
108149 ) -> Iterable [Tuple [str , str ]]:
150+ """
151+ Returns a mapping of column names to types for this Redshift source.
152+
153+ Args:
154+ config: A RepoConfig describing the feature repo
155+ """
109156 from botocore .exceptions import ClientError
110157
111158 from feast .infra .offline_stores .redshift import RedshiftOfflineStoreConfig
@@ -122,6 +169,7 @@ def get_table_column_names_and_types(
122169 Database = config .offline_store .database ,
123170 DbUser = config .offline_store .user ,
124171 Table = self .table ,
172+ Schema = self .schema ,
125173 )
126174 except ClientError as e :
127175 if e .response ["Error" ]["Code" ] == "ValidationException" :
@@ -150,55 +198,61 @@ def get_table_column_names_and_types(
150198
151199class RedshiftOptions :
152200 """
153- DataSource Redshift options used to source features from Redshift query
201+ DataSource Redshift options used to source features from Redshift query.
154202 """
155203
156- def __init__ (self , table : Optional [str ], query : Optional [str ]):
204+ def __init__ (
205+ self , table : Optional [str ], schema : Optional [str ], query : Optional [str ]
206+ ):
157207 self ._table = table
208+ self ._schema = schema
158209 self ._query = query
159210
160211 @property
161212 def query (self ):
162- """
163- Returns the Redshift SQL query referenced by this source
164- """
213+ """Returns the Redshift SQL query referenced by this source."""
165214 return self ._query
166215
167216 @query .setter
168217 def query (self , query ):
169- """
170- Sets the Redshift SQL query referenced by this source
171- """
218+ """Sets the Redshift SQL query referenced by this source."""
172219 self ._query = query
173220
174221 @property
175222 def table (self ):
176- """
177- Returns the table name of this Redshift table
178- """
223+ """Returns the table name of this Redshift table."""
179224 return self ._table
180225
181226 @table .setter
182227 def table (self , table_name ):
183- """
184- Sets the table ref of this Redshift table
185- """
228+ """Sets the table ref of this Redshift table."""
186229 self ._table = table_name
187230
231+ @property
232+ def schema (self ):
233+ """Returns the schema name of this Redshift table."""
234+ return self ._schema
235+
236+ @schema .setter
237+ def schema (self , schema ):
238+ """Sets the schema of this Redshift table."""
239+ self ._schema = schema
240+
188241 @classmethod
189242 def from_proto (cls , redshift_options_proto : DataSourceProto .RedshiftOptions ):
190243 """
191- Creates a RedshiftOptions from a protobuf representation of a Redshift option
244+ Creates a RedshiftOptions from a protobuf representation of a Redshift option.
192245
193246 Args:
194247 redshift_options_proto: A protobuf representation of a DataSource
195248
196249 Returns:
197- Returns a RedshiftOptions object based on the redshift_options protobuf
250+ A RedshiftOptions object based on the redshift_options protobuf.
198251 """
199-
200252 redshift_options = cls (
201- table = redshift_options_proto .table , query = redshift_options_proto .query ,
253+ table = redshift_options_proto .table ,
254+ schema = redshift_options_proto .schema ,
255+ query = redshift_options_proto .query ,
202256 )
203257
204258 return redshift_options
@@ -208,11 +262,10 @@ def to_proto(self) -> DataSourceProto.RedshiftOptions:
208262 Converts an RedshiftOptionsProto object to its protobuf representation.
209263
210264 Returns:
211- RedshiftOptionsProto protobuf
265+ A RedshiftOptionsProto protobuf.
212266 """
213-
214267 redshift_options_proto = DataSourceProto .RedshiftOptions (
215- table = self .table , query = self .query ,
268+ table = self .table , schema = self . schema , query = self .query ,
216269 )
217270
218271 return redshift_options_proto
0 commit comments