Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .prow/scripts/test-end-to-end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ feast:
stream:
type: kafka
options:
topic: feast-features
bootstrapServers: localhost:9092
replicationFactor: 1
partitions: 1
Expand Down
65 changes: 65 additions & 0 deletions core/src/main/java/feast/core/config/FeatureStreamConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package feast.core.config;

import com.google.common.base.Strings;
import feast.core.SourceProto.KafkaSourceConfig;
import feast.core.SourceProto.SourceType;
import feast.core.config.FeastProperties.StreamProperties;
import feast.core.model.Source;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that provides the default feature stream {@link Source}.
 *
 * <p>The stream is described by the {@code stream} section of {@link FeastProperties}. Only Kafka
 * is supported: the configured topic is created on the configured brokers if it does not exist
 * yet, and a default {@link Source} pointing at that topic is exposed as a bean.
 */
@Slf4j
@Configuration
public class FeatureStreamConfig {

  /**
   * Builds the default feature stream source, creating the backing Kafka topic when necessary.
   *
   * @param feastProperties application properties; the stream type and the stream options
   *     {@code bootstrapServers}, {@code topic}, {@code numPartitions} (default {@code 1}) and
   *     {@code replicationFactor} (default {@code 1}) are read from it
   * @return a default source of the configured stream type pointing at the configured topic
   * @throws RuntimeException if the stream type is not KAFKA, if topic creation fails for any
   *     reason other than the topic already existing, or if the calling thread is interrupted
   *     while waiting for topic creation
   * @throws IllegalArgumentException if the configured stream type is not a known
   *     {@link SourceType}
   */
  @Autowired
  @Bean
  public Source getDefaultSource(FeastProperties feastProperties) {
    StreamProperties streamProperties = feastProperties.getStream();
    SourceType featureStreamType =
        SourceType.valueOf(streamProperties.getType().toUpperCase());
    switch (featureStreamType) {
      case KAFKA:
        String bootstrapServers = streamProperties.getOptions().get("bootstrapServers");
        String topicName = streamProperties.getOptions().get("topic");
        // NOTE(review): this reads "numPartitions", while some deployment configs appear to set
        // "partitions" instead; confirm which key is intended, since an unrecognised key
        // silently falls back to the default of 1.
        int numPartitions =
            Integer.parseInt(streamProperties.getOptions().getOrDefault("numPartitions", "1"));
        short replicationFactor =
            Short.parseShort(streamProperties.getOptions().getOrDefault("replicationFactor", "1"));

        createTopicIfAbsent(
            bootstrapServers, new NewTopic(topicName, numPartitions, replicationFactor));

        KafkaSourceConfig sourceConfig =
            KafkaSourceConfig.newBuilder()
                .setBootstrapServers(bootstrapServers)
                .setTopic(topicName)
                .build();
        return new Source(featureStreamType, sourceConfig, true);
      default:
        throw new RuntimeException("Unsupported source stream, only [KAFKA] is supported");
    }
  }

  /**
   * Creates the given topic on the Kafka cluster, tolerating the case where it already exists.
   *
   * @param bootstrapServers Kafka bootstrap servers used by the admin client
   * @param newTopic topic definition (name, partition count, replication factor) to create
   * @throws RuntimeException if creation fails for any reason other than the topic already
   *     existing, or if the calling thread is interrupted while waiting for the result
   */
  private static void createTopicIfAbsent(String bootstrapServers, NewTopic newTopic) {
    Map<String, Object> adminConfig = new HashMap<>();
    adminConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    adminConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000");

    String topicName = newTopic.name();
    // try-with-resources: the admin client owns network resources and must always be closed.
    try (AdminClient client = AdminClient.create(adminConfig)) {
      CreateTopicsResult createTopicsResult =
          client.createTopics(Collections.singleton(newTopic));
      createTopicsResult.values().get(topicName).get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e.getMessage(), e);
    } catch (ExecutionException e) {
      // instanceof is null-safe, so a missing cause falls through to the failure branch.
      if (e.getCause() instanceof TopicExistsException) {
        log.warn(
            "Unable to create topic {} in the feature stream, topic already exists, using existing topic.",
            topicName);
      } else {
        throw new RuntimeException(e.getMessage(), e);
      }
    }
  }
}
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/dao/JobInfoRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@
@Repository
public interface JobInfoRepository extends JpaRepository<JobInfo, String> {
List<JobInfo> findByStatusNotIn(Collection<JobStatus> statuses);
List<JobInfo> findByFeatureSetsNameAndStoreName(String featureSetsName, String storeName);
List<JobInfo> findBySourceIdAndStoreName(String sourceId, String storeName);
}
23 changes: 17 additions & 6 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package feast.core.grpc;

import com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
Expand All @@ -32,9 +33,11 @@
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.CoreServiceProto.UpdateStoreResponse.Status;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.SourceProto;
import feast.core.StoreProto.Store;
import feast.core.StoreProto.Store.Subscription;
import feast.core.exception.RetrievalException;
import feast.core.model.Source;
import feast.core.service.JobCoordinatorService;
import feast.core.service.SpecService;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -113,19 +116,23 @@ public void applyFeatureSet(
return p.matcher(featureSetName).matches();
})
.collect(Collectors.toList());
List<FeatureSetSpec> featureSetSpecs = new ArrayList<>();
Set<FeatureSetSpec> featureSetSpecs = new HashSet<>();
for (Subscription subscription : relevantSubscriptions) {
featureSetSpecs.addAll(
specService
.getFeatureSets(
GetFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(featureSetName)
.setFeatureSetName(subscription.getName())
.setFeatureSetVersion(subscription.getVersion())
.build())
.getFeatureSetsList());
}
if (!featureSetSpecs.isEmpty()) {
jobCoordinatorService.startOrUpdateJob(featureSetSpecs, store);
if (!featureSetSpecs.isEmpty() && featureSetSpecs.contains(response.getFeatureSet())) {
// We use the request featureSet source because it contains the information
// about whether to default to the default feature stream or not
SourceProto.Source source = response.getFeatureSet().getSource();
jobCoordinatorService
.startOrUpdateJob(Lists.newArrayList(featureSetSpecs), source, store);
}
}
responseObserver.onNext(response);
Expand Down Expand Up @@ -158,11 +165,15 @@ public void updateStore(UpdateStoreRequest request,
.getFeatureSetsList()
);
}
if (featureSetSpecs.size() == 0) {
return;
}
featureSetSpecs.stream()
.collect(Collectors.groupingBy(FeatureSetSpec::getName))
.collect(Collectors.groupingBy(FeatureSetSpec::getSource))
.entrySet()
.stream()
.forEach(kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), store));
.forEach(
kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), kv.getKey(), store));
}
} catch (Exception e) {
log.error("Exception has occurred in UpdateStore method: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import feast.core.exception.JobExecutionException;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.model.FeatureSet;
import feast.core.model.JobInfo;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
Expand All @@ -36,6 +37,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;
Expand Down Expand Up @@ -111,12 +113,29 @@ private ImportOptions getPipelineOptions(List<FeatureSetSpec> featureSetSpecs,
}

/**
* Unsupported.
* Stops an existing job and restarts a new job in its place as a proxy for job updates.
* Note that since we do not maintain a consumer group across the two jobs and the old job
* is not drained, some data may be lost.
*
* As a rule of thumb, direct jobs in feast should only be used for testing.
*
* @param jobInfo jobInfo of target job to change
* @return jobId of the job
*/
@Override
public String updateJob(JobInfo jobInfo) {
throw new UnsupportedOperationException(
"DirectRunner does not support job updates. To make changes to the worker, stop the existing job and rerun ingestion.");
String jobId = jobInfo.getExtId();
abortJob(jobId);
try {
List<FeatureSetSpec> featureSetSpecs = new ArrayList<>();
for (FeatureSet featureSet : jobInfo.getFeatureSets()) {
featureSetSpecs.add(featureSet.toProto());
}
startJob(jobId, featureSetSpecs, jobInfo.getStore().toProto());
} catch (JobExecutionException | InvalidProtocolBufferException e) {
throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e);
}
return jobId;
}

/**
Expand Down
14 changes: 8 additions & 6 deletions core/src/main/java/feast/core/model/JobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,21 @@ public class JobInfo extends AbstractTimestampEntity {
@Column(name = "ext_id")
private String extId;

// Import job source type
@Column(name = "type")
private String type;

// Runner type
@Column(name = "runner")
private String runner;

// Source id
@ManyToOne
@JoinColumn(name = "source_id")
private Source source;

// Sink id
@ManyToOne
@JoinColumn(name = "store_name")
private Store store;


// FeatureSets populated by the job
@ManyToMany
@JoinTable(
Expand All @@ -87,11 +89,11 @@ public JobInfo() {
super();
}

public JobInfo(String id, String extId, SourceType type, String runner, Store sink,
public JobInfo(String id, String extId, String runner, Source source, Store sink,
List<FeatureSet> featureSets, JobStatus jobStatus) {
this.id = id;
this.extId = extId;
this.type = type.toString();
this.source = source;
this.runner = runner;
this.store = sink;
this.featureSets = featureSets;
Expand Down
Loading