消息队列ActiveMQ
github代码
整合了一个spring 和activemq的简易demo,可以发gson转化后的字符串,也可以穿HashMap。
https://github.com/huangzhenshi/ssm_activeMQ_Demo
ActiveMq 安装、操作、注意事项
http://blog.csdn.net/clj198606061111/article/details/38145597
注意事项:
- rabbitMQ 依赖的erl.exe 默认端口 5672和 activeMQ使用的一样,所以要先杀掉 erl.exe这个进程
- 64位操作系统要进到 /bin/64 下启动 activemq.bat
- http://localhost:8161/admin/index.jsp 账号密码 admin/admin (默认的)
- 整合spring的监听器 依赖geronimo-jms_1.1_spec ,因为MessageListener
ActiveMQ实现步骤
- 引入依赖jar包、配置消息服务器的URL和账号密码、配置QUEUE
- 代码里写发送的内容再发送
- 接收者还要配置消息监听器
ActiveMQ消息队列的功能
- 分布式项目中解耦,消息发送者甚至不需要知道谁去消费了这个消息
- 确保消息可靠(不会因为下游宕机而丢失),发布者不需要知道消费者是否正常运行
- 提速,消息的发布者不需要耗费线程等待消费者处理消息
- 削峰、控流
- 广播功能
- 负载均衡,可以灵活易扩展无入侵式的负载均衡,多个consumer 订阅相同的queue然后每次取1个
ActiveMQ监控后台的使用
http://localhost:8161/admin/queues.jsp
程序启动后查看对应的queue里面的生产者数量,消费者数量,进队消息数,出队消息数
同步、异步、无返回值、有返回值的发送
默认配置是异步发送
使用同步方式:消费者会一直等待生产者发送消息或者超市。因其是阻塞式接收消息,故当第一次接收生产者发送过来的消息并消费后,第二次生产者提供的消息不再消费。
使用异步监听方式:消费者通过注册监听器,每当生产者有新的消息提供过来是会触发MessageListener的回调方法onMessage()方法。便于后续消息处理。
ActiveMQ的持久化机制
https://yq.aliyun.com/articles/38433
JDBC式持久化
数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock
KahaDB持久化(默认)
- 日志形式存储消息;
- 消息索引以B-Tree结构存储,可以快速更新;
- 完全支持JMS事务;
- 支持多种恢复机制;
消息存储在基于文件的数据日志中。如果消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
消息文件的位置索引存储在内存中,这样能快速定位到。定期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。
LevelDB持久化
基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。
ActiveMQ集群和高可用
MASTER/SLAVE模式
特点:
- 一个Master只能有一个Slave
- Master会复制消息给Slave保持同步
|
Cluster模式(数据库共享模式 或者文件共享模式)
允许多个slave,持有锁的就是master,挂掉的话,不需要人工干预,其它slave成为新的master
安装和配置用户
安装的话参考(windows下的安装和配置环境变量和启动)
http://blog.csdn.net/chwshuang/article/details/50543878
windows下配置超级用户
http://blog.csdn.net/xiaojieblog/article/details/70332469
默认的超级用户是 guest/guest
- 查看用户
rabbitmqctl list_users - 添加用户 设置密码
rabbitmqctl add_user root 123456 - 设置用户为admin
rabbitmqctl set_user_tags root administrator 分配资源
rabbitmqctl set_permissions -p / root “.“ “.“ “.*”
消息队列整合Spring
参考:
http://blog.csdn.net/chwshuang/article/details/50580718
生产者 依赖 和spring配置
调用
消费者:主要通知配置监听以及处理监听的方法的形式实现监听消息。
我们项目的使用
后台DML课件后,后端程序发消息到 ActiveMQ通知前端程序,
前端程序监听到消息后,删除原来的索引,添加进最新的索引。
因为负责搜索的solr是在前端程序里面,所以要监听后端程序控制的课件变化。
实战
利用ActiveMq 实现一套,互动的消息。服务器A发消息 到B 然后,B接到消息后反馈给A
很全面的一个系列。
参考:http://blog.mayongfa.cn/94.html
依赖
发送文本消息和hashMap消息
<!-- 发送者依赖的最少配置 --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory" /></bean><bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="failover:tcp://127.0.0.1:61616" /><property name="userName" value="admin"/><property name="password" value="admin"/><property name="useAsyncSend" value="true"/></bean><bean id="connectionFactory"class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory" /></bean>@Componentpublic class MessageSender {@Autowiredprivate JmsTemplate jmsTemplate;String dataDistination="magic_queue";private Gson gson = new Gson();public void sendMessage(User user){sendMessage(gson.toJson(user));}// 发送文本消息public void sendMessage(final String dataMap) {jmsTemplate.send(dataDistination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage(dataMap);}});System.out.println("发送了一条消息。"+dataMap);}//发送Map消息private void sendMessage(final HashMap<String, Object> dataMap) {jmsTemplate.send(dataDistination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createObjectMessage(dataMap);}});}}接收文本、Map消息
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="failover:tcp://127.0.0.1:61616" /><property name="userName" value="admin"/><property name="password" value="admin"/><property name="useAsyncSend" value="true"/></bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --><bean id="connectionFactory"class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactory" /></bean><!--这个是队列目的地 --><bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>magic_queue</value></constructor-arg></bean><!-- 消息监听器 --><bean id="consumerMessageListener" class="com.test.message.TestMessageListener" /><!-- 消息监听容器 --><bean id="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="queueDestination" /><property name="messageListener" ref="consumerMessageListener" /><property name="concurrentConsumers" value="2"></property><property name="concurrency" value="2-5"></property></bean>@Componentpublic class TestMessageListener implements MessageListener {//接收String消息@Overridepublic void onMessage(Message message) {// 监听发送到消息队列的文本消息,作强制转换。TextMessage textMessage = (TextMessage) message;try {System.out.println("接收到的消息内容是:" + textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}//接收Map@Overridepublic void onMessage(Message message) {if (message instanceof ObjectMessage) {ObjectMessage objMsg = (ObjectMessage) message;try {Map<String, Object> dataMap = (Map<String, Object>) objMsg.getObject();String name=(String) dataMap.get("name");String password=(String) dataMap.get("password");System.out.println(name+password);} catch (JMSException e) {}}}}