Skip to content

Commit d9d6bd3

Browse files
authored
SpringBoot - Kafka Sample (#493)
* SpringBoot - Kafka Sample Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * add comment on tests Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> * small fix for test Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io> --------- Signed-off-by: Tihomir Surdilovic <tihomir@temporal.io>
1 parent b7c642a commit d9d6bd3

20 files changed

Lines changed: 814 additions & 3 deletions

build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ plugins {
22
id 'org.cadixdev.licenser' version '0.6.1'
33
id "net.ltgt.errorprone" version "3.1.0"
44
id 'com.diffplug.spotless' version '6.17.0' apply false
5-
id 'org.springframework.boot' version '2.7.12'
5+
id 'org.springframework.boot' version '2.7.13'
66
}
77

88
subprojects {
@@ -39,6 +39,7 @@ subprojects {
3939
exclude '**/*.yaml'
4040
exclude '**/*.yml'
4141
exclude '**/*.html'
42+
exclude '**/*.js'
4243
}
4344

4445
if (JavaVersion.current().isJava11Compatible()) {

springboot/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ dependencies {
55
implementation "org.springframework.boot:spring-boot-starter-thymeleaf"
66
implementation "org.springframework.boot:spring-boot-starter-actuator"
77
implementation "org.springframework.boot:spring-boot-starter-data-jpa"
8+
implementation "org.springframework.kafka:spring-kafka"
9+
// we set this as impl depends to use embedded kafka in samples not just tests
10+
implementation "org.springframework.kafka:spring-kafka-test"
811
implementation "io.temporal:temporal-spring-boot-starter-alpha:$javaSDKVersion"
912
runtimeOnly "io.micrometer:micrometer-registry-prometheus"
1013
runtimeOnly "com.h2database:h2"

springboot/src/main/java/io/temporal/samples/springboot/SamplesController.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.temporal.client.WorkflowUpdateException;
2727
import io.temporal.samples.springboot.hello.HelloWorkflow;
2828
import io.temporal.samples.springboot.hello.model.Person;
29+
import io.temporal.samples.springboot.kafka.MessageWorkflow;
2930
import io.temporal.samples.springboot.update.PurchaseWorkflow;
3031
import io.temporal.samples.springboot.update.model.ProductRepository;
3132
import io.temporal.samples.springboot.update.model.Purchase;
@@ -123,4 +124,32 @@ ResponseEntity purchase(@RequestBody Purchase purchase) {
123124
return new ResponseEntity("\"" + message + "\"", HttpStatus.NOT_FOUND);
124125
}
125126
}
127+
128+
@GetMapping("/kafka")
129+
public String afka(Model model) {
130+
model.addAttribute("sample", "Kafka Request / Reply");
131+
return "kafka";
132+
}
133+
134+
@PostMapping(
135+
value = "/kafka",
136+
consumes = {MediaType.TEXT_PLAIN_VALUE},
137+
produces = {MediaType.TEXT_HTML_VALUE})
138+
ResponseEntity sendToKafka(@RequestBody String message) {
139+
MessageWorkflow workflow =
140+
client.newWorkflowStub(
141+
MessageWorkflow.class,
142+
WorkflowOptions.newBuilder()
143+
.setTaskQueue("KafkaSampleTaskQueue")
144+
.setWorkflowId("MessageSample")
145+
.build());
146+
147+
WorkflowClient.start(workflow::start);
148+
workflow.update(message);
149+
150+
// wait till exec completes
151+
WorkflowStub.fromTyped(workflow).getResult(Void.class);
152+
// bypass thymeleaf, don't return template name just result
153+
return new ResponseEntity("\" Message workflow completed\"", HttpStatus.OK);
154+
}
126155
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.springboot.kafka;
21+
22+
import io.temporal.activity.ActivityInterface;
23+
24+
@ActivityInterface
25+
public interface KafkaActivity {
26+
void sendMessage(String message);
27+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.springboot.kafka;
21+
22+
import io.temporal.spring.boot.ActivityImpl;
23+
import org.springframework.beans.factory.annotation.Autowired;
24+
import org.springframework.beans.factory.annotation.Value;
25+
import org.springframework.kafka.core.KafkaTemplate;
26+
import org.springframework.stereotype.Component;
27+
28+
@Component
29+
@ActivityImpl(taskQueues = "KafkaSampleTaskQueue")
30+
public class KafkaActivityImpl implements KafkaActivity {
31+
32+
// Setting required to false means we won't fail
33+
// if a test does not have kafka enabled
34+
@Autowired(required = false)
35+
private KafkaTemplate<String, String> kafkaTemplate;
36+
37+
@Value(value = "${samples.message.topic.name}")
38+
private String topicName;
39+
40+
@Override
41+
public void sendMessage(String message) {
42+
kafkaTemplate.send(topicName, message);
43+
}
44+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.springboot.kafka;
21+
22+
import java.io.IOException;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import org.apache.kafka.clients.admin.AdminClientConfig;
26+
import org.apache.kafka.clients.admin.NewTopic;
27+
import org.springframework.beans.factory.annotation.Autowired;
28+
import org.springframework.beans.factory.annotation.Value;
29+
import org.springframework.context.annotation.Bean;
30+
import org.springframework.context.annotation.Configuration;
31+
import org.springframework.context.annotation.Profile;
32+
import org.springframework.kafka.annotation.KafkaListener;
33+
import org.springframework.kafka.core.KafkaAdmin;
34+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
35+
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
36+
37+
@Configuration()
38+
@Profile("!test")
39+
public class KafkaConfig {
40+
@Value(value = "${spring.kafka.bootstrap-servers}")
41+
private String bootstrapAddress;
42+
43+
@Value(value = "${samples.message.topic.name}")
44+
private String topicName;
45+
46+
@Autowired private MessageController messageController;
47+
48+
@Bean
49+
EmbeddedKafkaBroker broker() {
50+
return new EmbeddedKafkaBroker(1)
51+
.kafkaPorts(9092)
52+
.brokerListProperty("spring.kafka.bootstrap-servers");
53+
}
54+
55+
@Bean
56+
public KafkaAdmin kafkaAdmin() {
57+
Map<String, Object> configs = new HashMap<>();
58+
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
59+
return new KafkaAdmin(configs);
60+
}
61+
62+
@Bean
63+
public NewTopic samplesTopic() {
64+
return new NewTopic(topicName, 1, (short) 1);
65+
}
66+
67+
@KafkaListener(id = "samples-topic", topics = "samples-topic")
68+
public void kafkaListener(String message) {
69+
SseEmitter latestEm = messageController.getLatestEmitter();
70+
71+
try {
72+
latestEm.send(message);
73+
} catch (IOException e) {
74+
latestEm.completeWithError(e);
75+
}
76+
}
77+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.springboot.kafka;
21+
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import org.springframework.web.bind.annotation.GetMapping;
25+
import org.springframework.web.bind.annotation.RestController;
26+
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
27+
28+
@RestController
29+
public class MessageController {
30+
private final List<SseEmitter> emitters = new ArrayList<>();
31+
32+
@GetMapping("/kafka-messages")
33+
public SseEmitter getKafkaMessages() {
34+
35+
SseEmitter emitter = new SseEmitter(60 * 1000L);
36+
emitters.add(emitter);
37+
38+
emitter.onCompletion(() -> emitters.remove(emitter));
39+
40+
emitter.onTimeout(() -> emitters.remove(emitter));
41+
42+
return emitter;
43+
}
44+
45+
public List<SseEmitter> getEmitters() {
46+
return emitters;
47+
}
48+
49+
public SseEmitter getLatestEmitter() {
50+
return (emitters.isEmpty()) ? null : emitters.get(emitters.size() - 1);
51+
}
52+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.springboot.kafka;
21+
22+
import io.temporal.workflow.SignalMethod;
23+
import io.temporal.workflow.WorkflowInterface;
24+
import io.temporal.workflow.WorkflowMethod;
25+
26+
@WorkflowInterface
27+
public interface MessageWorkflow {
28+
29+
@WorkflowMethod
30+
void start();
31+
32+
@SignalMethod
33+
void update(String message);
34+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright (c) 2020 Temporal Technologies, Inc. All Rights Reserved
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.samples.springboot.kafka;
21+
22+
import io.temporal.activity.ActivityOptions;
23+
import io.temporal.spring.boot.WorkflowImpl;
24+
import io.temporal.workflow.Workflow;
25+
import java.time.Duration;
26+
27+
@WorkflowImpl(taskQueues = "KafkaSampleTaskQueue")
28+
public class MessageWorkflowImpl implements MessageWorkflow {
29+
30+
private KafkaActivity activity =
31+
Workflow.newActivityStub(
32+
KafkaActivity.class,
33+
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(2)).build());
34+
private String message = null;
35+
36+
@Override
37+
public void start() {
38+
Workflow.await(() -> message != null);
39+
// simulate some steps / milestones
40+
activity.sendMessage(
41+
"Starting execution: " + Workflow.getInfo().getWorkflowId() + " with message: " + message);
42+
43+
activity.sendMessage("Step 1 done...");
44+
activity.sendMessage("Step 2 done...");
45+
activity.sendMessage("Step 3 done...");
46+
47+
activity.sendMessage("Completing execution: " + Workflow.getInfo().getWorkflowId());
48+
}
49+
50+
@Override
51+
public void update(String message) {
52+
this.message = message;
53+
}
54+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# SpringBoot Kafka Request / Reply Sample
2+
3+
1. Start SpringBoot from main samples repo directory:
4+
5+
./gradlew bootRun
6+
7+
2. In your browser navigate to:
8+
9+
http://localhost:3030/kafka
10+
11+
## Use Case
12+
When you start adopting Temporal in your existing applications it can
13+
very often become a much better replacement to any queueing techs like Kafka.
14+
Even tho we can replace big parts of our unreliable architecture with Temporal
15+
often it's not a complete replacement and we still have services that produce
16+
messages/events to Kafka topics and workflow results need to be pushed to Kafka in order
17+
to be consumed by these existing services.
18+
In this sample we show how messages sent to Kafka topics can trigger workflow execution
19+
as well as how via activities we can produce messages to Kafka topics that can be consumed
20+
by other existing services you might have.
21+
22+
## How to use
23+
Enter a message you want to set to Kafka topic. Message consumer when it receives it
24+
will start a workflow execution and deliver message to it as signal.
25+
Workflow execution performs some sample steps. For each step it executes an activity which uses
26+
Kafka producer to send message to Kafka topic.
27+
In the UI we listen on this kafka topic and you will see all messages produced by activities
28+
show up dynamically as they are happening.
29+
30+
## Note
31+
We use embedded (in-memory) Kafka to make it easier to run this sample.
32+
You should not use this in your applications outside of tests.

0 commit comments

Comments
 (0)