又是一篇rocketmq啊,最近是和rocketmq杠上了。本章带来的是,rocketmq实现顺序发送(原生版本和spring版本)。
原生版本
前几天跑rocketmq原生demo的顺序发送的时候,发现好像没有按照顺序消费啊,只是根据参数求余进入不同的queue中,在各字的queue中顺序消费。
先上代码
生产者
消费者
测试
我们如果是做订单业务根据订单的id进行顺序消费是完全没有问题的,但是我现在就要就要就要所有的消息都顺序消费呢?
那么如何保证在topic中也能顺序消费,而不止是queue呢?
我们把路由全部改成同一个看看
生产者
消费者不变
结果发现他们走了同一个队列,且是顺序消费的。
本来到这里是结束的。但是我还没有和springboot整合呢?
和springboot整合
和spring整合,我们调用的是syncSendOrderly这个api
/**
* consumemodel:测试有序消息队列
*
* @return java.lang.Object
* @throws
* @Date 2021/4/25 5:07 下午
*/
@RequestMapping("/testOrderly")
public Object testOrderly() {
// 循环构造对象
for (int i = 1; i <= 30; i++) {
// 将循环下标当成用户id
Users users = Users.builder().id(i).name(RandomUtil.randomNumbers(4)).build();
// 同步发送
SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConfig.Topic.TOPICUSERSORDERLY, users, "hashkey");
// 屏蔽日志输出,方便监控消费者日志输出
// log.info("MQ发送结果:{}", sendResult);
}
return "有序批量发送完成";
}
在消费者这边
@Slf4j
@Service
@RocketMQMessageListener(
topic = MqConfig.Topic.TOPICUSERSORDERLY, //topic
consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICUSERSORDERLY, //分组规则,Group-"topic命名"
// consumeMode = ConsumeMode.ORDERLY, //并发单线程顺序模式
//下边可以去掉,都使用的是默认值
messageModel = MessageModel.CLUSTERING, //默认值,集群模式
selectorType = SelectorType.TAG, // 默认值,标签
selectorExpression = "*" // 默认值,匹配该topic下所有tag
)
public class UsersOrderlyConsumer implements RocketMQListener<Users> {
/**
* 直接接收 Users 对象
*
* @Author: sun
* @Date: 2021/4/25 5:03 下午
*/
@Override
public void onMessage(Users body) {
// 接收到的序列都是有序的
log.info("MQ消费者接收(有序队列):Topic:{},用户ID:{},用户名:{}", MqConfig.Topic.TOPICUSERSORDERLY, body.getId(), body.getName());
}
}
测试结果:
好像和我们想象的不一样啊。
原因是我们没有在消费者设置并发单线程顺序模式
@Slf4j
@Service
@RocketMQMessageListener(
topic = MqConfig.Topic.TOPICUSERSORDERLY, //topic
consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICUSERSORDERLY, //分组规则,Group-"topic命名"
consumeMode = ConsumeMode.ORDERLY, //并发单线程顺序模式
//下边可以去掉,都使用的是默认值
messageModel = MessageModel.CLUSTERING, //默认值,集群模式
selectorType = SelectorType.TAG, // 默认值,标签
selectorExpression = "*" // 默认值,匹配该topic下所有tag
)
public class UsersOrderlyConsumer implements RocketMQListener<Users> {
/**
* 直接接收 Users 对象
*
* @Author: sun
* @Date: 2021/4/25 5:03 下午
*/
@Override
public void onMessage(Users body) {
// 接收到的序列都是有序的
log.info("MQ消费者接收(有序队列):Topic:{},用户ID:{},用户名:{}", MqConfig.Topic.TOPICUSERSORDERLY, body.getId(), body.getName());
}
}
测试
产生的疑问
发现的的确确是顺序消费,本来到此已经结束了,结果我的同事过来插了一缸。
以下是他的疑问
一:如果我们启动一个服务,发送消息会被收到,我们再启动另外一个服务呢,此时是两边各走一半还是只走一边呢?
经过测试:针对第一个问题,在本地起了两个服务,发现只有一个服务会消费,另外一个服务是不会消费的
二:我们再把那个消费信息的服务关闭,继续执行发送消息,此时的之前没有消费消息的服务还会消费吗?
经过测试:把那个消费信息的服务关闭,继续执行发送消息,此时的之前没有消费消息的服务还会消费,只是会比较慢而已。
代码地址
https://gitee.com/rdweiif/rocketmq-spring-boot-starter-demo.git