Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,16 @@ public void registerFeature(
FeatureSpec request, StreamObserver<RegisterFeatureResponse> responseObserver) {
try {
validator.validateFeatureSpec(request);
FeatureInfo feature = specService.registerFeature(request);
FeatureInfo feature = specService.applyFeature(request);
RegisterFeatureResponse response =
RegisterFeatureResponse.newBuilder().setFeatureId(feature.getId()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RegistrationException e) {
log.error("Error in registerFeature: {}", e);
log.error("Error in applyFeature: {}", e);
responseObserver.onError(getRuntimeException(e));
} catch (IllegalArgumentException e) {
log.error("Error in registerFeature: {}", e);
log.error("Error in applyFeature: {}", e);
responseObserver.onError(getBadRequestException(e));
}
}
Expand All @@ -262,16 +262,16 @@ public void registerFeatureGroup(
StreamObserver<RegisterFeatureGroupResponse> responseObserver) {
try {
validator.validateFeatureGroupSpec(request);
FeatureGroupInfo featureGroup = specService.registerFeatureGroup(request);
FeatureGroupInfo featureGroup = specService.applyFeatureGroup(request);
RegisterFeatureGroupResponse response =
RegisterFeatureGroupResponse.newBuilder().setFeatureGroupId(featureGroup.getId()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RegistrationException e) {
log.error("Error in registerFeatureGroup: {}", e);
log.error("Error in applyFeatureGroup: {}", e);
responseObserver.onError(getRuntimeException(e));
} catch (IllegalArgumentException e) {
log.error("Error in registerFeatureGroup: {}", e);
log.error("Error in applyFeatureGroup: {}", e);
responseObserver.onError(getBadRequestException(e));
}
}
Expand All @@ -286,16 +286,16 @@ public void registerEntity(
EntitySpec request, StreamObserver<RegisterEntityResponse> responseObserver) {
try {
validator.validateEntitySpec(request);
EntityInfo entity = specService.registerEntity(request);
EntityInfo entity = specService.applyEntity(request);
RegisterEntityResponse response =
RegisterEntityResponse.newBuilder().setEntityName(entity.getName()).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RegistrationException e) {
log.error("Error in registerEntity: {}", e);
log.error("Error in applyEntity: {}", e);
responseObserver.onError(getRuntimeException(e));
} catch (IllegalArgumentException e) {
log.error("Error in registerEntity: {}", e);
log.error("Error in applyEntity: {}", e);
responseObserver.onError(getBadRequestException(e));
}
}
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/feast/core/model/EntityInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,15 @@ public EntityDetail getEntityDetail() {
.setLastUpdated(convertTimestamp(this.getLastUpdated()))
.build();
}

/**
* Updates the entity info with specifications from the incoming entity spec.
*
* @param update new entity spec
*/
public void update(EntitySpec update) {
this.description = update.getDescription();
this.tags = String.join(",", update.getTagsList());
}

}
89 changes: 57 additions & 32 deletions core/src/main/java/feast/core/model/FeatureGroupInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package feast.core.model;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import feast.core.UIServiceProto.UIServiceTypes.FeatureGroupDetail;
import feast.core.util.TypeConversion;
import feast.specs.FeatureGroupSpecProto.FeatureGroupSpec;
import feast.specs.FeatureSpecProto.DataStore;
import feast.specs.FeatureSpecProto.DataStores;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import javax.persistence.*;

Expand All @@ -39,8 +39,7 @@
@Table(name = "feature_groups")
public class FeatureGroupInfo extends AbstractTimestampEntity {

@Id
private String id;
@Id private String id;

@Column(name = "tags")
private String tags;
Expand All @@ -63,52 +62,78 @@ public FeatureGroupInfo() {
super();
}

public FeatureGroupInfo(FeatureGroupSpec spec,
StorageInfo servingStore,
StorageInfo warehouseStore) {
public FeatureGroupInfo(
FeatureGroupSpec spec, StorageInfo servingStore, StorageInfo warehouseStore) {
this.id = spec.getId();
this.tags = String.join(",", spec.getTagsList());
this.servingStore = servingStore;
this.warehouseStore = warehouseStore;
this.servingStoreOpts =
TypeConversion.convertMapToJsonString(spec.getDataStores().getServing().getOptionsMap());
TypeConversion.convertMapToJsonString(spec.getDataStores().getServing().getOptionsMap());
this.warehouseStoreOpts =
TypeConversion.convertMapToJsonString(spec.getDataStores().getWarehouse().getOptionsMap());
TypeConversion.convertMapToJsonString(spec.getDataStores().getWarehouse().getOptionsMap());
}

/**
* Get the feature group spec associated with this record.
*/
/** Get the feature group spec associated with this record. */
public FeatureGroupSpec getFeatureGroupSpec() {
DataStore servingDataStore =
DataStore.newBuilder()
.setId(servingStore.getId())
.putAllOptions(TypeConversion.convertJsonStringToMap(servingStoreOpts))
.build();
DataStore.newBuilder()
.setId(servingStore.getId())
.putAllOptions(TypeConversion.convertJsonStringToMap(servingStoreOpts))
.build();
DataStore warehouseDataStore =
DataStore.newBuilder()
.setId(warehouseStore.getId())
.putAllOptions(TypeConversion.convertJsonStringToMap(warehouseStoreOpts))
.build();
DataStore.newBuilder()
.setId(warehouseStore.getId())
.putAllOptions(TypeConversion.convertJsonStringToMap(warehouseStoreOpts))
.build();
DataStores dataStores =
DataStores.newBuilder()
.setWarehouse(warehouseDataStore)
.setServing(servingDataStore)
.build();
return FeatureGroupSpec.newBuilder()
.setId(id)
.addAllTags(TypeConversion.convertTagStringToList(tags))
.setDataStores(dataStores)
DataStores.newBuilder()
.setWarehouse(warehouseDataStore)
.setServing(servingDataStore)
.build();
return FeatureGroupSpec.newBuilder()
.setId(id)
.addAllTags(TypeConversion.convertTagStringToList(tags))
.setDataStores(dataStores)
.build();
}

/**
* Get the feature group detail containing both spec and metadata, associated with this record.
*/
public FeatureGroupDetail getFeatureGroupDetail() {
return FeatureGroupDetail.newBuilder()
.setSpec(this.getFeatureGroupSpec())
.setLastUpdated(TypeConversion.convertTimestamp(this.getLastUpdated()))
.build();
.setSpec(this.getFeatureGroupSpec())
.setLastUpdated(TypeConversion.convertTimestamp(this.getLastUpdated()))
.build();
}

public void update(FeatureGroupSpec update) throws IllegalArgumentException {
if (!isLegalUpdate(update)) {
throw new IllegalArgumentException(
"Feature group already exists. Update only allowed for fields: [tags]");
}
this.tags = String.join(",", update.getTagsList());
}

private boolean isLegalUpdate(FeatureGroupSpec update) {
DataStore updatedWarehouseStore =
update.getDataStores().hasWarehouse() ? update.getDataStores().getWarehouse() : null;
DataStore updatedServingStore =
update.getDataStores().hasServing() ? update.getDataStores().getServing() : null;
return isStoreEqual(this.warehouseStore, this.warehouseStoreOpts, updatedWarehouseStore)
&& isStoreEqual(this.servingStore, this.servingStoreOpts, updatedServingStore);
}

private boolean isStoreEqual(StorageInfo oldStore, String oldStoreOpts, DataStore newStore) {
return getStorageId(oldStore).equals(newStore == null ? "" : newStore.getId())
&& oldStoreOpts.equals(
newStore == null
? ""
: TypeConversion.convertMapToJsonString(newStore.getOptionsMap()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to have oldStoreOpts here? Can this also be a method on StorageInfo?

}

private String getStorageId(StorageInfo storage) {
return storage == null ? "" : storage.getId();
}
}
58 changes: 52 additions & 6 deletions core/src/main/java/feast/core/model/FeatureInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package feast.core.model;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import feast.core.UIServiceProto.UIServiceTypes.FeatureDetail;
import feast.core.storage.BigQueryStorageManager;
import feast.core.util.TypeConversion;
Expand All @@ -30,11 +28,9 @@
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.util.Strings;

import javax.persistence.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -133,10 +129,11 @@ public FeatureInfo(
if (spec.getDataStores() != null) {
this.servingStore = servingStore;
this.servingStoreOpts =
TypeConversion.convertMapToJsonString(spec.getDataStores().getServing().getOptionsMap());
TypeConversion.convertMapToJsonString(spec.getDataStores().getServing().getOptionsMap());
this.warehouseStore = warehouseStore;
this.warehouseStoreOpts =
TypeConversion.convertMapToJsonString(spec.getDataStores().getWarehouse().getOptionsMap());
TypeConversion.convertMapToJsonString(
spec.getDataStores().getWarehouse().getOptionsMap());
}
this.bigQueryView = createBigqueryViewLink(warehouseStore);
}
Expand Down Expand Up @@ -256,4 +253,53 @@ private String createBigqueryViewLink(StorageInfo warehouseStore) {
"https://bigquery.cloud.google.com/table/%s:%s.%s_%s_view",
projectId, dataset, entity.getName(), granularity.toString().toLowerCase());
}

/**
* Updates the feature info with specifications from the incoming feature spec.
*
* <p>TODO: maybe allow changes to id, store etc if no jobs are feeding into this feature
*
* @param update new feature spec
*/
public void update(FeatureSpec update) throws IllegalArgumentException {
if (!isLegalUpdate(update)) {
throw new IllegalArgumentException(
"Feature already exists. Update only allowed for fields: [owner, description, uri, tags]");
}
this.owner = update.getOwner();
this.description = update.getDescription();
this.uri = update.getUri();
this.tags = String.join(",", update.getTagsList());
}

private boolean isLegalUpdate(FeatureSpec update) {
DataStore updatedWarehouseStore =
update.getDataStores().hasWarehouse() ? update.getDataStores().getWarehouse() : null;
DataStore updatedServingStore =
update.getDataStores().hasServing() ? update.getDataStores().getServing() : null;
return update.getName().equals(this.name)
&& update.getEntity().equals(this.entity.getName())
&& update.getGranularityValue() == this.granularity.getNumber()
&& update.getValueTypeValue() == this.valueType.getNumber()
&& update.getGroup().equals(getFeatureGroupId(this.featureGroup))
&& TypeConversion.convertMapToJsonString(update.getOptionsMap()).equals(this.options)
&& isStoreEqual(this.warehouseStore, this.warehouseStoreOpts, updatedWarehouseStore)
&& isStoreEqual(this.servingStore, this.servingStoreOpts, updatedServingStore);
}

private boolean isStoreEqual(StorageInfo oldStore, String oldStoreOpts, DataStore newStore) {
return getStorageId(oldStore).equals(newStore == null ? "" : newStore.getId())
&& oldStoreOpts.equals(
newStore == null
? ""
: TypeConversion.convertMapToJsonString(newStore.getOptionsMap()));
}

private String getFeatureGroupId(FeatureGroupInfo featureGroupInfo) {
return featureGroupInfo == null ? "" : featureGroupInfo.getId();
}

private String getStorageId(StorageInfo storage) {
return storage == null ? "" : storage.getId();
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/model/StorageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

package feast.core.model;

import feast.core.UIServiceProto.UIServiceTypes.StorageDetail;
import feast.specs.StorageSpecProto.StorageSpec;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import feast.core.UIServiceProto.UIServiceTypes.StorageDetail;
import feast.specs.StorageSpecProto.StorageSpec;

import javax.persistence.Column;
import javax.persistence.Entity;
Expand Down
Loading