RabbitMQ ack应答和durabel持久化

RabbitMQ ack应答和durabel持久化

薛定谔的汪

前言

昨天在学习 RabbitMQ 的公平分发机制时提到了ack 应答机制,代码如下:

1
2
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);

应答 ack

RabbitMQ 有两种应答模式,自动和手动。 分别对应 autoAck 为 true 何 false。

自动应答

autoAck = true:一旦RabbitMQ 的 Queue 将message发给consumer,不管是否处理完 成,RabbitMQ Queue会立即把这个message标记为完成,然后从Queue中删除。这种情况下,如果consumer未处理完message就宕机,那么此message也就丢失了。

手动应答

autoAck = false:RabbitMQ 的 Queue 将 message 发给消费者后,consumer 处理完这条 message 后才会返回 ACK,RabbitMQ Queue 收到ACK 确认后才会删除此 message,在此期间,message 一直存在于队列中。这样消息就不容易丢失了。

持久化 durable

message 正常情况下存在于内存中,当 RabbitMQ 服务宕机后,那么此服务上的 message 也就没了,为了保证消息的可靠性,RabbitMQ 提供了持久化机制。

代码如下:

1
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

方法:

1
2
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)

当设置boolean durable = true时表示打开 RabbitMQ 的持久化机制。那么当RabbitMQ 服务重启时会把持久化的 message 加载到 Queue 中,保证了消息的可靠性。

问题:

我把先前测试的代码中的boolean durable = true,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.yakai.rabbitmq.work.fair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

public class Send {

private static final String QUEUE_NAME = "test_fair_queue";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//持久化设置为 true
boolean durable = true;
//声明一个队列
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
//每次消费者返回 ACK 确认消息前,队列不发送下一个消息到此消费者。
channel.basicQos(1,false);
String msg = "";
for (int i = 0; i < 50; i++) {
msg = "work "+i;
System.out.println("Send "+msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
}
//关闭通道、连接
channel.close();
connection.close();
}
}

重新启动 Send,发现报错:

1
2
3
4
5
6
7
8
9
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_fair_queue' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:510)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:111)
at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:670)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
at java.lang.Thread.run(Thread.java:748)

重点异常提示:

inequivalent arg ‘durable’ for queue ‘test_fair_queue’ in vhost ‘/‘: received ‘true’ but current is ‘false’, class-id=50, method-id=10)

这表示”test_fair_queue“队列先前已经声明过了,RabbitMQ 服务中已经存在这个队列,且 RabbitMQ 不允许对已经存在的队列重新定义

解决办法:

  1. 进入控制台将当前队列删除。
  2. 或重新声明一个新的队。
  3. 或使用新的 vhost,前面已经了解到每个 vhost 之间是绝对隔离的,不同的 vhost 下可以存在相同名称的队列。

总结

  1. RabbitMQ在服务端没有声明队列和消息持久化时,队列和消息是存在内存中的,服务端宕机了,队列和消息也不会保留。
  2. 服务端声明持久化,需要确认 RabbitMQ Server 中不存在队列与将要声明队列命名冲突,若已存在,则启动报错。
  3. 服务端声明持久化,客户端想接受消息的话,必须也要声明queue时,也要声明持久化,不然的话,客户端执行会报错。
  • Title: RabbitMQ ack应答和durabel持久化
  • Author: 薛定谔的汪
  • Created at : 2018-03-25 13:20:16
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/03/25/mq/rabbitmq/ack-durable/
  • License: This work is licensed under CC BY-NC-SA 4.0.
On this page
RabbitMQ ack应答和durabel持久化