一、MQ简介
MQ:Message Queue,即消息队列,是一种应用程序之间的消息通信,简单理解就是A服务不断的往队列里发布信息,另一服务B从队列中读取消息并执行处理,消息发布者不需要关心是谁消费了消息,消息消费者不需要关心发布消息的是谁,互不干扰。
消息队列主要作用和优势:
- 异步和解耦
- 以电商订单处理为例,用户提交一个订单,如果订单系统以同步方式调用 支付、库存等服务。服务之间就会有很高耦合性,容错性会降低。一旦支付或库存服务宕机,那么就会导致订单提交失败,整个交易的异常。从而影响用户的体验。
如果中间加入了消息中间件,不管是支付还是库存等服务,通过异步的方式进行调用的,如果其中一个服务宕机了,提交订单仍收到正常响应,不会影响用户下单的使用,同时不用等待其它服务的结果,提高了处理效率。然后等服务恢复后可以从消息队列获取消息,继续进行订单的后续处理。
- 流量削峰
流量削峰也叫削峰填谷,例如一些电商秒杀或购物节活动,都会使用到消息中间件。
如果在不使用消息中间件,活动期间每秒是很高的并发,如果A服务需要要将数据写入到MYSQL中,由于MYSQL本身服务的上限,会有大量的请求堆积在A服务去处理,结果会导致A服务的的崩溃。
这时将用户的请求写入存储到MQ中,因为消息中间件本身是对数据量处理比较高的一个系统,所以对于高并发的请求,消息中间件仍可以处理,然后A服务作为消息中间件的一个消费者,以固定的速度从MQ中拉取消息,处理并完成相关业务操作,进而确保我们A服务的稳定性。
- 数据分发
- 可扩展性等
常见的MQ主要有:RocketMQ、ActiveMQ、RabbitMQ、Kafka,再次不做比较,各有优势。其中RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件。历经过很多高并发数据处理的磨练,无数业务场景的应用,足以证明它的优秀。
二、RocketMQ结构
RocketMQ结构主要为上方的四部分, Producer、NameServer、Broker、Consumer 组成
- Producer 消息生产者
由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。
- Consumer 消息消费者
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序,执行完成后会发送一个消息给Broker进行确认,这个就是ACK确认。提供了两种消费形式:PullConsumer(拉取式消费)、PushConsumer(推动式消费)。
- NameServer 名称服务
名称服务是一个Topic路由注册中心,充当路由消息的提供者。生产者或消费者能够通过名字服务查找各Topic相应的Broker IP列表。
- Broker 代理服务器
主要负责接收来自 Producer 的消息并存储,Consumer 从这里取得消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,Message Queue 用于存储消息的物理地址。消息相关的元数据,包括用户组、消费进度偏移量、队列信息等。
三、环境搭建
- rocketmq安装
wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip
unzip rocketmq-all-4.9.4-bin-release.zip
- rocketmq启动
前提条件:要进行如下两个文件的内存使用进行修改,不然无法启动
cd bin
vim runserver.sh
vim runbroker.sh
启动NameSrv:
rocketmq-all-4.9.4-bin-release下的bin目录
本地部署(linux)
nohup ./mqnamesrv & 启动NameServer
tail -f ~/logs/rocketmqlogs/namesrv.log 查看日志
或者tail -f nohup.out 查看报错信息
启动Broker:
rocketmq-all-4.9.4-bin-release下的bin目录
本地部署(linux)
nohup ./borker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
或者 tail -f nohup.out
确认启动:
- 消息发送确认
cd bin
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Producer
此时若安装了rocketmq控制台,也可以看到发送到broker的Topic=TopicTest下的消息,如下图:
- 消息接收确认
cd bin
export NAMESRV_ADDR=localhost:9876
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
- 关闭RocketMQ
cd bin
./mqshutdown broker
./mqshutdown namesrv
- RocketMQ控制台安装
git clone https://github.com/apache/rocketmq-dashboard.git
cd rocketmq-dashboard
vim ./src/main/resources/application.yml
# server.port可以修改为自己想要的端口
# namesAddrs 配置上本机IP和端口
mvn clean package -Dmaven.test.skip=true
cd target
java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar
至此RocketMQ控制台部署完成,访问IP:9877即可访问。
四、python调用rocketmq-client
- 安装相关rocketmq的包
pip install rocketmq-client-python
pip install rocketmq
前提是在linux下安装部署,因为rocketmq-client不支持windows平台
- Producer消息发送
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import json
import time
from faker import Faker
from rocketmq.client import Producer, Message
def send_msg():
faker = Faker(locale="zh_CN")
for i in range(200):
msg = Message("topic_202332")
msg.set_keys("key"+str(time.time()))
msg.set_tags("tag_202332")
msg.set_body(faker.name()+","+faker.address()+","+faker.phone_number())
res = producer.send_sync(msg)
print("res statu:{},res msg_id:{}, res offset:{}".format(res.status, res.msg_id, res.offset))
producer.shutdown()
if __name__ == '__main__':
producer = Producer("pro_test")
producer.set_namesrv_addr("47.95.133.253:9876")
producer.start()
send_msg()
执行结果:
同时在RocketMQ控制台也可以查看到消息如下:
- Consumer消费消息
#!/usr/bin/env python
# -*-coding:utf-8-*-
import time
from rocketmq.client import PushConsumer, PullConsumer
def callback(msg):
print("msg_id:{}, msg_body:{}".format(msg.id, str(msg.body,'UTF-8')))
#return ConsumeStatus.CONSUME_SUCCESS
return PullConsumer
if __name__ == '__main__':
consumer = PushConsumer('consumer_test')
consumer.set_namesrv_addr("47.95.133.253:9876")
consumer.subscribe("topic_202332", callback)
consumer.start()
while True:
time.sleep(30)
consumer.shutdown()
执行结果:
以上为普通消息发送,其它发送类型还包括顺序消息,延迟消息,事务消息。本例是最基本的消息发送和接收,实际测试中需要更多用例设计和代码实现。
- 整理测试点
1.消息正向验证
消息发送和消费都正常情况下,消息生成和消费及流程处理无误。
2. 异常用例
异常消息在Producer和Consumer两端的处理,如消息某个参数为空,或确实等异常的情况,在Producer发送错误信息后,消费端是否能够正确接收并有效处理错误问题。需要进行日志和数据库查看。
3.消息丢失
如因为网络原因导致的消息丢失,消息是否有补发,以及Producer补发后,消息消费是否正常。
4. 消息重复消费
RocketMQ有消息重复发送的机制,所以当产生消息重新发送后,要对消费端的服务做幂等处理,保证消息不被重复消费。
5. 处理性能,消息堆积
主要是性能测试,可以通过RocketMQ的控制台,查看对应消息消费的TPS,来保证消费的及时性,看是否有消息产生阻塞消费,导致消费TPS波动或骤降等问题。
感谢阅读,欢迎关注微信公众号(ATester),所有文章会在公众号首发。