Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ If for some reason you want to use an externally running Zookeeper, then just pa

### Using Kraft mode

The self-managed (Kraft) mode is available as a preview feature since version 3.0 (confluentinc/cp-kafka:7.0.x) and
declared as a production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x).
KRaft mode was declared production ready in 3.3.1 (confluentinc/cp-kafka:7.3.x)"

<!--codeinclude-->
[Kraft mode](../../modules/kafka/src/test/java/org/testcontainers/containers/KafkaContainerTest.java) inside_block:withKraftMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ public void testKafkaContainerKraftCluster() throws Exception {
}
}

@Test
public void testKafkaContainerKraftClusterAfterConfluentPlatform740() throws Exception {
try (KafkaContainerKraftCluster cluster = new KafkaContainerKraftCluster("7.4.0", 3, 2)) {
cluster.start();
String bootstrapServers = cluster.getBootstrapServers();

assertThat(cluster.getBrokers()).hasSize(3);

testKafkaFunctionality(bootstrapServers, 3, 2);
}
}

protected void testKafkaFunctionality(String bootstrapServers, int partitions, int rf) throws Exception {
try (
AdminClient adminClient = AdminClient.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@
import org.testcontainers.utility.ComparableVersion;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.util.Objects;

/**
* This container wraps Confluent Kafka and Zookeeper (optionally)
*
*/
public class KafkaContainer extends GenericContainer<KafkaContainer> {

Expand All @@ -29,11 +28,13 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
// https://docs.confluent.io/platform/7.0.0/release-notes/index.html#ak-raft-kraft
private static final String MIN_KRAFT_TAG = "7.0.0";

public static final String DEFAULT_CLUSTER_ID = "4L6g3nShT-eMCtK--X86sw";

protected String externalZookeeperConnect = null;

private boolean kraftEnabled = false;

private String clusterId;
private String clusterId = DEFAULT_CLUSTER_ID;

/**
* @deprecated use {@link KafkaContainer(DockerImageName)} instead
Expand Down Expand Up @@ -98,7 +99,7 @@ public KafkaContainer withKraft() {
throw new IllegalStateException("Cannot configure Kraft mode when Zookeeper configured");
}
verifyMinKraftVersion();
kraftEnabled = true;
this.kraftEnabled = true;
return self();
}

Expand All @@ -115,7 +116,13 @@ private void verifyMinKraftVersion() {
}
}

private boolean isLessThanCP740() {
String actualVersion = DockerImageName.parse(getDockerImageName()).getVersionPart();
return new ComparableVersion(actualVersion).isLessThan("7.4.0");
}

public KafkaContainer withClusterId(String clusterId) {
Objects.requireNonNull(clusterId, "clusterId cannot be null");
this.clusterId = clusterId;
return self();
}
Expand All @@ -126,7 +133,7 @@ public String getBootstrapServers() {

@Override
protected void configure() {
if (kraftEnabled) {
if (this.kraftEnabled) {
waitingFor(Wait.forLogMessage(".*Transitioning from RECOVERY to RUNNING.*", 1));
configureKraft();
} else {
Expand All @@ -136,31 +143,27 @@ protected void configure() {
}

protected void configureKraft() {
withEnv(
"KAFKA_NODE_ID",
getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"))
);
//CP 7.4.0
getEnvMap().computeIfAbsent("CLUSTER_ID", key -> clusterId);
getEnvMap().computeIfAbsent("KAFKA_NODE_ID", key -> getEnvMap().get("KAFKA_BROKER_ID"));
withEnv(
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
String.format("%s,CONTROLLER:PLAINTEXT", getEnvMap().get("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"))
);
withEnv("KAFKA_LISTENERS", String.format("%s,CONTROLLER://0.0.0.0:9094", getEnvMap().get("KAFKA_LISTENERS")));

withEnv("KAFKA_PROCESS_ROLES", "broker,controller");
withEnv(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
getEnvMap()
.computeIfAbsent(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
key -> {
return String.format(
"%s@%s:9094",
getEnvMap().get("KAFKA_NODE_ID"),
getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
);
}
)
);
getEnvMap()
.computeIfAbsent(
"KAFKA_CONTROLLER_QUORUM_VOTERS",
key -> {
return String.format(
"%s@%s:9094",
getEnvMap().get("KAFKA_NODE_ID"),
getNetwork() != null ? getNetworkAliases().get(0) : "localhost"
);
}
);
withEnv("KAFKA_CONTROLLER_LISTENER_NAMES", "CONTROLLER");
}

Expand All @@ -186,27 +189,28 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
brokerAdvertisedListener(containerInfo)
);

command += (kraftEnabled) ? commandKraft() : commandZookeeper();
if (this.kraftEnabled && isLessThanCP740()) {
// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
command += commandKraft();
}

if (!this.kraftEnabled) {
// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
command += commandZookeeper();
}

// Optimization: skip the checks
command += "echo '' > /etc/confluent/docker/ensure \n";
// Run the original command
command += "/etc/confluent/docker/run \n";
copyFileToContainer(Transferable.of(command, 0777), STARTER_SCRIPT);
}

protected String commandKraft() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this only called for < CP 7.4? If yes, it may be good to add a defense in depth and check that it is indeed isLessThanCP740 ()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it supposed to be invoked in Zk mode or when Kraft AND CP <7.4
https://github.com/testcontainers/testcontainers-java/pull/7014/files#diff-7c5a407b71c96d4816697ed454df5cb084987573025af294ffa6c182dbd8879eR199-R209

Would like to add a test?

TBH, I'm questioning the optimization gains of removing the checks /etc/confluent/docker/ensure contains... For now, I suggested this solution to be fully backward compatible.

@eddumelendez here are the checks the script is performing... IMHO, the gain does not worth the "complexity" of the code... but that's my 2cts...
https://github.com/confluentinc/kafka-images/blob/master/kafka/include/etc/confluent/docker/ensure

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, the module is already complex enough 😅 Let's rollback that change, please

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done
#7030

String command = "sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure\n";
try {
if (clusterId == null) {
clusterId = execInContainer("kafka-storage", "random-uuid").getStdout().trim();
}
} catch (IOException | InterruptedException e) {
logger().error("Failed to execute `kafka-storage random-uuid`. Exception message: {}", e.getMessage());
}
command +=
"echo 'kafka-storage format --ignore-formatted -t \"" +
clusterId +
this.clusterId +
"\" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure\n";
return command;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,20 @@ public void testWithHostExposedPortAndExternalNetwork() throws Exception {
}

@Test
public void testUsageKraft() throws Exception {
public void testUsageKraftBeforeConfluentPlatformVersion74() throws Exception {
try (
// withKraftMode {
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.0.1")).withKraft()
) {
kafka.start();
testKafkaFunctionality(kafka.getBootstrapServers());
}
}

@Test
public void testUsageKraftAfterConfluentPlatformVersion74() throws Exception {
try (
// withKraftMode {
Comment thread
danielpetisme marked this conversation as resolved.
KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0")).withKraft()
// }
Comment thread
danielpetisme marked this conversation as resolved.
) {
kafka.start();
Expand Down