RocketMQ相关概念
参考资料
阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/
阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html
阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo
很好的一个总结的博客
http://aokunsang.iteye.com/blog/2358692
消息发送到存储的过程
https://my.oschina.net/bieber/blog/725646
- 消息提供端发起一个send操作,SendMessageProcessor处理
- CommitLog进行具体的持久化,持久化好了之后会(所有的topic和queue共享的)
- 更新ConsumeQueue(queue独占的)(消息的物理物质、大小、tages信息)、更新indexService消息的索引持久化。
消费过程和负载均衡
https://www.cnblogs.com/wxd0108/p/6054817.html
- 有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。
consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要MessageListener完成业务逻辑即可
consumer.registerMessageListener(new MessageListenerConcurrently() {}启动的时候会注册自己,启动定时任务定期的和nameServer、broker交互,和按照算法分摊消息,每次也是批量拉取消息
- 集群模式下,queue都是只允许分配只一个实例,Consumer会定期10s一次根据负载算法,获取对应的queue,一个consumer实例可以允许同时分到不同的queue。
消费端流量控制
- 消费端在启动的时候,会new一个线程池来消费消息
|
- 消费端在拉取消息的时候,也会做流量控制,任何一个阈值超过限制之后,会间隔50毫秒以后再去拉取消息1000个,100M,2000跨度private int pullThresholdForQueue = 1000;private int pullThresholdSizeForQueue = 100;private int consumeConcurrentlyMaxSpan = 2000;long cachedMessageCount = processQueue.getMsgCount().get();long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);if ((queueFlowControlTimes++ % 1000) == 0) {log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);}return;}
NameServer的作用
- 注册中心:
Broker的不同Master和Slave注册在NameServer上,从而实现Broker于Broker之间、Master和Slave之间解耦,提高可扩展性
brokerClusterName = rocketmq-clusterbrokerName = broker-abrokerId = 0namesrvAddr = rocketmq-server1:9876;rocketmq-server2:9876Client通过NameServer来获取提供服务的Broker,实现解耦
producer.setNamesrvAddr("192.168.29.129:9876");
- 负载均衡:Client发送读写请求的时候,会通过NameServer来做负载均衡,让特定的Broker去完成任务
- 高可用:NameServer会剔除掉不可用的Broker,同时让Slave顶上
高可用通讯
Clinet会缓存Broker信息,并且定时和NameServer进出Check,且定时会和所有缓存着的Broker进行心跳
消费者订阅
可以订阅某Topic下的所有消息,也可以订阅某Topic下的某几个Tag的消息
consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.subscribe("TopicTest", *);注册监听的话,可以注册普通的监听,也可以做注册顺序的监听,顺序监听确保只监听某一个或者某两个Queue,而且是独占
consumer.registerMessageListener(new MessageListenerOrderly() { })consumer.registerMessageListener(new MessageListenerConcurrently() {})
数据结构
ConsumerQueue(检索队列)、CommitLog(存储队列)
CommitLog
CommitLog消息存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分
consume queue(面向消费者直接消费)
- consume queue中存储单元是一个20字节定长的数据,是顺序写顺序读,每个Queue对应一个ConsumeQueue
- offset: CommitLog中的物理位移
- size: CommitLog中的日志大小
- tagsCode:和storeTimestamp相关
- 存储消息第一步会触发CommitLog的物理IO写消息,然后,再写CQueue,
- 直接面向消费者,因为每个Queue有一个单独的Consume Queue,而一个Topic下有多个Queue,从而支持并发的消费
IndexService(索引,方便查询,关键字快速定位消息)
|
集群管理
https://www.cnblogs.com/xuwc/p/9043764.html
- Master和Slave之间刷盘方式(同步复制、异步复制):SYNC_MASTER、ASYNC_MASTE,异步复制的话,如果Master挂了,但是新消息未复制到Slave会导致消息丢失
- Broker的数据持久化方式:ASYNC_FLUSH、SYNC_FLUSH,一种是CommitLog持久化成功就返回消息存储成功,一种是到了内存就返回成功,异步的方式进行持久化
- Master之间分摊消息,一个Master挂掉,不会影响其他Master,但是该Master存储的消息,无法消费和读取
- Master对所有的Producer可见,Salve对所有的Consumer可见