Skip to content

Spring Integration Scatter-Gather 模式详解

什么是 Scatter-Gather 模式?

Scatter-Gather 是 Spring Integration 4.1 版本引入的企业集成模式,用于同时向多个接收方发送请求并聚合结果。它特别适用于需要多源数据汇总的场景,如:

  • 供应商报价比较(获取多家报价后选择最优方案)
  • 分布式数据查询(从多个数据源获取数据后合并)
  • 并行任务处理(同时执行多个子任务后汇总结果)

类比理解

想象你同时向多家餐厅发送外卖菜单询价(Scatter),等待它们各自回复报价(Gather),最后选择最优惠的餐厅下单 - 这就是 Scatter-Gather 的典型应用场景!

核心架构解析

Scatter-Gather 端点由两个核心组件组成:

1. Scatter(分散器)

负责将消息分发到多个通道,有两种实现方式:

  • 发布-订阅通道 (PublishSubscribeChannel):适用于拍卖场景
  • 接收列表路由器 (RecipientListRouter):适用于分发场景

2. Gather(聚合器)

AggregatingMessageHandler 负责收集并处理所有响应,支持:

  • 自定义聚合逻辑
  • 超时控制
  • 结果筛选

重要关系

分散器和聚合器通过 gatherResultChannel 隐式连接,确保响应能正确路由回聚合器

两种应用场景对比

1. 拍卖场景 (Auction)

适用情况:向所有接收方广播消息,收集所有响应(如供应商报价)

kotlin
@Bean
fun auctionScatterChannel(): MessageChannel {
    return PublishSubscribeChannel().apply {
        applySequence = true  // 必须设置为true!
    }
}

@ServiceActivator(inputChannel = "inputChannel")
fun scatterGatherAuction(): MessageHandler {
    return ScatterGatherHandler(
        auctionScatterChannel(),  // 发布-订阅通道
        aggregatingMessageHandler() // 聚合处理器
    ).apply {
        outputChannel = outputChannel()
    }
}

TIP

拍卖场景中必须设置 applySequence=true,确保聚合器能正确关联响应和原始请求

2. 分发场景 (Distribution)

适用情况:根据条件选择特定接收方分发消息(如定向数据查询)

kotlin
@Bean
fun distributor(): MessageHandler {
    return RecipientListRouter().apply {
        applySequence = true  // 关键配置
        setChannels(listOf(
            channel("channel1"), 
            channel("channel2"),
            channel("channel3")
        ))
        // 可添加选择器筛选目标
        setSelector { message ->
            message.payload.toString().contains("VIP")
        }
    }
}

@ServiceActivator(inputChannel = "distributionChannel")
fun scatterGatherDistribution(): MessageHandler {
    return ScatterGatherHandler(
        distributor(), 
        aggregatingMessageHandler()
    ).apply {
        outputChannel = outputChannel()
        gatherTimeout = 10000  // 10秒超时
    }
}

互斥提示

拍卖和分发两种实现方式互斥,请根据场景选择一种

Kotlin DSL 配置实战

Spring Integration 的 Kotlin DSL 提供更简洁的配置方式:

kotlin
@Bean
fun scatterGatherFlow() = integrationFlow("scatterInput") {
    scatterGather(
        scatterer = {
            recipientFlow { handle { p -> "结果1: $p" } }
            recipientFlow { handle { p -> "结果2: $p" } }
        },
        gatherer = {
            aggregator { 
                outputProcessor = MessageGroupProcessor { group ->
                    group.messages.joinToString { it.payload.toString() }
                }
            }
        },
        gatherTimeout = 5000  // 5秒超时
    )
}
kotlin
@Bean
fun distributionFlow() = integrationFlow("distributionInput") {
    scatterGather(
        scatterer = recipientListRouter(
            recipients = mapOf(
                "channel1" to { true },
                "channel2" to { it.payload.toString().length > 5 }
            )
        ),
        gatherer = aggregator {
            releaseStrategy { group -> group.size() == 2 }
            outputProcessor = ExpressionEvaluatingMessageGroupProcessor("#this.![payload]")
        }
    )
}

错误处理策略

Scatter-Gather 的错误处理需要特别关注并行执行特性:

1. 异步错误处理

kotlin
@Bean
fun scatterWithErrorHandling() = integrationFlow("input") {
    scatterGather(
        scatterer = {
            recipientFlow { 
                transform<String> { throw RuntimeException("模拟错误") }
            }
        },
        errorChannel = "scatterErrorChannel"
    )
}

@ServiceActivator(inputChannel = "scatterErrorChannel")
fun handleError(exception: MessagingException): Message<Any> {
    return MessageBuilder.withPayload("错误补偿: ${exception.cause?.message}")
        .copyHeaders(exception.failedMessage.headers)
        .build()
}

2. 同步错误处理

kotlin
@Bean
fun safeRecipientFlow() = integrationFlow {
    handle<Any>({ p, _ -> 
        try {
            // 业务逻辑
        } catch (e: Exception) {
            // 返回补偿消息
            "ERROR: ${e.message}"
        }
    }, advise { 
        expressionEvaluatingRequestHandlerAdvice(
            onFailureExpression = "#result"
        ) 
    })
}

CAUTION

未处理的异常会导致整个 Scatter-Gather 操作失败,所有子流程的结果都将被丢弃!

最佳实践与常见问题

✅ 推荐实践

  1. 设置合理超时:避免无限等待
    kotlin
    ScatterGatherHandler(...).apply {
        gatherTimeout = 30000  // 30秒
    }
  2. 启用序列标记:确保 applySequence=true
  3. 使用专用错误通道:隔离错误处理逻辑
  4. 结果筛选:聚合时过滤无效响应
    kotlin
    AggregatingMessageHandler(
        outputProcessor = ExpressionEvaluatingMessageGroupProcessor(
            "#this.?[payload.successful].![payload.data]"
        )
    )

❌ 避免陷阱

kotlin
// 错误示例:未设置超时
ScatterGatherHandler(distributor(), aggregator()).apply {
    gatherTimeout = null  // 可能导致永久阻塞
}

// 错误示例:未启用序列标记
RecipientListRouter().apply {
    applySequence = false // 需自定义关联策略
}

💡 性能优化建议

kotlin
@Bean
fun optimizedScatterGather() = integrationFlow {
    scatterGather(
        scatterer = {
            recipientFlow { channel("executorChannel") }
        },
        gatherer = {
            aggregator(correlationStrategy = HeaderAttributeCorrelationStrategy("customId")) 
        }
    )
}

@Bean
fun executorChannel() = MessageChannels.executor(taskExecutor()).get()

总结

Scatter-Gather 模式是处理并行请求-响应场景的强大工具:

  1. 拍卖模式:广播消息到所有接收方
  2. 分发模式:条件筛选目标接收方
  3. 聚合阶段:支持自定义结果处理逻辑
  4. 错误处理:需专门设计补偿机制

版本提示

从 Spring Integration 6.0 开始,XML 和 DSL 配置默认启用 applySequence=true,简化了配置

下一步学习:尝试在实际项目中实现多供应商报价比较系统,体验 Scatter-Gather 的强大功能!