RabbitMQ 工作队列之公平分发机制

RabbitMQ 工作队列之公平分发机制

薛定谔的汪

前言

上一篇我们学习了 RabbitMQ 的轮询调度机制,把消息平均分配给消费者去消费,而不管消费者消费消息的能力,因为对于消息队列 RabbitMQ 来说,它是不知道消费者消费消息的能力的。

现在我们想根据消费者消费消息的能力来分配消息,即”能者多劳“。比如 Recv1 消费者它收到消息后处理业务的时间是1秒,Recv2 消费者它收到消息后处理业务的时间是2秒,很明显 Recv1 消费消息的能力是高于 Recv2的,既然 Recv1能力高,公平起见,应该让 Recv1 干的活多一些,这就是公平分发(fair-dispatch)机制。

原理

RabbitMQ 提供了一个API:basicQos( perfetchCount = 1),即:RabbitMQ 的队列给某个消费者发送消息时,每次一条消息,等消费者处理完后手动告诉队列,队列才会把下一条消息发送给这个消费者。RabbitMQ 默认是消费者自动返回给队列 ACK ,为了实现公平分发,需要关闭自动 ACK

自动 ACK简介

当消费者收到消息后,不管是否处理完 成,RabbitMQ Server会立即把这个message标记为完成,然后从queue中删除。

关闭自动 ACK 后,RabbitMQ 的 Queue 将消息发给消费者,消费者处理完这条消息后才会返回 ACK,RabbitMQ Queue收到ACK 确认后才会删除此 message,在此期间,消息一直存在于队列中。

代码示例

Send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.yakai.rabbitmq.work.fair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

public class Send {

private static final String QUEUE_NAME = "test_fair_queue";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//每次消费者返回 ACK 确认消息前,队列不发送下一个消息到此消费者。
channel.basicQos(1,false);
String msg = "";
for (int i = 0; i < 50; i++) {
msg = "work "+i;
System.out.println("Send "+msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
//关闭通道、连接
channel.close();
connection.close();
}
}

Recv1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.yakai.rabbitmq.work.fair;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv1 {
private static final String QUEUE_NAME = "test_fair_queue";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//每次消费者返回 ACK 确认消息前,队列不发送下一个消息到此消费者。
channel.basicQos(1,false);

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv[1] :"+msg);
try {
//Recv1接收消息间隔1秒
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}finally {
//处理完后,手动回执一个 ack 确认给队列
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
//设置自动 ack 为 fasle
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}

Recv2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.yakai.rabbitmq.work.fair;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv2 {
private static final String QUEUE_NAME = "test_fair_queue";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//每次消费者返回 ACK 确认消息前,队列不发送下一个消息到此消费者。
channel.basicQos(1,false);

Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv[2] :"+msg);
try {
//Recv2接收消息间隔2秒
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}finally {
//处理完后,手动回执一个 ack 确认给队列
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}

测试

先启动 Recv1、Recv2,再启动 Send,观察数据:

send

1
2
3
4
5
6
7
8
9
Send work 0
Send work 1
Send work 2
Send work 3
Send work 4
Send work 5
Send work 6
......
Send work 49

Recv1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Recv[1] :work 0
Recv[1] :work 2
Recv[1] :work 4
Recv[1] :work 5
Recv[1] :work 7
Recv[1] :work 8
Recv[1] :work 10
Recv[1] :work 11
Recv[1] :work 13
Recv[1] :work 14
Recv[1] :work 16
Recv[1] :work 17
Recv[1] :work 19
Recv[1] :work 20
Recv[1] :work 22
Recv[1] :work 23
Recv[1] :work 25
Recv[1] :work 26
Recv[1] :work 28
Recv[1] :work 29
Recv[1] :work 31
Recv[1] :work 32
Recv[1] :work 34
Recv[1] :work 35
Recv[1] :work 37
Recv[1] :work 38
Recv[1] :work 40
Recv[1] :work 41
Recv[1] :work 43
Recv[1] :work 44
Recv[1] :work 46
Recv[1] :work 47
Recv[1] :work 49

Recv2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Recv[2] :work 1
Recv[2] :work 3
Recv[2] :work 6
Recv[2] :work 9
Recv[2] :work 12
Recv[2] :work 15
Recv[2] :work 18
Recv[2] :work 21
Recv[2] :work 24
Recv[2] :work 27
Recv[2] :work 30
Recv[2] :work 33
Recv[2] :work 36
Recv[2] :work 39
Recv[2] :work 42
Recv[2] :work 45
Recv[2] :work 48

总结

这次测试我们发现,消费者 Recv1 的消费消息的数量比 Recv2 要多了不少,这是因为 RabbitMQ 使用了公平分发模式后,按照“消费能力”公平分配给各个消费者。

使用fair-dispatch机制注意:

channel 使用 basicQos(perfetchCount = 1)
设置 autoAck = false
消息消费完后记得手动回执一个 Ack 给队列,否则可能会造成队列里消息堵塞。

  • Title: RabbitMQ 工作队列之公平分发机制
  • Author: 薛定谔的汪
  • Created at : 2018-03-24 22:46:55
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/03/24/mq/rabbitmq/fair-dipatch/
  • License: This work is licensed under CC BY-NC-SA 4.0.
On this page
RabbitMQ 工作队列之公平分发机制