Contents
  1. 1. github代码
  2. 2. ActiveMq 安装、操作、注意事项
  3. 3. ActiveMQ实现步骤
  4. 4. ActiveMQ消息队列的功能
  5. 5. ActiveMQ监控后台的使用
  6. 6. 同步、异步、无返回值、有返回值的发送
  7. 7. ActiveMQ的持久化机制
    1. 7.1. JDBC式持久化
    2. 7.2. KahaDB持久化(默认)
    3. 7.3. LevelDB持久化
  8. 8. ActiveMQ集群和高可用
    1. 8.1. MASTER/SLAVE模式
    2. 8.2. Cluster模式(数据库共享模式 或者文件共享模式)
  9. 9. 安装和配置用户
  10. 10. 消息队列整合Spring
  11. 11. 我们项目的使用
  12. 12. 实战

github代码

整合了一个spring 和activemq的简易demo,可以发gson转化后的字符串,也可以穿HashMap。
https://github.com/huangzhenshi/ssm_activeMQ_Demo

ActiveMq 安装、操作、注意事项

http://blog.csdn.net/clj198606061111/article/details/38145597
注意事项:

  • rabbitMQ 依赖的erl.exe 默认端口 5672和 activeMQ使用的一样,所以要先杀掉 erl.exe这个进程
  • 64位操作系统要进到 /bin/64 下启动 activemq.bat
  • http://localhost:8161/admin/index.jsp 账号密码 admin/admin (默认的)
  • 整合spring的监听器 依赖geronimo-jms_1.1_spec ,因为MessageListener

ActiveMQ实现步骤

  • 引入依赖jar包、配置消息服务器的URL和账号密码、配置QUEUE
  • 代码里写发送的内容再发送
  • 接收者还要配置消息监听器

ActiveMQ消息队列的功能

  • 分布式项目中解耦,消息发送者甚至不需要知道谁去消费了这个消息
  • 确保消息可靠(不会因为下游宕机而丢失),发布者不需要知道消费者是否正常运行
  • 提速,消息的发布者不需要耗费线程等待消费者处理消息
  • 削峰、控流
  • 广播功能
  • 负载均衡,可以灵活易扩展无入侵式的负载均衡,多个consumer 订阅相同的queue然后每次取1个

ActiveMQ监控后台的使用

http://localhost:8161/admin/queues.jsp
程序启动后查看对应的queue里面的生产者数量,消费者数量,进队消息数,出队消息数

同步、异步、无返回值、有返回值的发送

默认配置是异步发送

  1. 使用同步方式:消费者会一直等待生产者发送消息或者超市。因其是阻塞式接收消息,故当第一次接收生产者发送过来的消息并消费后,第二次生产者提供的消息不再消费。

  2. 使用异步监听方式:消费者通过注册监听器,每当生产者有新的消息提供过来是会触发MessageListener的回调方法onMessage()方法。便于后续消息处理。

ActiveMQ的持久化机制

https://yq.aliyun.com/articles/38433

JDBC式持久化

数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock

KahaDB持久化(默认)

  1. 日志形式存储消息;
  2. 消息索引以B-Tree结构存储,可以快速更新;
  3. 完全支持JMS事务;
  4. 支持多种恢复机制;

消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

LevelDB持久化

基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

ActiveMQ集群和高可用

MASTER/SLAVE模式

特点:

  • 一个Master只能有一个Slave
  • Master会复制消息给Slave保持同步
<value>failover:(tcp://localhost:61616,tcp://localhost:61618)

Cluster模式(数据库共享模式 或者文件共享模式)

允许多个slave,持有锁的就是master,挂掉的话,不需要人工干预,其它slave成为新的master

安装和配置用户

安装的话参考(windows下的安装和配置环境变量和启动)
http://blog.csdn.net/chwshuang/article/details/50543878

windows下配置超级用户
http://blog.csdn.net/xiaojieblog/article/details/70332469

默认的超级用户是 guest/guest

  • 查看用户
    rabbitmqctl list_users
  • 添加用户 设置密码
    rabbitmqctl add_user root 123456
  • 设置用户为admin
    rabbitmqctl set_user_tags root administrator
  • 分配资源
    rabbitmqctl set_permissions -p / root “.“ “.“ “.*”

  • 图形化界面
    http://localhost:15672/

消息队列整合Spring

参考:
http://blog.csdn.net/chwshuang/article/details/50580718

生产者 依赖 和spring配置

<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" publisher-confirms="true" virtual-host="test" username="test" password="test" />
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

调用

public void setMessage(String msg, String routingKey) {
rabbitTemplate.convertAndSend(routingKey, msg);
log.info("rabbitmq--发送消息完成: routingKey[{}]-msg[{}]", routingKey, msg);
}

消费者:主要通知配置监听以及处理监听的方法的形式实现监听消息。

<!-- 连接工厂 -->
<rabbit:connection-factory id="connectionFactory" host="192.168.5.198" publisher-confirms="true" virtual-host="test" username="test" password="1234" />
<!-- 监听器 -->
<rabbit:listener-container connection-factory="connectionFactory">
<!-- queues是队列名称,可填多个,用逗号隔开, method是ref指定的Bean调用Invoke方法执行的方法名称 -->
<rabbit:listener queues="red" method="onMessage" ref="redQueueListener" />
<rabbit:listener queues="blue" method="onMessage" ref="blueQueueListener" />
</rabbit:listener-container>
<!-- 队列声明 -->
<rabbit:queue name="red" durable="true" />
<!-- 队列声明 -->
<rabbit:queue name="blue" durable="true" />
<!-- 红色监听处理器 -->
<bean id="redQueueListener" class="com.aitongyi.customer.RedQueueListener" />
<!-- 蓝色监听处理器 -->
<bean id="blueQueueListener" class="com.aitongyi.customer.BlueQueueListener" />

我们项目的使用

后台DML课件后,后端程序发消息到 ActiveMQ通知前端程序,
前端程序监听到消息后,删除原来的索引,添加进最新的索引。
因为负责搜索的solr是在前端程序里面,所以要监听后端程序控制的课件变化。

实战

利用ActiveMq 实现一套,互动的消息。服务器A发消息 到B 然后,B接到消息后反馈给A
很全面的一个系列。
参考:http://blog.mayongfa.cn/94.html

依赖

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1.1</version>
</dependency>

  • 发送文本消息和hashMap消息

    <!-- 发送者依赖的最少配置 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:tcp://127.0.0.1:61616" />
    <property name="userName" value="admin"/>
    <property name="password" value="admin"/>
    <property name="useAsyncSend" value="true"/>
    </bean>
    <bean id="connectionFactory"
    class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    @Component
    public class MessageSender {
    @Autowired
    private JmsTemplate jmsTemplate;
    String dataDistination="magic_queue";
    private Gson gson = new Gson();
    public void sendMessage(User user){
    sendMessage(gson.toJson(user));
    }
    // 发送文本消息
    public void sendMessage(final String dataMap) {
    jmsTemplate.send(dataDistination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(dataMap);
    }
    });
    System.out.println("发送了一条消息。"+dataMap);
    }
    //发送Map消息
    private void sendMessage(final HashMap<String, Object> dataMap) {
    jmsTemplate.send(dataDistination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    return session.createObjectMessage(dataMap);
    }
    });
    }
    }
  • 接收文本、Map消息

    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="failover:tcp://127.0.0.1:61616" />
    <property name="userName" value="admin"/>
    <property name="password" value="admin"/>
    <property name="useAsyncSend" value="true"/>
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
    class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!--这个是队列目的地 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg>
    <value>magic_queue</value>
    </constructor-arg>
    </bean>
    <!-- 消息监听器 -->
    <bean id="consumerMessageListener" class="com.test.message.TestMessageListener" />
    <!-- 消息监听容器 -->
    <bean id="jmsContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestination" />
    <property name="messageListener" ref="consumerMessageListener" />
    <property name="concurrentConsumers" value="2"></property>
    <property name="concurrency" value="2-5"></property>
    </bean>
    @Component
    public class TestMessageListener implements MessageListener {
    //接收String消息
    @Override
    public void onMessage(Message message) {
    // 监听发送到消息队列的文本消息,作强制转换。
    TextMessage textMessage = (TextMessage) message;
    try {
    System.out.println("接收到的消息内容是:" + textMessage.getText());
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    //接收Map
    @Override
    public void onMessage(Message message) {
    if (message instanceof ObjectMessage) {
    ObjectMessage objMsg = (ObjectMessage) message;
    try {
    Map<String, Object> dataMap = (Map<String, Object>) objMsg.getObject();
    String name=(String) dataMap.get("name");
    String password=(String) dataMap.get("password");
    System.out.println(name+password);
    } catch (JMSException e) {
    }
    }
    }
    }
Contents
  1. 1. github代码
  2. 2. ActiveMq 安装、操作、注意事项
  3. 3. ActiveMQ实现步骤
  4. 4. ActiveMQ消息队列的功能
  5. 5. ActiveMQ监控后台的使用
  6. 6. 同步、异步、无返回值、有返回值的发送
  7. 7. ActiveMQ的持久化机制
    1. 7.1. JDBC式持久化
    2. 7.2. KahaDB持久化(默认)
    3. 7.3. LevelDB持久化
  8. 8. ActiveMQ集群和高可用
    1. 8.1. MASTER/SLAVE模式
    2. 8.2. Cluster模式(数据库共享模式 或者文件共享模式)
  9. 9. 安装和配置用户
  10. 10. 消息队列整合Spring
  11. 11. 我们项目的使用
  12. 12. 实战