导入业务下RocketMQ在SpringBoot中的使用
@[toc]
RocketMQ在SpringBoot中的使用
业务痛点
本次在项目中使用RocketMQ主要是因为:
当用户在页面上准备导入数据的时候,由于用户的Excel的条数很多,>10000条,每一条数据都需要校验数完整性,比如导入sku,需要判断每一条sku对应填写的单位是否是系统中存在该单位等,也就是说每一条数据都需要频繁的查询数据库,判断是否填写正确,那么很多的查询以及批量插入会将该导入接口的响应时间拉长
假设导入请求响应时间为35s,但是前端的http统一配置最大响应时间为30s,那么超过30s后响应的response则为响应超时,给用户的反馈是上传失败或超时,但是实际上等后台默默的执行了35s后,导入的数据其实都成功导入了,对用户体验不好,甚至用户可能重新上传
解决方案
使用异步方式导入,前端轮训间隔5s请求导入进度,或websocket实时推送进度(综合考虑5s只在总导入时长中占比很小,所以使用轮训请求接口)
效果
由于此处上传的文件比较小,后台在5s就处理了文件中的所有数据,所以进度条从0直接到100
集成RocketMQ
此处不介绍RocketMQ的搭建,网上教程很多
寻找合适的版本
锁定版本后再到Maven仓库找到对应版本的SpringBoot starter的版本
我使用的是RocketMQ4.4.0,对应的starter版本为2.0.2
添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.2</version>
</dependency>
添加配置
生产者
rocketmq:
name-server: 127.0.0.1:9876
producer:
# 一般情况一个项目使用同一个生产者组
group: my_producer_group
消费者
rocketmq:
name-server: 127.0.0.1:9876
RocketMQ使用
生产消息
@Service
public class BaseImportServiceImpl<T> implements BaseImportService<T> {
@Autowired
private RocketMQTemplate rocketMqTemplate;
@Autowired
private RedisService redisService;
@Override
public String baseImport(BaseImportDto importDto) {
String uuid = UUID.randomUUID().toString();
// 固定使用import_topic作为导入业务的统一主题,使用tag来区分导入类型,类型指的是a业务导入还是b业务导入等...
SendResult sendResult = rocketMqTemplate.syncSend(
"import_topic" + (importDto.getType() != null ? ":" + importDto.getType() : ""),
new BaseImportMessage()
.setUuid(uuid)
.setCreateUid(SecurityUtils.getUserId())
.setCreateUname(SecurityUtils.getName())
.setTenantId(SecurityUtils.getTenantId())
.setFile(importDto.getFile()));
if (!Objects.equals(SendStatus.SEND_OK, sendResult.getSendStatus())) {
throw new CustomException("文件导入失败,请稍后再试");
}
return uuid;
}
@Override
public BaseImportResult getImportProgress(String requestId) {
// 在redis中查询uuid对应的导入进度,requestId指的是uuid
String key = RedisConstants.buildConstKey(RedisConstants.IMPORT_RESULT_KEY + requestId);
return (BaseImportResult) redisService.get(key);
}
}
消费消息
@Slf4j
@Component
@RocketMQMessageListener(
// 通常一个应用使用一个消费者组
consumerGroup = "my-consumer-group",
topic = "import_topic",
selectorType = SelectorType.TAG,
// 使用tag过滤出对应的导入业务
selectorExpression = "sale_order",
// 一定使用集群消费,避免广播消费在服务器集群下重复导入
messageModel = MessageModel.CLUSTERING)
public class PasSaleOrderRocketMqListener implements RocketMQListener<BaseImportMessage> {
@Autowired
private PasSaleOrderExcelService pasSaleOrderExcelService;
@Override
public void onMessage(BaseImportMessage msg) {
try {
// 具体的导入业务
pasSaleOrderExcelService.importExcel(msg);
} catch (Throwable e) {
// 这里如果消息抛出了异常,RocketMQ会将该消息放入到重试队列中,重试机制会让消息重新消费,我们此处的业务如果发生异常或者失败,直接将异常写入redis中,前端从redis中查询异常的结果即可,不需要重复消费,所以捕捉该异常,记录日志即可
log.error("导入进度更新失败", e);
}
}
}