今天来学习下 RabbitMQ 的消息订阅发布模式:pub/sub。
概念
发布订阅模式:一个生产者发布消息,多个消费者订阅该生产者,这样同一条消息可以被多个消费者消费。
模型
解读
- 一个生产者,多个消费者。
- 每个消费者都有自己的队列。
- 每个队列都绑定到交换器上。
- 生产者不是直接把消息发送到队列,而是发到了图中的“X”,也就是交换器(Exchange),交换器根据相应的路由绑定到对应的队列。
常见应用场景:用户注册后,短信、邮箱都通知用户注册成功。
代码示例
Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.yakai.rabbitmq.ps;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send {
private static final String EXCHANGE_NAME = "test_ex_fanout"; private static final String EXCHANGE_TYPE = "fanout";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE); String msg = "hello pub/sub"; channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); System.out.println("Send: "+msg);
channel.close(); connection.close(); } }
|
启动 Send,登录localhost:15672控制台查看Exchange 一栏,发现交换器已经声明:
疑问
那么此时的消息在哪里呢?答案是丢失,这是因为消息只发送到 Exchange,但是 Exchange 没有存储消息的功能,队列才是内存中消息存放的地方,而此时Exchange 没有与任何队列绑定,刚才发的那条消息自然就丢失了。
Recv1
让消费者声明队列,然后 Send 根据路由绑定规则,通过 Exchange,绑定到对应的队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yakai.rabbitmq.ps;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 { private static final String QUEUE_NAME = "test_ps_queue_sms"; private static final String EXCHANGE_NAME = "test_ex_fanout"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[1] :"+msg);
} }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yakai.rabbitmq.ps;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 { private static final String QUEUE_NAME = "test_ps_queue_email"; private static final String EXCHANGE_NAME = "test_ex_fanout"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[2] :"+msg);
} }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
测试
先启动 Recv1、Recv2,再启动 Send,观察数据:
Send:
Recv1:
Recv2:
现象
Send 发送了一条消息,Recv1和 Recv2 都接收到了这条消息。实现了 RabbitMQ 的发布订阅机制。
总结
在[RabbitMQ 基础概念详解](link: http://zhengyk.cn/2018/03/20/rabbitmq/basic-concept/)中了解了 Exchange、 RoutingKey、BindingKey等相关概念。 Exchange 类型主要有 fanout、direct、topic、headers四种,我们正是使用了 fanout 这一类型的交换器实现了订阅发布。
fanout 特点
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。当使用此类型时,RoutingKey 和 BindingKey 默认使用空字符串。