跳到主要内容

WebFlux的探索与实践 - r2dbc的分页查询

· 阅读需 23 分钟
法欧特斯卡雷特
可爱小猫咪一枚呀

自从上次立下这系列的FLAG之后就再也不想碰了。今天难得早起出门面试,回家之后突发奇想打算再写点儿什么敷衍一下,于是便有了这篇文章。

前言

虽然响应式API更加适合流式列表的查询,但是分页这东西可是很常见的。

也没什么前言可说,反正就是一篇介绍如何在 Spring WebFlux 中使用 Spring Data R2DBC 进行分页查询的文章。如果喜欢,还望点个赞喵~

文章会从创建项目开始,你要是没啥兴趣,就往下划划。

准备

总而言之,先创建个项目,并且要加上 WebFluxR2DBC 和一个支持 R2DBC 的数据库驱动。 至于驱动的选择,你可以去 R2DBC官方网站的这里 看看。

你可以去 start.spring.io 去整个项目下来,我选择使用 gradle 构建项目, 这里是我的项目配置:

gradle.build.kts

plugins {
java
id("org.springframework.boot") version "3.0.3"
id("io.spring.dependency-management") version "1.1.0"
}

group = "com.example"
version = "0.0.1-SNAPSHOT"
java.sourceCompatibility = JavaVersion.VERSION_17

configurations {
compileOnly {
extendsFrom(configurations.annotationProcessor.get())
}
}

repositories {
mavenCentral()
}

dependencies {
compileOnly("org.projectlombok:lombok")
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.springframework.boot:spring-boot-starter-webflux")
runtimeOnly("com.h2database:h2")
runtimeOnly("io.r2dbc:r2dbc-h2")
annotationProcessor("org.projectlombok:lombok")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.projectreactor:reactor-test")
testRuntimeOnly("com.h2database:h2")
testRuntimeOnly("io.r2dbc:r2dbc-h2")
}

tasks.withType<Test> {
useJUnitPlatform()
}

这里我选择使用 H2 数据库作为演示用的数据库,其他内容一律默认。

实体类与Repository

按照惯例,首先建个表用作示例:

schema.sql

DROP TABLE IF EXISTS foo;

CREATE TABLE IF NOT EXISTS foo(
id int auto_increment not null primary key comment 'id',
name varchar(20) not null default '' comment '名称',
size int not null default 0 comment '大小'
)

data.sql

INSERT INTO foo(name, size) VALUES ('name1', 1);
INSERT INTO foo(name, size) VALUES ('name2', 2);
INSERT INTO foo(name, size) VALUES ('name3', 3);
INSERT INTO foo(name, size) VALUES ('name4', 4);
INSERT INTO foo(name, size) VALUES ('name5', 114);
INSERT INTO foo(name, size) VALUES ('name6', 514);
INSERT INTO foo(name, size) VALUES ('name7', 19);
INSERT INTO foo(name, size) VALUES ('name8', 10);
INSERT INTO foo(name, size) VALUES ('name9', 11);
INSERT INTO foo(name, size) VALUES ('name10', 12);
INSERT INTO foo(name, size) VALUES ('name11', 13);
INSERT INTO foo(name, size) VALUES ('name12', 14);
INSERT INTO foo(name, size) VALUES ('name13', 15);
INSERT INTO foo(name, size) VALUES ('name14', 16);
INSERT INTO foo(name, size) VALUES ('name15', 17);

foo 是什么意思呢?我也不清楚,但是反正我们这次要去分页查询这个 foo 的表。

接下来,整个对应的实体类吧:

/**
* 数据库 foo 对应实体类
*
* @param id 主键
* @param name 名称
* @param size 大小
*/
public record Foo(@Id Integer id, String name, Integer size) {
}

然后给这个实体类提供一个对应的 Repository 实现。或者更准确的说,是 ReactiveRepository 的实现:

/**
* {@link Foo} 的 Repository 实现
* @author ForteScarlet
*/
@Repository
public interface FooRepository extends R2dbcRepository<Foo, Integer> {
}

顺带一提,R2dbcRepository<T, ID> 实现了下述三个基础接口:

  • ReactiveCrudRepository<T, ID>
  • ReactiveSortingRepository<T, ID>
  • ReactiveQueryByExampleExecutor<T>

那么这样就完成了吗?并没有。通常情况下,一个最简化的、整体性的分页数据应该包括 数据总量分页数据列表 这两个信息,那么让我们首先来提供一个 Paged 类型:

/**
* 分页数据体
*
* @param total 数据总量
* @param data 数据列表
*/
public record Paged<T>(long total, List<T> data) {
}

接下来,因为我们之前的 FooRepository 中已经包含了查询数据总量的 count,所以接下来我们只需要一个查询分页列表数据的方法就好了。十分幸运,R2DBC Repositories 的 Query Methods 支持我们直接这么写:

@Repository
public interface FooRepository extends R2dbcRepository<Foo, Integer> {

/**
* 分页查询 foo
* @param pageable 分页信息
* @return paged foo flux
*/
Flux<Foo> findAllBy(Pageable pageable);

}

直接在接口中增加一个如上所示的 findAllBy 并提供一个分页参数即可。当然,因为我们在用 r2dbc,所以返回值应该是响应式的 Flux 类型。

这里的 Pageable 是Spring所提供的类型,所以可以直接拿来用。

接下来让我们来试试效果。先查询总数,再查询列表,然后将他们合并为一个 Paged:

@SpringBootTest
class WebfluxR2dbcPageableDemoApplicationTests {

@Test
void pagedTest(@Autowired FooRepository repository) {
// 第一页的三条数据
var paged = PageRequest.of(0, 2);
repository.count().flatMap(total -> repository
.findAllBy(paged)
.collectList()
.map(list -> new Paged<>(total, list)))
.as(StepVerifier::create)
.consumeNextWith(System.out::println) // 控制台输出
.verifyComplete();
}
}

输出:

Paged[total=15, data=[Foo[id=1, name=name1, size=1], Foo[id=2, name=name2, size=2]]]

在这个单元测试中,我们首先准备了一个代表 第一页的三条数据 的分页信息。其中,PageRequest 是Spring提供的 Pageable 的一个基本的实现类,所以直接借来用了。

我们首先通过 repository.count 查询数据库数据总数 Mono<Integer>, 再通过 flatMap 进行下一步,也就是查询列表。

查询列表使用了我们之前的 findAllBy(Pageable),然后使用 collectList 将其收集为一个 Mono<List<Foo>>

之后便是将总数和列表合并为了 Paged,然后交给下游。

还是蛮简单的,不是吗?

简单条件查询

但是仅此而已吗?有些时候我们希望分页查询的结果是存在条件的,比如我们想要根据 name包含查询来查询结果。 那么接下来让我们来对 FooRepository 稍作调整,添加几个新函数:

/**
* 分页查询包含 name 的 foo
* @param name Foo的name,包含查询
* @param pageable 分页信息
* @return paged foo flux
*/
Flux<Foo> findAllByNameContains(String name, Pageable pageable);

/**
* 查询包含 name 的 foo 总数
* @param name Foo的name,包含查询
* @return count
*/
Mono<Long> countByNameContains(String name);

可以看到, 新的两个函数与之前的不同的是,它们都是以 ByNameContains 结尾,并且都多了一个 String name 参数。

这里的 ByNameContainsSpring Repositories Query Methods 的关键字(keyword)之一,Spring会根据你的关键字自行处理SQL。更多的关键字你可以去它们的文档 阅读,IDEA的智能提示也会帮你一把:

r2dbc-截图

这些就是另外的话题了.回到正题,让我们再来试试这加了条件的分页查询是如何的:

@Test
void selectByNameTest(@Autowired FooRepository repository) {
// 第一页的三条数据
var paged = PageRequest.of(0, 2);
// 查询包含 'name1' 的内容
repository.countByNameContains("name1").flatMap(total -> repository
.findAllByNameContains("name1", paged)
.collectList()
.map(list -> new Paged<>(total, list)))
.as(StepVerifier::create)
.consumeNextWith(System.out::println) // 控制台输出
.verifyComplete();
}

与之前的测试用例没什么太大的区别,只不过是更换了一下方法名,然后添加了一个新的参数。

控制台输出:

2023-03-01T12:38:49.072+08:00 DEBUG 21376 --- [    Test worker] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT COUNT(FOO.ID) FROM FOO WHERE FOO.NAME LIKE $1]
2023-03-01T12:38:49.089+08:00 DEBUG 21376 --- [ Test worker] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT FOO.ID, FOO.NAME, FOO.SIZE FROM FOO WHERE FOO.NAME LIKE $1 LIMIT 2]
Paged[total=7, data=[Foo[id=1, name=name1, size=1], Foo[id=10, name=name10, size=12]]]

从 DEBUG 日志可以看到,Spring生成的SQL中为我们添加了 WHERE FOO.NAME LIKE $1 的查询条件,这也就说明我们的方法是可行的。

实体Example查询?

如果你有更高的追求,你希望直接通过实体类作为参数载体查询,并且当 name 不为null的时候才进行查询,这时候如果直接用 Query Methods 可能就不太好用了。

还记得之前提到的吗? FooRepository 继承了 R2dbcRepository,而它又继承了 ReactiveQueryByExampleExecutor。 此时,也许你会觉得:我们可以使用 Example 来查询结果嘛!

var foo = new Foo(null, "name", 1);
var example = Example.of(foo,
// 所有条件都需要,简单来说大概就是条件通过 AND 连接
ExampleMatcher.matchingAll()
// 如果属性为null,忽略它
.withIgnoreNullValues()
// 对于 name 属性,使用 contains 策略,简单来说就是模糊查询
.withMatcher("name", ExampleMatcher.GenericPropertyMatchers.contains());
);

观察上述代码,我们通过一个 foo 构建了一个 Example 实例。这个 Example 代表:

  • 如果 foo 中某个属性为null,则SQL条件中不出现
  • foo.name 使用包含策略进行查询
  • 各条件使用 AND 连接

OK, 准备好 Example 之后,要怎么使用呢? ReactiveQueryByExampleExecutor 中存在各种通过 Example 查询的函数,因此 count 可以直接使用:

Mono<Long> count = repository.count(example);
// ...

列表查询也可以直接使用:

Flux<Foo> all = repository.findAll(example);
// ...

等一下,分页哪里去了?

是的, ReactiveQueryByExampleExecutor 并没有提供使用 Example 配合 Pageable 的函数。那么是不是可以再次借助 Query Methods 来实现呢?

/**
* 分页查询包含 name 的 foo ...?
*/
Flux<Foo> findAllByExample(Example<Foo> example, Pageable pageable);

但是很遗憾,这种方式并行不通,你除了会得到一个异常以外,恐怕不会有什么其他结果。

实体Example查询

那么我们应该怎么有效利用 Example 呢?这时候就不能再仰仗 Repository 了,这次我们要使用 R2dbcEntityTemplate

首先来解决分页列表查询的问题,这次先给结果:

@Test
void selectByExampleTest(
@Autowired R2dbcEntityTemplate template
) {
var foo = new Foo(null, "name1", 1);
var example = Example.of(foo,
// 所有条件都需要,简单来说大概就是条件通过 AND 连接
ExampleMatcher.matchingAll()
// 如果属性为null,忽略它
.withIgnoreNullValues()
// 对于 name 属性,使用 contains 策略,简单来说就是模糊查询
.withMatcher("name", ExampleMatcher.GenericPropertyMatchers.contains())
);

RelationalExampleMapper relationalExampleMapper = new RelationalExampleMapper(template.getConverter().getMappingContext());
var page = PageRequest.of(0, 2);
Query query = relationalExampleMapper.getMappedExample(example).with(page);
template.select(query, Foo.class)
.collectList()
.as(StepVerifier::create)
.consumeNextWith(System.out::println) // 控制台输出
.verifyComplete();
}

可以注意到,这次我们直接注入了一个 R2dbcEntityTemplate,并在过程中构建了一个 RelationalExampleMapper:

RelationalExampleMapper relationalExampleMapper = new RelationalExampleMapper(template.getConverter().getMappingContext());

它是干什么的呢?从名字上大概也能猜出来,它的作用是将一个 Example 转化为 Query 类型。这实际上也是 ReactiveQueryByExampleExecutor 提供的那些支持 Example 函数中偷偷做的事情。

RelationalExampleMapper relationalExampleMapper = new RelationalExampleMapper(template.getConverter().getMappingContext());
var page = PageRequest.of(0, 2);
Query query = relationalExampleMapper.getMappedExample(example).with(page);

结合之后几句代码一起看,RelationalExampleMapperExample 转化为了 Query,并通过 .with(page) 为其增加了分页信息,这样我们就得到了一个既有条件又能分页的 Query 了。

之后便可以借助 R2dbcEntityTemplate 进行查询了:

template.select(query, Foo.class) // 使用 query 查询 Foo 对应的表
.collectList()
.as(StepVerifier::create)
.consumeNextWith(System.out::println) // 控制台输出
.verifyComplete();

输出:

2023-03-01T13:13:23.002+08:00 DEBUG 11604 --- [    Test worker] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT FOO.* FROM FOO WHERE (FOO.NAME LIKE $1) AND (FOO.SIZE = $2) LIMIT 2]
[Foo[id=1, name=name1, size=1]]

可以看到实际生成的SQL中的确出现了我们所希望的出现的条件,以及结尾的分页。

那么二者结合一下,便可以得到使用 Example 进行分页查询的方案:

@Test
void selectByExampleTest(@Autowired R2dbcEntityTemplate template) {
// 查询条件
var foo = new Foo(null, "name1", 1);
var example = Example.of(foo,
// 所有条件都需要,简单来说大概就是条件通过 AND 连接
ExampleMatcher.matchingAll()
// 如果属性为null,忽略它
.withIgnoreNullValues()
// 对于 name 属性,使用 contains 策略,简单来说就是模糊查询
.withMatcher("name", ExampleMatcher.GenericPropertyMatchers.contains())
);
var relationalExampleMapper = new RelationalExampleMapper(template.getConverter().getMappingContext());
var page = PageRequest.of(0, 2);
var query = relationalExampleMapper.getMappedExample(example).with(page);
// 先根据条件查询总数
template.count(query, Foo.class)
// 再根据条件查询分页列表
.flatMap(total -> template.select(query, Foo.class)
// 将总数与列表合并为 Paged
.collectList().map(list -> new Paged<>(total, list)))
.as(StepVerifier::create)
.consumeNextWith(System.out::println) // 控制台输出
.verifyComplete();
}

输出:

2023-03-01T13:23:44.212+08:00 DEBUG 25652 --- [    Test worker] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT COUNT(FOO.ID) FROM FOO WHERE (FOO.NAME LIKE $1) AND (FOO.SIZE = $2)]
2023-03-01T13:23:44.230+08:00 DEBUG 25652 --- [ Test worker] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT FOO.* FROM FOO WHERE (FOO.NAME LIKE $1) AND (FOO.SIZE = $2) LIMIT 2]
Paged[total=1, data=[Foo[id=1, name=name1, size=1]]]

嗯,很不错的一次分页呢。

顺带一提,RelationalExampleMapper 在单数据源的情况下理论上只需要一个实例即可,此处每次都 new 是仅作演示。

复杂的条件查询

但是你又开始不满足了,这时候你希望查询条件中,size 要大于匹配,也就是希望 size 生成的条件是 WHERE FOO.SIZE > $1 ,而不是直接等于。

怎么办?直接用 Example 吗?但可惜的是,Example 似乎并不支持更多的条件,它的大多数条件都是为 String 类型的字段准备的。

这时候,我们可能就需要手动构造 Query 了。

先给大家看看结果:

@Test
void selectByQueryTest(@Autowired R2dbcEntityTemplate template) {
// 查询条件
var foo = new Foo(null, "name", 4);
var criteria = Criteria.empty();
if (foo.name() != null) {
// 如果name不为null,则模糊查询name
criteria = criteria.and("name").like("%" + foo.name() + "%");
}
if (foo.size() != null) {
// 如果size不为null,则 size > $size
criteria = criteria.and("size").greaterThan(foo.size());
}
// 构建Query
var page = PageRequest.of(0, 2);
var query = Query.query(criteria).with(page);
// 先根据条件查询总数
template.count(query, Foo.class)
// 再根据条件查询分页列表
.flatMap(total -> template.select(query, Foo.class)
// 将总数与列表合并为 Paged
.collectList().map(list -> new Paged<>(total, list)))
.as(StepVerifier::create)
.consumeNextWith(System.out::println) // 控制台输出
.verifyComplete();
}

你可能注意到了,在一开始我们构建了一个 Criteria.empty(),你可以将它理解为一个"开始",它代表为一个没有条件的条件。

然后,根据我们的需要一步步的判断,并添加我们所需的条件。Criteria 能提供的查询条件要比 Example 丰富的多,当然了,它们的职能可能并不完全一样。

需要注意的是,Criteria 是一个不可变类,因此每次条件的增加后都是一个新的实例,你需要重新接收/保存这个结果。可不要忘了喔!

当你准备好一个最终的 Criteria 之后,你就可以通过它来构建一个 Query 了:

// 构建Query
var page = PageRequest.of(0, 2);
var query = Query.query(criteria).with(page);

这里我也顺手把分页信息一起放进去了。后面的查询就跟之前完全一样了,我们直接来看看控制台的结果吧:

2023-03-01T13:36:03.989+08:00 DEBUG 1752 --- [    Test worker] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT COUNT(FOO.ID) FROM FOO WHERE FOO.NAME LIKE $1 AND FOO.SIZE >= $2]
2023-03-01T13:36:04.006+08:00 DEBUG 1752 --- [ Test worker] o.s.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT FOO.* FROM FOO WHERE FOO.NAME LIKE $1 AND FOO.SIZE > $2 LIMIT 2]
Paged[total=12, data=[Foo[id=4, name=name4, size=4], Foo[id=5, name=name5, size=114]]]

和我们预期的基本一样呢,真棒!

矮一点儿?

一开始便提到,我们在查询分页的时候,基本上都是需要进行两个查询的,一次数量,一次列表。通常情况下,我们其实不是非常需要关心这两个数据是否是真正准确的,它们之间存在一点点小小的误差是完全可以接受的,所以我们通常不会在一个事务中使用它们。

还记得我们上面那些示例的代码吗?它们是怎么合并 totallist 的呢?是通过 flatMap。但是你会发现,通过 flatMap 合并的时候,它们看上去是那么的"丑陋":flatMap(total -> xxx.map(list -> new Paged(total, list)) 这一层 flatMap 一层 map 的,看的人头晕眼花。

有什么什么更..."矮"一些的写法呢?

或许你可以试试这个:

public Mono<Paged<Foo>> queryPaged(Query query) {
return Mono.zip(
template.count(query, Foo.class),
template.select(query, Foo.class).collectList(),
Paged::new
);
}

我们通过 Mono.zip 在前两个参数中直接将 countselect 两个查询压缩,并通过第三个参数 Paged::new 将它们的结果合并为 Paged 类型后返回。

怎么样,你会不会觉得这些写,让这代码变更明了、更"矮"了呢? 但是 先不要着急! 这里面是有些 隐患的

让我们加上事务注解,变成这样:

@Transactional(readOnly = true)
public Mono<Paged<Foo>> queryPaged(Query query) {
return Mono.zip(
template.count(query, Foo.class),
template.select(query, Foo.class).collectList(),
Paged::new
);
}

此时如果再次运行,你就会惊奇的发现...咦?什么事都没有? 你可能会觉得刚刚我只是虚张声势了一下,因为这并没有发生什么了不得的事情。

让我们来回忆一下。在开头,我提到我选择使用 H2 数据库作为演示。而如果我们将数据库驱动改为 MySQL(这里我选择使用 jasync-sql 所提供的 r2dbc 支持),你也许就会发现,加上事务注解后这段代码不好使了。

你可能注意到了我的措辞:也许。因为这种情况可能与不同的数据库驱动的实现有关,有些驱动会解决这个问题,有些则不会,而又有些可能会在未来解决它。

那么到底是什么问题呢?你可以看看 jasync-sql#383 的讨论,这是我在使用的时候发现的一个小问题。

简单来说,当你通过 Mono.zip 或者 Mono.zipWith 连接两个查询的时候,它们之间可能并没有什么先后顺序,也就是说它们可能是一起开始的。这并不代表传统并发中的"并行",但是这会导致两个查询的运行是"交叉"的。

如果它们不在同一个事务中,那么这两个查询可能会各占一个连接,此时问题倒是不大。而当两个查询运行在同一个事务中时,它们便会共享同一个连接。而这便有可能会导致其中一个查询先一步完成了查询,而后就关闭了连接,但是此时另一个查询还在忙着呢,就会出现问题。

至于哪个驱动有这个问题、为什么没有解决这个问题、怎么解决这个问题,那就要看它们的实现者了。按照上述的 #383 来说,jasync-sql 反正是有这种隐患的。

引用 jasync-sql 开发者 oshai 的回复:

...

Is actually running 2 queries on the same connection, without waiting for the first one to complete. The way to solve it is by using some chaining function (map, flatMap, etc') after the first select and only then run the second query.

(大意:实际上是在同一个连接上运行两个查询,而不等待第一个查询的完成。 解决这个问题的方法是,在第一次查询后使用一些连锁函数(mapflatMap 等),然后才运行第二个查询。)

Usually each query can run in it's on connection, but transaction forces the same connection. Running two queries simultaneously on the same connection is not supported.

(大意:通常情况下,每个查询可以在它的连接(独立的连接)上运行,但事务会强制使用同一个连接。不支持在同一个连接上同时运行两个查询。)

总而言之,所以在进行分页查询的时候(也不止分页查询,而是任何可能运行多个SQL的地方)在使用事务的时候都需要"斟酌"一番。通过 then()flatMapmap 这类可以体现"顺序"的方式更加...稳妥。

尾声

这篇文章算是 写完了。

我也不知道自己总结的到不到位,说的对不对。如果你有学到什么,我很荣幸;如果我说错了什么或者遗漏了什么,欢迎在评论区友好交流,非常感谢!

那么,我们有缘再相会~

掘金

掘金上也有这篇文章喔,欢迎前往点赞收藏评论~