本文共 15930 字,大约阅读时间需要 53 分钟。
Apache Rocketmq是一个分布式消息和流平台,具有低延迟、高性能和可靠性、万亿级容量和灵活的可扩展性,本文将从架构设计、源码解读、使用经验、调优调优,解剖MQ的特征。
支持多Master,可实现同步或者异步双写
;Broker Master和Broker Slave是通过jdk原生的nio(SocketChannel)通信实现数据同步,Producer与Broker,Consumer与Broker通过Netty进行通信,NameServer与其他通过Netty实现通信; NameServer服务提供了轻量级的服务发现和路由
,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。每个NameServer服务记录完整的路由信息,提供一致的读写服务,支持快速存储扩展;
提供消息存储,消息接收,消息查询,消息发送
,Broker通过提供轻量级主题和队列机制来处理消息存储,Broker分为Master与Slave,一个Master可以对应多个Slave(当前RocketMQ版本在部署架构上支持一Master多Slave)
,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息(心跳)到所有NameServer。它们支持Push和Pull模型,包含容错机制(2个副本或3个副本),提供了极强的峰值处理里能力和按照时间顺序存储数以百万记的消息存储能力,此外,代理提供了灾难恢复、丰富的度量统计和警报机制,这些都是在传统的消息传递系统中缺乏的;
produce支持分布式部署,分布式的produce通过broker集群提供的各种负载均衡策略将消息发送到broker集群中。发送过程支持快速失败是低延迟的。
消费者也支持在推送和拉取模式下分布式部署,它还支持集群消费和消息广播。提供实时的消息订阅机制,能够满足大多数消费者的需求。RocketMQ的网站为感兴趣的用户提供了一个简单的快速入门指南。
MQ速度快的原因
:顺序写:消息存储,文件存储结构;
零拷贝:消息发送,减少内存复制(避免理论上2次fd交互,4次内存复制);消息存储(文件存储机制)CommitLog(存储消息元数据)
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的,消息包括(起始物理偏移量offset,消息大小size和消息Tag的HashCode值),默认commitlog文件1GB。不同的topic,messageQueue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写
。
ConsumerQueue(通过偏移量查询消息)
消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的;MessageQueue只是存储CommitLog中对应的位置偏移信息
,方便通过MessageQueue找到对应存储在CommitLog的消息。一个Topic里面的ConsumerQueue对应队列数量的文件个数,默认加载到内存
;IndexFile(通过key或时间区间查消息)
IndexFile(IndexHeader, 槽位Slot和消息索引组成)提供了一种可以通过key或时间区间来查询消息的方法,默认存在500W个槽位,类似HasMap结构(消息的(MessageKey)取hashCode,通过hashCode获取slot位), 消息索引键与消息偏移量映射关系(CommitLog与IndexFile建立关系)写入到IndexFile。页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写
。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取;
同步刷盘
:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。 异步刷盘
:能够充分利用OS的PageCache的优势
,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。 Linux操作系统分为【用户态】和【内核态】,文件操作,网络操作需要涉及上下文切换,免不了进行数据复制(通过fd描述符),一台服务器把本机磁盘文件内容发送到客户端,一般分为两个步骤:
3. read:读取本地文件内容; 4. write:读取的内容通过网络发送出去; 这两个看似简单操作,实际进行了4次数据复制(两次fd交换
【filefd,socketfd】),fd交换次数决定内存拷贝次数
,分别是: 5. 从磁盘复制数据到内核态内存; 6. 从内核态内存复制到用户态内存; 7. 然后从用户态内存复制到网络驱动的内核态内存; 8. 最后是从网络驱动内核态内存复制到网卡中进行传输; 通过使用mmap的方式,可以省去想用户态内存复制,提高速度,这种机制在Java中通过MappedByteBuffer实现
的,RocketMq利用上述特性,也就是所谓的“零拷贝
”技术,提高消费存盘和网络发送的速度。 注意
:采用MapperByteBuffer这种内存映射的几个限制,其中之一是单次只能映射1.5G-2G的文件至虚拟内存
,这就是为何Rocketmq默认设置单个CommitLog日志数据为1GB的原因
所在。 发送优化(请求堆积): 9. 通过异步发送回调方式; 10. 使用线程池进行消息发送优化; 消息模式:集群消息(支持重试
),广播消息(不支持重试
); 消息类型:普通消息(默认使用,需消费端实现幂等),顺序消息(消息需要在发送到一个Broker才能保证,严重降低效率),事务消息(2段提交); rocketMq默认是push模式
,消费者订阅主题,然后自动进行集群内消息队列的动态负载,自动拉取消息,准实时,项目中一般采用PUSH模式。Master支读和写,Slave仅支持读
;在Broker配置文件中,参数brokerId的值为0表明这个是Broker的Master,大于0表明这个Broker是Salve,同时brokerRole参数也会说明这个Broker是Master还是Slave; 在Consumer的配置文件中,并不需要设置是从Master读还是从Slave读,当Master不可用或者繁忙的时候,Consumer会被自动切换到从Slave读。有了自动切换Consumer的这种机制
,当Master角色的机器出现故障后,Consumer仍然可以从Slave读取数据,不影响Consumer程序,这就达到了读消息高可用性;
在创建Topic的时候,把Topic的多个MessageQueue创建在多个Broker组上(相同的Broker名称,不同的BrokerId的机器组成一个Broker组,单个Broker组包括Master和对应的Slave
),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,生产者仍然可以发送消息。
目前不支持把Slave自动切换到Master
,如果机器资源不足,需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文件启动Broker。 如果一个Broker组有Master和Slave,消息需要从Master复制到Slave上,有同步和异步复制方式;
数据容易恢复
,但是同步复制会增大数据写入延迟,降低系统吞吐量
。较低的延迟和较高的吞吐量
,但如果Master出现故障,有些数据因为没有被写入Slave,有可能存在数据丢失
。通常情况下,应该把Master和Slave配置成ASYNC_FLUSH的刷盘方式,主从之间配置成SYNC_MASTER的复制方式
,这样即使有一台机器出现故障,仍然能保证数据不丢失,是个不错选择。Producer端,每个实例在发消息的时候,默认会轮询
所有的MessageQueue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图。
(1) 集群模式
在集群消费模式下,每条消息只需要投递到订阅这个topic的ConsumerGroup下的一个实例即可
。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候明确指定拉取那一条messageQueue。 当每个实例的数量有变更,都会触发一次所有实例的复制均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。 默认分配算法
是AllocateMessageQueueAveragely,如下图所示: 还有一种平均算法是AllocateMessageAveragelyByCycle,也是平均分摊每一条queue,只是以环状轮流
分queue的形式,如下图: 一个Queue最多只能分配给一个Consumer,一个Cosumer可以分配得到多个Queue
。某个topic消息量很大,应该给它多配置几个队列,并且尽量多分布在不同broker上,减轻某个broker的压力。 (1)Rebalance(队列分配)
:topic下的所有MessageQueue按照一定规则分配给consumerGroup下的consumer,有如下策略: 消费多个topic时,存在排序靠前的consumer可能消费队列较多(机器负载较大)
; 想到解决方案:不使用Collections.sort()而使用Collections.shulffe();源码分析: rebalanceByTopic(final String topic){} •
先获取topic下的MessageQueue,一个MessageQueue实际上就是一个partition • 然后获取当前topic和group的client id,即当前group中消费此topic的客户端 • 随后对partition和client id做排序 • 然后调用strategy获取当前客户端需要消费的partition • 最后更新订阅
(2) 广播模式
在实现上,其中一个不同就是在consumer分配queue的时候,会所有consumer都分到所有的queue
。 对于顺序消息,当消费者消费消息失败后,消息队列RocketMQ会自动不断进行消息重试(每次间隔时间1秒),这时应用会出现消息被阻塞
的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消息失败的情况,避免阻塞发生。
对于无序消息(普通,定时,延时,事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。无序消息的重试只针对集群消息方式生效
,广播方式不提供失败重试特性
,即消息不再重试。
默认每条消息最多重试16次
,每次重试的间隔时间如下: 第几次重试 与上次重试间隔 第几次重试 与上次重试间隔 如果消息重试16次后仍然失败,消息将不再进行投递。如果严格按照上述时间计算,某一条消息在一直消费失败前提下,将会在接下来的4小时46分钟内进行16次重试,超过此时间范围内将不再重试投递,会放入“死信队列”。 (2) 配置方式 消息失败后,重试配置方式。在集群消费方式下,消息消费失败后期望消息重试,需要在消息监听接口的实现中明确进行配置(三种方式任选一种): 推荐
)Properties properties = new Properties();properties.put(PropertyKeyConst.MaxReConsumTimes, "20");Consumer consumer = ONSFactory.createConsumer(properties);
注意
:
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumerContext context){ System.out.println(message.getReConsumeTimes()); }}
当消息重试次数达到最大重试次数后,会被放入死信队列(Dead-Letter Message);
均为3天
,3天后自动删除。因此,请在死信消息产生后的3天内可人工及时处理。一个死信队列对应一个Group ID
,而不是对应单个的消费者实例;一个死信队列可能包含多个Topic消息
;消费者在接收到消息后,有必要根据业务上的唯一key对消息做幂等处理必要性。
在互联网应用中,尤其在网络不稳定情况下,消息队列RocketMQ的消息有可能会出现重复,这个重复简单可以概括为一下情况:
Producer->ConsumeQueue
) 当一条消息已被成功发送到服务端并完成持久化,此时出现“网络闪断或客户端宕机”导致服务端对客户端应答失败。如果此时生产者意识到消息发送失败尝试再次发送消息,消息后续会收到两条内容相同并且Message ID也相同的消息。ConsumeQueue-Consumer
) 消息消费的场景下,消息已投递到消费者并完成业务处理。当客户端给服务端反恐应答的时候网络闪断,为了保证消息至少被消费一次,消息队列RocketMQ的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。客户端重启、扩容或者缩容时,会触发Rebalance
,此时消费者可能会收到重复消息。因为Message ID有可能出现重复(重复)的情况,所以真正安全的幂等处理,不建议以Message ID作为处理依据。最好的方式是以业务唯一标识为幂等性关键依据,而业务的唯一标识可以通过key进行设置:
Message message = new Message();message.setKeys("ORDERID_100");SendResult result = producer.send(message);
订阅方收到消息时可以根据消息的key进行幂等性处理:
consumer.subscribe("one_test", "", new MessageListener(){ public Action consume(Message message, ConsumeContext context){ String key = message.getKeys(); }}) //此处可以增加key写redis缓存标识是否已经处理过redis-key+数据库唯一索引
使用Netty的Reactor主从多线程模型
,模块使用RemotingCommand(消息头+消息体)进行通信,消息可靠性
。
RocketMQ提供同步和异步两种复制方式来增强消息的可靠性与高可用性
,同步异步区别:同步复制会等待slave响应提交日志已经被复制,相应的异步复制会在master节点处理成功后快速返回,消息最大允许大小4MB; 多Master多Slave模式-异步复制
每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式(Master复制完成即返回),主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
速度快
,即使磁盘损坏,消息丢失的非常少
,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;消息存在丢失
,Master宕机,磁盘损坏情况下会丢失少量消息。多Master多Slave模式-同步双写【生产】
每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
数据安全性高
,数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;性能较低
,性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。消息传递保证三个级别
MQ通过两段事务提交,由客户端发送应答,提交完成最近一次offset
,从而避免客户端未消费完成发生宕机重启时候不能继续消费。
临时存储进行暂存
;消息投递到目标Topic
中。这个的延迟服务名字为delay service,不同消息中间件的延迟服务模块名称可能不同;按照消息offest检索
);按照key和时间区间检索
);ConsumeQueue和IndexFile文件恢复
,保持与CommitLog一致);异步刷盘丢失恢复
)。单个文件都被设计为固定长度
,如果一个文件写满以后再创建一个新文件,文件名称就为该文件第一条消息对应全局物理偏移量
。 (1) MappedFileQueue
对应就是物理的CommitLog所有文件; 按照时间查找偏移量; 根据消息偏移量查询MappedFile; (2) MappedFile(1G-内存映射技术)
其实对应的就是commitlog里面的每一个文件;线程ReputMessageService来准实时转发CommitLog文件更新时间
,相应的任务处理器根据转发消息及时更新CosumeQueue、IndexFile文件。 首先将消息全量存储在CommitLog文件中,然后异步(单独线程处理)生成转发任务更新ConsumeQueue和Index文件
。如果消息成功存储到CommitLog文件中,转发任务未成功执行,此时消息服务器Broker由于某个原因宕机,导致CommitLog、ConsumeQueue、IndexFile文件数据不一致。如果不加以人工修复的话,会有一部分消息即使在CommitLog文件中存在,单由于没有转发到ConsumeQueue,这部分消息将永远无法被消费者消费,通过abort文件获取异常退出点
。 (1) 存储文件加载
代码DefaultMessageStore#load
(2) 正常恢复
从倒数第3个文件开始进行恢复,如果不足3个文件,则从第一个文件开始恢复 代码CommitLog#recoverNormally
(3) 异常恢复
从最后一个文件往前走,找到第一个消息存储正常文件。 代码CommitLog#recoveryAbnormally
NIO的内存映射机制
(MappedByteBuffer)的,消息存储首先将消息追加到内存,在根据配置的刷盘策略在不同时间进行刷写磁盘。 (1) 同步刷盘 消息追加到内存后,立即将数据刷写到磁盘文件。 代码CommitLog#handleDiskFlush; (2) 异步刷盘 开启transientStorePoolEnable后异步刷盘步骤:堆外ByteBuffer->内存映射文件MappedByteBuffer
));线程默认每500ms
将MappedByteBuffer追加的内存刷写到磁盘(提交内存映射文件MappedByteBuffer->磁盘
);代码CommitLog#CommitRealTimeService#run;代码CommitLog#FlushRealTimeService#run;
默认每个文件过期时间为72h(3天)
,通过Broker配置文件中设置fileReservedTime来改变过期时间,单位为小时。 代码:DefaultMessageStore#addScheduleTask 代码:DefaultMessageStore#cleanFilesPeriodically; 代码:DefaultMessageStore#deleteExpiredFiles; (1) 删除文件操作的条件9. 小结
刷盘机制
:同步刷盘和异步刷盘(堆外数据->内存映射文件->刷盘),异步刷盘由Commit线程(每200ms)和Flush线程(每500ms)完成;;消息存储
:为了保证消息存储完全顺序写,单一文件存储所有主题消息;但存在读取不方便,因此引入队列文件和索引文件(可设置消息索引键)检索;线程转发消息
:消息到达CommitLog后,通过ReputMessageService线程近实时将消息转发给消息队列文件(ConsumeQueue)和索引文件(IndexFile);同步复制,异步刷盘
;默认采用(Async_Flush,Async_Master)
,Async复制(Master复制)并不能保证消息不丢失,所以SEND_OK并不意味它是可靠的
,要确保消息不丢失,需要开启(Sync_Master同步主或Sync_Flush同步写,一般适用于金融领域
),在不影响吞吐量情况下,推荐使用: messageProducer.send(new Message(), new SendCallback(){ }));
1.12.1、消费者消费慢问题
提高消费并行度(增加机器+增加消费线程)
绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法: 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数(topic)的 Consumer 实例无效
)。可以通过加机器,或者在已有机器启动多个进程的方式。 提高单个Consumer 的消费并行线程
,通过修改参数consumeThreadMin、consumeThreadMax
实现。批量方式消费
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1
,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。跳过非重要消息
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:public ConsumeConcurrentlyStatus consumeMessage( Listmsgs, ConsumeConcurrentlyContext context) { long offset = msgs.get(0).getQueueOffset(); String maxOffset = msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET); long diff = Long.parseLong(maxOffset) - offset; if (diff > 100000) { // TODO 消息堆积情况的特殊处理 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // TODO 正常消费过程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
优化每条消息消费过程(优化业务代码)
举例如下,某条消息的消费过程如下: 根据消息从 DB 查询【数据 1】 根据消息从 DB 查询【数据 2】 复杂的业务计算 向 DB 插入【数据 3】 向 DB 插入【数据 4】 这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。刷盘部署方式
异步刷盘建议使用自旋锁,同步刷盘建议使用重入锁,调整Broker配置项useReentrantLockWhenPutMessage
,默认为false;异步刷盘建议开启TransientStorePoolEnable;建议关闭transferMsgByHeap,提高拉消息效率;同步刷盘建议适当增大sendMessageThreadPoolNums,具体配置需要经过压测。消息复制方式
: 如果业务系统可以容忍消息丢失,MQ部署可以采用异步复制方式;生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投(消息量大/网络抖动)会导致消息重复
;
默认重投2次,不会选择上次失败的broker
,尝试向其他broker发送,最大程度保证消息不丢; 异步重试:异步重试不会选择其他broker
,仅在同一个broker上做重试,不保证消息不丢。 1.11.12.3. 死信队列(消息消费失败) 当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 原因分析
:消费者挂掉或生产者大量消息或延时订单消息,因消息消费队列数固定导致MQ消息积压
;
提高消费者速度
:如果比较严重,新建一个临时topic,partition增大倍,queue增大倍数,写中间程序快速转发,并增加Consumer数量进行消费(线程数+多条消费等); 转载地址:http://wjcpi.baihongyu.com/