上篇学习了 RabbitMQ 的简单队列,它是“一对一”的。但是如果想实现多个消费者监听同一个队列的话,简单队列就不满足要求了。
RabbitMQ 提供了另外一种队列来满足上述需求—-工作队列:Work Queues,它有两种模式:轮询和公平分发。今天学习总结 RabbitMQ 工作队列的轮询方式。
工作队列
模型:
简单队列的不足:
简单队列是一一 对应的,而且我们实际开发,生产者发送消息基本是毫不费力的,而消费者消费消息一般是要与实际业务相结合,消费者接收到消息后就需要处理,可能需要花费时间,那么可能会造成队列内的消息积压,而工作队列是一个生产者对应多个消费者,队列内的消息可以被多个消费者消费,增加了消息被消费的效率。
Java 代码示例:
定义一个消息生产者 Send:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.yakai.rabbitmq.work;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send {
private static final String QUEUE_NAME = "test_work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = ""; for (int i = 0; i < 50; i++) { msg = "work "+i; System.out.println("Send "+msg); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); } channel.close(); connection.close(); } }
|
定义消息消费者 Recv1 消费 Send 发送的消息
注:Recv1 消费 Send 发送的消息,每隔1秒接收一次消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yakai.rabbitmq.work;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv1 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[1] :"+msg); try { Thread.sleep(1000); }catch (Exception e){ e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
定义消息消费者 Recv2
注:Recv2也消费 Send 发送的消息,每隔2秒消费一次消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.yakai.rabbitmq.work;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv2 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv[2] :"+msg); try { Thread.sleep(2000); }catch (Exception e){ e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME,true,consumer); } }
|
测试
先启动 Recv1、Recv2,再启动 Send,观察数据:
Send:
1 2 3 4 5 6 7 8 9
| Send work 0 Send work 1 Send work 2 Send work 3 Send work 4 Send work 5 Send work 6 ...... Send work 49
|
Recv1:
1 2 3 4 5 6 7 8
| Recv[1] :work 0 Recv[1] :work 2 Recv[1] :work 4 Recv[1] :work 6 Recv[1] :work 8 Recv[1] :work 10 ...... Recv[1] :work 48
|
Recv2:
1 2 3 4 5 6 7 8
| Recv[2] :work 1 Recv[2] :work 3 Recv[2] :work 5 Recv[2] :work 7 Recv[2] :work 9 Recv[2] :work 11 ...... Recv[2] :work 49
|
虽然消费者1和消费者2消费消息的频率(能力)不同,但是他们消费消息的数目是均分的,而且是轮流消费的,这种方式就叫轮询(round-robin,nginx和dubbo 中也有这种负载均衡机制),不管谁忙谁闲,消费的消息都是你一个我一个轮流来消费。