前言
在使用 RabbitMQ 的时候,可以通过消息持久化操作来解决因为服务器宕机而导致消息丢失,保证了消息的可靠性。但还有个问题,当消息的生产者将消息发送出去后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下消息的发送情况是不会返回给生产者的,也就是生产者默认情况下是不知道消息到底有没有发送到 RabbitMQ Server,如果在消息到达前就丢失了(比如因为网络原因),持久化操作也解决不了这个问题,因为持久化是在 RabbitMQ Server 上进行的。
RabbitMQ 针对这个问题,提供了两种解决方式:
通过事务机制(AMQP协议提供)
发送方 Confirm 机制(RabbitMQ 提供)
事务机制
RabbitMQ 客户端中与事务机制相关的方法有三个:channel.txSelect()、channel.txCommit() 和 channel.txRollback(),channel.txSelect() 用于将当前的信道设置成事务模式,channel.txCommit()用于提交事务,channel.txRollback 用于事务回滚,这个类似于数据库的事务机制。
代码示例
Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package com.yakai.rabbitmq.tx;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send { private static final String QUEUE_NAME = "test_tx_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello tx!"; System.out.println("Send :"+ msg); try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); int a = 1/0; channel.txCommit(); }catch (Exception e){ System.out.println("RabbitMQ 发送消息异常!执行回滚"); channel.txRollback(); }finally { channel.close(); connection.close(); }
}
}
|
Recv
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.yakai.rabbitmq.tx;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv { private static final String QUEUE_NAME = "test_tx_queue"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv :"+msg); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
|
观察控制台打印
Send:
1 2
| Send :hello tx! RabbitMQ 发送消息异常!执行回滚
|
Recv:
无打印内容
这是因为代码中开启了事务,消息在发送后,发生异常,事务进行回滚,所以 Recv 没收到消息。
使用事务的局限性
我的理解是:因为使用事务要调用channel.txSelect()、channel.txCommit() 和 channel.txRollback()三个方法,在消息量大的时候,产生的协议开销会很多,这样势必会降低 RabbitMQ 的吞吐量。
RabbitMQ 提供了一个改进方案就是消息 Confirm 机制。
发送方 Confirm 机制
生产者将消息设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上发布的消息都会被指派一个唯一的 ID,一旦消息被投递到所有匹配的队列后,RabbitMQ 就会发送一个确认给生产者(包括消息的唯一 ID),这样生产者就知道消息到达目的地了。
如果消息是可持久化的,那么确认会在消息持久化后发出,这样保证了消息的可靠性。
开启 confirm 机制:channel.confirmSelect();
,它有三种模式:
普通模式:生产者发一条消息,调用 channel.waitForConfirms() 方法等待此消息的服务器确认返回。
批量模式:生产者发送多条消息,调用 channel.waitForConfirms() 方法等待这些消息的服务器确认返回。
异步模式:提供一个回调方法,当服务端确认了一条或者多条消息后,发送方会调这个方法进行处理。
注:普通模式和批量模式都上同步串行的,编程代码类似。
优点
相比于事务机制,发送方确认机制好处在于它的性能比事务机制好,并且还是异步的。一旦发布一条消息,生产者可以在等信道返回确认的同事继续发送下一条消息,当消息最终得到确认后,生产者便可以通过回调方法来处理该确认消息,如果 RabbitMQ 丢失了这条消息,那么会发送一条 nack 命令,生产者同样可以在回调方法中处理 nack。
注:事务机制和 confirm 机制是互斥的。
批量模式代码
Send:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package com.yakai.rabbitmq.confirm.nomal;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send { private static final String QUEUE_NAME = "test_confirm_1"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.confirmSelect(); for (int i = 0; i < 10; i++) { String msg = "hello confirm!"; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); }
if(channel.waitForConfirms()){ System.out.println("Send success"); }else{ System.out.println("Send fail"); } channel.close(); connection.close(); }
}
|
普通模式无非就是不循环,只发一条。
消费者代码略,逻辑类似。
异步模式
Channel 对象提供的 confirmListener() 回调方法只包含 deliveryTag(当前消息发出的消息序号),我们需要自己为每一个 channel 维护一个 unconfirm 的消息序号集合,每 publish 一条消息,集合中元素+1,每回调一次 handleAck 方法,unconfirm 集合中删掉相应的一条(mutiple=false)或者多条(mutiple=true)记录,从程序运行效率来看,这个 unconfirm 集合最好采用 SortedSet 存储结构(类似于 Redis 的 zset )。
Send
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package com.yakai.rabbitmq.confirm.async;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet;
public class Send {
private static final String QUEUE_NAME = "test_confirm_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.confirmSelect(); final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>()); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { if(multiple){ System.out.println("handleAck deliveryTag:"+deliveryTag+" multiple:"+multiple); confirmSet.headSet(deliveryTag-1).clear(); System.out.println(confirmSet.size()); }else{ System.out.println("handleAck deliveryTag:"+deliveryTag+" multiple:"+multiple); confirmSet.headSet(deliveryTag).clear(); System.out.println(confirmSet.size()); } }
@Override public void handleNack(long deliveryTag, boolean multiple) throws IOException {
} });
String msg = "hello confirm async!"; for (int i = 0; i < 10; i++) { Long nextSeq = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); confirmSet.add(nextSeq); } }
}
|
总结
使用 Confirm 机制的异步模式性能会提高很多, 的当开发中有生产者确认的需求时,强烈建议使用 Confirm 机制的异步模式!!