Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Use the KafkaReadOptions class to parse the import spec options map
  • Loading branch information
tims committed Dec 31, 2018
commit f019e618d28741cbc84131deeb8b91e29585ada3
Original file line number Diff line number Diff line change
Expand Up @@ -17,84 +17,81 @@

package feast.ingestion.transform;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import feast.ingestion.deserializer.FeatureRowDeserializer;
import feast.ingestion.deserializer.FeatureRowKeyDeserializer;
import feast.options.Options;
import feast.options.OptionsParser;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.validation.constraints.NotEmpty;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;

public class FeatureRowKafkaIO {
static final String KAFKA_TYPE = "kafka";

static final String KAFKA_TYPE = "kafka";
/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages
* from kafka one or more kafka topics.
*/
public static Read read(ImportSpec importSpec) {
return new Read(importSpec);
}

public static class KafkaReadOptions implements Options {
@NotEmpty public String server;
@NotEmpty public String topics;
}

/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow}
* proto messages from kafka one or more kafka topics.
*
*/
public static Read read(ImportSpec importSpec) {
return new Read(importSpec);
}
public static class Read extends FeatureIO.Read {

public static class Read extends FeatureIO.Read {
private ImportSpec importSpec;

private ImportSpec importSpec;

private Read(ImportSpec importSpec) {
this.importSpec = importSpec;
}

@Override
public PCollection<FeatureRow> expand(PInput input) {

checkArgument(importSpec.getType().equals(KAFKA_TYPE));

String bootstrapServer = importSpec.getOptionsMap().get("server");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(bootstrapServer), "kafka bootstrap server must be set");

String topics = importSpec.getOptionsMap().get("topics");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(topics), "kafka topic(s) must be set");

List<String> topicsList = new ArrayList<>(Arrays.asList(topics.split(",")));

KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader = KafkaIO.<FeatureRowKey, FeatureRow>read()
.withBootstrapServers(bootstrapServer)
.withTopics(topicsList)
.withKeyDeserializer(FeatureRowKeyDeserializer.class)
.withValueDeserializer(FeatureRowDeserializer.class);

PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord = input.getPipeline().apply(kafkaIOReader);
private Read(ImportSpec importSpec) {
this.importSpec = importSpec;
}

PCollection<FeatureRow> featureRow = featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));
return featureRow;
}
@Override
public PCollection<FeatureRow> expand(PInput input) {

checkArgument(importSpec.getType().equals(KAFKA_TYPE));

KafkaReadOptions options =
OptionsParser.parse(importSpec.getOptionsMap(), KafkaReadOptions.class);

List<String> topicsList = new ArrayList<>(Arrays.asList(options.topics.split(",")));

KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader =
KafkaIO.<FeatureRowKey, FeatureRow>read()
.withBootstrapServers(options.server)
.withTopics(topicsList)
.withKeyDeserializer(FeatureRowKeyDeserializer.class)
.withValueDeserializer(FeatureRowDeserializer.class);

PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord =
input.getPipeline().apply(kafkaIOReader);

PCollection<FeatureRow> featureRow =
featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));
return featureRow;
}
}
}