【RocketMQ】RocketMQ入门之闪电三连鞭:消息队列、RocketMQ介绍及安装使用
作者:行百里er
博客:https://chendapeng.cn (opens new window)
提示
这里是 行百里er 的博客:行百里者半九十,凡事善始善终,吾将上下而求索!
谁要懂得多,就要睡得少。
先从以下几个方面对RocketMQ入个门:
- 消息队列介绍
- Rocket MQ介绍
- Rocket MQ安装使用
# 消息队列
什么是消息队列呢?
队列,大家肯定都不陌生了,在数据结构中,它是一种先进先出的结构。
消息队列可以看成是一个容器,里面存放各种消息,这些消息代表业务上需要处理的动作。
消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。
那么为什么要是用消息队列呢?
我在项目中遇到一种需求:后端抓取很多数据,抓取完成后需要进行解析处理,数据处理完成后要根据相应的规则发送给客户,最后持久化入库。
后端抓取完数据后需要提醒另外一个程序:数据我弄完了,你解析处理吧。在没有使用消息中间件的时候,需要保存一个抓取完数据的消息到数据库中,解析程序需要不断的扫描数据库来确认消息是否抓取完,然后再做后续操作。
这种就是串行化处理,数据处理上效率非常的低。
使用消息队列,通过异步的方式提高了系统的性能,还降低了系统的耦合性。
消息队列具有很好的削峰作用的功能——即通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。
现在的微服务架构中,每个服务几乎不存在耦合性,这两个应用之间可以使用消息中间件作为桥梁,进行消息沟通处理,也就是应用解耦合。
# Rocket MQ
# 基本概念
# 消息模型 Message Model
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。
Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。
Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。
# 消息生产者 Producer
负责生产消息,一般由业务系统负责生产消息。
一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。
RocketMQ提供多种发送方式:
- 同步发送
- 异步发送
- 顺序发送
- 单向发送。
同步和异步方式均需要Broker返回确认信息,单向发送不需要。
# 消息消费者 Consumer
负责消费消息,一般是后台系统负责异步消费。
一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。
从用户应用的角度而言提供了两种消费形式:拉取式(pull)消费、推动式(push)消费。
拉取式消费:主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。
推动式消费:Broker收到数据后会主动推送给消费端,实时性较高。
# 主题 Topic
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
# 代理服务器 Broker Server
消息中转角色,负责存储消息、转发消息。
代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。
代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
# 名字服务 Name Server
充当路由消息的提供者。
生产者或消费者能够通过Name Server查找各主题相应的Broker IP列表。
多个Namesrv实例组成集群,但相互独立,没有信息交换。
# RocketMQ角色详解
# broker
- Broker面向producer和consumer接受和发送消息。
- 向nameserver提交自己的信息。
- 是消息中间件的消息存储、转发服务器。
- 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报。
# broker集群
- Broker高可用,可以配成Master/Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave。
- 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
- Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义BrokerId为0表示Master,非0表示Slave
- Master多机负载,可以部署多个broker
- 每个Broker与nameserver集群中的所有节点建立长连接,定时注册Topic信息到所有nameserver。
# producer
- 消息的生产者
- 通过nameserver集群中的其中一个节点(随机选择)建立长连接,获得Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
- 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
# consumer
消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。
注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接。
# nameserver
底层由netty实现,提供了路由管理、服务注册、服务发现的功能,是一个无状态节点
nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时想nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把它从列表中剔除
nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用,
nameServer集群间互不通信,没有主备的概念
nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
为什么不用zookeeper?:rocketmq希望为了提高性能,CAP定理,客户端负载均衡。
# 使用RocketMQ
# 安装RocketMQ
理论知识BB完了,下面开始实操,从安装开始。
名称 | 版本号 |
---|---|
JDK | 11 |
Maven | 3.6.3 |
RocketMQ | 4.7.1 |
JDK和Maven的安装及环境变量配置就不赘述了,这里提一下maven的配置,打开 maven 的配置文件( MVN_HOME/conf/settings.xml ),在<mirrors></mirrors>
标签中添加 mirror 子节点:
<mirror>
<id>aliyunmaven</id>
<mirrorOf>*</mirrorOf>
<name>阿里云公共仓库</name>
<url>https://maven.aliyun.com/repository/public</url>
</mirror>
2
3
4
5
6
这样下载jar包的时候速度快一点。
TIP:最好用JDK 1.8版本,因为我用JDK 11遇到一些坑,虽然文中给出了解决方案,但还是不如没问题的JDK1.8好。
下载完RocketMQ源文件后,安装很简单:
unzip rocketmq-all-4.7.1-source-release.zip
cd rocketmq-all-4.7.1-source-release
mvn -Prelease-all -DskipTests clean install -U
2
3
4
5
上面命令执行完之后,在rocketmq-all-4.7.1-source-release
目录有一个distribution
目录:
安装好的RocketMQ就在distribution/target/rocketmq-4.7.1/rocketmq-4.7.1
目录下:
cd distribution/target/rocketmq-4.7.1/rocketmq-4.7.1 && ll
把它挪到习惯的位置:
cd distribution/target/rocketmq-4.7.1
mv rocketmq-4.7.1/ /usr/local/rocketmq
2
3
OK,安装完成了。
# 启动服务
进入/usr/local/rocketmq/bin目录下,可以看到有很多可执行命令:
# 启动nameserver
踩坑一:
[root@localhost bin]# ./mqnamesrv
Java HotSpot(TM) 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Unrecognized VM option 'UseCMSCompactAtFullCollection'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
2
3
4
5
报错了,这是由于RocketMQ的启动文件都是按照JDK8配置的,而前面我特意配置的JDK版本是11,有很多命令参数不支持导致的,使用JDK8,正常启动没有问题的。
解决
先看一下mqnamesrv.sh
脚本的内容:
最后一行说明它需要执行runserver.sh
修改runserver.sh
环境变量部分:
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
# 加入RocketMQ的lib目录
# export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
export CLASSPATH=.:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
2
3
4
5
6
Java选项部分:
# 文件末尾
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
#JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_srv_gc_%p_%t.log -XX:+PrintGCDetails"
#JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
#JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
2
3
4
5
6
7
8
9
10
11
再次启动:
nameserver成功启动,接下来启动broker。
# 启动broker
踩坑二:
[root@localhost bin]# ./mqbroker
[0.001s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/dev/shm/rmq_broker_gc_%p_%t.log instead.
Unrecognized VM option 'PrintGCDateStamps'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
2
3
4
5
看着和启动nameserver时报错一样,同样先来看一下mqbroker.sh
:
说明需要执行runbroker.sh
,修改runbroker.sh
:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
#JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"JAVA_OPT="${JAVA_OPT} -verbose:gc -Xloggc:${GC_LOG_DIR}/rmq_broker_gc_%p_%t.log -XX:+PrintGCDetails"
#JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
#JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
#JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=15g"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
#JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${BASE_DIR}/lib:${JAVA_HOME}/lib/ext"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
2
3
4
5
6
7
8
9
10
11
这里有一点需要说明的是需要修改一下jvm最大堆、最小堆以及新生代空间大小(虚拟机空间足够的可忽略)xms
、xmx
以及xmn
。
修改完后再次启动:
# 测试发送消息
使用tools.sh
脚本测试消息的发送和接收。
踩坑三:
[root@localhost bin]# ./tools.sh org.apache.rocketmq.example.quickstart.Producer
-Djava.ext.dirs=./../lib:/usr/local/java/jre/lib/ext:/usr/local/java/lib/ext is not supported. Use -classpath instead.
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.
2
3
4
这个就比较有经验了,和前面一样修改相关脚本。
修改tools.sh
:
export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
#export CLASSPATH=.:${BASE_DIR}/conf:${CLASSPATH}
export CLASSPATH=.:${BASE_DIR}/lib/*:${BASE_DIR}/conf:${CLASSPATH}
#===========================================================================================
# JVM Configuration
#===========================================================================================
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
#JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
2
3
4
5
6
7
8
9
10
11
12
踩坑四:
再次执行tools.sh
进行测试,报错:
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:679)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:509)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:693)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:557)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:394)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1363)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1353)
at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:622)
... 7 more
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这个报错原因是,找不到nameserver,在tools.sh
脚本中添加如下内容:
export NAMESRV_ADDR=localhost:9876
踩坑五:
再次执行./tools.sh org.apache.rocketmq.example.quickstart.Producer
,报错:
org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: TopicTest
See http://rocketmq.apache.org/docs/faq/ for further details.
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:685)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1343)
at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1289)
at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:325)
at org.apache.rocketmq.example.quickstart.Producer.main(Producer.java:67)
2
3
4
5
6
7
这个报错就和之前启动的broker有关系了,之前启动broker的时候没有连接到注册中心nameserver上,我们关掉broker,重新按照如下命令启动一下:
./mqbroker -n localhost:9876
broker启动起来之后,再次执行tools脚本进行测试,打印如下消息即表示服务测试成功:
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301103E1, offsetMsgId=C0A8026E00002A9F00000000000312FD, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=248]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301303E2, offsetMsgId=C0A8026E00002A9F00000000000313C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=248]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301603E3, offsetMsgId=C0A8026E00002A9F0000000000031493, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=248]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301A03E4, offsetMsgId=C0A8026E00002A9F000000000003155E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=1], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9301C03E5, offsetMsgId=C0A8026E00002A9F0000000000031629, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=2], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9302103E6, offsetMsgId=C0A8026E00002A9F00000000000316F4, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=3], queueOffset=249]
SendResult [sendStatus=SEND_OK, msgId=C0A8026E222F4B85612C2CA9302503E7, offsetMsgId=C0A8026E00002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=249]
16:08:05.445 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
16:08:05.458 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.2.110:10911] result: true
2
3
4
5
6
7
8
9
# 测试消费消息
前面的实验已经成功发送了消息,现在来测试一下消费消息,同样借助于tools.sh
:
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
测试结果:
ConsumeMessageThread_16 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=200, sysFlag=0, bornTimestamp=1604909284622, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284623, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F00000000000279F2, commitLogOffset=162290, bodyCRC=265672385, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614601, UNIQ_KEY=C0A8026E222F4B85612C2CA92D0E0320, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 56, 48, 48], transactionId='null'}]]
ConsumeMessageThread_14 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=199, sysFlag=0, bornTimestamp=1604909284603, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284604, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F00000000000276C6, commitLogOffset=161478, bodyCRC=1009291136, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614601, UNIQ_KEY=C0A8026E222F4B85612C2CA92CFB031C, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 57, 54], transactionId='null'}]]
ConsumeMessageThread_9 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=197, sysFlag=0, bornTimestamp=1604909284567, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284569, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F000000000002706E, commitLogOffset=159854, bodyCRC=1116443590, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614600, UNIQ_KEY=C0A8026E222F4B85612C2CA92CD70314, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 56], transactionId='null'}]]
ConsumeMessageThread_19 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=1, storeSize=203, queueOffset=196, sysFlag=0, bornTimestamp=1604909284550, bornHost=/192.168.2.110:45654, storeTimestamp=1604909284552, storeHost=/192.168.2.110:10911, msgId=C0A8026E00002A9F0000000000026D42, commitLogOffset=159042, bodyCRC=1262346221, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1604909614600, UNIQ_KEY=C0A8026E222F4B85612C2CA92CC60310, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 56, 52], transactionId='null'}]]
2
3
4
验证成功。
首发公众号 行百里er ,欢迎老铁们关注阅读指正。