前言
前面学到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务需求,RabbitMQ 提供了一种 topic 模型可以灵活组配路由和绑定。
模型
topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型相似,也是将消息路又道 BindingKey 和 RountingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
RoutingKey 为一个点号“.”分隔的字符串(被“.”分隔的每一段独立的字符串为一个单词),如“com.rabbitmq.client”、“java.util.concurrent”;
BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多规格单词(可以是零个)。
如上图所示:
RoutingKey 为“yakai.orange.rabbit”的消息会同时路由到队列 Q1 和 Q2;
RoutingKey 为“yakai.apple.rabbit”的消息会路由到队列 Q2;
RoutingKey 为“lazy.apple.rabbit”的消息会同时路由到队列Q2;
RoutingKey 为“yakai.orange.mq”的消息会同时路由到队列 Q1;
…
代码示例
Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yakai.rabbitmq.topic;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send {
private static final String EXCHANGE_NAME = "test_ex_topic"; private static final String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE); String msg = "hello topic"; String routingKey = "yakai.orange.rabbit"; channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes()); System.out.println("Send: "+msg);
channel.close(); connection.close(); } }
|
Recv1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.yakai.rabbitmq.topic;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 { private static final String QUEUE_NAME = "test_topic_queue_1"; private static final String EXCHANGE_NAME = "test_ex_topic"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String routingKey = "*.orange.*"; channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[1] :"+msg);
} }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package com.yakai.rabbitmq.topic;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 { private static final String QUEUE_NAME = "test_topic_queue_2"; private static final String EXCHANGE_NAME = "test_ex_topic"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[2] :"+msg);
} }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
测试
启动 Recv1、Recv2,再启动 Send,观察控制台打印,根据上图路由规则,RoutingKey 为“yakai.orange.rabbit”的消息会同时路由到队列 Q1 和 Q2;
Send:
Recv1:
Recv2:
测试结果与猜想一致。
修改 Send 路由键,RoutingKey 为“yakai.apple.rabbit”的消息会路由到队列 Q2;
1
| String routingKey = "yakai.apple.rabbit";
|
观察控制台打印
Send:
Recv1:
无变化
Recv2:
测试结果与猜想一致。
总结
topic 类型的交换器匹配规则按照 “*” 或 “#”进行模糊匹配,可以根据不同业务需求灵活地进行配置。