Appearance
Spring Integration Scatter-Gather 模式详解
什么是 Scatter-Gather 模式?
Scatter-Gather 是 Spring Integration 4.1 版本引入的企业集成模式,用于同时向多个接收方发送请求并聚合结果。它特别适用于需要多源数据汇总的场景,如:
- 供应商报价比较(获取多家报价后选择最优方案)
- 分布式数据查询(从多个数据源获取数据后合并)
- 并行任务处理(同时执行多个子任务后汇总结果)
类比理解
想象你同时向多家餐厅发送外卖菜单询价(Scatter),等待它们各自回复报价(Gather),最后选择最优惠的餐厅下单 - 这就是 Scatter-Gather 的典型应用场景!
核心架构解析
Scatter-Gather 端点由两个核心组件组成:
1. Scatter(分散器)
负责将消息分发到多个通道,有两种实现方式:
- 发布-订阅通道 (PublishSubscribeChannel):适用于拍卖场景
- 接收列表路由器 (RecipientListRouter):适用于分发场景
2. Gather(聚合器)
AggregatingMessageHandler
负责收集并处理所有响应,支持:
- 自定义聚合逻辑
- 超时控制
- 结果筛选
重要关系
分散器和聚合器通过 gatherResultChannel
隐式连接,确保响应能正确路由回聚合器
两种应用场景对比
1. 拍卖场景 (Auction)
适用情况:向所有接收方广播消息,收集所有响应(如供应商报价)
kotlin
@Bean
fun auctionScatterChannel(): MessageChannel {
return PublishSubscribeChannel().apply {
applySequence = true // 必须设置为true!
}
}
@ServiceActivator(inputChannel = "inputChannel")
fun scatterGatherAuction(): MessageHandler {
return ScatterGatherHandler(
auctionScatterChannel(), // 发布-订阅通道
aggregatingMessageHandler() // 聚合处理器
).apply {
outputChannel = outputChannel()
}
}
TIP
拍卖场景中必须设置 applySequence=true
,确保聚合器能正确关联响应和原始请求
2. 分发场景 (Distribution)
适用情况:根据条件选择特定接收方分发消息(如定向数据查询)
kotlin
@Bean
fun distributor(): MessageHandler {
return RecipientListRouter().apply {
applySequence = true // 关键配置
setChannels(listOf(
channel("channel1"),
channel("channel2"),
channel("channel3")
))
// 可添加选择器筛选目标
setSelector { message ->
message.payload.toString().contains("VIP")
}
}
}
@ServiceActivator(inputChannel = "distributionChannel")
fun scatterGatherDistribution(): MessageHandler {
return ScatterGatherHandler(
distributor(),
aggregatingMessageHandler()
).apply {
outputChannel = outputChannel()
gatherTimeout = 10000 // 10秒超时
}
}
互斥提示
拍卖和分发两种实现方式互斥,请根据场景选择一种
Kotlin DSL 配置实战
Spring Integration 的 Kotlin DSL 提供更简洁的配置方式:
kotlin
@Bean
fun scatterGatherFlow() = integrationFlow("scatterInput") {
scatterGather(
scatterer = {
recipientFlow { handle { p -> "结果1: $p" } }
recipientFlow { handle { p -> "结果2: $p" } }
},
gatherer = {
aggregator {
outputProcessor = MessageGroupProcessor { group ->
group.messages.joinToString { it.payload.toString() }
}
}
},
gatherTimeout = 5000 // 5秒超时
)
}
kotlin
@Bean
fun distributionFlow() = integrationFlow("distributionInput") {
scatterGather(
scatterer = recipientListRouter(
recipients = mapOf(
"channel1" to { true },
"channel2" to { it.payload.toString().length > 5 }
)
),
gatherer = aggregator {
releaseStrategy { group -> group.size() == 2 }
outputProcessor = ExpressionEvaluatingMessageGroupProcessor("#this.![payload]")
}
)
}
错误处理策略
Scatter-Gather 的错误处理需要特别关注并行执行特性:
1. 异步错误处理
kotlin
@Bean
fun scatterWithErrorHandling() = integrationFlow("input") {
scatterGather(
scatterer = {
recipientFlow {
transform<String> { throw RuntimeException("模拟错误") }
}
},
errorChannel = "scatterErrorChannel"
)
}
@ServiceActivator(inputChannel = "scatterErrorChannel")
fun handleError(exception: MessagingException): Message<Any> {
return MessageBuilder.withPayload("错误补偿: ${exception.cause?.message}")
.copyHeaders(exception.failedMessage.headers)
.build()
}
2. 同步错误处理
kotlin
@Bean
fun safeRecipientFlow() = integrationFlow {
handle<Any>({ p, _ ->
try {
// 业务逻辑
} catch (e: Exception) {
// 返回补偿消息
"ERROR: ${e.message}"
}
}, advise {
expressionEvaluatingRequestHandlerAdvice(
onFailureExpression = "#result"
)
})
}
CAUTION
未处理的异常会导致整个 Scatter-Gather 操作失败,所有子流程的结果都将被丢弃!
最佳实践与常见问题
✅ 推荐实践
- 设置合理超时:避免无限等待kotlin
ScatterGatherHandler(...).apply { gatherTimeout = 30000 // 30秒 }
- 启用序列标记:确保
applySequence=true
- 使用专用错误通道:隔离错误处理逻辑
- 结果筛选:聚合时过滤无效响应kotlin
AggregatingMessageHandler( outputProcessor = ExpressionEvaluatingMessageGroupProcessor( "#this.?[payload.successful].![payload.data]" ) )
❌ 避免陷阱
kotlin
// 错误示例:未设置超时
ScatterGatherHandler(distributor(), aggregator()).apply {
gatherTimeout = null // 可能导致永久阻塞
}
// 错误示例:未启用序列标记
RecipientListRouter().apply {
applySequence = false // 需自定义关联策略
}
💡 性能优化建议
kotlin
@Bean
fun optimizedScatterGather() = integrationFlow {
scatterGather(
scatterer = {
recipientFlow { channel("executorChannel") }
},
gatherer = {
aggregator(correlationStrategy = HeaderAttributeCorrelationStrategy("customId"))
}
)
}
@Bean
fun executorChannel() = MessageChannels.executor(taskExecutor()).get()
总结
Scatter-Gather 模式是处理并行请求-响应场景的强大工具:
- 拍卖模式:广播消息到所有接收方
- 分发模式:条件筛选目标接收方
- 聚合阶段:支持自定义结果处理逻辑
- 错误处理:需专门设计补偿机制
版本提示
从 Spring Integration 6.0 开始,XML 和 DSL 配置默认启用 applySequence=true
,简化了配置
下一步学习:尝试在实际项目中实现多供应商报价比较系统,体验 Scatter-Gather 的强大功能!