activemq实战
Contents
待解决的问题
- 消息的重发 producer和broker的通讯
- 消息的ack consumer和broker的通讯
- 如何确保消息的一致性
- 事务消息
参考资料
activemq 科普资料
https://www.jianshu.com/p/9ef0ed0dad6f
消息的确认机制
http://shift-alt-ctrl.iteye.com/blog/2020182
生产者发送消息,如果消费者未连接上,消息队列存储消息。
消费者连接上之后,一旦消费者接收到消息,消息就被ack了。
SESSION_TRANSACTED = 0 事务提交并确认
用于消息组的业务场景,发送多个消息给消费者,完成一组消息后,C提交事务,一组消息才会确认。
AUTO_ACKNOWLEDGE = 1 自动确认
只要消息发送到消费者,调用onMessage方法的时候,broker就直接删除这条消息,不管消息是否正确的被消费了。也是默认的机制。
CLIENT_ACKNOWLEDGE = 2 客户端手动确认
实践表明,当客户端手动确认,或者未抛出异常的时候,都会被消化掉。
如果消费者执行方法的时候抛出异常,则broker会继续重发6次,最终把消息存放在 ActiveMQ.DLQ 当中。
如果消费者执行方法时,调用了Message.acknowledge();之后抛出异常,则broker会以为消息被正确处理了。
INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认
针对消费者:
消费者配置后,只有当监听器调用textMessage.acknowledge(); 后消息才会从broker中删除,每次只能确认单条消息。
- 如果未确认的消息在未确认之前抛出异常了,则消息一直pending,有新的消费者接入时会重新分配。
- 该属性可以实现消息的可靠性,只有被消费者正确消费后的消息才会从消息队列当中剔除,而且消费者确认的时机是可以主动控制的。
- 但是如果消费者正确消费完数据,准备执行确认操作的时候,服务器宕机,或者忘记调用确认方法,会导致消息被重复消费。设计和使用的时候要注意。
|
ACK_MODE
- SESSION_TRANSACTED = 0 事务提交并确认
- AUTO_ACKNOWLEDGE = 1 自动确认
- CLIENT_ACKNOWLEDGE = 2 客户端手动确认
- DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
- INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认
ACK_TYPE
- DELIVERED_ACK_TYPE = 0 消息”已接收”,但尚未处理结束
- STANDARD_ACK_TYPE = 2 “标准”类型,通常表示为消息”处理成功”,broker端可以删除消息了
- POSION_ACK_TYPE = 1 消息”错误”,通常表示”抛弃”此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
- REDELIVERED_ACK_TYPE = 3 消息需”重发”,比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
- INDIVIDUAL_ACK_TYPE = 4 表示只确认”单条消息”,无论在任何ACK_MODE下
- UNMATCHED_ACK_TYPE = 5 BROKER间转发消息时,接收端”拒绝”消息
消息重发策略
https://blog.csdn.net/dly1580854879/article/details/68489798
比如在ACK_MODE 为手动确认的时候,如果消息在未acknowledge()之前抛出异常,broker则会尝试重发,这个时候通过对factory的配置就可以实现定制化的重发次数和重发间隔时间等参数。
- 消费者的MQ配置<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"><property name="maximumRedeliveries" value="2"></property><property name="initialRedeliveryDelay" value="1000"></property></bean><bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="failover:tcp://127.0.0.1:61616" /><property name="userName" value="admin"/><property name="password" value="admin"/><property name="useAsyncSend" value="true"/><property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/></bean>
简单的异步消息发送代码
producer
通过在spring中配置jmsTemplate,然后调用send(Queue,Message)来实现发送消息
配置
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory" /></bean><bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="failover:tcp://127.0.0.1:61616" /></bean><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory" /></bean>调用
public void sendMessage(final String dataMap) {jmsTemplate.send(dataDistination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage(dataMap);}});System.out.println("发送了一条消息。"+dataMap);}
consumer
实现异步的可重复接收的监听
监听类
@Componentpublic class TestMessageListener implements MessageListener {@Overridepublic void onMessage(Message message) {TextMessage textMessage = (TextMessage) message;try {System.out.println("接收到的消息内容是:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}配置
<!-- 消息监听容器 --><bean id="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="queueDestination" /><property name="messageListener" ref="consumerMessageListener" /><property name="concurrentConsumers" value="2"></property><property name="concurrency" value="2-5"></property></bean><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory" /></bean><!--这个是队列目的地 --><bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>magic_queue</value></constructor-arg></bean>bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>map_queue</value></constructor-arg></bean><!-- 消息监听器 --><bean id="consumerMessageListener" class="com.test.message.TestMessageListener" />