Skip to content

Commit 0ce4b5a

Browse files
author
boyunkai
committed
[feature]添加重试以及多线程并发策略
1 parent 1da6342 commit 0ce4b5a

3 files changed

Lines changed: 66 additions & 28 deletions

File tree

09mq/activemq-demo/src/main/java/io/order/QueueApplication.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package io.order;
22

3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
5+
import java.util.concurrent.locks.ReentrantLock;
6+
37
import javax.annotation.Resource;
48

59
import org.springframework.boot.ApplicationArguments;
@@ -32,19 +36,13 @@ public static void main(String[] args) {
3236

3337
@Override
3438
public void run(ApplicationArguments args) throws Exception {
35-
for (int i = 1; i <= 10; i++) {
36-
log.info(queueProducer.sendMessage() + "<=======入队");
39+
for (int i = 0; i < 100; i++) {
40+
log.info(queueProducer.sendMessage(String.valueOf(i)) + "<=======入队");
3741
}
38-
while (true) {
39-
try {
40-
queueConsumer.receiveMessage();
41-
Thread.sleep(100);
42-
} catch (IllegalAccessException exception) {
43-
log.info(exception.getMessage());
44-
} catch (IllegalStateException exception){
45-
log.info(exception.getMessage());
46-
break;
47-
}
42+
try {
43+
queueConsumer.receiveMessage();
44+
} catch (IllegalStateException exception) {
45+
log.info(exception.getMessage());
4846
}
4947
}
5048
}

09mq/activemq-demo/src/main/java/io/order/service/QueueConsumer.java

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import java.util.List;
44
import java.util.Objects;
5+
import java.util.concurrent.ExecutorService;
6+
import java.util.concurrent.Executors;
7+
import java.util.concurrent.locks.ReentrantLock;
58

69
import javax.annotation.Resource;
710

@@ -22,29 +25,67 @@
2225
public class QueueConsumer {
2326
@Resource
2427
OrderMapper orderMapper;
28+
private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
29+
//最大重试次数
30+
private static final Integer TRY_TIMES = 6;
31+
//重试间隔时间单位秒
32+
private static final Long INTERVAL_TIME = 100L;
2533

26-
public void receiveMessage() throws IllegalAccessException {
34+
public void receiveMessage() {
2735
// STEP 1: 获取为处理订单列表
2836
List<Order> orderList = orderMapper.selectByStatusId(0);
2937
if (Objects.isNull(orderList) || orderList.isEmpty()) {
3038
throw new IllegalStateException("所有订单已处理");
3139
}
3240
for (Order order : orderList) {
33-
// STEP 2: 校验订单
34-
if (Objects.isNull(order)) {
35-
throw new IllegalAccessException("订单不存在");
41+
for (int i = 0; i < 10; i++) {
42+
executorService.submit(() -> {
43+
try {
44+
// 重试策略
45+
retryBuss(order);
46+
} catch (InterruptedException exception) {
47+
log.info("进程被打断");
48+
Thread.currentThread().interrupt();
49+
}
50+
});
3651
}
37-
boolean isOrderComplete = order.getStatusId() == 1;
38-
if (isOrderComplete) {
39-
throw new IllegalAccessException("订单已完成");
40-
}
41-
// STEP 3: 插入订单
42-
order.setStatusId(1);
43-
int success = orderMapper.updateByPrimaryKeySelective(order);
44-
if (success <= 0L) {
45-
throw new IllegalAccessException("更新订单失败");
52+
}
53+
}
54+
55+
private void retryBuss(Order order) throws InterruptedException {
56+
int retryNum = 1;
57+
while (retryNum <= TRY_TIMES) {
58+
try {
59+
// 加锁解决并发,效率降低
60+
int success = updateOrder(order);
61+
if (success > 0L) {
62+
break;
63+
}
64+
retryNum++;
65+
} catch (Exception e) {
66+
retryNum++;
67+
Thread.sleep(INTERVAL_TIME);
68+
continue;
4669
}
47-
log.info("订单" + order.getOrderId() + "=======>出队");
4870
}
4971
}
72+
73+
private synchronized int updateOrder(Order order) throws IllegalAccessException, InterruptedException {
74+
// STEP 1: 校验订单
75+
if (Objects.isNull(order)) {
76+
throw new IllegalAccessException("订单不存在");
77+
}
78+
if (order.getStatusId() == 1) {
79+
return 1;
80+
}
81+
// STEP 2: 插入订单
82+
order.setStatusId(1);
83+
int success = orderMapper.updateByPrimaryKeySelective(order);
84+
if (success <= 0L) {
85+
throw new IllegalAccessException("更新订单失败");
86+
}
87+
Thread.sleep(100);
88+
log.info("订单" + order.getOrderId() + "=======>出队");
89+
return success;
90+
}
5091
}

09mq/activemq-demo/src/main/java/io/order/service/QueueProducer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ public class QueueProducer {
2222
@Resource
2323
OrderMapper orderMapper;
2424

25-
public String sendMessage() throws IllegalAccessException {
25+
public String sendMessage(String orderId) throws IllegalAccessException {
2626
// STEP 1: 生成订单
27-
String orderId = UUID.randomUUID().toString().replace("-", "").substring(0, 10);
2827
Order order = new Order(0, orderId, 0);
2928
// STEP 2: 发送订单消息
3029
int success = orderMapper.insertSelective(order);

0 commit comments

Comments
 (0)