Contents
  1. 1. 参考资料
  2. 2. 消息的发送
    1. 2.1. 同步发送(可靠)
    2. 2.2. 异步发送(可靠)
    3. 2.3. Oneway(不可靠,类似于UPD)
    4. 2.4. 延迟发送
    5. 2.5. 顺序发送
    6. 2.6. 顺序读取
    7. 2.7. 批量发送
    8. 2.8. 广播监听接收(只需要对接收端做控制即可)
    9. 2.9. 事务消息(实现的Producer和Broker之间,确保这两个系统之间,确实投递成功,类似于2PC,但是和消费者无关)
      1. 2.9.1. Producer代码
      2. 2.9.2. 事务监听器代码

参考资料

阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/

阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html

阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo

消息的发送

  1. 支持延迟发送、严格的顺序发送、批量发送等发送模式
  2. 支持同步发送、异步发送、oneway发送等模式

同步发送(可靠)

  1. 同步发送,线程阻塞,
  2. 如果发送失败,会在默认的超时时间3秒内进行重试,嵌套for循环尝试
  3. 发送的结果是有返回值的,通过读取SendResult对象的sendStatus来判断发送是否成功
    //Send message in synchronous mode. This method returns only when the sending procedure totally completes.
    SendResult sendResult = producer.send(msg);
    //会在默认的3秒是超时时间内,如果发送失败会retry
    send(msg, this.defaultMQProducer.getSendMsgTimeout());

异步发送(可靠)

  1. 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
  2. 默认异步调用是不重试的,通过回调onException来获取失败信息
    for (int i = 0; i < 50; i++) {
    try {
    //构建消息
    Message msg = new Message("TopicTest2" /* Topic */,"TagA" /* Tag */,
    ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    final int index = i;
    producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
    System.out.printf("%-10d OK %s %n", index,
    sendResult.getMsgId());
    }
    @Override
    public void onException(Throwable e) {
    System.out.printf("%-10d Exception %s %n", index, e);
    e.printStackTrace();
    }
    });
    } catch (Exception e) {
    e.printStackTrace();
    Thread.sleep(3000);
    }
    }
    Thread.sleep(3000);
    producer.shutdown();

Oneway(不可靠,类似于UPD)

  1. 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功
    for (int i = 0; i < 50; i++) {
    try {
    Message msg = new Message("TopicTest2" /* Topic */,"TagA" /* Tag */,
    ("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
    producer.sendOneway(msg);
    } catch (Exception e) {
    e.printStackTrace();
    Thread.sleep(1000);
    }
    }
    producer.shutdown();

延迟发送

延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s

msg.setDelayTimeLevel(2);
SendResult sendResult = producer.send(msg);

原理:
1.发送消息的时候,如果是延迟消息,就会放到SCHEDULE_TOPIC_XXXX对应level的queueID里面,而真真正的topic和queueId作为msg暂时存起来。

if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}

2.在ScheduleMessageService.DeliverDelayedMessageTimerTask 内,判断是否可消息,可以就取出消息,将topic和quueeId还原,放到commitLog中

顺序发送

业务场景:银行账号转账操作, 先往账号A(当前余额为0) 里面 加钱1000,再加钱2000,再扣减3000,都是余额变动的Topic,所以如果消息无序的话,先消费扣钱3000,那么会报错余额不足,从而影响业务的正常执行,所以需要顺序发送和顺序消费消息。

实现原理: 默认情况下,一个Topic下面有4个Queue,每个Queue是FIFO队列,但是为了提高消费端的并发消费,put一个消息会按照轮询的顺序发送到不同的Queue里面,然后,多个Queue可以并发的消费,这里就可能导致乱序。解决方案就是把一些列有序的消息通过OrderID来存放在一个Queue里面,因为Queue是严格的FIFO,消费端也进行控制,一次只能读取一个Queue里面的消息,从而实现消息有序。

  • 当前消费者会定时的去锁定当前消费端所持有的消费的队列,保证broker中的每个消息队列只对应一个消费端
  • consumer消费的过程中也加锁,只有一个线程可以消费消息,从而保证了消息的严格有序
  1. 普通的发送,一个Topic下默认是4个Queue,而发送的时候,是默认轮询的方式,每次queueID自增1,每次往不同的Queue里面投放消息。(如果是同步消息,且第一次发送失败,retry的时候,会check是不是上次失败的Queue,是的话会跳过)

    public MessageQueue selectOneMessageQueue() {
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
    pos = 0;
    return this.messageQueueList.get(pos);
    }
  2. 顺序消息的发送,必须往一个Queue里面发送消息

  • 消息要发送到同一个messagequeue中
  • 一个messagequeue只能被一个消费者消费,这点是由消息队列的分配机制来保证的
  • 一个消费者内部对一个mq的消费要保证是有序的。

发送消息的时候,选择MessageQueue的时候,会回调传入的MessageQueueSelector的select方法,这里面的逻辑是可以根据业务来实现的。比如A业务,那么OrderId设置为1,所有的A业务都会返回相同的mq

private SendResult sendSelectImpl(
...
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
...
}

使用相同的orderId实现该业务的所有消息发送到一个Queue里面

for (int i = 1; i <= 5; i++) {
Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());
int orderID=0;
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderID);
System.out.println(sendResult);
}

顺序读取

registerMessageListener的时候绑定的是MessageListenerOrderly接口,可以确保每次读取消息只读取一个messageQueue的消息,从而实现1对1的读取

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
//指定NameServer地址,多个地址以 ; 隔开
consumer.setNamesrvAddr("192.168.29.129:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
System.out.println(msg + ",内容:" + new String(msg.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();

批量发送

  • 需要rocketmq的版本要高,4.2以上是支持的,4.1不知道,4.0不支持
  • 提升性能,建议一次消息的大小不超过1M,超大的话,官网有方法Split
    String topic = "TopicTest";
    List<Message> messages = new ArrayList<>();
    messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));
    messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));
    messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));
    try {
    producer.send(messages);
    } catch (Exception e) {
    e.printStackTrace();
    }

广播监听接收(只需要对接收端做控制即可)

默认是集群模式(CLUSTERING(“CLUSTERING”);),如果不设置的话。只需要设置MessageModel的Type即可。

consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TopicTest", "*");

事务消息(实现的Producer和Broker之间,确保这两个系统之间,确实投递成功,类似于2PC,但是和消费者无关)

2PC+重传+幂等性:Producer发送消息,Broker预提交消息,Producer提交事务并记录并返回结果,Broker最终确认消息(如果未接收到确认,回调check方法来确认最终结果)

  1. 发送消息:相当于本地进入prepare状态,可以不做任何业务操作,仅仅是发起调用,需要传入一个全局唯一的key,作为唯一性标志
  2. MQ预提交:MQ服务器Broker接收到消息之后,进行事务形式的发送,然后check SendState,如果预发送成功的话,状态是OK的话,也就是Broker已经prepare成功了
  3. 本地带日志的事务提交:此时调用TransactionListenerImpl.executeLocalTransaction方法里面提交本地事务,并且记录到message表中,记录key,这两个步骤在一个事务里面完成,并返回执行的结果:LocalTransactionState.COMMIT_MESSAGE或者ROLL_BACK
  4. Broker如果再次收到了确认消息,Commit/Rollback消息
  5. Broker如果未收到确认消息,会默认1min回调TransactionListenerImpl.checkLocalTransaction,这个方法会去查询本地消息表,来查询该本地事务是否成功执行,然后做出最终的裁决

    Producer代码

    TransactionListener transactionListener = new TransactionListenerImpl();
    TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("192.168.29.129:9876");
    ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
    Thread thread = new Thread(r);
    thread.setName("client-transaction-msg-check-thread");
    return thread;
    }
    });
    producer.setExecutorService(executorService);
    producer.setTransactionListener(transactionListener);
    producer.start();
    for (int i = 0; i < 10; i++) {
    try {
    Message msg =new Message("TopicTest", "TagA", "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);
    Thread.sleep(10);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }

事务监听器代码

public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String msgBody;
//执行本地业务的时候,再插入一条数据到事务表中,供checkLocalTransaction进行check使用,避免doBusinessCommit业务成功,但是未返回Commit
try {
msgBody = new String(msg.getBody(), "utf-8");
doBusinessCommit(msg.getKeys(),msgBody);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Boolean result=checkBusinessStatus(msg.getKeys());
if(result){
return LocalTransactionState.COMMIT_MESSAGE;
}else{
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
public static void doBusinessCommit(String messageKey,String msgbody){
System.out.println("do something in DataBase");
System.out.println("insert 事务消息到本地消息表中,消息执行成功,messageKey为:"+messageKey);
}
public static Boolean checkBusinessStatus(String messageKey){
if(true){
System.out.println("查询数据库 messageKey为"+messageKey+"的消息已经消费成功了,可以提交消息");
return true;
}else{
System.out.println("查询数据库 messageKey为"+messageKey+"的消息不存在或者未消费成功了,可以回滚消息");
return false;
}
}
Contents
  1. 1. 参考资料
  2. 2. 消息的发送
    1. 2.1. 同步发送(可靠)
    2. 2.2. 异步发送(可靠)
    3. 2.3. Oneway(不可靠,类似于UPD)
    4. 2.4. 延迟发送
    5. 2.5. 顺序发送
    6. 2.6. 顺序读取
    7. 2.7. 批量发送
    8. 2.8. 广播监听接收(只需要对接收端做控制即可)
    9. 2.9. 事务消息(实现的Producer和Broker之间,确保这两个系统之间,确实投递成功,类似于2PC,但是和消费者无关)
      1. 2.9.1. Producer代码
      2. 2.9.2. 事务监听器代码