RabbitMQ 事务和发送方确认机制

RabbitMQ 事务和发送方确认机制

薛定谔的汪

前言

在使用 RabbitMQ 的时候,可以通过消息持久化操作来解决因为服务器宕机而导致消息丢失,保证了消息的可靠性。但还有个问题,当消息的生产者将消息发送出去后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下消息的发送情况是不会返回给生产者的,也就是生产者默认情况下是不知道消息到底有没有发送到 RabbitMQ Server,如果在消息到达前就丢失了(比如因为网络原因),持久化操作也解决不了这个问题,因为持久化是在 RabbitMQ Server 上进行的。

RabbitMQ 针对这个问题,提供了两种解决方式:

通过事务机制(AMQP协议提供)
发送方 Confirm 机制(RabbitMQ 提供)

事务机制

RabbitMQ 客户端中与事务机制相关的方法有三个:channel.txSelect()、channel.txCommit() 和 channel.txRollback(),channel.txSelect() 用于将当前的信道设置成事务模式,channel.txCommit()用于提交事务,channel.txRollback 用于事务回滚,这个类似于数据库的事务机制。

代码示例

Send
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.yakai.rabbitmq.tx;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

public class Send {

private static final String QUEUE_NAME = "test_tx_queue";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息
String msg = "hello tx!";
System.out.println("Send :"+ msg);
try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
//人造异常
int a = 1/0;
channel.txCommit();
}catch (Exception e){
System.out.println("RabbitMQ 发送消息异常!执行回滚");
channel.txRollback();
}finally {
//关闭通道、连接
channel.close();
connection.close();
}
//发消息

}

}

Recv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.yakai.rabbitmq.tx;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv {
private static final String QUEUE_NAME = "test_tx_queue";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv :"+msg);
}
};
channel.basicConsume(QUEUE_NAME,consumer);
}
}

观察控制台打印

Send:

1
2
Send :hello tx!
RabbitMQ 发送消息异常!执行回滚

Recv:

无打印内容

这是因为代码中开启了事务,消息在发送后,发生异常,事务进行回滚,所以 Recv 没收到消息。

使用事务的局限性

我的理解是:因为使用事务要调用channel.txSelect()、channel.txCommit() 和 channel.txRollback()三个方法,在消息量大的时候,产生的协议开销会很多,这样势必会降低 RabbitMQ 的吞吐量。

RabbitMQ 提供了一个改进方案就是消息 Confirm 机制。

发送方 Confirm 机制

生产者将消息设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上发布的消息都会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列后,RabbitMQ 就会发送一个确认给生产者(包括消息的唯一 ID),这样生产者就知道消息到达目的地了。

如果消息是可持久化的,那么确认会在消息持久化后发出,这样保证了消息的可靠性。

开启 confirm 机制:channel.confirmSelect();,它有三种模式:

普通模式:生产者发一条消息,调用 channel.waitForConfirms() 方法等待此消息的服务器确认返回。

批量模式:生产者发送多条消息,调用 channel.waitForConfirms() 方法等待这些消息的服务器确认返回。

异步模式:提供一个回调方法,当服务端确认了一条或者多条消息后,发送方会调这个方法进行处理。

注:普通模式和批量模式都上同步串行的,编程代码类似。

优点

相比于事务机制,发送方确认机制好处在于它的性能比事务机制好,并且还是异步的。一旦发布一条消息,生产者可以在等信道返回确认的同事继续发送下一条消息,当消息最终得到确认后,生产者便可以通过回调方法来处理该确认消息,如果 RabbitMQ 丢失了这条消息,那么会发送一条 nack 命令,生产者同样可以在回调方法中处理 nack。

注:事务机制和 confirm 机制是互斥的。

批量模式代码

Send:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.yakai.rabbitmq.confirm.nomal;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

public class Send {

private static final String QUEUE_NAME = "test_confirm_1";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//使用 Confirm 机制
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
//消息
String msg = "hello confirm!";
//发消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}

if(channel.waitForConfirms()){
System.out.println("Send success");
}else{
System.out.println("Send fail");
}
//关闭通道、连接
channel.close();
connection.close();
}

}

普通模式无非就是不循环,只发一条。

消费者代码略,逻辑类似。

异步模式

Channel 对象提供的 confirmListener() 回调方法只包含 deliveryTag(当前消息发出的消息序号),我们需要自己为每一个 channel 维护一个 unconfirm 的消息序号集合,每 publish 一条消息,集合中元素+1,每回调一次 handleAck 方法,unconfirm 集合中删掉相应的一条(mutiple=false)或者多条(mutiple=true)记录,从程序运行效率来看,这个 unconfirm 集合最好采用 SortedSet 存储结构(类似于 Redis 的 zset )。

Send

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.yakai.rabbitmq.confirm.async;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class Send {

private static final String QUEUE_NAME = "test_confirm_2";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

//使用 Confirm 机制
channel.confirmSelect();
//存放未确认消息的集合
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if(multiple){
//可能返回重复的,所以要对重复的处理
System.out.println("handleAck deliveryTag:"+deliveryTag+" multiple:"+multiple);
confirmSet.headSet(deliveryTag-1).clear();
System.out.println(confirmSet.size());
}else{
System.out.println("handleAck deliveryTag:"+deliveryTag+" multiple:"+multiple);
confirmSet.headSet(deliveryTag).clear();
System.out.println(confirmSet.size());
}
}

@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {

//根据业务场景处理

}
});

//消息
String msg = "hello confirm async!";
//发10条消息
for (int i = 0; i < 10; i++) {
Long nextSeq = channel.getNextPublishSeqNo();
//发消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(nextSeq);
}
}

}

总结

使用 Confirm 机制的异步模式性能会提高很多, 的当开发中有生产者确认的需求时,强烈建议使用 Confirm 机制的异步模式!!

  • Title: RabbitMQ 事务和发送方确认机制
  • Author: 薛定谔的汪
  • Created at : 2018-03-29 19:44:48
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/03/29/mq/rabbitmq/tx-confirm/
  • License: This work is licensed under CC BY-NC-SA 4.0.
On this page
RabbitMQ 事务和发送方确认机制