前言
上一篇我们学习了 RabbitMQ 的轮询调度机制,把消息平均分配给消费者去消费,而不管消费者消费消息的能力,因为对于消息队列 RabbitMQ 来说,它是不知道消费者消费消息的能力的。
现在我们想根据消费者消费消息的能力来分配消息,即”能者多劳“。比如 Recv1 消费者它收到消息后处理业务的时间是1秒,Recv2 消费者它收到消息后处理业务的时间是2秒,很明显 Recv1 消费消息的能力是高于 Recv2的,既然 Recv1能力高,公平起见,应该让 Recv1 干的活多一些,这就是公平分发(fair-dispatch)机制。
原理
RabbitMQ 提供了一个API:basicQos( perfetchCount = 1),即:RabbitMQ 的队列给某个消费者发送消息时,每次一条消息,等消费者处理完后手动告诉队列,队列才会把下一条消息发送给这个消费者。RabbitMQ 默认是消费者自动返回给队列 ACK ,为了实现公平分发,需要关闭自动 ACK。
自动 ACK简介
当消费者收到消息后,不管是否处理完 成,RabbitMQ Server会立即把这个message标记为完成,然后从queue中删除。
关闭自动 ACK 后,RabbitMQ 的 Queue 将消息发给消费者,消费者处理完这条消息后才会返回 ACK,RabbitMQ Queue收到ACK 确认后才会删除此 message,在此期间,消息一直存在于队列中。
代码示例
Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| package com.yakai.rabbitmq.work.fair;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send {
private static final String QUEUE_NAME = "test_fair_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1,false); String msg = ""; for (int i = 0; i < 50; i++) { msg = "work "+i; System.out.println("Send "+msg); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } channel.close(); connection.close(); } }
|
Recv1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package com.yakai.rabbitmq.work.fair;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 { private static final String QUEUE_NAME = "test_fair_queue"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1,false);
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[1] :"+msg); try { Thread.sleep(1000); }catch (Exception e){ e.printStackTrace(); }finally { channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
|
Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.yakai.rabbitmq.work.fair;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 { private static final String QUEUE_NAME = "test_fair_queue"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1,false);
Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[2] :"+msg); try { Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); }finally { channel.basicAck(envelope.getDeliveryTag(),false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME,autoAck,consumer); } }
|
测试
先启动 Recv1、Recv2,再启动 Send,观察数据:
send
1 2 3 4 5 6 7 8 9
| Send work 0 Send work 1 Send work 2 Send work 3 Send work 4 Send work 5 Send work 6 ...... Send work 49
|
Recv1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| Recv[1] :work 0 Recv[1] :work 2 Recv[1] :work 4 Recv[1] :work 5 Recv[1] :work 7 Recv[1] :work 8 Recv[1] :work 10 Recv[1] :work 11 Recv[1] :work 13 Recv[1] :work 14 Recv[1] :work 16 Recv[1] :work 17 Recv[1] :work 19 Recv[1] :work 20 Recv[1] :work 22 Recv[1] :work 23 Recv[1] :work 25 Recv[1] :work 26 Recv[1] :work 28 Recv[1] :work 29 Recv[1] :work 31 Recv[1] :work 32 Recv[1] :work 34 Recv[1] :work 35 Recv[1] :work 37 Recv[1] :work 38 Recv[1] :work 40 Recv[1] :work 41 Recv[1] :work 43 Recv[1] :work 44 Recv[1] :work 46 Recv[1] :work 47 Recv[1] :work 49
|
Recv2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Recv[2] :work 1 Recv[2] :work 3 Recv[2] :work 6 Recv[2] :work 9 Recv[2] :work 12 Recv[2] :work 15 Recv[2] :work 18 Recv[2] :work 21 Recv[2] :work 24 Recv[2] :work 27 Recv[2] :work 30 Recv[2] :work 33 Recv[2] :work 36 Recv[2] :work 39 Recv[2] :work 42 Recv[2] :work 45 Recv[2] :work 48
|
总结
这次测试我们发现,消费者 Recv1 的消费消息的数量比 Recv2 要多了不少,这是因为 RabbitMQ 使用了公平分发模式后,按照“消费能力”公平分配给各个消费者。
使用fair-dispatch机制注意:
channel 使用 basicQos(perfetchCount = 1)
设置 autoAck = false
消息消费完后记得手动回执一个 Ack 给队列,否则可能会造成队列里消息堵塞。