Skip to content

Commit fe4d7ba

Browse files
committed
update note
1 parent 2f5700e commit fe4d7ba

5 files changed

Lines changed: 318 additions & 51 deletions

File tree

img/rabbitmq-return-listener.png

47.2 KB
Loading

中间件/RabbitMQ.md

Lines changed: 269 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
1+
[RabbitMQ基础](https://www.jianshu.com/p/79ca08116d57)
2+
3+
[Springboot整合RabbitMQ](https://blog.csdn.net/qq_35387940/article/details/100514134)
4+
5+
[RabbitMQ之消息持久化](https://blog.csdn.net/u013256816/article/details/60875666)
6+
17
## 简介
28

39
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )。消息队列是一种应用间的异步协作机制。
410

511
![image-20200718104019614](../img/rabbitmq.png)
612

7-
Message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key、priority、delivery-mode(指出该消息可能需要持久性存储)等。
13+
### 基本概念
14+
15+
Message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key、priority、delivery-mode(是否持久性存储)等。
816

917
Publisher:消息的生产者。
1018

11-
Exchange:接收消息并将消息分发到一个或多个Queue。
19+
Exchange:接收消息并将消息分发到一个或多个Queue。default exchange 是默认的直连交换机,名字为空字符串,每个新建队列都会自动绑定到默认交换机上,绑定的路由键名称与队列名称相同。
1220

13-
Binding:通过Binding将Exchange和Queue关联,这样Exchange就知道如何将消息准确的推送到Queue中去
21+
Binding:通过Binding将Exchange和Queue关联,这样Exchange就知道将消息分发到哪个Queue中
1422

15-
Queue:存储消息,队列的特性是先进先出。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者消费
23+
Queue:存储消息,队列的特性是先进先出。一个消息可分发到一个或多个队列
1624

1725
Virtual host:每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange和queue。
1826

@@ -22,7 +30,7 @@ Broker:消息队列服务器实体。
2230

2331
对于一些不需要立即生效的操作,可以拆分出来,异步执行,使用消息队列实现。
2432

25-
以常见的订单系统为例,用户点击下单按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑
33+
以常见的订单系统为例,用户点击下单按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发短信通知。这种场景下就可以用 MQ 。将短信通知放到 MQ 异步执行,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的线程消费MQ的消息
2634

2735
## Exchange 类型
2836

@@ -35,11 +43,11 @@ Exchange规则。
3543
| fanout | 把所有发送到该Exchange的消息路由到所有与它绑定的Queue中 |
3644
| direct | Routing Key==Binding Key |
3745
| topic | 模糊匹配 |
38-
| headers | Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配|
46+
| headers | Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的header属性进行匹配|
3947

4048
### direct
4149

42-
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。它是完全匹配、单播的模式。
50+
消息中的路由键如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。它是完全匹配、单播的模式。
4351

4452
![](../img/rabbitmq-direct.png)
4553

@@ -59,38 +67,278 @@ Exchange规则。
5967

6068
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
6169

62-
[参考:RabbitMQ](https://www.jianshu.com/p/79ca08116d57)
63-
6470

6571

6672
## 消息丢失
6773

68-
[RabbitMQ系列之怎么确保消息不丢失](https://www.debug8.com/java/t_50678.html)
74+
消息丢失场景:生产者生产消息到RabbitMQ Server消息丢失、RabbitMQ Server存储的消息丢失和RabbitMQ到消费者消息丢失。
75+
76+
消息丢失从三个方面来解决:生产者确认机制、消费者开启消息确认和持久化。
77+
78+
### 生产者确认机制
79+
80+
生产者发送消息到队列,无法确保发送的消息成功的到达server。
81+
82+
解决方法:开启生产者确认机制,只要消息成功到达server并找到交换机之后(不用到达相应的Queue),RabbitMQ就会发送一个ack给生产者(即使消息没有Queue接收,也会发送ack),生产者就知道消息已经到达server了。如果没有找到对应的交换机,就会发送一条nack消息,提示找不到交换机。
83+
84+
生产者可以通过调用channel.confirmSelect()方法将信道设置为confirm模式。在 Springboot 是通过 publisher-confirms 参数来设置 confirm 模式:
85+
86+
```yaml
87+
spring:
88+
rabbitmq:
89+
#开启 confirm 确认机制
90+
publisher-confirms: true
91+
```
92+
93+
异步comfirm模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后,生产者会回调这个方法,根据具体的结果对消息进行重新发送、或记录日志等后续处理。
94+
95+
```java
96+
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
97+
log.info("correlationData: " + correlationData);
98+
log.info("ack: " + ack);
99+
if(!ack) {
100+
log.info("异常处理....");
101+
}
102+
};
103+
104+
rabbitTemplate.setConfirmCallback(confirmCallback);
105+
```
106+
107+
### 路由不可达消息
108+
109+
生产者确认机制只确保消息正确到达交换机,对于指定的 routing key 路由不到的消息(消息没有匹配到Queue),还是会存在消息丢失的问题。
69110

70-
消息丢失从三个方面来解决:生产者开启发送确认、消费者开启消息确认和持久化
111+
对于不可路由的消息,有两种处理方式:Return消息机制和备份交换机
71112

72-
### 生产者开启发送确认
113+
#### Return消息机制
73114

74-
生产者发送消息到队列,无法确保发送的消息成功的分配到队列中
115+
ReturnCallback 用于监听路由不可达的消息。需要将`mandatory` 设置为 `true` ,这样路由不可达的消息会被监听到,不会被 broker 自动删除
75116

76-
解决方法:开启消息发送确认,通过 ConfirmCallback 接口 和 ReturnCallback 接口 来保障。
117+
```yaml
118+
spring:
119+
rabbitmq:
120+
#设置为true后消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
121+
template.mandatory: true
122+
```
77123
78-
ConfirmCallback 确认消息是否正确到达 Exchange 中
124+
通过 ReturnCallback 监听路由不可达消息
79125
80-
ReturnCallback 消息没有正确到达队列时触发回调,如果正确到达队列不执行。
126+
```java
127+
final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
128+
log.info("return exchange: " + exchange + ", routingKey: "
129+
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
130+
rabbitTemplate.setReturnCallback(returnCallback);
131+
```
81132

82-
### 消费者开启消息确认
133+
当消息没有匹配到Queue时,会返回 `return exchange: , routingKey: MAIL, replyCode: 312, replyText: NO_ROUTE`
83134

84-
可能消费者收到消息还没来得及处理服务就宕机了,导致消息丢失。
135+
#### 备份交换机
85136

86-
解决方法:消费端开启消息确认(ACK),将消息设置为手动确认。默认情况下消息消费者是自动 ack消息的,自动确认会在消息发送给消费者后立即确认,这样存在丢失消息的可能。
137+
alternate-exchange 是一个普通的exchange,当你发送消息到对应的exchange时,没有匹配到queue,就会自动转移到alternate-exchange对应的queue,这样消息就不会丢失。
138+
139+
### 消费者手动消息确认
140+
141+
可能消费者收到消息还没来得及处理服务就宕机了,导致消息丢失。因为消息者默认采用自动ack,当消费者收到消息后会通知MQ Server这条消息已经处理好了,MQ 就会移除这条消息。
142+
143+
解决方法:消费者设置为手动确认消息。消费者处理完逻辑之后再通知MQ Server,这样消费者没处理完消息就不会发送ack。当消息者消费失败的时候,MQ 会重发消息。
144+
145+
消费者设置手动ack:
146+
147+
```java
148+
#设置消费端手动 ack
149+
spring.rabbitmq.listener.simple.acknowledge-mode=manual
150+
```
151+
152+
消息处理完,手动确认:
153+
154+
```java
155+
@RabbitListener(queues = RabbitMqConfig.MAIL_QUEUE)
156+
public void onMessage(Message message, Channel channel) throws IOException {
157+
158+
try {
159+
Thread.sleep(5000);
160+
} catch (InterruptedException e) {
161+
e.printStackTrace();
162+
}
163+
long deliveryTag = message.getMessageProperties().getDeliveryTag();
164+
//手工ack;第二个参数是multiple,设置为true,表示deliveryTag序列号之前(包括自身)的消息都已经收到,设为false则表示收到一条消息
165+
channel.basicAck(deliveryTag, true);
166+
System.out.println("mail listener receive: " + new String(message.getBody()));
167+
}
168+
```
87169

88170
### 持久化
89171

90172
如果RabbitMQ服务异常导致重启,将会导致消息丢失。RabbitMQ提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启RabbitMQ,消息也不会丢失。
91173

92174
消息持久化需要满足以下条件:
93175

94-
1. Exchange设置持久化。将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或执行代码去创建交换机了,交换机会自动被创建。
95176
2. Queue设置持久化。将队列的元数据保存在磁盘上,防止数据丢失。
96-
3. 发送消息设置持久化发送。
177+
3. 发送消息设置持久化。如果只有队列设置了持久化,消息没有设置持久化,当RabbitMQ服务挂掉重启之后,只会恢复队列的数据,消息依然会丢失。队列里的消息会不会持久化,取决于发送消息时对消息的持久化设置。
178+
179+
建议 Exchange 也设置持久化。将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭之后,在重启 RabbitMQ 时不需要重新手动或重启应用去创建交换机了,交换机会自动被创建。
180+
181+
其实消息在正确存入RabbitMQ之后,还需要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并不是为每条消息都做fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache但是还没来得及落盘,那么这些消息将会丢失。那么这个怎么解决呢?首先可以引入RabbitMQ的mirrored-queue即镜像队列,这个相当于配置了副本,当master在此特殊时间内crash掉,可以自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能完全的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好很多。
182+
183+
184+
## 消费端限流
185+
186+
当 RabbitMQ 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器奔溃。这种情况下需要对消费端限流。
187+
188+
Spring RabbitMQ 提供参数 prefetch 可以设置单个请求中处理的最大消息个数。如果有 N 个消息还没有 ack,则该 consumer 将会阻塞,不会消费新的消息,直到有消息 ack。
189+
190+
开启消费端限流:
191+
192+
```properties
193+
#在单个请求中处理的消息个数,unack的最大数量
194+
spring.rabbitmq.listener.simple.prefetch=2
195+
```
196+
197+
除此之外,原生 RabbitMQ 还提供 prefetchSize 和 global 两个参数。Spring RabbitMQ没有这两个参数。
198+
199+
```java
200+
//单条消息大小限制,0代表不限制
201+
//global:限制限流功能是channel级别的还是consumer级别。当设置为false,consumer级别,限流功能生效,设置为true没有了限流功能,因为channel级别尚未实现。
202+
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
203+
```
204+
205+
206+
207+
### 消息过期时间
208+
209+
RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。
210+
211+
在生产端发送消息的时候可以在 properties 中指定 `expiration`属性来对消息过期时间进行设置,单位为毫秒(ms)。
212+
213+
```java
214+
Message msg = new Message("tyson".getBytes(), mp);
215+
msg.getMessageProperties().setExpiration("3000");
216+
```
217+
218+
也可以在创建队列的时候指定队列的ttl,对于队列中超过该时间的消息将会被移除。
219+
220+
221+
222+
## 死信队列
223+
224+
没有被消费成功的消息存放的队列。
225+
226+
消息没有被消费成功的原因:
227+
228+
- 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
229+
- 消息超时未消费
230+
- 达到最大队列长度
231+
232+
实现死信队列:
233+
234+
设置死信队列的 exchange 和 queue,然后进行绑定:
235+
236+
```java
237+
@Bean
238+
public DirectExchange dlxExchange() {
239+
return new DirectExchange(RabbitMqConfig.DLX_EXCHANGE);
240+
}
241+
242+
@Bean
243+
public Queue dlxQueue() {
244+
return new Queue(RabbitMqConfig.DLX_QUEUE, true);
245+
}
246+
247+
@Bean
248+
public Binding bindingDeadExchange(Queue dlxQueue, DirectExchange deadExchange) {
249+
return BindingBuilder.bind(dlxQueue).to(deadExchange).with(RabbitMqConfig.DLX_QUEUE);
250+
}
251+
```
252+
253+
在普通队列加上两个参数,绑定普通队列到死信队列。当消息在过期、requeue失败、 队列在达到最大长度时,消息会被路由到死信队列。
254+
255+
```java
256+
@Bean
257+
public Queue sendSmsQueue() {
258+
Map<String,Object> arguments = new HashMap<>(2);
259+
// 绑定该队列到私信交换机
260+
arguments.put("x-dead-letter-exchange", RabbitMqConfig.DLX_EXCHANGE);
261+
arguments.put("x-dead-letter-routing-key", RabbitMqConfig.DLX_QUEUE);
262+
return new Queue(RabbitMqConfig.MAIL_QUEUE, true, false, false, arguments);
263+
}
264+
```
265+
266+
生产者完整代码:
267+
268+
```java
269+
@Component
270+
@Slf4j
271+
public class MQProducer {
272+
273+
@Autowired
274+
RabbitTemplate rabbitTemplate;
275+
276+
@Autowired
277+
RandomUtil randomUtil;
278+
279+
@Autowired
280+
UserService userService;
281+
282+
final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
283+
log.info("correlationData: " + correlationData);
284+
log.info("ack: " + ack);
285+
if(!ack) {
286+
log.info("异常处理....");
287+
}
288+
};
289+
290+
291+
final RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) ->
292+
log.info("return exchange: " + exchange + ", routingKey: "
293+
+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
294+
295+
public void sendMail(String mail) {
296+
//貌似线程不安全 范围100000 - 999999
297+
Integer random = randomUtil.nextInt(100000, 999999);
298+
Map<String, String> map = new HashMap<>(2);
299+
String code = random.toString();
300+
map.put("mail", mail);
301+
map.put("code", code);
302+
303+
MessageProperties mp = new MessageProperties();
304+
//在生产环境中这里不用Message,而是使用 fastJson 等工具将对象转换为 json 格式发送
305+
Message msg = new Message("tyson".getBytes(), mp);
306+
msg.getMessageProperties().setExpiration("3000");
307+
//如果消费端要设置为手工 ACK ,那么生产端发送消息的时候一定发送 correlationData ,并且全局唯一,用以唯一标识消息。
308+
CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
309+
310+
rabbitTemplate.setMandatory(true);
311+
rabbitTemplate.setConfirmCallback(confirmCallback);
312+
rabbitTemplate.setReturnCallback(returnCallback);
313+
rabbitTemplate.convertAndSend(RabbitMqConfig.MAIL_QUEUE, msg, correlationData);
314+
315+
//存入redis
316+
userService.updateMailSendState(mail, code, MailConfig.MAIL_STATE_WAIT);
317+
}
318+
}
319+
```
320+
321+
消费者完整代码:
322+
323+
```java
324+
@Slf4j
325+
@Component
326+
public class DeadListener {
327+
328+
@RabbitListener(queues = RabbitMqConfig.DLX_QUEUE)
329+
public void onMessage(Message message, Channel channel) throws IOException {
330+
331+
try {
332+
Thread.sleep(5000);
333+
} catch (InterruptedException e) {
334+
e.printStackTrace();
335+
}
336+
long deliveryTag = message.getMessageProperties().getDeliveryTag();
337+
//手工ack
338+
channel.basicAck(deliveryTag,false);
339+
System.out.println("receive--1: " + new String(message.getBody()));
340+
}
341+
}
342+
```
343+
344+
当普通队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的死信交换机去,然后被路由到死信队列。可以监听死信队列中的消息做相应的处理。

工具/nginx部署项目.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,4 +236,4 @@ service mysqld start
236236
/sbin/service rabbitmq-server start
237237
```
238238

239-
启动 nginx:`安装目录/sbin/nginx`
239+
启动 nginx:`安装目录/sbin/nginx`

0 commit comments

Comments
 (0)