|
| 1 | +# RocketMQ ConsumeQueue 详解 |
| 2 | + |
| 3 | +RocketMQ 基于主题订阅模式实现消息消费,消费者关注每一个主题下的所有消息,但是同一主题下的消息是不连续地存储在 CommitLog 文件中的,如果消费者直接从消息存储文件中遍历查找主题下的消息,效率会特别低。所以为了在查找消息的时候效率更高一些,设计了 ConsumeQueue 文件,可以看作 CommitLog 消费的目录文件. |
| 4 | + |
| 5 | +ConsumeQueue 的第一级目录为消息主题名称,第二级目录为主题的队列 id |
| 6 | + |
| 7 | +为了加速 ConsumeQueue 消息的查询速度并节省磁盘空间,不会存储消息的全量信息,只会 存储一些 关键信息,如 8 字节的 CommmitLog 偏移量、4 字节的文件大小、8 字节的 tag 哈希码 |
| 8 | + |
| 9 | +1、根据消息存储时间查找物理偏移量: |
| 10 | + |
| 11 | +org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime |
| 12 | + |
| 13 | +第一步:根据时间戳定位物理文件 |
| 14 | + |
| 15 | +```java |
| 16 | +public MappedFile getMappedFileByTime(final long timestamp) { |
| 17 | + Object[] mfs = this.copyMappedFiles(0); |
| 18 | + |
| 19 | + if (null == mfs) |
| 20 | + return null; |
| 21 | + |
| 22 | + for (int i = 0; i < mfs.length; i++) { |
| 23 | + MappedFile mappedFile = (MappedFile) mfs[i]; |
| 24 | + if (mappedFile.getLastModifiedTimestamp() >= timestamp) { |
| 25 | + return mappedFile; |
| 26 | + } |
| 27 | + } |
| 28 | + |
| 29 | + return (MappedFile) mfs[mfs.length - 1]; |
| 30 | +} |
| 31 | +``` |
| 32 | + |
| 33 | +从第一个文件 开始,找到第一个更新时间大于该时间戳的文件 |
| 34 | + |
| 35 | +第二步:利用二分查找法来加速检索 |
| 36 | + |
| 37 | +计算最低查找偏移量,如果消息队列偏移量大于文件的偏移量,则最低偏移量等于消息队列偏移量减去文件的偏移量,反之为 0 |
| 38 | + |
| 39 | +`int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;` |
| 40 | + |
| 41 | +计算中间偏移量,其中*`CQ_STORE_UNIT_SIZE` =* 8 字节的 CommmitLog 偏移量 + 4 字节的文件大小+8 字节的 tag 哈希码 |
| 42 | + |
| 43 | +`midOffset = (low + high) / (2 * *CQ_STORE_UNIT_SIZE*) * *CQ_STORE_UNIT_SIZE*;` |
| 44 | + |
| 45 | +如果得到的物理偏移量小于当前最小物理偏移量,则待查找消息的物理偏移量大于 midOffset,将 low 设置为 midOffset,继续查询 |
| 46 | + |
| 47 | +```java |
| 48 | +byteBuffer.position(midOffset); |
| 49 | +long phyOffset = byteBuffer.getLong(); |
| 50 | +int size = byteBuffer.getInt(); |
| 51 | +if (phyOffset < minPhysicOffset) { |
| 52 | + low = midOffset +CQ_STORE_UNIT_SIZE; |
| 53 | + leftOffset = midOffset; |
| 54 | + continue; |
| 55 | +} |
| 56 | +``` |
| 57 | + |
| 58 | +如果得到的物理偏移量大于最小物理偏移量,说明该消息为有效信息,则根据消息物理偏移量和消息长度获取消息存储的时间戳 |
| 59 | + |
| 60 | +```java |
| 61 | +long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size); |
| 62 | +``` |
| 63 | + |
| 64 | +如果存储时间小于 0,则为无效消息,返回 0; |
| 65 | + |
| 66 | +如果存储时间戳等于待查找时间戳,说明查找到了目标消息,设置 targetOffset,跳出循环; |
| 67 | + |
| 68 | +如果存储时间戳大于待查找时间戳,说明待查找消息的物理偏移量小于 midOffset,设置 high 为 midOffset,设置 rightIndexValue 等于 storeTime,设置 rightOffset 为 midOffset; |
| 69 | + |
| 70 | +如果存储时间戳小于待查找时间戳,说明待查找消息的物理偏移量大于 midOffset,设置 low 为 midOffset,设置 leftIndexValue 等于 storeTime,设置 leftOffset 为 midOffset |
| 71 | + |
| 72 | +```java |
| 73 | +if (storeTime < 0) { |
| 74 | + return 0; |
| 75 | +} else if (storeTime == timestamp) { |
| 76 | + targetOffset = midOffset; |
| 77 | + break; |
| 78 | +} else if (storeTime > timestamp) { |
| 79 | + high = midOffset -CQ_STORE_UNIT_SIZE; |
| 80 | + rightOffset = midOffset; |
| 81 | + rightIndexValue = storeTime; |
| 82 | +} else { |
| 83 | + low = midOffset +CQ_STORE_UNIT_SIZE; |
| 84 | + leftOffset = midOffset; |
| 85 | + leftIndexValue = storeTime; |
| 86 | +} |
| 87 | +``` |
| 88 | + |
| 89 | +如果 targetOffset 不等于-1,表示找到了存储时间戳等于待查找时间戳的消息; |
| 90 | + |
| 91 | +如果 leftIndexValue 等于-1,返回大于并且最接近待查找消息的时间戳的偏移量 |
| 92 | + |
| 93 | +如果 rightIndexValue 等于-1,返回小于并且最接近待查找消息的时间戳的偏移量 |
| 94 | + |
| 95 | +```java |
| 96 | +if (targetOffset != -1) { |
| 97 | + |
| 98 | + offset = targetOffset; |
| 99 | +} else { |
| 100 | + if (leftIndexValue == -1) { |
| 101 | + offset = rightOffset; |
| 102 | + } else if (rightIndexValue == -1) { |
| 103 | + offset = leftOffset; |
| 104 | + } else { |
| 105 | + offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - rightIndexValue) ? rightOffset : leftOffset; |
| 106 | + } |
| 107 | +} |
| 108 | +``` |
| 109 | + |
| 110 | +2、根据当前偏移量获取下一个文件的偏移量 |
| 111 | + |
| 112 | +org.apache.rocketmq.store.ConsumeQueue#rollNextFile |
| 113 | + |
| 114 | +```java |
| 115 | +public long rollNextFile(final long index) { |
| 116 | + int mappedFileSize = this.mappedFileSize; |
| 117 | + int totalUnitsInFile = mappedFileSize /CQ_STORE_UNIT_SIZE; |
| 118 | + return index + totalUnitsInFile - index % totalUnitsInFile; |
| 119 | +} |
| 120 | +``` |
| 121 | + |
| 122 | +3、ConsumeQueue 添加消息 |
| 123 | + |
| 124 | +org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo |
| 125 | + |
| 126 | +将消息偏移量、消息长度、tag 哈希码写入 ByteBuffer,将内容追加到 ConsumeQueue 的内存映射文件中。 |
| 127 | + |
| 128 | +```java |
| 129 | +private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, |
| 130 | + final long cqOffset) { |
| 131 | + |
| 132 | + if (offset + size <= this.maxPhysicOffset) { |
| 133 | + log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); |
| 134 | + return true; |
| 135 | + } |
| 136 | + |
| 137 | + this.byteBufferIndex.flip(); |
| 138 | + this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); |
| 139 | + this.byteBufferIndex.putLong(offset); |
| 140 | + this.byteBufferIndex.putInt(size); |
| 141 | + this.byteBufferIndex.putLong(tagsCode); |
| 142 | + |
| 143 | + final long expectLogicOffset = cqOffset *CQ_STORE_UNIT_SIZE; |
| 144 | + |
| 145 | + MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); |
| 146 | + if (mappedFile != null) { |
| 147 | + |
| 148 | + if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { |
| 149 | + this.minLogicOffset = expectLogicOffset; |
| 150 | + this.mappedFileQueue.setFlushedWhere(expectLogicOffset); |
| 151 | + this.mappedFileQueue.setCommittedWhere(expectLogicOffset); |
| 152 | + this.fillPreBlank(mappedFile, expectLogicOffset); |
| 153 | + log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " |
| 154 | + + mappedFile.getWrotePosition()); |
| 155 | + } |
| 156 | + |
| 157 | + if (cqOffset != 0) { |
| 158 | + long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); |
| 159 | + |
| 160 | + if (expectLogicOffset < currentLogicOffset) { |
| 161 | + log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); |
| 162 | + return true; |
| 163 | + } |
| 164 | + |
| 165 | + if (expectLogicOffset != currentLogicOffset) { |
| 166 | + LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", |
| 167 | + expectLogicOffset, |
| 168 | + currentLogicOffset, |
| 169 | + this.topic, |
| 170 | + this.queueId, |
| 171 | + expectLogicOffset - currentLogicOffset |
| 172 | + ); |
| 173 | + } |
| 174 | + } |
| 175 | + this.maxPhysicOffset = offset + size; |
| 176 | + return mappedFile.appendMessage(this.byteBufferIndex.array()); |
| 177 | + } |
| 178 | + return false; |
| 179 | +} |
| 180 | +``` |
| 181 | + |
| 182 | +4、ConsumeQueue 文件删除 |
| 183 | + |
| 184 | +org.apache.rocketmq.store.ConsumeQueue#destroy |
| 185 | + |
| 186 | +重置 ConsumeQueue 的 maxPhysicOffset 与 minLogicOffset,调用 MappedFileQueue 的 destroy()方法将 ConsumeQueue 目录下的文件全部删除 |
| 187 | + |
| 188 | +```java |
| 189 | +public void destroy() { |
| 190 | + this.maxPhysicOffset = -1; |
| 191 | + this.minLogicOffset = 0; |
| 192 | + this.mappedFileQueue.destroy(); |
| 193 | + if (isExtReadEnable()) { |
| 194 | + this.consumeQueueExt.destroy(); |
| 195 | + } |
| 196 | +} |
| 197 | +``` |
0 commit comments