From f5efa43583febd23f33499081eda26787aa5b625 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 14:52:11 +0800 Subject: [PATCH 01/17] update --- 02nio/nio01/pom.xml | 6 ++ 02nio/nio01/src/main/java/Main.java | 2 +- .../java/io/kimmking/java8/ForeachDemo.java | 22 +++---- .../java/io/kimmking/java8/LambdaDemo.java | 61 ++++++++++--------- .../java/io/kimmking/java8/StreamDemo.java | 57 +++++++++++------ .../mq/activemq/ActivemqApplication.java | 6 +- 6 files changed, 90 insertions(+), 64 deletions(-) diff --git a/02nio/nio01/pom.xml b/02nio/nio01/pom.xml index e42a62f2..3ae38c66 100644 --- a/02nio/nio01/pom.xml +++ b/02nio/nio01/pom.xml @@ -59,6 +59,12 @@ netty-all 4.1.51.Final + + io.netty + netty-all + 4.1.55.Final + compile + diff --git a/02nio/nio01/src/main/java/Main.java b/02nio/nio01/src/main/java/Main.java index b07dca92..35cc141f 100644 --- a/02nio/nio01/src/main/java/Main.java +++ b/02nio/nio01/src/main/java/Main.java @@ -10,7 +10,7 @@ public class Main { public static void main(String[] args) { - Map map = new HashMap<>(); + Map map = new HashMap(); map.put("1", HttpServer01.class); map.put("2", HttpServer02.class); map.put("3", HttpServer03.class); diff --git a/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java index b4f8507d..b6d24957 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/ForeachDemo.java @@ -4,25 +4,25 @@ import java.util.List; public class ForeachDemo { - - private int x=1; - + + private int x = 1; + public static void main(String[] args) { - + ForeachDemo demo = new ForeachDemo(); - + demo.test(); - + System.out.println(demo.x); } - + private void test() { - List list = Arrays.asList(1,2); + List list = Arrays.asList(1, 2); int y = 1; list.forEach(e -> { - x=2; - //y=2; // can't be compiled + x = 2; +// y = 2; // can't be compiled }); } - + } diff --git a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java index f1dffef0..545cba21 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/LambdaDemo.java @@ -3,76 +3,77 @@ import java.io.Serializable; import java.util.Arrays; import java.util.Collection; +import java.util.Optional; -public class LambdaDemo { - - public static void main(String args[]){ +public class LambdaDemo { + + public static void main(String args[]) { LambdaDemo demo = new LambdaDemo(); - + MathOperation op = new MathOperation() { @Override public Integer operation(int a, int b) { return 1; } }; - + MathOperation op1 = (a, b) -> 1; - + // 类型声明 MathOperation addition = (int a, int b) -> a + b; - - // 不用类型声明 + + // 不用类型声明 MathOperation subtraction = (a, b) -> a - b + 1.0; - + // 大括号中的返回语句 - MathOperation multiplication = (int a, int b) -> { + MathOperation multiplication = (int a, int b) -> { int c = 1000; - return a * b + c; + return a * b + c; }; - + // 没有大括号及返回语句 MathOperation division = (int a, int b) -> a / b; - + System.out.println("10 + 5 = " + demo.operate(10, 5, addition)); System.out.println("10 - 5 = " + demo.operate(10, 5, subtraction)); System.out.println("10 x 5 = " + demo.operate(10, 5, multiplication)); System.out.println("10 / 5 = " + demo.operate(10, 5, division)); - + //System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> new Double(Math.pow(a,b)).intValue())); - - System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> Math.pow(a,b))); - + + System.out.println("10 ^ 5 = " + demo.operate(10, 5, (a, b) -> Math.pow(a, b))); + // 不用括号 GreetingService greetService1 = message -> System.out.println("Hello " + message); - + // 用括号 GreetingService greetService2 = (message) -> System.out.println("Hello " + message); - + GreetingService greetService3 = System.out::println; - - Arrays.asList(1,2,3).forEach( x -> System.out.println(x+3)); - Arrays.asList(1,2,3).forEach( LambdaDemo::println ); - + + Arrays.asList(1, 2, 3).forEach(x -> System.out.println(x + 3)); + Arrays.asList(1, 2, 3).forEach(LambdaDemo::println); + greetService1.sayMessage("kimmking"); greetService2.sayMessage("Java"); } - + private static void println(int x) { - System.out.println(x+3); + System.out.println(x + 3); } - + interface MathOperation { T operation(int a, int b); // 返回类型+函数名+参数类型的列表 } - + interface GreetingService { void sayMessage(String message); } - - private T operate(int a, int b, MathOperation mathOperation){ + + private T operate(int a, int b, MathOperation mathOperation) { return mathOperation.operation(a, b); } - + } diff --git a/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java b/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java index 4ec48773..937f110c 100644 --- a/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java +++ b/04fx/java8/src/main/java/io/kimmking/java8/StreamDemo.java @@ -3,6 +3,7 @@ import com.alibaba.fastjson.JSON; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.List; @@ -11,39 +12,57 @@ import java.util.stream.Collectors; public class StreamDemo { - + public static void main(String[] args) throws IOException { - - List list = Arrays.asList(4,2,3,5,1,2,2,7,6); + + List list = Arrays.asList(4, 2, 3, 5, 1, 2, 2, 7, 6); print(list); - + // Optional Optional first = list.stream().findFirst(); - + System.out.println(first.map(i -> i * 100).orElse(100)); - - int sum = list.stream().filter( i -> i<4).distinct().reduce(0,(a,b)->a+b); - System.out.println("sum="+sum); - + + int sum = list.stream().filter(i -> i < 4).distinct().reduce(0, (a, b) -> a + b); + System.out.println("sum=" + sum); + //Map map = list.stream().collect(Collectors.toMap(a->a,a->(a+1))); - Map map = list.parallelStream().collect(Collectors.toMap(a->a,a->(a+1),(a,b)->a, LinkedHashMap::new)); + Map map = + list.parallelStream().collect(Collectors.toMap(a -> a, a -> (a + 1), (a, b) -> a, LinkedHashMap::new)); print(map); - - + + map.forEach((k, v) -> System.out.println("key:value = " + k + ":" + v)); - List list1 = map.entrySet().parallelStream().map(e -> e.getKey()+e.getValue()).collect(Collectors.toList()); + List list1 = + map.entrySet().parallelStream().map(e -> e.getKey() + e.getValue()).collect(Collectors.toList()); print(list1); - + // parallelStream() - + // 总结: // 1. Fluent API:继续Stream // 2. 终止操作:得到结果 - - + + test(); + test2(); + } + + static int one; + + static void test() { + List b = new ArrayList<>(); + b.add(1); + b.forEach((x) -> one = 2); + System.out.println("test" + one); + } + + static void test2() { + List b = new ArrayList<>(); + b.add(1); + b.forEach(x -> x = 2); + System.out.println("test" + b.get(0)); } - - + private static void print(Object obj) { System.out.println(JSON.toJSONString(obj)); } diff --git a/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java b/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java index 75e13283..5042b9be 100644 --- a/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java +++ b/09mq/activemq-demo/src/main/java/io/kimmking/javacourse/mq/activemq/ActivemqApplication.java @@ -62,9 +62,9 @@ public void onMessage(Message message) { producer.send(message); } - Thread.sleep(2000); - session.close(); - conn.close(); +// Thread.sleep(2000); +// session.close(); +// conn.close(); } catch (Exception e) { From 5508eca645a653f44432fa4af120ab81f498fc87 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 15:45:27 +0800 Subject: [PATCH 02/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=E5=9F=BA?= =?UTF-8?q?=E4=BA=8E=20JMS=20=E7=9A=84=20ActiveMQ=20=E5=AE=9E=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/pom.xml | 14 +++---- .../byk/activemq/JmsActiveMqApplication.java | 40 +++++++++++++++++++ .../byk/activemq/config/ActiveMqConfig.java | 15 +++++++ .../io/byk/activemq/queue/QueueConsumer.java | 23 +++++++++++ .../io/byk/activemq/queue/QueueProducer.java | 32 +++++++++++++++ .../src/main/resources/application.properties | 1 - .../src/main/resources/application.yml | 7 ++++ 7 files changed, 123 insertions(+), 9 deletions(-) create mode 100644 09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/activemq/config/ActiveMqConfig.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueConsumer.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java delete mode 100644 09mq/activemq-demo/src/main/resources/application.properties create mode 100644 09mq/activemq-demo/src/main/resources/application.yml diff --git a/09mq/activemq-demo/pom.xml b/09mq/activemq-demo/pom.xml index c3352476..34ae8b81 100644 --- a/09mq/activemq-demo/pom.xml +++ b/09mq/activemq-demo/pom.xml @@ -5,7 +5,7 @@ org.springframework.boot spring-boot-starter-parent - 2.3.0.RELEASE + 2.2.6.RELEASE io.kimmking.javacourse.mq @@ -24,12 +24,6 @@ spring-boot-starter - - org.apache.activemq - activemq-all - 5.16.0 - - org.projectlombok lombok @@ -40,6 +34,11 @@ spring-boot-starter-test test + + + org.springframework.boot + spring-boot-starter-activemq + @@ -58,5 +57,4 @@ - diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java new file mode 100644 index 00000000..66ba1121 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java @@ -0,0 +1,40 @@ +package io.byk.activemq; + + +import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; + +import java.util.Queue; + +import javax.annotation.Resource; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import io.byk.activemq.queue.QueueProducer; +import lombok.extern.slf4j.Slf4j; + +/** + * @author boyunkai + * Created on 2021-02-05 + */ +@SpringBootApplication +@Slf4j +public class JmsActiveMqApplication implements ApplicationRunner { + @Resource + private QueueProducer queueProducer; + + public static void main(String[] args) { + SpringApplication.run(JmsActiveMqApplication.class, args); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + for (int i = 0; i < 10; i++) { + String message; + log.info(message = "生产消息" + i); + queueProducer.sendMessage(ACTIVE_MQ_QUEUE, message); + } + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/config/ActiveMqConfig.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/config/ActiveMqConfig.java new file mode 100644 index 00000000..6e188824 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/config/ActiveMqConfig.java @@ -0,0 +1,15 @@ +package io.byk.activemq.config; + +/** + * 常量类 + * + * @author boyunkai + * Created on 2021-02-05 + */ +public class ActiveMqConfig { + // 测试队列 + public static final String ACTIVE_MQ_QUEUE = "test.queue"; + // 测试主题 + public static final String ACTIVE_MQ_TOPIC = "test.topic"; + // 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueConsumer.java new file mode 100644 index 00000000..c71ff2e6 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueConsumer.java @@ -0,0 +1,23 @@ +package io.byk.activemq.queue; + +import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; + +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +/** + * 队列消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueConsumer { + @JmsListener(destination = ACTIVE_MQ_QUEUE) + public void receiveMessage(String message) { + log.info("<=========== 收到消息" + message); + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java new file mode 100644 index 00000000..f202af98 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java @@ -0,0 +1,32 @@ +package io.byk.activemq.queue; + +import javax.annotation.Resource; +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQQueue; +import org.springframework.jms.core.JmsMessagingTemplate; +import org.springframework.stereotype.Service; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列生产者 + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +@Getter +public class QueueProducer { + // JMS 模板 + @Resource + private JmsMessagingTemplate jmsMessagingTemplate; + + public void sendMessage(String destinationName, String message) { + log.info("============> 发送 Queue 消息" + message); + Destination destination = new ActiveMQQueue(destinationName); + jmsMessagingTemplate.convertAndSend(destination, message); + } + +} diff --git a/09mq/activemq-demo/src/main/resources/application.properties b/09mq/activemq-demo/src/main/resources/application.properties deleted file mode 100644 index 8b137891..00000000 --- a/09mq/activemq-demo/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ - diff --git a/09mq/activemq-demo/src/main/resources/application.yml b/09mq/activemq-demo/src/main/resources/application.yml new file mode 100644 index 00000000..21bc4e7d --- /dev/null +++ b/09mq/activemq-demo/src/main/resources/application.yml @@ -0,0 +1,7 @@ +spring: + activemq: + broker-url: tcp://127.0.0.1:61616 + user: admin + password: admin + pool: + enabled: false From ad46466504aa5a79ba212f6ed781744ba6dcf232 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 15:56:28 +0800 Subject: [PATCH 03/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=E5=9F=BA?= =?UTF-8?q?=E4=BA=8E=20JMS=20=E7=9A=84=20ActiveMQ=20=E5=AE=9E=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../byk/activemq/JmsActiveMqApplication.java | 12 +++++-- .../activemq/topic/JmsContainerConfig.java | 26 +++++++++++++++ .../io/byk/activemq/topic/TopicPublisher.java | 33 +++++++++++++++++++ .../byk/activemq/topic/TopicSubscriber.java | 24 ++++++++++++++ 4 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java index 66ba1121..5efd3196 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java @@ -2,6 +2,7 @@ import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; +import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; import java.util.Queue; @@ -13,6 +14,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import io.byk.activemq.queue.QueueProducer; +import io.byk.activemq.topic.TopicPublisher; import lombok.extern.slf4j.Slf4j; /** @@ -24,6 +26,8 @@ public class JmsActiveMqApplication implements ApplicationRunner { @Resource private QueueProducer queueProducer; + @Resource + private TopicPublisher topicPublisher; public static void main(String[] args) { SpringApplication.run(JmsActiveMqApplication.class, args); @@ -32,9 +36,13 @@ public static void main(String[] args) { @Override public void run(ApplicationArguments args) throws Exception { for (int i = 0; i < 10; i++) { - String message; - log.info(message = "生产消息" + i); + String message = "队列消息" + i; queueProducer.sendMessage(ACTIVE_MQ_QUEUE, message); } + + for (int i = 0; i < 10; i++) { + String message = "主题消息" + i; + topicPublisher.sendMessage(ACTIVE_MQ_TOPIC, message); + } } } diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java new file mode 100644 index 00000000..cd769620 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java @@ -0,0 +1,26 @@ +package io.byk.activemq.topic; + +import javax.jms.ConnectionFactory; + +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jms.config.JmsListenerContainerFactory; +import org.springframework.jms.config.SimpleJmsListenerContainerFactory; + +/** + * @author boyunkai + * Created on 2021-02-05 + */ +@Configuration +public class JmsContainerConfig { + + @Bean + public JmsListenerContainerFactory myJmsContainerFactory(@Qualifier("jmsConnectionFactory") + ConnectionFactory connectionFactory) { + SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory(); + factory.setConnectionFactory(connectionFactory); + factory.setPubSubDomain(true); + return factory; + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java new file mode 100644 index 00000000..b16c35ac --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java @@ -0,0 +1,33 @@ +package io.byk.activemq.topic; + +import javax.annotation.Resource; +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.springframework.jms.core.JmsMessagingTemplate; +import org.springframework.stereotype.Service; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列生产者 + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +@Getter +public class TopicPublisher { + // JMS 模板 + @Resource + private JmsMessagingTemplate jmsMessagingTemplate; + + public void sendMessage(String destinationName, String message) { + log.info("============> 发送 Topic 消息" + message); + Destination destination = new ActiveMQTopic(destinationName); + jmsMessagingTemplate.convertAndSend(destination, message); + } + +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java new file mode 100644 index 00000000..1b8f2172 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java @@ -0,0 +1,24 @@ +package io.byk.activemq.topic; + +import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; +import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; + +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Service; + +import lombok.extern.slf4j.Slf4j; + +/** + * 队列消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class TopicSubscriber { + @JmsListener(destination = ACTIVE_MQ_TOPIC,containerFactory = "myJmsContainerFactory") + public void receiveMessage(String message) { + log.info("<=========== 收到消息" + message); + } +} From 75bca270875a652a09b4dd69250d83c4c176a017 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 16:22:27 +0800 Subject: [PATCH 04/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 6 ++++++ .../main/java/io/byk/activemq/JmsActiveMqApplication.java | 7 ++++--- .../src/main/java/io/byk/activemq/queue/QueueProducer.java | 1 + .../java/io/byk/activemq/topic/JmsContainerConfig.java | 2 ++ .../main/java/io/byk/activemq/topic/TopicPublisher.java | 6 ++---- .../main/java/io/byk/activemq/topic/TopicSubscriber.java | 5 ++--- 6 files changed, 17 insertions(+), 10 deletions(-) create mode 100644 09mq/activemq-demo/README.md diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md new file mode 100644 index 00000000..d32a8e06 --- /dev/null +++ b/09mq/activemq-demo/README.md @@ -0,0 +1,6 @@ + +## Week13 周四作业: +### 1. [(必做)搭建 ActiveMQ 服务,基于 JMS,写代码分别实现对于 queue 和 topic 的消息生产和消费,代码提交到 github。](src/main/java/io/byk/activemq) + +使用`JmsMessagingTemplate`模板类来实现 + diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java index 5efd3196..36af7cfb 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java @@ -4,8 +4,6 @@ import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; -import java.util.Queue; - import javax.annotation.Resource; import org.springframework.boot.ApplicationArguments; @@ -18,6 +16,8 @@ import lombok.extern.slf4j.Slf4j; /** + * 启动类 + * * @author boyunkai * Created on 2021-02-05 */ @@ -35,11 +35,12 @@ public static void main(String[] args) { @Override public void run(ApplicationArguments args) throws Exception { + // 测试队列 for (int i = 0; i < 10; i++) { String message = "队列消息" + i; queueProducer.sendMessage(ACTIVE_MQ_QUEUE, message); } - + // 测试主题 for (int i = 0; i < 10; i++) { String message = "主题消息" + i; topicPublisher.sendMessage(ACTIVE_MQ_TOPIC, message); diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java index f202af98..e4f1369e 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java @@ -12,6 +12,7 @@ /** * 队列生产者 + * * @author boyunkai * Created on 2021-02-05 */ diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java index cd769620..3de97a34 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java @@ -9,6 +9,8 @@ import org.springframework.jms.config.SimpleJmsListenerContainerFactory; /** + * ContainerFactor 配置 + * * @author boyunkai * Created on 2021-02-05 */ diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java index b16c35ac..7d088a44 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java @@ -3,22 +3,20 @@ import javax.annotation.Resource; import javax.jms.Destination; -import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** - * 队列生产者 + * 主题生产者 + * * @author boyunkai * Created on 2021-02-05 */ @Service @Slf4j -@Getter public class TopicPublisher { // JMS 模板 @Resource diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java index 1b8f2172..3955ce13 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java @@ -1,6 +1,5 @@ package io.byk.activemq.topic; -import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; import org.springframework.jms.annotation.JmsListener; @@ -9,7 +8,7 @@ import lombok.extern.slf4j.Slf4j; /** - * 队列消费者 + * 主题消费者 * * @author boyunkai * Created on 2021-02-05 @@ -17,7 +16,7 @@ @Service @Slf4j public class TopicSubscriber { - @JmsListener(destination = ACTIVE_MQ_TOPIC,containerFactory = "myJmsContainerFactory") + @JmsListener(destination = ACTIVE_MQ_TOPIC, containerFactory = "myJmsContainerFactory") public void receiveMessage(String message) { log.info("<=========== 收到消息" + message); } From 147b11830208d07f6daad70e74ea4cd445fc9e11 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 20:25:15 +0800 Subject: [PATCH 05/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=E6=A8=A1?= =?UTF-8?q?=E6=8B=9F=E5=A4=84=E7=90=86=E8=AE=A2=E5=8D=95=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{ => jms}/JmsActiveMqApplication.java | 10 ++-- .../{ => jms}/queue/QueueConsumer.java | 4 +- .../{ => jms}/queue/QueueProducer.java | 2 +- .../{ => jms}/topic/JmsContainerConfig.java | 2 +- .../{ => jms}/topic/TopicPublisher.java | 2 +- .../{ => jms}/topic/TopicSubscriber.java | 4 +- .../{activemq => }/config/ActiveMqConfig.java | 5 +- .../io/byk/queue/order/QueueApplication.java | 55 +++++++++++++++++++ .../java/io/byk/queue/order/entity/Order.java | 17 ++++++ .../queue/order/service/QueueConsumer.java | 41 ++++++++++++++ .../queue/order/service/QueueProducer.java | 37 +++++++++++++ .../src/main/resources/application.yml | 2 +- 12 files changed, 166 insertions(+), 15 deletions(-) rename 09mq/activemq-demo/src/main/java/io/byk/activemq/{ => jms}/JmsActiveMqApplication.java (82%) rename 09mq/activemq-demo/src/main/java/io/byk/activemq/{ => jms}/queue/QueueConsumer.java (81%) rename 09mq/activemq-demo/src/main/java/io/byk/activemq/{ => jms}/queue/QueueProducer.java (95%) rename 09mq/activemq-demo/src/main/java/io/byk/activemq/{ => jms}/topic/JmsContainerConfig.java (96%) rename 09mq/activemq-demo/src/main/java/io/byk/activemq/{ => jms}/topic/TopicPublisher.java (95%) rename 09mq/activemq-demo/src/main/java/io/byk/activemq/{ => jms}/topic/TopicSubscriber.java (83%) rename 09mq/activemq-demo/src/main/java/io/byk/{activemq => }/config/ActiveMqConfig.java (67%) create mode 100644 09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java create mode 100644 09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java similarity index 82% rename from 09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java rename to 09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java index 36af7cfb..cd88d03a 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/JmsActiveMqApplication.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/JmsActiveMqApplication.java @@ -1,8 +1,8 @@ -package io.byk.activemq; +package io.byk.activemq.jms; -import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; -import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; import javax.annotation.Resource; @@ -11,8 +11,8 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import io.byk.activemq.queue.QueueProducer; -import io.byk.activemq.topic.TopicPublisher; +import io.byk.activemq.jms.queue.QueueProducer; +import io.byk.activemq.jms.topic.TopicPublisher; import lombok.extern.slf4j.Slf4j; /** diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java similarity index 81% rename from 09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueConsumer.java rename to 09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java index c71ff2e6..d1e657cc 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueConsumer.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueConsumer.java @@ -1,6 +1,6 @@ -package io.byk.activemq.queue; +package io.byk.activemq.jms.queue; -import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_QUEUE; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service; diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java similarity index 95% rename from 09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java rename to 09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java index e4f1369e..5e7b0f6f 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/queue/QueueProducer.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/queue/QueueProducer.java @@ -1,4 +1,4 @@ -package io.byk.activemq.queue; +package io.byk.activemq.jms.queue; import javax.annotation.Resource; import javax.jms.Destination; diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java similarity index 96% rename from 09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java rename to 09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java index 3de97a34..2b1a6621 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/JmsContainerConfig.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/JmsContainerConfig.java @@ -1,4 +1,4 @@ -package io.byk.activemq.topic; +package io.byk.activemq.jms.topic; import javax.jms.ConnectionFactory; diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java similarity index 95% rename from 09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java rename to 09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java index 7d088a44..a2cc5437 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicPublisher.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicPublisher.java @@ -1,4 +1,4 @@ -package io.byk.activemq.topic; +package io.byk.activemq.jms.topic; import javax.annotation.Resource; import javax.jms.Destination; diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java similarity index 83% rename from 09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java rename to 09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java index 3955ce13..08ff083a 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/topic/TopicSubscriber.java +++ b/09mq/activemq-demo/src/main/java/io/byk/activemq/jms/topic/TopicSubscriber.java @@ -1,6 +1,6 @@ -package io.byk.activemq.topic; +package io.byk.activemq.jms.topic; -import static io.byk.activemq.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; +import static io.byk.config.ActiveMqConfig.ACTIVE_MQ_TOPIC; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Service; diff --git a/09mq/activemq-demo/src/main/java/io/byk/activemq/config/ActiveMqConfig.java b/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java similarity index 67% rename from 09mq/activemq-demo/src/main/java/io/byk/activemq/config/ActiveMqConfig.java rename to 09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java index 6e188824..89db1847 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/activemq/config/ActiveMqConfig.java +++ b/09mq/activemq-demo/src/main/java/io/byk/config/ActiveMqConfig.java @@ -1,4 +1,4 @@ -package io.byk.activemq.config; +package io.byk.config; /** * 常量类 @@ -11,5 +11,6 @@ public class ActiveMqConfig { public static final String ACTIVE_MQ_QUEUE = "test.queue"; // 测试主题 public static final String ACTIVE_MQ_TOPIC = "test.topic"; - // 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml + // 队列大小 + public static final Integer QUEUE_SIZE = 10; } diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java new file mode 100644 index 00000000..5c8aa8f0 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java @@ -0,0 +1,55 @@ +package io.byk.queue.order; + +import java.util.LinkedList; +import java.util.Queue; + +import javax.annotation.Resource; + +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import com.google.common.collect.Lists; + +import io.byk.activemq.jms.JmsActiveMqApplication; +import io.byk.queue.order.entity.Order; +import io.byk.queue.order.service.QueueConsumer; +import io.byk.queue.order.service.QueueProducer; +import lombok.extern.slf4j.Slf4j; + +/** + * @author boyunkai + * Created on 2021-02-05 + */ +@SpringBootApplication +@Slf4j +public class QueueApplication implements ApplicationRunner { + private final static Queue orderQueue = Lists.newLinkedList(); + + @Resource + QueueProducer queueProducer; + + @Resource + QueueConsumer queueConsumer; + + public static void main(String[] args) { + SpringApplication.run(QueueApplication.class, args); + } + + @Override + public void run(ApplicationArguments args) throws Exception { + for (int i = 0; i < 10; i++) { + log.info(queueProducer.sendMessage(orderQueue) + "<=======入队"); + } + while (true) { + try { + log.info(queueConsumer.receiveMessage(orderQueue) + "=======>出队"); + Thread.sleep(100); + } catch (IllegalStateException exception) { + log.info(exception.getMessage()); + break; + } + } + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java new file mode 100644 index 00000000..28934d6e --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java @@ -0,0 +1,17 @@ +package io.byk.queue.order.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * @author boyunkai + * Created on 2021-02-05 + */ +@Data +@AllArgsConstructor +public class Order { + // 订单编号 + private String orderId; + // 状态 0-未处理 1-已处理 + private Integer statusId; +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java new file mode 100644 index 00000000..910a3d24 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java @@ -0,0 +1,41 @@ +package io.byk.queue.order.service; + +import java.util.List; +import java.util.Objects; +import java.util.Queue; + +import org.springframework.stereotype.Service; + +import io.byk.queue.order.entity.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueConsumer { + public String receiveMessage(Queue orderQueue) throws IllegalAccessException { + // STEP 1: 校验队列 + if (Objects.isNull(orderQueue)) { + throw new IllegalAccessException("订单队列不存在"); + } + boolean isOrderQueueEmpty = orderQueue.size() == 0; + if (isOrderQueueEmpty) { + throw new IllegalStateException("订单队列为空"); + } + // STEP 2: 校验订单 + Order order = orderQueue.poll(); + if (Objects.isNull(order)) { + throw new IllegalAccessException("订单不存在"); + } + boolean isOrderComplete = order.getStatusId() == 1; + if (isOrderComplete) { + throw new IllegalAccessException("订单已完成"); + } + // STEP 3: 更改订单状态 + order.setStatusId(1); + return order.toString(); + } +} diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java new file mode 100644 index 00000000..dd057213 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java @@ -0,0 +1,37 @@ +package io.byk.queue.order.service; + +import static io.byk.config.ActiveMqConfig.QUEUE_SIZE; + +import java.util.Objects; +import java.util.Queue; +import java.util.UUID; + +import org.springframework.stereotype.Service; + +import io.byk.queue.order.entity.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueProducer { + public String sendMessage(Queue orderQueue) throws IllegalAccessException { + // STEP 1: 校验队列 + if (Objects.isNull(orderQueue)) { + throw new IllegalAccessException("订单队列不存在"); + } + boolean isOrderQueueEmpty = QUEUE_SIZE.equals(orderQueue.size()); + if (isOrderQueueEmpty) { + return "订单队列已满"; + } + // STEP 2: 生成订单 + String orderId = UUID.randomUUID().toString().replace("-", "").substring(0, 10); + Order order = new Order(orderId, 0); + // STEP 3: 发送订单消息 + orderQueue.add(order); + return order.toString(); + } +} diff --git a/09mq/activemq-demo/src/main/resources/application.yml b/09mq/activemq-demo/src/main/resources/application.yml index 21bc4e7d..c2bef78e 100644 --- a/09mq/activemq-demo/src/main/resources/application.yml +++ b/09mq/activemq-demo/src/main/resources/application.yml @@ -1,6 +1,6 @@ spring: activemq: - broker-url: tcp://127.0.0.1:61616 + broker-url: tcp://127.0.0.1:61616 # 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml user: admin password: admin pool: From 31a9922653aa5004c61386b472269a75b6043796 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 20:28:38 +0800 Subject: [PATCH 06/17] =?UTF-8?q?[feature]=E6=9B=B4=E6=96=B0=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md index d32a8e06..872ff8cd 100644 --- a/09mq/activemq-demo/README.md +++ b/09mq/activemq-demo/README.md @@ -1,6 +1,12 @@ ## Week13 周四作业: -### 1. [(必做)搭建 ActiveMQ 服务,基于 JMS,写代码分别实现对于 queue 和 topic 的消息生产和消费,代码提交到 github。](src/main/java/io/byk/activemq) +### 1. [(必做)搭建 ActiveMQ 服务,基于 JMS,写代码分别实现对于 queue 和 topic 的消息生产和消费,代码提交到 github。](src/main/java/io/byk/activemq/jms) 使用`JmsMessagingTemplate`模板类来实现 +## 2. [**(选做)**基于数据库的订单表,模拟消息队列处理订单](src/main/java/io/byk/queue.order) + +> - 一个程序往表里写新订单,标记状态为未处理 (status=0); +> - 另一个程序每隔 100ms 定时从表里读取所有 status=0 的订单,打印一下订单数据,然后改成完成 status=1; +> - **TODO (挑战☆)考虑失败重试策略,考虑多个消费程序如何协作。** + From 458fc7c1d239c6da910786b2d4b79d5ffef405a6 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 20:29:49 +0800 Subject: [PATCH 07/17] =?UTF-8?q?[feature]=E6=9B=B4=E6=96=B0=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md index 872ff8cd..5eea942e 100644 --- a/09mq/activemq-demo/README.md +++ b/09mq/activemq-demo/README.md @@ -4,7 +4,7 @@ 使用`JmsMessagingTemplate`模板类来实现 -## 2. [**(选做)**基于数据库的订单表,模拟消息队列处理订单](src/main/java/io/byk/queue.order) +## 2. [(选做)基于数据库的订单表,模拟消息队列处理订单](src/main/java/io/byk/queue/order) > - 一个程序往表里写新订单,标记状态为未处理 (status=0); > - 另一个程序每隔 100ms 定时从表里读取所有 status=0 的订单,打印一下订单数据,然后改成完成 status=1; From 67764c263980a8584af28718875b257d50df3696 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Fri, 5 Feb 2021 20:30:17 +0800 Subject: [PATCH 08/17] =?UTF-8?q?[feature]=E6=9B=B4=E6=96=B0=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md index 5eea942e..e9311f7e 100644 --- a/09mq/activemq-demo/README.md +++ b/09mq/activemq-demo/README.md @@ -4,7 +4,7 @@ 使用`JmsMessagingTemplate`模板类来实现 -## 2. [(选做)基于数据库的订单表,模拟消息队列处理订单](src/main/java/io/byk/queue/order) +### 2. [(选做)基于数据库的订单表,模拟消息队列处理订单](src/main/java/io/byk/queue/order) > - 一个程序往表里写新订单,标记状态为未处理 (status=0); > - 另一个程序每隔 100ms 定时从表里读取所有 status=0 的订单,打印一下订单数据,然后改成完成 status=1; From 48b6587c3cd34bc4c33ec504d1ff51b7cfd84e8d Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 10:33:25 +0800 Subject: [PATCH 09/17] =?UTF-8?q?[feature]=E6=9B=B4=E6=96=B0=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/io/byk/queue/order/QueueApplication.java | 6 +++--- .../src/main/java/io/byk/queue/order/entity/Order.java | 2 ++ .../main/java/io/byk/queue/order/service/QueueConsumer.java | 6 +++--- .../main/java/io/byk/queue/order/service/QueueProducer.java | 2 ++ 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java index 5c8aa8f0..28f25a9b 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java @@ -1,6 +1,5 @@ package io.byk.queue.order; -import java.util.LinkedList; import java.util.Queue; import javax.annotation.Resource; @@ -12,20 +11,21 @@ import com.google.common.collect.Lists; -import io.byk.activemq.jms.JmsActiveMqApplication; import io.byk.queue.order.entity.Order; import io.byk.queue.order.service.QueueConsumer; import io.byk.queue.order.service.QueueProducer; import lombok.extern.slf4j.Slf4j; /** + * 启动类 + * * @author boyunkai * Created on 2021-02-05 */ @SpringBootApplication @Slf4j public class QueueApplication implements ApplicationRunner { - private final static Queue orderQueue = Lists.newLinkedList(); + private static final Queue orderQueue = Lists.newLinkedList(); @Resource QueueProducer queueProducer; diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java index 28934d6e..f0d2341b 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java @@ -4,6 +4,8 @@ import lombok.Data; /** + * 订单类 + * * @author boyunkai * Created on 2021-02-05 */ diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java index 910a3d24..d584912c 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java @@ -1,6 +1,5 @@ package io.byk.queue.order.service; -import java.util.List; import java.util.Objects; import java.util.Queue; @@ -10,6 +9,8 @@ import lombok.extern.slf4j.Slf4j; /** + * 队列消费者 + * * @author boyunkai * Created on 2021-02-05 */ @@ -21,8 +22,7 @@ public String receiveMessage(Queue orderQueue) throws IllegalAccessExcept if (Objects.isNull(orderQueue)) { throw new IllegalAccessException("订单队列不存在"); } - boolean isOrderQueueEmpty = orderQueue.size() == 0; - if (isOrderQueueEmpty) { + if (orderQueue.isEmpty()) { throw new IllegalStateException("订单队列为空"); } // STEP 2: 校验订单 diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java index dd057213..f33971d7 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java +++ b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java @@ -12,6 +12,8 @@ import lombok.extern.slf4j.Slf4j; /** + * 队列生产者 + * * @author boyunkai * Created on 2021-02-05 */ From 444e44e37146b3fbdcdc835aacc59e356c648d0d Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 15:09:05 +0800 Subject: [PATCH 10/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/pom.xml | 26 ++++++ .../java/io/byk/queue/order/entity/Order.java | 19 ----- .../queue/order/service/QueueConsumer.java | 41 ---------- .../queue/order/service/QueueProducer.java | 39 --------- .../queue => }/order/QueueApplication.java | 23 +++--- .../java/io/order/mapper/OrderMapper.java | 27 +++++++ .../src/main/java/io/order/model/Order.java | 31 +++++++ .../java/io/order/service/QueueConsumer.java | 50 ++++++++++++ .../java/io/order/service/QueueProducer.java | 36 +++++++++ .../src/main/resources/application.yml | 8 ++ .../src/main/resources/mapper/OrderMapper.xml | 80 +++++++++++++++++++ 11 files changed, 267 insertions(+), 113 deletions(-) delete mode 100644 09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java delete mode 100644 09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java delete mode 100644 09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java rename 09mq/activemq-demo/src/main/java/io/{byk/queue => }/order/QueueApplication.java (62%) create mode 100644 09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java create mode 100644 09mq/activemq-demo/src/main/java/io/order/model/Order.java create mode 100644 09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java create mode 100644 09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java create mode 100644 09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml diff --git a/09mq/activemq-demo/pom.xml b/09mq/activemq-demo/pom.xml index 34ae8b81..329ad1f6 100644 --- a/09mq/activemq-demo/pom.xml +++ b/09mq/activemq-demo/pom.xml @@ -39,6 +39,32 @@ org.springframework.boot spring-boot-starter-activemq + + + org.mybatis + mybatis + 3.5.6 + + + org.mybatis.spring.boot + mybatis-spring-boot-starter + 2.1.4 + + + org.springframework.boot + spring-boot-starter-jdbc + + + + mysql + mysql-connector-java + runtime + + + org.mybatis + mybatis-spring + 2.0.6 + diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java deleted file mode 100644 index f0d2341b..00000000 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/entity/Order.java +++ /dev/null @@ -1,19 +0,0 @@ -package io.byk.queue.order.entity; - -import lombok.AllArgsConstructor; -import lombok.Data; - -/** - * 订单类 - * - * @author boyunkai - * Created on 2021-02-05 - */ -@Data -@AllArgsConstructor -public class Order { - // 订单编号 - private String orderId; - // 状态 0-未处理 1-已处理 - private Integer statusId; -} diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java deleted file mode 100644 index d584912c..00000000 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueConsumer.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.byk.queue.order.service; - -import java.util.Objects; -import java.util.Queue; - -import org.springframework.stereotype.Service; - -import io.byk.queue.order.entity.Order; -import lombok.extern.slf4j.Slf4j; - -/** - * 队列消费者 - * - * @author boyunkai - * Created on 2021-02-05 - */ -@Service -@Slf4j -public class QueueConsumer { - public String receiveMessage(Queue orderQueue) throws IllegalAccessException { - // STEP 1: 校验队列 - if (Objects.isNull(orderQueue)) { - throw new IllegalAccessException("订单队列不存在"); - } - if (orderQueue.isEmpty()) { - throw new IllegalStateException("订单队列为空"); - } - // STEP 2: 校验订单 - Order order = orderQueue.poll(); - if (Objects.isNull(order)) { - throw new IllegalAccessException("订单不存在"); - } - boolean isOrderComplete = order.getStatusId() == 1; - if (isOrderComplete) { - throw new IllegalAccessException("订单已完成"); - } - // STEP 3: 更改订单状态 - order.setStatusId(1); - return order.toString(); - } -} diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java deleted file mode 100644 index f33971d7..00000000 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/service/QueueProducer.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.byk.queue.order.service; - -import static io.byk.config.ActiveMqConfig.QUEUE_SIZE; - -import java.util.Objects; -import java.util.Queue; -import java.util.UUID; - -import org.springframework.stereotype.Service; - -import io.byk.queue.order.entity.Order; -import lombok.extern.slf4j.Slf4j; - -/** - * 队列生产者 - * - * @author boyunkai - * Created on 2021-02-05 - */ -@Service -@Slf4j -public class QueueProducer { - public String sendMessage(Queue orderQueue) throws IllegalAccessException { - // STEP 1: 校验队列 - if (Objects.isNull(orderQueue)) { - throw new IllegalAccessException("订单队列不存在"); - } - boolean isOrderQueueEmpty = QUEUE_SIZE.equals(orderQueue.size()); - if (isOrderQueueEmpty) { - return "订单队列已满"; - } - // STEP 2: 生成订单 - String orderId = UUID.randomUUID().toString().replace("-", "").substring(0, 10); - Order order = new Order(orderId, 0); - // STEP 3: 发送订单消息 - orderQueue.add(order); - return order.toString(); - } -} diff --git a/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java similarity index 62% rename from 09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java rename to 09mq/activemq-demo/src/main/java/io/order/QueueApplication.java index 28f25a9b..ceb6839f 100644 --- a/09mq/activemq-demo/src/main/java/io/byk/queue/order/QueueApplication.java +++ b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java @@ -1,6 +1,4 @@ -package io.byk.queue.order; - -import java.util.Queue; +package io.order; import javax.annotation.Resource; @@ -9,11 +7,8 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import com.google.common.collect.Lists; - -import io.byk.queue.order.entity.Order; -import io.byk.queue.order.service.QueueConsumer; -import io.byk.queue.order.service.QueueProducer; +import io.order.service.QueueConsumer; +import io.order.service.QueueProducer; import lombok.extern.slf4j.Slf4j; /** @@ -25,8 +20,6 @@ @SpringBootApplication @Slf4j public class QueueApplication implements ApplicationRunner { - private static final Queue orderQueue = Lists.newLinkedList(); - @Resource QueueProducer queueProducer; @@ -39,14 +32,16 @@ public static void main(String[] args) { @Override public void run(ApplicationArguments args) throws Exception { - for (int i = 0; i < 10; i++) { - log.info(queueProducer.sendMessage(orderQueue) + "<=======入队"); + for (int i = 1; i <= 10; i++) { + log.info(queueProducer.sendMessage() + "<=======入队"); } while (true) { try { - log.info(queueConsumer.receiveMessage(orderQueue) + "=======>出队"); + queueConsumer.receiveMessage(); Thread.sleep(100); - } catch (IllegalStateException exception) { + } catch (IllegalAccessException exception) { + log.info(exception.getMessage()); + } catch (IllegalStateException exception){ log.info(exception.getMessage()); break; } diff --git a/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java b/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java new file mode 100644 index 00000000..3dc42ad4 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/mapper/OrderMapper.java @@ -0,0 +1,27 @@ +package io.order.mapper; +import org.apache.ibatis.annotations.Param; +import java.util.List; + +import io.order.model.Order; +import org.apache.ibatis.annotations.Mapper; + +/** + * @author boyunkai + * Created on 2021-02-07 + */ +@Mapper +public interface OrderMapper { + int deleteByPrimaryKey(Integer id); + + int insert(Order record); + + int insertSelective(Order record); + + Order selectByPrimaryKey(Integer id); + + int updateByPrimaryKeySelective(Order record); + + int updateByPrimaryKey(Order record); + + List selectByStatusId(@Param("statusId")Integer statusId); +} \ No newline at end of file diff --git a/09mq/activemq-demo/src/main/java/io/order/model/Order.java b/09mq/activemq-demo/src/main/java/io/order/model/Order.java new file mode 100644 index 00000000..89bd6bdd --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/model/Order.java @@ -0,0 +1,31 @@ +package io.order.model; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * @author boyunkai + * Created on 2021-02-07 + */ + +/** + * 订单表 + */ +@Data +@AllArgsConstructor +public class Order { + /** + * 主键ID + */ + private Integer id; + + /** + * 订单ID + */ + private String orderId; + + /** + * 状态 0-未处理 1-已处理 + */ + private Integer statusId; +} \ No newline at end of file diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java new file mode 100644 index 00000000..13e20bf5 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java @@ -0,0 +1,50 @@ +package io.order.service; + +import java.util.List; +import java.util.Objects; + +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + +import io.order.mapper.OrderMapper; +import io.order.model.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueConsumer { + @Resource + OrderMapper orderMapper; + + public void receiveMessage() throws IllegalAccessException { + // STEP 1: 获取为处理订单列表 + List orderList = orderMapper.selectByStatusId(0); + if (Objects.isNull(orderList) || orderList.isEmpty()) { + throw new IllegalStateException("所有订单已处理"); + } + for (Order order : orderList) { + // STEP 2: 校验订单 + if (Objects.isNull(order)) { + throw new IllegalAccessException("订单不存在"); + } + boolean isOrderComplete = order.getStatusId() == 1; + if (isOrderComplete) { + throw new IllegalAccessException("订单已完成"); + } + // STEP 3: 插入订单 + order.setStatusId(1); + int success = orderMapper.updateByPrimaryKeySelective(order); + if (success <= 0L) { + throw new IllegalAccessException("更新订单失败"); + } + log.info("订单" + order.getOrderId() + "=======>出队"); + } + } +} diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java new file mode 100644 index 00000000..55a822bb --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java @@ -0,0 +1,36 @@ +package io.order.service; + +import java.util.UUID; + +import javax.annotation.Resource; + +import org.springframework.stereotype.Service; + +import io.order.mapper.OrderMapper; +import io.order.model.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列生产者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class QueueProducer { + @Resource + OrderMapper orderMapper; + + public String sendMessage() throws IllegalAccessException { + // STEP 1: 生成订单 + String orderId = UUID.randomUUID().toString().replace("-", "").substring(0, 10); + Order order = new Order(0, orderId, 0); + // STEP 2: 发送订单消息 + int success = orderMapper.insertSelective(order); + if (success <= 0L) { + throw new IllegalStateException("生成订单失败"); + } + return "订单" + order.getOrderId(); + } +} diff --git a/09mq/activemq-demo/src/main/resources/application.yml b/09mq/activemq-demo/src/main/resources/application.yml index c2bef78e..075fb77f 100644 --- a/09mq/activemq-demo/src/main/resources/application.yml +++ b/09mq/activemq-demo/src/main/resources/application.yml @@ -5,3 +5,11 @@ spring: password: admin pool: enabled: false + datasource: + url: jdbc:mysql://localhost:3306/java_study?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC + username: root + password: + driver-class-name: com.mysql.cj.jdbc.Driver +mybatis: + type-aliases-package: io.byk. + mapper-locations: classpath:mapper/*.xml diff --git a/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml b/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml new file mode 100644 index 00000000..8b6f267b --- /dev/null +++ b/09mq/activemq-demo/src/main/resources/mapper/OrderMapper.xml @@ -0,0 +1,80 @@ + + + + + + + + + + + + + id, order_id, status_id + + + + + delete from order_list + where id = #{id,jdbcType=INTEGER} + + + + insert into order_list (order_id, status_id) + values (#{orderId,jdbcType=VARCHAR}, #{statusId,jdbcType=INTEGER}) + + + + insert into order_list + + + order_id, + + + status_id, + + + + + #{orderId,jdbcType=VARCHAR}, + + + #{statusId,jdbcType=INTEGER}, + + + + + + update order_list + + + order_id = #{orderId,jdbcType=VARCHAR}, + + + status_id = #{statusId,jdbcType=INTEGER}, + + + where id = #{id,jdbcType=INTEGER} + + + + update order_list + set order_id = #{orderId,jdbcType=VARCHAR}, + status_id = #{statusId,jdbcType=INTEGER} + where id = #{id,jdbcType=INTEGER} + + + + + \ No newline at end of file From 4ac81a1506dd02c7e6e4bd80a8007a241f18a9d4 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 15:09:39 +0800 Subject: [PATCH 11/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E7=B1=BB=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/src/main/resources/application.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/09mq/activemq-demo/src/main/resources/application.yml b/09mq/activemq-demo/src/main/resources/application.yml index 075fb77f..200aa4b8 100644 --- a/09mq/activemq-demo/src/main/resources/application.yml +++ b/09mq/activemq-demo/src/main/resources/application.yml @@ -11,5 +11,4 @@ spring: password: driver-class-name: com.mysql.cj.jdbc.Driver mybatis: - type-aliases-package: io.byk. mapper-locations: classpath:mapper/*.xml From 1da634210d783904173e38764a1f21cd611bc490 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 15:14:51 +0800 Subject: [PATCH 12/17] =?UTF-8?q?[feature]=E6=9B=B4=E6=96=B0=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md index e9311f7e..0d20241d 100644 --- a/09mq/activemq-demo/README.md +++ b/09mq/activemq-demo/README.md @@ -10,3 +10,16 @@ > - 另一个程序每隔 100ms 定时从表里读取所有 status=0 的订单,打印一下订单数据,然后改成完成 status=1; > - **TODO (挑战☆)考虑失败重试策略,考虑多个消费程序如何协作。** +#### 解决org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)问题 + +在`application.yml`文件下添加配置信息: + +```sh +mybatis: + mapper-locations: classpath:mapper/*.xml +``` + +#### ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near + +建议不要在创建表的过程中使用mysql保留字,避免后期造成麻烦 + From 0ce4b5abae58c5f43e50466248ae619323b741dd Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 17:33:43 +0800 Subject: [PATCH 13/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E4=BB=A5=E5=8F=8A=E5=A4=9A=E7=BA=BF=E7=A8=8B=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/io/order/QueueApplication.java | 22 +++--- .../java/io/order/service/QueueConsumer.java | 69 +++++++++++++++---- .../java/io/order/service/QueueProducer.java | 3 +- 3 files changed, 66 insertions(+), 28 deletions(-) diff --git a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java index ceb6839f..cd0a4600 100644 --- a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java +++ b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java @@ -1,5 +1,9 @@ package io.order; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantLock; + import javax.annotation.Resource; import org.springframework.boot.ApplicationArguments; @@ -32,19 +36,13 @@ public static void main(String[] args) { @Override public void run(ApplicationArguments args) throws Exception { - for (int i = 1; i <= 10; i++) { - log.info(queueProducer.sendMessage() + "<=======入队"); + for (int i = 0; i < 100; i++) { + log.info(queueProducer.sendMessage(String.valueOf(i)) + "<=======入队"); } - while (true) { - try { - queueConsumer.receiveMessage(); - Thread.sleep(100); - } catch (IllegalAccessException exception) { - log.info(exception.getMessage()); - } catch (IllegalStateException exception){ - log.info(exception.getMessage()); - break; - } + try { + queueConsumer.receiveMessage(); + } catch (IllegalStateException exception) { + log.info(exception.getMessage()); } } } diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java index 13e20bf5..db3901ba 100644 --- a/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java @@ -2,6 +2,9 @@ import java.util.List; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Resource; @@ -22,29 +25,67 @@ public class QueueConsumer { @Resource OrderMapper orderMapper; + private static final ExecutorService executorService = Executors.newFixedThreadPool(3); + //最大重试次数 + private static final Integer TRY_TIMES = 6; + //重试间隔时间单位秒 + private static final Long INTERVAL_TIME = 100L; - public void receiveMessage() throws IllegalAccessException { + public void receiveMessage() { // STEP 1: 获取为处理订单列表 List orderList = orderMapper.selectByStatusId(0); if (Objects.isNull(orderList) || orderList.isEmpty()) { throw new IllegalStateException("所有订单已处理"); } for (Order order : orderList) { - // STEP 2: 校验订单 - if (Objects.isNull(order)) { - throw new IllegalAccessException("订单不存在"); + for (int i = 0; i < 10; i++) { + executorService.submit(() -> { + try { + // 重试策略 + retryBuss(order); + } catch (InterruptedException exception) { + log.info("进程被打断"); + Thread.currentThread().interrupt(); + } + }); } - boolean isOrderComplete = order.getStatusId() == 1; - if (isOrderComplete) { - throw new IllegalAccessException("订单已完成"); - } - // STEP 3: 插入订单 - order.setStatusId(1); - int success = orderMapper.updateByPrimaryKeySelective(order); - if (success <= 0L) { - throw new IllegalAccessException("更新订单失败"); + } + } + + private void retryBuss(Order order) throws InterruptedException { + int retryNum = 1; + while (retryNum <= TRY_TIMES) { + try { + // 加锁解决并发,效率降低 + int success = updateOrder(order); + if (success > 0L) { + break; + } + retryNum++; + } catch (Exception e) { + retryNum++; + Thread.sleep(INTERVAL_TIME); + continue; } - log.info("订单" + order.getOrderId() + "=======>出队"); } } + + private synchronized int updateOrder(Order order) throws IllegalAccessException, InterruptedException { + // STEP 1: 校验订单 + if (Objects.isNull(order)) { + throw new IllegalAccessException("订单不存在"); + } + if (order.getStatusId() == 1) { + return 1; + } + // STEP 2: 插入订单 + order.setStatusId(1); + int success = orderMapper.updateByPrimaryKeySelective(order); + if (success <= 0L) { + throw new IllegalAccessException("更新订单失败"); + } + Thread.sleep(100); + log.info("订单" + order.getOrderId() + "=======>出队"); + return success; + } } diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java index 55a822bb..45a54536 100644 --- a/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java @@ -22,9 +22,8 @@ public class QueueProducer { @Resource OrderMapper orderMapper; - public String sendMessage() throws IllegalAccessException { + public String sendMessage(String orderId) throws IllegalAccessException { // STEP 1: 生成订单 - String orderId = UUID.randomUUID().toString().replace("-", "").substring(0, 10); Order order = new Order(0, orderId, 0); // STEP 2: 发送订单消息 int success = orderMapper.insertSelective(order); From d44d674e92ffada64a37386e327eb9429b4ce927 Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 17:42:07 +0800 Subject: [PATCH 14/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E4=BB=A5=E5=8F=8A=E5=A4=9A=E7=BA=BF=E7=A8=8B=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/io/order/QueueApplication.java | 6 +----- .../src/main/java/io/order/model/Order.java | 6 ++---- .../src/main/java/io/order/service/QueueConsumer.java | 11 +++++++---- .../src/main/java/io/order/service/QueueProducer.java | 4 +--- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java index cd0a4600..bc0dbf0c 100644 --- a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java +++ b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java @@ -1,9 +1,5 @@ package io.order; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.locks.ReentrantLock; - import javax.annotation.Resource; import org.springframework.boot.ApplicationArguments; @@ -35,7 +31,7 @@ public static void main(String[] args) { } @Override - public void run(ApplicationArguments args) throws Exception { + public void run(ApplicationArguments args) { for (int i = 0; i < 100; i++) { log.info(queueProducer.sendMessage(String.valueOf(i)) + "<=======入队"); } diff --git a/09mq/activemq-demo/src/main/java/io/order/model/Order.java b/09mq/activemq-demo/src/main/java/io/order/model/Order.java index 89bd6bdd..62de147c 100644 --- a/09mq/activemq-demo/src/main/java/io/order/model/Order.java +++ b/09mq/activemq-demo/src/main/java/io/order/model/Order.java @@ -4,13 +4,11 @@ import lombok.Data; /** + * 订单表 + * * @author boyunkai * Created on 2021-02-07 */ - -/** - * 订单表 - */ @Data @AllArgsConstructor public class Order { diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java index db3901ba..05913e08 100644 --- a/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java @@ -4,7 +4,6 @@ import java.util.Objects; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Resource; @@ -42,7 +41,10 @@ public void receiveMessage() { executorService.submit(() -> { try { // 重试策略 - retryBuss(order); + int success = retryBuss(order); + if (success <= 0) { + log.info("订单" + order.getOrderId() + "=======>出队失败"); + } } catch (InterruptedException exception) { log.info("进程被打断"); Thread.currentThread().interrupt(); @@ -52,14 +54,14 @@ public void receiveMessage() { } } - private void retryBuss(Order order) throws InterruptedException { + private int retryBuss(Order order) throws InterruptedException { int retryNum = 1; while (retryNum <= TRY_TIMES) { try { // 加锁解决并发,效率降低 int success = updateOrder(order); if (success > 0L) { - break; + return 1; } retryNum++; } catch (Exception e) { @@ -68,6 +70,7 @@ private void retryBuss(Order order) throws InterruptedException { continue; } } + return 0; } private synchronized int updateOrder(Order order) throws IllegalAccessException, InterruptedException { diff --git a/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java index 45a54536..66544647 100644 --- a/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java +++ b/09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java @@ -1,7 +1,5 @@ package io.order.service; -import java.util.UUID; - import javax.annotation.Resource; import org.springframework.stereotype.Service; @@ -22,7 +20,7 @@ public class QueueProducer { @Resource OrderMapper orderMapper; - public String sendMessage(String orderId) throws IllegalAccessException { + public String sendMessage(String orderId) { // STEP 1: 生成订单 Order order = new Order(0, orderId, 0); // STEP 2: 发送订单消息 From d24a243016f9245eac90f8e6b55c49c7ea3c7e4c Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 18:13:07 +0800 Subject: [PATCH 15/17] =?UTF-8?q?[feature]=E6=9B=B4=E6=96=B0=20README?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md index 0d20241d..b7fdef5f 100644 --- a/09mq/activemq-demo/README.md +++ b/09mq/activemq-demo/README.md @@ -8,7 +8,7 @@ > - 一个程序往表里写新订单,标记状态为未处理 (status=0); > - 另一个程序每隔 100ms 定时从表里读取所有 status=0 的订单,打印一下订单数据,然后改成完成 status=1; -> - **TODO (挑战☆)考虑失败重试策略,考虑多个消费程序如何协作。** +> - (挑战☆)考虑失败重试策略,考虑多个消费程序如何协作。 #### 解决org.apache.ibatis.binding.BindingException: Invalid bound statement (not found)问题 From c6bdad2bda1c8e714d03a52fec38e525b4fe4ffe Mon Sep 17 00:00:00 2001 From: boyunkai Date: Sun, 7 Feb 2021 18:54:22 +0800 Subject: [PATCH 16/17] =?UTF-8?q?[feature]=E6=B7=BB=E5=8A=A0=20ActiveMQ=20?= =?UTF-8?q?=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 16 +++++++++ .../main/java/io/order/QueueApplication.java | 15 ++++++-- .../src/main/java/io/order/model/Order.java | 4 ++- .../order/service/ActiveMqQueueConsumer.java | 25 +++++++++++++ .../order/service/ActiveMqQueueProducer.java | 36 +++++++++++++++++++ .../src/main/resources/application.yml | 2 ++ 6 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java create mode 100644 09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md index b7fdef5f..47bc886c 100644 --- a/09mq/activemq-demo/README.md +++ b/09mq/activemq-demo/README.md @@ -23,3 +23,19 @@ mybatis: 建议不要在创建表的过程中使用mysql保留字,避免后期造成麻烦 +#### ActiveMQ序列化异常Forbidden class ! This class is not trusted to be serialized as ObjectMessage payload + +在`application.yml`文件下添加配置信息: + +``` +spring: + activemq: + broker-url: tcp://127.0.0.1:61616 # 目标地址,61616 端口为 JMS 协议,具体查看在 apache-activemq-5.16.1/conf/activemq.xml + user: admin + password: admin + pool: + enabled: false + packages: + trust-all: true +``` + diff --git a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java index bc0dbf0c..078ead67 100644 --- a/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java +++ b/09mq/activemq-demo/src/main/java/io/order/QueueApplication.java @@ -7,6 +7,8 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import io.order.service.ActiveMqQueueConsumer; +import io.order.service.ActiveMqQueueProducer; import io.order.service.QueueConsumer; import io.order.service.QueueProducer; import lombok.extern.slf4j.Slf4j; @@ -26,14 +28,23 @@ public class QueueApplication implements ApplicationRunner { @Resource QueueConsumer queueConsumer; + @Resource + ActiveMqQueueProducer activeMqQueueProducer; + + @Resource + ActiveMqQueueConsumer activeMqQueueConsumer; + // 测试队列 + public static final String ACTIVE_MQ_ORDER = "order.activeMQ"; + public static void main(String[] args) { SpringApplication.run(QueueApplication.class, args); } @Override public void run(ApplicationArguments args) { - for (int i = 0; i < 100; i++) { - log.info(queueProducer.sendMessage(String.valueOf(i)) + "<=======入队"); + for (int i = 0; i < 10; i++) { + // log.info(queueProducer.sendMessage(String.valueOf(i)) + "<=======入队"); + activeMqQueueProducer.sendMessage(ACTIVE_MQ_ORDER, String.valueOf(i)); } try { queueConsumer.receiveMessage(); diff --git a/09mq/activemq-demo/src/main/java/io/order/model/Order.java b/09mq/activemq-demo/src/main/java/io/order/model/Order.java index 62de147c..151bf0e8 100644 --- a/09mq/activemq-demo/src/main/java/io/order/model/Order.java +++ b/09mq/activemq-demo/src/main/java/io/order/model/Order.java @@ -1,5 +1,7 @@ package io.order.model; +import java.io.Serializable; + import lombok.AllArgsConstructor; import lombok.Data; @@ -11,7 +13,7 @@ */ @Data @AllArgsConstructor -public class Order { +public class Order implements Serializable { /** * 主键ID */ diff --git a/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java new file mode 100644 index 00000000..f9f182df --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueConsumer.java @@ -0,0 +1,25 @@ +package io.order.service; + +import org.springframework.jms.annotation.JmsListener; +import org.springframework.stereotype.Service; + +import io.order.model.Order; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列消费者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +public class ActiveMqQueueConsumer { + public static final String ACTIVE_MQ_ORDER = "order.activeMQ"; + + @JmsListener(destination = ACTIVE_MQ_ORDER) + public void receiveMessage(Order order) { + order.setStatusId(1); + log.info("<=========== 收到消息" + order.toString()); + } +} diff --git a/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java new file mode 100644 index 00000000..4d18ffd6 --- /dev/null +++ b/09mq/activemq-demo/src/main/java/io/order/service/ActiveMqQueueProducer.java @@ -0,0 +1,36 @@ +package io.order.service; + +import javax.annotation.Resource; +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQQueue; +import org.springframework.jms.core.JmsMessagingTemplate; +import org.springframework.stereotype.Service; + +import io.order.model.Order; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +/** + * 队列生产者 + * + * @author boyunkai + * Created on 2021-02-05 + */ +@Service +@Slf4j +@Getter +public class ActiveMqQueueProducer { + // JMS 模板 + @Resource + private JmsMessagingTemplate jmsMessagingTemplate; + + public void sendMessage(String destinationName, String orderId) { + Destination destination = new ActiveMQQueue(destinationName); + // STEP 1: 生成订单 + Order order = new Order(0, orderId, 0); + jmsMessagingTemplate.convertAndSend(destination, order); + log.info("============> 发送 Queue 消息" + order.toString()); + } + +} diff --git a/09mq/activemq-demo/src/main/resources/application.yml b/09mq/activemq-demo/src/main/resources/application.yml index 200aa4b8..5c84b56a 100644 --- a/09mq/activemq-demo/src/main/resources/application.yml +++ b/09mq/activemq-demo/src/main/resources/application.yml @@ -5,6 +5,8 @@ spring: password: admin pool: enabled: false + packages: + trust-all: true datasource: url: jdbc:mysql://localhost:3306/java_study?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC username: root From 60dab476ce48f4d5a9623edeab226bf3a02f595e Mon Sep 17 00:00:00 2001 From: boyunkai Date: Mon, 8 Feb 2021 09:46:50 +0800 Subject: [PATCH 17/17] =?UTF-8?q?[feature]=20=E6=B7=BB=E5=8A=A0=20ActiveMQ?= =?UTF-8?q?=20=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 09mq/activemq-demo/README.md | 5 +-- .../src/main/resources/order_list.sql | 31 +++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) create mode 100644 09mq/activemq-demo/src/main/resources/order_list.sql diff --git a/09mq/activemq-demo/README.md b/09mq/activemq-demo/README.md index 47bc886c..fa7c6aab 100644 --- a/09mq/activemq-demo/README.md +++ b/09mq/activemq-demo/README.md @@ -23,6 +23,8 @@ mybatis: 建议不要在创建表的过程中使用mysql保留字,避免后期造成麻烦 +### 3.[(选做)将上述订单处理场景,改成使用 ActiveMQ 发送消息处理模式。](src/main/java/io/byk/queue/order) + #### ActiveMQ序列化异常Forbidden class ! This class is not trusted to be serialized as ObjectMessage payload 在`application.yml`文件下添加配置信息: @@ -37,5 +39,4 @@ spring: enabled: false packages: trust-all: true -``` - +``` \ No newline at end of file diff --git a/09mq/activemq-demo/src/main/resources/order_list.sql b/09mq/activemq-demo/src/main/resources/order_list.sql new file mode 100644 index 00000000..685cf3b3 --- /dev/null +++ b/09mq/activemq-demo/src/main/resources/order_list.sql @@ -0,0 +1,31 @@ +/* + Navicat Premium Data Transfer + + Source Server : root + Source Server Type : MySQL + Source Server Version : 80022 + Source Host : localhost:3306 + Source Schema : java_study + + Target Server Type : MySQL + Target Server Version : 80022 + File Encoding : 65001 + + Date: 08/02/2021 09:45:24 +*/ + +SET NAMES utf8mb4; +SET FOREIGN_KEY_CHECKS = 0; + +-- ---------------------------- +-- Table structure for order_list +-- ---------------------------- +DROP TABLE IF EXISTS `order_list`; +CREATE TABLE `order_list` ( + `id` int unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID', + `order_id` varchar(100) NOT NULL COMMENT '订单ID', + `status_id` int NOT NULL COMMENT '状态 0-未处理 1-已处理', + PRIMARY KEY (`id`) +) ENGINE=InnoDB AUTO_INCREMENT=2951 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='订单表'; + +SET FOREIGN_KEY_CHECKS = 1;