Spring Boot中的响应式流处理:Flux与Mono实践
Flux与Mono的创建方式
静态数据构建
Flux构建示例:
// 显式声明元素
Flux<String> fruitFlux = Flux.just("Apple", "Banana", "Cherry");
// 基于集合创建
List<String> colors = Arrays.asList("Red", "Green", "Blue");
Flux<String> colorFlux = Flux.fromIterable(colors);
// 数字序列生成
Flux<Integer> numberFlux = Flux.range(10, 3); // 10,11,12
Mono构建示例:
// 单元素构造
Mono<String> welcomeMono = Mono.just("Welcome");
// 空序列处理
Mono<String> emptyMono = Mono.empty();
// 异常序列
Mono<String> errorMono = Mono.error(new IllegalStateException("Failure"));
// Optional适配处理
Mono<String> optionalMono = Mono.justOrEmpty(Optional.of("Value"));
动态数据生成
// 动态生成Flux
Flux<String> customFlux = Flux.generate(sink -> {
sink.next("Element-" + System.currentTimeMillis());
if (Math.random() > 0.7) sink.complete();
});
// 动态生成Mono
Mono<String> customMono = Mono.create(sink ->
sink.success("Generated-" + Instant.now())
);
核心操作符应用
数据转换
// 元素类型转换
Flux<Integer> lengthFlux = fruitFlux.map(String::length);
// 异步转换处理
Flux<String> processedFlux = fruitFlux.flatMap(item ->
Mono.just(item.concat("-Processed"))
);
数据筛选与合并
// 条件过滤
Flux<String> filteredFlux = fruitFlux.filter(fruit -> fruit.length() > 5);
// 多流合并
Flux<String> first = Flux.just("Alpha", "Beta");
Flux<Integer> second = Flux.just(1, 2);
Flux<String> merged = Flux.zip(first, second)
.map(tuple -> tuple.getT1() + ":" + tuple.getT2());
异常处理
// 异常默认值
Flux<String> resilientFlux = Flux.error(new RuntimeException())
.onErrorReturn("RecoveryData");
// 异常切换流
Mono<String> fallbackMono = errorMono.switchOnError(Mono.just("Alternative"));
线程调度
// 指定执行线程
Flux<String> asyncFlux = fruitFlux.subscribeOn(Schedulers.parallel());
WebFlux应用场景
REST接口实现
@RestController
@RequestMapping("/data")
public class DataEndpoint {
@GetMapping("/items")
public Flux<String> fetchItems() {
return Flux.just("Item1", "Item2", "Item3");
}
@GetMapping("/item")
public Mono<String> fetchItem() {
return Mono.just("SingleItem");
}
}
大文件流式传输
@GetMapping("/stream-file")
public Flux<DataBuffer> streamFile() {
Path filePath = Paths.get("large_data.bin");
return DataBufferUtils.read(
PathResource.of(filePath),
DefaultDataBufferFactory.sharedInstance,
4096
);
}
外部API流式调用
@GetMapping("/external-data")
public Flux<String> fetchExternalData() {
return WebClient.create("https://service.provider")
.get()
.uri("/stream")
.retrieve()
.bodyToFlux(String.class);
}
响应式流测试方法
@SpringBootTest(webEnvironment = RANDOM_PORT)
class ApiTest {
@Autowired
WebTestClient testClient;
@Test
void verifyItemsEndpoint() {
testClient.get().uri("/data/items")
.exchange()
.expectStatus().isOk()
.expectBodyList(String.class)
.hasSize(3);
}
}
实施要点
- 确保响应式链路完整,避免阻塞操作
- 合理配置背压策略防止生产者过载
- 启用Reactor调试日志:
logging.level.reactor.core.publisher: DEBUG
- 计算密集型任务使用
Schedulers.parallel()