Skip to content

Commit 83738e5

Browse files
committed
Update Java Notes
1 parent 9cb437d commit 83738e5

1 file changed

Lines changed: 119 additions & 18 deletions

File tree

Frame.md

Lines changed: 119 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3594,7 +3594,7 @@ RocketMQ 主要由 Producer、Broker、Consumer 三部分组成,其中 Produce
35943594

35953595

35963596

3597-
官方文档:https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md
3597+
官方文档:https://github.com/apache/rocketmq/blob/master/docs/cn/concept.md(基础知识部分的笔记参考官方文档编写)
35983598

35993599

36003600

@@ -3934,7 +3934,7 @@ public class Consumer {
39343934

39353935

39363936

3937-
#### 代码实例
3937+
#### 代码实现
39383938

39393939
一个订单的顺序流程是:创建、付款、推送、完成,订单号相同的消息会被先后发送到同一个队列中,消费时同一个 OrderId 获取到的肯定是同一个队列
39403940

@@ -4282,7 +4282,7 @@ RocketMQ 定义了一些基本语法来支持过滤特性,可以很容易地
42824282
- NULL,特殊的常量
42834283
- 布尔值,TRUEFALSE
42844284

4285-
只有使用 push 模式的消费者才能用使用SQL92标准的sql语句,接口如下:
4285+
只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,接口如下:
42864286

42874287
```java
42884288
public void subscribe(final String topic, final MessageSelector messageSelector)
@@ -4297,13 +4297,33 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
42974297

42984298

42994299

4300+
***
4301+
4302+
4303+
4304+
#### 原理解析
4305+
4306+
RocketMQ 分布式消息队列的消息过滤方式是在 Consumer 端订阅消息时再做消息过滤的。因为 RocketMQProducer 端写入消息和在 Consumer 端订阅消息采用**分离存储**的机制实现,Consumer 端订阅消息是需要通过 ConsumeQueue 这个消息消费的逻辑队列拿到一个索引,然后再从 CommitLog 里面读取真正的消息实体内容,所以绕不开其存储结构。
4307+
4308+
ConsumeQueue 的存储结构如下,有 8 个字节存储的 Message Tag 的哈希值,基于 Tag 的消息过滤就是基于这个字段
4309+
4310+
![](https://gitee.com/seazean/images/raw/master/Frame/RocketMQ-消费队列结构.png)
4311+
4312+
* Tag 过滤:Consumer 端订阅消息时指定 TopicTAG,然后将订阅请求构建成一个 SubscriptionData,发送一个 Pull 消息的请求给 Broker 端。Broker 端用这些数据先构建一个 MessageFilter,然后传给文件存储层 StoreStoreConsumeQueue 读取到一条记录后,会用它记录的消息 tag hash 值去做过滤。因为在服务端只是根据 hashcode 进行判断,无法精确对 tag 原始字符串进行过滤,所以消费端拉取到消息后,还需要对消息的原始 tag 字符串进行比对,如果不同,则丢弃该消息,不进行消息消费
4313+
4314+
* SQL92 过滤:工作流程和 Tag 过滤大致一样,只是在 Store 层的具体过滤方式不一样。真正的 SQL expression 的构建和执行由 rocketmq-filter 模块负责,每次过滤都去执行 SQL 表达式会影响效率,所以 RocketMQ 使用了 BloomFilter 来避免了每次都去执行
4315+
4316+
4317+
4318+
4319+
43004320
****
43014321

43024322

43034323

4304-
#### 代码实例
4324+
#### 代码实现
43054325

4306-
发送消息时,通过 putUserProperty 来设置消息的属性
4326+
发送消息时,通过 putUserProperty 来设置消息的属性SQL92 的表达式上下文为消息的属性
43074327

43084328
```java
43094329
public class Producer {
@@ -4360,7 +4380,97 @@ public class Consumer {
43604380

43614381
### 事务消息
43624382

4363-
#### 事务机制
4383+
#### 工作流程
4384+
4385+
RocketMQ 支持分布式事务消息,采用了 2PC 的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示:
4386+
4387+
![](https://gitee.com/seazean/images/raw/master/Frame/RocketMQ-事务消息.png)
4388+
4389+
事务消息的大致方案分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程
4390+
4391+
1. 事务消息发送及提交:
4392+
4393+
* 发送消息(Half 消息)
4394+
4395+
* 服务端响应消息写入结果
4396+
4397+
* 根据发送结果执行本地事务(如果写入失败,此时 Half 消息对业务不可见,本地逻辑不执行)
4398+
* 根据本地事务状态执行 Commit 或者 RollbackCommit 操作生成消息索引,消息对消费者可见)
4399+
4400+
2. 补偿流程:
4401+
4402+
* 对没有 Commit/Rollback 的事务消息(pending 状态的消息),从服务端发起一次回查
4403+
* Producer 收到回查消息,检查回查消息对应的本地事务的状态
4404+
4405+
* 根据本地事务状态,重新 Commit 或者 Rollback
4406+
4407+
补偿阶段用于解决消息 Commit 或者 Rollback 发生超时或者失败的情况
4408+
4409+
4410+
4411+
****
4412+
4413+
4414+
4415+
#### 原理解析
4416+
4417+
##### 不可见性
4418+
4419+
事务消息相对普通消息最大的特点就是**一阶段发送的消息对用户是不可见的**,因为对于 Half 消息,会备份原消息的主题与消息消费队列,然后改变主题为 RMQ_SYS_TRANS_HALF_TOPIC,由于消费组未订阅该主题,故消费端无法消费 Half 类型的消息
4420+
4421+
RocketMQ 会开启一个定时任务,从 TopicRMQ_SYS_TRANS_HALF_TOPIC 中拉取消息进行消费,根据生产者组获取一个服务提供者发送回查事务状态请求,根据事务状态来决定是提交或回滚消息
4422+
4423+
RocketMQ 中,每条消息都会有对应的索引信息,Consumer 通过 ConsumeQueue 这个二级索引来读取消息实体内容,如图:
4424+
4425+
![](https://gitee.com/seazean/images/raw/master/Frame/RocketMQ-事务工作流程.png)
4426+
4427+
RocketMQ 的具体实现策略:如果写入的是事务消息,对消息的 TopicQueue 等属性进行替换,同时将原来的 TopicQueue 信息存储到消息的属性中,因为消息主题被替换,消息并不会转发到该原主题的消息消费队列,消费者无法感知消息的存在,不会消费
4428+
4429+
4430+
4431+
****
4432+
4433+
4434+
4435+
##### OP消息
4436+
4437+
一阶段写入不可见的消息后,二阶段操作:
4438+
4439+
* 如果执行 Commit 操作,则需要让消息对用户可见,构建出 Half 消息的索引。一阶段的 Half 消息写到一个特殊的 Topic,所以构建索引时需要读取出 Half 消息,并将 TopicQueue 替换成真正的目标的 TopicQueue,然后通过一次普通消息的写入操作来生成一条对用户可见的消息
4440+
4441+
* 如果是 Rollback 则需要撤销一阶段的消息,因为消息本就不可见,所以并不需要真正撤销消息(实际上 RocketMQ 也无法去删除一条消息,因为是顺序写文件的),为了区分这条消息没有确定的状态(Pending 状态),RocketMQOp 消息标识事务消息已经确定的状态(Commit 或者 Rollback
4442+
4443+
事务消息无论是 Commit 或者 Rollback 都会记录一个 Op 操作,两者的区别是 Commit 相对于 Rollback 在写入 Op 消息前创建 Half 消息的索引。如果一条事务消息没有对应的 Op 消息,说明这个事务的状态还无法确定(可能是二阶段失败了)
4444+
4445+
RocketMQOp 消息写入到全局一个特定的 Topic 中,通过源码中的方法 `TransactionalMessageUtil.buildOpTopic()`,这个主题是一个内部的 Topic(像 Half 消息的 Topic 一样),不会被用户消费。Op 消息的内容为对应的 Half 消息的存储的 Offset,这样通过 Op 消息能索引到 Half 消息进行后续的回查操作
4446+
4447+
![](https://gitee.com/seazean/images/raw/master/Frame/RocketMQ-OP消息.png)
4448+
4449+
4450+
4451+
****
4452+
4453+
4454+
4455+
##### 补偿机制
4456+
4457+
如果在 RocketMQ 事务消息的二阶段过程中失败了,例如在做 Commit 操作时,出现网络问题导致 Commit 失败,那么需要通过一定的策略使这条消息最终被 CommitRocketMQ采用了一种补偿机制,称为回查
4458+
4459+
Broker 端通过对比 Half 消息和 Op 消息,对未确定状态的消息发起回查并且推进CheckPoint(记录那些事务消息的状态是确定的),将消息发送到对应的 Producer 端(同一个 GroupProducer),由 Producer 根据消息来检查本地事务的状态,然后执行提交或回滚
4460+
4461+
注意:RocketMQ 并不会无休止的进行事务状态回查,默认回查 15 次,如果 15 次回查还是无法得知事务状态,则默认回滚该消息
4462+
4463+
4464+
4465+
4466+
4467+
****
4468+
4469+
4470+
4471+
#### 基本使用
4472+
4473+
##### 使用方式
43644474

43654475
事务消息共有三种状态,提交状态、回滚状态、中间状态:
43664476

@@ -4379,11 +4489,11 @@ public class Consumer {
43794489

43804490

43814491

4382-
****
4492+
***
43834493

43844494

43854495

4386-
#### 代码实例
4496+
##### 代码实现
43874497

43884498
使用 **TransactionMQProducer** 类创建事务性生产者,并指定唯一的 `ProducerGroup`,就可以设置自定义线程池来处理这些检查请求,执行本地事务后、需要根据执行结果对消息队列进行回复
43894499

@@ -4419,7 +4529,7 @@ public class Producer {
44194529

44204530
消费者代码和前面的实例相同的
44214531

4422-
实现事务的监听接口,当发送半消息成功时
4532+
实现事务的监听接口,当发送半消息成功时
44234533

44244534
* `executeLocalTransaction` 方法来执行本地事务,返回三个事务状态之一
44254535
* `checkLocalTransaction` 方法检查本地事务状态,响应消息队列的检查请求,返回三个事务状态之一
@@ -4461,12 +4571,6 @@ public class TransactionListenerImpl implements TransactionListener {
44614571

44624572

44634573

4464-
4465-
4466-
4467-
4468-
4469-
44704574
****
44714575

44724576

@@ -4475,10 +4579,7 @@ public class TransactionListenerImpl implements TransactionListener {
44754579

44764580
## 高级特性
44774581

4478-
待补充笔记:
44794582

4480-
* https://github.com/apache/rocketmq/blob/master/docs/cn/design.md#3-%E6%B6%88%E6%81%AF%E8%BF%87%E6%BB%A4
4481-
* https://github.com/apache/rocketmq/blob/master/docs/cn/design.md#5-%E4%BA%8B%E5%8A%A1%E6%B6%88%E6%81%AF
44824583

44834584

44844585

0 commit comments

Comments
 (0)