rabbitMQ安装教程网上特别多就不多赘述,这里主要说一下怎么去连接
第一步,创建工程添加依赖
创建一个Maven项目,打开pom.xml,添加两个依赖,并更新Maven。
4.0.0 org.example rabbitMQdemo 1.0-SNAPSHOT ------------添加下面两个依赖------------ com.rabbitmq amqp-client 5.14.0 org.slf4j slf4j-nop 1.7.25
第二步,配置连接
在src->main->java中新建一个文件夹utils,在此文件夹中添加class:rabbitMQUtils
package utils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMQUtils { private static final ConnectionFactory factory; static { factory = new ConnectionFactory(); factory.setHost("126.239.25.24"); // 换成自己的ip factory.setPort(5672); // 一般默认端口为5672 factory.setUsername("root"); factory.setPassword("123456"); factory.setAutomaticRecoveryEnabled(true); // 开启Connection自动恢复功能 factory.setNetworkRecoveryInterval(5000); factory.setVirtualHost("/"); factory.setConnectionTimeout(30 * 1000); factory.setHandshakeTimeout(30 * 1000); factory.setShutdownTimeout(0); } // 定义提供连接对象的方法 public static Connection getConnection() { try { return factory.newConnection(); } catch (Exception e) { e.printStackTrace(); } return null; } // 定义关闭通道和连接的方法 public static void closeAll(Channel chan, Connection conn) { try{ if(chan != null) chan.close(); if(conn != null) conn.close(); } catch (Exception e) { e.printStackTrace(); } }}
需要注意的是要将ip、用户名、密码更换成自己的,如果是在服务器上装的MQ记得在安全组中把端口放行,另外记得设置虚拟用户"/",或改成自己的虚拟用户名。
第三步,测试连接
使用直连测试连接效果,在src->main->java创建文件夹direct,添加两个class:consumer和producer。
package direct;import com.rabbitmq.client.*;import utils.RabbitMQUtils;import java.io.IOException;public class producer { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); assert connection != null; final Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare("direct_router", "direct", true, false, false, null); // 声明消息队列 // 参数1: 队列名称 // 参数2: 定义是否需要持久化队列,true为持久化 // 参数3: 定义是否让当前连接独占队列,true为独占 // 参数4: 是否在消费完成后自动删除队列,true为删除 // 参数5: 额外附加参数,传入HashMap channel.queueDeclare("direct_queue", false, false, false, null); // 交换机与消息队列绑定 channel.queueBind("direct_queue", "direct_router", "Dir-RQ"); // 发送消息 // 参数1: 交换机名称 // 参数2: routingKey // 参数3: 若为true,则当消息无法送达指定队列时会触发channel.BasicReturn事件否则broker会将消息直接丢弃 // 参数4: 传递消息的额外设置 // 参数5: 发送消息内容 String msg="direct producer message"; channel.basicPublish("direct_router", "Dir-RQ", true, null, msg.getBytes()); RabbitMQUtils.closeAll(channel, connection); }}
package direct;import com.rabbitmq.client.*;import utils.RabbitMQUtils;import java.io.IOException;public class consumer { public static void main(String[] args) throws IOException, InterruptedException { Connection connection = RabbitMQUtils.getConnection(); assert connection != null; final Channel channel = connection.createChannel(); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery( String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body ) throws IOException { // 打印消息 String msg= new String(body, "utf-8"); System.out.println(msg); // 应答机制 将队列中的消息删除掉,第二个参数为是否需要确认多个ACK channel.basicAck(envelope.getDeliveryTag(), false); } }; // 接收消息 // 参数1: 队列名称 // 参数2: 是否自动签收消息,对应上面的应答机制,最好手动删,否则消费者多了可能会出问题 channel.basicConsume("direct_queue", false, "ConsumerTag", consumer); Thread.sleep(20000); RabbitMQUtils.closeAll(channel, connection); }}
右键运行consumer,20s内运行producer就能看见控制台中接收到了producer发出的消息。
来源地址:https://blog.csdn.net/cris_tian/article/details/131531080