diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index 89ddd826e33..2a4cb9e710c 100755 --- a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -97,6 +97,7 @@ feast: stream: type: kafka options: + topic: feast-features bootstrapServers: localhost:9092 replicationFactor: 1 partitions: 1 diff --git a/core/src/main/java/feast/core/config/FeatureStreamConfig.java b/core/src/main/java/feast/core/config/FeatureStreamConfig.java new file mode 100644 index 00000000000..40344681722 --- /dev/null +++ b/core/src/main/java/feast/core/config/FeatureStreamConfig.java @@ -0,0 +1,65 @@ +package feast.core.config; + +import com.google.common.base.Strings; +import feast.core.SourceProto.KafkaSourceConfig; +import feast.core.SourceProto.SourceType; +import feast.core.config.FeastProperties.StreamProperties; +import feast.core.model.Source; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.errors.TopicExistsException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +public class FeatureStreamConfig { + + @Autowired + @Bean + public Source getDefaultSource(FeastProperties feastProperties) { + StreamProperties streamProperties = feastProperties.getStream(); + SourceType featureStreamType = SourceType + .valueOf(streamProperties.getType().toUpperCase()); + switch (featureStreamType) { + case KAFKA: + String bootstrapServers = streamProperties.getOptions().get("bootstrapServers"); + String topicName = streamProperties.getOptions().get("topic"); + Map map = new HashMap<>(); + map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); + AdminClient client = AdminClient.create(map); + + NewTopic newTopic = new NewTopic(topicName, + Integer.valueOf(streamProperties.getOptions().getOrDefault("numPartitions", "1")), + Short.valueOf(streamProperties.getOptions().getOrDefault("replicationFactor", "1"))); + CreateTopicsResult createTopicsResult = client + .createTopics(Collections.singleton(newTopic)); + try { + createTopicsResult.values().get(topicName).get(); + } catch (InterruptedException | ExecutionException e) { + if (e.getCause().getClass().equals(TopicExistsException.class)) { + log.warn(Strings + .lenientFormat( + "Unable to create topic %s in the feature stream, topic already exists, using existing topic.", + topicName)); + } else { + throw new RuntimeException(e.getMessage(), e); + } + } + KafkaSourceConfig sourceConfig = KafkaSourceConfig.newBuilder() + .setBootstrapServers(bootstrapServers).setTopic(topicName).build(); + return new Source(featureStreamType, sourceConfig, true); + default: + throw new RuntimeException("Unsupported source stream, only [KAFKA] is supported"); + } + } +} diff --git a/core/src/main/java/feast/core/dao/JobInfoRepository.java b/core/src/main/java/feast/core/dao/JobInfoRepository.java index ed457ef9f0b..06381aa5577 100644 --- a/core/src/main/java/feast/core/dao/JobInfoRepository.java +++ b/core/src/main/java/feast/core/dao/JobInfoRepository.java @@ -28,5 +28,5 @@ @Repository public interface JobInfoRepository extends JpaRepository { List findByStatusNotIn(Collection statuses); - List findByFeatureSetsNameAndStoreName(String featureSetsName, String storeName); + List findBySourceIdAndStoreName(String sourceId, String storeName); } \ No newline at end of file diff --git a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java index 7ac1657a627..18482b5e6d1 100644 --- a/core/src/main/java/feast/core/grpc/CoreServiceImpl.java +++ b/core/src/main/java/feast/core/grpc/CoreServiceImpl.java @@ -17,6 +17,7 @@ package feast.core.grpc; +import com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; import feast.core.CoreServiceGrpc.CoreServiceImplBase; import feast.core.CoreServiceProto.ApplyFeatureSetRequest; @@ -32,9 +33,11 @@ import feast.core.CoreServiceProto.UpdateStoreResponse; import feast.core.CoreServiceProto.UpdateStoreResponse.Status; import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.SourceProto; import feast.core.StoreProto.Store; import feast.core.StoreProto.Store.Subscription; import feast.core.exception.RetrievalException; +import feast.core.model.Source; import feast.core.service.JobCoordinatorService; import feast.core.service.SpecService; import io.grpc.stub.StreamObserver; @@ -113,19 +116,23 @@ public void applyFeatureSet( return p.matcher(featureSetName).matches(); }) .collect(Collectors.toList()); - List featureSetSpecs = new ArrayList<>(); + Set featureSetSpecs = new HashSet<>(); for (Subscription subscription : relevantSubscriptions) { featureSetSpecs.addAll( specService .getFeatureSets( GetFeatureSetsRequest.Filter.newBuilder() - .setFeatureSetName(featureSetName) + .setFeatureSetName(subscription.getName()) .setFeatureSetVersion(subscription.getVersion()) .build()) .getFeatureSetsList()); } - if (!featureSetSpecs.isEmpty()) { - jobCoordinatorService.startOrUpdateJob(featureSetSpecs, store); + if (!featureSetSpecs.isEmpty() && featureSetSpecs.contains(response.getFeatureSet())) { + // We use the request featureSet source because it contains the information + // about whether to default to the default feature stream or not + SourceProto.Source source = response.getFeatureSet().getSource(); + jobCoordinatorService + .startOrUpdateJob(Lists.newArrayList(featureSetSpecs), source, store); } } responseObserver.onNext(response); @@ -158,11 +165,15 @@ public void updateStore(UpdateStoreRequest request, .getFeatureSetsList() ); } + if (featureSetSpecs.size() == 0) { + return; + } featureSetSpecs.stream() - .collect(Collectors.groupingBy(FeatureSetSpec::getName)) + .collect(Collectors.groupingBy(FeatureSetSpec::getSource)) .entrySet() .stream() - .forEach(kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), store)); + .forEach( + kv -> jobCoordinatorService.startOrUpdateJob(kv.getValue(), kv.getKey(), store)); } } catch (Exception e) { log.error("Exception has occurred in UpdateStore method: ", e); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 621874041f4..e7d4b70a90b 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -27,6 +27,7 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; +import feast.core.model.FeatureSet; import feast.core.model.JobInfo; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; @@ -36,6 +37,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -111,12 +113,29 @@ private ImportOptions getPipelineOptions(List featureSetSpecs, } /** - * Unsupported. + * Stops an existing job and restarts a new job in its place as a proxy for job updates. + * Note that since we do not maintain a consumer group across the two jobs and the old job + * is not drained, some data may be lost. + * + * As a rule of thumb, direct jobs in feast should only be used for testing. + * + * @param jobInfo jobInfo of target job to change + * @return jobId of the job */ @Override public String updateJob(JobInfo jobInfo) { - throw new UnsupportedOperationException( - "DirectRunner does not support job updates. To make changes to the worker, stop the existing job and rerun ingestion."); + String jobId = jobInfo.getExtId(); + abortJob(jobId); + try { + List featureSetSpecs = new ArrayList<>(); + for (FeatureSet featureSet : jobInfo.getFeatureSets()) { + featureSetSpecs.add(featureSet.toProto()); + } + startJob(jobId, featureSetSpecs, jobInfo.getStore().toProto()); + } catch (JobExecutionException | InvalidProtocolBufferException e) { + throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e); + } + return jobId; } /** diff --git a/core/src/main/java/feast/core/model/JobInfo.java b/core/src/main/java/feast/core/model/JobInfo.java index fd523b4e4c7..4a64aefbc5c 100644 --- a/core/src/main/java/feast/core/model/JobInfo.java +++ b/core/src/main/java/feast/core/model/JobInfo.java @@ -55,19 +55,21 @@ public class JobInfo extends AbstractTimestampEntity { @Column(name = "ext_id") private String extId; - // Import job source type - @Column(name = "type") - private String type; - // Runner type @Column(name = "runner") private String runner; + // Source id + @ManyToOne + @JoinColumn(name = "source_id") + private Source source; + // Sink id @ManyToOne @JoinColumn(name = "store_name") private Store store; + // FeatureSets populated by the job @ManyToMany @JoinTable( @@ -87,11 +89,11 @@ public JobInfo() { super(); } - public JobInfo(String id, String extId, SourceType type, String runner, Store sink, + public JobInfo(String id, String extId, String runner, Source source, Store sink, List featureSets, JobStatus jobStatus) { this.id = id; this.extId = extId; - this.type = type.toString(); + this.source = source; this.runner = runner; this.store = sink; this.featureSets = featureSets; diff --git a/core/src/main/java/feast/core/model/Source.java b/core/src/main/java/feast/core/model/Source.java index f1997801597..94515629cfe 100644 --- a/core/src/main/java/feast/core/model/Source.java +++ b/core/src/main/java/feast/core/model/Source.java @@ -1,19 +1,16 @@ package feast.core.model; import com.google.common.collect.Sets; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Message; import feast.core.SourceProto; import feast.core.SourceProto.KafkaSourceConfig; import feast.core.SourceProto.Source.Builder; import feast.core.SourceProto.SourceType; +import io.grpc.Status; import java.util.Set; import javax.persistence.Column; import javax.persistence.Entity; -import javax.persistence.GeneratedValue; -import javax.persistence.GenerationType; import javax.persistence.Id; -import javax.persistence.Lob; import javax.persistence.Table; import lombok.Setter; @@ -25,57 +22,76 @@ public class Source { private static final Set KAFKA_OPTIONS = Sets.newHashSet("bootstrapServers"); @Id - @GeneratedValue(strategy = GenerationType.AUTO) @Column(name = "id", updatable = false, nullable = false) - private Long id; + private String id; // Type of the source. Should map to feast.types.Source.SourceType @Column(name = "type", nullable = false) private String type; - // Options for this source - @Column(name = "options") - @Lob - private byte[] options; + // Bootstrap servers, comma delimited. Used by kafka sources. + @Column(name = "bootstrap_servers") + private String bootstrapServers; - @Column(name = "use_default") - private boolean useDefault; + // Topics to listen to, comma delimited. Used by kafka sources. + @Column(name = "topics") + private String topics; + + @Column(name = "is_default") + private boolean isDefault; public Source() { super(); } - public Source(SourceType type, byte[] options) { + public Source(SourceType type, KafkaSourceConfig config, boolean isDefault) { + if (config.getBootstrapServers().isEmpty() || config.getTopic().isEmpty()) { + throw Status.INVALID_ARGUMENT.withDescription( + "Unsupported source options. Kafka source requires bootstrap servers and topic to be specified.") + .asRuntimeException(); + } this.type = type.toString(); - this.options = options; + this.bootstrapServers = config.getBootstrapServers(); + this.topics = config.getTopic(); + this.isDefault = isDefault; + this.id = generateId(); } + /** + * Construct a source facade object from a given proto object. + * + * @param sourceProto SourceProto.Source object + * @return Source facade object + */ public static Source fromProto(SourceProto.Source sourceProto) { if (sourceProto.equals(SourceProto.Source.getDefaultInstance())) { - Source source = new Source(SourceType.UNRECOGNIZED, - KafkaSourceConfig.getDefaultInstance().toByteArray()); - source.setUseDefault(true); - return source; + return new Source(); } - byte[] options; switch (sourceProto.getType()) { case KAFKA: - options = sourceProto.getKafkaSourceConfig().toByteArray(); - break; + return new Source(sourceProto.getType(), sourceProto.getKafkaSourceConfig(), false); case UNRECOGNIZED: default: - throw new IllegalArgumentException("Unsupported source type. Only [KAFKA] is supported."); + throw Status.INVALID_ARGUMENT + .withDescription("Unsupported source type. Only [KAFKA] is supported.") + .asRuntimeException(); } - return new Source(sourceProto.getType(), options); } - public SourceProto.Source toProto() throws InvalidProtocolBufferException { + /** + * Convert this object to its equivalent proto object. + * + * @return SourceProto.Source + */ + public SourceProto.Source toProto() { Builder builder = SourceProto.Source.newBuilder() .setType(SourceType.valueOf(type)); switch (SourceType.valueOf(type)) { case KAFKA: - KafkaSourceConfig config = KafkaSourceConfig.parseFrom(options); + KafkaSourceConfig config = KafkaSourceConfig.newBuilder() + .setBootstrapServers(bootstrapServers) + .setTopic(topics).build(); return builder.setKafkaSourceConfig(config).build(); case UNRECOGNIZED: default: @@ -83,17 +99,28 @@ public SourceProto.Source toProto() throws InvalidProtocolBufferException { } } + /** + * Get the id for this feature source + * + * @return feature source id in the format TYPE/options + */ + public String getId() { + return id; + } + /** * Get the options for this feature source * * @return feature source options */ - public Message getOptions() - throws InvalidProtocolBufferException { + public Message getOptions() { switch (SourceType.valueOf(type)) { case KAFKA: - KafkaSourceConfig config = KafkaSourceConfig.parseFrom(options); - return config; + return KafkaSourceConfig + .newBuilder() + .setBootstrapServers(bootstrapServers) + .setTopic(topics) + .build(); case UNRECOGNIZED: default: throw new RuntimeException("Unable to convert source to proto"); @@ -114,26 +141,20 @@ public SourceType getType() { * * @return boolean indicating whether this feature set source uses defaults. */ - public boolean isUseDefault() { - return useDefault; + public boolean isDefault() { + return isDefault; } /** - * Set the topic to the source stream. + * Override equality for sources. isDefault is always compared first; if both sources are using + * the default feature source, they will be equal. If not they will be compared based on their + * type-specific options. + * + * @param other other Source + * @return boolean equal */ - public void setTopic(String topic) throws InvalidProtocolBufferException { - switch (SourceType.valueOf(type)) { - case KAFKA: - KafkaSourceConfig kafkacfg = KafkaSourceConfig.parseFrom(options); - this.options = kafkacfg.toBuilder().setTopic(topic).build().toByteArray(); - case UNRECOGNIZED: - default: - throw new RuntimeException("Unable to convert source to proto"); - } - } - - public boolean equalTo(Source other) throws InvalidProtocolBufferException { - if (other.useDefault && useDefault) { + public boolean equalTo(Source other) { + if (other.isDefault && isDefault) { return true; } @@ -143,14 +164,23 @@ public boolean equalTo(Source other) throws InvalidProtocolBufferException { switch (SourceType.valueOf(type)) { case KAFKA: - KafkaSourceConfig kafkaCfg = KafkaSourceConfig.parseFrom(options); - KafkaSourceConfig otherKafkaCfg = KafkaSourceConfig.parseFrom(other.options); - return kafkaCfg.getBootstrapServers().equals(otherKafkaCfg.getBootstrapServers()); + return bootstrapServers.equals(other.bootstrapServers) && + topics.equals(other.topics); case UNRECOGNIZED: default: return false; } } + + private String generateId() { + switch (SourceType.valueOf(type)) { + case KAFKA: + return String.format("KAFKA/%s/%s", bootstrapServers, topics); + default: + // should not occur + return ""; + } + } } diff --git a/core/src/main/java/feast/core/service/FeatureStreamService.java b/core/src/main/java/feast/core/service/FeatureStreamService.java deleted file mode 100644 index 2532de13e3a..00000000000 --- a/core/src/main/java/feast/core/service/FeatureStreamService.java +++ /dev/null @@ -1,53 +0,0 @@ -package feast.core.service; - -import feast.core.SourceProto.SourceType; -import feast.core.config.FeastProperties; -import feast.core.model.FeatureSet; -import feast.core.model.Source; -import feast.core.stream.FeatureStream; -import feast.core.stream.kafka.KafkaFeatureStream; -import feast.core.stream.kafka.KafkaFeatureStreamConfig; -import java.util.Map; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -/** - * Facilitates management of the feature stream. - */ -@Slf4j -@Service -public class FeatureStreamService { - private final SourceType defaultStreamType; - private final Map defaultOptions; - - @Autowired - public FeatureStreamService(FeastProperties feastProperties) { - this.defaultOptions = feastProperties.getStream().getOptions(); - this.defaultStreamType = SourceType.valueOf(feastProperties.getStream().getType().toUpperCase()); - } - - /** - * Provisions a topic given the featureSet. If the topic already exists, and was not created by - * feast, an error will be thrown. - * - * @param featureSet featureSet to create the topic for - * @return Source updated with provisioned feature source - */ - public Source setUpSource(FeatureSet featureSet) { - if (featureSet.getSource().isUseDefault()){ - featureSet.getSource().setType(defaultStreamType.toString()); - } - - switch (featureSet.getSource().getType()) { - case KAFKA: - FeatureStream featureStream = new KafkaFeatureStream(KafkaFeatureStreamConfig.fromMap(defaultOptions)); - return featureStream.provision(featureSet); - case UNRECOGNIZED: - default: - throw new IllegalArgumentException( - String.format("Invalid source %s provided, only source of type [KAFKA] allowed", - featureSet.getSource().getType())); - } - } -} diff --git a/core/src/main/java/feast/core/service/JobCoordinatorService.java b/core/src/main/java/feast/core/service/JobCoordinatorService.java index 61e600c9804..2cd5cd6778a 100644 --- a/core/src/main/java/feast/core/service/JobCoordinatorService.java +++ b/core/src/main/java/feast/core/service/JobCoordinatorService.java @@ -1,7 +1,9 @@ package feast.core.service; import com.google.common.base.Strings; +import feast.core.FeatureSetProto; import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.SourceProto; import feast.core.SourceProto.SourceType; import feast.core.StoreProto; import feast.core.dao.JobInfoRepository; @@ -14,6 +16,7 @@ import feast.core.model.FeatureSet; import feast.core.model.JobInfo; import feast.core.model.JobStatus; +import feast.core.model.Source; import feast.core.model.Store; import java.time.Instant; import java.util.ArrayList; @@ -44,9 +47,11 @@ public JobCoordinatorService( * there has been no change in the featureSet, and there is a running job for the featureSet, this * method will do nothing. */ - public JobInfo startOrUpdateJob(List featureSetSpecs, StoreProto.Store store) { - String featureSetName = featureSetSpecs.get(0).getName(); - Optional job = getJob(featureSetName, store.getName()); + public JobInfo startOrUpdateJob(List featureSetSpecs, + SourceProto.Source sourceSpec, + StoreProto.Store store) { + Source source = Source.fromProto(sourceSpec); + Optional job = getJob(source.getId(), store.getName()); if (job.isPresent()) { Set existingFeatureSetsPopulatedByJob = job.get().getFeatureSets().stream().map(FeatureSet::getId).collect(Collectors.toSet()); @@ -61,14 +66,17 @@ public JobInfo startOrUpdateJob(List featureSetSpecs, StoreProto return updateJob(job.get(), featureSetSpecs, store); } } else { - return startJob(createJobId(featureSetName, store.getName()), featureSetSpecs, store); + return startJob(createJobId(source.getId(), store.getName()), + featureSetSpecs, sourceSpec, store); } } - /** Get the non-terminal job associated with the given featureSet name and store name, if any. */ - private Optional getJob(String featureSetName, String storeName) { + /** + * Get the non-terminal job associated with the given featureSet name and store name, if any. + */ + private Optional getJob(String sourceId, String storeName) { List jobs = - jobInfoRepository.findByFeatureSetsNameAndStoreName(featureSetName, storeName); + jobInfoRepository.findBySourceIdAndStoreName(sourceId, storeName); if (jobs.isEmpty()) { return Optional.empty(); } @@ -77,9 +85,12 @@ private Optional getJob(String featureSetName, String storeName) { .findFirst(); } - /** Start or update the job to ingest data to the sink. */ + /** + * Start or update the job to ingest data to the sink. + */ private JobInfo startJob( - String jobId, List featureSetSpecs, StoreProto.Store sinkSpec) { + String jobId, List featureSetSpecs, SourceProto.Source source, + StoreProto.Store sinkSpec) { try { AuditLogger.log( Resource.JOB, @@ -102,7 +113,6 @@ private JobInfo startJob( jobManager.getRunnerType().getName(), extId); - SourceType sourceType = featureSetSpecs.get(0).getSource().getType(); List featureSets = new ArrayList<>(); for (FeatureSetSpec featureSetSpec : featureSetSpecs) { @@ -115,8 +125,8 @@ private JobInfo startJob( new JobInfo( jobId, extId, - sourceType, jobManager.getRunnerType().getName(), + Source.fromProto(source), Store.fromProto(sinkSpec), featureSets, JobStatus.RUNNING); @@ -134,7 +144,9 @@ private JobInfo startJob( } } - /** Update the given job */ + /** + * Update the given job + */ private JobInfo updateJob( JobInfo jobInfo, List featureSetSpecs, StoreProto.Store store) { jobInfo.setFeatureSets( @@ -171,7 +183,9 @@ public void abortJob(String id) { jobInfoRepository.saveAndFlush(job); } - /** Update a given job's status */ + /** + * Update a given job's status + */ public void updateJobStatus(String jobId, JobStatus status) { Optional jobRecordOptional = jobInfoRepository.findById(jobId); if (jobRecordOptional.isPresent()) { @@ -181,9 +195,10 @@ public void updateJobStatus(String jobId, JobStatus status) { } } - public String createJobId(String featureSetName, String storeName) { + public String createJobId(String sourceId, String storeName) { String dateSuffix = String.valueOf(Instant.now().toEpochMilli()); - String jobId = String.format("%s-to-%s", featureSetName, storeName) + dateSuffix; + String sourceIdTrunc = sourceId.split("/")[0].toLowerCase(); + String jobId = String.format("%s-to-%s", sourceIdTrunc, storeName) + dateSuffix; return jobId.replaceAll("_", "-"); } } diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index ea83eee262b..b7862c2dd91 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -29,6 +29,7 @@ import feast.core.CoreServiceProto.UpdateStoreRequest; import feast.core.CoreServiceProto.UpdateStoreResponse; import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.SourceProto; import feast.core.StoreProto; import feast.core.dao.FeatureSetRepository; import feast.core.dao.StoreRepository; @@ -56,8 +57,7 @@ public class SpecService { private final FeatureSetRepository featureSetRepository; private final StoreRepository storeRepository; - private final FeatureStreamService featureStreamService; - private final JobCoordinatorService jobCoordinatorService; + private final Source defaultSource; private final Pattern versionPattern = Pattern .compile("^(?[\\>\\<\\=]{0,2})(?\\d*)$"); @@ -66,12 +66,10 @@ public class SpecService { public SpecService( FeatureSetRepository featureSetRepository, StoreRepository storeRepository, - FeatureStreamService featureStreamService, - JobCoordinatorService jobCoordinatorService) { + Source defaultSource) { this.featureSetRepository = featureSetRepository; this.storeRepository = storeRepository; - this.featureStreamService = featureStreamService; - this.jobCoordinatorService = jobCoordinatorService; + this.defaultSource = defaultSource; } /** @@ -181,9 +179,9 @@ public ApplyFeatureSetResponse applyFeatureSet(FeatureSetSpec newFeatureSetSpec) .build(); } FeatureSet featureSet = FeatureSet.fromProto(newFeatureSetSpec); - Source updatedSource = featureStreamService.setUpSource(featureSet); - featureSet.setSource(updatedSource); - + if (newFeatureSetSpec.getSource() == SourceProto.Source.getDefaultInstance()) { + featureSet.setSource(defaultSource); + } featureSetRepository.saveAndFlush(featureSet); return ApplyFeatureSetResponse.newBuilder() diff --git a/core/src/main/java/feast/core/stream/FeatureStream.java b/core/src/main/java/feast/core/stream/FeatureStream.java deleted file mode 100644 index f6881998d9f..00000000000 --- a/core/src/main/java/feast/core/stream/FeatureStream.java +++ /dev/null @@ -1,21 +0,0 @@ -package feast.core.stream; - -import feast.core.SourceProto.SourceType; -import feast.core.model.FeatureSet; -import feast.core.model.Source; - -public interface FeatureStream { - - /** - * Gets the type of feature stream - * @return type of feature stream - */ - SourceType getType(); - - /** - * Provisions a sink for the feature producer to write to. For the given topic name. - * - * - */ - Source provision(FeatureSet featureSet) throws RuntimeException; -} diff --git a/core/src/main/java/feast/core/stream/kafka/KafkaFeatureStream.java b/core/src/main/java/feast/core/stream/kafka/KafkaFeatureStream.java deleted file mode 100644 index 43ab3b4393d..00000000000 --- a/core/src/main/java/feast/core/stream/kafka/KafkaFeatureStream.java +++ /dev/null @@ -1,84 +0,0 @@ -package feast.core.stream.kafka; - -import com.google.common.base.Strings; -import com.google.protobuf.InvalidProtocolBufferException; -import feast.core.SourceProto.KafkaSourceConfig; -import feast.core.SourceProto.SourceType; -import feast.core.model.FeatureSet; -import feast.core.model.Source; -import feast.core.stream.FeatureStream; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import lombok.AllArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.common.errors.TopicExistsException; - -@Slf4j -@AllArgsConstructor -public class KafkaFeatureStream implements FeatureStream { - - private static SourceType FEATURE_STREAM_TYPE = SourceType.KAFKA; - - private KafkaFeatureStreamConfig defaultConfig; - - @Override - public SourceType getType() { - return FEATURE_STREAM_TYPE; - } - - @Override - public Source provision(FeatureSet featureSet) throws RuntimeException { - - Source source = featureSet.getSource(); - KafkaSourceConfig config = KafkaSourceConfig.getDefaultInstance(); - String bootstrapServers = defaultConfig.getBootstrapServers(); - if (!source.isUseDefault()) { - try { - config = (KafkaSourceConfig) source.getOptions(); - bootstrapServers = config.getBootstrapServers(); - } catch (InvalidProtocolBufferException | NullPointerException e) { - throw new RuntimeException(String - .format("Unable to retrieve bootstrap servers for featureSet %s", featureSet.getName()), - e); - } - } - - Map map = new HashMap<>(); - map.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); - map.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000"); - AdminClient client = AdminClient.create(map); - - String topicName = generateTopicName(featureSet.getName()); - NewTopic newTopic = new NewTopic(topicName, - defaultConfig.getTopicNumPartitions(), - defaultConfig.getTopicReplicationFactor()); - CreateTopicsResult createTopicsResult = client.createTopics(Collections.singleton(newTopic)); - try { - createTopicsResult.values().get(topicName).get(); - } catch (InterruptedException | ExecutionException e) { - if (e.getCause().getClass().equals(TopicExistsException.class)) { - log.warn(Strings - .lenientFormat( - "Unable to create topic %s in the feature stream, topic already exists, using existing topic.", - topicName)); - } else { - throw new RuntimeException(e.getMessage(), e); - } - } - assert config != null; - source.setOptions( - config.toBuilder().setTopic(topicName).setBootstrapServers(bootstrapServers).build() - .toByteArray()); - return source; - } - - public String generateTopicName(String featureSetName) { - return Strings.lenientFormat("%s-%s-features", defaultConfig.getTopicPrefix(), featureSetName); - } -} diff --git a/core/src/main/java/feast/core/stream/kafka/KafkaFeatureStreamConfig.java b/core/src/main/java/feast/core/stream/kafka/KafkaFeatureStreamConfig.java deleted file mode 100644 index aa8248cf9d4..00000000000 --- a/core/src/main/java/feast/core/stream/kafka/KafkaFeatureStreamConfig.java +++ /dev/null @@ -1,54 +0,0 @@ -package feast.core.stream.kafka; - -import feast.core.util.TypeConversion; -import java.util.Map; -import javax.naming.ConfigurationException; -import lombok.Value; - -@Value -public class KafkaFeatureStreamConfig { - - private static String NUM_PARTITIONS_DEFAULT = "1"; - private static String REPLICATION_FACTOR_DEFAULT = "1"; - - /** - * Feast stream kafka bootstrap servers - */ - String bootstrapServers; - - /** - * Feast stream topic prefix, to be prepended to topic name - */ - String topicPrefix; - - /** - * Number of partitions per topic - */ - int topicNumPartitions; - - /** - * Replication factor of each topic created - */ - short topicReplicationFactor; - - /** - * Constructor from a map - * - * @param optionMap map containing the kafka feature stream configuration in key:value - * format. The options should contain: - * 1. bootstrapServers: optional, default kafka bootstrap servers, defaults to "KAFKA:9092"
- * 2. topicPrefix: optional, topic prefix, defaults to "feast"
- * 3. partitions: optional, number of partitions per topic created, defaults to 10
- * 4. replicationFactor: optional, replication factor of topics created, defaults to 2
- * - * @return KafkaFeatureStreamConfig object - */ - public static KafkaFeatureStreamConfig fromMap(Map optionMap) { - String bootstrapServers = optionMap.getOrDefault("bootstrapServers", "KAFKA:9092"); - String topicPrefix = optionMap.getOrDefault("topicPrefix", "feast"); - int numPartitions = Integer.parseInt(optionMap.getOrDefault("partitions", NUM_PARTITIONS_DEFAULT)); - short replicationFactor = Short.parseShort(optionMap.getOrDefault("replicationFactor", REPLICATION_FACTOR_DEFAULT)); - - return new KafkaFeatureStreamConfig(bootstrapServers, topicPrefix, numPartitions, replicationFactor); - } -} diff --git a/core/src/main/resources/application.yml b/core/src/main/resources/application.yml index 6867b05ac85..be4f8c6b72a 100644 --- a/core/src/main/resources/application.yml +++ b/core/src/main/resources/application.yml @@ -47,6 +47,7 @@ feast: type: kafka # Feature stream options. options: + topic: feast-features bootstrapServers: kafka:9092 replicationFactor: 1 partitions: 1 diff --git a/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java b/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java index 3d7a082f711..631e5a257a9 100644 --- a/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java +++ b/core/src/test/java/feast/core/job/ScheduledJobMonitorTest.java @@ -25,11 +25,15 @@ import static org.mockito.Mockito.when; import com.google.common.collect.Lists; +import feast.core.SourceProto; +import feast.core.SourceProto.KafkaSourceConfig; +import feast.core.SourceProto.SourceType; import feast.core.dao.JobInfoRepository; import feast.core.dao.MetricsRepository; import feast.core.model.JobInfo; import feast.core.model.JobStatus; import feast.core.model.Metrics; +import feast.core.model.Source; import feast.core.model.Store; import java.util.ArrayList; import java.util.Arrays; @@ -46,9 +50,11 @@ public class ScheduledJobMonitorTest { ScheduledJobMonitor scheduledJobMonitor; - @Mock JobMonitor jobMonitor; + @Mock + JobMonitor jobMonitor; - @Mock JobInfoRepository jobInfoRepository; + @Mock + JobInfoRepository jobInfoRepository; @Before public void setUp() { @@ -58,12 +64,15 @@ public void setUp() { @Test public void getJobStatus_shouldUpdateJobInfoForRunningJob() { + Source source = new Source(SourceType.KAFKA, + KafkaSourceConfig.newBuilder().setBootstrapServers("kafka:9092") + .setTopic("feast-topic").build(), true); JobInfo job = new JobInfo( "jobId", "extId1", - "Streaming", "DataflowRunner", + source, new Store(), Collections.emptyList(), Collections.emptyList(), diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index ad58bf2f300..2dbd53105e0 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -75,6 +75,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setBlockOnRun(false); expectedPipelineOptions.setProject(""); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); + expectedPipelineOptions.setProject(""); expectedPipelineOptions .setFeatureSetSpecJson(Lists.newArrayList(printer.print(featureSetSpec))); diff --git a/core/src/test/java/feast/core/service/FeatureStreamServiceTest.java b/core/src/test/java/feast/core/service/FeatureStreamServiceTest.java deleted file mode 100644 index 2603000fff6..00000000000 --- a/core/src/test/java/feast/core/service/FeatureStreamServiceTest.java +++ /dev/null @@ -1,90 +0,0 @@ -package feast.core.service; - -import static org.mockito.MockitoAnnotations.initMocks; - -import feast.core.stream.FeatureStream; -import org.junit.Before; -import org.junit.Rule; -import org.junit.rules.ExpectedException; -import org.mockito.Mock; - -public class FeatureStreamServiceTest { - - @Mock - private FeatureStream featureStream; - - @Rule - public final ExpectedException expectedException = ExpectedException.none(); - - @Before - public void setUp() { - initMocks(this); - } - -// @Test -// public void shouldProvisionTopicGivenFeatureSet() { -// String topicName = "feast-featureSet-topic"; -// FeatureSet featureSet = new FeatureSet(); -// featureSet.setName("featureSet"); -// when(featureStream.generateTopicName("featureSet")).thenReturn(topicName); -// -// FeatureStreamService featureStreamService = new FeatureStreamService( -// featureStreamTopicRepository, featureStream); -// FeatureStreamTopic actual = featureStreamService.provisionTopic(featureSet); -// -// FeatureStreamTopic expectedTopic = new FeatureStreamTopic(topicName, -// Lists.newArrayList(featureSet)); -// verify(featureStream, times(1)).provisionTopic("feast-featureSet-topic"); -// verify(featureStreamTopicRepository, times(1)).saveAndFlush(expectedTopic); -// assertThat(actual, equalTo(expectedTopic)); -// } -// -// @Test -// public void shouldUpdateRecordIfSelfCreatedTopicExistsGivenFeatureSet() { -// String topicName = "feast-featureSet-topic"; -// FeatureSet oldFeatureSet = new FeatureSet(); -// oldFeatureSet.setName("featureSet"); -// oldFeatureSet.setVersion(1); -// -// FeatureSet newFeatureSet = new FeatureSet(); -// newFeatureSet.setName("featureSet"); -// oldFeatureSet.setVersion(2); -// -// FeatureStreamTopic originalTopic = new FeatureStreamTopic(topicName, -// Lists.newArrayList(oldFeatureSet)); -// -// when(featureStream.generateTopicName("featureSet")).thenReturn(topicName); -// doThrow(new TopicExistsException()).when(featureStream).provisionTopic(topicName); -// when(featureStreamTopicRepository.findById(topicName)).thenReturn(Optional.of(originalTopic)); -// FeatureStreamService featureStreamService = new FeatureStreamService( -// featureStreamTopicRepository, featureStream); -// -// FeatureStreamTopic expectedTopic = new FeatureStreamTopic(topicName, -// Lists.newArrayList(oldFeatureSet, newFeatureSet)); -// -// FeatureStreamTopic actual = featureStreamService.provisionTopic(newFeatureSet); -// verify(featureStreamTopicRepository, times(1)).saveAndFlush(expectedTopic); -// -// assertThat(actual, equalTo(expectedTopic)); -// } -// -// @Test -// public void shouldThrowErrorIfTopicExistsGivenFeatureSet() { -// String topicName = "feast-featureSet-topic"; -// -// FeatureSet featureSet = new FeatureSet(); -// featureSet.setName("featureSet"); -// -// when(featureStream.generateTopicName("featureSet")).thenReturn(topicName); -// doThrow(new TopicExistsException()).when(featureStream).provisionTopic(topicName); -// when(featureStreamTopicRepository.findById(topicName)).thenReturn(Optional.empty()); -// -// FeatureStreamService featureStreamService = new FeatureStreamService( -// featureStreamTopicRepository, featureStream); -// -// expectedException.expect(TopicExistsException.class); -// featureStreamService.provisionTopic(featureSet); -// } - - -} \ No newline at end of file diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index efffbcc2981..47d3e8486da 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -8,7 +8,8 @@ import com.google.common.collect.Lists; import feast.core.FeatureSetProto.FeatureSetSpec; -import feast.core.SourceProto.Source; +import feast.core.SourceProto; +import feast.core.SourceProto.KafkaSourceConfig; import feast.core.SourceProto.SourceType; import feast.core.StoreProto; import feast.core.StoreProto.Store.RedisConfig; @@ -19,6 +20,7 @@ import feast.core.model.FeatureSet; import feast.core.model.JobInfo; import feast.core.model.JobStatus; +import feast.core.model.Source; import feast.core.model.Store; import org.junit.Before; import org.junit.Rule; @@ -37,6 +39,7 @@ public class JobCoordinatorServiceTest { private JobCoordinatorService jobCoordinatorService; private JobInfo existingJob; + private Source defaultSource; @Before public void setUp() { @@ -47,14 +50,19 @@ public void setUp() { .setType(StoreType.REDIS) .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379)) .build()); + defaultSource = new Source(SourceType.KAFKA, + KafkaSourceConfig.newBuilder().setBootstrapServers("kafka:9092").setTopic("feast-topic") + .build(), true); FeatureSet featureSet1 = new FeatureSet(); featureSet1.setId("featureSet1:1"); + featureSet1.setSource(defaultSource); FeatureSet featureSet2 = new FeatureSet(); featureSet2.setId("featureSet2:1"); - existingJob = new JobInfo("extid", "name", "KAFKA", "DirectRunner", store, + featureSet2.setSource(defaultSource); + existingJob = new JobInfo("extid", "name", "DirectRunner", defaultSource, store, Lists.newArrayList(featureSet1, featureSet2), Lists.newArrayList(), JobStatus.RUNNING); - when(jobInfoRepository.findByFeatureSetsNameAndStoreName("featureSet1", "SERVING")) + when(jobInfoRepository.findBySourceIdAndStoreName(defaultSource.getId(), "SERVING")) .thenReturn(Lists.newArrayList(existingJob)); jobCoordinatorService = new JobCoordinatorService(jobInfoRepository, jobManager); @@ -66,10 +74,12 @@ public void shouldNotStartOrUpdateJobIfNoChanges() { FeatureSetSpec featureSet1 = FeatureSetSpec.newBuilder() .setName("featureSet1") .setVersion(1) + .setSource(defaultSource.toProto()) .build(); FeatureSetSpec featureSet2 = FeatureSetSpec.newBuilder() .setName("featureSet2") .setVersion(1) + .setSource(defaultSource.toProto()) .build(); StoreProto.Store store = StoreProto.Store.newBuilder() .setName("SERVING") @@ -77,7 +87,8 @@ public void shouldNotStartOrUpdateJobIfNoChanges() { .setRedisConfig(RedisConfig.newBuilder().setHost("localhost").setPort(6379)) .build(); JobInfo jobInfo = jobCoordinatorService - .startOrUpdateJob(Lists.newArrayList(featureSet1, featureSet2), store); + .startOrUpdateJob(Lists.newArrayList(featureSet1, featureSet2), + defaultSource.toProto(), store); assertThat(jobInfo, equalTo(existingJob)); } @@ -86,7 +97,7 @@ public void shouldStartJobIfNotExists() { FeatureSetSpec featureSet = FeatureSetSpec.newBuilder() .setName("featureSet") .setVersion(1) - .setSource(Source.newBuilder().setType(SourceType.KAFKA)) + .setSource(defaultSource.toProto()) .build(); StoreProto.Store store = StoreProto.Store.newBuilder() .setName("SERVING") @@ -102,11 +113,13 @@ public void shouldStartJobIfNotExists() { when(jobManager.getRunnerType()).thenReturn(Runner.DIRECT); FeatureSet expectedFeatureSet = new FeatureSet(); expectedFeatureSet.setId("featureSet:1"); - JobInfo expectedJobInfo = new JobInfo(jobId, extJobId, SourceType.KAFKA, "DirectRunner", - Store.fromProto(store), Lists.newArrayList(expectedFeatureSet), JobStatus.RUNNING); + JobInfo expectedJobInfo = new JobInfo(jobId, extJobId, "DirectRunner", + defaultSource, Store.fromProto(store), Lists.newArrayList(expectedFeatureSet), + JobStatus.RUNNING); when(jobInfoRepository.save(expectedJobInfo)).thenReturn(expectedJobInfo); JobInfo jobInfo = jobCoordinatorService - .startOrUpdateJob(Lists.newArrayList(featureSet), store); + .startOrUpdateJob(Lists.newArrayList(featureSet), defaultSource.toProto(), + store); assertThat(jobInfo, equalTo(expectedJobInfo)); } @@ -115,7 +128,7 @@ public void shouldUpdateJobIfAlreadyExistsButThereIsAChange() { FeatureSetSpec featureSet = FeatureSetSpec.newBuilder() .setName("featureSet1") .setVersion(1) - .setSource(Source.newBuilder().setType(SourceType.KAFKA)) + .setSource(defaultSource.toProto()) .build(); StoreProto.Store store = StoreProto.Store.newBuilder() .setName("SERVING") @@ -124,14 +137,15 @@ public void shouldUpdateJobIfAlreadyExistsButThereIsAChange() { .build(); String extId = "extId123"; JobInfo modifiedJob = new JobInfo(existingJob.getId(), existingJob.getExtId(), - SourceType.valueOf(existingJob.getType()), existingJob.getRunner(), Store.fromProto(store), + existingJob.getRunner(), defaultSource, Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), JobStatus.RUNNING); when(jobManager.updateJob(modifiedJob)).thenReturn(extId); JobInfo expectedJobInfo = modifiedJob; expectedJobInfo.setExtId(extId); when(jobInfoRepository.save(expectedJobInfo)).thenReturn(expectedJobInfo); JobInfo jobInfo = jobCoordinatorService - .startOrUpdateJob(Lists.newArrayList(featureSet), store); + .startOrUpdateJob(Lists.newArrayList(featureSet), defaultSource.toProto(), + store); assertThat(jobInfo, equalTo(expectedJobInfo)); } diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index 73650de1bfa..63f15c8bd28 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -72,26 +72,26 @@ public class SpecServiceTest { @Mock private StoreRepository storeRepository; - @Mock - private FeatureStreamService featureStreamService; - - @Mock - private JobCoordinatorService jobCoordinatorService; - @Rule public final ExpectedException expectedException = ExpectedException.none(); private SpecService specService; private List featureSets; private List stores; + private Source defaultSource; @Before public void setUp() { initMocks(this); + defaultSource = new Source(SourceType.KAFKA, + KafkaSourceConfig.newBuilder().setBootstrapServers("kafka:9092").setTopic("my-topic") + .build(), true); + FeatureSet featureSet1v1 = newDummyFeatureSet("f1", 1); FeatureSet featureSet1v2 = newDummyFeatureSet("f1", 2); FeatureSet featureSet1v3 = newDummyFeatureSet("f1", 3); FeatureSet featureSet2v1 = newDummyFeatureSet("f2", 1); + featureSets = Arrays.asList(featureSet1v1, featureSet1v2, featureSet1v3, featureSet2v1); when(featureSetRepository.findAll()) .thenReturn(featureSets); @@ -111,8 +111,9 @@ public void setUp() { when(storeRepository.findById("SERVING")).thenReturn(Optional.of(store1)); when(storeRepository.findById("NOTFOUND")).thenReturn(Optional.empty()); - specService = new SpecService(featureSetRepository, storeRepository, featureStreamService, - jobCoordinatorService); + + + specService = new SpecService(featureSetRepository, storeRepository, defaultSource); } @Test @@ -205,7 +206,7 @@ public void shouldGetLatestFeatureSetGivenLatestVersionFilter() GetFeatureSetsResponse actual = specService .getFeatureSets( Filter.newBuilder().setFeatureSetName("f1").setFeatureSetVersion("latest").build()); - List expectedFeatureSets = featureSets.subList(2,3); + List expectedFeatureSets = featureSets.subList(2, 3); List list = new ArrayList<>(); for (FeatureSet expectedFeatureSet : expectedFeatureSets) { FeatureSetSpec toProto = expectedFeatureSet.toProto(); @@ -280,11 +281,6 @@ public void applyFeatureSetShouldReturnFeatureSetWithLatestVersionIfFeatureSetHa public void applyFeatureSetShouldApplyFeatureSetWithInitVersionIfNotExists() throws InvalidProtocolBufferException { when(featureSetRepository.findByName("f2")).thenReturn(Lists.newArrayList()); - Source updatedSource = new Source(SourceType.KAFKA, - KafkaSourceConfig.newBuilder().setBootstrapServers("kafka:9092") - .setTopic("feast-f2-features").build().toByteArray()); - when(featureStreamService.setUpSource(ArgumentMatchers.any(FeatureSet.class))) - .thenReturn(updatedSource); FeatureSetSpec incomingFeatureSet = newDummyFeatureSet("f2", 1) .toProto() .toBuilder() @@ -295,7 +291,7 @@ public void applyFeatureSetShouldApplyFeatureSetWithInitVersionIfNotExists() verify(featureSetRepository).saveAndFlush(ArgumentMatchers.any(FeatureSet.class)); FeatureSetSpec expected = incomingFeatureSet.toBuilder() .setVersion(1) - .setSource(updatedSource.toProto()) + .setSource(defaultSource.toProto()) .build(); assertThat(applyFeatureSetResponse.getStatus(), equalTo(Status.CREATED)); assertThat(applyFeatureSetResponse.getFeatureSet(), equalTo(expected)); @@ -304,18 +300,13 @@ public void applyFeatureSetShouldApplyFeatureSetWithInitVersionIfNotExists() @Test public void applyFeatureSetShouldIncrementFeatureSetVersionIfAlreadyExists() throws InvalidProtocolBufferException { - Source updatedSource = new Source(SourceType.KAFKA, - KafkaSourceConfig.newBuilder().setBootstrapServers("kafka:9092") - .setTopic("feast-f1-features").build().toByteArray()); - when(featureStreamService.setUpSource(ArgumentMatchers.any(FeatureSet.class))) - .thenReturn(updatedSource); FeatureSetSpec incomingFeatureSet = featureSets.get(2).toProto().toBuilder() .clearVersion() .addFeatures(FeatureSpec.newBuilder().setName("feature2").setValueType(Enum.STRING)) .build(); FeatureSetSpec expected = incomingFeatureSet.toBuilder() .setVersion(4) - .setSource(updatedSource.toProto()) + .setSource(defaultSource.toProto()) .build(); ApplyFeatureSetResponse applyFeatureSetResponse = specService .applyFeatureSet(incomingFeatureSet); @@ -359,15 +350,10 @@ public void shouldDoNothingIfNoChange() throws InvalidProtocolBufferException { } private FeatureSet newDummyFeatureSet(String name, int version) { - KafkaSourceConfig kafkaFeatureSourceOptions = - KafkaSourceConfig.newBuilder() - .setBootstrapServers("kafka:9092") - .build(); Field feature = new Field(name, "feature", Enum.INT64); Field entity = new Field(name, "entity", Enum.STRING); return new FeatureSet(name, version, 100L, Arrays.asList(entity), Arrays.asList(feature), - new Source( - SourceType.KAFKA, kafkaFeatureSourceOptions.toByteArray())); + defaultSource); } private Store newDummyStore(String name) { diff --git a/core/src/test/java/feast/core/stream/kafka/KafkaFeatureStreamTest.java b/core/src/test/java/feast/core/stream/kafka/KafkaFeatureStreamTest.java deleted file mode 100644 index a75f3848878..00000000000 --- a/core/src/test/java/feast/core/stream/kafka/KafkaFeatureStreamTest.java +++ /dev/null @@ -1,61 +0,0 @@ -package feast.core.stream.kafka; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.core.IsEqual.equalTo; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import feast.core.exception.TopicExistsException; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.common.KafkaFuture; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.mockito.ArgumentMatchers; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -public class KafkaFeatureStreamTest { - -// @Rule -// public ExpectedException expectedException = ExpectedException.none(); -// -// @Mock -// private AdminClient kafkaAdminClient; -// @Mock -// private CreateTopicsResult createTopicsResult; -// -// private KafkaFeatureStream kafkaFeatureStream; -// -// @Before -// public void setUp() { -// MockitoAnnotations.initMocks(this); -// KafkaFeatureStreamConfig config = new KafkaFeatureStreamConfig("localhost:8121", "feast", 1, -// (short) 2); -// kafkaFeatureStream = new KafkaFeatureStream(kafkaAdminClient, config); -// } -// -// @Test -// public void shouldThrowTopicExistsExceptionIfTopicExists() -// throws ExecutionException, InterruptedException { -// -// KafkaFuture result = mock(KafkaFuture.class); -// -// when(result.get()).thenThrow(new org.apache.kafka.common.errors.TopicExistsException("")); -// Map> resultMap = new HashMap<>(); -// resultMap.put("my-topic", result); -// when(createTopicsResult.values()).thenReturn(resultMap); -// doReturn(createTopicsResult).when(kafkaAdminClient) -// .createTopics(ArgumentMatchers.anyCollection()); -// -// expectedException.expect(TopicExistsException.class); -// kafkaFeatureStream.provisionTopic("my-topic"); -// } -} - diff --git a/ingestion/src/main/java/feast/options/Options.java b/ingestion/src/main/java/feast/options/Options.java deleted file mode 100644 index 47077810c30..00000000000 --- a/ingestion/src/main/java/feast/options/Options.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.options; - -import java.io.Serializable; - -/** - * interface for identifying classes that can use the OptionsParser for extra type safety - */ -public interface Options extends Serializable { - -} diff --git a/ingestion/src/main/java/feast/options/OptionsParser.java b/ingestion/src/main/java/feast/options/OptionsParser.java deleted file mode 100644 index 8400f5d423d..00000000000 --- a/ingestion/src/main/java/feast/options/OptionsParser.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.options; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.DeserializationFeature; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.jsonSchema.JsonSchema; -import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator; -import com.google.common.collect.Lists; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Set; -import javax.validation.ConstraintViolation; -import javax.validation.Validation; -import javax.validation.Validator; -import javax.validation.ValidatorFactory; - -public class OptionsParser { - - private static final ObjectMapper strictMapper = new ObjectMapper(); - private static final ObjectMapper lenientMapper = new ObjectMapper() - .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - private static final Validator validator; - - static { - try (ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory()) { - validator = validatorFactory.getValidator(); - } - } - - /** - * Return a json schema string representing an options class for error messages - */ - static String getJsonSchema(Class optionsClass) { - JsonSchemaGenerator schemaGen = new JsonSchemaGenerator(strictMapper); - JsonSchema schema = null; - try { - schema = schemaGen.generateSchema(optionsClass); - schema.setId(null); // clear the ID as it's visual noise - return strictMapper.writer().forType(JsonSchema.class).writeValueAsString(schema); - } catch (IOException e) { - return ""; - } - } - - - private static T parse(Map optionsMap, Class clazz, - boolean lenient) { - ObjectMapper mapper = lenient ? lenientMapper : strictMapper; - List messages = Lists.newArrayList(); - T options; - try { - options = mapper.convertValue(optionsMap, clazz); - } catch (IllegalArgumentException e) { - messages.add("Expecting options convertible to schema: " + getJsonSchema(clazz)); - try { - messages.add("Got " + mapper.writer().writeValueAsString(optionsMap)); - } catch (JsonProcessingException ee) { - // - } - throw new IllegalArgumentException(String.join(", ", messages), e); - } - Set> violations = validator.validate(options); - if (violations.size() > 0) { - messages.add("Expecting options convertible to schema: " + getJsonSchema(clazz)); - for (ConstraintViolation violation : violations) { - messages.add( - String.format( - "property \"%s\" %s", violation.getPropertyPath(), violation.getMessage())); - } - throw new IllegalArgumentException(String.join(", ", messages)); - } - return options; - } - - /** - * Construct a class from string options and validate with any javax validation annotations, - * unknown options are ignored - */ - public static T lenientParse(Map optionsMap, Class clazz) { - return parse(optionsMap, clazz, true); - } - - - /** - * Construct a class from string options and validate with any javax validation annotations - */ - public static T parse(Map optionsMap, Class clazz) { - return parse(optionsMap, clazz, false); - } -} diff --git a/ingestion/src/main/java/feast/options/Validation.java b/ingestion/src/main/java/feast/options/Validation.java deleted file mode 100644 index 892c9f36fa6..00000000000 --- a/ingestion/src/main/java/feast/options/Validation.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2018 The Feast Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package feast.options; - -import static java.lang.annotation.ElementType.ANNOTATION_TYPE; -import static java.lang.annotation.ElementType.CONSTRUCTOR; -import static java.lang.annotation.ElementType.FIELD; -import static java.lang.annotation.ElementType.METHOD; -import static java.lang.annotation.ElementType.PARAMETER; -import static java.lang.annotation.ElementType.TYPE_USE; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -import java.lang.annotation.Documented; -import java.lang.annotation.Retention; -import java.lang.annotation.Target; -import javax.validation.Constraint; -import javax.validation.ConstraintValidator; -import javax.validation.ConstraintValidatorContext; -import javax.validation.Payload; -import javax.validation.ReportAsSingleViolation; -import org.joda.time.format.ISOPeriodFormat; - -public @interface Validation { - @Documented - @ReportAsSingleViolation - @Target({METHOD, FIELD, ANNOTATION_TYPE, CONSTRUCTOR, PARAMETER, TYPE_USE}) - @Constraint(validatedBy = ISO8601Duration.ISO8601DurationValidator.class) - @Retention(RUNTIME) - @interface ISO8601Duration { - - String message() default "must match duration format for ISO 8601 standard"; - - Class[] groups() default {}; - - Class[] payload() default {}; - - @Target({METHOD, FIELD, ANNOTATION_TYPE, CONSTRUCTOR, PARAMETER, TYPE_USE}) - @Retention(RUNTIME) - @Documented - @interface List { - ISO8601Duration[] value(); - } - - class ISO8601DurationValidator implements ConstraintValidator { - - @Override - public boolean isValid(String value, ConstraintValidatorContext context) { - if (value == null) { - return true; - } - try { - ISOPeriodFormat.standard().parsePeriod(value); - return true; - } catch (Throwable e) { - return false; - } - } - } - } -}