Appearance
Spring Integration WebFlux 响应式 HTTP 支持教程
🚀 概述
spring-integration-webflux
模块提供了在响应式环境中处理 HTTP 请求的能力,基于 Spring WebFlux 和 Project Reactor 构建。它包含两种核心组件:
📦 依赖配置
Gradle (Kotlin DSL)
kotlin
dependencies {
implementation("org.springframework.integration:spring-integration-webflux:6.5.1")
// 非Servlet服务器需要添加
implementation("io.projectreactor.netty:reactor-netty")
}
🔌 WebFlux 入站组件
处理入站 HTTP 请求的响应式端点
基础配置示例
kotlin
@Configuration
@EnableWebFlux
@EnableIntegration
class ReactiveHttpConfig {
@Bean
fun simpleInboundEndpoint(): WebFluxInboundEndpoint {
return WebFluxInboundEndpoint().apply {
requestMapping = RequestMapping().apply {
pathPatterns = arrayOf("/test")
}
requestChannelName = "serviceChannel"
}
}
@ServiceActivator(inputChannel = "serviceChannel")
fun service(): String = "响应式请求处理成功!"
}
Server-Sent Events (SSE) 示例
kotlin
@Bean
fun sseFlow() = integrationFlow(
WebFlux.inboundGateway("/sse").apply {
requestMapping { m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE) }
}
) {
// 返回Flux流作为SSE事件源
handle { _, _ -> Flux.just("事件1", "事件2", "事件3") }
}
负载验证
kotlin
@Bean
fun validatedInbound() = WebFluxInboundEndpoint().apply {
requestMapping = RequestMapping().apply {
pathPatterns = arrayOf("/validate")
}
// 设置验证器
validator = MyPayloadValidator()
}
class MyPayloadValidator : Validator {
override fun supports(clazz: Class<*>): Boolean = clazz == User::class.java
override fun validate(target: Any, errors: Errors) {
val user = target as User
if (user.name.isBlank()) {
errors.rejectValue("name", "blank", "用户名不能为空")
}
}
}
TIP
验证最佳实践
对于复杂负载结构,建议在消息流下游进行验证,而非在入口端点
📤 WebFlux 出站组件
执行出站 HTTP 请求的响应式处理器
基础 GET 请求
kotlin
@Bean
fun outboundReactive() = integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ message ->
UriComponentsBuilder.fromUriString("http://localhost:8080/data")
.queryParams(message.payload)
.build()
.toUri()
}).apply {
httpMethod = HttpMethod.GET
expectedResponseType = String::class.java
}
)
}
POST 请求处理
kotlin
@Bean
@ServiceActivator(inputChannel = "httpOutChannel")
fun reactiveOutbound(client: WebClient) = WebFluxRequestExecutingMessageHandler(
"http://localhost:8080/api",
client
).apply {
httpMethod = HttpMethod.POST
// 返回整个响应实体而不仅是body
extractResponseBody = false
expectedResponseType = User::class.java
}
响应式请求体处理
kotlin
@Bean
fun reactivePublisherOutbound() = WebFluxRequestExecutingMessageHandler(
"http://localhost:8080/stream",
WebClient.create()
).apply {
httpMethod = HttpMethod.POST
// 指定发布者元素类型
publisherElementTypeExpression = StaticExpression(Product::class.java)
}
IMPORTANT
响应处理模式
当设置 replyPayloadToFlux=true
时,响应体会转换为 Flux
流,适合处理流式数据
🔗 头部映射与请求属性
头部映射配置
kotlin
@Bean
fun headerMapper(): DefaultHttpHeaderMapper = DefaultHttpHeaderMapper().apply {
setInboundHeaderNames(arrayOf("Content-Type", "X-Custom-*"))
setOutboundHeaderNames(arrayOf("Authorization", "X-Response-*"))
}
请求属性设置
kotlin
@Bean
fun attributeOutbound() = WebFluxRequestExecutingMessageHandler(
"http://localhost:8080/attrs",
WebClient.create()
).apply {
// 设置请求属性
attributeVariablesExpression = ExpressionParser.parse(
"#{T(java.util.Map).of('traceId', headers.traceId, 'priority', 5)}"
)
}
⚠️ 常见问题与解决方案
问题1:响应超时处理
kotlin
@Bean
fun timeoutOutbound() = WebFluxRequestExecutingMessageHandler(
"http://slow-service/api",
WebClient.create()
).apply {
// 设置3秒超时
replyTimeout = 3000
// 超时回退处理
setOutputChannelName("timeoutHandlingChannel")
}
问题2:错误处理管道
kotlin
@Bean
fun resilientInbound() = WebFluxInboundEndpoint().apply {
requestMapping = RequestMapping().apply {
pathPatterns = arrayOf("/safe")
}
// 设置专用错误通道
errorChannel = "errorProcessingChannel"
}
@ServiceActivator(inputChannel = "errorProcessingChannel")
fun handleErrors(payload: MessageHandlingException): ResponseEntity<String> {
logger.error("请求处理失败", payload)
return ResponseEntity.status(500).body("服务暂时不可用")
}
问题3:CORS 配置
kotlin
@Bean
fun corsInbound() = WebFluxInboundEndpoint().apply {
requestMapping = RequestMapping().apply {
pathPatterns = arrayOf("/api")
}
crossOrigin = CrossOrigin().apply {
origin = "*"
allowedMethods = arrayOf("GET", "POST")
maxAge = 1800 // 30分钟
}
}
🏁 总结
Spring Integration WebFlux 提供了完整的响应式 HTTP 处理方案:
- 入站处理 - 使用
WebFluxInboundEndpoint
处理请求 - 出站调用 - 通过
WebFluxRequestExecutingMessageHandler
执行HTTP请求 - 响应式优势 - 支持背压和非阻塞IO
- 现代配置 - 优先使用Kotlin DSL和注解配置
最佳实践建议
- 对于流式数据,优先使用
Flux
作为返回类型 - 验证逻辑放在下游组件以获得更大灵活性
- 为关键服务设置合理的超时和熔断机制
- 使用专用错误通道进行异常处理