Skip to content

Spring Integration Splitter 详解:消息拆分的艺术

TIP

学习目标
本教程将带你掌握 Spring Integration 的核心组件 Splitter,学会如何将复杂消息拆分为独立处理单元。通过实际订单处理案例,你将理解拆分器在消息路由中的应用场景和最佳实践。

一、Splitter 核心概念

1.1 Splitter 是什么?

Splitter 是 Spring Integration 中的消息拆分组件,它接收单个消息并将其拆分为多个独立部分,每个部分作为独立消息发送到下游处理。典型应用场景包括:

1.2 核心功能与消息头

Splitter 会自动添加三个关键消息头,用于消息追踪和聚合:

消息头说明示例值
CORRELATION_ID拆分消息组的唯一标识order-123
SEQUENCE_SIZE拆分后的消息总数3
SEQUENCE_NUMBER当前消息在组中的序号(从1开始)2

IMPORTANT

这些消息头是后续聚合处理的关键,切勿手动修改它们!

二、Splitter 实现方式

2.1 继承 AbstractMessageSplitter(基础方式)

kotlin
import org.springframework.integration.splitter.AbstractMessageSplitter
import org.springframework.messaging.Message


class OrderSplitter : AbstractMessageSplitter() {
    override fun splitMessage(message: Message<*>): Any {
        val order = message.payload as Order
        return order.items // 返回集合会被自动拆分为多条消息
    }
}

2.2 POJO 方法 + @Splitter 注解(推荐✨)

kotlin
import org.springframework.integration.annotation.Splitter
import org.springframework.stereotype.Component

@Component
class OrderProcessor {


    @Splitter(inputChannel = "orderChannel", outputChannel = "itemChannel")
    fun splitOrder(order: Order): List<OrderItem> {
        // 返回类型可以是集合/数组/流/Flux
        // 非消息对象会自动包装为Message
        return order.items
    }
}

TIP

POJO 方式的优势

  1. 代码与 Spring API 解耦
  2. 更易单元测试
  3. 注解配置简洁直观

三、现代配置实战(Kotlin DSL)

3.1 基础拆分配置

kotlin
@Bean
fun orderProcessingFlow() = integrationFlow {
    split<Order> { order ->
        order.items // 拆分逻辑
    }
    handle { item: OrderItem ->
        // 处理每个订单项
        processItem(item)
    }
}

3.2 高级流处理(响应式 + 容错)

kotlin
@Bean
fun reactiveSplitFlow() = integrationFlow {
    split<Order> { order ->
        Flux.fromIterable(order.items) // 使用响应式流
    }
    .channel(MessageChannels.flux()) // 响应式通道
    .handle { item: OrderItem ->
        processItem(item)
    }
}
kotlin
@Bean
fun safeSplitFlow() = integrationFlow("orderChannel") {
    split<Order>(
        { order -> order.items },
        { discardChannel = "discardChannel" } // 空集合处理
    )
    .handle { item: OrderItem ->
        processItem(item)
    }
}

CAUTION

空集合处理陷阱
当拆分方法返回空集合时,默认行为会丢弃原始消息。通过设置 discardChannel 可捕获这种情况:

kotlin
// 配置丢弃通道
@ServiceActivator(inputChannel = "discardChannel")
fun handleEmptyOrder(order: Order) {
    logger.warn("空订单: ${order.id}")
}

四、实战案例:订单处理系统

4.1 领域模型定义

kotlin
// 订单类
data class Order(
    val id: String,
    val items: List<OrderItem>
)

// 订单项类
data class OrderItem(
    val productId: String,
    val quantity: Int,
    val price: BigDecimal
)

4.2 完整集成流程

kotlin
@Bean
fun orderProcessingFlow() = integrationFlow {
    tep 1: 接收订单
    channel("orderInput")

    tep 2: 拆分订单为订单项
    split<Order> { order -> order.items }

    tep 3: 并行处理每个订单项
    .channel(MessageChannels.executor(Executors.newCachedThreadPool()))

    tep 4: 处理单个订单项
    handle { item: OrderItem, headers: Map<String, Any> ->
        logger.info("处理中: ${item.productId} [序号: ${headers[SEQUENCE_NUMBER]}/${headers[SEQUENCE_SIZE]}]")
        inventoryService.reserve(item.productId, item.quantity)
    }

    tep 5: 聚合处理结果
    .aggregate()
}

4.3 关键处理节点说明

五、高级特性与最佳实践

5.1 迭代器与流处理

kotlin
@Splitter
fun streamSplit(order: Order): Stream<OrderItem> {
    // 使用Stream避免全量加载到内存
    return order.items.stream()
}

TIP

迭代器注意事项
使用 Iterator/Stream 时:

  • SEQUENCE_SIZE 默认为 0(大小未知)
  • 需要自定义聚合器的 ReleaseStrategy
  • 配合 group-timeout 防止无限等待

5.2 响应式拆分(Reactive Streams)

kotlin
@Bean
fun reactiveSplitFlow() = integrationFlow {
    split<Order> { order ->
        // 返回响应式流
        Flux.fromIterable(order.items)
            .delayElements(Duration.ofMillis(100))
    }
    .channel(MessageChannels.flux())
    .handle { item: OrderItem ->
        // 背压支持
    }
}

六、常见问题解决方案

6.1 消息头丢失问题

症状:下游服务无法获取 CORRELATION_ID
原因:手动创建消息未复制原始头信息
解决:使用 MessageBuilder 复制头信息

kotlin
@Splitter
fun manualSplit(order: Order): List<Message<OrderItem>> {
    return order.items.map { item ->
        MessageBuilder.withPayload(item)
            .copyHeaders(order.headers) // 关键复制操作
            .build()
    }
}

6.2 大文件拆分优化

场景:处理 1GB 的 CSV 文件
方案:使用流式拆分避免内存溢出

kotlin
@Splitter
fun largeFileSplit(file: File): Sequence<String> {
    return file.bufferedReader().lineSequence()
}

6.3 性能监控技巧

kotlin
@Bean
fun splitterMonitoring() = IntegrationFlow {
    .log(LoggingHandler.Level.INFO, "splitter.metrics")
    { m -> "拆分统计: ${m.payload.size} 项" }
}

七、总结与最佳实践

正确使用姿势

  1. POJO+注解 优先于继承实现
  2. 返回集合对象让框架处理消息包装
  3. 大数据集使用 Stream/Flux 避免内存溢出
  4. 始终配置 discardChannel 处理空结果

避免的陷阱

  1. 手动修改 SEQUENCE_* 头信息
  2. 阻塞操作阻塞拆分器线程(使用异步通道)
  3. 在拆分方法中进行耗时业务处理

未来学习路径

  1. [Aggregator] - 拆分后的消息聚合
  2. [消息通道] - 不同通道类型的选择
  3. [错误处理] - 分布式消息的错误恢复

"拆分是复杂系统解耦的钥匙,合理使用让消息流如溪水分支般自然流畅。" - Spring Integration 设计哲学

本教程完整代码示例可在 [GitHub示例库] 获取。