logo-cover-Spring WebFlux 和Reactive MongoDB来构建Reactive Rest API

1.引言

Spring 5通过引入一种名为Spring WebFlux的全新反应框架来支持响应式编程范例。

Spring WebFlux是一个自下而上的异步框架。它可以使用Servlet 3.1非阻塞IO API以及其他异步运行时环境(如netty或undertow)在Servlet容器上运行。

它可以与Spring MVC一起使用。是的,Spring MVC不会去任何地方。这是一个开发人员长期以来使用的流行的Web框架。

但是你现在可以在新的反应框架和传统的Spring MVC之间做出选择。您可以根据自己的使用情况选择使用它们中的任何一个。

 

Spring WebFlux使用一个名为Reactor的库作为响应支持。Reactor是Reactive Streams规范的一个实现。

Reactor提供两种主要的类型,称为FluxMono。这两种类型都实现了PublisherReactive Streams提供的接口。Flux用于表示0..N个元素的流,Mono用于表示0..1个元素的流。

虽然Spring使用Reactor作为其大部分内部API的核心依赖,但它也支持在应用程序级别使用RxJava。

 

2.Spring WebFlux支持的编程模型

Spring WebFlux支持两种类型的编程模型:

  1. 带有@Controller@RequestMapping和其他注释的基于注释的传统模型,您在Spring MVC中一直使用。
  2. 基于Java 8 lambda表达式的全新功能样式模型,用于路由和处理请求。

在本文中,我们将使用传统的基于注释的编程模型。我将在未来的文章中撰写功能风格模型。


3.让我们在Spring Boot中构建一个Reactive Restful服务

在本文中,我们将为迷你Twitter应用程序构建一个Restful API。该应用程序将只有一个称为的域模型Tweet。每个Tweet人都有text一个createdAt领域。

我们将使用MongoDB作为我们的数据存储以及反应型mongodb驱动程序。我们将构建用于创建,检索,更新和删除Tweet的REST API。所有的REST API都是异步的,并且会返回一个发布者。

我们还将学习如何将数据从数据库传输到客户端。

最后,我们将编写集成测试以使用Spring 5提供的新异步WebTestClient测试所有API。

4.创建项目

我们使用Spring Initializr Web应用程序来生成我们的应用程序。按照以下步骤生成项目 -

  1. 转到http://start.spring.io
  2. 选择Spring Boot版本2.x
  3. 输入工件的值作为webflux-demo
  4. 添加Reactive WebReactive MongoDB依赖项
  5. 点击生成项目生成并下载项目。

spring官网生成项目
下载项目后,将其解压缩并导入到您最喜欢的IDE中。该项目的目录结构应该如下所示 -
导入spring 官网生成的maven项目

配置MongoDB

您可以通过简单地将以下属性添加到application.properties文件来配置MongoDB -

spring.data.mongodb.uri=mongodb://localhost:27017/webflux_demo

Spring Boot将在启动时读取此配置并自动配置数据源。

创建领域模型

让我们创建我们的领域模型 - Tweet。创建一个名为modelinside com.example.webfluxdemopackage 的新包,然后创建一个名为Tweet.java以下内容的文件-

import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Size;
import java.util.Date;

@Document(collection = "tweets")
public class Tweet {
    @Id
    private String id;

    @NotBlank
    @Size(max = 140)
    private String text;

    @NotNull
    private Date createdAt = new Date();

    public Tweet() {

    }

    public Tweet(String text) {
        this.id = id;
        this.text = text;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getText() {
        return text;
    }

    public void setText(String text) {
        this.text = text;
    }

    public Date getCreatedAt() {
        return createdAt;
    }

    public void setCreatedAt(Date createdAt) {
        this.createdAt = createdAt;
    }
}

够简单!Tweet模型包含一个text和一个createdAt字段。该text字段用注释@NotBlank@Size注释确保它不是空白并且最多有140个字符。

 

5.创建存储库

接下来,我们将创建将用于访问MongoDB数据库的数据访问层。创建一个名为repositoryinside 的新包com.example.webfluxdemo,然后TweetRepository.java使用以下内容创建一个新文件-

import com.example.webfluxdemo.model.Tweet;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface TweetRepository extends ReactiveMongoRepository<Tweet, String> {

}

TweetRepository接口扩展ReactiveMongoRepository了文档中的各种CRUD方法。

Spring Boot自动插入在SimpleReactiveMongoRepository运行时调用的此接口的实现。

因此,您无需编写任何代码就可以轻松获取文档上的所有CRUD方法。以下是一些可用的方法SimpleReactiveMongoRepository-

reactor.core.publisher.Flux<T>  findAll(); 

reactor.core.publisher.Mono<T>  findById(ID id); 

<S extends T> reactor.core.publisher.Mono<S>  save(S entity); 

reactor.core.publisher.Mono<Void>   delete(T entity);

请注意,所有方法都是异步的,并以a FluxMono类型的形式返回发布者。

创建控制器端点

最后,让我们编写将暴露给客户端的API。创建一个名为controllerinside 的新包com.example.webfluxdemo,然后TweetController.java使用以下内容创建一个新文件-
 

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import javax.validation.Valid;

@RestController
public class TweetController {

    @Autowired
    private TweetRepository tweetRepository;

    @GetMapping("/tweets")
    public Flux<Tweet> getAllTweets() {
        return tweetRepository.findAll();
    }

    @PostMapping("/tweets")
    public Mono<Tweet> createTweets(@Valid @RequestBody Tweet tweet) {
        return tweetRepository.save(tweet);
    }

    @GetMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> getTweetById(@PathVariable(value = "id") String tweetId) {
        return tweetRepository.findById(tweetId)
                .map(savedTweet -> ResponseEntity.ok(savedTweet))
                .defaultIfEmpty(ResponseEntity.notFound().build());
    }

    @PutMapping("/tweets/{id}")
    public Mono<ResponseEntity<Tweet>> updateTweet(@PathVariable(value = "id") String tweetId,
                                                   @Valid @RequestBody Tweet tweet) {
        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet -> {
                    existingTweet.setText(tweet.getText());
                    return tweetRepository.save(existingTweet);
                })
                .map(updatedTweet -> new ResponseEntity<>(updatedTweet, HttpStatus.OK))
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    @DeleteMapping("/tweets/{id}")
    public Mono<ResponseEntity<Void>> deleteTweet(@PathVariable(value = "id") String tweetId) {

        return tweetRepository.findById(tweetId)
                .flatMap(existingTweet ->
                        tweetRepository.delete(existingTweet)
                            .then(Mono.just(new ResponseEntity<Void>(HttpStatus.OK)))
                )
                .defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    }

    // Tweets are Sent to the client as Server Sent Events
    @GetMapping(value = "/stream/tweets", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<Tweet> streamAllTweets() {
        return tweetRepository.findAll();
    }
}

所有的控制器端点都以Flux或Mono的形式返回一个Publisher。我们将内容类型设置为的最后一个端点非常有趣text/event-stream。它以服务器发送事件的形式将推文发送到像这样的浏览器 -

data: {"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
data: {"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}

现在我们正在讨论事件流,您可能会问以下端点是否也返回一个Stream?

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    return tweetRepository.findAll();
}

答案是肯定的。Flux<Tweet>代表推文流。但是,默认情况下,它将生成一个JSON数组,因为如果将单个JSON对象流发送给浏览器,那么它将不会是一个有效的JSON文档。除了使用Server-Sent-Events或WebSocket之外,浏览器客户端无法使用流。

但是,非浏览器客户端可以通过设置Accept标头来请求JSON流application/stream+json,并且响应将是类似于Server-Sent-Events的JSON流,但不需要额外的格式:

{"id":"59ba5389d2b2a85ed4ebdafa","text":"tweet1","createdAt":1505383305602}
{"id":"59ba5587d2b2a85f93b8ece7","text":"tweet2","createdAt":1505383814847}

使用WebTestClient进行集成测试

Spring 5还提供了一个异步和被动的http客户端,WebClient用于处理异步和流式API。这是一个被动的选择RestTemplate

此外,你还可以得到一个WebTestClient写作集成测试。测试客户端可以运行在实时服务器上,也可以用于模拟请求和响应。

我们将使用WebTestClient为我们的REST API编写集成测试。打开WebfluxDemoApplicationTests.java文件并将以下测试添加到它 -

import com.example.webfluxdemo.model.Tweet;
import com.example.webfluxdemo.repository.TweetRepository;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
import reactor.core.publisher.Mono;

import java.util.Collections;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class WebfluxDemoApplicationTests {

	@Autowired
	private WebTestClient webTestClient;

	@Autowired
    TweetRepository tweetRepository;

	@Test
	public void testCreateTweet() {
		Tweet tweet = new Tweet("This is a Test Tweet");

		webTestClient.post().uri("/tweets")
				.contentType(MediaType.APPLICATION_JSON_UTF8)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .body(Mono.just(tweet), Tweet.class)
				.exchange()
				.expectStatus().isOk()
				.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
				.expectBody()
                .jsonPath("$.id").isNotEmpty()
                .jsonPath("$.text").isEqualTo("This is a Test Tweet");
	}

	@Test
    public void testGetAllTweets() {
	    webTestClient.get().uri("/tweets")
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBodyList(Tweet.class);
    }

    @Test
    public void testGetSingleTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("Hello, World!")).block();

        webTestClient.get()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .exchange()
                .expectStatus().isOk()
                .expectBody()
                .consumeWith(response ->
                        Assertions.assertThat(response.getResponseBody()).isNotNull());
    }

    @Test
    public void testUpdateTweet() {
        Tweet tweet = tweetRepository.save(new Tweet("Initial Tweet")).block();

        Tweet newTweetData = new Tweet("Updated Tweet");

        webTestClient.put()
                .uri("/tweets/{id}", Collections.singletonMap("id", tweet.getId()))
                .contentType(MediaType.APPLICATION_JSON_UTF8)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .body(Mono.just(newTweetData), Tweet.class)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBody()
                .jsonPath("$.text").isEqualTo("Updated Tweet");
    }

    @Test
    public void testDeleteTweet() {
	    Tweet tweet = tweetRepository.save(new Tweet("To be deleted")).block();

	    webTestClient.delete()
                .uri("/tweets/{id}", Collections.singletonMap("id",  tweet.getId()))
                .exchange()
                .expectStatus().isOk();
    }
}

在上面的例子中,我为所有的CRUD API编写了测试。您可以通过转到项目的根目录并键入来运行测试mvn test
 

6.总结

在本文中,我们学习了使用Spring进行反应式编程的基础知识,并使用Spring WebFlux框架提供的反应式支持构建了一个简单的Restful服务。我们还使用WebTestClient测试了所有Rest API。

提示:项目源码下载