Contents
  1. 1. 参考资料
  2. 2. 消息发送到存储的过程
  3. 3. 消费过程和负载均衡
  4. 4. 消费端流量控制
  5. 5. NameServer的作用
  6. 6. 高可用通讯
  7. 7. 消费者订阅
  8. 8. 数据结构
    1. 8.1. CommitLog
    2. 8.2. consume queue(面向消费者直接消费)
    3. 8.3. IndexService(索引,方便查询,关键字快速定位消息)
  9. 9. 集群管理

参考资料

阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/

阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html

阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo

很好的一个总结的博客
http://aokunsang.iteye.com/blog/2358692

消息发送到存储的过程

https://my.oschina.net/bieber/blog/725646

  • 消息提供端发起一个send操作,SendMessageProcessor处理
  • CommitLog进行具体的持久化,持久化好了之后会(所有的topic和queue共享的)
  • 更新ConsumeQueue(queue独占的)(消息的物理物质、大小、tages信息)、更新indexService消息的索引持久化。

消费过程和负载均衡

https://www.cnblogs.com/wxd0108/p/6054817.html

  1. 有别于其他消息中间件由broker做负载均衡并主动向consumer投递消息,RocketMq是基于拉模式拉取消息,consumer做负载均衡并通过长轮询向broker拉消息。
  2. consumer通过长轮询拉取消息后回调MessageListener接口实现完成消费,应用系统只要MessageListener完成业务逻辑即可

    consumer.registerMessageListener(new MessageListenerConcurrently() {}
  3. 启动的时候会注册自己,启动定时任务定期的和nameServer、broker交互,和按照算法分摊消息,每次也是批量拉取消息

  4. 集群模式下,queue都是只允许分配只一个实例,Consumer会定期10s一次根据负载算法,获取对应的queue,一个consumer实例可以允许同时分到不同的queue。

消费端流量控制

  1. 消费端在启动的时候,会new一个线程池来消费消息
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
private int consumeThreadMin = 20;
private int consumeThreadMax = 64;
  1. 消费端在拉取消息的时候,也会做流量控制,任何一个阈值超过限制之后,会间隔50毫秒以后再去拉取消息
    1000个,100M,2000跨度
    private int pullThresholdForQueue = 1000;
    private int pullThresholdSizeForQueue = 100;
    private int consumeConcurrentlyMaxSpan = 2000;
    long cachedMessageCount = processQueue.getMsgCount().get();
    long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
    if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
    log.warn(
    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
    }
    if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
    log.warn(
    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
    }

NameServer的作用

  1. 注册中心:
  • Broker的不同Master和Slave注册在NameServer上,从而实现Broker于Broker之间、Master和Slave之间解耦,提高可扩展性

    brokerClusterName = rocketmq-cluster
    brokerName = broker-a
    brokerId = 0
    namesrvAddr = rocketmq-server1:9876;rocketmq-server2:9876
  • Client通过NameServer来获取提供服务的Broker,实现解耦

    producer.setNamesrvAddr("192.168.29.129:9876");
  1. 负载均衡:Client发送读写请求的时候,会通过NameServer来做负载均衡,让特定的Broker去完成任务
  2. 高可用:NameServer会剔除掉不可用的Broker,同时让Slave顶上

高可用通讯

Clinet会缓存Broker信息,并且定时和NameServer进出Check,且定时会和所有缓存着的Broker进行心跳

消费者订阅

  1. 可以订阅某Topic下的所有消息,也可以订阅某Topic下的某几个Tag的消息

    consumer.subscribe("TopicTest", "TagA || TagC || TagD");
    consumer.subscribe("TopicTest", *);
  2. 注册监听的话,可以注册普通的监听,也可以做注册顺序的监听,顺序监听确保只监听某一个或者某两个Queue,而且是独占

    consumer.registerMessageListener(new MessageListenerOrderly() { })
    consumer.registerMessageListener(new MessageListenerConcurrently() {})

数据结构

ConsumerQueue(检索队列)、CommitLog(存储队列)

CommitLog

CommitLog消息存放物理文件,每台broker上的commitLog被本机器所有queue共享不做区分

consume queue(面向消费者直接消费)

  1. consume queue中存储单元是一个20字节定长的数据,是顺序写顺序读,每个Queue对应一个ConsumeQueue
  • offset: CommitLog中的物理位移
  • size: CommitLog中的日志大小
  • tagsCode:和storeTimestamp相关
  1. 存储消息第一步会触发CommitLog的物理IO写消息,然后,再写CQueue,
  2. 直接面向消费者,因为每个Queue有一个单独的Consume Queue,而一个Topic下有多个Queue,从而支持并发的消费

IndexService(索引,方便查询,关键字快速定位消息)

public Message(String topic, String tags, String keys, byte[] body) {
this(topic, tags, keys, 0, body, true);
}

集群管理

https://www.cnblogs.com/xuwc/p/9043764.html

  1. Master和Slave之间刷盘方式(同步复制、异步复制):SYNC_MASTER、ASYNC_MASTE,异步复制的话,如果Master挂了,但是新消息未复制到Slave会导致消息丢失
  2. Broker的数据持久化方式:ASYNC_FLUSH、SYNC_FLUSH,一种是CommitLog持久化成功就返回消息存储成功,一种是到了内存就返回成功,异步的方式进行持久化
  3. Master之间分摊消息,一个Master挂掉,不会影响其他Master,但是该Master存储的消息,无法消费和读取
  4. Master对所有的Producer可见,Salve对所有的Consumer可见
Contents
  1. 1. 参考资料
  2. 2. 消息发送到存储的过程
  3. 3. 消费过程和负载均衡
  4. 4. 消费端流量控制
  5. 5. NameServer的作用
  6. 6. 高可用通讯
  7. 7. 消费者订阅
  8. 8. 数据结构
    1. 8.1. CommitLog
    2. 8.2. consume queue(面向消费者直接消费)
    3. 8.3. IndexService(索引,方便查询,关键字快速定位消息)
  9. 9. 集群管理