前言
前面学到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务需求,RabbitMQ 提供了一种 topic 模型可以灵活组配路由和绑定。
模型


topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型相似,也是将消息路又道 BindingKey 和 RountingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
RoutingKey 为一个点号“.”分隔的字符串(被“.”分隔的每一段独立的字符串为一个单词),如“com.rabbitmq.client”、“java.util.concurrent”;
BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多规格单词(可以是零个)。
如上图所示:
RoutingKey 为“yakai.orange.rabbit”的消息会同时路由到队列 Q1 和 Q2;
RoutingKey 为“yakai.apple.rabbit”的消息会路由到队列 Q2;
RoutingKey 为“lazy.apple.rabbit”的消息会同时路由到队列Q2;
RoutingKey 为“yakai.orange.mq”的消息会同时路由到队列 Q1;
…
代码示例
Send
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 
 | package com.yakai.rabbitmq.topic;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.yakai.rabbitmq.util.ConnectionUtil;
 
 public class Send {
 
 private static final String EXCHANGE_NAME = "test_ex_topic";
 
 private static final String EXCHANGE_TYPE = "topic";
 
 public static void main(String[] args) throws Exception {
 
 
 Connection connection = ConnectionUtil.getConnection();
 
 Channel channel = connection.createChannel();
 
 
 channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
 String msg = "hello topic";
 
 String routingKey = "yakai.orange.rabbit";
 channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
 System.out.println("Send: "+msg);
 
 
 channel.close();
 connection.close();
 }
 }
 
 
 | 
Recv1
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 
 | package com.yakai.rabbitmq.topic;
 import com.rabbitmq.client.*;
 import com.yakai.rabbitmq.util.ConnectionUtil;
 
 import java.io.IOException;
 
 public class Recv1 {
 private static final String QUEUE_NAME = "test_topic_queue_1";
 private static final String EXCHANGE_NAME = "test_ex_topic";
 public static void main(String[] args) throws Exception{
 
 Connection connection = ConnectionUtil.getConnection();
 
 Channel channel = connection.createChannel();
 
 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
 
 String routingKey = "*.orange.*";
 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
 
 Consumer consumer = new DefaultConsumer(channel){
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 String msg = new String(body,"utf-8");
 System.out.println("Recv[1] :"+msg);
 
 }
 };
 channel.basicConsume(QUEUE_NAME,true,consumer);
 }
 }
 
 
 | 
Recv2
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 
 | package com.yakai.rabbitmq.topic;
 import com.rabbitmq.client.*;
 import com.yakai.rabbitmq.util.ConnectionUtil;
 
 import java.io.IOException;
 
 public class Recv2 {
 private static final String QUEUE_NAME = "test_topic_queue_2";
 private static final String EXCHANGE_NAME = "test_ex_topic";
 public static void main(String[] args) throws Exception{
 
 Connection connection = ConnectionUtil.getConnection();
 
 Channel channel = connection.createChannel();
 
 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
 
 
 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
 
 Consumer consumer = new DefaultConsumer(channel){
 @Override
 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 String msg = new String(body,"utf-8");
 System.out.println("Recv[2] :"+msg);
 
 }
 };
 channel.basicConsume(QUEUE_NAME,true,consumer);
 }
 }
 
 
 | 
测试
启动 Recv1、Recv2,再启动 Send,观察控制台打印,根据上图路由规则,RoutingKey 为“yakai.orange.rabbit”的消息会同时路由到队列 Q1 和 Q2;
Send:
Recv1:
Recv2:
测试结果与猜想一致。
修改 Send 路由键,RoutingKey 为“yakai.apple.rabbit”的消息会路由到队列 Q2;
| 1
 | String routingKey = "yakai.apple.rabbit";
 | 
观察控制台打印
Send:
Recv1:
无变化
Recv2:
测试结果与猜想一致。
总结
topic 类型的交换器匹配规则按照 “*” 或 “#”进行模糊匹配,可以根据不同业务需求灵活地进行配置。