|
2 | 2 |
|
3 | 3 | import java.util.List; |
4 | 4 | import java.util.Objects; |
| 5 | +import java.util.concurrent.ExecutorService; |
| 6 | +import java.util.concurrent.Executors; |
| 7 | +import java.util.concurrent.locks.ReentrantLock; |
5 | 8 |
|
6 | 9 | import javax.annotation.Resource; |
7 | 10 |
|
|
22 | 25 | public class QueueConsumer { |
23 | 26 | @Resource |
24 | 27 | OrderMapper orderMapper; |
| 28 | + private static final ExecutorService executorService = Executors.newFixedThreadPool(3); |
| 29 | + //最大重试次数 |
| 30 | + private static final Integer TRY_TIMES = 6; |
| 31 | + //重试间隔时间单位秒 |
| 32 | + private static final Long INTERVAL_TIME = 100L; |
25 | 33 |
|
26 | | - public void receiveMessage() throws IllegalAccessException { |
| 34 | + public void receiveMessage() { |
27 | 35 | // STEP 1: 获取为处理订单列表 |
28 | 36 | List<Order> orderList = orderMapper.selectByStatusId(0); |
29 | 37 | if (Objects.isNull(orderList) || orderList.isEmpty()) { |
30 | 38 | throw new IllegalStateException("所有订单已处理"); |
31 | 39 | } |
32 | 40 | for (Order order : orderList) { |
33 | | - // STEP 2: 校验订单 |
34 | | - if (Objects.isNull(order)) { |
35 | | - throw new IllegalAccessException("订单不存在"); |
| 41 | + for (int i = 0; i < 10; i++) { |
| 42 | + executorService.submit(() -> { |
| 43 | + try { |
| 44 | + // 重试策略 |
| 45 | + retryBuss(order); |
| 46 | + } catch (InterruptedException exception) { |
| 47 | + log.info("进程被打断"); |
| 48 | + Thread.currentThread().interrupt(); |
| 49 | + } |
| 50 | + }); |
36 | 51 | } |
37 | | - boolean isOrderComplete = order.getStatusId() == 1; |
38 | | - if (isOrderComplete) { |
39 | | - throw new IllegalAccessException("订单已完成"); |
40 | | - } |
41 | | - // STEP 3: 插入订单 |
42 | | - order.setStatusId(1); |
43 | | - int success = orderMapper.updateByPrimaryKeySelective(order); |
44 | | - if (success <= 0L) { |
45 | | - throw new IllegalAccessException("更新订单失败"); |
| 52 | + } |
| 53 | + } |
| 54 | + |
| 55 | + private void retryBuss(Order order) throws InterruptedException { |
| 56 | + int retryNum = 1; |
| 57 | + while (retryNum <= TRY_TIMES) { |
| 58 | + try { |
| 59 | + // 加锁解决并发,效率降低 |
| 60 | + int success = updateOrder(order); |
| 61 | + if (success > 0L) { |
| 62 | + break; |
| 63 | + } |
| 64 | + retryNum++; |
| 65 | + } catch (Exception e) { |
| 66 | + retryNum++; |
| 67 | + Thread.sleep(INTERVAL_TIME); |
| 68 | + continue; |
46 | 69 | } |
47 | | - log.info("订单" + order.getOrderId() + "=======>出队"); |
48 | 70 | } |
49 | 71 | } |
| 72 | + |
| 73 | + private synchronized int updateOrder(Order order) throws IllegalAccessException, InterruptedException { |
| 74 | + // STEP 1: 校验订单 |
| 75 | + if (Objects.isNull(order)) { |
| 76 | + throw new IllegalAccessException("订单不存在"); |
| 77 | + } |
| 78 | + if (order.getStatusId() == 1) { |
| 79 | + return 1; |
| 80 | + } |
| 81 | + // STEP 2: 插入订单 |
| 82 | + order.setStatusId(1); |
| 83 | + int success = orderMapper.updateByPrimaryKeySelective(order); |
| 84 | + if (success <= 0L) { |
| 85 | + throw new IllegalAccessException("更新订单失败"); |
| 86 | + } |
| 87 | + Thread.sleep(100); |
| 88 | + log.info("订单" + order.getOrderId() + "=======>出队"); |
| 89 | + return success; |
| 90 | + } |
50 | 91 | } |
0 commit comments