上篇学习总结了 RabbitMQ 的 sub/pub 模式,今天接着学习它的 Routing 模式。此模式使用了“direct”类型的 Exchange。
模型
解读
队列一的 BindingKey 是“error”,队列二的BindingKey 是“info”、“error”和“warning”,Exchange 的类型是 direct,当 RoutingKey 是 error 时,Exchange 将消息发送到 队列一,当 RoutingKey 是 info 或 error 或 warning 时,Exchange 将消息发送到队列二。
代码示例
Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yakai.rabbitmq.direct;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send {
private static final String EXCHANGE_NAME = "test_ex_direct"; private static final String EXCHANGE_TYPE = "direct";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE); String msg = "error logs"; String routingKey = "error"; channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes()); System.out.println("Send: "+msg);
channel.close(); connection.close(); } }
|
注:Send 写完后先运行一下,让 RabbitMQ 中先生成这个交换器,防止Recv 运行的时候报错
Recv1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.yakai.rabbitmq.direct;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 { private static final String QUEUE_NAME = "test_routing_queue_1"; private static final String EXCHANGE_NAME = "test_ex_direct"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String routingKey = "error"; channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[1] :"+msg);
} }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.yakai.rabbitmq.direct;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 { private static final String QUEUE_NAME = "test_routing_queue_2"; private static final String EXCHANGE_NAME = "test_ex_direct"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[1] :"+msg);
} }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
注:channel.queueBind(String queue,String exchange,String routingKey);
方法中的参数 routingKey 其实准确地来说应该是 bindingKey,其实在某些情形下,RoutingKey 和 BindingKey可以认为是一个东西,因为在 direct 交换器类型下,RoutingKey 和 BindingKey 需要完全匹配才可能使用,所以代码中的 API 使用了此种写法会显得很方便
测试
先启动 Recv1、Recv2,再启动 Send,观察数据:
Send:
Recv1:
Recv2:
发送“error”信息,Recv1和 Recv2都可以接收到。
修改Send代码:
1 2 3
| String msg = "info logs";
String routingKey = "info";
|
其他不变,继续测试,观察控制台打印:
Send:
Recv1:
控制台无变化
Recv2:
总结
“direct”类型的 Exchange 会把消息路由到 BindingKey 和 RoutingKey 完全匹配的队列中