RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。这就是简单队列。
我是利用 SpringBoot 来进行测试,因为 SpringBoot 已经集成 RabbitMQ 所需 jar 包,直接新建一个 SpringBoot 项目然后引入依赖即可:
1 2 3 4 5 6
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.2.0</version> </dependency>
|
ConnectionUtil工具类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.yakai.rabbitmq.util;
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil { public static Connection getConnection() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); return factory.newConnection(); }
}
|
消息提供者Send:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package com.yakai.rabbitmq.simple;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.yakai.rabbitmq.util.ConnectionUtil;
public class Send { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "hello simple!"; System.out.println("Send :"+ msg); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); channel.close(); connection.close(); }
}
|
消息消费者 Recv:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| package com.yakai.rabbitmq.simple;
import com.rabbitmq.client.*; import com.yakai.rabbitmq.util.ConnectionUtil;
import java.io.IOException;
public class Recv { private static final String QUEUE_NAME = "test_simple_queue"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); System.out.println("Recv :"+msg); } }; channel.basicConsume(QUEUE_NAME,consumer); } }
|
测试:
先启动 Send 类:
在管理业上查看得知“test_simple_queue”已经创建,如图:
之后启动 Recv 类,观察可以正常收到消息:
疑问?
到此,一个 RabbitMQ 的简单队列小 Demo 已经完成,通过以上代码发现,声明队列、消息发送、消息接收都是由channel 完成的,而 channel 是由connection 创建的,这里有个疑问:为什么不直接使用 connection 来完成队列的声明、消息发送、消息接收的操作呢?
接下来带着这个疑问继续学习 RabbitMQ。