1+ package feast .source .kafka ;
2+
3+ import feast .specs .ImportSpecProto .ImportSpec ;
4+ import feast .types .FeatureRowProto .FeatureRow ;
5+ import java .util .Map ;
6+ import java .util .concurrent .ExecutionException ;
7+ import java .util .concurrent .Executors ;
8+ import java .util .concurrent .ScheduledExecutorService ;
9+ import java .util .concurrent .TimeUnit ;
10+ import lombok .extern .slf4j .Slf4j ;
11+ import org .apache .beam .sdk .testing .PAssert ;
12+ import org .apache .beam .sdk .testing .TestPipeline ;
13+ import org .apache .beam .sdk .values .PCollection ;
14+ import org .apache .beam .sdk .values .PCollection .IsBounded ;
15+ import org .apache .kafka .clients .producer .Producer ;
16+ import org .apache .kafka .common .serialization .ByteArraySerializer ;
17+ import org .apache .kafka .common .serialization .Serializer ;
18+ import org .junit .Assert ;
19+ import org .junit .ClassRule ;
20+ import org .junit .Rule ;
21+ import org .junit .Test ;
22+ import org .junit .runner .RunWith ;
23+ import org .springframework .beans .factory .annotation .Autowired ;
24+ import org .springframework .boot .test .context .SpringBootTest ;
25+ import org .springframework .context .annotation .Bean ;
26+ import org .springframework .context .annotation .Configuration ;
27+ import org .springframework .kafka .core .DefaultKafkaProducerFactory ;
28+ import org .springframework .kafka .core .KafkaTemplate ;
29+ import org .springframework .kafka .core .ProducerFactory ;
30+ import org .springframework .kafka .test .rule .EmbeddedKafkaRule ;
31+ import org .springframework .kafka .test .utils .KafkaTestUtils ;
32+ import org .springframework .test .annotation .DirtiesContext ;
33+ import org .springframework .test .context .junit4 .SpringRunner ;
34+
35+ @ Slf4j
36+ @ RunWith (SpringRunner .class )
37+ @ SpringBootTest
38+ @ DirtiesContext
39+ public class KafkaFeatureSourceTest {
40+
41+ @ ClassRule
42+ public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule (1 , true , "TEST_TOPIC" );
43+ @ Rule
44+ public TestPipeline pipeline = TestPipeline .create ();
45+ @ Autowired
46+ private KafkaTemplate <byte [], FeatureRow > template ;
47+
48+
49+ public void send (FeatureRow ... rows ) {
50+ for (FeatureRow row : rows ) {
51+ try {
52+ log .info ("Sent: " + template .send ("TEST_TOPIC" , row ).get ().toString ());
53+ } catch (InterruptedException e ) {
54+ e .printStackTrace ();
55+ } catch (ExecutionException e ) {
56+ e .printStackTrace ();
57+ }
58+ }
59+ }
60+
61+ @ Test
62+ public void testFoo () throws ExecutionException , InterruptedException {
63+ String server = embeddedKafka .getEmbeddedKafka ().getBrokerAddresses ()[0 ].toString ();
64+ ImportSpec importSpec = ImportSpec .newBuilder ().setType ("kafka" )
65+ .addEntities ("testEntity" )
66+ .putSourceOptions ("topics" , "TEST_TOPIC" )
67+ .putSourceOptions ("server" , server )
68+ .putJobOptions ("sample.limit" , "1" )
69+ .build ();
70+ FeatureRow row = FeatureRow .newBuilder ().setEntityKey ("key" ).build ();
71+ ScheduledExecutorService scheduler =
72+ Executors .newScheduledThreadPool (1 );
73+ // we keep sending on loop because beam will only start consuming rows that were sent after startup.
74+ scheduler .scheduleAtFixedRate (() -> send (row ), 0 , 1 , TimeUnit .SECONDS );
75+
76+ PCollection <FeatureRow > rows = pipeline .apply (new KafkaFeatureSource (importSpec ));
77+ Assert .assertEquals (IsBounded .BOUNDED , rows .isBounded ());
78+ PAssert .that (rows ).containsInAnyOrder (row );
79+ pipeline .run ();
80+ }
81+
82+ public Producer <byte [], FeatureRow > getProducer () {
83+ Map <String , Object > producerProps =
84+ KafkaTestUtils .producerProps (embeddedKafka .getEmbeddedKafka ());
85+ return new DefaultKafkaProducerFactory <>(producerProps , new ByteArraySerializer (),
86+ new FeatureRowSerializer ()).createProducer ();
87+ }
88+
89+
90+ public static class FeatureRowSerializer implements Serializer <FeatureRow > {
91+
92+ @ Override
93+ public void configure (Map <String , ?> configs , boolean isKey ) {
94+
95+ }
96+
97+ @ Override
98+ public byte [] serialize (String topic , FeatureRow data ) {
99+ return data .toByteArray ();
100+ }
101+
102+ @ Override
103+ public void close () {
104+
105+ }
106+ }
107+
108+ @ Configuration
109+ static class ContextConfiguration {
110+
111+ @ Bean
112+ ProducerFactory <byte [], FeatureRow > producerFactory () {
113+ Map <String , Object > producerProps =
114+ KafkaTestUtils .producerProps (embeddedKafka .getEmbeddedKafka ());
115+
116+ return new DefaultKafkaProducerFactory <>(
117+ producerProps , new ByteArraySerializer (), new FeatureRowSerializer ());
118+ }
119+
120+ @ Bean
121+ KafkaTemplate <byte [], FeatureRow > kafkaTemplate () {
122+ return new KafkaTemplate <>(producerFactory (), true );
123+ }
124+ }
125+ }