搜索rabbitmq镜像
docker search rabbitmq:management
下载镜像
docker pull rabbitmq:management
启动容器
docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management
查看容器日志
docker logs rabbitmq
访问RabbitMQ Management
http://localhost:15672
账户密码默认:guest
编写生产者类
package com.xun.rabbitmqdemo.example;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Minimal publisher: sends a single "Hello world!" message to the {@code hello} queue
 * through the default exchange and then exits.
 */
public class Producer {
    private final static String QUEUE_NAME = "hello";

    /**
     * Connects to the broker on localhost:5672 as guest/guest, declares the queue and
     * publishes one message.
     *
     * @param args unused
     * @throws IOException      if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if the connection or the channel close times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        // try-with-resources closes the channel and then the connection even when
        // queueDeclare/basicPublish throws, so a failed run does not leak the connection.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello world!";
            // Encode explicitly as UTF-8: Receiver decodes the body as UTF-8, and the
            // platform default charset is not guaranteed to match.
            channel.basicPublish("", QUEUE_NAME, null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
运行该方法,可以看到控制台的打印
name=hello的队列收到Message
消费者
package com.xun.rabbitmqdemo.example;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Console consumer for the {@code hello} queue: prints every delivered message body.
 * Deliveries are consumed with automatic acknowledgement.
 */
public class Receiver {
    private final static String QUEUE_NAME = "hello";

    /**
     * Opens a connection and a channel, declares the queue and registers a consumer
     * that prints each message. The connection is left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException      if connecting, declaring or subscribing fails
     * @throws TimeoutException if the connection attempt times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection conn = brokerSettings().newConnection();
        Channel ch = conn.createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages. ");

        Consumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + text + "'");
            }
        };
        // Second argument true = auto-ack.
        ch.basicConsume(QUEUE_NAME, true, printer);
    }

    /** Builds the factory with the broker address, credentials and timeouts used by this demo. */
    private static ConnectionFactory brokerSettings() {
        ConnectionFactory settings = new ConnectionFactory();
        settings.setHost("localhost");
        settings.setPort(5672);
        settings.setVirtualHost("/");
        settings.setUsername("guest");
        settings.setPassword("guest");
        settings.setConnectionTimeout(600000); // milliseconds
        settings.setHandshakeTimeout(6000);    // milliseconds
        settings.setRequestedHeartbeat(60);    // seconds
        settings.setRequestedChannelMax(5);
        settings.setNetworkRecoveryInterval(500);
        return settings;
    }
}
工作队列
RabbitMqUtils工具类
package com.xun.rabbitmqdemo.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Helper that opens a channel to the local broker for the work-queue demos.
 */
public class RabbitMqUtils {

    /**
     * Opens a new connection to the broker on {@code localhost} as guest/guest and
     * returns a channel created on it.
     *
     * <p>Every call opens its own connection. Only the channel is returned, so a caller
     * that wants to close the connection has to reach it through
     * {@code channel.getConnection()}.
     *
     * @return a freshly created channel
     * @throws Exception if the connection or the channel cannot be created
     */
    public static Channel getChannel() throws Exception {
        ConnectionFactory brokerFactory = new ConnectionFactory();
        brokerFactory.setUsername("guest");
        brokerFactory.setPassword("guest");
        brokerFactory.setHost("localhost");
        return brokerFactory.newConnection().createChannel();
    }
}
启动2个工作线程
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
/**
 * Work-queue consumer "C1": prints every message delivered from the {@code hello} queue.
 * Deliveries are consumed with automatic acknowledgement.
 */
public class Work01 {
    private static final String QUEUE_NAME = "hello";

    /**
     * Subscribes to the queue and keeps running so deliveries are printed as they arrive.
     *
     * @param args unused
     * @throws Exception if the channel cannot be opened or the subscription fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue with the same arguments the producer (Task01) uses, so this
        // worker can be started before any producer has created the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String receivedMessage =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("接收消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };

        System.out.println("C1 消费者启动等待消费....");
        // Second argument true = auto-ack.
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
/**
 * Work-queue consumer "C2": prints every message delivered from the {@code hello} queue.
 * Deliveries are consumed with automatic acknowledgement.
 */
public class Work02 {
    private static final String QUEUE_NAME = "hello";

    /**
     * Subscribes to the queue and keeps running so deliveries are printed as they arrive.
     *
     * @param args unused
     * @throws Exception if the channel cannot be opened or the subscription fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue with the same arguments the producer (Task01) uses, so this
        // worker can be started before any producer has created the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String receivedMessage =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("接收消息:" + receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };

        System.out.println("C2 消费者启动等待消费....");
        // Second argument true = auto-ack.
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}
启动工作线程
启动发送线程
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;
/**
 * Work-queue producer: publishes every whitespace-delimited token read from standard
 * input as one message on the {@code hello} queue.
 */
public class Task01 {
    private static final String QUEUE_NAME = "hello";

    /**
     * Reads tokens from the console until end of input and publishes each one.
     *
     * @param args unused
     * @throws Exception if the channel cannot be opened, or declaring, publishing or
     *                   closing fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // RabbitMqUtils hands out only the channel. Close the owning connection instead
        // (which also closes the channel); closing just the channel would leave the
        // connection open and keep the program from exiting after end of input.
        try (AutoCloseable connection = channel.getConnection()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Read messages from the console.
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNext()) {
                String message = scanner.next();
                // Encode explicitly as UTF-8 rather than with the platform default charset.
                channel.basicPublish("", QUEUE_NAME, null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("发送消息完成:" + message);
            }
        }
    }
}
启动发送线程,此时发送线程等待键盘输入
发送4个消息
可以看到2个工作线程按照顺序分别接收message。
消息应答机制
在自动应答模式下,rabbitmq将message发送给消费者后,就会立即将该消息标记为删除。
但消费者在处理message过程中宕机,会导致消息的丢失。
因此需要设置手动应答。
生产者
import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;
/**
 * Producer for the manual-acknowledgement demo: publishes every line read from standard
 * input as one message on the {@code ack_queue} queue.
 */
public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";

    /**
     * Reads lines from the console until end of input and publishes each one.
     *
     * @param args unused
     * @throws Exception if the channel cannot be opened, or declaring, publishing or
     *                   closing fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // RabbitMqUtils hands out only the channel. Close the owning connection instead
        // (which also closes the channel); closing just the channel would leave the
        // connection open and keep the program from exiting after end of input.
        try (AutoCloseable connection = channel.getConnection()) {
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入信息");
            // hasNextLine() pairs with nextLine(); hasNext() waits for the next
            // non-blank token, so blank lines were handled inconsistently.
            while (scanner.hasNextLine()) {
                String message = scanner.nextLine();
                // Encode explicitly as UTF-8 rather than with the platform default charset.
                channel.basicPublish("", TASK_QUEUE_NAME, null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("生产者task02发出消息" + message);
            }
        }
    }
}
消费者
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;
/**
 * Fast consumer for the manual-acknowledgement demo: handles each {@code ack_queue}
 * message in about one second and then acknowledges it.
 */
public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    /**
     * Subscribes to the queue with manual acknowledgement and keeps running.
     *
     * @param args unused
     * @throws Exception if the channel cannot be opened or the subscription fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue with the same arguments the producer (Task02) uses, so this
        // worker can be started before the producer has created the queue.
        channel.queueDeclare(ACK_QUEUE_NAME, false, false, false, null);
        System.out.println("Work03 等待接收消息处理时间较短");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String message =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            SleepUtils.sleep(1); // simulate a short processing time
            System.out.println("接收到消息:" + message);
            // Acknowledge only this delivery (multiple = false) once it has been handled.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };

        // Use manual acknowledgement.
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;
/**
 * Slow consumer for the manual-acknowledgement demo: handles each {@code ack_queue}
 * message in about thirty seconds and then acknowledges it.
 */
public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";

    /**
     * Subscribes to the queue with manual acknowledgement and keeps running.
     *
     * @param args unused
     * @throws Exception if the channel cannot be opened or the subscription fails
     */
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // Declare the queue with the same arguments the producer (Task02) uses, so this
        // worker can be started before the producer has created the queue.
        channel.queueDeclare(ACK_QUEUE_NAME, false, false, false, null);
        System.out.println("Work04 等待接收消息处理时间较长");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            String message =
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            SleepUtils.sleep(30); // simulate a long processing time
            System.out.println("接收到消息:" + message);
            // Acknowledge only this delivery (multiple = false) once it has been handled.
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };

        // Use manual acknowledgement.
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}
工具类SleepUtils
package com.xun.rabbitmqdemo.utils;
/**
 * Small helper for pausing the current thread without a checked exception.
 */
public class SleepUtils {

    /**
     * Pauses the current thread for the given number of seconds.
     *
     * <p>If the thread is interrupted while sleeping (or was already interrupted), the
     * method returns early and restores the thread's interrupt flag.
     *
     * @param second number of seconds to sleep
     * @throws IllegalArgumentException if {@code second} is negative (raised by
     *                                  {@link Thread#sleep(long)})
     */
    public static void sleep(int second) {
        try {
            // Multiply as long: 1000 * second in int arithmetic overflows once second
            // exceeds 2_147_483, producing a negative or wrong duration.
            Thread.sleep(1000L * second);
        } catch (InterruptedException ignored) {
            // Preserve the interruption for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }
}
模拟
work04等待30s后发出ack
在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03
不公平分发
上面的轮询分发中,RabbitMQ会按顺序依次把消息分发给各个消费者;但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
不公平分发:
// Set in the consumer before it starts consuming: caps the number of unacknowledged
// deliveries the broker will send to this consumer at one.
// NOTE(review): `channel` is the consumer's channel from the surrounding code; this is
// meant to be combined with manual acknowledgement (autoAck = false) — confirm.
int prefetchCount = 1;
channel.basicQos(prefetchCount);
通过此配置(需配合手动应答),当某个消费者尚未处理完并应答当前消息时,rabbitmq不会再向它分发新的message,而是将message分发给空闲的消费者。
总结
到此,这篇关于docker启动rabbitmq以及使用的文章就介绍到这了。更多相关docker启动rabbitmq及使用的内容,请搜索编程网以前的文章或继续浏览下面的相关文章。希望大家以后多多支持编程网!