肥仔教程网

SEO 优化与 Web 开发技术学习分享平台

就要!就要!就要!实现rocketmq顺序发送



又是一篇rocketmq啊,最近是和rocketmq杠上了。本章带来的是,rocketmq实现顺序发送(原生版本和spring版本)。

原生版本

前几天跑rocketmq原生demo的顺序发送的时候,发现好像没有按照顺序消费啊,只是根据参数求余进入不同的queue中,在各字的queue中顺序消费。

先上代码

生产者

消费者

测试

我们如果是做订单业务根据订单的id进行顺序消费是完全没有问题的,但是我现在就要就要就要所有的消息都顺序消费呢?

那么如何保证在topic中也能顺序消费,而不止是queue呢?

我们把路由全部改成同一个看看

生产者


消费者不变

结果发现他们走了同一个队列,且是顺序消费的。

本来到这里是结束的。但是我还没有和springboot整合呢?

和springboot整合

和spring整合,我们调用的是syncSendOrderly这个api


/**
 * consumemodel:测试有序消息队列
 *
 * @return java.lang.Object
 * @throws
 * @Date 2021/4/25 5:07 下午
 */
@RequestMapping("/testOrderly")
public Object testOrderly() {
    // 循环构造对象
    for (int i = 1; i <= 30; i++) {
        // 将循环下标当成用户id
        Users users = Users.builder().id(i).name(RandomUtil.randomNumbers(4)).build();
        // 同步发送
        SendResult sendResult = rocketMQTemplate.syncSendOrderly(MqConfig.Topic.TOPICUSERSORDERLY, users, "hashkey");
        // 屏蔽日志输出,方便监控消费者日志输出
        // log.info("MQ发送结果:{}", sendResult);
    }
    return "有序批量发送完成";
}

在消费者这边

@Slf4j
@Service
@RocketMQMessageListener(
        topic = MqConfig.Topic.TOPICUSERSORDERLY, //topic
        consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICUSERSORDERLY, //分组规则,Group-"topic命名"
//        consumeMode = ConsumeMode.ORDERLY, //并发单线程顺序模式
        //下边可以去掉,都使用的是默认值
        messageModel = MessageModel.CLUSTERING, //默认值,集群模式
        selectorType = SelectorType.TAG, // 默认值,标签
        selectorExpression = "*" // 默认值,匹配该topic下所有tag
)
public class UsersOrderlyConsumer implements RocketMQListener<Users> {

    /**
     * 直接接收 Users 对象
     *
     * @Author: sun
     * @Date: 2021/4/25 5:03 下午
     */
    @Override
    public void onMessage(Users body) {
        // 接收到的序列都是有序的
        log.info("MQ消费者接收(有序队列):Topic:{},用户ID:{},用户名:{}", MqConfig.Topic.TOPICUSERSORDERLY, body.getId(), body.getName());
    }

}

测试结果:


好像和我们想象的不一样啊。

原因是我们没有在消费者设置并发单线程顺序模式


@Slf4j
@Service
@RocketMQMessageListener(
        topic = MqConfig.Topic.TOPICUSERSORDERLY, //topic
        consumerGroup = MqConfig.GROUP_PREFFIX + MqConfig.Topic.TOPICUSERSORDERLY, //分组规则,Group-"topic命名"
        consumeMode = ConsumeMode.ORDERLY, //并发单线程顺序模式
        //下边可以去掉,都使用的是默认值
        messageModel = MessageModel.CLUSTERING, //默认值,集群模式
        selectorType = SelectorType.TAG, // 默认值,标签
        selectorExpression = "*" // 默认值,匹配该topic下所有tag
)
public class UsersOrderlyConsumer implements RocketMQListener<Users> {

    /**
     * 直接接收 Users 对象
     *
     * @Author: sun
     * @Date: 2021/4/25 5:03 下午
     */
    @Override
    public void onMessage(Users body) {
        // 接收到的序列都是有序的
        log.info("MQ消费者接收(有序队列):Topic:{},用户ID:{},用户名:{}", MqConfig.Topic.TOPICUSERSORDERLY, body.getId(), body.getName());
    }

}

测试

产生的疑问

发现的的确确是顺序消费,本来到此已经结束了,结果我的同事过来插了一缸。

以下是他的疑问

一:如果我们启动一个服务,发送消息会被收到,我们再启动另外一个服务呢,此时是两边各走一半还是只走一边呢?

经过测试:针对第一个问题,在本地起了两个服务,发现只有一个服务会消费,另外一个服务是不会消费的

二:我们再把那个消费信息的服务关闭,继续执行发送消息,此时的之前没有消费消息的服务还会消费吗?

经过测试:把那个消费信息的服务关闭,继续执行发送消息,此时的之前没有消费消息的服务还会消费,只是会比较慢而已。

代码地址

https://gitee.com/rdweiif/rocketmq-spring-boot-starter-demo.git

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言