Skip to content

消息队列

概述

消息队列用于解耦系统组件,缓冲并异步传递数据,提升系统可扩展性、可靠性与性能,MMS-MQ 是一个基于 RabbitMQ 的消息队列系统,支持多种消息处理器的动态切换。本教程将详细介绍如何使用该系统进行消息的注册、发送和消费。

消息队列配置

  1. 引入依赖
xml
<dependency>
    <groupId>com.sxpcwlkj</groupId>
    <artifactId>mms-mq</artifactId>
</dependency>
  1. 配置消息队列连接信息
yaml
--- # rabbitmq
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # 生产者配置
    publisher-confirm-type: correlated  # 发布确认模式
    publisher-returns: true  # 开启返回模式
    template:
      mandatory: true  # 开启强制消息投递
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: manual                       # 手动确认模式
        prefetch: 10                                   # 每次处理10条消息
        concurrency: 5                                 # 最小并发数
        max-concurrency: 10                            # 最大并发数
        retry:
          enabled: true                                # 开启重试
          max-attempts: 3                              # 最大重试次数
          initial-interval: 1000                       # 重试间隔

    # 自定义 RabbitMQ 配置
    config:
      # 主交换机配置
      exchange: distribution.exchange          # 主交换机名称 - 负责接收和路由消息
      queue: distribution.queue               # 主队列名称 - 存储待处理的分销消息
      routing-key: distribution.key           # 路由键 - 用于将消息从交换机路由到队列的匹配规则

      # 死信队列配置(用于处理失败消息)
      dead-letter-exchange: distribution.dlx.exchange    # 死信交换机 - 接收无法正常处理的消息
      dead-letter-queue: distribution.dlx.queue          # 死信队列 - 存储处理失败的消息,用于人工干预
      dead-letter-routing-key: distribution.dlx.key      # 死信路由键 - 死信消息的路由规则

      # 延时队列配置(用于延迟处理消息)
      delay-queue: distribution.delay.queue              # 延时队列名称
      delay-exchange: distribution.delay.exchange        # 延时交换机名称
      delay-routing-key: distribution.delay.key          # 延时路由键
      delay-time: 60000                                  # 默认延时时间(毫秒) - 1分钟

      # 重试配置
      max-retry-count: 3                                 # 最大重试次数
      retry-interval: 5000                               # 重试间隔(毫秒)

详细Demo:MqDemo.java

消息处理器注册

  1. 创建自定义消息处理器

首先,你需要创建一个实现了 MqHandler 接口的类:

java
@Service
public class CustomMqHandler implements MqHandler {
    
    @Override
    public boolean handleMessage(DistributionMessage message) {
        // 实现你的业务逻辑
        System.out.println("处理订单消息: " + message.getOrderId());
        return true; // 处理成功返回true,失败返回false
    }
    
    @Override
    public boolean isMessageProcessed(String orderId) {
        // 检查消息是否已经被处理过,用于幂等性控制
        return false; // 如果已处理返回true,未处理返回false
    }
    
    @Override
    public String getHandlerType() {
        // 返回处理器的唯一标识
        return "custom_handler";
    }
}
  1. 注册处理器

在系统启动时,你需要将你的处理器注册到系统中。这通常是通过配置类完成的:

java
@Configuration
@RequiredArgsConstructor
@Slf4j
public class MQRegister {
    
    private final MqService mqService;
    private final CustomMqHandler customMqHandler;
    
    @PostConstruct
    public void registerHandlers() {
        // 注册你的自定义处理器
        mqService.registerHandler(customMqHandler);
        
        // 设置当前使用的处理器
        if (mqService.setCurrentHandler("custom_handler")) {
            log.info("✅ 当前处理器设置为: custom_handler");
        }
    }
}

一、发送消息

1 注入 MqService

在你的业务类中注入 MqService:

java
@Service
public class OrderService {
    
    @Autowired
    private MqService mqService;
    
    // 你的业务方法
}

2 构造并发送消息

java
public void createOrder(Order order) {
    // 构造消息对象
    DistributionMessage message = new DistributionMessage();
    message.setOrderId(order.getId());
    message.setBuyerId(order.getBuyerId());
    message.setBusinessType("ORDER");
    
    // 发送消息到主队列
    boolean success = mqService.sendToMainQueue(message);
    
    if (success) {
        log.info("订单消息发送成功: {}", order.getId());
    } else {
        log.error("订单消息发送失败: {}", order.getId());
    }
}

二、消费消息与处理器匹配

1 消费流程

当消息被发送到队列后,系统会按照以下流程进行消费:

  1. @RabbitListener 监听器接收到消息
  2. 调用 onMessage 方法处理消息
  3. 根据当前设置的处理器类型获取对应的处理器
  4. 使用该处理器处理消息

2 处理器匹配机制

处理器的匹配不是基于消息内容,而是基于系统当前的配置:

java
// 在 onMessage 方法中
MqHandler handler = getCurrentMessageHandler(); // 根据 currentHandlerType 获取处理器
boolean success = handler.handleMessage(distributionMessage); // 处理消息

currentHandlerType 的值决定了使用哪个处理器:

  • 如果设置为 "custom_handler",则使用你自定义的处理器
  • 如果设置为 "distribution",则使用分销处理器
  • 如果设置为 "default",则使用默认处理器

3 动态切换处理器

你可以在运行时动态切换处理器:

java
// 切换到自定义处理器
mqService.setCurrentHandler("custom_handler");

// 切换到分销处理器
mqService.setCurrentHandler("distribution");

// 切换到默认处理器
mqService.setCurrentHandler("default");

三、完整示例

1 自定义处理器

java
@Service
public class OrderMqHandler implements MqHandler {
    
    @Override
    public boolean handleMessage(DistributionMessage message) {
        System.out.println("处理订单: " + message.getOrderId());
        // 实现订单处理逻辑
        return true;
    }
    
    @Override
    public boolean isMessageProcessed(String orderId) {
        // 检查订单是否已处理
        return false;
    }
    
    @Override
    public String getHandlerType() {
        return "order_handler";
    }
}

2 注册配置

java
@Configuration
@RequiredArgsConstructor
public class OrderMQRegister {
    
    private final MqService mqService;
    private final OrderMqHandler orderMqHandler;
    
    @PostConstruct
    public void registerHandlers() {
        mqService.registerHandler(orderMqHandler);
        mqService.setCurrentHandler("order_handler");
    }
}

3 消息发送

java
@Service
public class OrderService {
    
    @Autowired
    private MqService mqService;
    
    public void createOrder(Order order) {
        DistributionMessage message = new DistributionMessage();
        message.setOrderId(order.getId());
        message.setBuyerId(order.getBuyerId());
        
        mqService.sendToMainQueue(message);
    }
}

这样,当订单创建时,消息会被发送到队列,然后由你注册的 OrderMqHandler 处理器进行处理。

Released under the MIT License.