Spring WebFlux 和Reactive MongoDB来构建Reactive Rest API

位置:首页>文章>详情   分类: 教程分享 > Java教程   阅读(3525)   2023-03-28 11:29:14

1.引言

Spring 5通过引入一种名为Spring WebFlux的全新反应框架来支持响应式编程范例。

Spring WebFlux是一个自下而上的异步框架。它可以使用Servlet 3.1非阻塞IO API以及其他异步运行时环境(如netty或undertow)在Servlet容器上运行。

它可以与Spring MVC一起使用。是的,Spring MVC不会去任何地方。这是一个开发人员长期以来使用的流行的Web框架。

但是你现在可以在新的反应框架和传统的Spring MVC之间做出选择。您可以根据自己的使用情况选择使用它们中的任何一个。

 

Spring WebFlux使用一个名为Reactor的库作为响应支持。Reactor是Reactive Streams规范的一个实现。

Reactor提供两种主要的类型,称为FluxMono。这两种类型都实现了PublisherReactive Streams提供的接口。Flux用于表示0..N个元素的流,Mono用于表示0..1个元素的流。

虽然Spring使用Reactor作为其大部分内部API的核心依赖,但它也支持在应用程序级别使用RxJava。

 

2.Spring WebFlux支持的编程模型

Spring WebFlux支持两种类型的编程模型:

  1. 带有@Controller@RequestMapping和其他注释的基于注释的传统模型,您在Spring MVC中一直使用。
  2. 基于Java 8 lambda表达式的全新功能样式模型,用于路由和处理请求。

在本文中,我们将使用传统的基于注释的编程模型。我将在未来的文章中撰写功能风格模型。


3.让我们在Spring Boot中构建一个Reactive Restful服务

在本文中,我们将为迷你Twitter应用程序构建一个Restful API。该应用程序将只有一个称为的域模型Tweet。每个Tweet人都有text一个createdAt领域。

我们将使用MongoDB作为我们的数据存储以及反应型mongodb驱动程序。我们将构建用于创建,检索,更新和删除Tweet的REST API。所有的REST API都是异步的,并且会返回一个发布者。

我们还将学习如何将数据从数据库传输到客户端。

最后,我们将编写集成测试以使用Spring 5提供的新异步WebTestClient测试所有API。

4.创建项目

我们使用Spring Initializr Web应用程序来生成我们的应用程序。按照以下步骤生成项目 -

  1. 转到http://start.spring.io
  2. 选择Spring Boot版本2.x
  3. 输入工件的值作为webflux-demo
  4. 添加Reactive WebReactive MongoDB依赖项
  5. 点击生成项目生成并下载项目。

spring官网生成项目
下载项目后,将其解压缩并导入到您最喜欢的IDE中。该项目的目录结构应该如下所示 -
导入spring 官网生成的maven项目

配置MongoDB

您可以通过简单地将以下属性添加到application.properties文件来配置MongoDB -

spring.data.mongodb.uri=mongodb://localhost:27017/webflux_demo

Spring Boot将在启动时读取此配置并自动配置数据源。

创建领域模型

让我们创建我们的领域模型 - Tweet。创建一个名为modelinside com.example.webfluxdemopackage 的新包,然后创建一个名为Tweet.java以下内容的文件-

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Date;

@Document(collection = "tweets")
public class Tweet {
    @Id
    private String id;

    @NotBlank
    @Size(max = 140)
    private String text;

    @NotNull
    private Date createdAt = new Date();

    public Tweet() {

    }

    public Tweet(String text) {
        this.id = id;
        this.text = text;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public Date getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(Date createdAt) {
        this.createdAt = createdAt;
    }
}

够简单!Tweet模型包含一个text和一个createdAt字段。该text字段用注释@NotBlank@Size注释确保它不是空白并且最多有140个字符。

 

5.创建存储库

接下来,我们将创建将用于访问MongoDB数据库的数据访问层。创建一个名为repositoryinside 的新包com.example.webfluxdemo,然后TweetRepository.java使用以下内容创建一个新文件-

import com.example.webfluxdemo.model.Tweet;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> {

}

TweetRepository接口扩展ReactiveMongoRepository了文档中的各种CRUD方法。

Spring Boot自动插入在SimpleReactiveMongoRepository运行时调用的此接口的实现。

因此,您无需编写任何代码就可以轻松获取文档上的所有CRUD方法。以下是一些可用的方法SimpleReactiveMongoRepository-

reactor.core.publisher.Flux<T>  findAll(); 

reactor.core.publisher.Mono<T>  findById(ID id); 

<S extends T> reactor.core.publisher.Mono<S>  save(S entity); 

reactor.core.publisher.Mono<Void>   delete(T entity);

请注意,所有方法都是异步的,并以a FluxMono类型的形式返回发布者。

创建控制器端点

最后,让我们编写将暴露给客户端的API。创建一个名为controllerinside 的新包com.example.webfluxdemo,然后TweetController.java使用以下内容创建一个新文件-
 

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.Valid;

@RestController
public class TweetController {

    @Autowired
    private TweetRepository tweetRepository;

    @GetMapping("/tweets")
    public Flux<Tweet> getAllTweets() {
        return tweetRepository.findAll();
    }

    @PostMapping("/tweets")
    public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet) {
        return tweetRepository.save(tweet);
    }

    @GetMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) {
        return tweetRepository.findById(tweetId)
                .map(savedTweet -> ResponseEntity.ok(savedTweet))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PutMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId,
                                                   @Valid @RequestBody Tweet tweet) {
        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet -> {
                    existingTweet.setText(tweet.getText());
                    return tweetRepository.save(existingTweet);
                })
                .map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    @DeleteMapping("/tweets/{id}")
    public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) {

        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet ->
                        tweetRepository.delete(existingTweet)
                            .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
                )
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    // Tweets are Sent to the client as Server Sent Events
    @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Tweet> streamAllTweets() {
        return tweetRepository.findAll();
    }
}

所有的控制器端点都以Flux或Mono的形式返回一个Publisher。我们将内容类型设置为的最后一个端点非常有趣text/event-stream。它以服务器发送事件的形式将推文发送到像这样的浏览器 -

data: {"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
data: {"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}

现在我们正在讨论事件流,您可能会问以下端点是否也返回一个Stream?

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    return tweetRepository.findAll();
}

答案是肯定的。Flux<Tweet>代表推文流。但是,默认情况下,它将生成一个JSON数组,因为如果将单个JSON对象流发送给浏览器,那么它将不会是一个有效的JSON文档。除了使用Server-Sent-Events或WebSocket之外,浏览器客户端无法使用流。

但是,非浏览器客户端可以通过设置Accept标头来请求JSON流application/stream+json,并且响应将是类似于Server-Sent-Events的JSON流,但不需要额外的格式:

{"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
{"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}

使用WebTestClient进行集成测试

Spring 5还提供了一个异步和被动的http客户端,WebClient用于处理异步和流式API。这是一个被动的选择RestTemplate

此外,你还可以得到一个WebTestClient写作集成测试。测试客户端可以运行在实时服务器上,也可以用于模拟请求和响应。

我们将使用WebTestClient为我们的REST API编写集成测试。打开WebfluxDemoApplicationTests.java文件并将以下测试添加到它 -

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

import java.util.Collections;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class WebfluxDemoApplicationTests {

	@Autowired
	private WebTestClient webTestClient;

	@Autowired
    TweetRepository tweetRepository;

	@Test
	public void testCreateTweet() {
		Tweet tweet = new Tweet("This is a Test Tweet");

		webTestClient.post().uri("/tweets")
				.contentType(MediaType.APPLICATION_JSON_UTF8)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .body(Mono.just(tweet), Tweet.class)
				.exchange()
				.expectStatus().isOk()
				.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
				.expectBody()
                .jsonPath("$.id").isNotEmpty()
                .jsonPath("$.text").isEqualTo("This is a Test Tweet");
	}

	@Test
    public void testGetAllTweets() {
	    webTestClient.get().uri("/tweets")
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBodyList(Tweet.class);
    }

    @Test
    public void testGetSingleTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block();

        webTestClient.get()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .exchange()
                .expectStatus().isOk()
                .expectBody()
                .consumeWith(response ->
                        Assertions.assertThat(response.getResponseBody()).isNotNull());
    }

    @Test
    public void testUpdateTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block();

        Tweet newTweetData = new Tweet("Updated Tweet");

        webTestClient.put()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .body(Mono.just(newTweetData), Tweet.class)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBody()
                .jsonPath("$.text").isEqualTo("Updated Tweet");
    }

    @Test
    public void testDeleteTweet() {
	    Tweet tweet = tweetRepository.save(new Tweet("To be deleted")).block();

	    webTestClient.delete()
                .uri("/tweets/{id}", Collections.singletonMap("id",  tweet.getId()))
                .exchange()
                .expectStatus().isOk();
    }
}

在上面的例子中,我为所有的CRUD API编写了测试。您可以通过转到项目的根目录并键入来运行测试mvn test
 

6.总结

在本文中,我们学习了使用Spring进行反应式编程的基础知识,并使用Spring WebFlux框架提供的反应式支持构建了一个简单的Restful服务。我们还使用WebTestClient测试了所有Rest API。

提示:项目源码下载 demo-springboot2.0-webflux-mongodb.zip

地址:https://www.leftso.com/article/405.html

相关阅读

spring boot webflux client实战,webclient是spring webflux的一个小组件。对于Java的http通讯来说,webclient是非常简单易用的。
Spring WebFlux,spring框架5.0将会新增的web增强框架,这里主要讲述什么是Spring WebFlux以及Spring WebFlux的新功能,Spring WebFlux...
Spring WebFlux入门程序hello word。本文主要在于讲解如何创建和运行spring webflux入门程序hello word。其实不难发现和spring mvc相比代码层基本...
1.引言Spring 5通过引入一种名为Spring WebFlux的全新反应框架来支持响应式编程范例
引言Spring Boot 2.0最近去了GA,所以我决定写我关于Spring的第一篇文章很长一段时间
Spring WebFlux 项目实战 在Spring WebFlux中创建多个RouterFunctions,在这篇文章中,我们将着眼于在Spring WebFlux中将多个路由器功能定义到不...
Spring Boot 2.0 有哪些新特性_Spring Boot 2.0新功能,在本文中,我们将探讨为Spring Boot 2.0计划的一些更改和功能。我们还会描述这些变化如何帮助我们提高...
引言    通过之前spring boot mybatis 整合的讲解: spring boot mybaties整合  (spring boot mybaties 整合 基于Java注解方式写...
Spring Boot 2.0,Spring框架的Spring Boot 中的Spring Boot Actuator变化讲解。并且了解如何在Spring Boot 2.0中使用Actuator...
spring boot 1.5整合redis实现spring的缓存框架,spring boot,redis