消息队列RabbitMQ
学习资源
RabbitMQ大佬系列博客,超级多的干货,还出书了
https://blog.csdn.net/u013256816/article/details/79838428
https://blog.csdn.net/u013256816/article/list/3?t=1
RabbitMq
知乎的结果是学习 RabbitMQ,还有工具Erlang
- 刷完这个rabbitMQq专栏,连windows下的安装和配置都有,太详细了
http://blog.csdn.net/chwshuang/article/details/50543878 - 理解我们项目中配置的消息队列结构
- windows rabbitMq 安装和配置
http://blog.csdn.net/chwshuang/article/details/50543878
重要的概念
channel
生产者和消费者和队列服务器交互就必须建立一个物理连接。
特别是消费者,如果想持续收到消息的话,channel就不能关闭。
channel创建和关闭
先创建好channel,所有的exchange,或者queue 都是绑定在channel上的ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();...channel.close();connection.close();queue绑定、消息的publish、交换器的绑定都是建立在channel上的
channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
exchange 交换器
- 生产者直接把消息发给exchange,然后exchange根据type不同,会把消息发送到不同的quequ中
- 所有的queue都要绑定在exchange,如果不声明的话,就绑定在默认的fanout的exchange当中
- exchange有2个属性: type 和routingKeys,type为fanout时就没有必要设置routingKeys了
- 一个queue只能绑定在一个exchange当中,但是可以绑定routingKeys,如果是top的话,可以正则匹配任意个routingKeys
exchange的类型
fanout 类型,所有绑定这个exchange的queue都会收到消息,这时候设置routingKeys无效
direct 可以不指定queueName,指定routingKey 和 exchangeName,和type来发消息
topic direct的加强版,消费者建立订阅满足正在表达式式的routingKey的消息
queue
消息传送的通道,先进先出原则。queue是通过name来确定的,可以多个生产者往同一个queue中生产数据,也可以多个消费者同时订阅相同的queue。
下面获取的是独立 随机 且自动删除的queue,每次创建都不一样,和空的exchange完全不同,那个 是默认公用的交互器,这个是独立随机且自动删除的queue
String queueName = channel.queueDeclare().getQueue();一个queue只能绑定在一个exchange上面,但是一个queue可以绑定多个routingKeys
|
注意事项
要先启动Consumer,建立对应的queue,发送消息的时候如果发现该exchange没有等待的queue,消息自动丢弃
一个queue可以连接着多个consumer,然后默认负载均衡从队列中消费消息;一个exchange可以连接多个queue
生产者可以不指定queue发送消息,只指定消息的路由器和routingKeys即可。例如direct类型的路由器发送特定routingKeys的消息
Producer 不指定queue发送消息channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 发送消息for(String severity :routingKeys){String message = "Send the message level:" + severity;channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}Consumer 取随机queueName来 对应设置的exchange 和routingKey来接受消息String queueName = channel.queueDeclare().getQueue();// 根据路由关键字进行多重绑定for (String severity : routingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, severity);System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);}生产者可以指定queueName来发送消息,
channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));Consumer:channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("C [*] Waiting for messages. To exit press CTRL+C");channel.basicConsume(QUEUE_NAME, true, consumer);