Skip to content

Commit 6193aa9

Browse files
timsfeast-ci-bot
authored and committed
kafka: add tests for the source class, and remove unnecessary FeatureRowKey class (#160)
1 parent 022fbd6 commit 6193aa9

File tree

6 files changed

+149
-203
lines changed

6 files changed

+149
-203
lines changed

ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowDeserializer.java renamed to ingestion/src/main/java/feast/source/kafka/FeatureRowDeserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*
1616
*/
1717

18-
package feast.source.kafka.deserializer;
18+
package feast.source.kafka;
1919

2020
import com.google.protobuf.InvalidProtocolBufferException;
2121
import feast.types.FeatureRowProto.FeatureRow;

ingestion/src/main/java/feast/source/kafka/KafkaFeatureSource.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,15 @@
2121

2222
import com.google.auto.service.AutoService;
2323
import com.google.common.base.Strings;
24+
import feast.ingestion.options.JobOptions;
2425
import feast.ingestion.transform.fn.FilterFeatureRowDoFn;
2526
import feast.options.Options;
2627
import feast.options.OptionsParser;
2728
import feast.source.FeatureSource;
2829
import feast.source.FeatureSourceFactory;
29-
import feast.source.kafka.deserializer.FeatureRowDeserializer;
30-
import feast.source.kafka.deserializer.FeatureRowKeyDeserializer;
3130
import feast.specs.ImportSpecProto.Field;
3231
import feast.specs.ImportSpecProto.ImportSpec;
3332
import feast.types.FeatureRowProto.FeatureRow;
34-
import feast.types.FeatureRowProto.FeatureRowKey;
3533
import java.util.ArrayList;
3634
import java.util.Arrays;
3735
import java.util.List;
@@ -43,6 +41,7 @@
4341
import org.apache.beam.sdk.transforms.ParDo;
4442
import org.apache.beam.sdk.values.PCollection;
4543
import org.apache.beam.sdk.values.PInput;
44+
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
4645

4746
/**
4847
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages
@@ -60,29 +59,35 @@ public PCollection<FeatureRow> expand(PInput input) {
6059

6160
KafkaReadOptions options =
6261
OptionsParser.parse(importSpec.getSourceOptionsMap(), KafkaReadOptions.class);
62+
JobOptions jobOptions = OptionsParser.parse(importSpec.getJobOptionsMap(), JobOptions.class);
6363

6464
List<String> topicsList = new ArrayList<>(Arrays.asList(options.topics.split(",")));
6565

66-
PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord =
67-
input.getPipeline().apply( KafkaIO.<FeatureRowKey, FeatureRow>read()
68-
.withBootstrapServers(options.server)
69-
.withTopics(topicsList)
70-
.withKeyDeserializer(FeatureRowKeyDeserializer.class)
71-
.withValueDeserializer(FeatureRowDeserializer.class));
66+
KafkaIO.Read<byte[], FeatureRow> read = KafkaIO.<byte[], FeatureRow>read()
67+
.withBootstrapServers(options.server)
68+
.withTopics(topicsList)
69+
.withKeyDeserializer(ByteArrayDeserializer.class)
70+
.withValueDeserializer(FeatureRowDeserializer.class);
71+
if (jobOptions.getSampleLimit() > 0) {
72+
read = read.withMaxNumRecords(jobOptions.getSampleLimit());
73+
}
74+
75+
PCollection<KafkaRecord<byte[], FeatureRow>> featureRowRecord =
76+
input.getPipeline().apply(read);
7277

73-
PCollection<FeatureRow> featureRow = featureRowRecord.apply(
78+
PCollection<FeatureRow> featureRow = featureRowRecord.apply(
7479
ParDo.of(
75-
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
80+
new DoFn<KafkaRecord<byte[], FeatureRow>, FeatureRow>() {
7681
@ProcessElement
7782
public void processElement(ProcessContext processContext) {
78-
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
83+
KafkaRecord<byte[], FeatureRow> record = processContext.element();
7984
processContext.output(record.getKV().getValue());
8085
}
8186
}));
8287

8388
if (options.discardUnknownFeatures) {
8489
List<String> featureIds = new ArrayList<>();
85-
for(Field field: importSpec.getSchema().getFieldsList()) {
90+
for (Field field : importSpec.getSchema().getFieldsList()) {
8691
String featureId = field.getFeatureId();
8792
if (!Strings.isNullOrEmpty(featureId)) {
8893
featureIds.add(featureId);
@@ -94,8 +99,11 @@ public void processElement(ProcessContext processContext) {
9499
}
95100

96101
/**
 * Options for reading from Kafka, parsed from the import spec's source-options map
 * (see {@code OptionsParser.parse(importSpec.getSourceOptionsMap(), ...)}).
 */
public static class KafkaReadOptions implements Options {

  // Kafka bootstrap server address; passed to KafkaIO.read().withBootstrapServers(). Required.
  @NotEmpty
  public String server;
  // Comma-separated list of topic names; split on "," and passed to withTopics(). Required.
  @NotEmpty
  public String topics;
  // When true, rows are filtered so that only feature ids present in the
  // import spec's schema are kept (see the FilterFeatureRowDoFn branch in expand()).
  public boolean discardUnknownFeatures = false;
}
101109

ingestion/src/main/java/feast/source/kafka/deserializer/FeatureRowKeyDeserializer.java

Lines changed: 0 additions & 48 deletions
This file was deleted.
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package feast.source.kafka;
2+
3+
import feast.specs.ImportSpecProto.ImportSpec;
4+
import feast.types.FeatureRowProto.FeatureRow;
5+
import java.util.Map;
6+
import java.util.concurrent.ExecutionException;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.ScheduledExecutorService;
9+
import java.util.concurrent.TimeUnit;
10+
import lombok.extern.slf4j.Slf4j;
11+
import org.apache.beam.sdk.testing.PAssert;
12+
import org.apache.beam.sdk.testing.TestPipeline;
13+
import org.apache.beam.sdk.values.PCollection;
14+
import org.apache.beam.sdk.values.PCollection.IsBounded;
15+
import org.apache.kafka.clients.producer.Producer;
16+
import org.apache.kafka.common.serialization.ByteArraySerializer;
17+
import org.apache.kafka.common.serialization.Serializer;
18+
import org.junit.Assert;
19+
import org.junit.ClassRule;
20+
import org.junit.Rule;
21+
import org.junit.Test;
22+
import org.junit.runner.RunWith;
23+
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.boot.test.context.SpringBootTest;
25+
import org.springframework.context.annotation.Bean;
26+
import org.springframework.context.annotation.Configuration;
27+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
28+
import org.springframework.kafka.core.KafkaTemplate;
29+
import org.springframework.kafka.core.ProducerFactory;
30+
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
31+
import org.springframework.kafka.test.utils.KafkaTestUtils;
32+
import org.springframework.test.annotation.DirtiesContext;
33+
import org.springframework.test.context.junit4.SpringRunner;
34+
35+
@Slf4j
36+
@RunWith(SpringRunner.class)
37+
@SpringBootTest
38+
@DirtiesContext
39+
public class KafkaFeatureSourceTest {
40+
41+
@ClassRule
42+
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "TEST_TOPIC");
43+
@Rule
44+
public TestPipeline pipeline = TestPipeline.create();
45+
@Autowired
46+
private KafkaTemplate<byte[], FeatureRow> template;
47+
48+
49+
public void send(FeatureRow... rows) {
50+
for (FeatureRow row : rows) {
51+
try {
52+
log.info("Sent: " + template.send("TEST_TOPIC", row).get().toString());
53+
} catch (InterruptedException e) {
54+
e.printStackTrace();
55+
} catch (ExecutionException e) {
56+
e.printStackTrace();
57+
}
58+
}
59+
}
60+
61+
@Test
62+
public void testFoo() throws ExecutionException, InterruptedException {
63+
String server = embeddedKafka.getEmbeddedKafka().getBrokerAddresses()[0].toString();
64+
ImportSpec importSpec = ImportSpec.newBuilder().setType("kafka")
65+
.addEntities("testEntity")
66+
.putSourceOptions("topics", "TEST_TOPIC")
67+
.putSourceOptions("server", server)
68+
.putJobOptions("sample.limit", "1")
69+
.build();
70+
FeatureRow row = FeatureRow.newBuilder().setEntityKey("key").build();
71+
ScheduledExecutorService scheduler =
72+
Executors.newScheduledThreadPool(1);
73+
// we keep sending on loop because beam will only start consuming rows that were sent after startup.
74+
scheduler.scheduleAtFixedRate(() -> send(row), 0, 1, TimeUnit.SECONDS);
75+
76+
PCollection<FeatureRow> rows = pipeline.apply(new KafkaFeatureSource(importSpec));
77+
Assert.assertEquals(IsBounded.BOUNDED, rows.isBounded());
78+
PAssert.that(rows).containsInAnyOrder(row);
79+
pipeline.run();
80+
}
81+
82+
public Producer<byte[], FeatureRow> getProducer() {
83+
Map<String, Object> producerProps =
84+
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
85+
return new DefaultKafkaProducerFactory<>(producerProps, new ByteArraySerializer(),
86+
new FeatureRowSerializer()).createProducer();
87+
}
88+
89+
90+
public static class FeatureRowSerializer implements Serializer<FeatureRow> {
91+
92+
@Override
93+
public void configure(Map<String, ?> configs, boolean isKey) {
94+
95+
}
96+
97+
@Override
98+
public byte[] serialize(String topic, FeatureRow data) {
99+
return data.toByteArray();
100+
}
101+
102+
@Override
103+
public void close() {
104+
105+
}
106+
}
107+
108+
@Configuration
109+
static class ContextConfiguration {
110+
111+
@Bean
112+
ProducerFactory<byte[], FeatureRow> producerFactory() {
113+
Map<String, Object> producerProps =
114+
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
115+
116+
return new DefaultKafkaProducerFactory<>(
117+
producerProps, new ByteArraySerializer(), new FeatureRowSerializer());
118+
}
119+
120+
@Bean
121+
KafkaTemplate<byte[], FeatureRow> kafkaTemplate() {
122+
return new KafkaTemplate<>(producerFactory(), true);
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)