当前位置:首页 > 技术 > 正文内容

Spring Boot中的响应式流处理:Flux与Mono实践

访客 技术 2026年6月12日 1

Flux与Mono的创建方式

静态数据构建

Flux构建示例

// 显式声明元素
Flux<String> fruitFlux = Flux.just("Apple", "Banana", "Cherry");

// 基于集合创建
List<String> colors = Arrays.asList("Red", "Green", "Blue");
Flux<String> colorFlux = Flux.fromIterable(colors);

// 数字序列生成
Flux<Integer> numberFlux = Flux.range(10, 3); // 10,11,12

Mono构建示例

// 单元素构造
Mono<String> welcomeMono = Mono.just("Welcome");

// 空序列处理
Mono<String> emptyMono = Mono.empty();

// 异常序列
Mono<String> errorMono = Mono.error(new IllegalStateException("Failure"));

// Optional适配处理
Mono<String> optionalMono = Mono.justOrEmpty(Optional.of("Value"));

动态数据生成

// 动态生成Flux
Flux<String> customFlux = Flux.generate(sink -> {
    sink.next("Element-" + System.currentTimeMillis());
    if (Math.random() > 0.7) sink.complete();
});

// 动态生成Mono
Mono<String> customMono = Mono.create(sink -> 
    sink.success("Generated-" + Instant.now())
);

核心操作符应用

数据转换

// 元素类型转换
Flux<Integer> lengthFlux = fruitFlux.map(String::length);

// 异步转换处理
Flux<String> processedFlux = fruitFlux.flatMap(item -> 
    Mono.just(item.concat("-Processed"))
);

数据筛选与合并

// 条件过滤
Flux<String> filteredFlux = fruitFlux.filter(fruit -> fruit.length() > 5);

// 多流合并
Flux<String> first = Flux.just("Alpha", "Beta");
Flux<Integer> second = Flux.just(1, 2);
Flux<String> merged = Flux.zip(first, second)
    .map(tuple -> tuple.getT1() + ":" + tuple.getT2());

异常处理

// 异常默认值
Flux<String> resilientFlux = Flux.error(new RuntimeException())
    .onErrorReturn("RecoveryData");

// 异常切换流
Mono<String> fallbackMono = errorMono.switchOnError(Mono.just("Alternative"));

线程调度

// 指定执行线程
Flux<String> asyncFlux = fruitFlux.subscribeOn(Schedulers.parallel());

WebFlux应用场景

REST接口实现

@RestController
@RequestMapping("/data")
public class DataEndpoint {
    @GetMapping("/items")
    public Flux<String> fetchItems() {
        return Flux.just("Item1", "Item2", "Item3");
    }
    
    @GetMapping("/item")
    public Mono<String> fetchItem() {
        return Mono.just("SingleItem");
    }
}

大文件流式传输

@GetMapping("/stream-file")
public Flux<DataBuffer> streamFile() {
    Path filePath = Paths.get("large_data.bin");
    return DataBufferUtils.read(
        PathResource.of(filePath), 
        DefaultDataBufferFactory.sharedInstance, 
        4096
    );
}

外部API流式调用

@GetMapping("/external-data")
public Flux<String> fetchExternalData() {
    return WebClient.create("https://service.provider")
        .get()
        .uri("/stream")
        .retrieve()
        .bodyToFlux(String.class);
}

响应式流测试方法

@SpringBootTest(webEnvironment = RANDOM_PORT)
class ApiTest {
    @Autowired
    WebTestClient testClient;

    @Test
    void verifyItemsEndpoint() {
        testClient.get().uri("/data/items")
            .exchange()
            .expectStatus().isOk()
            .expectBodyList(String.class)
            .hasSize(3);
    }
}

实施要点

  • 确保响应式链路完整,避免阻塞操作
  • 合理配置背压策略防止生产者过载
  • 启用Reactor调试日志:
    logging.level.reactor.core.publisher: DEBUG
  • 计算密集型任务使用Schedulers.parallel()

相关文章

Linux crontab 详解

1) crontab 是什么cron 是 Linux 的定时任务守护进程;crontab 是用来编辑/查看“按时间周期执行命令”的表(cron table)。常见两类:用户 crontab:每个用户一份(crontab -e 编辑)系统级 crontab / cron.d:可指定执行用户(/etc/crontab、/etc/cron.d/*)2) crontab 时间...

富文本里可以允许的 HTML 属性

一、所有标签默认允许的安全属性(极少)class        (可选)id           (通常建议禁用)title️ 注意:id 容易被滥用做锚点注入,很多系统直接禁用class 允许的话最好只允许固定前缀(如 editor-*)二、a 标签允许属性<a href="" t...

Mac 安装 Node.js 指南

方法一:通过官网安装包(最简单,适合初学者)如果你只是想快速安装并开始使用,这是最直接的方法。访问 Node.js 官网。页面会显示两个版本:LTS (Recommended For Most Users):长期支持版,最稳定。建议选这个。Current:最新特性版,包含最新功能但可能不够稳定。下载 .pkg 安装包并运行。按照安装向导点击“下一步”即可完成。方法二:使用 Homebrew 安装(...

Dom\HTML_NO_DEFAULT_NS 的副作用:自动加闭合标签

在使用Dom\HTMLDocument时,Dom\HTML_NO_DEFAULT_NS 将禁止在解析过程中设置元素的命名空间, 此设置是为了与DOMDocument向后兼容而存在的。当使用它时,已知的一个副作用就是:自动加闭合标签例如 </img> 为什么会这样?当你使用:Dom\HTML_NO_DEFAULT_NS文档会变成 无命名空间模式,此时内部更接近 XML...

Laravel 事件和监听器创建

在 Laravel 中,使用 Artisan 命令创建 Events(事件) 和 Listeners(监听器) 是非常高效的。你可以通过以下几种方式来实现:1. 手动创建单个 Event如果你只想创建一个事件类,可以使用 make:event 命令:Bashphp artisan make:event UserRegistered执行后,文件将生成在 app/Even...

自定义域名解析神器 dnsmasq

什么是 dnsmasq?dnsmasq 是一个轻量级、功能强大的网络服务工具,专为小型和中等规模网络设计。它是一个综合的网络基础设施解决方案[1]。dnsmasq 能做什么?功能说明应用场景DNS 转发与缓存将 DNS 查询转发到上游服务器(ISP、Google DNS 等),并在本地缓存结果加快 DNS 查询速度,减少外部 DNS 流量本地 DNS解析本地网络设备的主机名,无需编辑&n...

发表评论

访客

◎欢迎参与讨论,请在这里发表您的看法和观点。