RocketMQ的发送和读取
参考资料
阿里云官方英文、最新的Demo和Guidence
http://rocketmq.apache.org/docs/transaction-example/
阿里云的帮助文档啊,超级详细而且有Demo
https://help.aliyun.com/document_detail/29551.html
阿里云在github上的Demos(包括整合Spring 和简单TCP的形式)
https://github.com/AliwareMQ/mq-demo
消息的发送
- 支持延迟发送、严格的顺序发送、批量发送等发送模式
- 支持同步发送、异步发送、oneway发送等模式
同步发送(可靠)
- 同步发送,线程阻塞,
- 如果发送失败,会在默认的超时时间3秒内进行重试,嵌套for循环尝试
- 发送的结果是有返回值的,通过读取SendResult对象的sendStatus来判断发送是否成功//Send message in synchronous mode. This method returns only when the sending procedure totally completes.SendResult sendResult = producer.send(msg);//会在默认的3秒是超时时间内,如果发送失败会retrysend(msg, this.defaultMQProducer.getSendMsgTimeout());
异步发送(可靠)
- 异步调用的话,当前线程一定要等待异步线程回调结束再关闭producer啊,因为是异步的,不会阻塞,提前关闭producer会导致未回调链接就断开了
- 默认异步调用是不重试的,通过回调onException来获取失败信息for (int i = 0; i < 50; i++) {try {//构建消息Message msg = new Message("TopicTest2" /* Topic */,"TagA" /* Tag */,("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));final int index = i;producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();Thread.sleep(3000);}}Thread.sleep(3000);producer.shutdown();
Oneway(不可靠,类似于UPD)
- 消息不可靠,性能高,只负责往服务器发送一条消息,不会重试也不关心是否发送成功for (int i = 0; i < 50; i++) {try {Message msg = new Message("TopicTest2" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);producer.sendOneway(msg);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();
延迟发送
延迟的机制是在 服务端实现的,也就是Broker收到了消息,但是经过一段时间以后才发送
服务器按照1-N定义了如下级别: “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;若要发送定时消息,在应用层初始化Message消息对象之后,调用Message.setDelayTimeLevel(int level)方法来设置延迟级别,按照序列取相应的延迟级别,例如level=2,则延迟为5s
原理:
1.发送消息的时候,如果是延迟消息,就会放到SCHEDULE_TOPIC_XXXX对应level的queueID里面,而真真正的topic和queueId作为msg暂时存起来。
2.在ScheduleMessageService.DeliverDelayedMessageTimerTask 内,判断是否可消息,可以就取出消息,将topic和quueeId还原,放到commitLog中
顺序发送
业务场景:银行账号转账操作, 先往账号A(当前余额为0) 里面 加钱1000,再加钱2000,再扣减3000,都是余额变动的Topic,所以如果消息无序的话,先消费扣钱3000,那么会报错余额不足,从而影响业务的正常执行,所以需要顺序发送和顺序消费消息。
实现原理: 默认情况下,一个Topic下面有4个Queue,每个Queue是FIFO队列,但是为了提高消费端的并发消费,put一个消息会按照轮询的顺序发送到不同的Queue里面,然后,多个Queue可以并发的消费,这里就可能导致乱序。解决方案就是把一些列有序的消息通过OrderID来存放在一个Queue里面,因为Queue是严格的FIFO,消费端也进行控制,一次只能读取一个Queue里面的消息,从而实现消息有序。
- 当前消费者会定时的去锁定当前消费端所持有的消费的队列,保证broker中的每个消息队列只对应一个消费端
- consumer消费的过程中也加锁,只有一个线程可以消费消息,从而保证了消息的严格有序
普通的发送,一个Topic下默认是4个Queue,而发送的时候,是默认轮询的方式,每次queueID自增1,每次往不同的Queue里面投放消息。(如果是同步消息,且第一次发送失败,retry的时候,会check是不是上次失败的Queue,是的话会跳过)
public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement();int pos = Math.abs(index) % this.messageQueueList.size();if (pos < 0)pos = 0;return this.messageQueueList.get(pos);}顺序消息的发送,必须往一个Queue里面发送消息
- 消息要发送到同一个messagequeue中
- 一个messagequeue只能被一个消费者消费,这点是由消息队列的分配机制来保证的
- 一个消费者内部对一个mq的消费要保证是有序的。
发送消息的时候,选择MessageQueue的时候,会回调传入的MessageQueueSelector的select方法,这里面的逻辑是可以根据业务来实现的。比如A业务,那么OrderId设置为1,所有的A业务都会返回相同的mq
使用相同的orderId实现该业务的所有消息发送到一个Queue里面
顺序读取
registerMessageListener的时候绑定的是MessageListenerOrderly接口,可以确保每次读取消息只读取一个messageQueue的消息,从而实现1对1的读取
批量发送
- 需要rocketmq的版本要高,4.2以上是支持的,4.1不知道,4.0不支持
- 提升性能,建议一次消息的大小不超过1M,超大的话,官网有方法SplitString topic = "TopicTest";List<Message> messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "Order1", "Hello world 0".getBytes()));messages.add(new Message(topic, "TagA", "Order2", "Hello world 1".getBytes()));messages.add(new Message(topic, "TagA", "Order3", "Hello world 2".getBytes()));try {producer.send(messages);} catch (Exception e) {e.printStackTrace();}
广播监听接收(只需要对接收端做控制即可)
默认是集群模式(CLUSTERING(“CLUSTERING”);),如果不设置的话。只需要设置MessageModel的Type即可。
事务消息(实现的Producer和Broker之间,确保这两个系统之间,确实投递成功,类似于2PC,但是和消费者无关)
2PC+重传+幂等性:Producer发送消息,Broker预提交消息,Producer提交事务并记录并返回结果,Broker最终确认消息(如果未接收到确认,回调check方法来确认最终结果)
- 发送消息:相当于本地进入prepare状态,可以不做任何业务操作,仅仅是发起调用,需要传入一个全局唯一的key,作为唯一性标志
- MQ预提交:MQ服务器Broker接收到消息之后,进行事务形式的发送,然后check SendState,如果预发送成功的话,状态是OK的话,也就是Broker已经prepare成功了
- 本地带日志的事务提交:此时调用TransactionListenerImpl.executeLocalTransaction方法里面提交本地事务,并且记录到message表中,记录key,这两个步骤在一个事务里面完成,并返回执行的结果:LocalTransactionState.COMMIT_MESSAGE或者ROLL_BACK
- Broker如果再次收到了确认消息,Commit/Rollback消息
- Broker如果未收到确认消息,会默认1min回调TransactionListenerImpl.checkLocalTransaction,这个方法会去查询本地消息表,来查询该本地事务是否成功执行,然后做出最终的裁决
Producer代码
TransactionListener transactionListener = new TransactionListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("192.168.29.129:9876");ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;}});producer.setExecutorService(executorService);producer.setTransactionListener(transactionListener);producer.start();for (int i = 0; i < 10; i++) {try {Message msg =new Message("TopicTest", "TagA", "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);Thread.sleep(10);} catch (Exception e) {e.printStackTrace();}}
事务监听器代码
|