1+ package com .baeldung .kafka .producer ;
2+
3+ import com .baeldung .kafka .producer .EvenOddPartitioner ;
4+ import com .baeldung .kafka .producer .KafkaProducer ;
5+ import org .apache .kafka .clients .producer .MockProducer ;
6+ import org .apache .kafka .clients .producer .RecordMetadata ;
7+ import org .apache .kafka .common .Cluster ;
8+ import org .apache .kafka .common .Node ;
9+ import org .apache .kafka .common .PartitionInfo ;
10+ import org .apache .kafka .common .serialization .StringSerializer ;
11+ import org .junit .jupiter .api .Test ;
12+
13+ import java .util .ArrayList ;
14+ import java .util .List ;
15+ import java .util .concurrent .ExecutionException ;
16+ import java .util .concurrent .Future ;
17+
18+ import static java .util .Collections .emptySet ;
19+ import static org .junit .jupiter .api .Assertions .assertEquals ;
20+ import static org .junit .jupiter .api .Assertions .assertFalse ;
21+ import static org .junit .jupiter .api .Assertions .assertTrue ;
22+
23+ class KafkaProducerUnitTest {
24+
25+ private final String TOPIC_NAME = "topic_sports_news" ;
26+
27+ private KafkaProducer kafkaProducer ;
28+ private MockProducer <String , String > mockProducer ;
29+
30+ private void buildMockProducer (boolean autoComplete ) {
31+ this .mockProducer = new MockProducer <>(autoComplete , new StringSerializer (), new StringSerializer ());
32+ }
33+
34+ @ Test
35+ void givenKeyValue_whenSend_thenVerifyHistory () throws ExecutionException , InterruptedException {
36+
37+ buildMockProducer (true );
38+ //when
39+ kafkaProducer = new KafkaProducer (mockProducer );
40+ Future <RecordMetadata > recordMetadataFuture = kafkaProducer .send ("data" , "{\" site\" : \" baeldung\" }" );
41+
42+ //then
43+ assertTrue (mockProducer .history ().size () == 1 );
44+ assertTrue (mockProducer .history ().get (0 ).key ().equalsIgnoreCase ("data" ));
45+ assertTrue (recordMetadataFuture .get ().partition () == 0 );
46+
47+ }
48+
49+ @ Test
50+ void givenKeyValue_whenSend_thenSendOnlyAfterFlush () {
51+
52+ buildMockProducer (false );
53+ //when
54+ kafkaProducer = new KafkaProducer (mockProducer );
55+ Future <RecordMetadata > record = kafkaProducer .send ("data" , "{\" site\" : \" baeldung\" }" );
56+ assertFalse (record .isDone ());
57+
58+ //then
59+ kafkaProducer .flush ();
60+ assertTrue (record .isDone ());
61+ }
62+
63+ @ Test
64+ void givenKeyValue_whenSend_thenReturnException () {
65+
66+ buildMockProducer (false );
67+ //when
68+ kafkaProducer = new KafkaProducer (mockProducer );
69+ Future <RecordMetadata > record = kafkaProducer .send ("site" , "{\" site\" : \" baeldung\" }" );
70+ RuntimeException e = new RuntimeException ();
71+ mockProducer .errorNext (e );
72+ //then
73+ try {
74+ record .get ();
75+ } catch (ExecutionException | InterruptedException ex ) {
76+ assertEquals (e , ex .getCause ());
77+ }
78+ assertTrue (record .isDone ());
79+ }
80+
81+ @ Test
82+ void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit () {
83+
84+ buildMockProducer (true );
85+ //when
86+ kafkaProducer = new KafkaProducer (mockProducer );
87+ kafkaProducer .initTransaction ();
88+ kafkaProducer .beginTransaction ();
89+ Future <RecordMetadata > record = kafkaProducer .send ("data" , "{\" site\" : \" baeldung\" }" );
90+
91+ //then
92+ assertTrue (mockProducer .history ().isEmpty ());
93+ kafkaProducer .commitTransaction ();
94+ assertTrue (mockProducer .history ().size () == 1 );
95+ }
96+
97+ @ Test
98+ void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber () throws ExecutionException , InterruptedException {
99+
100+ PartitionInfo partitionInfo0 = new PartitionInfo (TOPIC_NAME , 0 , null , null , null );
101+ PartitionInfo partitionInfo1 = new PartitionInfo (TOPIC_NAME , 1 , null , null , null );
102+ List <PartitionInfo > list = new ArrayList <>();
103+ list .add (partitionInfo0 );
104+ list .add (partitionInfo1 );
105+ Cluster cluster = new Cluster ("kafkab" , new ArrayList <Node >(), list , emptySet (), emptySet ());
106+ this .mockProducer = new MockProducer <>(cluster , true , new EvenOddPartitioner (), new StringSerializer (), new StringSerializer ());
107+ //when
108+ kafkaProducer = new KafkaProducer (mockProducer );
109+ Future <RecordMetadata > recordMetadataFuture = kafkaProducer .send ("partition" , "{\" site\" : \" baeldung\" }" );
110+
111+ //then
112+ assertTrue (recordMetadataFuture .get ().partition () == 1 );
113+
114+ }
115+
116+ }
0 commit comments