Skip to content

Commit f9ab779

Browse files
pradithya ariafeast-ci-bot
authored andcommitted
Implement filter for create dataset api (feast-dev#215)
* Add conditional filter to dataset creation endpoint * Replace table naming to use uuid suffix * Enable to filter based on job_id * Add filters to create_dataset api
1 parent 8a6b2fc commit f9ab779

18 files changed

Lines changed: 504 additions & 137 deletions

core/src/main/java/feast/core/config/TrainingConfig.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package feast.core.config;
22

3-
import com.google.cloud.bigquery.BigQuery;
4-
import com.google.cloud.bigquery.BigQueryOptions;
53
import com.google.common.base.Charsets;
64
import com.google.common.io.CharStreams;
75
import com.hubspot.jinjava.Jinjava;
86
import feast.core.config.StorageConfig.StorageSpecs;
97
import feast.core.dao.FeatureInfoRepository;
108
import feast.core.training.BigQueryDatasetTemplater;
119
import feast.core.training.BigQueryTraningDatasetCreator;
10+
import feast.core.util.RandomUuidProvider;
1211
import java.io.IOException;
1312
import java.io.InputStream;
1413
import java.io.InputStreamReader;
@@ -18,9 +17,7 @@
1817
import org.springframework.core.io.ClassPathResource;
1918
import org.springframework.core.io.Resource;
2019

21-
/**
22-
* Configuration related to training API
23-
*/
20+
/** Configuration related to training API */
2421
@Configuration
2522
public class TrainingConfig {
2623

@@ -37,10 +34,9 @@ public BigQueryDatasetTemplater getBigQueryTrainingDatasetTemplater(
3734
@Bean
3835
public BigQueryTraningDatasetCreator getBigQueryTrainingDatasetCreator(
3936
BigQueryDatasetTemplater templater,
40-
StorageSpecs storageSpecs,
4137
@Value("${feast.core.projectId}") String projectId,
4238
@Value("${feast.core.datasetPrefix}") String datasetPrefix) {
43-
BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();
44-
return new BigQueryTraningDatasetCreator(templater, projectId, datasetPrefix);
39+
return new BigQueryTraningDatasetCreator(
40+
templater, projectId, datasetPrefix, new RandomUuidProvider());
4541
}
4642
}

core/src/main/java/feast/core/grpc/DatasetServiceImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ public void createDataset(
6666
request.getStartDate(),
6767
request.getEndDate(),
6868
request.getLimit(),
69-
request.getNamePrefix());
69+
request.getNamePrefix(),
70+
request.getFiltersMap());
7071
CreateDatasetResponse response =
7172
CreateDatasetResponse.newBuilder().setDatasetInfo(datasetInfo).build();
7273

core/src/main/java/feast/core/training/BigQueryDatasetTemplater.java

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,18 @@
2323
import feast.core.model.FeatureInfo;
2424
import feast.core.storage.BigQueryStorageManager;
2525
import feast.specs.StorageSpecProto.StorageSpec;
26+
import feast.types.ValueProto.ValueType.Enum;
2627
import java.time.Instant;
2728
import java.time.ZoneId;
2829
import java.time.format.DateTimeFormatter;
2930
import java.time.temporal.ChronoUnit;
31+
import java.util.ArrayList;
3032
import java.util.HashMap;
3133
import java.util.List;
3234
import java.util.Map;
3335
import java.util.NoSuchElementException;
3436
import java.util.Set;
3537
import java.util.stream.Collectors;
36-
import lombok.Getter;
37-
3838

3939
public class BigQueryDatasetTemplater {
4040

@@ -45,7 +45,9 @@ public class BigQueryDatasetTemplater {
4545
private final DateTimeFormatter formatter;
4646

4747
public BigQueryDatasetTemplater(
48-
Jinjava jinjava, String templateString, StorageSpec storageSpec,
48+
Jinjava jinjava,
49+
String templateString,
50+
StorageSpec storageSpec,
4951
FeatureInfoRepository featureInfoRepository) {
5052
this.storageSpec = storageSpec;
5153
this.featureInfoRepository = featureInfoRepository;
@@ -65,35 +67,84 @@ protected StorageSpec getStorageSpec() {
6567
* @param startDate start date
6668
* @param endDate end date
6769
* @param limit limit
70+
* @param filters additional WHERE clause
6871
* @return SQL query for creating training table.
6972
*/
70-
String createQuery(FeatureSet featureSet, Timestamp startDate, Timestamp endDate, long limit) {
73+
String createQuery(
74+
FeatureSet featureSet,
75+
Timestamp startDate,
76+
Timestamp endDate,
77+
long limit,
78+
Map<String, String> filters) {
7179
List<String> featureIds = featureSet.getFeatureIdsList();
72-
List<FeatureInfo> featureInfos = featureInfoRepository.findAllById(featureIds);
73-
String tableId = featureInfos.size() > 0 ? getBqTableId(featureInfos.get(0)) : "";
74-
Features features = new Features(featureInfos, tableId);
80+
List<FeatureInfo> featureInfos = getFeatureInfosOrThrow(featureIds);
81+
82+
// split filter based on ValueType of the feature
83+
Map<String, String> tmpFilter = new HashMap<>(filters);
84+
Map<String, String> numberFilters = new HashMap<>();
85+
Map<String, String> stringFilters = new HashMap<>();
86+
if (filters.containsKey("job_id")) {
87+
stringFilters.put("job_id", tmpFilter.get("job_id"));
88+
tmpFilter.remove("job_id");
89+
}
90+
91+
List<FeatureInfo> featureFilterInfos = getFeatureInfosOrThrow(new ArrayList<>(tmpFilter.keySet()));
92+
Map<String, FeatureInfo> featureInfoMap = new HashMap<>();
93+
for (FeatureInfo featureInfo: featureFilterInfos) {
94+
featureInfoMap.put(featureInfo.getId(), featureInfo);
95+
}
96+
97+
98+
for (Map.Entry<String, String> filter : tmpFilter.entrySet()) {
99+
FeatureInfo featureInfo = featureInfoMap.get(filter.getKey());
100+
if (isMappableToString(featureInfo.getValueType())) {
101+
stringFilters.put(featureInfo.getName(), filter.getValue());
102+
} else {
103+
numberFilters.put(featureInfo.getName(), filter.getValue());
104+
}
105+
}
75106

107+
List<String> featureNames = getFeatureNames(featureInfos);
108+
String tableId = getBqTableId(featureInfos.get(0));
109+
String startDateStr = formatDateString(startDate);
110+
String endDateStr = formatDateString(endDate);
111+
String limitStr = (limit != 0) ? String.valueOf(limit) : null;
112+
return renderTemplate(tableId, featureNames, startDateStr, endDateStr, limitStr,
113+
numberFilters, stringFilters);
114+
}
115+
116+
private boolean isMappableToString(Enum valueType) {
117+
return valueType.equals(Enum.STRING);
118+
}
119+
120+
private List<String> getFeatureNames(List<FeatureInfo> featureInfos) {
121+
return featureInfos.stream().map(FeatureInfo::getName).collect(Collectors.toList());
122+
}
123+
124+
private List<FeatureInfo> getFeatureInfosOrThrow(List<String> featureIds) {
125+
List<FeatureInfo> featureInfos = featureInfoRepository.findAllById(featureIds);
76126
if (featureInfos.size() < featureIds.size()) {
77127
Set<String> foundFeatureIds =
78128
featureInfos.stream().map(FeatureInfo::getId).collect(Collectors.toSet());
79129
featureIds.removeAll(foundFeatureIds);
80130
throw new NoSuchElementException("features not found: " + featureIds);
81131
}
82-
83-
String startDateStr = formatDateString(startDate);
84-
String endDateStr = formatDateString(endDate);
85-
String limitStr = (limit != 0) ? String.valueOf(limit) : null;
86-
return renderTemplate(features, startDateStr, endDateStr, limitStr);
132+
return featureInfos;
87133
}
88134

89135
private String renderTemplate(
90-
Features features, String startDateStr, String endDateStr, String limitStr) {
136+
String tableId, List<String> features, String startDateStr, String endDateStr, String limitStr,
137+
Map<String, String> numberFilters,
138+
Map<String, String> stringFilters) {
91139
Map<String, Object> context = new HashMap<>();
92140

93-
context.put("feature_set", features);
141+
context.put("table_id", tableId);
142+
context.put("features", features);
94143
context.put("start_date", startDateStr);
95144
context.put("end_date", endDateStr);
96145
context.put("limit", limitStr);
146+
context.put("number_filters", numberFilters);
147+
context.put("string_filters", stringFilters);
97148
return jinjava.render(template, context);
98149
}
99150

@@ -117,16 +168,4 @@ private String formatDateString(Timestamp timestamp) {
117168
Instant instant = Instant.ofEpochSecond(timestamp.getSeconds()).truncatedTo(ChronoUnit.DAYS);
118169
return formatter.format(instant);
119170
}
120-
121-
@Getter
122-
static final class Features {
123-
124-
final List<String> columns;
125-
final String tableId;
126-
127-
Features(List<FeatureInfo> featureInfos, String tableId) {
128-
columns = featureInfos.stream().map(FeatureInfo::getName).collect(Collectors.toList());
129-
this.tableId = tableId;
130-
}
131-
}
132171
}

core/src/main/java/feast/core/training/BigQueryTraningDatasetCreator.java

Lines changed: 21 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,10 @@
3030
import feast.core.DatasetServiceProto.DatasetInfo;
3131
import feast.core.DatasetServiceProto.FeatureSet;
3232
import feast.core.exception.TrainingDatasetCreationException;
33-
import java.math.BigInteger;
34-
import java.security.MessageDigest;
35-
import java.security.NoSuchAlgorithmException;
33+
import feast.core.util.UuidProvider;
3634
import java.time.Instant;
3735
import java.time.ZoneId;
3836
import java.time.format.DateTimeFormatter;
39-
import java.util.ArrayList;
40-
import java.util.Collections;
41-
import java.util.List;
4237
import java.util.Map;
4338
import lombok.extern.slf4j.Slf4j;
4439

@@ -49,26 +44,34 @@ public class BigQueryTraningDatasetCreator {
4944
private final DateTimeFormatter formatter;
5045
private final String projectId;
5146
private final String datasetPrefix;
47+
private final UuidProvider uuidProvider;
5248
private transient BigQuery bigQuery;
5349

5450
public BigQueryTraningDatasetCreator(
5551
BigQueryDatasetTemplater templater,
5652
String projectId,
57-
String datasetPrefix) {
58-
this(templater, projectId, datasetPrefix,
53+
String datasetPrefix,
54+
UuidProvider uuidProvider) {
55+
this(
56+
templater,
57+
projectId,
58+
datasetPrefix,
59+
uuidProvider,
5960
BigQueryOptions.newBuilder().setProjectId(projectId).build().getService());
6061
}
6162

6263
public BigQueryTraningDatasetCreator(
6364
BigQueryDatasetTemplater templater,
6465
String projectId,
6566
String datasetPrefix,
67+
UuidProvider uuidProvider,
6668
BigQuery bigQuery) {
6769
this.templater = templater;
6870
this.formatter = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("UTC"));
6971
this.projectId = projectId;
7072
this.datasetPrefix = datasetPrefix;
7173
this.bigQuery = bigQuery;
74+
this.uuidProvider = uuidProvider;
7275
}
7376

7477
/**
@@ -80,18 +83,19 @@ public BigQueryTraningDatasetCreator(
8083
* @param endDate end date of the training dataset (inclusive)
8184
* @param limit maximum number of row should be created.
8285
* @param namePrefix prefix for dataset name
86+
* @param filters additional where clause
8387
* @return dataset info associated with the created training dataset
8488
*/
8589
public DatasetInfo createDataset(
8690
FeatureSet featureSet,
8791
Timestamp startDate,
8892
Timestamp endDate,
8993
long limit,
90-
String namePrefix) {
94+
String namePrefix,
95+
Map<String, String> filters) {
9196
try {
92-
String query = templater.createQuery(featureSet, startDate, endDate, limit);
93-
String tableName =
94-
createBqTableName(datasetPrefix, featureSet, startDate, endDate, namePrefix);
97+
String query = templater.createQuery(featureSet, startDate, endDate, limit, filters);
98+
String tableName = createBqTableName(datasetPrefix, featureSet, namePrefix);
9599
String tableDescription = createBqTableDescription(featureSet, startDate, endDate, query);
96100

97101
Map<String, String> options = templater.getStorageSpec().getOptionsMap();
@@ -124,47 +128,22 @@ public DatasetInfo createDataset(
124128
throw new TrainingDatasetCreationException("Failed creating training dataset", e);
125129
} catch (InterruptedException e) {
126130
log.error("Training dataset creation was interrupted", e);
127-
throw new TrainingDatasetCreationException("Training dataset creation was interrupted",
128-
e);
131+
throw new TrainingDatasetCreationException("Training dataset creation was interrupted", e);
129132
}
130133
}
131134

132-
private String createBqTableName(
133-
String datasetPrefix,
134-
FeatureSet featureSet,
135-
Timestamp startDate,
136-
Timestamp endDate,
137-
String namePrefix) {
138-
139-
List<String> features = new ArrayList(featureSet.getFeatureIdsList());
140-
Collections.sort(features);
135+
private String createBqTableName(String datasetPrefix, FeatureSet featureSet, String namePrefix) {
141136

142-
String datasetId = String.format("%s_%s_%s", features, startDate, endDate);
143-
StringBuilder hashText;
144-
145-
// create hash from datasetId
146-
try {
147-
MessageDigest md = MessageDigest.getInstance("SHA-1");
148-
byte[] messageDigest = md.digest(datasetId.getBytes());
149-
BigInteger no = new BigInteger(1, messageDigest);
150-
hashText = new StringBuilder(no.toString(16));
151-
while (hashText.length() < 32) {
152-
hashText.insert(0, "0");
153-
}
154-
} catch (NoSuchAlgorithmException e) {
155-
throw new RuntimeException(e);
156-
}
137+
String suffix = uuidProvider.getUuid();
157138

158139
if (!Strings.isNullOrEmpty(namePrefix)) {
159140
// only alphanumeric and underscore are allowed
160141
namePrefix = namePrefix.replaceAll("[^a-zA-Z0-9_]", "_");
161142
return String.format(
162-
"%s_%s_%s_%s", datasetPrefix, featureSet.getEntityName(), namePrefix,
163-
hashText.toString());
143+
"%s_%s_%s_%s", datasetPrefix, featureSet.getEntityName(), namePrefix, suffix);
164144
}
165145

166-
return String.format(
167-
"%s_%s_%s", datasetPrefix, featureSet.getEntityName(), hashText.toString());
146+
return String.format("%s_%s_%s", datasetPrefix, featureSet.getEntityName(), suffix);
168147
}
169148

170149
private String createBqTableDescription(
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package feast.core.util;
2+
3+
import java.util.UUID;
4+
5+
public class RandomUuidProvider implements UuidProvider {
6+
@Override
7+
public String getUuid() {
8+
return UUID.randomUUID().toString().replace("-","");
9+
}
10+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package feast.core.util;
2+
3+
public interface UuidProvider {
4+
String getUuid();
5+
}
Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
SELECT
22
id,
3-
event_timestamp{%- if feature_set.columns | length > 0 %},{%- endif %}
4-
{{ feature_set.columns | join(',') }}
3+
event_timestamp{%- if features | length > 0 %},{%- endif %}
4+
{{ features | join(',') }}
55
FROM
6-
`{{ feature_set.tableId }}`
6+
`{{ table_id }}`
77
WHERE event_timestamp >= TIMESTAMP("{{ start_date }}") AND event_timestamp <= TIMESTAMP(DATETIME_ADD("{{ end_date }}", INTERVAL 1 DAY))
8+
{%- for key, val in number_filters.items() %} AND {{ key }} = {{ val }} {%- endfor %}
9+
{%- for key, val in string_filters.items() %} AND {{ key }} = "{{ val }}" {%- endfor %}
810
{% if limit is not none -%}
911
LIMIT {{ limit }}
1012
{%- endif %}

0 commit comments

Comments
 (0)