一、添加RocketMQ客户端依赖包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.3.3</version>
</dependency>- 注:查询maven的类包,可以到https://mvnrepository.com中查询
- 可根据自身实际业务,选用当前最新稳定版本(与RocketMQ的版本保持兼容)
二、消息生产者生产消息
消息生产者生产消息有以下几个步骤:
1.实例化消息生产类
DefaultMQProducer producer = new DefaultMQProducer("item_group");注:生产者组,一般默认为项目的名称
2.设置消息系统中的nameserver的地址
producer.setNamesrvAddr("192.168.0.200:9876");注:192.168.0.200:9876替换为具体的nameserver的地址即可,多个nameserver(集群)使用;隔开即可
3.启动生产者
producer.start();4.发送消息
Map<String, Object> orderMap = new HashMap<String,Object>(3);
orderMap.put("orderNo", orderNo);
orderMap.put("userId", 300124578);
orderMap.put("ticketId", "TX8547985689");
Message msg = new Message(topicName, tagName, msgKey, JSON.toJSONString(orderMap).getBytes());
SendResult result = producer.send(msg);注:
- 通过send方法可发送消息
- send方法接受Message对象
- Message实例化参数说明
- topicName : 主题名称(分类名称)
- tagName : 标签名称(子分类名称,一般是具体的业务)
- msgKey : 消息唯一主键
- msgBody : 消息体内容(是一个byte数组)
5.关闭生产者
producer.shutdown();完整演示代码:
//1.实例化生产者(指定生产者组)
DefaultMQProducer producer = new DefaultMQProducer("item_group");
//2.指定要连接的nameServer地址
producer.setNamesrvAddr("localhost:9876");
//3.启动生产者
producer.start();
for (int i = 0; i < 50; i++) {
SteamItemContext context = new SteamItemContext();
context.setItemId((long) ((i + 1) * 100));
context.setImageUrl("https://steamcdn-a.akamaihd.net/steam/apps/" + (i + 1) * 100 + "/header.jpg");
context.setName("SteamItem测试饰品" + (i + 1));
//4.发送消息(默认是不允许自动创建topic的)
SendResult result = producer.send(new Message("item_info", "tag_item_info", JsonUtil.toJsonString(context).getBytes(StandardCharsets.UTF_8)));
log.info("result={}, messageId={}", result, result.getMsgId());
}
//5.关闭生产者
producer.shutdown();三、消息消费者消费消息
消息消费者消费消息一般分为下面几个步骤:
1.实例化消费者对象
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("item_consume_group");- 一般对于同一个消息,一个消费组只会其中一个消费者消费
2.设置消费nameserver的地址
consumer.setNamesrvAddr("192.168.0.200:9876");3.订阅消息并注册监听器
consumer.subscribe("topicName", "tagName");
consumer.registerMessageListener(
new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
//逻辑业务处理,当接收消息时做的处理动作
//To DO
LOGGER.info("receive message:{}", list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
);- subscribe表示订阅消息,两个参数分别是订阅的主题名称和标签名称
- registerMessageListener为注册监听器,参数为MessageListenerConcurrently对象(该对象必须要实现consumeMessage方法,当收到消息后会触发这个函数)
4.启动消费者服务
consumer.start();四、使用Spring boot+RocketMQ实现消息生产者和消费者
1.maven依赖项:
pom.xml部分内容:
<!-- 公共配置参数(如各依赖项的版本号)-->
<properties>
<java.version>17</java.version>
<rocketmq.client.version>5.3.3</rocketmq.client.version>
<log4j.version>2.0.17</log4j.version>
<lombok.version>1.18.38</lombok.version>
<jackson.version>2.19.2</jackson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- 日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${log4j.version}</version>
<scope>test</scope>
</dependency>
<!-- rocketmq client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.client.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.应用配置
rocketmq:
name-server-addr: 127.0.0.1:98763.项目配置
- RocketMQ服务器相关属性配置
**
* RocketMQ的属性配置
* @author shixinke
*/
@ConfigurationProperties(prefix = "rocketmq")
@Configuration
@Data
public class RocketMQPropertiesConfig {
//rocketMQ注册中心地址
private String nameServerAddr;
//生产者的配置
private Producer producer;
/**
* 生产者组的配置
*/
@Data
public static class Producer {
@Value("${spring.application.name}")
private String applicationName;
//生产者组的名称
private String group;
public String getGroup() {
if (group == null) {
group = applicationName;
}
return group;
}
}
}- bean配置
/**
* rocketMQ配置
* @author shixinke
*/
@Configuration
@Slf4j
public class RocketMQConfig {
@Resource
private RocketMQPropertiesConfig rocketMQPropertiesConfig;
@Resource
private ItemInfoConsumer itemInfoConsumer;
/**
* 生产者Bean
* @return DefaultMQProducer
*/
@Bean(
name = "defaultMQProducer",
initMethod = "start",
destroyMethod = "shutdown"
)
public DefaultMQProducer defaultMQProducer() {
DefaultMQProducer producer = new DefaultMQProducer(rocketMQPropertiesConfig.getProducer().getGroup());
producer.setNamesrvAddr(rocketMQPropertiesConfig.getNameServerAddr());
return producer;
}
/**
* 生产者Bean
* @return DefaultMQProducer
*/
@Bean(
name = "defaultMQProducer",
initMethod = "start",
destroyMethod = "shutdown"
)
public DefaultMQPushConsumer itemInfoConsumer() throws MQClientException {
//1.实例化消费者(指定消费组)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("item_group");
//2.设置要连接的nameServer地址
consumer.setNamesrvAddr(rocketMQPropertiesConfig.getNameServerAddr());
//3.订阅主题
consumer.subscribe("item_info", "tag_item_info");
//4.注册消息监听器(具体的消息消费逻辑)
consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
//确认是消费结果:成功或重试(失败)
try {
boolean result = itemInfoConsumer.consume(list);
if (!result) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Throwable e) {
log.error("消费失败:", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
return consumer;
}
}
4.使用
- 生产者使用
@Service
@Slf4j
public class ItemServiceImpl implements ItemService{
@Resource
private DefaultMQProducer defaultMQProducer;
@Override
public void create() throws MQBrokerException, RemotingException, InterruptedException, MQClientException {
//生产消息
SendResult result = defaultMQProducer.send(new Message("item_info", "tag_item_info", "10001".getBytes(StandardCharsets.UTF_8)));
log.info("result={}, messageId={}", result, result.getMsgId());
}
}- 消费者使用
@Component
@Slf4j
public class ItemInfoConsumer {
public boolean consume(List<MessageExt> messages) {
log.info("收到消息, messages={}", messages);
//其他消费逻辑
return true;
}
}
遗留的问题:新增consumer时,每次,都需要在配置中新增