Skip to content

Spring Integration Aggregator 详解教程

概述

Aggregator(聚合器)是Spring Integration中的核心组件之一,它与Splitter(拆分器)的功能相反,将多个相关消息组合成单个消息。想象一下快递分拣中心:多个包裹(消息)根据相同的目的地(关联ID)被分组,直到整个包裹集齐后才发往最终目的地。

核心概念

1. Aggregator 工作原理

Aggregator 是有状态的组件,需要维护消息组直到完成聚合:

kotlin

class AggregatorWorkflow {
    fun process() {
        // 1. 接收多个相关消息
        // 2. 存储到MessageStore中
        // 3. 判断是否满足释放条件
        // 4. 聚合消息并发送
    }
}

2. 关键组件

组件作用默认实现
CorrelationStrategy定义消息如何分组HeaderAttributeCorrelationStrategy
ReleaseStrategy判断何时释放消息组SimpleSequenceSizeReleaseStrategy
MessageGroupProcessor实际执行聚合逻辑DefaultAggregatingMessageGroupProcessor
MessageStore存储未完成的消息组内存存储(可配置持久化)

配置 Aggregator

1. 使用 Kotlin DSL 配置

kotlin
@Configuration
class AggregatorConfig {

    @Bean
    fun aggregatorFlow() = integrationFlow("inputChannel") {

        aggregate {
            correlationStrategy { message ->
                message.headers["orderId"] // 按订单ID分组
            }
            releaseStrategy { group ->
                group.size() == 3 // 当组内有3条消息时释放
            }
            outputProcessor { group ->
                // 自定义聚合逻辑:合并所有消息内容
                val combined = group.messages.joinToString { it.payload.toString() }
                MessageBuilder.withPayload(combined).build()
            }
            messageStore(messageStore())
            groupTimeout(5000) // 5秒超时
        }
        channel("outputChannel")
    }

    @Bean
    fun messageStore() = SimpleMessageStore()
}

2. 使用注解配置

kotlin
@Service
class OrderAggregator {


    @Aggregator
    fun aggregateOrders(orders: List<OrderItem>): Order {
        val order = Order()
        orders.forEach { order.addItem(it) }
        return order
    }

    @CorrelationStrategy
    fun correlateBy(item: OrderItem) = item.orderId

    @ReleaseStrategy
    fun isComplete(messages: List<Message<OrderItem>>) =
        messages.size == messages[0].headers["itemCount"] as Int
}

最佳实践建议

  1. 优先使用注解配置:代码更简洁,类型安全
  2. 为大型消息组使用持久化存储:避免内存溢出
  3. 始终设置超时机制:防止消息永远滞留

关键策略详解

1. 关联策略 (CorrelationStrategy)

决定消息如何分组,默认使用CORRELATION_ID头信息:

kotlin
// 自定义关联策略示例
class CustomCorrelationStrategy : CorrelationStrategy {
    override fun getCorrelationKey(message: Message<*>): Any {
        return (message.payload as UserEvent).userId
    }
}

2. 释放策略 (ReleaseStrategy)

判断消息组是否就绪,默认基于SEQUENCE_SIZE头信息:

kotlin
class SizeReleaseStrategy(private val requiredSize: Int) : ReleaseStrategy {
    override fun canRelease(group: MessageGroup): Boolean {
        return group.size() >= requiredSize
    }
}

3. 聚合逻辑 (MessageGroupProcessor)

定义如何将多个消息合并为单个消息:

kotlin
class OrderProcessor : AbstractAggregatingMessageGroupProcessor() {

    override fun aggregatePayloads(
        group: MessageGroup,
        headers: MutableMap<String, Any>
    ): Any {
        val orders = group.messages.map { it.payload as OrderItem }
        return CompleteOrder(orders)
    }
}

状态管理与超时处理

1. MessageGroupStore

Aggregator 使用 MessageGroupStore 管理状态:

kotlin
@Bean
fun jdbcMessageStore(dataSource: DataSource) = JdbcMessageStore(dataSource).apply {
    setRegion("ORDER_AGGREGATOR") // 分区存储
}

2. 超时处理机制

kotlin
aggregate {
    groupTimeout(10000) // 10秒超时
    expireGroupsUponCompletion(true) // 完成后移除组
    sendPartialResultOnExpiry(true) // 超时发送部分结果
}

重要注意事项

WARNING

避免内存泄漏:必须配置超时或使用MessageGroupStoreReaper清理过期组,否则长期未完成组会导致内存泄漏 :::

高级场景

1. 处理大型消息组

kotlin
@Bean
fun largeGroupAggregator() = AggregatorSpec().apply {
    processor { group ->
        // 使用流式处理避免内存问题
        Flux.fromIterable(group.messages)
            .map { it.payload }
            .collectList()
    }
    messageStore(jdbcMessageStore())
    lockRegistry(redisLockRegistry()) // 分布式锁
}

2. Flux Aggregator (响应式)

kotlin
@Bean
fun fluxAggregator() = FluxAggregatorMessageHandler().apply {
    setCorrelationStrategy { message ->
        message.headers["sessionId"]
    }
    setWindowSize(5) // 每5条消息一个窗口
    setCombineFunction { flux ->
        flux.map { it.payload }
            .collectList()
            .map { CombinedEvent(it) }
    }
}

常见问题解决方案

问题1:消息永远无法释放

解决方案

  1. 检查释放策略逻辑是否正确
  2. 添加超时设置:groupTimeout(30000)
  3. 实现死锁检测:
kotlin
aggregate {
    releaseLockBeforeSend(true) // 避免死锁
}

问题2:性能瓶颈

优化方案

kotlin

aggregate {
    messageGroupStore(redisMessageStore()) // 使用Redis存储
    lockRegistry(zookeeperLockRegistry()) // 分布式锁
    groupTimeoutExpression("size() ge 1000 ? 100 : 5000") // 动态超时
}

问题3:消息重复处理

预防措施

kotlin
class DeduplicationReleaseStrategy : ReleaseStrategy {
    override fun canRelease(group: MessageGroup): Boolean {
        val ids = group.messages.map { it.headers["id"] }
        return ids.size == ids.distinct().size // 检查重复ID
    }
}

总结

Aggregator 是Spring Integration中处理消息分组聚合的核心组件,关键要点:

  1. 组合相关消息:通过CorrelationStrategy定义分组逻辑
  2. 灵活释放机制:通过ReleaseStrategy控制何时释放组
  3. 状态管理:使用MessageGroupStore持久化消息组
  4. ⚠️ 必须配置超时:防止内存泄漏和死锁
  5. ⚡️ 响应式支持:使用FluxAggregator处理高并发场景

TIP

实际应用场景:订单处理系统 - 将同一订单的多个商品消息聚合成完整订单,当所有商品处理完成或超时时发送到下游服务。