前言
前面已经将RocketMQ的相关知识与应用进行了记录,这一章就来整合SpringBoot,这也是一个重头戏,因为在我们的实际运用中不可能还去向前面那样写代码,我们会用到SpringBoot框架。这一章就详细记录如何整合的吧。
步骤
0、前提
因是整合进SpringBoot,所以,RocketMQ的安装以及Dashboard的安装使用就不再本文赘述,感兴趣的可以看看本人前面写的文章哈。
还有,本文是将生产和消费放在两个module项目中,也是可以在一个项目里测的哈。
1、创建父工程项目
我们使用IDEA创建一个Maven项目,项目名为 rocketmq-test,POM类型。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhangdl</groupId>
<artifactId>rocketmqtest</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>rocketmq-producer</module>
</modules>
<packaging>pom</packaging>
<!-- 版本管理 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<springboot.version>2.3.2.RELEASE</springboot.version>
<rocketmq.version>2.2.0</rocketmq.version>
</properties>
<!-- 依赖管理 -->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${springboot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<!-- 插件 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2、创建Producer项目
2.1 修改POM文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmqtest</artifactId>
<groupId>com.zhangdl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-producer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
</dependencies>
</project>
2.2 建application.yml文件
rocketmq必须要有name-server和group两个属性。
server:
port: 8881
rocketmq:
name-server: 192.168.61.101:9876
producer:
group: test-boot-producer
2.3 写主启动类
public class RocketMQProducerApplication {
public static void main(String[] args) {
ApplicationContext run = SpringApplication.run(RocketMQProducerApplication.class);
}
}
2.4 业务类
需要:网页访问localhost:8881/send 进行发送十条消息到RocketMQ。
实际应用中不写原生代码,我们需要借助一个类RocketMQTemplate,通过它能够很方便的进行消息发送。
@RestController
public class ProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@GetMapping("/send")
public String send(){
for (int i = 0; i < 10; i++) {
rocketMQTemplate.convertAndSend("Boottopic1", "hello~"+i);
}
return "SEND_OK";
}
}
3、创建Consumer项目
3.1 修改POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rocketmqtest</artifactId>
<groupId>com.zhangdl</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rocketmq-consumer</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
</dependencies>
</project>
3.2 application.yml文件
server:
port: 8886
rocketmq:
name-server: rocketmq:9876
consumer:
group: test-boot-consumer
3.3 主启动类
@SpringBootApplication
public class RocketMQConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQConsumerApplication.class, args);
}
}
3.4 业务类
@Component
@RocketMQMessageListener(topic = "Boottopic1",consumerGroup = "${rocketmq.consumer.group}")
public class ConsumerService implements RocketMQListener<String> {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void onMessage(String s) {
System.out.println(s);
}
}
4、测试
浏览器测试地址:http://localhost:8881/send
页面显示:
消费者端:打印出接收到的消息,可以看出不是顺序的,这很正常,因为这本就是普通消息不是顺序消息。
5、扩展
普通消息的三种发送方式:同步发送、异步发送、单向发送。
5.1 同步消息发送
/**
* 同步消息
*/
@GetMapping("/syncSend")
public SendResult syncSend() {
String str = "同步消息";
SendResult sendMessage = rocketMQTemplate.syncSend("Boottopic1", str);
System.out.println("send over");
return sendMessage;
}
测试地址:http://localhost:8881/syncSend
可以看出,有很多返回结果信息。
5.2 异步消息发送
/**
* 异步消息
*/
@GetMapping("/asyncSend")
public void asyncSend() {
String str = "异步消息";
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("success~");
}
@Override
public void onException(Throwable throwable) {
System.out.println("throw exception~");
}
};
测试地址:http://localhost:8881/asyncSend
生产端成功执行回调方法。
消费端接收到消息
5.3 单向消息发送
/**
* 单向消息
*/
@GetMapping("/onewaySend")
public void onewaySend() {
String str = "单向消息";
rocketMQTemplate.sendOneWay("Boottopic1", str);
System.out.println("send over");
}
消费端接收到消息
结束语
SpringBoot 整合 RocketMQ 的基础知识就是上述内容,当然,在实际开发时用到的东西是不止这些的,后续,我也会持续更新。