使用Java操作RabbitMQ简单队列

使用Java操作RabbitMQ简单队列

薛定谔的汪

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。这就是简单队列。

RabbitMQ基础概念详细介绍

我是利用 SpringBoot 来进行测试,因为 SpringBoot 已经集成 RabbitMQ 所需 jar 包,直接新建一个 SpringBoot 项目然后引入依赖即可:

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.2.0</version>
</dependency>

ConnectionUtil工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.yakai.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
public static Connection getConnection() throws Exception{
//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器地址
factory.setHost("localhost");
//AMQP 5672
factory.setPort(5672);
//vhost 默认 / 可以不设置
//factory.setVirtualHost("/");
//用户名
factory.setUsername("admin");
//密码
factory.setPassword("admin");
//创建连接
return factory.newConnection();

}

}

消息提供者Send:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.yakai.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.yakai.rabbitmq.util.ConnectionUtil;

public class Send {

private static final String QUEUE_NAME = "test_simple_queue";

public static void main(String[] args) throws Exception {

//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列 这些参数暂时忽略,先实现功能,之后会深入学习
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//消息
String msg = "hello simple!";
System.out.println("Send :"+ msg);
//发消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
//关闭通道、连接
channel.close();
connection.close();
}

}

消息消费者 Recv:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.yakai.rabbitmq.simple;

import com.rabbitmq.client.*;
import com.yakai.rabbitmq.util.ConnectionUtil;

import java.io.IOException;

public class Recv {
private static final String QUEUE_NAME = "test_simple_queue";
public static void main(String[] args) throws Exception{
//获取一个连接
Connection connection = ConnectionUtil.getConnection();
//从连接中获取一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//这里使用的是新的 API 来接受消息。 类似监听回调机制
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("Recv :"+msg);
}
};
channel.basicConsume(QUEUE_NAME,consumer);
}
}

测试:

先启动 Send 类:

1
Send :hello simple!

在管理业上查看得知“test_simple_queue”已经创建,如图:

之后启动 Recv 类,观察可以正常收到消息:

1
Recv :hello simple!

疑问?

到此,一个 RabbitMQ 的简单队列小 Demo 已经完成,通过以上代码发现,声明队列、消息发送、消息接收都是由channel 完成的,而 channel 是由connection 创建的,这里有个疑问:为什么不直接使用 connection 来完成队列的声明、消息发送、消息接收的操作呢?

接下来带着这个疑问继续学习 RabbitMQ。

  • Title: 使用Java操作RabbitMQ简单队列
  • Author: 薛定谔的汪
  • Created at : 2018-03-22 20:12:10
  • Updated at : 2023-11-17 19:37:37
  • Link: https://www.zhengyk.cn/2018/03/22/mq/rabbitmq/java-simple-queue/
  • License: This work is licensed under CC BY-NC-SA 4.0.