环境:
otp_win64_18.exe
rabbitmq-server-3.6.0.exe
RabbitMQ是建立在强大的Erlang OTP平台上,因此安装Rabbit MQ的前提是安装Erlang
http://www.erlang.org/downloads
下载并安装 Eralng OTP For Windows 比如otp_win64_xxx.exe
安装完成之后创建一个名为ERLANG_HOME的环境变量,其值指向erlang的安装目录,
同时将%ERLANG_HOME%\bin加入到Path中,
最后打开命令行,输入erl,如果出现erlang的版本信息就表示erlang语言环境安装成功
下载并安装 Rabbit MQ Server Windows Installer 比如rabbitmq-server-xxx.exe(安装目录不要有空格)
进入RabbitMQ的sbin目录,执行rabbitmqctl status 查看是否安装成功
接着输入 rabbitmq-plugins enable rabbitmq_management 来安装RabbitMQ-Plugins,
这个相当于是一个管理界面,
方便我们在浏览器界面查看RabbitMQ各个消息队列以及exchange的工作情况
插件安装完之后,在浏览器输入http://localhost:15672进行验证,默认的账户和密码均是 guest
新建一个Maven工程
com.rabbitmq amqp-client 3.6.0
消息生产者
package com.zns.rabbitmq;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer { private final static String QUEUE_NAME = "MyQueue"; public static void main(String[] argv) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost("localhost"); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个频道 Channel channel = connection.createChannel(); // 声明一个队列 -- // 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; // 发送消息到队列中 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("P Sent '" + message + "'"); // 关闭频道和连接 channel.close(); connection.close(); }}
消息消费者
package com.zns.rabbitmq;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class Consumer { private final static String QUEUE_NAME = "MyQueue"; public static void main(String[] argv) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ地址 factory.setHost("localhost"); // 创建一个新的连接 Connection connection = factory.newConnection(); // 创建一个频道 Channel channel = connection.createChannel(); // 声明要关注的队列 -- // 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("C [*] Waiting for messages..."); // DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("C Received '" + message + "'"); } }; // 自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); }}
确保Rabbitmq服务已开启,先运行生产者向队列发送消息,然后再运行消费者可以看到接收了消息。