Contents
  1. 1. 参考资料
  2. 2. 消息的确认机制
    1. 2.1. SESSION_TRANSACTED = 0 事务提交并确认
    2. 2.2. AUTO_ACKNOWLEDGE = 1 自动确认
    3. 2.3. CLIENT_ACKNOWLEDGE = 2 客户端手动确认
    4. 2.4. INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认
    5. 2.5. ACK_MODE
    6. 2.6. ACK_TYPE
  3. 3. 消息重发策略
  4. 4. 简单的异步消息发送代码
    1. 4.1. producer
    2. 4.2. consumer

待解决的问题

  • 消息的重发 producer和broker的通讯
  • 消息的ack consumer和broker的通讯
  • 如何确保消息的一致性
  • 事务消息

参考资料

activemq 科普资料
https://www.jianshu.com/p/9ef0ed0dad6f

消息的确认机制

http://shift-alt-ctrl.iteye.com/blog/2020182
生产者发送消息,如果消费者未连接上,消息队列存储消息。
消费者连接上之后,一旦消费者接收到消息,消息就被ack了。

SESSION_TRANSACTED = 0 事务提交并确认

用于消息组的业务场景,发送多个消息给消费者,完成一组消息后,C提交事务,一组消息才会确认。

AUTO_ACKNOWLEDGE = 1 自动确认

只要消息发送到消费者,调用onMessage方法的时候,broker就直接删除这条消息,不管消息是否正确的被消费了。也是默认的机制。

CLIENT_ACKNOWLEDGE = 2 客户端手动确认

实践表明,当客户端手动确认,或者未抛出异常的时候,都会被消化掉。
如果消费者执行方法的时候抛出异常,则broker会继续重发6次,最终把消息存放在 ActiveMQ.DLQ 当中。
如果消费者执行方法时,调用了Message.acknowledge();之后抛出异常,则broker会以为消息被正确处理了。

INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认

针对消费者:
消费者配置后,只有当监听器调用textMessage.acknowledge(); 后消息才会从broker中删除,每次只能确认单条消息。

  1. 如果未确认的消息在未确认之前抛出异常了,则消息一直pending,有新的消费者接入时会重新分配。
  2. 该属性可以实现消息的可靠性,只有被消费者正确消费后的消息才会从消息队列当中剔除,而且消费者确认的时机是可以主动控制的。
  3. 但是如果消费者正确消费完数据,准备执行确认操作的时候,服务器宕机,或者忘记调用确认方法,会导致消息被重复消费。设计和使用的时候要注意。
<bean id="jmsContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueDestination" />
<property name="messageListener" ref="consumerMessageListener" />
<property name="concurrentConsumers" value="2"></property>
<property name="concurrency" value="2-5"></property>
<property name="sessionAcknowledgeMode" value="4"></property>
</bean>
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收到的消息内容是:" + textMessage.getText());
textMessage.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}

ACK_MODE

  • SESSION_TRANSACTED = 0 事务提交并确认
  • AUTO_ACKNOWLEDGE = 1 自动确认
  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认
  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认
  • INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认

ACK_TYPE

  • DELIVERED_ACK_TYPE = 0 消息”已接收”,但尚未处理结束
  • STANDARD_ACK_TYPE = 2 “标准”类型,通常表示为消息”处理成功”,broker端可以删除消息了
  • POSION_ACK_TYPE = 1 消息”错误”,通常表示”抛弃”此消息,比如消息重发多次后,都无法正确处理时,消息将会被删除或者DLQ(死信队列)
  • REDELIVERED_ACK_TYPE = 3 消息需”重发”,比如consumer处理消息时抛出了异常,broker稍后会重新发送此消息
  • INDIVIDUAL_ACK_TYPE = 4 表示只确认”单条消息”,无论在任何ACK_MODE下
  • UNMATCHED_ACK_TYPE = 5 BROKER间转发消息时,接收端”拒绝”消息

消息重发策略

https://blog.csdn.net/dly1580854879/article/details/68489798
比如在ACK_MODE 为手动确认的时候,如果消息在未acknowledge()之前抛出异常,broker则会尝试重发,这个时候通过对factory的配置就可以实现定制化的重发次数和重发间隔时间等参数。

  • 消费者的MQ配置
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <property name="maximumRedeliveries" value="2"></property>
    <property name="initialRedeliveryDelay" value="1000"></property>
    </bean>
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:tcp://127.0.0.1:61616" />
    <property name="userName" value="admin"/>
    <property name="password" value="admin"/>
    <property name="useAsyncSend" value="true"/>
    <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>
    </bean>

简单的异步消息发送代码

producer

通过在spring中配置jmsTemplate,然后调用send(Queue,Message)来实现发送消息

  • 配置

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:tcp://127.0.0.1:61616" />
    </bean>
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
  • 调用

    public void sendMessage(final String dataMap) {
    jmsTemplate.send(dataDistination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(dataMap);
    }
    });
    System.out.println("发送了一条消息。"+dataMap);
    }

consumer

实现异步的可重复接收的监听

  • 监听类

    @Component
    public class TestMessageListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
    TextMessage textMessage = (TextMessage) message;
    try {
    System.out.println("接收到的消息内容是:" + textMessage.getText());
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
  • 配置

    <!-- 消息监听容器 -->
    <bean id="jmsContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
    <property name="concurrentConsumers" value="2"></property>
    <property name="concurrency" value="2-5"></property>
    </bean>
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!--这个是队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
    <value>magic_queue</value>
    </constructor-arg>
    </bean>
    bean id="queueDestination2" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
    <value>map_queue</value>
    </constructor-arg>
    </bean>
    <!-- 消息监听器 -->
    <bean id="consumerMessageListener" class="com.test.message.TestMessageListener" />
Contents
  1. 1. 参考资料
  2. 2. 消息的确认机制
    1. 2.1. SESSION_TRANSACTED = 0 事务提交并确认
    2. 2.2. AUTO_ACKNOWLEDGE = 1 自动确认
    3. 2.3. CLIENT_ACKNOWLEDGE = 2 客户端手动确认
    4. 2.4. INDIVIDUAL_ACKNOWLEDGE = 4 单条消息确认
    5. 2.5. ACK_MODE
    6. 2.6. ACK_TYPE
  3. 3. 消息重发策略
  4. 4. 简单的异步消息发送代码
    1. 4.1. producer
    2. 4.2. consumer