RabbitMQ 工作队列之轮询调度机制

RabbitMQ 工作队列之轮询调度机制

薛定谔的汪

上篇学习了 RabbitMQ 的简单队列,它是“一对一”的。但是如果想实现多个消费者监听同一个队列的话,简单队列就不满足要求了。

RabbitMQ 提供了另外一种队列来满足上述需求—-工作队列:Work Queues,它有两种模式:轮询和公平分发。今天学习总结 RabbitMQ 工作队列的轮询方式。

工作队列

模型:

简单队列的不足:

简单队列是一一 对应的,而且我们实际开发,生产者发送消息基本是毫不费力的,而消费者消费消息一般是要与实际业务相结合,消费者接收到消息后就需要处理,可能需要花费时间,那么可能会造成队列内的消息积压,而工作队列是一个生产者对应多个消费者,队列内的消息可以被多个消费者消费,增加了消息被消费的效率。

Java 代码示例:

定义一个消息生产者 Send:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.yakai.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

public class Send {

private static final String QUEUE_NAME = "test_work_queue";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//发送50条消息到队列
String msg = "";
for (int i = 0; i < 50; i++) {
msg = "work "+i;
System.out.println("Send "+msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
//关闭通道、连接
channel.close();
connection.close();
}
}

定义消息消费者 Recv1 消费 Send 发送的消息

注:Recv1 消费 Send 发送的消息,每隔1秒接收一次消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yakai.rabbitmq.work;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv1 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv[1] :"+msg);
try {
//Recv1接收消息间隔1秒
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}

定义消息消费者 Recv2

注:Recv2也消费 Send 发送的消息,每隔2秒消费一次消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.yakai.rabbitmq.work;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv2 {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv[2] :"+msg);
try {
//Recv2接收消息间隔2秒
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
}
};
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}

测试

先启动 Recv1、Recv2,再启动 Send,观察数据:

Send:

1
2
3
4
5
6
7
8
9
Send work 0
Send work 1
Send work 2
Send work 3
Send work 4
Send work 5
Send work 6
......
Send work 49

Recv1:

1
2
3
4
5
6
7
8
Recv[1] :work 0
Recv[1] :work 2
Recv[1] :work 4
Recv[1] :work 6
Recv[1] :work 8
Recv[1] :work 10
......
Recv[1] :work 48

Recv2:

1
2
3
4
5
6
7
8
Recv[2] :work 1
Recv[2] :work 3
Recv[2] :work 5
Recv[2] :work 7
Recv[2] :work 9
Recv[2] :work 11
......
Recv[2] :work 49

虽然消费者1和消费者2消费消息的频率(能力)不同,但是他们消费消息的数目是均分的,而且是轮流消费的,这种方式就叫轮询(round-robin,nginx和dubbo 中也有这种负载均衡机制),不管谁忙谁闲,消费的消息都是你一个我一个轮流来消费。

  • Title: RabbitMQ 工作队列之轮询调度机制
  • Author: 薛定谔的汪
  • Created at : 2018-03-23 22:46:55
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/03/23/mq/rabbitmq/round-robin/
  • License: This work is licensed under CC BY-NC-SA 4.0.
On this page
RabbitMQ 工作队列之轮询调度机制