RabbitMQ topic类型交换器

RabbitMQ topic类型交换器

薛定谔的汪

前言

前面学到 direct 类型的交换器路由规则是完全匹配 BindingKey 和 RoutingKey,但是这种严格的匹配方式在很多情况下不能满足实际业务需求,RabbitMQ 提供了一种 topic 模型可以灵活组配路由和绑定。

模型

topic 类型的交换器在匹配规则上进行了扩展,它与 direct 类型相似,也是将消息路又道 BindingKey 和 RountingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:

RoutingKey 为一个点号“.”分隔的字符串(被“.”分隔的每一段独立的字符串为一个单词),如“com.rabbitmq.client”、“java.util.concurrent”;

BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;

BindingKey 中可以存在两种特殊字符串“*”和“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多规格单词(可以是零个)。

如上图所示:

RoutingKey 为“yakai.orange.rabbit”的消息会同时路由到队列 Q1 和 Q2;

RoutingKey 为“yakai.apple.rabbit”的消息会路由到队列 Q2;

RoutingKey 为“lazy.apple.rabbit”的消息会同时路由到队列Q2;

RoutingKey 为“yakai.orange.mq”的消息会同时路由到队列 Q1;

代码示例

Send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yakai.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

public class Send {

private static final String EXCHANGE_NAME = "test_ex_topic";
//交换器类型是topic
private static final String EXCHANGE_TYPE = "topic";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();

//声明交换器
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
String msg = "hello topic";
//发送消息
String routingKey = "yakai.orange.rabbit";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("Send: "+msg);

//关闭通道、连接
channel.close();
connection.close();
}
}

Recv1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yakai.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv1 {
private static final String QUEUE_NAME = "test_topic_queue_1";
private static final String EXCHANGE_NAME = "test_ex_topic";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定交换器
String routingKey = "*.orange.*";
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv[1] :"+msg);

}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}

Recv2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yakai.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv2 {
private static final String QUEUE_NAME = "test_topic_queue_2";
private static final String EXCHANGE_NAME = "test_ex_topic";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//绑定交换器
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv[2] :"+msg);

}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}

测试

启动 Recv1、Recv2,再启动 Send,观察控制台打印,根据上图路由规则,RoutingKey 为“yakai.orange.rabbit”的消息会同时路由到队列 Q1 和 Q2;

Send:

1
Send: hello topic

Recv1:

1
Recv[1] :hello topic

Recv2:

1
Recv[2] :hello topic

测试结果与猜想一致。

修改 Send 路由键,RoutingKey 为“yakai.apple.rabbit”的消息会路由到队列 Q2;

1
String routingKey = "yakai.apple.rabbit";

观察控制台打印

Send:

1
Send: hello topic

Recv1:

无变化

Recv2:

1
Recv[2] :hello topic

测试结果与猜想一致。

总结

topic 类型的交换器匹配规则按照 “*” 或 “#”进行模糊匹配,可以根据不同业务需求灵活地进行配置。

  • Title: RabbitMQ topic类型交换器
  • Author: 薛定谔的汪
  • Created at : 2018-03-28 19:44:48
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/03/28/mq/rabbitmq/topic/
  • License: This work is licensed under CC BY-NC-SA 4.0.
On this page
RabbitMQ topic类型交换器