行百里er 行百里er
首页
  • 分类
  • 标签
  • 归档
设计模式
  • JVM
  • Java基础
MySQL
Elastic Stack
Redis
  • Kafka
  • RocketMQ
分布式
Spring Cloud Alibaba
云原生
数据结构与算法
关于
GitHub (opens new window)

行百里er

Java程序员一枚
首页
  • 分类
  • 标签
  • 归档
设计模式
  • JVM
  • Java基础
MySQL
Elastic Stack
Redis
  • Kafka
  • RocketMQ
分布式
Spring Cloud Alibaba
云原生
数据结构与算法
关于
GitHub (opens new window)
  • Kafka

  • RocketMQ

    • 【RocketMQ】RocketMQ入门之闪电三连鞭:消息队列、RocketMQ介绍及安装使用
    • 【RocketMQ】近距离感受RocketMQ如何收发消息,有备而来!
      • 2.1 引入jar包
      • 2.2 同步消息API
      • 2.3 批量消息发送
      • 2.4 异步消息API
      • 2.5 单向消息API
      • 3.1 消息消费模式
        • 3.1.1 集群消费模式
        • 3.1.2 广播消费模式
    • 【RocketMQ】基于RocketMQ的分布式事务
    • 【RocketMQ】RocketMQ集群,RocketMQ-on-DLedger可容灾集群
  • 消息中间件
  • RocketMQ
行百里er
2020-11-11
目录

【RocketMQ】近距离感受RocketMQ如何收发消息,有备而来!

作者:行百里er

博客:https://chendapeng.cn (opens new window)

提示

这里是 行百里er 的博客:行百里者半九十,凡事善始善终,吾将上下而求索!

RocketMQ系列第二篇。熬夜不易,且行且珍惜。

通过RocketMQ的API来直观的感受一下,RocketMQ是怎样的进行消息生产和消费的。首先安装一个RocketMQ的扩展rocketmq-console控制台,然后通过API演示RocketMQ的消息发送模式和消费消息模式,最后介绍一下消费者如何通过TAG、SQL表达式来过滤消息。

  • rocketmq-console
  • 发送消息的方式
  • 消费消息
  • TAG过滤
  • SQL表达式过滤

# 0x01 安装RocketMQ扩展-rocketmq-console

RocketMQ官方GitHub上有一个项目rocketmq-externals,提供了很多扩展:

RocketMQ扩展

其中,rocketmq-console能够为我们直观的展示RocketMQ集群分部情况、Producer、Consumer、Topic等等,下面我们来装一个看看长什么样。

为了方便,这次我用Docker进行安装,到Docker Hub上找到rocketmq-console的Docker官方镜像:

使用Docker安装RocketMQ控制台

# 拉取镜像
docker pull apacherocketmq/rocketmq-console:2.0.0

# 启动
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.2.110:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t apacherocketmq/rocketmq-console:2.0.0
1
2
3
4
5

启动成功,出现熟悉的打印信息:

通过http://192.168.2.110:8080访问控制台:

这个控制台做的还是挺炫酷的!

Docker系列大纲已就绪,后面会水一篇关于Docker的文章,欢迎拍砖。

# 0x02 Producer发送消息

# 2.1 引入jar包

首先需要引入RocketMQ Client的jar包,这个注意一下版本就行了,最好和安装的RocketMQ版本一致,所以这里选择4.7.1版本:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.1</version>
</dependency>
1
2
3
4
5

# 2.2 同步消息API

RocketMQ是处理各种消息的,消息来自于Producer,那么要发送消息,自然就能想到要有发送消息的生产者实例,API提供了DefaultMQProducer这个类,其构造方法如下:

我们先new一个Producer实例出来,先能发送消息再说。

public class SyncMsgProducer {
    public static void main(String[] args) throws Exception {
        //实例化消息生产者,参数是producerGroup
        DefaultMQProducer producer = new DefaultMQProducer("laogong");
        //设置nameserver的地址
        producer.setNamesrvAddr("192.168.2.110:9876");
        //启动producer
        producer.start();

        //发送消息
//        Message msg = new Message("xiaoxianrou", "这是我的第一次".getBytes());
//        SendResult sendResult = producer.send(msg);
//        System.out.printf("%s%n", sendResult);
        //批量发送
        List<Message> msgs = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("xiaoxianrou", ("这是我的第" + i + "次").getBytes());
            msgs.add(msg);
        }
        SendResult sendResult = producer.send(msgs);
        System.out.printf("%s%n", sendResult);

        //关闭producer
        producer.shutdown();
        System.out.println("已关闭producer实例");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

这段代码提供了一次发送一条消息和批量发送消息的示例,运行它:

可以看到,send(msg)方法同步发送消息,有一个返回值,也就是说消息发送中一定会给客户端一个状态,等broker说我收到了之后,返回一个SendResult,在此后这条消息就和Producer没关系了。

同步发送过程中Producer进入同步等待状态,可以保证消息投递一定到达。

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

可以在控制台看一下发送的message:

进一步查看消息的详细信息:

最下面的TraceList展示了消息的消费情况,由于我们还没有消费它,所以这里没有记录。

PS:出现

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
1

的解决方案

  1. 修改/usr/local/rocketmq/conf/broker.conf,添加brokerIP1=192.168.2.110,IP地址是自己的虚拟机IP地址
  2. 重启nameserver
  3. 重启broker:./mqbroker -n 192.168.2.110:9876 -c /usr/local/rocketmq/conf/broker.conf

# 2.3 批量消息发送

上面的例子中提到了send方法可以批量发送消息,当一次性发送很多条消息时,可以多条消息打包一起发送,减少网络传输次数提高效率。

producer.send(Collection c) 方法可以接受一个集合,实现批量发送:

public SendResult send(Collection<Message> msgs) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(this.batch(msgs));
}
1
2
3

批量发送需注意:

  • 批量消息要求必要具有同一topic、相同消息配置
  • 不支持延时消息
  • 这一批消息的总大小不应超过4MB
  • 如果不确定是否超过限制,可以手动计算大小分批发送

# 2.4 异步消息API

Producer的API中send方法也提供了异步的发送方式:

show you the code:

int messageCount = 100;
// 根据消息数量实例化倒计时计算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
    final int index = i;
    // 创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("xiaoxianrou",
            "TagA",
            "OrderID188",
            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    // SendCallback接收异步返回结果的回调
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
        }
        @Override
        public void onException(Throwable e) {
            System.out.printf("%-10d Exception %s %n", index, e);
            e.printStackTrace();
        }
    });
}
// 等待5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

运行结果:

控制台查看消息详情:

以上消息是通过异步的方式生成的,异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

想要快速发送消息,又不想丢失消息的时候可以使用异步消息。

# 2.5 单向消息API

只发送消息,不等待服务器响应,只发送请求不等待应答。

此方式发送消息的过程耗时非常短,一般在微秒级别。

其API就是调用sendOneway方法:

for (int i = 0; i < 100; i++) {
    // 创建消息,并指定Topic,Tag和消息体
    Message msg = new Message("xiaoxianrou" ,
            "TagA",
            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
    );
    // 发送单向消息,没有任何返回结果
    producer.sendOneway(msg);
}
1
2
3
4
5
6
7
8
9

# 0x03 Consumer消费消息

# 3.1 消息消费模式

消息消费模式由消费者Consumer来决定,可以由消费者设置MessageModel来决定消息模式。

消息模式默认为集群消费模式,此外还有广播消费模式。

// 广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
1
2
3
4

# 3.1.1 集群消费模式

集群消费消息是指集群化部署消费者。

当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可。

集群消费模式

集群消费模式的特点:

  • 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
  • 在消息重投时,不能保证路由到同一台机器上
  • 消费状态由broker维护

消费者消费消息代码:

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("laogong-consumer");
        consumer.setNamesrvAddr("192.168.2.110:9876");
        //订阅topic,根据tag过滤消息
        consumer.subscribe("xiaoxianrou", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //设置消费模式,默认就是CLUSTERING
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 启动Consumer实例
        consumer.start();
        System.out.println("consumer started.");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

默认就是集群消费模式,运行结果:

看一下控制台:

可以看出,之前Producer产生的消息状态已变成consumed了。

# 3.1.2 广播消费模式

当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。

广播消费模式

API设置广播消费模式很简单:

consumer.setMessageModel(MessageModel.BROADCASTING);
1

广播消费模式的特点:

  • 消费进度由consumer维护
  • 保证每个消费者消费一次消息
  • 消费失败的消息不会重投

# 0x04 关于TAG

前面的案例提到了tag,Consumer在订阅的时候,除了订阅topic外,还可以指定tag,对消息进行过滤。

比如,Producer发送topic为xiaoxianrou,tag为TagA和TagB的消息,Consumer只订阅TagA,那么这个Consumer则只处理TagA的消息。

我们还是通过API和控制台来看一下消息状态。

生产者产生的消息:

// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("xiaoxianrou",
        "TagA",
        "OrderID188",
        ("laogong" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置属性
msg.putUserProperty("money", String.valueOf(i));

// TagB
Message msg = new Message("xiaoxianrou",
        "TagB",
        "OrderID288",
        ("laogong" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("money", String.valueOf(i));
1
2
3
4
5
6
7
8
9
10
11
12
13
14

消费者消费,指定TagA:

//订阅topic,根据tag过滤消息
consumer.subscribe("xiaoxianrou", "TagA");
1
2

来看一下控制台:

由于Consumer订阅topic的时候,指定了TagA,所以猜测TagB应该会被过滤掉,我们来验证一下,先看一条TagA的消息消费情况:

TagA的消息均是CONSUMED,已消费状态,再来看一条TagB的消息:

被过滤了。

# 0x05 SQL表达式过滤消息

消费者收到包含TagA或TagB的消息,但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。

在这种情况下,可以使用SQL表达式筛选出消息。

首先需要配置一下/usr/local/rocketmq/conf/broker.conf,添加:

enablePropertyFilter=true
1

然后指定broker.conf,重启broker:

./mqbroker -n 192.168.2.110:9876 -c /usr/local/rocketmq/conf/broker.conf
1

重启之后,控制台集群会显示该属性:

前文所述案例中我设置了:

msg.putUserProperty("money", String.valueOf(i));
1

其中TagA的money是0~49,已经被consumer消费了,现在我再开一个通过sql表达式过滤出money大于49的消息,API如下:

//订阅topic,根据sql表达式过滤消息
MessageSelector selector = MessageSelector.bySql("money > 49");
consumer.subscribe("xiaoxianrou", selector);
1
2
3

消费完了再来看,TagB的状态:

变成已消费了。


首发公众号 行百里er ,欢迎老铁们关注阅读指正。

#RocketMQ#消息中间件
上次更新: 2022/10/04, 18:14:30
【RocketMQ】RocketMQ入门之闪电三连鞭:消息队列、RocketMQ介绍及安装使用
【RocketMQ】基于RocketMQ的分布式事务

← 【RocketMQ】RocketMQ入门之闪电三连鞭:消息队列、RocketMQ介绍及安装使用 【RocketMQ】基于RocketMQ的分布式事务→

最近更新
01
重要数据不能丢!MySQL数据库定期备份保驾护航!
05-22
02
分布式事务解决方案之 Seata(二):Seata AT 模式
09-09
03
Seata 番外篇:使用 docker-compose 部署 Seata Server(TC)及 K8S 部署 Seata 高可用
09-05
更多文章>
Theme by Vdoing | Copyright © 2020-2023 行百里er | MIT License | 豫ICP备2022020385号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式