导入业务下RocketMQ在SpringBoot中的使用

100

@[toc]

RocketMQ在SpringBoot中的使用

业务痛点

本次在项目中使用RocketMQ主要是因为:

当用户在页面上准备导入数据的时候,由于用户的Excel的条数很多,>10000条,每一条数据都需要校验数完整性,比如导入sku,需要判断每一条sku对应填写的单位是否是系统中存在该单位等,也就是说每一条数据都需要频繁的查询数据库,判断是否填写正确,那么很多的查询以及批量插入会将该导入接口的响应时间拉长

假设导入请求响应时间为35s,但是前端的http统一配置最大响应时间为30s,那么超过30s后响应的response则为响应超时,给用户的反馈是上传失败或超时,但是实际上等后台默默的执行了35s后,导入的数据其实都成功导入了,对用户体验不好,甚至用户可能重新上传

解决方案

使用异步方式导入,前端轮训间隔5s请求导入进度,或websocket实时推送进度(综合考虑5s只在总导入时长中占比很小,所以使用轮训请求接口)

效果

在这里插入图片描述
由于此处上传的文件比较小,后台在5s就处理了文件中的所有数据,所以进度条从0直接到100

集成RocketMQ

此处不介绍RocketMQ的搭建,网上教程很多

寻找合适的版本

SpringCloud Alibaba版本说明

锁定版本后再到Maven仓库找到对应版本的SpringBoot starter的版本

我使用的是RocketMQ4.4.0,对应的starter版本为2.0.2

添加依赖

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.2</version>
</dependency>

添加配置

生产者

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    # 一般情况一个项目使用同一个生产者组
    group: my_producer_group

消费者

rocketmq:
  name-server: 127.0.0.1:9876

RocketMQ使用

生产消息

@Service
public class BaseImportServiceImpl<T> implements BaseImportService<T> {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;
    @Autowired
    private RedisService redisService;

    @Override
    public String baseImport(BaseImportDto importDto) {
        String uuid = UUID.randomUUID().toString();

        // 固定使用import_topic作为导入业务的统一主题,使用tag来区分导入类型,类型指的是a业务导入还是b业务导入等...
        SendResult sendResult = rocketMqTemplate.syncSend(
                "import_topic" + (importDto.getType() != null ? ":" + importDto.getType() : ""),
                new BaseImportMessage()
                        .setUuid(uuid)
                        .setCreateUid(SecurityUtils.getUserId())
                        .setCreateUname(SecurityUtils.getName())
                        .setTenantId(SecurityUtils.getTenantId())
                        .setFile(importDto.getFile()));
        if (!Objects.equals(SendStatus.SEND_OK, sendResult.getSendStatus())) {
            throw new CustomException("文件导入失败,请稍后再试");
        }

        return uuid;
    }

    @Override
    public BaseImportResult getImportProgress(String requestId) {
        // 在redis中查询uuid对应的导入进度,requestId指的是uuid
        String key = RedisConstants.buildConstKey(RedisConstants.IMPORT_RESULT_KEY + requestId);
        return (BaseImportResult) redisService.get(key);
    }
}

消费消息

@Slf4j
@Component
@RocketMQMessageListener(
        // 通常一个应用使用一个消费者组
        consumerGroup = "my-consumer-group",
        topic = "import_topic",
        selectorType = SelectorType.TAG,
        // 使用tag过滤出对应的导入业务
        selectorExpression = "sale_order",
        // 一定使用集群消费,避免广播消费在服务器集群下重复导入
        messageModel = MessageModel.CLUSTERING)
public class PasSaleOrderRocketMqListener implements RocketMQListener<BaseImportMessage> {

    @Autowired
    private PasSaleOrderExcelService pasSaleOrderExcelService;

    @Override
    public void onMessage(BaseImportMessage msg) {
        try {
            // 具体的导入业务
            pasSaleOrderExcelService.importExcel(msg);
        } catch (Throwable e) {
            // 这里如果消息抛出了异常,RocketMQ会将该消息放入到重试队列中,重试机制会让消息重新消费,我们此处的业务如果发生异常或者失败,直接将异常写入redis中,前端从redis中查询异常的结果即可,不需要重复消费,所以捕捉该异常,记录日志即可
            log.error("导入进度更新失败", e);
        }
    }
}