整合篇:零基础学习与使用WebFlux

配套资料,免费下载
链接:https://pan.baidu.com/s/1jA217UgqXpONi_fV-aOzqw
提取码:bm2g
复制这段内容后打开百度网盘手机App,操作更方便哦

1、WebFlux的概述

WebFlux

Spring框架中包含的原始Web框架Spring WebMVC是专门为Servlet API和Servlet容器而构建的。响应式Web框架Spring WebFlux是在Spring 5.0以后添加的新的模块。WebFlux是一种异步非阻塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关API实现的。WebFlux能够在有限资源下,提高系统吞吐量和伸缩性,并以 Reactor 为基础实现响应式编程,可在Netty,Undertow和Servlet 3.1+容器等服务器上运行。这两个Web框架都反映了其源模块的名称(spring-webmvc和 spring-webflux),并在Spring Framework中并存。每个模块都是可选的,应用程序可以使用一个模块,也可以使用两个模块。

异步非阻塞

要想解释清楚异步非阻塞,我们就得明白,异步和同步、阻塞和非阻塞之间的关系。

  • 异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同步,如果发送请求之后不等着对方回应就去做其他事情就是异步
  • 阻塞和非阻塞针对被调用者,被调用者收到请求之后,做完请求任务之后才给出反馈就是阻塞,收到请求之后马上给出反馈然后再去做事情就是非阻塞

了解了异步非阻塞,我们来说一说Spring WebMVC 和 Spring WebFlux分别属于哪种?

【spring-webmvc + Servlet + Tomcat】命令式的、同步阻塞的

【spring-webflux + Reactor + Netty】响应式的、异步非阻塞的

响应式编程

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

Reactive 和 Reactor

Reactor 是基于Reactive Streams 规范的第四代响应库,用于在JVM上构建非阻塞的应用程序。Reactor是Spring WebFlux的首选响应库。它提供了 Mono和 Flux API类型,并通过丰富运算符集来处理0…1(Mono)和0…N(Flux)数据序列。因此,我们了解到 Reactive 是一种响应式编程的规范,而 Reactor 是此规范的一种具体实现,他是支撑 WebFlux 实现响应式编程的基础。

到底用Spring WebMVC还是Spring WebFlux?

一个自然的问题要问,但建立了不合理的二分法。实际上,两者共同努力扩大了可用选项的范围。两者的设计旨在实现彼此的连续性和一致性,它们可以并行使用,并且来自双方的反馈对双方都有利。下图显示了两者之间的关系,它们的共同点以及各自的独特支持:

Spring MVC和Webflux Venn

我们建议您考虑以下几点:

  • 如果您有运行正常的Spring MVC应用程序,则无需更改。命令式编程是编写,理解和调试代码的最简单方法。您有最大的库选择空间,因为从历史上看,大多数库都是阻塞的。

  • 如果您已经在购买无阻塞的Web堆栈,Spring WebFlux可以提供与该领域其他服务器相同的执行模型优势,还可以选择服务器(Netty,Tomcat,Jetty,Undertow和Servlet 3.1+容器),选择编程模型(带注释的控制器和功能性Web端点),以及选择反应式库(Reactor,RxJava或其他)。

  • 如果您对与Java 8 lambda或Kotlin一起使用的轻量级功能性Web框架感兴趣,则可以使用Spring WebFlux功能性Web端点。对于要求较低复杂性的较小应用程序或微服务(可以受益于更高的透明度和控制)而言,这也是一个不错的选择。

  • 在微服务架构中,您可以混合使用带有Spring MVC或Spring WebFlux控制器或带有Spring WebFlux功能端点的应用程序。在两个框架中都支持相同的基于注释的编程模型,这使得重用知识变得更加容易,同时还为正确的工作选择了正确的工具。

  • 评估应用程序的一种简单方法是检查其依赖关系。如果您要使用阻塞性持久性API(JPA,JDBC)或网络API,则Spring MVC至少是通用体系结构的最佳选择。使用Reactor和RxJava在单独的线程上执行阻塞调用在技术上是可行的,但您不会充分利用非阻塞Web堆栈。

  • 如果您的Spring MVC应用程序具有对远程服务的调用,请尝试使用active WebClient。您可以直接从Spring MVC控制器方法返回反应类型(Reactor,RxJava或其他)。每个呼叫的等待时间或呼叫之间的相互依赖性越大,好处就越明显。Spring MVC控制器也可以调用其他反应式组件。

  • 如果您有庞大的团队,请牢记向无阻塞,功能性和声明性编程的过渡过程中的学习曲线很陡。在没有完全切换的情况下启动的一种实用方法是使用WebClient。除此之外,从小处着手并衡量收益。我们希望,对于广泛的应用程序,这种转变是不必要的。如果不确定要寻找什么好处,请先了解无阻塞I / O的工作原理(例如,单线程Node.js上的并发性)及其影响。

2、WebFlux的基础

2.1、两个核心类

响应式编程操作中,Reactor 是满足 Reactive 规范的框架,Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素,Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号: 元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种信号特点:

  • 错误信号和完成信号都是终止信号,不能共存
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
  • 如果没有错误信号,没有完成信号,表示是无限数据流

一个入门案例:

项目名称: reactor-demo

引入依赖:

<!-- https://mvnrepository.com/artifact/io.projectreactor/reactor-core -->
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.4.1</version>
</dependency>

声明数据流:

//第一种形式
Flux.just(1, 2, 3, 4);
Mono.just(1);
//第二种形式
Integer[] array = {1, 4};
Flux.fromArray(array);
//第三种形式
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
//第四种形式
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);

订阅数据流: 调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的

//第一种形式
Flux.just(1, 4).subscribe(System.out::println);
Mono.just(1).subscribe(System.out::println);

2.2、四种操作符

操作符就是对数据进行一道又一道的操作,就好比生产车间的流水线,常见的操作符有四种:map、flatMap、filter、zipWith

map:将元素映射为一个新元素

请添加图片描述

flatMap:把每个元素转换成流,把转换之后多个流合并成大的流

请添加图片描述

filter:可以对元素进行筛选

请添加图片描述

zip:可以对元素进行合并

无标题

3、WebFlux的注解式编程模型

项目名称: webflux-demo-01

image-20201228141205060

image-20201228141639409

image-20201228141730530

image-20201228141750063

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties

server.port=8080

com.caochenlei.webfluxdemo01.entity.User

public class User {
    private Integer id;
    private String name;
    private String gender;
    private Integer age;

    public User() {
    }

    public User(Integer id, String name, String gender, Integer age) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.age = age;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

com.caochenlei.webfluxdemo01.dao.UserDao

public interface UserDao {
    //查找一个用户
    public User findOne(Integer id);

    //查找全部用户
    public Collection<User> findAll();

    //新增一个用户
    public void save(User user);

    //删除一个用户
    public void delete(User user);

    //更新一个用户
    public void update(User user);
}

com.caochenlei.webfluxdemo01.dao.impl.UserDaoImpl

@Repository
public class UserDaoImpl implements UserDao {
    //模拟数据库中的数据
    private final Map<Integer, User> users = new ConcurrentHashMap<>();

    //调用无参构造初始化
    public UserDaoImpl() {
        users.put(1, new User(1, "张三", "男", 18));
        users.put(2, new User(2, "李四", "女", 19));
        users.put(3, new User(3, "王五", 20));
    }

    @Override
    public User findOne(Integer id) {
        return users.get(id);
    }

    @Override
    public Collection<User> findAll() {
        return users.values();
    }

    @Override
    public void save(User user) {
        int id = users.size() + 1;
        user.setId(id);
        users.put(id, user);
    }

    @Override
    public void delete(User user) {
        users.remove(user.getId());
    }

    @Override
    public void update(User user) {
        int id = user.getId();
        users.remove(id);
        users.put(id, user);
    }
}

com.caochenlei.webfluxdemo01.service.UserService

public interface UserService {
    //查找一个用户
    public Mono<User> findOne(Integer id);

    //查找全部用户
    public Flux<User> findAll();

    //新增一个用户
    public Mono<Void> save(Mono<User> userMono);

    //删除一个用户
    public Mono<Void> delete(Mono<User> userMono);

    //更新一个用户
    public Mono<Void> update(Mono<User> userMono);
}

com.caochenlei.webfluxdemo01.service.UserServiceImpl

@Service
public class UserServiceImpl implements UserService {
    @Autowired
    private UserDao userDao;

    @Override
    public Mono<User> findOne(Integer id) {
        return Mono.justOrEmpty(userDao.findOne(id));
    }

    @Override
    public Flux<User> findAll() {
        return Flux.fromIterable(userDao.findAll());
    }

    @Override
    public Mono<Void> save(Mono<User> userMono) {
        return userMono.doOnNext(user -> {
            //保存一个用户
            userDao.save(user);
        }).thenEmpty(Mono.empty());
    }

    @Override
    public Mono<Void> delete(Mono<User> userMono) {
        return userMono.doOnNext(user -> {
            //删除一个用户
            userDao.delete(user);
        }).thenEmpty(Mono.empty());
    }

    @Override
    public Mono<Void> update(Mono<User> userMono) {
        return userMono.doOnNext(user -> {
            //更新一个用户
            userDao.update(user);
        }).thenEmpty(Mono.empty());
    }
}

com.caochenlei.webfluxdemo01.controller.UserController

@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    private UserService userService;

    //查找一个用户
    @GetMapping("/findOne/{id}")
    public Mono<User> findOne(@PathVariable Integer id) {
        return userService.findOne(id);
    }

    //查找全部用户
    @GetMapping("/findAll")
    public Flux<User> findAll() {
        return userService.findAll();
    }

    //新增一个用户
    @PostMapping("/save")
    public Mono<Void> save(@RequestBody User user) {
        Mono<User> userMono = Mono.just(user);
        return userService.save(userMono);
    }

    //删除一个用户
    @PostMapping("/delete")
    public Mono<Void> delete(@RequestBody User user) {
        Mono<User> userMono = Mono.just(user);
        return userService.delete(userMono);
    }

    //更新一个用户
    @PostMapping("/update")
    public Mono<Void> update(@RequestBody User user) {
        Mono<User> userMono = Mono.just(user);
        return userService.update(userMono);
    }
}

com.caochenlei.webfluxdemo01.WebfluxDemo01Application,启动主函数

@SpringBootApplication
public class WebfluxDemo01Application {

    public static void main(String[] args) {
        SpringApplication.run(WebfluxDemo01Application.class, args);
    }

}

查询一个:http://localhost:8080/user/findOne/1

image-20201228155334194

查询全部:http://localhost:8080/user/findAll

image-20201228155400709

新增一个:http://localhost:8080/user/save

image-20201228155443061

image-20201228155527213

删除一个:http://localhost:8080/user/delete

image-20201228155627264

image-20201228155647022

修改一个:http://localhost:8080/user/update

image-20201228155801745

image-20201228155821462

4、WebFlux的函数式编程模型

项目名称: webflux-demo-02

image-20201228141205060

image-20201228160948417

image-20201228141730530

image-20201228161020190

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

application.properties

server.port=8080

com.caochenlei.webfluxdemo01.entity.User

public class User {
    private Integer id;
    private String name;
    private String gender;
    private Integer age;

    public User() {
    }

    public User(Integer id, Integer age) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.age = age;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }
}

com.caochenlei.webfluxdemo01.dao.UserDao

public interface UserDao {
    //查找一个用户
    public User findOne(Integer id);

    //查找全部用户
    public Collection<User> findAll();

    //新增一个用户
    public void save(User user);

    //删除一个用户
    public void delete(User user);

    //更新一个用户
    public void update(User user);
}

com.caochenlei.webfluxdemo01.dao.impl.UserDaoImpl

Repository
public class UserDaoImpl implements UserDao {
    //模拟数据库中的数据
    private final Map<Integer, user);
    }
}

com.caochenlei.webfluxdemo01.service.UserService

public interface UserService {
    //查找一个用户
    public Mono<User> findOne(Integer id);

    //查找全部用户
    public Flux<User> findAll();

    //新增一个用户
    public Mono<Void> save(Mono<User> userMono);

    //删除一个用户
    public Mono<Void> delete(Mono<User> userMono);

    //更新一个用户
    public Mono<Void> update(Mono<User> userMono);
}

com.caochenlei.webfluxdemo01.service.UserServiceImpl

@Service
public class UserServiceImpl implements UserService {
    @Autowired
    private UserDao userDao;

    @Override
    public Mono<User> findOne(Integer id) {
        return Mono.justOrEmpty(userDao.findOne(id));
    }

    @Override
    public Flux<User> findAll() {
        return Flux.fromIterable(userDao.findAll());
    }

    @Override
    public Mono<Void> save(Mono<User> userMono) {
        return userMono.doOnNext(user -> {
            //保存一个用户
            userDao.save(user);
        }).thenEmpty(Mono.empty());
    }

    @Override
    public Mono<Void> delete(Mono<User> userMono) {
        return userMono.doOnNext(user -> {
            //删除一个用户
            userDao.delete(user);
        }).thenEmpty(Mono.empty());
    }

    @Override
    public Mono<Void> update(Mono<User> userMono) {
        return userMono.doOnNext(user -> {
            //更新一个用户
            userDao.update(user);
        }).thenEmpty(Mono.empty());
    }
}

com.caochenlei.webfluxdemo01.handler.UserHandler

@Component
public class UserHandler {
    @Autowired
    private UserService userService;

    //查找一个用户
    public Mono<ServerResponse> findOne(ServerRequest request) {
        //获取id值
        int id = Integer.valueOf(request.pathVariable("id"));
        Mono<User> userMono = userService.findOne(id);
        return ServerResponse
                .ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userMono, User.class);
    }

    //查找全部用户
    public Mono<ServerResponse> findAll(ServerRequest request) {
        Flux<User> userFlux = userService.findAll();
        return ServerResponse
                .ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(userFlux, User.class);
    }

    //新增一个用户
    public Mono<ServerResponse> save(ServerRequest request) {
        //获取user值
        Mono<User> userMono = request.bodyToMono(User.class);
        return ServerResponse.ok().build(userService.save(userMono));
    }

    //删除一个用户
    public Mono<ServerResponse> delete(ServerRequest request) {
        //获取user值
        Mono<User> userMono = request.bodyToMono(User.class);
        return ServerResponse.ok().build(userService.delete(userMono));
    }

    //更新一个用户
    public Mono<ServerResponse> update(ServerRequest request) {
        //获取user值
        Mono<User> userMono = request.bodyToMono(User.class);
        return ServerResponse.ok().build(userService.update(userMono));
    }
}

com.caochenlei.webfluxdemo01.routers.RouterConfig

@Configuration
public class RouterConfig {
    @Bean
    public RouterFunction<ServerResponse> userRouter(UserHandler userHandler) {
        return RouterFunctions
                .route(GET("/user/findOne/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::findOne)
                .andRoute(GET("/user/findAll").and(accept(MediaType.APPLICATION_JSON)), userHandler::findAll)
                .andRoute(POST("/user/save").and(accept(MediaType.APPLICATION_JSON)), userHandler::save)
                .andRoute(POST("/user/delete").and(accept(MediaType.APPLICATION_JSON)), userHandler::delete)
                .andRoute(POST("/user/update").and(accept(MediaType.APPLICATION_JSON)), userHandler::update);
    }
}

com.caochenlei.webfluxdemo01.WebfluxDemo02Application,启动主函数

@SpringBootApplication
public class WebfluxDemo02Application {

    public static void main(String[] args) {
        SpringApplication.run(WebfluxDemo02Application.class, args);
    }

}

查询一个:http://localhost:8080/user/findOne/1

image-20201228155334194

查询全部:http://localhost:8080/user/findAll

image-20201228155400709

新增一个:http://localhost:8080/user/save

image-20201228155443061

image-20201228155527213

删除一个:http://localhost:8080/user/delete

image-20201228155627264

image-20201228155647022

修改一个:http://localhost:8080/user/update

image-20201228155801745

image-20201228155821462

5、WebFlux的WebClient编程

本章前提: 需要 webflux-demo-02 处于启动状态,我们需要手动编程实现访问路由

项目名称: webflux-demo-03

image-20201228183305114

image-20201228183401676

image-20201228183514341

image-20201228183527201

image-20201228183546313

pom.xml

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

com.caochenlei.webfluxdemo03.entity.User

public class User {
    private Integer id;
    private String name;
    private String gender;
    private Integer age;

    public User() {
    }

    public User(Integer id, Integer age) {
        this.id = id;
        this.name = name;
        this.gender = gender;
        this.age = age;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getGender() {
        return gender;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ",name='" + name + '\'' +
                ",gender='" + gender + '\'' +
                ",age=" + age +
                '}';
    }
}

com.caochenlei.webfluxdemo03.WebClientDemo

public class WebClientDemo {
    public static void main(String[] args) {
        //创建Web客户端
        WebClient webClient = WebClient.create("http://localhost:8080");
        
        //查询指定用户
        User user = webClient
                .get()
                .uri("/user/findOne/{id}", 2)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(User.class)
                .block();
        System.out.println(user);
        
        //查询所有用户
        Flux<User> userFlux = webClient
                .get()
                .uri("/user/findAll")
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToFlux(User.class);
        userFlux.buffer().doOnNext(System.out::println).blockFirst();
        
        //添加一个用户
        User user1 = new User(null, "赵六", 16);
        String returnValue1 = webClient
                .post()
                .uri("/user/save")
                .body(Mono.just(user1), User.class)
                .retrieve()
                .bodyToMono(String.class)
                .block();
        System.out.println(returnValue1);
        
        //删除一个用户
        User user2 = new User();
        user2.setId(4);
        String returnValue2 = webClient
                .post()
                .uri("/user/delete")
                .body(Mono.just(user2), User.class)
                .retrieve()
                .bodyToMono(String.class)
                .block();
        System.out.println(returnValue2);
        
        //修改一个用户
        User user3 = user;
        user3.setName("李思思");
        String returnValue3 = webClient
                .post()
                .uri("/user/update")
                .body(Mono.just(user3), User.class)
                .retrieve()
                .bodyToMono(String.class)
                .block();
        System.out.println(returnValue3);
    }
}

相关文章

Nacos 中的参数有很多,如:命名空间、分组名、服务名、保护...
Nacos 支持两种 HTTP 服务请求,一个是 REST Template,另一...
Nacos 是 Spring Cloud Alibaba 中一个重要的组成部分,它提...
Spring Cloud Alibaba 是阿里巴巴提供的一站式微服务开发解决...
在 Nacos 的路由策略中有 3 个比较重要的内容:权重、保护阈...
前两天遇到了一个问题,Nacos 中的永久服务删除不了,折腾了...