肥仔教程网

SEO 优化与 Web 开发技术学习分享平台

大白话RocketMQ之RocketMQ快速入门-Hello World

一、添加RocketMQ客户端依赖包

  <dependency>
       <groupId>org.apache.rocketmq</groupId>
       <artifactId>rocketmq-client</artifactId>
       <version>5.3.3</version>
  </dependency>
  • 注:查询maven的类包,可以到https://mvnrepository.com中查询
  • 可根据自身实际业务,选用当前最新稳定版本(与RocketMQ的版本保持兼容)

二、消息生产者生产消息

消息生产者生产消息有以下几个步骤:

1.实例化消息生产类

DefaultMQProducer producer = new DefaultMQProducer("item_group");

注:生产者组,一般默认为项目的名称

2.设置消息系统中的nameserver的地址

producer.setNamesrvAddr("192.168.0.200:9876");

注:192.168.0.200:9876替换为具体的nameserver的地址即可,多个nameserver(集群)使用;隔开即可

3.启动生产者

producer.start();

4.发送消息

Map<String, Object> orderMap = new HashMap<String,Object>(3);
orderMap.put("orderNo", orderNo);
orderMap.put("userId", 300124578);
orderMap.put("ticketId", "TX8547985689");

Message msg = new Message(topicName, tagName, msgKey, JSON.toJSONString(orderMap).getBytes());
SendResult result = producer.send(msg);

注:

  • 通过send方法可发送消息
  • send方法接受Message对象
  • Message实例化参数说明
    • topicName : 主题名称(分类名称)
    • tagName : 标签名称(子分类名称,一般是具体的业务)
    • msgKey : 消息唯一主键
    • msgBody : 消息体内容(是一个byte数组)

5.关闭生产者

producer.shutdown();

完整演示代码:

      //1.实例化生产者(指定生产者组)
        DefaultMQProducer producer = new DefaultMQProducer("item_group");
        //2.指定要连接的nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //3.启动生产者
        producer.start();
        for (int i = 0; i < 50; i++) {
            SteamItemContext context = new SteamItemContext();
            context.setItemId((long) ((i + 1) * 100));
            context.setImageUrl("https://steamcdn-a.akamaihd.net/steam/apps/" + (i + 1) * 100 + "/header.jpg");
            context.setName("SteamItem测试饰品" + (i + 1));

            //4.发送消息(默认是不允许自动创建topic的)
            SendResult result = producer.send(new Message("item_info", "tag_item_info", JsonUtil.toJsonString(context).getBytes(StandardCharsets.UTF_8)));
            log.info("result={}, messageId={}", result, result.getMsgId());
        }
        //5.关闭生产者
        producer.shutdown();

三、消息消费者消费消息

消息消费者消费消息一般分为下面几个步骤:

1.实例化消费者对象

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("item_consume_group");
  • 一般对于同一个消息,一个消费组只会其中一个消费者消费 

2.设置消费nameserver的地址

 consumer.setNamesrvAddr("192.168.0.200:9876");

3.订阅消息并注册监听器

consumer.subscribe("topicName", "tagName");
consumer.registerMessageListener(
    new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            //逻辑业务处理,当接收消息时做的处理动作
            //To DO
            LOGGER.info("receive message:{}", list);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
);
  • subscribe表示订阅消息,两个参数分别是订阅的主题名称和标签名称
  • registerMessageListener为注册监听器,参数为MessageListenerConcurrently对象(该对象必须要实现consumeMessage方法,当收到消息后会触发这个函数)

4.启动消费者服务

consumer.start();

四、使用Spring boot+RocketMQ实现消息生产者和消费者

1.maven依赖项:

pom.xml部分内容:

<!-- 公共配置参数(如各依赖项的版本号)-->

 <properties>
        <java.version>17</java.version>
        <rocketmq.client.version>5.3.3</rocketmq.client.version>
        <log4j.version>2.0.17</log4j.version>
        <lombok.version>1.18.38</lombok.version>
        <jackson.version>2.19.2</jackson.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

     <!-- 日志 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${log4j.version}</version>
            <scope>test</scope>
        </dependency>

   
        <!-- rocketmq client -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.client.version}</version>
        </dependency>

  
        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${jackson.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2.应用配置

rocketmq:
  name-server-addr: 127.0.0.1:9876

3.项目配置

  • RocketMQ服务器相关属性配置
**
 * RocketMQ的属性配置
 * @author shixinke
 */
@ConfigurationProperties(prefix = "rocketmq")
@Configuration
@Data
public class RocketMQPropertiesConfig {
    //rocketMQ注册中心地址
    private String nameServerAddr;
    //生产者的配置
    private Producer producer;



    /**
     * 生产者组的配置
     */
    @Data
    public static class Producer {

        @Value("${spring.application.name}")
        private String applicationName;

       //生产者组的名称
        private String group;

        public String getGroup() {
            if (group == null) {
                group = applicationName;
            }
            return group;
        }
    }

}
  • bean配置
/**
 * rocketMQ配置
 * @author shixinke
 */
@Configuration
@Slf4j
public class RocketMQConfig {

    @Resource
    private RocketMQPropertiesConfig rocketMQPropertiesConfig;

    @Resource
    private ItemInfoConsumer itemInfoConsumer;


    /**
     * 生产者Bean
     * @return DefaultMQProducer
     */
    @Bean(
        name = "defaultMQProducer",
        initMethod = "start",
        destroyMethod = "shutdown"
    )
    public DefaultMQProducer defaultMQProducer() {
        DefaultMQProducer producer = new DefaultMQProducer(rocketMQPropertiesConfig.getProducer().getGroup());
        producer.setNamesrvAddr(rocketMQPropertiesConfig.getNameServerAddr());
        return producer;
    }

    /**
     * 生产者Bean
     * @return DefaultMQProducer
     */
    @Bean(
            name = "defaultMQProducer",
            initMethod = "start",
            destroyMethod = "shutdown"
    )
    public DefaultMQPushConsumer itemInfoConsumer() throws MQClientException {
        //1.实例化消费者(指定消费组)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("item_group");
        //2.设置要连接的nameServer地址
        consumer.setNamesrvAddr(rocketMQPropertiesConfig.getNameServerAddr());
        //3.订阅主题
        consumer.subscribe("item_info", "tag_item_info");
        //4.注册消息监听器(具体的消息消费逻辑)
        consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
            //确认是消费结果:成功或重试(失败)
            try {
                boolean result = itemInfoConsumer.consume(list);
                if (!result) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (Throwable e) {
                log.error("消费失败:", e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        return consumer;
    }

}

4.使用

  • 生产者使用
@Service
@Slf4j
public class ItemServiceImpl implements ItemService{

    @Resource
    private DefaultMQProducer defaultMQProducer;

    @Override
    public void create() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
        //生产消息
        SendResult result = defaultMQProducer.send(new Message("item_info", "tag_item_info", "10001".getBytes(StandardCharsets.UTF_8)));
        log.info("result={}, messageId={}", result, result.getMsgId());
    }
}
  • 消费者使用
@Component
@Slf4j
public class ItemInfoConsumer {

    public boolean consume(List<MessageExt> messages) {
        log.info("收到消息, messages={}", messages);
        //其他消费逻辑
        return true;
    }
}

遗留的问题:新增consumer时,每次,都需要在配置中新增

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言