1. 消息代理 (Broker)
ActiveMQ 核心组件,负责处理所有消息流。它提供一个平台,应用程序可以连接、发送和接收消息。
BrokerService broker = new BrokerService();
broker.addConnector("tcp://0.0.0.0:61616");
broker.start();
2. 消息队列
在 ActiveMQ 中存储消息的逻辑概念容器。消息队列可以从多个生产者接收消息,并将其传递给多个消费者。
Queue queue = session.createQueue("my-queue");
3. 主题
发布/订阅模型,消息生产者将消息发布到主题,感兴趣的消费者可以订阅该主题并接收所有发布的消息。
Topic topic = session.createTopic("my-topic");
4. 会话
应用程序与 ActiveMQ 代理之间通信的会话。会话允许应用程序发送和接收消息,以及管理事务。
Session session = broker.createSession();
5. 生产者
将消息发送到消息队列或主题的应用程序。
MessageProducer producer = session.createProducer(queue);
6. 消费者
从消息队列或主题接收消息的应用程序。
MessageConsumer consumer = session.createConsumer(queue);
7. 事务
一组操作,要么全部执行,要么全部回滚。ActiveMQ 支持事务以确保消息传递的可靠性和一致性。
session.begin();
producer.send(message);
session.commit();
8. 持久性
消息的持久性决定了当代理器重新启动或发生故障时消息是否会被保留。持久性消息将在磁盘上持久化,而非持久性消息将丢失。
Message message = session.createTextMessage("Hello world");
message.setPersistent(true);
producer.send(message);
9. 桥接
允许将来自一个 ActiveMQ 代理器的消息转发到另一个代理器。桥接可用于连接多个 ActiveMQ 实例。
<bridge destination="forward.my-topic"
source="activemq:topic:my-topic"
brokerName="broker-b" />
10. 虚拟机传送
允许在同一个 JVM 内连接 ActiveMQ 的两个实例。这对于测试或在单机环境中创建分布式系统非常有用。
BrokerService brokerA = new BrokerService();
BrokerService brokerB = new BrokerService();
brokerA.setVmConnectorURI(brokerB.getVmConnectorURI());
brokerA.setBrokerName("broker-a");
brokerB.setBrokerName("broker-b");
brokerA.start();
brokerB.start();
11. 插件
ActiveMQ 提供了一种机制来扩展其功能。插件可以添加新功能,例如消息存储、安全性或监控。
<plugins>
<journalPlugin>
<journalDirectory>/tmp/journal</journalDirectory>
</journalPlugin>
</plugins>
12. 消息转换
ActiveMQ 允许在不同消息格式之间转换消息。转换器可以用于将 XML 转换为 JSON,或将文本消息转换为二进制消息。
MessageConverter converter = session.getMessageConverter();
Message message = converter.toMessage("Hello world", session);
producer.send(message);
13. 故障转移
通过使用故障转移代理或集群,确保消息代理故障时的可用性。当主代理失败时,备用代理将接管。
<broker cluster="my-cluster">
<networkConnectors>
<networkConnector name="tcp" uri="tcp://0.0.0.0:61616" />
</networkConnectors>
</broker>
14. 负载均衡
通过将消息负载分布到多个代理器,提高可扩展性和性能。ActiveMQ 支持轮询或基于消息大小的负载均衡策略。
<broker loadBalancingPolicy="round-robin" />
15. 监控
监控 ActiveMQ 代理器至关重要,以确保其正常运行和性能。ActiveMQ 提供了一个 JMX 仪表板和 REST API,用于监控代理器状态和消息流。
import org.apache.activemq.broker.jmx.BrokerViewMBean;
BrokerViewMBean brokerView = (BrokerViewMBean) MBeanServerFactory.createMBeanServer().getObjectInstance(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"));
System.out.println("Message count: " + brokerView.getQueueSize());
16. 安全性
ActiveMQ 提供各种安全机制,包括 SSL、SASL 和访问控制列表。
<securitySettings>
<sslProtocols>TLSv1,TLSv1.1,TLSv1.2</sslProtocols>
<requireCredentialsForAllConnections>true</requireCredentialsForAllConnections>
<audit>true</audit>
</securitySettings>
17. 协议
ActiveMQ 支持多种消息传递协议,包括 AMQP、JMS 和 STOMP。
BrokerService broker = new BrokerService();
broker.setBrokerName("my-broker");
broker.addConnector("stomp://0.0.0.0:61613");
broker.start();
18. MQTT
MQTT(消息队列遥测传输)是一个轻量级协议,专为物联网 (IoT) 设备开发。ActiveMQ 支持 MQTT,使其成为连接物联网设备和企业系统的理想选择。
<mqttConnectors>
<mqttConnector name="mqtt" persist="true"
uri="mqtt://0.0.0.0:1883" />
</mqttConnectors>
19. Web 控制台
ActiveMQ 提供了一个 Web 控制台,允许管理员监控代理器状态、管理队列和主题,以及管理插件。
20. 故障排除
ActiveMQ 故障排除涉及检查日志文件、状态 GUI 和 JMX 仪表板。通过仔细分析错误消息和日志,可以快速诊断和解决问题。