Skip to content

Commit 06cb1ec

Browse files
zhilingc authored and feast-ci-bot committed
Python SDK (feast-dev#47)
* Initial commit * Add feast resource definitions * Add mapping from dtype to valueType * Add utilities for google cloud * Fix printer to print yaml in order * Add importer * Add client * Update workflow notebook * Add tests * Clean up sdk code * Add push to pypi resources * Add ValueType and Granularity so that we don't expose internal proto to client * Scrub sensitive data * Add feature_set stub * Clean add env, clean up docstrings * Add create_feature_set implementation * Move feature set to be a resource * Add serving retrieval functions * Add FeatureSet and DatasetInfo implementation * Light refactor, fix tests * Add checking for different entity in a feature set * Add create_training_dataset api to feast client * Deduplicate requesting storage specs * Use TableReference when defining destination table * Fix downloading csv from GCS * Add tests and refactoring * Refactor and clean move classes locations * Add license * Fix typo * Move download functionality from DatasetInfo into Client * Add TrainingService.CreateTrainingDataset API to Core for creating training dataset * Fix failing test * Update generated python file * Create dataset if not exists. * Use LEFT join * Integrate with core's training dataset creation and remove the same logic in sdk * Refactoring * Fix for setting warehouse and serving storages * Add support for no results returned * Remove examples first * Made clients lazy initialised * Add quickstart * Rename method, remove email * Add retrieve serving data steps * Change gs to gcs * Rename TrainingService into DatasetService * Update importer api to support default datastores * Update quickstart to reflect new api * Change datasetPrefix env var key * Update serving default port
1 parent d191c3d commit 06cb1ec

File tree

75 files changed

+20970
-14
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+20970
-14
lines changed

charts/feast/templates/core-deploy.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ spec:
7575
secretKeyRef:
7676
name: {{ template "postgresql.fullname" . }}
7777
key: postgres-password
78+
- name: PROJECT_ID
79+
value: "{{ .Values.core.projectId}}"
80+
- name: TRAINING_DATASET_PREFIX
81+
value: "{{ .Values.core.trainingDatasetPrefix }}"
7882
- name: CORE_API_URI
7983
value: {{ .Values.core.service.extIPAdr }}:{{ .Values.core.service.grpc.targetPort }}
8084
- name: JOB_RUNNER

charts/feast/values.yaml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
---
2-
core:
2+
core:
3+
projectId: "gcp-project-id"
34
image:
45
pullPolicy: IfNotPresent
56
registry: feast
@@ -30,7 +31,8 @@ core:
3031
errorStoreOptions: "{}"
3132
monitoring:
3233
period: 5000
33-
initialDelay: 60000
34+
initialDelay: 60000
35+
trainingDatasetPrefix: "fs"
3436

3537
postgresql:
3638
name: feast-metadata

core/pom.xml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,6 @@
7979
<artifactId>spring-boot-starter-data-jpa</artifactId>
8080
<version>${springBootVersion}</version>
8181
</dependency>
82-
<!--compile "org.springframework.data:spring-data-jpa:${springBootVersion}"-->
83-
<dependency>
84-
<groupId>org.springframework.data</groupId>
85-
<artifactId>spring-data-jpa</artifactId>
86-
<version>${springBootVersion}</version>
87-
</dependency>
8882
<!--compile "org.springframework.boot:spring-boot-starter-actuator:${springBootVersion}"-->
8983
<dependency>
9084
<groupId>org.springframework.boot</groupId>
@@ -196,6 +190,11 @@
196190
<artifactId>compiler</artifactId>
197191
<version>0.9.5</version>
198192
</dependency>
193+
<dependency>
194+
<groupId>com.hubspot.jinjava</groupId>
195+
<artifactId>jinjava</artifactId>
196+
<version>2.4.12</version>
197+
</dependency>
199198
<!--compile 'io.micrometer:micrometer-core:1.0.7'-->
200199
<dependency>
201200
<groupId>io.micrometer</groupId>
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package feast.core.config;
2+
3+
import com.google.cloud.bigquery.BigQuery;
4+
import com.google.cloud.bigquery.BigQueryOptions;
5+
import com.google.common.base.Charsets;
6+
import com.google.common.io.CharStreams;
7+
import com.hubspot.jinjava.Jinjava;
8+
import feast.core.dao.FeatureInfoRepository;
9+
import feast.core.training.BigQueryDatasetCreator;
10+
import feast.core.training.BigQueryDatasetTemplater;
11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.io.InputStreamReader;
14+
import java.time.Clock;
15+
import org.springframework.beans.factory.annotation.Value;
16+
import org.springframework.context.annotation.Bean;
17+
import org.springframework.context.annotation.Configuration;
18+
import org.springframework.core.io.ClassPathResource;
19+
import org.springframework.core.io.Resource;
20+
21+
/** Configuration related to training API */
22+
@Configuration
23+
public class TrainingConfig {
24+
25+
@Bean
26+
public BigQueryDatasetTemplater getBigQueryTrainingDatasetTemplater(
27+
FeatureInfoRepository featureInfoRepository) throws IOException {
28+
Resource resource = new ClassPathResource("templates/bq_training.tmpl");
29+
InputStream resourceInputStream = resource.getInputStream();
30+
String tmpl = CharStreams.toString(new InputStreamReader(resourceInputStream, Charsets.UTF_8));
31+
return new BigQueryDatasetTemplater(new Jinjava(), tmpl, featureInfoRepository);
32+
}
33+
34+
@Bean
35+
public BigQueryDatasetCreator getBigQueryTrainingDatasetCreator(
36+
BigQueryDatasetTemplater templater,
37+
@Value("${feast.core.projectId}") String projectId,
38+
@Value("${feast.core.datasetPrefix}") String datasetPrefix) {
39+
BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();
40+
Clock clock = Clock.systemUTC();
41+
return new BigQueryDatasetCreator(templater, bigquery, clock, projectId, datasetPrefix);
42+
}
43+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package feast.core.exception;
2+
3+
/**
 * Unchecked exception signalling that a training dataset could not be created.
 */
public class TrainingDatasetCreationException extends RuntimeException {

  /**
   * Creates the exception with a detail message and the underlying cause.
   *
   * @param message description of the failure
   * @param cause the exception that triggered the failure
   */
  public TrainingDatasetCreationException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * Creates the exception with a detail message only.
   *
   * @param message description of the failure
   */
  public TrainingDatasetCreationException(String message) {
    super(message);
  }

  /** Creates the exception without a detail message or cause. */
  public TrainingDatasetCreationException() {}
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2018 The Feast Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package feast.core.grpc;
18+
19+
import com.google.common.base.Strings;
20+
import com.google.protobuf.Timestamp;
21+
import feast.core.DatasetServiceGrpc.DatasetServiceImplBase;
22+
import feast.core.DatasetServiceProto.DatasetInfo;
23+
import feast.core.DatasetServiceProto.FeatureSet;
24+
import feast.core.DatasetServiceProto.DatasetServiceTypes.CreateDatasetRequest;
25+
import feast.core.DatasetServiceProto.DatasetServiceTypes.CreateDatasetResponse;
26+
import feast.core.training.BigQueryDatasetCreator;
27+
import io.grpc.Status;
28+
import io.grpc.Status.Code;
29+
import io.grpc.stub.StreamObserver;
30+
import java.time.Instant;
31+
import java.time.temporal.ChronoUnit;
32+
import lombok.extern.slf4j.Slf4j;
33+
import org.lognet.springboot.grpc.GRpcService;
34+
import org.springframework.beans.factory.annotation.Autowired;
35+
36+
@Slf4j
37+
@GRpcService
38+
public class DatasetServiceImpl extends DatasetServiceImplBase {
39+
40+
private final BigQueryDatasetCreator datasetCreator;
41+
42+
@Autowired
43+
public DatasetServiceImpl(BigQueryDatasetCreator DatasetCreator) {
44+
this.datasetCreator = DatasetCreator;
45+
}
46+
47+
@Override
48+
public void createDataset(
49+
CreateDatasetRequest request,
50+
StreamObserver<CreateDatasetResponse> responseObserver) {
51+
try {
52+
checkRequest(request);
53+
} catch (IllegalArgumentException e) {
54+
responseObserver.onError(
55+
Status.fromCode(Code.INVALID_ARGUMENT)
56+
.withCause(e)
57+
.withDescription(e.getMessage())
58+
.asException());
59+
return;
60+
}
61+
62+
try {
63+
DatasetInfo datasetInfo =
64+
datasetCreator.createDataset(
65+
request.getFeatureSet(),
66+
request.getStartDate(),
67+
request.getEndDate(),
68+
request.getLimit(),
69+
request.getNamePrefix());
70+
CreateDatasetResponse response =
71+
CreateDatasetResponse.newBuilder().setDatasetInfo(datasetInfo).build();
72+
73+
responseObserver.onNext(response);
74+
responseObserver.onCompleted();
75+
} catch (Exception e) {
76+
log.error("Training dataset creation failed", e);
77+
responseObserver.onError(
78+
Status.fromCode(Code.INTERNAL)
79+
.withCause(e)
80+
.withDescription("Training dataset creation failed: " + e.getMessage())
81+
.asException());
82+
}
83+
}
84+
85+
private void checkRequest(CreateDatasetRequest request) {
86+
FeatureSet featureSet = request.getFeatureSet();
87+
Timestamp startDate = request.getStartDate();
88+
Timestamp endDate = request.getEndDate();
89+
90+
checkHasSameEntity(featureSet);
91+
checkStartIsBeforeEnd(startDate, endDate);
92+
}
93+
94+
private void checkStartIsBeforeEnd(Timestamp startDate, Timestamp endDate) {
95+
Instant start = Instant.ofEpochSecond(startDate.getSeconds()).truncatedTo(ChronoUnit.DAYS);
96+
Instant end = Instant.ofEpochSecond(endDate.getSeconds()).truncatedTo(ChronoUnit.DAYS);
97+
98+
if (start.compareTo(end) > 0) {
99+
throw new IllegalArgumentException("startDate is after endDate");
100+
}
101+
}
102+
103+
private void checkHasSameEntity(FeatureSet featureSet) {
104+
String entityName = featureSet.getEntityName();
105+
if (Strings.isNullOrEmpty(entityName)) {
106+
throw new IllegalArgumentException("entity name in feature set is null or empty");
107+
}
108+
109+
if (featureSet.getFeatureIdsCount() < 1) {
110+
throw new IllegalArgumentException("feature set is empty");
111+
}
112+
113+
for (String featureId : featureSet.getFeatureIdsList()) {
114+
String entity = featureId.split("\\.")[0];
115+
if (!entityName.equals(entity)) {
116+
throw new IllegalArgumentException("feature set contains different entity name: " + entity);
117+
}
118+
}
119+
}
120+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Copyright 2018 The Feast Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
package feast.core.training;
18+
19+
import com.google.cloud.bigquery.BigQuery;
20+
import com.google.cloud.bigquery.BigQuery.JobOption;
21+
import com.google.cloud.bigquery.JobException;
22+
import com.google.cloud.bigquery.QueryJobConfiguration;
23+
import com.google.cloud.bigquery.TableId;
24+
import com.google.common.base.Strings;
25+
import com.google.protobuf.Timestamp;
26+
import feast.core.DatasetServiceProto.DatasetInfo;
27+
import feast.core.DatasetServiceProto.FeatureSet;
28+
import feast.core.exception.TrainingDatasetCreationException;
29+
import java.time.Clock;
30+
import java.time.Instant;
31+
import java.time.ZoneId;
32+
import java.time.format.DateTimeFormatter;
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
@Slf4j
36+
public class BigQueryDatasetCreator {
37+
38+
private final String projectId;
39+
private final String datasetPrefix;
40+
private final BigQueryDatasetTemplater templater;
41+
private final BigQuery bigQuery;
42+
private final DateTimeFormatter formatter;
43+
private final Clock clock;
44+
45+
public BigQueryDatasetCreator(
46+
BigQueryDatasetTemplater templater,
47+
BigQuery bigQuery,
48+
Clock clock,
49+
String projectId,
50+
String datasetPrefix) {
51+
this.templater = templater;
52+
this.projectId = projectId;
53+
this.datasetPrefix = datasetPrefix;
54+
this.bigQuery = bigQuery;
55+
this.clock = clock;
56+
this.formatter = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneId.of("UTC"));
57+
}
58+
59+
/**
60+
* Create dataset for a feature set
61+
*
62+
* @param featureSet feature set for which the training dataset should be created
63+
* @param startDate starting date of the training dataset (inclusive)
64+
* @param endDate end date of the training dataset (inclusive)
65+
* @param limit maximum number of row should be created.
66+
* @param namePrefix prefix for dataset name
67+
* @return dataset info associated with the created training dataset
68+
*/
69+
public DatasetInfo createDataset(
70+
FeatureSet featureSet,
71+
Timestamp startDate,
72+
Timestamp endDate,
73+
long limit,
74+
String namePrefix) {
75+
try {
76+
String query = templater.createQuery(featureSet, startDate, endDate, limit);
77+
String tableName = createBqTableName(startDate, endDate, namePrefix);
78+
String bqDatasetName = createBqDatasetName(featureSet.getEntityName());
79+
80+
createBqDatasetIfMissing(bqDatasetName);
81+
82+
TableId destinationTable =
83+
TableId.of(projectId, createBqDatasetName(featureSet.getEntityName()), tableName);
84+
QueryJobConfiguration queryConfig =
85+
QueryJobConfiguration.newBuilder(query)
86+
.setAllowLargeResults(true)
87+
.setDestinationTable(destinationTable)
88+
.build();
89+
JobOption jobOption = JobOption.fields();
90+
bigQuery.query(queryConfig, jobOption);
91+
return DatasetInfo.newBuilder()
92+
.setName(createTrainingDatasetName(namePrefix, featureSet.getEntityName(), tableName))
93+
.setTableUrl(toTableUrl(destinationTable))
94+
.build();
95+
} catch (JobException e) {
96+
log.error("Failed creating training dataset", e);
97+
throw new TrainingDatasetCreationException("Failed creating training dataset", e);
98+
} catch (InterruptedException e) {
99+
log.error("Training dataset creation was interrupted", e);
100+
throw new TrainingDatasetCreationException("Training dataset creation was interrupted", e);
101+
}
102+
}
103+
104+
private void createBqDatasetIfMissing(String bqDatasetName) {
105+
if (bigQuery.getDataset(bqDatasetName) != null) {
106+
return;
107+
}
108+
109+
// create dataset
110+
bigQuery.create(com.google.cloud.bigquery.DatasetInfo.of(bqDatasetName));
111+
}
112+
113+
private String createBqTableName(Timestamp startDate, Timestamp endDate, String namePrefix) {
114+
String currentTime = String.valueOf(clock.millis());
115+
if (!Strings.isNullOrEmpty(namePrefix)) {
116+
// only alphanumeric and underscore are allowed
117+
namePrefix = namePrefix.replaceAll("[^a-zA-Z0-9_]", "_");
118+
return String.format(
119+
"%s_%s_%s_%s",
120+
namePrefix, currentTime, formatTimestamp(startDate), formatTimestamp(endDate));
121+
}
122+
123+
return String.format(
124+
"%s_%s_%s", currentTime, formatTimestamp(startDate), formatTimestamp(endDate));
125+
}
126+
127+
private String createBqDatasetName(String entity) {
128+
return String.format("%s_%s", datasetPrefix, entity);
129+
}
130+
131+
private String formatTimestamp(Timestamp timestamp) {
132+
Instant instant = Instant.ofEpochSecond(timestamp.getSeconds());
133+
return formatter.format(instant);
134+
}
135+
136+
private String toTableUrl(TableId tableId) {
137+
return String.format(
138+
"%s.%s.%s", tableId.getProject(), tableId.getDataset(), tableId.getTable());
139+
}
140+
141+
private String createTrainingDatasetName(String namePrefix, String entityName, String tableName) {
142+
if (!Strings.isNullOrEmpty(namePrefix)) {
143+
return tableName;
144+
}
145+
return String.format("%s_%s", entityName, tableName);
146+
}
147+
}

0 commit comments

Comments
 (0)