SpringBoot集成RocketMQ

前言

在开始之前,大家可以先了解RocketMQ的一些特性。

RocketMQ简介

今天我们接上之前说的,对RocketMQ进行简单使用。主要的也是讲如何在SpringBoot项目中使用RocketMQ。

环境

RocketMQ安装

我们先在RocketMQ官网上下载最新的MQ版本并进行安装。

可以通过镜像进行下载。

将压缩包解压并放在一个指定文件夹下。(这里要注意的是文件夹路径中尽量不要有空格,像Program Files这种,有可能导致mq无法正常启动)

RocketMQ启动

通过命令行进入到bin目录下,使用 mqnamesrv -n localhost:9876 (windows)可以启动mq的namesrv。如下图:

upload successful

使用 mqbroker -n localhost:9876 (windows)可以启动mqbroker。如下图:

upload successful

注意:上图表示RocketMQ的namesrv和broker启动成功,RocketMQ若正常使用应保证namesrv和broker均启动成功。

与Java集成使用

主要依赖于rocketmq-client的jar包,在与SpringBoot进行集成时,应当引入该jar包。

1
2
3
4
5
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${version.rocketmq}</version>
</dependency>

例子

我们简单使用下该jar包创建消费者和生产者进行消费,来了解下它们的一些参数。

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ProducerTest {
public static void main(String[] args) throws Exception {
//producerGroup 生产者组名称
DefaultMQProducer producer = new DefaultMQProducer("producer1");
//设置NamesrvAddr
producer.setNamesrvAddr("127.0.0.1:9876");
//设置自动创建Topic
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
//调用start()方法启动一个producer实例
producer.start();
System.out.println("Producer started");
Message message=new Message();
message.setTopic("Test");
message.setTags("123");
message.setBody(new String("Hello").getBytes());
SendResult result=producer.send(message);
System.out.println(result);
}
}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class ConsumerTest {
public static void main(String[] args) throws Exception{
//需要consumerGroup
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
//设置NameServer
consumer.setNamesrvAddr("127.0.0.1:9876");
//消费策略
//CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
//CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
//CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

//Topic和Tag,*代表全部的Tag
consumer.subscribe("Test", "*");
//设置一个Listener,主要进行消息的逻辑处理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
msgs.forEach((a)->{
System.out.println(new String(a.getBody()));
});
//返回消费状态
//CONSUME_SUCCESS 消费成功
//RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动
consumer.start();
System.out.println("Consumer started");
}
}

运行这两个类进行测试。

生产者结果:

upload successful

消费者结果:

upload successful

备注:如果启动中出现异常
com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, TestTopic
可能是没有开启AUTO_CREATE_TOPIC,我们可以在启动broker的时候加上该参数,mqbroker -n localhost:9876&autoCreateTopicEnable=true 可以保证使用时自动创建Topic。

生产等环境一般需要什么Topic就配置什么,不会开启这个参数让程序自动创建。

简要说明

我们对上面例子里的一些参数等做些说明,以便于我们可以更好的封装功能。

可与之前RocketMQ的简介结合理解。

  1. DefaultMQProducer部分参数

     producerGroup:生产者组名称。
     namesrvAddr:生产者NameSrvAddr的服务地址。
     createTopicKey:可以创建指定的Topic。AUTO_CREATE_TOPIC_KEY为自动创建Topic。
     其它参数略。
    

    发送Message时,需要设置Message的Topic和Tag,并能收到发送状态结果。

  2. DefaultMQPushConsumer部分参数

     consumerGroup:消费者组名称。
     namesrvAddr:消费者NameSrvAddr的服务地址。
     ConsumeFromWhere:消费策略。
     ---> CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
     ---> CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
     ---> CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
     
     subscribe方法:使消费者订阅指定的Topic和Tag。
     registerMessageListener方法:注册消费者监听,用于消费消息,这里面也是我们业务逻辑的主要内容。有顺序消费和并行消费两种模式。
     分别需要实现MessageListenerOrderly接口和MessageListenerConcurrently接口。
     我们的例子是并行消费处理的。
    

小试牛刀

每次处理时都要这样写会有很多的与业务无关的代码,也不美观。

我们对其进行必要封装,之前集成了SpringBoot的一个自己封装的starter插件,今天我们把RocketMQ也与SpringBoot集成下。

A 首先新建SpringBoot项目,引入RocketMQ 的jar依赖,不在详述。

然后在项目下创建必要的package。

upload successful

producer:用来存放我们构建producer的类的包。
consumer:用来存放我们构建consumer的类的包。
listener:用来存放我们构建listener的类的包
factory:用来构建生产者和消费者群组的包。
config:存放SpringBoot配置类的包。
autoware:存放启动配置生效的类的包。
annotation:用来存放注解的包。

先说下简单思路吧。

首先这个生产者和消费者是可以有多个的,然后我们怎么管理它们?
生产者可以发送顺序消息和并发消息,消费者可以处理顺序消息和并发消息,同时我们可能有两种业务要使用同一个Listener,如何解耦呢?

关于管理:我们可以管理生产者和消费者的一个集合来解决。

关于解耦:可以提供一个接口,业务类实现这个接口拿到Message,进行处理。那如何知道这个业务类需要哪个listener呢?自然需要customerId或者listenerId。

好了开始工作。

B 先从配置入手。

下面是生产者和消费者的配置Bean。

由于可以配置多个生产者或者消费者,故使用List处理它们。部分代码如下:

upload successful

upload successful

upload successful

C 然后从消费者和生产者的提供入手。也是比较简单的,主要是根据参数生成一个生产者或者消费者,然后暴露一些方法,如start,stop等方法。

提供生产者的类,部分代码如下。

upload successful

备注:生产环境一般不设置AUTO_CREATE_TOPIC_KEY,需要什么Topic要手动创建加入管理。

upload successful

创建消费者的类,部分代码如下。

upload successful

我们认为两个consumerId相等则获取的是一个Consumer,因此需要重写equals方法。

upload successful

D 创建消费者监听,为处理消息提供一个接口。

upload successful

使用抽象类部分实现这个接口。

upload successful

这一步的目的是由于不同的业务逻辑可能用到一个监听,这样可以两个业务逻辑写到两个不同的类中,只需实现IProcessor。

写两个并行处理监听和顺序处理监听,对其进行实现。

并行监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ConcurrentlyRocketMQMessageListener extends AbstractRocketMQMessageListener implements MessageListenerConcurrently {
private static final transient Logger logger = LoggerFactory.getLogger(ConcurrentlyRocketMQMessageListener.class);

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
logger.debug("ConcurrentlyRocketMQMessageListener receive message begin,length:{}" , msgs.size());
for(MessageExt msg:msgs ) {
for (IProcessor processor : processorList) {
try {
// 处理消息
process(processor, msg);
} catch (Exception ex) {
logger.error("ConcurrentlyRocketMQMessageListener error",ex);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

顺序监听:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class OrderlyRocketMQMessageListener extends AbstractRocketMQMessageListener implements MessageListenerOrderly {
private static final transient Logger logger = LoggerFactory.getLogger(OrderlyRocketMQMessageListener.class);

@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
logger.debug("OrderlyRocketMQMessageListener receive message begin,length:{}" , msgs.size());
for(MessageExt msg:msgs ) {
for (IProcessor processor : processorList) {
try {
process(processor, msg);
} catch (Exception ex) {
logger.error("OrderlyRocketMQMessageListener error",ex);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
}

然后需要对实现IProcesser接口的类添加如下注解,用来查看调用的哪个Listener。

1
2
3
4
5
6
7
8
9
10
11
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RocketMQProcessor {
/**
* 该程序归哪个消费者监听处理
* @return
*/
String consumerId() default "";
}

E 好,我们开始构建factory。

使用ConcurrentHashMap存储生产者和消费者集合。

1
2
3
4
//用于存放生产者的map组
private static Map<String, RocketMQMessageProducer> producers=new ConcurrentHashMap<>();
//用于存放消费者的map组
private static Map<String, RocketMQMessageConsumer> consumers=new ConcurrentHashMap<>();

创建生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 创建一个生产者
* @param configuration
* @return
*/
public RocketMQMessageProducer createProducer(MqProducerConfiguration configuration){
//如果map里存在这个实例,直接返回
if(producers.get(configuration.getProducerId())!=null){
return producers.get(configuration.getProducerId());
}
//创建一个生产者
RocketMQMessageProducer producer=new RocketMQMessageProducer(configuration.getGroupName(), configuration.getNamesrvAddr());
if(configuration.getSendMsgTimeout()!=null){
producer.setSendMsgTimeout(configuration.getSendMsgTimeout());
}
if(configuration.getMaxMessageSize()!=null){
producer.setMaxMessageSize(configuration.getMaxMessageSize());
}
try {
//启动生产者并放入map进行管理
producer.start();
producers.put(configuration.getProducerId(), producer);
logger.info("MqProducer start success "+configuration.toString());
} catch (MQClientException e) {
logger.error("MqProducer start error "+configuration.toString(),e);
throw new RuntimeException(e);
}
return producer;
}

创建消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/**
* 创建一个消费者
* @param mqConsumerConfiguration
* @return
*/
public RocketMQMessageConsumer createConsumer(MqConsumerConfiguration mqConsumerConfiguration, List<IProcessor> list) {
//如果map里存在,直接返回
if (consumers.get(mqConsumerConfiguration.getConsumerId()) != null) {
return consumers.get(mqConsumerConfiguration.getConsumerId());
}
try {
RocketMQMessageConsumer consumer = new RocketMQMessageConsumer(mqConsumerConfiguration.getConsumerId(), mqConsumerConfiguration.getGroupName(), mqConsumerConfiguration.getNamesrvAddr());
consumer.subscribe(mqConsumerConfiguration.getTopicAndTagMap());

//设置消费者其它参数
if(!CollectionUtils.isEmpty(mqConsumerConfiguration.getOptions())){
String consumeFromWhere = mqConsumerConfiguration.getOptions().get("consumeFromWhere");
String consumeThreadMin = mqConsumerConfiguration.getOptions().get("consumeThreadMin");
String consumeThreadMax = mqConsumerConfiguration.getOptions().get("consumeThreadMax");
String pullThresholdForQueue = mqConsumerConfiguration.getOptions().get("pullThresholdForQueue");
String consumeMessageBatchMaxSize = mqConsumerConfiguration.getOptions().get("consumeMessageBatchMaxSize");
String pullBatchSize = mqConsumerConfiguration.getOptions().get("pullBatchSize");
String pullInterval = mqConsumerConfiguration.getOptions().get("pullInterval");
if (StringUtils.isNotBlank(consumeFromWhere)) {
if (StringUtils.equals(consumeFromWhere, "CONSUME_FROM_LAST_OFFSET")) {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
} else if (StringUtils.equals(consumeFromWhere, "CONSUME_FROM_FIRST_OFFSET")) {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
}
}
if (StringUtils.isNotBlank(consumeThreadMin)) {
consumer.setConsumeThreadMin(Integer.parseInt(consumeThreadMin));
}
if (StringUtils.isNotBlank(consumeThreadMax)) {
consumer.setConsumeThreadMax(Integer.parseInt(consumeThreadMax));
}
if (StringUtils.isNotBlank(pullThresholdForQueue)) {
consumer.setPullThresholdForQueue(Integer.parseInt(pullThresholdForQueue));
}
if (StringUtils.isNotBlank(consumeMessageBatchMaxSize)) {
consumer.setConsumeMessageBatchMaxSize(Integer.parseInt(consumeMessageBatchMaxSize));
}
if (StringUtils.isNotBlank(pullBatchSize)) {
consumer.setPullBatchSize(Integer.parseInt(pullBatchSize));
}
if (StringUtils.isNotBlank(pullInterval)) {
consumer.setPullInterval(Integer.parseInt(pullInterval));
}
}

//设置消费者监听
if(mqConsumerConfiguration.isOrderly()){
OrderlyRocketMQMessageListener orderlyRocketMQMessageListener=new OrderlyRocketMQMessageListener();
orderlyRocketMQMessageListener.setProcessorList(list);
consumer.registerMessageListener(orderlyRocketMQMessageListener);
}else{
ConcurrentlyRocketMQMessageListener concurrentlyRocketMQMessageListener=new ConcurrentlyRocketMQMessageListener();
concurrentlyRocketMQMessageListener.setProcessorList(list);
consumer.registerMessageListener(concurrentlyRocketMQMessageListener);
}

consumer.start();
consumers.put(mqConsumerConfiguration.getConsumerId(), consumer);
logger.info("MqConsumer start success "+mqConsumerConfiguration.toString());
logger.info("MqConsumer processors size "+list.size());
return consumer;
} catch (Exception e) {
logger.error("MqConsumer start error", e);
throw new RuntimeException(e);
}
}

它们的其它方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 /**
* 获取一个生产者
* @param producerId
* @return
*/
public RocketMQMessageProducer getProducer(String producerId){
if(producers.get(producerId)!=null){
return producers.get(producerId);
}
return null;
}
/**
* 停止某个生产者
* @param producerId
*/
public void stopProducer(String producerId){
if(producers.get(producerId)!=null){
producers.get(producerId).shutdown();
producers.remove(producerId);
logger.info("MqProducer "+producerId+" is shutdown!");
}
}
/**
* 获取一个消费者
* @param customerId
* @return
*/
public RocketMQMessageConsumer getConsumer(String customerId){
if(consumers.get(customerId)!=null){
return consumers.get(customerId);
}
return null;
}
/**
* 停止某个消费者
* @param customerId
*/
public void stopConsumer(String customerId){
if(consumers.get(customerId)!=null){
consumers.get(customerId).shutdown();
consumers.remove(customerId);
logger.info("MqConsumer "+customerId+" is shutdown!");
}
}

F 然后到了我们关键的自动配置部分了。

让这个类实现ApplicationContextAware可以拿到applicationContext。

upload successful

同时生成一个FactoryBean。

主要方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.rocketmq.config", name = "enabled", havingValue = "true")
RocketMQFactory rocketMQFactory(){
RocketMQFactory rocketMQFactory=new RocketMQFactory();
//处理生产者
if(mqConfigurations.getMqProducerConfigurations()!=null&&mqConfigurations.getMqProducerConfigurations().size()>0){
mqConfigurations.getMqProducerConfigurations().forEach(producerConfiguration ->{
rocketMQFactory.createProducer(producerConfiguration);
});
}
if(mqConfigurations.getMqConsumerConfigurations()!=null&&mqConfigurations.getMqConsumerConfigurations().size()>0){
//处理消费者
mqConfigurations.getMqConsumerConfigurations().forEach(consumerConfiguration->{
//处理Processor
final Map<String, Object> annotationMap = applicationContext.getBeansWithAnnotation(RocketMQProcessor.class);
List<IProcessor> list = new ArrayList<>();
if(annotationMap!=null){
annotationMap.forEach((key,value)->{
RocketMQProcessor annotation = value.getClass().getAnnotation(RocketMQProcessor.class);
if(consumerConfiguration.getConsumerId().equals(annotation.consumerId())){
try{
list.add((IProcessor) value);
}catch (Exception e){
throw new RuntimeException(e);
}
}
});
}
rocketMQFactory.createConsumer(consumerConfiguration,list);
});



}
return rocketMQFactory;
}

总的来说就是拿到配置生产生产者组,生成消费者组。

在生成消费者的时候需要注册监听,一个监听可以有很多业务类,通过注解拿到业务类,放到处理器列表里,再把该监听注册到指定的customerId上。

G 配置spring.factories。

1
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.zwt.rocketmqspringboot.autoware.RocketMqAutoConfig

H 然后我们添加application.properties进行测试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
## 是否启用RocketMQ
spring.rocketmq.config.enabled=true

## Producer1配置
spring.rocketmq.config.mq-producer-configurations[0].group-name=rocketmq-producer
spring.rocketmq.config.mq-producer-configurations[0].namesrv-addr=localhost:9876
spring.rocketmq.config.mq-producer-configurations[0].producer-id=001

## Consumer1配置
spring.rocketmq.config.mq-consumer-configurations[0].namesrv-addr=localhost:9876
spring.rocketmq.config.mq-consumer-configurations[0].group-name=rocketmq-consumer
spring.rocketmq.config.mq-consumer-configurations[0].consumer-id=001
spring.rocketmq.config.mq-consumer-configurations[0].topic-and-tag-map.123=123
spring.rocketmq.config.mq-consumer-configurations[0].orderly=false

server.port=8001

## Producer2配置
spring.rocketmq.config.mq-producer-configurations[1].group-name=rocketmq-producer1
spring.rocketmq.config.mq-producer-configurations[1].namesrv-addr=localhost:9876
spring.rocketmq.config.mq-producer-configurations[1].producer-id=002

## Consumer2配置
spring.rocketmq.config.mq-consumer-configurations[1].namesrv-addr=localhost:9876
spring.rocketmq.config.mq-consumer-configurations[1].group-name=rocketmq-consumer1
spring.rocketmq.config.mq-consumer-configurations[1].consumer-id=002
spring.rocketmq.config.mq-consumer-configurations[1].topic-and-tag-map.1234=1234
spring.rocketmq.config.mq-consumer-configurations[1].orderly=false

编写一些测试类进行测试。

1
2
3
4
5
6
7
8
9
@Service
@RocketMQProcessor(consumerId = "001")
public class TestConsumer implements IProcessor {
private static final Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@Override
public void handleMessage(MessageExt msg) throws Exception {
System.out.println(new String(msg.getBody())+"TestConsumer");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Controller
public class TestProducer {
@Autowired
RocketMQFactory rocketMQFactory;

@RequestMapping("/test")
public String doSomething() throws Exception{
Message message=new Message();
message.setTopic("123");
message.setTags("123");
message.setBody(new String("Hello World").getBytes());
SendResult result=rocketMQFactory.getProducer("001").sendMessage(message);
System.out.println(result);

return result.toString();

}
}

可以看到正常运行。

upload successful

实战

我们按照上篇文章那样把它封装成jar包。

完成后进行测试。

新建一个test的SpringBoot项目。作为消费者。

引入我们的包。

upload successful

PS:这儿没有命名为xxxx-spring-boot-starter的形式。

创建一个消费者。

upload successful

upload successful

再新建一个test1的SpringBoot项目。作为生产者。

引入我们的包并进行配置。

upload successful

upload successful

启动test消费者,同时使用test1生产者发送一条消息。

可以看到。

生产者发送成功:

upload successful

消费者处理成功:

upload successful

总结

通过对RocketMQ的集成封装使用,更好地学会了如何使用RocketMQ,及对其的更多理解。

消息中间件在我们软件开发中具有重要作用,应当好好理解。

如果觉得properties配置太繁琐可以改用yml配置,会更简介好看些。

如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
spring:
rocketmq:
config:
enabled: true # 是否启用RocketMQ
mq-producer-configurations: # 生产者配置
- producerId: TestProducer1 # 生产者1的id
groupName: producer1 # 生产者1的组名称
namesrvAddr: 127.0.0.1:9876 # 生产者1的namesrvAddr
- producerId: TestProducer2 # 生产者2的id
groupName: producer2 # 生产者2的组名称
namesrvAddr: 127.0.0.1:9876 # 生产者2的namesrvAddr

mq-consumer-configurations: # 消费者配置
- consumerId: TestConsumer1 # 消费者1的id
groupName: consumer1 # 消费者1的组名称
namesrvAddr: 127.0.0.1:9876 # 消费者1的namesrvAddr
topicAndTagMap: { TestTopic1:TestTag1 } # 消费者1监听的topic和tag
orderly: false # 消费者1是否顺序消费消息
- consumerId: TestConsumer2 # 消费者2的id
groupName: consumer2 # 消费者2的组名称
namesrvAddr: 127.0.0.1:9876 # 消费者2的namesrvAddr
topicAndTagMap: { TestTopic2:TestTag2 } # 消费者2监听的topic和tag
orderly: true # 消费者2是否顺序消费消息

代码地址:rocketmq-spring-boot




-------------文章结束啦 ~\(≧▽≦)/~ 感谢您的阅读-------------

您的支持就是我创作的动力!

欢迎关注我的其它发布渠道