Skip to content

Commit 5d3f08d

Browse files
committed
Fix the whitespace and newline issues - wuth code formatter
1 parent 80cd71f commit 5d3f08d

6 files changed

Lines changed: 40 additions & 22 deletions

File tree

apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/payload/UserEvent.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package com.baeldung.kafka.message.ordering.payload;
22

33
import java.util.Objects;
4+
45
public class UserEvent implements Comparable<UserEvent> {
56
private String userEventId;
67
private long eventNanoTime;
78
private long globalSequenceNumber;
89

910
@SuppressWarnings("unused")
10-
public UserEvent(){
11+
public UserEvent() {
1112
// Required for Jackson Serialization and Deserialization
1213
}
1314

apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonDeserializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.baeldung.kafka.message.ordering.serialization;
2+
23
import com.baeldung.kafka.message.ordering.Config;
34
import com.fasterxml.jackson.databind.ObjectMapper;
5+
46
import org.apache.kafka.common.serialization.Deserializer;
57

68
import java.util.Map;
@@ -12,7 +14,6 @@ public class JacksonDeserializer<T> implements Deserializer<T> {
1214
private final ObjectMapper objectMapper = new ObjectMapper();
1315
private Class<T> type;
1416

15-
1617
@Override
1718
public void configure(Map<String, ?> configs, boolean isKey) {
1819
this.type = (Class<T>) configs.get(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS);

apache-kafka-2/src/main/java/com/baeldung/kafka/message/ordering/serialization/JacksonSerializer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.baeldung.kafka.message.ordering.serialization;
22

33
import com.fasterxml.jackson.databind.ObjectMapper;
4+
45
import org.apache.kafka.common.serialization.Serializer;
56

67
/**

apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/ExternalSequenceWithTimeWindowIntegrationTest.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.baeldung.kafka.message.ordering.payload.UserEvent;
55
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
66
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
7+
78
import org.apache.kafka.clients.admin.*;
89
import org.apache.kafka.clients.consumer.ConsumerConfig;
910
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -23,11 +24,14 @@
2324
import org.testcontainers.junit.jupiter.Container;
2425
import org.testcontainers.junit.jupiter.Testcontainers;
2526
import org.testcontainers.utility.DockerImageName;
27+
2628
import java.time.Duration;
2729
import java.util.*;
2830
import java.util.concurrent.ExecutionException;
2931
import java.util.concurrent.Future;
32+
3033
import com.google.common.collect.ImmutableList;
34+
3135
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
3236

3337
@Testcontainers
@@ -37,8 +41,8 @@ public class ExternalSequenceWithTimeWindowIntegrationTest {
3741
private static KafkaProducer<Long, UserEvent> producer;
3842
private static KafkaConsumer<Long, UserEvent> consumer;
3943
private static final Duration TIMEOUT_WAIT_FOR_MESSAGES = Duration.ofSeconds(5);
40-
private static final long BUFFER_PERIOD_NS = Duration.ofSeconds(5).toNanos();
41-
44+
private static final long BUFFER_PERIOD_NS = Duration.ofSeconds(5)
45+
.toNanos();
4246
private static Logger logger = LoggerFactory.getLogger(ExternalSequenceWithTimeWindowIntegrationTest.class);
4347

4448
@Container
@@ -66,7 +70,9 @@ static void setup() throws ExecutionException, InterruptedException {
6670
admin = Admin.create(adminProperties);
6771
producer = new KafkaProducer<>(producerProperties);
6872
consumer = new KafkaConsumer<>(consumerProperties);
69-
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))).all().get();
73+
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR)))
74+
.all()
75+
.get();
7076
}
7177

7278
@AfterAll
@@ -78,8 +84,9 @@ static void destroy() {
7884
void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndTimeWindow_thenCheckForMessageOrder() throws ExecutionException, InterruptedException {
7985
List<UserEvent> sentUserEventList = new ArrayList<>();
8086
List<UserEvent> receivedUserEventList = new ArrayList<>();
81-
for (long sequenceNumber = 1; sequenceNumber <= 10 ; sequenceNumber++) {
82-
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
87+
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
88+
UserEvent userEvent = new UserEvent(UUID.randomUUID()
89+
.toString());
8390
userEvent.setEventNanoTime(System.nanoTime());
8491
userEvent.setGlobalSequenceNumber(sequenceNumber);
8592
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
@@ -105,9 +112,8 @@ void givenMultiplePartitions_whenPublishedToKafkaAndConsumedWithExtSeqNumberAndT
105112
buffer.add(record.value());
106113
});
107114
}
108-
assertThat(receivedUserEventList)
109-
.isEqualTo(sentUserEventList)
110-
.containsExactlyElementsOf(sentUserEventList);
115+
assertThat(receivedUserEventList).isEqualTo(sentUserEventList)
116+
.containsExactlyElementsOf(sentUserEventList);
111117
}
112118

113119
private static void processBuffer(List<UserEvent> buffer, List<UserEvent> receivedUserEventList) {

apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/MultiplePartitionIntegrationTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.baeldung.kafka.message.ordering.payload.UserEvent;
44
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
55
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
6+
67
import org.apache.kafka.clients.admin.*;
78
import org.apache.kafka.clients.consumer.ConsumerConfig;
89
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -22,11 +23,14 @@
2223
import org.testcontainers.junit.jupiter.Container;
2324
import org.testcontainers.junit.jupiter.Testcontainers;
2425
import org.testcontainers.utility.DockerImageName;
26+
2527
import java.time.Duration;
2628
import java.util.*;
2729
import java.util.concurrent.ExecutionException;
2830
import java.util.concurrent.Future;
31+
2932
import com.google.common.collect.ImmutableList;
33+
3034
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
3135

3236
@Testcontainers
@@ -63,7 +67,9 @@ static void setup() throws ExecutionException, InterruptedException {
6367
admin = Admin.create(adminProperties);
6468
producer = new KafkaProducer<>(producerProperties);
6569
consumer = new KafkaConsumer<>(consumerProperties);
66-
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR))).all().get();
70+
admin.createTopics(ImmutableList.of(new NewTopic(Config.MULTI_PARTITION_TOPIC, Config.MULTIPLE_PARTITIONS, Config.REPLICATION_FACTOR)))
71+
.all()
72+
.get();
6773
}
6874

6975
@AfterAll
@@ -76,7 +82,8 @@ void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessage
7682
List<UserEvent> sentUserEventList = new ArrayList<>();
7783
List<UserEvent> receivedUserEventList = new ArrayList<>();
7884
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
79-
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
85+
UserEvent userEvent = new UserEvent(UUID.randomUUID()
86+
.toString());
8087
userEvent.setGlobalSequenceNumber(sequenceNumber);
8188
userEvent.setEventNanoTime(System.nanoTime());
8289
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(Config.MULTI_PARTITION_TOPIC, sequenceNumber, userEvent));
@@ -92,8 +99,7 @@ void givenMultiplePartitions_whenPublishedToKafkaAndConsumed_thenCheckForMessage
9299
receivedUserEventList.add(userEvent);
93100
logger.info("User Event ID: " + userEvent.getUserEventId());
94101
});
95-
assertThat(receivedUserEventList)
96-
.isNotEqualTo(sentUserEventList)
102+
assertThat(receivedUserEventList).isNotEqualTo(sentUserEventList)
97103
.containsExactlyInAnyOrderElementsOf(sentUserEventList);
98104
}
99105
}

apache-kafka-2/src/test/java/com/baeldung/kafka/message/ordering/SinglePartitionIntegrationTest.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.baeldung.kafka.message.ordering.payload.UserEvent;
44
import com.baeldung.kafka.message.ordering.serialization.JacksonDeserializer;
55
import com.baeldung.kafka.message.ordering.serialization.JacksonSerializer;
6+
67
import org.apache.kafka.clients.admin.Admin;
78
import org.apache.kafka.clients.admin.AdminClientConfig;
89
import org.apache.kafka.clients.admin.NewTopic;
@@ -29,7 +30,9 @@
2930
import java.util.*;
3031
import java.util.concurrent.ExecutionException;
3132
import java.util.concurrent.Future;
33+
3234
import com.google.common.collect.ImmutableList;
35+
3336
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
3437

3538
@Testcontainers
@@ -56,7 +59,6 @@ static void setup() throws ExecutionException, InterruptedException {
5659
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
5760
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
5861
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName());
59-
producer = new KafkaProducer<>(producerProperties);
6062

6163
Properties consumerProperties = new Properties();
6264
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
@@ -65,11 +67,12 @@ static void setup() throws ExecutionException, InterruptedException {
6567
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
6668
consumerProperties.put(Config.CONSUMER_VALUE_DESERIALIZER_SERIALIZED_CLASS, UserEvent.class);
6769
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
68-
consumer = new KafkaConsumer<>(consumerProperties);
6970
admin = Admin.create(adminProperties);
70-
71-
72-
admin.createTopics(ImmutableList.of(new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR))).all().get();
71+
producer = new KafkaProducer<>(producerProperties);
72+
consumer = new KafkaConsumer<>(consumerProperties);
73+
admin.createTopics(ImmutableList.of(new NewTopic(Config.SINGLE_PARTITION_TOPIC, Config.SINGLE_PARTITION, Config.REPLICATION_FACTOR)))
74+
.all()
75+
.get();
7376
}
7477

7578
@AfterAll
@@ -82,7 +85,8 @@ void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOr
8285
List<UserEvent> sentUserEventList = new ArrayList<>();
8386
List<UserEvent> receivedUserEventList = new ArrayList<>();
8487
for (long sequenceNumber = 1; sequenceNumber <= 10; sequenceNumber++) {
85-
UserEvent userEvent = new UserEvent(UUID.randomUUID().toString());
88+
UserEvent userEvent = new UserEvent(UUID.randomUUID()
89+
.toString());
8690
userEvent.setGlobalSequenceNumber(sequenceNumber);
8791
userEvent.setEventNanoTime(System.nanoTime());
8892
ProducerRecord<Long, UserEvent> producerRecord = new ProducerRecord<>(Config.SINGLE_PARTITION_TOPIC, userEvent);
@@ -99,8 +103,7 @@ void givenASinglePartition_whenPublishedToKafkaAndConsumed_thenCheckForMessageOr
99103
receivedUserEventList.add(userEvent);
100104
logger.info("User Event ID: " + userEvent.getUserEventId());
101105
});
102-
assertThat(receivedUserEventList)
103-
.isEqualTo(sentUserEventList)
106+
assertThat(receivedUserEventList).isEqualTo(sentUserEventList)
104107
.containsExactlyElementsOf(sentUserEventList);
105108
}
106109
}

0 commit comments

Comments
 (0)