From 0e5fc6f2400ca1be12cb52bd48a3f0ca717ad8ab Mon Sep 17 00:00:00 2001 From: Daniel Petisme Date: Sun, 7 May 2023 18:07:56 +0200 Subject: [PATCH 1/7] Fixing Kraft support for CP 7.4.0 --- .../KafkaContainerClusterTest.java | 12 ++++++ .../containers/KafkaContainer.java | 40 ++++++++++++++----- .../containers/KafkaContainerTest.java | 14 ++++++- 3 files changed, 54 insertions(+), 12 deletions(-) diff --git a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java index 60790514825..089d80fa080 100644 --- a/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java +++ b/examples/kafka-cluster/src/test/java/com/example/kafkacluster/KafkaContainerClusterTest.java @@ -51,6 +51,18 @@ public void testKafkaContainerKraftCluster() throws Exception { } } + @Test + public void testKafkaContainerKraftClusterAfterConfluentPlatform740() throws Exception { + try (KafkaContainerKraftCluster cluster = new KafkaContainerKraftCluster("7.4.0", 3, 2)) { + cluster.start(); + String bootstrapServers = cluster.getBootstrapServers(); + + assertThat(cluster.getBrokers()).hasSize(3); + + testKafkaFunctionality(bootstrapServers, 3, 2); + } + } + protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception { try ( AdminClient adminClient = AdminClient.create( diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java index 32bfc366896..23b455664f1 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java @@ -7,6 +7,8 @@ import org.testcontainers.utility.DockerImageName; import java.io.IOException; +import java.util.Objects; +import java.util.UUID; /** * This container wraps Confluent Kafka and Zookeeper (optionally) @@ -29,11 +31,13 @@ public class KafkaContainer extends GenericContainer { // https://docs.confluent.io/platform/7.0.0/release-notes/index.html#ak-raft-kraft private static final String MIN_KRAFT_TAG = "7.0.0"; + public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw"; + protected String externalZookeeperConnect = null; private boolean kraftEnabled = false; - private String clusterId; + private String clusterId = DEFAULT_CLUSTER_ID; /** * @deprecated use {@link KafkaContainer(DockerImageName)} instead @@ -115,7 +119,13 @@ private void verifyMinKraftVersion() { } } + private boolean isLesThanCP740() { + String actualVersion = DockerImageName.parse(getDockerImageName()).getVersionPart(); + return new ComparableVersion(actualVersion).isLessThan("7.4.0"); + } + public KafkaContainer withClusterId(String clusterId) { + Objects.requireNonNull(clusterId, "clusterId cannot be null"); this.clusterId = clusterId; return self(); } @@ -136,6 +146,13 @@ protected void configure() { } protected void configureKraft() { + //CP 7.4.0 + withEnv("KAFKA_CONTROLLER_QUORUM_MODE", "kraft"); + withEnv("CLUSTER_ID", getEnvMap() + .computeIfAbsent( + "CLUSTER_ID", + key -> clusterId + )); withEnv( "KAFKA_NODE_ID", getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID")) @@ -186,10 +203,18 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { brokerAdvertisedListener(containerInfo) ); - command += (kraftEnabled) ? commandKraft() : commandZookeeper(); + if(kraftEnabled && isLesThanCP740()) { + // Optimization: skip the checks + command += "echo '' > /etc/confluent/docker/ensure \n"; + command += commandKraft(); + } + + if(!kraftEnabled) { + // Optimization: skip the checks + command += "echo '' > /etc/confluent/docker/ensure \n"; + command += commandZookeeper(); + } - // Optimization: skip the checks - command += "echo '' > /etc/confluent/docker/ensure \n"; // Run the original command command += "/etc/confluent/docker/run \n"; copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT); @@ -197,13 +222,6 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { protected String commandKraft() { String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n"; - try { - if (clusterId == null) { - clusterId = execInContainer("kafka-storage", "random-uuid").getStdout().trim(); - } - } catch (IOException | InterruptedException e) { - logger().error("Failed to execute `kafka-storage random-uuid`. Exception message: {}", e.getMessage()); - } command += "echo 'kafka-storage format --ignore-formatted -t \"" + clusterId + diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java index bad3c8d89ef..65535bd3913 100644 --- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -133,7 +133,7 @@ public void testWithHostExposedPortAndExternalNetwork() throws Exception { } @Test - public void testUsageKraft() throws Exception { + public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception { try ( // withKraftMode { KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft() @@ -144,6 +144,18 @@ public void testUsageKraft() throws Exception { } } + @Test + public void testUsageKraftAfterConfluentPlatformVersion74() throws Exception { + try ( + // withKraftMode { + KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft() + // } + ) { + kafka.start(); + testKafkaFunctionality(kafka.getBootstrapServers()); + } + } + @Test public void testNotSupportedKraftVersion() { try ( From 0288f1074d0b8b8870d0697e875c06a9c2317d3c Mon Sep 17 00:00:00 2001 From: Daniel Petisme Date: Sun, 7 May 2023 18:09:31 +0200 Subject: [PATCH 2/7] Code formatting --- .../testcontainers/containers/KafkaContainer.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java index 23b455664f1..95dc7bd9bf2 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java @@ -6,9 +6,7 @@ import org.testcontainers.utility.ComparableVersion; import org.testcontainers.utility.DockerImageName; -import java.io.IOException; import java.util.Objects; -import java.util.UUID; /** * This container wraps Confluent Kafka and Zookeeper (optionally) @@ -148,11 +146,7 @@ protected void configure() { protected void configureKraft() { //CP 7.4.0 withEnv("KAFKA_CONTROLLER_QUORUM_MODE", "kraft"); - withEnv("CLUSTER_ID", getEnvMap() - .computeIfAbsent( - "CLUSTER_ID", - key -> clusterId - )); + withEnv("CLUSTER_ID", getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId)); withEnv( "KAFKA_NODE_ID", getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID")) @@ -203,13 +197,13 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { brokerAdvertisedListener(containerInfo) ); - if(kraftEnabled && isLesThanCP740()) { + if (kraftEnabled && isLesThanCP740()) { // Optimization: skip the checks command += "echo '' > /etc/confluent/docker/ensure \n"; command += commandKraft(); } - if(!kraftEnabled) { + if (!kraftEnabled) { // Optimization: skip the checks command += "echo '' > /etc/confluent/docker/ensure \n"; command += commandZookeeper(); From d1ca5235d9fa84c9b0b4a4376bc309c8e9cc89b7 Mon Sep 17 00:00:00 2001 From: Daniel Petisme Date: Tue, 9 May 2023 16:27:56 +0200 Subject: [PATCH 3/7] Fixing reviews --- examples/.idea/workspace.xml | 123 ++++++++++++++++++ .../containers/KafkaContainer.java | 7 +- .../containers/KafkaContainerTest.java | 2 - 3 files changed, 126 insertions(+), 6 deletions(-) create mode 100644 examples/.idea/workspace.xml diff --git a/examples/.idea/workspace.xml b/examples/.idea/workspace.xml new file mode 100644 index 00000000000..476bbd95733 --- /dev/null +++ b/examples/.idea/workspace.xml @@ -0,0 +1,123 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + false + true + false + + + + + + + + false + true + false + + + + + + + + + + + + + + + + + 1683474120866 + + + + + + \ No newline at end of file diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java index 95dc7bd9bf2..0197822df24 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java @@ -117,7 +117,7 @@ private void verifyMinKraftVersion() { } } - private boolean isLesThanCP740() { + private boolean isLessThanCP740() { String actualVersion = DockerImageName.parse(getDockerImageName()).getVersionPart(); return new ComparableVersion(actualVersion).isLessThan("7.4.0"); } @@ -145,8 +145,7 @@ protected void configure() { protected void configureKraft() { //CP 7.4.0 - withEnv("KAFKA_CONTROLLER_QUORUM_MODE", "kraft"); - withEnv("CLUSTER_ID", getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId)); + getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId); withEnv( "KAFKA_NODE_ID", getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID")) @@ -197,7 +196,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { brokerAdvertisedListener(containerInfo) ); - if (kraftEnabled && isLesThanCP740()) { + if (kraftEnabled && isLessThanCP740()) { // Optimization: skip the checks command += "echo '' > /etc/confluent/docker/ensure \n"; command += commandKraft(); diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java index 65535bd3913..dac7360bdf2 100644 --- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -147,9 +147,7 @@ public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception { @Test public void testUsageKraftAfterConfluentPlatformVersion74() throws Exception { try ( - // withKraftMode { KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft() - // } ) { kafka.start(); testKafkaFunctionality(kafka.getBootstrapServers()); From 4cbc82d253df2449dd48caf1aaf875a92ca11234 Mon Sep 17 00:00:00 2001 From: Daniel Petisme Date: Tue, 9 May 2023 16:28:13 +0200 Subject: [PATCH 4/7] Fixing reviews --- examples/.idea/workspace.xml | 123 ----------------------------------- 1 file changed, 123 deletions(-) delete mode 100644 examples/.idea/workspace.xml diff --git a/examples/.idea/workspace.xml b/examples/.idea/workspace.xml deleted file mode 100644 index 476bbd95733..00000000000 --- a/examples/.idea/workspace.xml +++ /dev/null @@ -1,123 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - false - true - false - - - - - - - - false - true - false - - - - - - - - - - - - - - - - - 1683474120866 - - - - - - \ No newline at end of file From 14a3148f2774385c5d0aa281543d336e8ea876a6 Mon Sep 17 00:00:00 2001 From: Daniel Petisme Date: Tue, 9 May 2023 16:43:16 +0200 Subject: [PATCH 5/7] Updating documentation --- docs/modules/kafka.md | 19 ++- examples/.idea/workspace.xml | 123 ++++++++++++++++++ .../containers/KafkaContainerTest.java | 4 +- 3 files changed, 134 insertions(+), 12 deletions(-) create mode 100644 examples/.idea/workspace.xml diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index 691c4f10bf8..40f524f101b 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -24,24 +24,23 @@ Now your tests or any other process running on your machine can get access to ru ## Options - -### Using external Zookeeper - -If for some reason you want to use an externally running Zookeeper, then just pass its location during construction: - -[External Zookeeper](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withExternalZookeeper - ### Using Kraft mode -The self-managed (Kraft) mode is available as a preview feature since version 3.0 (confluentinc/cp-kafka:7.0.x) and -declared as a production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x). +KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)" [Kraft mode](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKraftMode -See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details. +See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details. + +### Using external Zookeeper + +If for some reason you want to use an externally running Zookeeper, then just pass its location during construction: + +[External Zookeeper](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withExternalZookeeper + ## Multi-container usage diff --git a/examples/.idea/workspace.xml b/examples/.idea/workspace.xml new file mode 100644 index 00000000000..40f02ce7ae7 --- /dev/null +++ b/examples/.idea/workspace.xml @@ -0,0 +1,123 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + false + true + false + + + + + + + + false + true + false + + + + + + + + + + + + + + + + + 1683474120866 + + + + + + \ No newline at end of file diff --git a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java index dac7360bdf2..860c990cb32 100644 --- a/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java +++ b/modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java @@ -135,9 +135,7 @@ public void testWithHostExposedPortAndExternalNetwork() throws Exception { @Test public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception { try ( - // withKraftMode { KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft() - // } ) { kafka.start(); testKafkaFunctionality(kafka.getBootstrapServers()); @@ -147,7 +145,9 @@ public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception { @Test public void testUsageKraftAfterConfluentPlatformVersion74() throws Exception { try ( + // withKraftMode { KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft() + // } ) { kafka.start(); testKafkaFunctionality(kafka.getBootstrapServers()); From 29731d7f19f018b694425f9caac396703b2e6312 Mon Sep 17 00:00:00 2001 From: Daniel Petisme Date: Tue, 9 May 2023 16:43:34 +0200 Subject: [PATCH 6/7] Updating documentation --- examples/.idea/workspace.xml | 123 ----------------------------------- 1 file changed, 123 deletions(-) delete mode 100644 examples/.idea/workspace.xml diff --git a/examples/.idea/workspace.xml b/examples/.idea/workspace.xml deleted file mode 100644 index 40f02ce7ae7..00000000000 --- a/examples/.idea/workspace.xml +++ /dev/null @@ -1,123 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - false - true - false - - - - - - - - false - true - false - - - - - - - - - - - - - - - - - 1683474120866 - - - - - - \ No newline at end of file From ab8c1e32b97c3d5c3bf91da7b1fffd34acb9f72a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edd=C3=BA=20Mel=C3=A9ndez?= Date: Tue, 9 May 2023 17:38:14 -0600 Subject: [PATCH 7/7] Polish --- docs/modules/kafka.md | 16 ++++---- .../containers/KafkaContainer.java | 41 ++++++++----------- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index 40f524f101b..13f8cf3b0a8 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -24,6 +24,13 @@ Now your tests or any other process running on your machine can get access to ru ## Options + +### Using external Zookeeper + +If for some reason you want to use an externally running Zookeeper, then just pass its location during construction: + +[External Zookeeper](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withExternalZookeeper + ### Using Kraft mode @@ -33,14 +40,7 @@ KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)" [Kraft mode](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKraftMode -See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details. - -### Using external Zookeeper - -If for some reason you want to use an externally running Zookeeper, then just pass its location during construction: - -[External Zookeeper](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withExternalZookeeper - +See the [versions interoperability matrix](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) for more details. ## Multi-container usage diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java index 0197822df24..d00b15a1030 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java @@ -10,7 +10,6 @@ /** * This container wraps Confluent Kafka and Zookeeper (optionally) - * */ public class KafkaContainer extends GenericContainer { @@ -100,7 +99,7 @@ public KafkaContainer withKraft() { throw new IllegalStateException("Cannot configure Kraft mode when Zookeeper configured"); } verifyMinKraftVersion(); - kraftEnabled = true; + this.kraftEnabled = true; return self(); } @@ -134,7 +133,7 @@ public String getBootstrapServers() { @Override protected void configure() { - if (kraftEnabled) { + if (this.kraftEnabled) { waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1)); configureKraft(); } else { @@ -146,10 +145,7 @@ protected void configure() { protected void configureKraft() { //CP 7.4.0 getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId); - withEnv( - "KAFKA_NODE_ID", - getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID")) - ); + getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID")); withEnv( "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", String.format("%s,CONTROLLER:PLAINTEXT", getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP")) @@ -157,20 +153,17 @@ protected void configureKraft() { withEnv("KAFKA_LISTENERS", String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvMap().get("KAFKA_LISTENERS"))); withEnv("KAFKA_PROCESS_ROLES", "broker,controller"); - withEnv( - "KAFKA_CONTROLLER_QUORUM_VOTERS", - getEnvMap() - .computeIfAbsent( - "KAFKA_CONTROLLER_QUORUM_VOTERS", - key -> { - return String.format( - "%s@%s:9094", - getEnvMap().get("KAFKA_NODE_ID"), - getNetwork() != null ? getNetworkAliases().get(0) : "localhost" - ); - } - ) - ); + getEnvMap() + .computeIfAbsent( + "KAFKA_CONTROLLER_QUORUM_VOTERS", + key -> { + return String.format( + "%s@%s:9094", + getEnvMap().get("KAFKA_NODE_ID"), + getNetwork() != null ? getNetworkAliases().get(0) : "localhost" + ); + } + ); withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER"); } @@ -196,13 +189,13 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { brokerAdvertisedListener(containerInfo) ); - if (kraftEnabled && isLessThanCP740()) { + if (this.kraftEnabled && isLessThanCP740()) { // Optimization: skip the checks command += "echo '' > /etc/confluent/docker/ensure \n"; command += commandKraft(); } - if (!kraftEnabled) { + if (!this.kraftEnabled) { // Optimization: skip the checks command += "echo '' > /etc/confluent/docker/ensure \n"; command += commandZookeeper(); @@ -217,7 +210,7 @@ protected String commandKraft() { String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n"; command += "echo 'kafka-storage format --ignore-formatted -t \"" + - clusterId + + this.clusterId + "\" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure\n"; return command; }