消息队列
概述
消息队列用于解耦系统组件,缓冲并异步传递数据,提升系统可扩展性、可靠性与性能,MMS-MQ 是一个基于 RabbitMQ 的消息队列系统,支持多种消息处理器的动态切换。本教程将详细介绍如何使用该系统进行消息的注册、发送和消费。
消息队列配置
- 引入依赖
xml
<dependency>
<groupId>com.sxpcwlkj</groupId>
<artifactId>mms-mq</artifactId>
</dependency>- 配置消息队列连接信息
yaml
--- # rabbitmq
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 生产者配置
publisher-confirm-type: correlated # 发布确认模式
publisher-returns: true # 开启返回模式
template:
mandatory: true # 开启强制消息投递
# 消费者配置
listener:
simple:
acknowledge-mode: manual # 手动确认模式
prefetch: 10 # 每次处理10条消息
concurrency: 5 # 最小并发数
max-concurrency: 10 # 最大并发数
retry:
enabled: true # 开启重试
max-attempts: 3 # 最大重试次数
initial-interval: 1000 # 重试间隔
# 自定义 RabbitMQ 配置
config:
# 主交换机配置
exchange: distribution.exchange # 主交换机名称 - 负责接收和路由消息
queue: distribution.queue # 主队列名称 - 存储待处理的分销消息
routing-key: distribution.key # 路由键 - 用于将消息从交换机路由到队列的匹配规则
# 死信队列配置(用于处理失败消息)
dead-letter-exchange: distribution.dlx.exchange # 死信交换机 - 接收无法正常处理的消息
dead-letter-queue: distribution.dlx.queue # 死信队列 - 存储处理失败的消息,用于人工干预
dead-letter-routing-key: distribution.dlx.key # 死信路由键 - 死信消息的路由规则
# 延时队列配置(用于延迟处理消息)
delay-queue: distribution.delay.queue # 延时队列名称
delay-exchange: distribution.delay.exchange # 延时交换机名称
delay-routing-key: distribution.delay.key # 延时路由键
delay-time: 60000 # 默认延时时间(毫秒) - 1分钟
# 重试配置
max-retry-count: 3 # 最大重试次数
retry-interval: 5000 # 重试间隔(毫秒)详细Demo:MqDemo.java
消息处理器注册
- 创建自定义消息处理器
首先,你需要创建一个实现了 MqHandler 接口的类:
java
@Service
public class CustomMqHandler implements MqHandler {
@Override
public boolean handleMessage(DistributionMessage message) {
// 实现你的业务逻辑
System.out.println("处理订单消息: " + message.getOrderId());
return true; // 处理成功返回true,失败返回false
}
@Override
public boolean isMessageProcessed(String orderId) {
// 检查消息是否已经被处理过,用于幂等性控制
return false; // 如果已处理返回true,未处理返回false
}
@Override
public String getHandlerType() {
// 返回处理器的唯一标识
return "custom_handler";
}
}- 注册处理器
在系统启动时,你需要将你的处理器注册到系统中。这通常是通过配置类完成的:
java
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MQRegister {
private final MqService mqService;
private final CustomMqHandler customMqHandler;
@PostConstruct
public void registerHandlers() {
// 注册你的自定义处理器
mqService.registerHandler(customMqHandler);
// 设置当前使用的处理器
if (mqService.setCurrentHandler("custom_handler")) {
log.info("✅ 当前处理器设置为: custom_handler");
}
}
}一、发送消息
1 注入 MqService
在你的业务类中注入 MqService:
java
@Service
public class OrderService {
@Autowired
private MqService mqService;
// 你的业务方法
}2 构造并发送消息
java
public void createOrder(Order order) {
// 构造消息对象
DistributionMessage message = new DistributionMessage();
message.setOrderId(order.getId());
message.setBuyerId(order.getBuyerId());
message.setBusinessType("ORDER");
// 发送消息到主队列
boolean success = mqService.sendToMainQueue(message);
if (success) {
log.info("订单消息发送成功: {}", order.getId());
} else {
log.error("订单消息发送失败: {}", order.getId());
}
}二、消费消息与处理器匹配
1 消费流程
当消息被发送到队列后,系统会按照以下流程进行消费:
- @RabbitListener 监听器接收到消息
- 调用 onMessage 方法处理消息
- 根据当前设置的处理器类型获取对应的处理器
- 使用该处理器处理消息
2 处理器匹配机制
处理器的匹配不是基于消息内容,而是基于系统当前的配置:
java
// 在 onMessage 方法中
MqHandler handler = getCurrentMessageHandler(); // 根据 currentHandlerType 获取处理器
boolean success = handler.handleMessage(distributionMessage); // 处理消息currentHandlerType 的值决定了使用哪个处理器:
- 如果设置为 "custom_handler",则使用你自定义的处理器
- 如果设置为 "distribution",则使用分销处理器
- 如果设置为 "default",则使用默认处理器
3 动态切换处理器
你可以在运行时动态切换处理器:
java
// 切换到自定义处理器
mqService.setCurrentHandler("custom_handler");
// 切换到分销处理器
mqService.setCurrentHandler("distribution");
// 切换到默认处理器
mqService.setCurrentHandler("default");三、完整示例
1 自定义处理器
java
@Service
public class OrderMqHandler implements MqHandler {
@Override
public boolean handleMessage(DistributionMessage message) {
System.out.println("处理订单: " + message.getOrderId());
// 实现订单处理逻辑
return true;
}
@Override
public boolean isMessageProcessed(String orderId) {
// 检查订单是否已处理
return false;
}
@Override
public String getHandlerType() {
return "order_handler";
}
}2 注册配置
java
@Configuration
@RequiredArgsConstructor
public class OrderMQRegister {
private final MqService mqService;
private final OrderMqHandler orderMqHandler;
@PostConstruct
public void registerHandlers() {
mqService.registerHandler(orderMqHandler);
mqService.setCurrentHandler("order_handler");
}
}3 消息发送
java
@Service
public class OrderService {
@Autowired
private MqService mqService;
public void createOrder(Order order) {
DistributionMessage message = new DistributionMessage();
message.setOrderId(order.getId());
message.setBuyerId(order.getBuyerId());
mqService.sendToMainQueue(message);
}
}这样,当订单创建时,消息会被发送到队列,然后由你注册的 OrderMqHandler 处理器进行处理。
