Skip to content

Commit b028f4c

Browse files
Merge pull request #10876 from hkhan/BAEL-4845-create-topic-in-kafka
[BAEL-4845] Create Kafka topics
2 parents f0d9004 + 22dbfe7 commit b028f4c

3 files changed

Lines changed: 134 additions & 2 deletions

File tree

libraries-data-3/pom.xml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
<groupId>org.apache.kafka</groupId>
1818
<artifactId>kafka-clients</artifactId>
1919
<version>${kafka.version}</version>
20-
<classifier>test</classifier>
21-
<scope>test</scope>
2220
</dependency>
2321
<dependency>
2422
<groupId>org.apache.kafka</groupId>
@@ -47,13 +45,20 @@
4745
<version>${testcontainers-kafka.version}</version>
4846
<scope>test</scope>
4947
</dependency>
48+
<dependency>
49+
<groupId>org.testcontainers</groupId>
50+
<artifactId>junit-jupiter</artifactId>
51+
<version>${testcontainers-jupiter.version}</version>
52+
<scope>test</scope>
53+
</dependency>
5054
</dependencies>
5155

5256
<properties>
5357
<assertj.version>3.6.2</assertj.version>
5458
<slf4j.version>1.7.25</slf4j.version>
5559
<kafka.version>2.8.0</kafka.version>
5660
<testcontainers-kafka.version>1.15.3</testcontainers-kafka.version>
61+
<testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
5762
</properties>
5863

5964
</project>
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package com.baeldung.kafka.admin;
2+
3+
import org.apache.kafka.clients.admin.Admin;
4+
import org.apache.kafka.clients.admin.AdminClientConfig;
5+
import org.apache.kafka.clients.admin.CreateTopicsOptions;
6+
import org.apache.kafka.clients.admin.CreateTopicsResult;
7+
import org.apache.kafka.clients.admin.NewTopic;
8+
import org.apache.kafka.common.KafkaFuture;
9+
import org.apache.kafka.common.config.TopicConfig;
10+
11+
import java.util.Collections;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
import java.util.Properties;
15+
16+
public class KafkaTopicApplication {
17+
18+
private final Properties properties;
19+
20+
public KafkaTopicApplication(Properties properties) {
21+
this.properties = properties;
22+
}
23+
24+
public void createTopic(String topicName) throws Exception {
25+
try (Admin admin = Admin.create(properties)) {
26+
int partitions = 1;
27+
short replicationFactor = 1;
28+
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
29+
30+
CreateTopicsResult result = admin.createTopics(
31+
Collections.singleton(newTopic));
32+
33+
// get the async result for the new topic creation
34+
KafkaFuture<Void> future = result.values().get(topicName);
35+
36+
// call get() to block until topic creation has completed or failed
37+
future.get();
38+
}
39+
}
40+
41+
public void createTopicWithOptions(String topicName) throws Exception {
42+
Properties props = new Properties();
43+
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
44+
45+
try (Admin admin = Admin.create(props)) {
46+
int partitions = 1;
47+
short replicationFactor = 1;
48+
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
49+
50+
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
51+
.validateOnly(true)
52+
.retryOnQuotaViolation(true);
53+
54+
CreateTopicsResult result = admin.createTopics(
55+
Collections.singleton(newTopic), topicOptions
56+
);
57+
58+
KafkaFuture<Void> future = result.values().get(topicName);
59+
future.get();
60+
}
61+
}
62+
63+
public void createCompactedTopicWithCompression(String topicName) throws Exception {
64+
Properties props = new Properties();
65+
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
66+
67+
try (Admin admin = Admin.create(props)) {
68+
int partitions = 1;
69+
short replicationFactor = 1;
70+
71+
// Create a compacted topic with 'lz4' compression codec
72+
Map<String, String> newTopicConfig = new HashMap<>();
73+
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
74+
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
75+
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
76+
.configs(newTopicConfig);
77+
78+
CreateTopicsResult result = admin.createTopics(
79+
Collections.singleton(newTopic)
80+
);
81+
82+
KafkaFuture<Void> future = result.values().get(topicName);
83+
future.get();
84+
}
85+
}
86+
87+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package com.baeldung.kafka.admin;
2+
3+
import org.apache.kafka.clients.admin.AdminClientConfig;
4+
import org.junit.jupiter.api.BeforeEach;
5+
import org.junit.jupiter.api.Test;
6+
import org.testcontainers.containers.KafkaContainer;
7+
import org.testcontainers.junit.jupiter.Container;
8+
import org.testcontainers.junit.jupiter.Testcontainers;
9+
import org.testcontainers.utility.DockerImageName;
10+
11+
import java.util.Properties;
12+
13+
import static org.assertj.core.api.Assertions.assertThat;
14+
15+
@Testcontainers
16+
class KafkaTopicApplicationIntegrationTest {
17+
18+
@Container
19+
private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
20+
21+
private KafkaTopicApplication kafkaTopicApplication;
22+
23+
@BeforeEach
24+
void setup() {
25+
Properties properties = new Properties();
26+
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
27+
kafkaTopicApplication = new KafkaTopicApplication(properties);
28+
}
29+
30+
@Test
31+
void givenTopicName_whenCreateNewTopic_thenTopicIsCreated() throws Exception {
32+
kafkaTopicApplication.createTopic("test-topic");
33+
34+
String topicCommand = "/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --list";
35+
String stdout = KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", topicCommand)
36+
.getStdout();
37+
38+
assertThat(stdout).contains("test-topic");
39+
}
40+
}

0 commit comments

Comments
 (0)