Skip to content

Spring Integration WebFlux 响应式 HTTP 支持教程

🚀 概述

spring-integration-webflux 模块提供了在响应式环境中处理 HTTP 请求的能力,基于 Spring WebFlux 和 Project Reactor 构建。它包含两种核心组件:

📦 依赖配置

Gradle (Kotlin DSL)

kotlin

dependencies {
    implementation("org.springframework.integration:spring-integration-webflux:6.5.1")
    // 非Servlet服务器需要添加
    implementation("io.projectreactor.netty:reactor-netty")
}

🔌 WebFlux 入站组件

处理入站 HTTP 请求的响应式端点

基础配置示例

kotlin
@Configuration
@EnableWebFlux
@EnableIntegration
class ReactiveHttpConfig {

    @Bean
    fun simpleInboundEndpoint(): WebFluxInboundEndpoint {
        return WebFluxInboundEndpoint().apply {
            requestMapping = RequestMapping().apply {
                pathPatterns = arrayOf("/test")
            }
            requestChannelName = "serviceChannel"
        }
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    fun service(): String = "响应式请求处理成功!"
}

Server-Sent Events (SSE) 示例

kotlin
@Bean
fun sseFlow() = integrationFlow(
    WebFlux.inboundGateway("/sse").apply {
        requestMapping { m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE) }
    }
) {
    // 返回Flux流作为SSE事件源
    handle { _, _ -> Flux.just("事件1", "事件2", "事件3") }
}

负载验证

kotlin
@Bean
fun validatedInbound() = WebFluxInboundEndpoint().apply {
    requestMapping = RequestMapping().apply {
        pathPatterns = arrayOf("/validate")
    }
    // 设置验证器
    validator = MyPayloadValidator()
}

class MyPayloadValidator : Validator {
    override fun supports(clazz: Class<*>): Boolean = clazz == User::class.java

    override fun validate(target: Any, errors: Errors) {
        val user = target as User
        if (user.name.isBlank()) {
            errors.rejectValue("name", "blank", "用户名不能为空")
        }
    }
}

TIP

验证最佳实践
对于复杂负载结构,建议在消息流下游进行验证,而非在入口端点

📤 WebFlux 出站组件

执行出站 HTTP 请求的响应式处理器

基础 GET 请求

kotlin
@Bean
fun outboundReactive() = integrationFlow {
    handle(
        WebFlux.outboundGateway<MultiValueMap<String, String>>({ message ->
            UriComponentsBuilder.fromUriString("http://localhost:8080/data")
                .queryParams(message.payload)
                .build()
                .toUri()
        }).apply {
            httpMethod = HttpMethod.GET
            expectedResponseType = String::class.java
        }
    )
}

POST 请求处理

kotlin
@Bean
@ServiceActivator(inputChannel = "httpOutChannel")
fun reactiveOutbound(client: WebClient) = WebFluxRequestExecutingMessageHandler(
    "http://localhost:8080/api",
    client
).apply {
    httpMethod = HttpMethod.POST
    // 返回整个响应实体而不仅是body
    extractResponseBody = false
    expectedResponseType = User::class.java
}

响应式请求体处理

kotlin
@Bean
fun reactivePublisherOutbound() = WebFluxRequestExecutingMessageHandler(
    "http://localhost:8080/stream",
    WebClient.create()
).apply {
    httpMethod = HttpMethod.POST
    // 指定发布者元素类型
    publisherElementTypeExpression = StaticExpression(Product::class.java)
}

IMPORTANT

响应处理模式
当设置 replyPayloadToFlux=true 时,响应体会转换为 Flux 流,适合处理流式数据

🔗 头部映射与请求属性

头部映射配置

kotlin
@Bean
fun headerMapper(): DefaultHttpHeaderMapper = DefaultHttpHeaderMapper().apply {
    setInboundHeaderNames(arrayOf("Content-Type", "X-Custom-*"))
    setOutboundHeaderNames(arrayOf("Authorization", "X-Response-*"))
}

请求属性设置

kotlin
@Bean
fun attributeOutbound() = WebFluxRequestExecutingMessageHandler(
    "http://localhost:8080/attrs",
    WebClient.create()
).apply {
    // 设置请求属性
    attributeVariablesExpression = ExpressionParser.parse(
        "#{T(java.util.Map).of('traceId', headers.traceId, 'priority', 5)}"
    )
}

⚠️ 常见问题与解决方案

问题1:响应超时处理

kotlin
@Bean
fun timeoutOutbound() = WebFluxRequestExecutingMessageHandler(
    "http://slow-service/api",
    WebClient.create()
).apply {
    // 设置3秒超时
    replyTimeout = 3000
    // 超时回退处理
    setOutputChannelName("timeoutHandlingChannel")
}

问题2:错误处理管道

kotlin
@Bean
fun resilientInbound() = WebFluxInboundEndpoint().apply {
    requestMapping = RequestMapping().apply {
        pathPatterns = arrayOf("/safe")
    }
    // 设置专用错误通道
    errorChannel = "errorProcessingChannel"
}

@ServiceActivator(inputChannel = "errorProcessingChannel")
fun handleErrors(payload: MessageHandlingException): ResponseEntity<String> {
    logger.error("请求处理失败", payload)
    return ResponseEntity.status(500).body("服务暂时不可用")
}

问题3:CORS 配置

kotlin
@Bean
fun corsInbound() = WebFluxInboundEndpoint().apply {
    requestMapping = RequestMapping().apply {
        pathPatterns = arrayOf("/api")
    }
    crossOrigin = CrossOrigin().apply {
        origin = "*"
        allowedMethods = arrayOf("GET", "POST")
        maxAge = 1800 // 30分钟
    }
}

🏁 总结

Spring Integration WebFlux 提供了完整的响应式 HTTP 处理方案:

  1. 入站处理 - 使用 WebFluxInboundEndpoint 处理请求
  2. 出站调用 - 通过 WebFluxRequestExecutingMessageHandler 执行HTTP请求
  3. 响应式优势 - 支持背压和非阻塞IO
  4. 现代配置 - 优先使用Kotlin DSL和注解配置

最佳实践建议

  • 对于流式数据,优先使用 Flux 作为返回类型
  • 验证逻辑放在下游组件以获得更大灵活性
  • 为关键服务设置合理的超时和熔断机制
  • 使用专用错误通道进行异常处理