2323import feast .core .model .FeatureInfo ;
2424import feast .core .storage .BigQueryStorageManager ;
2525import feast .specs .StorageSpecProto .StorageSpec ;
26+ import feast .types .ValueProto .ValueType .Enum ;
2627import java .time .Instant ;
2728import java .time .ZoneId ;
2829import java .time .format .DateTimeFormatter ;
2930import java .time .temporal .ChronoUnit ;
31+ import java .util .ArrayList ;
3032import java .util .HashMap ;
3133import java .util .List ;
3234import java .util .Map ;
3335import java .util .NoSuchElementException ;
3436import java .util .Set ;
3537import java .util .stream .Collectors ;
36- import lombok .Getter ;
37-
3838
3939public class BigQueryDatasetTemplater {
4040
@@ -45,7 +45,9 @@ public class BigQueryDatasetTemplater {
4545 private final DateTimeFormatter formatter ;
4646
4747 public BigQueryDatasetTemplater (
48- Jinjava jinjava , String templateString , StorageSpec storageSpec ,
48+ Jinjava jinjava ,
49+ String templateString ,
50+ StorageSpec storageSpec ,
4951 FeatureInfoRepository featureInfoRepository ) {
5052 this .storageSpec = storageSpec ;
5153 this .featureInfoRepository = featureInfoRepository ;
@@ -65,35 +67,84 @@ protected StorageSpec getStorageSpec() {
6567 * @param startDate start date
6668 * @param endDate end date
6769 * @param limit limit
70+ * @param filters additional WHERE clause
6871 * @return SQL query for creating training table.
6972 */
70- String createQuery (FeatureSet featureSet , Timestamp startDate , Timestamp endDate , long limit ) {
73+ String createQuery (
74+ FeatureSet featureSet ,
75+ Timestamp startDate ,
76+ Timestamp endDate ,
77+ long limit ,
78+ Map <String , String > filters ) {
7179 List <String > featureIds = featureSet .getFeatureIdsList ();
72- List <FeatureInfo > featureInfos = featureInfoRepository .findAllById (featureIds );
73- String tableId = featureInfos .size () > 0 ? getBqTableId (featureInfos .get (0 )) : "" ;
74- Features features = new Features (featureInfos , tableId );
80+ List <FeatureInfo > featureInfos = getFeatureInfosOrThrow (featureIds );
81+
82+ // split filter based on ValueType of the feature
83+ Map <String , String > tmpFilter = new HashMap <>(filters );
84+ Map <String , String > numberFilters = new HashMap <>();
85+ Map <String , String > stringFilters = new HashMap <>();
86+ if (filters .containsKey ("job_id" )) {
87+ stringFilters .put ("job_id" , tmpFilter .get ("job_id" ));
88+ tmpFilter .remove ("job_id" );
89+ }
90+
91+ List <FeatureInfo > featureFilterInfos = getFeatureInfosOrThrow (new ArrayList <>(tmpFilter .keySet ()));
92+ Map <String , FeatureInfo > featureInfoMap = new HashMap <>();
93+ for (FeatureInfo featureInfo : featureFilterInfos ) {
94+ featureInfoMap .put (featureInfo .getId (), featureInfo );
95+ }
96+
97+
98+ for (Map .Entry <String , String > filter : tmpFilter .entrySet ()) {
99+ FeatureInfo featureInfo = featureInfoMap .get (filter .getKey ());
100+ if (isMappableToString (featureInfo .getValueType ())) {
101+ stringFilters .put (featureInfo .getName (), filter .getValue ());
102+ } else {
103+ numberFilters .put (featureInfo .getName (), filter .getValue ());
104+ }
105+ }
75106
107+ List <String > featureNames = getFeatureNames (featureInfos );
108+ String tableId = getBqTableId (featureInfos .get (0 ));
109+ String startDateStr = formatDateString (startDate );
110+ String endDateStr = formatDateString (endDate );
111+ String limitStr = (limit != 0 ) ? String .valueOf (limit ) : null ;
112+ return renderTemplate (tableId , featureNames , startDateStr , endDateStr , limitStr ,
113+ numberFilters , stringFilters );
114+ }
115+
116+ private boolean isMappableToString (Enum valueType ) {
117+ return valueType .equals (Enum .STRING );
118+ }
119+
120+ private List <String > getFeatureNames (List <FeatureInfo > featureInfos ) {
121+ return featureInfos .stream ().map (FeatureInfo ::getName ).collect (Collectors .toList ());
122+ }
123+
124+ private List <FeatureInfo > getFeatureInfosOrThrow (List <String > featureIds ) {
125+ List <FeatureInfo > featureInfos = featureInfoRepository .findAllById (featureIds );
76126 if (featureInfos .size () < featureIds .size ()) {
77127 Set <String > foundFeatureIds =
78128 featureInfos .stream ().map (FeatureInfo ::getId ).collect (Collectors .toSet ());
79129 featureIds .removeAll (foundFeatureIds );
80130 throw new NoSuchElementException ("features not found: " + featureIds );
81131 }
82-
83- String startDateStr = formatDateString (startDate );
84- String endDateStr = formatDateString (endDate );
85- String limitStr = (limit != 0 ) ? String .valueOf (limit ) : null ;
86- return renderTemplate (features , startDateStr , endDateStr , limitStr );
132+ return featureInfos ;
87133 }
88134
89135 private String renderTemplate (
90- Features features , String startDateStr , String endDateStr , String limitStr ) {
136+ String tableId , List <String > features , String startDateStr , String endDateStr , String limitStr ,
137+ Map <String , String > numberFilters ,
138+ Map <String , String > stringFilters ) {
91139 Map <String , Object > context = new HashMap <>();
92140
93- context .put ("feature_set" , features );
141+ context .put ("table_id" , tableId );
142+ context .put ("features" , features );
94143 context .put ("start_date" , startDateStr );
95144 context .put ("end_date" , endDateStr );
96145 context .put ("limit" , limitStr );
146+ context .put ("number_filters" , numberFilters );
147+ context .put ("string_filters" , stringFilters );
97148 return jinjava .render (template , context );
98149 }
99150
@@ -117,16 +168,4 @@ private String formatDateString(Timestamp timestamp) {
117168 Instant instant = Instant .ofEpochSecond (timestamp .getSeconds ()).truncatedTo (ChronoUnit .DAYS );
118169 return formatter .format (instant );
119170 }
120-
121- @ Getter
122- static final class Features {
123-
124- final List <String > columns ;
125- final String tableId ;
126-
127- Features (List <FeatureInfo > featureInfos , String tableId ) {
128- columns = featureInfos .stream ().map (FeatureInfo ::getName ).collect (Collectors .toList ());
129- this .tableId = tableId ;
130- }
131- }
132171}
0 commit comments