前言
在开始之前,大家可以先了解RocketMQ的一些特性。
今天我们接上之前说的,对RocketMQ进行简单使用。主要的也是讲如何在SpringBoot项目中使用RocketMQ。
环境
RocketMQ安装
我们先在RocketMQ官网上下载最新的MQ版本并进行安装。
可以通过镜像进行下载。
将压缩包解压并放在一个指定文件夹下。(这里要注意的是文件夹路径中尽量不要有空格,像Program Files这种,有可能导致mq无法正常启动)
RocketMQ启动
通过命令行进入到bin目录下,使用 mqnamesrv -n localhost:9876 (windows)可以启动mq的namesrv。如下图:
使用 mqbroker -n localhost:9876 (windows)可以启动mqbroker。如下图:
注意:上图表示RocketMQ的namesrv和broker启动成功,RocketMQ若正常使用应保证namesrv和broker均启动成功。
与Java集成使用
主要依赖于rocketmq-client的jar包,在与SpringBoot进行集成时,应当引入该jar包。
1 | <dependency> |
例子
我们简单使用下该jar包创建消费者和生产者进行消费,来了解下它们的一些参数。
生产者:
1 | public class ProducerTest { |
消费者:
1 | public class ConsumerTest { |
运行这两个类进行测试。
生产者结果:
消费者结果:
备注:如果启动中出现异常
com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TestTopic
可能是没有开启AUTO_CREATE_TOPIC,我们可以在启动broker的时候加上该参数,mqbroker -n localhost:9876&autoCreateTopicEnable=true 可以保证使用时自动创建Topic。
生产等环境一般需要什么Topic就配置什么,不会开启这个参数让程序自动创建。
简要说明
我们对上面例子里的一些参数等做些说明,以便于我们可以更好的封装功能。
可与之前RocketMQ的简介结合理解。
DefaultMQProducer部分参数
producerGroup:生产者组名称。 namesrvAddr:生产者NameSrvAddr的服务地址。 createTopicKey:可以创建指定的Topic。AUTO_CREATE_TOPIC_KEY为自动创建Topic。 其它参数略。
发送Message时,需要设置Message的Topic和Tag,并能收到发送状态结果。
DefaultMQPushConsumer部分参数
consumerGroup:消费者组名称。 namesrvAddr:消费者NameSrvAddr的服务地址。 ConsumeFromWhere:消费策略。 ---> CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息 ---> CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍 ---> CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前 subscribe方法:使消费者订阅指定的Topic和Tag。 registerMessageListener方法:注册消费者监听,用于消费消息,这里面也是我们业务逻辑的主要内容。有顺序消费和并行消费两种模式。 分别需要实现MessageListenerOrderly接口和MessageListenerConcurrently接口。 我们的例子是并行消费处理的。
小试牛刀
每次处理时都要这样写会有很多的与业务无关的代码,也不美观。
我们对其进行必要封装,之前集成了SpringBoot的一个自己封装的starter插件,今天我们把RocketMQ也与SpringBoot集成下。
A 首先新建SpringBoot项目,引入RocketMQ 的jar依赖,不在详述。
然后在项目下创建必要的package。
producer:用来存放我们构建producer的类的包。
consumer:用来存放我们构建consumer的类的包。
listener:用来存放我们构建listener的类的包
factory:用来构建生产者和消费者群组的包。
config:存放SpringBoot配置类的包。
autoware:存放启动配置生效的类的包。
annotation:用来存放注解的包。
先说下简单思路吧。
首先这个生产者和消费者是可以有多个的,然后我们怎么管理它们?
生产者可以发送顺序消息和并发消息,消费者可以处理顺序消息和并发消息,同时我们可能有两种业务要使用同一个Listener,如何解耦呢?
关于管理:我们可以管理生产者和消费者的一个集合来解决。
关于解耦:可以提供一个接口,业务类实现这个接口拿到Message,进行处理。那如何知道这个业务类需要哪个listener呢?自然需要customerId或者listenerId。
好了开始工作。
B 先从配置入手。
下面是生产者和消费者的配置Bean。
由于可以配置多个生产者或者消费者,故使用List处理它们。部分代码如下:
C 然后从消费者和生产者的提供入手。也是比较简单的,主要是根据参数生成一个生产者或者消费者,然后暴露一些方法,如start,stop等方法。
提供生产者的类,部分代码如下。
备注:生产环境一般不设置AUTO_CREATE_TOPIC_KEY,需要什么Topic要手动创建加入管理。
创建消费者的类,部分代码如下。
我们认为两个consumerId相等则获取的是一个Consumer,因此需要重写equals方法。
D 创建消费者监听,为处理消息提供一个接口。
使用抽象类部分实现这个接口。
这一步的目的是由于不同的业务逻辑可能用到一个监听,这样可以两个业务逻辑写到两个不同的类中,只需实现IProcessor。
写两个并行处理监听和顺序处理监听,对其进行实现。
并行监听:
1 | public class ConcurrentlyRocketMQMessageListener extends AbstractRocketMQMessageListener implements MessageListenerConcurrently { |
顺序监听:
1 | public class OrderlyRocketMQMessageListener extends AbstractRocketMQMessageListener implements MessageListenerOrderly { |
然后需要对实现IProcesser接口的类添加如下注解,用来查看调用的哪个Listener。
1 | (ElementType.TYPE) |
E 好,我们开始构建factory。
使用ConcurrentHashMap存储生产者和消费者集合。
1 | //用于存放生产者的map组 |
创建生产者:
1 | /** |
创建消费者:
1 | /** |
它们的其它方法:
1 | /** |
F 然后到了我们关键的自动配置部分了。
让这个类实现ApplicationContextAware可以拿到applicationContext。
同时生成一个FactoryBean。
主要方法:
1 |
|
总的来说就是拿到配置生产生产者组,生成消费者组。
在生成消费者的时候需要注册监听,一个监听可以有很多业务类,通过注解拿到业务类,放到处理器列表里,再把该监听注册到指定的customerId上。
G 配置spring.factories。
1 | org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.zwt.rocketmqspringboot.autoware.RocketMqAutoConfig |
H 然后我们添加application.properties进行测试。
1 | ## 是否启用RocketMQ |
编写一些测试类进行测试。
1 |
|
1 |
|
可以看到正常运行。
实战
我们按照上篇文章那样把它封装成jar包。
完成后进行测试。
新建一个test的SpringBoot项目。作为消费者。
引入我们的包。
PS:这儿没有命名为xxxx-spring-boot-starter的形式。
创建一个消费者。
再新建一个test1的SpringBoot项目。作为生产者。
引入我们的包并进行配置。
启动test消费者,同时使用test1生产者发送一条消息。
可以看到。
生产者发送成功:
消费者处理成功:
总结
通过对RocketMQ的集成封装使用,更好地学会了如何使用RocketMQ,及对其的更多理解。
消息中间件在我们软件开发中具有重要作用,应当好好理解。
如果觉得properties配置太繁琐可以改用yml配置,会更简介好看些。
如下:
1 | spring: |
代码地址:rocketmq-spring-boot