@@ -3594,7 +3594,7 @@ RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Produce
35943594
35953595
35963596
3597- 官方文档:https: // github.com/apache/rocketmq/blob/master/docs/cn/concept.md
3597+ 官方文档:https: // github.com/apache/rocketmq/blob/master/docs/cn/concept.md(基础知识部分的笔记参考官方文档编写)
35983598
35993599
36003600
@@ -3934,7 +3934,7 @@ public class Consumer {
39343934
39353935
39363936
3937- #### 代码实例
3937+ #### 代码实现
39383938
39393939一个订单的顺序流程是:创建、付款、推送、完成,订单号相同的消息会被先后发送到同一个队列中,消费时同一个 OrderId 获取到的肯定是同一个队列
39403940
@@ -4282,7 +4282,7 @@ RocketMQ 定义了一些基本语法来支持过滤特性,可以很容易地
42824282- NULL ,特殊的常量
42834283- 布尔值,TRUE 或 FALSE
42844284
4285- 只有使用 push 模式的消费者才能用使用 SQL92 标准的sql语句 ,接口如下:
4285+ 只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句 ,接口如下:
42864286
42874287```java
42884288public void subscribe(final String topic, final MessageSelector messageSelector)
@@ -4297,13 +4297,33 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
42974297
42984298
42994299
4300+ ***
4301+
4302+
4303+
4304+ #### 原理解析
4305+
4306+ RocketMQ 分布式消息队列的消息过滤方式是在 Consumer 端订阅消息时再做消息过滤的。因为 RocketMQ 在 Producer 端写入消息和在 Consumer 端订阅消息采用** 分离存储** 的机制实现,Consumer 端订阅消息是需要通过 ConsumeQueue 这个消息消费的逻辑队列拿到一个索引,然后再从 CommitLog 里面读取真正的消息实体内容,所以绕不开其存储结构。
4307+
4308+ ConsumeQueue 的存储结构如下,有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的消息过滤就是基于这个字段
4309+
4310+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-消费队列结构.png)
4311+
4312+ * Tag 过滤:Consumer 端订阅消息时指定 Topic 和 TAG ,然后将订阅请求构建成一个 SubscriptionData ,发送一个 Pull 消息的请求给 Broker 端。Broker 端用这些数据先构建一个 MessageFilter ,然后传给文件存储层 Store 。Store 从 ConsumeQueue 读取到一条记录后,会用它记录的消息 tag hash 值去做过滤。因为在服务端只是根据 hashcode 进行判断,无法精确对 tag 原始字符串进行过滤,所以消费端拉取到消息后,还需要对消息的原始 tag 字符串进行比对,如果不同,则丢弃该消息,不进行消息消费
4313+
4314+ * SQL92 过滤:工作流程和 Tag 过滤大致一样,只是在 Store 层的具体过滤方式不一样。真正的 SQL expression 的构建和执行由 rocketmq- filter 模块负责,每次过滤都去执行 SQL 表达式会影响效率,所以 RocketMQ 使用了 BloomFilter 来避免了每次都去执行
4315+
4316+
4317+
4318+
4319+
43004320****
43014321
43024322
43034323
4304- #### 代码实例
4324+ #### 代码实现
43054325
4306- 发送消息时,通过 putUserProperty 来设置消息的属性
4326+ 发送消息时,通过 putUserProperty 来设置消息的属性, SQL92 的表达式上下文为消息的属性
43074327
43084328```java
43094329public class Producer {
@@ -4360,7 +4380,97 @@ public class Consumer {
43604380
43614381### 事务消息
43624382
4363- #### 事务机制
4383+ #### 工作流程
4384+
4385+ RocketMQ 支持分布式事务消息,采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示:
4386+
4387+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-事务消息.png)
4388+
4389+ 事务消息的大致方案分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程
4390+
4391+ 1. 事务消息发送及提交:
4392+
4393+ * 发送消息(Half 消息)
4394+
4395+ * 服务端响应消息写入结果
4396+
4397+ * 根据发送结果执行本地事务(如果写入失败,此时 Half 消息对业务不可见,本地逻辑不执行)
4398+ * 根据本地事务状态执行 Commit 或者 Rollback (Commit 操作生成消息索引,消息对消费者可见)
4399+
4400+ 2. 补偿流程:
4401+
4402+ * 对没有 Commit / Rollback 的事务消息(pending 状态的消息),从服务端发起一次回查
4403+ * Producer 收到回查消息,检查回查消息对应的本地事务的状态
4404+
4405+ * 根据本地事务状态,重新 Commit 或者 Rollback
4406+
4407+ 补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况
4408+
4409+
4410+
4411+ ****
4412+
4413+
4414+
4415+ #### 原理解析
4416+
4417+ ##### 不可见性
4418+
4419+ 事务消息相对普通消息最大的特点就是** 一阶段发送的消息对用户是不可见的** ,因为对于 Half 消息,会备份原消息的主题与消息消费队列,然后改变主题为 RMQ_SYS_TRANS_HALF_TOPIC ,由于消费组未订阅该主题,故消费端无法消费 Half 类型的消息
4420+
4421+ RocketMQ 会开启一个定时任务,从 Topic 为 RMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息
4422+
4423+ 在 RocketMQ 中,每条消息都会有对应的索引信息,Consumer 通过 ConsumeQueue 这个二级索引来读取消息实体内容,如图:
4424+
4425+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-事务工作流程.png)
4426+
4427+ RocketMQ 的具体实现策略:如果写入的是事务消息,对消息的 Topic 和 Queue 等属性进行替换,同时将原来的 Topic 和 Queue 信息存储到消息的属性中,因为消息主题被替换,消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费
4428+
4429+
4430+
4431+ ****
4432+
4433+
4434+
4435+ ##### OP 消息
4436+
4437+ 一阶段写入不可见的消息后,二阶段操作:
4438+
4439+ * 如果执行 Commit 操作,则需要让消息对用户可见,构建出 Half 消息的索引。一阶段的 Half 消息写到一个特殊的 Topic ,所以构建索引时需要读取出 Half 消息,并将 Topic 和 Queue 替换成真正的目标的 Topic 和 Queue ,然后通过一次普通消息的写入操作来生成一条对用户可见的消息
4440+
4441+ * 如果是 Rollback 则需要撤销一阶段的消息,因为消息本就不可见,所以并不需要真正撤销消息(实际上 RocketMQ 也无法去删除一条消息,因为是顺序写文件的),为了区分这条消息没有确定的状态(Pending 状态),RocketMQ 用 Op 消息标识事务消息已经确定的状态(Commit 或者 Rollback )
4442+
4443+ 事务消息无论是 Commit 或者 Rollback 都会记录一个 Op 操作,两者的区别是 Commit 相对于 Rollback 在写入 Op 消息前创建 Half 消息的索引。如果一条事务消息没有对应的 Op 消息,说明这个事务的状态还无法确定(可能是二阶段失败了)
4444+
4445+ RocketMQ 将 Op 消息写入到全局一个特定的 Topic 中,通过源码中的方法 `TransactionalMessageUtil . buildOpTopic()`,这个主题是一个内部的 Topic (像 Half 消息的 Topic 一样),不会被用户消费。Op 消息的内容为对应的 Half 消息的存储的 Offset ,这样通过 Op 消息能索引到 Half 消息进行后续的回查操作
4446+
4447+ ! [](https: // gitee.com/seazean/images/raw/master/Frame/RocketMQ-OP消息.png)
4448+
4449+
4450+
4451+ ****
4452+
4453+
4454+
4455+ ##### 补偿机制
4456+
4457+ 如果在 RocketMQ 事务消息的二阶段过程中失败了,例如在做 Commit 操作时,出现网络问题导致 Commit 失败,那么需要通过一定的策略使这条消息最终被 Commit ,RocketMQ 采用了一种补偿机制,称为回查
4458+
4459+ Broker 端通过对比 Half 消息和 Op 消息,对未确定状态的消息发起回查并且推进CheckPoint (记录那些事务消息的状态是确定的),将消息发送到对应的 Producer 端(同一个 Group 的 Producer ),由 Producer 根据消息来检查本地事务的状态,然后执行提交或回滚
4460+
4461+ 注意:RocketMQ 并不会无休止的进行事务状态回查,默认回查 15 次,如果 15 次回查还是无法得知事务状态,则默认回滚该消息
4462+
4463+
4464+
4465+
4466+
4467+ ****
4468+
4469+
4470+
4471+ #### 基本使用
4472+
4473+ ##### 使用方式
43644474
43654475事务消息共有三种状态,提交状态、回滚状态、中间状态:
43664476
@@ -4379,11 +4489,11 @@ public class Consumer {
43794489
43804490
43814491
4382- ****
4492+ ***
43834493
43844494
43854495
4386- #### 代码实例
4496+ ##### 代码实现
43874497
43884498使用 ** TransactionMQProducer ** 类创建事务性生产者,并指定唯一的 `ProducerGroup `,就可以设置自定义线程池来处理这些检查请求,执行本地事务后、需要根据执行结果对消息队列进行回复
43894499
@@ -4419,7 +4529,7 @@ public class Producer {
44194529
44204530消费者代码和前面的实例相同的
44214531
4422- 实现事务的监听接口,当发送半消息成功时
4532+ 实现事务的监听接口,当发送半消息成功时:
44234533
44244534* `executeLocalTransaction` 方法来执行本地事务,返回三个事务状态之一
44254535* `checkLocalTransaction` 方法检查本地事务状态,响应消息队列的检查请求,返回三个事务状态之一
@@ -4461,12 +4571,6 @@ public class TransactionListenerImpl implements TransactionListener {
44614571
44624572
44634573
4464-
4465-
4466-
4467-
4468-
4469-
44704574****
44714575
44724576
@@ -4475,10 +4579,7 @@ public class TransactionListenerImpl implements TransactionListener {
44754579
44764580## 高级特性
44774581
4478- 待补充笔记:
44794582
4480- * https: // github.com/apache/rocketmq/blob/master/docs/cn/design.md#3-%E6%B6%88%E6%81%AF%E8%BF%87%E6%BB%A4
4481- * https: // github.com/apache/rocketmq/blob/master/docs/cn/design.md#5-%E4%BA%8B%E5%8A%A1%E6%B6%88%E6%81%AF
44824583
44834584
44844585
0 commit comments