Appearance
Spring Integration Aggregator 详解教程
概述
Aggregator(聚合器)是Spring Integration中的核心组件之一,它与Splitter(拆分器)的功能相反,将多个相关消息组合成单个消息。想象一下快递分拣中心:多个包裹(消息)根据相同的目的地(关联ID)被分组,直到整个包裹集齐后才发往最终目的地。
核心概念
1. Aggregator 工作原理
Aggregator 是有状态的组件,需要维护消息组直到完成聚合:
kotlin
class AggregatorWorkflow {
fun process() {
// 1. 接收多个相关消息
// 2. 存储到MessageStore中
// 3. 判断是否满足释放条件
// 4. 聚合消息并发送
}
}
2. 关键组件
组件 | 作用 | 默认实现 |
---|---|---|
CorrelationStrategy | 定义消息如何分组 | HeaderAttributeCorrelationStrategy |
ReleaseStrategy | 判断何时释放消息组 | SimpleSequenceSizeReleaseStrategy |
MessageGroupProcessor | 实际执行聚合逻辑 | DefaultAggregatingMessageGroupProcessor |
MessageStore | 存储未完成的消息组 | 内存存储(可配置持久化) |
配置 Aggregator
1. 使用 Kotlin DSL 配置
kotlin
@Configuration
class AggregatorConfig {
@Bean
fun aggregatorFlow() = integrationFlow("inputChannel") {
aggregate {
correlationStrategy { message ->
message.headers["orderId"] // 按订单ID分组
}
releaseStrategy { group ->
group.size() == 3 // 当组内有3条消息时释放
}
outputProcessor { group ->
// 自定义聚合逻辑:合并所有消息内容
val combined = group.messages.joinToString { it.payload.toString() }
MessageBuilder.withPayload(combined).build()
}
messageStore(messageStore())
groupTimeout(5000) // 5秒超时
}
channel("outputChannel")
}
@Bean
fun messageStore() = SimpleMessageStore()
}
2. 使用注解配置
kotlin
@Service
class OrderAggregator {
@Aggregator
fun aggregateOrders(orders: List<OrderItem>): Order {
val order = Order()
orders.forEach { order.addItem(it) }
return order
}
@CorrelationStrategy
fun correlateBy(item: OrderItem) = item.orderId
@ReleaseStrategy
fun isComplete(messages: List<Message<OrderItem>>) =
messages.size == messages[0].headers["itemCount"] as Int
}
最佳实践建议
- 优先使用注解配置:代码更简洁,类型安全
- 为大型消息组使用持久化存储:避免内存溢出
- 始终设置超时机制:防止消息永远滞留
关键策略详解
1. 关联策略 (CorrelationStrategy)
决定消息如何分组,默认使用CORRELATION_ID
头信息:
kotlin
// 自定义关联策略示例
class CustomCorrelationStrategy : CorrelationStrategy {
override fun getCorrelationKey(message: Message<*>): Any {
return (message.payload as UserEvent).userId
}
}
2. 释放策略 (ReleaseStrategy)
判断消息组是否就绪,默认基于SEQUENCE_SIZE
头信息:
kotlin
class SizeReleaseStrategy(private val requiredSize: Int) : ReleaseStrategy {
override fun canRelease(group: MessageGroup): Boolean {
return group.size() >= requiredSize
}
}
3. 聚合逻辑 (MessageGroupProcessor)
定义如何将多个消息合并为单个消息:
kotlin
class OrderProcessor : AbstractAggregatingMessageGroupProcessor() {
override fun aggregatePayloads(
group: MessageGroup,
headers: MutableMap<String, Any>
): Any {
val orders = group.messages.map { it.payload as OrderItem }
return CompleteOrder(orders)
}
}
状态管理与超时处理
1. MessageGroupStore
Aggregator 使用 MessageGroupStore
管理状态:
kotlin
@Bean
fun jdbcMessageStore(dataSource: DataSource) = JdbcMessageStore(dataSource).apply {
setRegion("ORDER_AGGREGATOR") // 分区存储
}
2. 超时处理机制
kotlin
aggregate {
groupTimeout(10000) // 10秒超时
expireGroupsUponCompletion(true) // 完成后移除组
sendPartialResultOnExpiry(true) // 超时发送部分结果
}
重要注意事项
WARNING
避免内存泄漏:必须配置超时或使用MessageGroupStoreReaper
清理过期组,否则长期未完成组会导致内存泄漏 :::
高级场景
1. 处理大型消息组
kotlin
@Bean
fun largeGroupAggregator() = AggregatorSpec().apply {
processor { group ->
// 使用流式处理避免内存问题
Flux.fromIterable(group.messages)
.map { it.payload }
.collectList()
}
messageStore(jdbcMessageStore())
lockRegistry(redisLockRegistry()) // 分布式锁
}
2. Flux Aggregator (响应式)
kotlin
@Bean
fun fluxAggregator() = FluxAggregatorMessageHandler().apply {
setCorrelationStrategy { message ->
message.headers["sessionId"]
}
setWindowSize(5) // 每5条消息一个窗口
setCombineFunction { flux ->
flux.map { it.payload }
.collectList()
.map { CombinedEvent(it) }
}
}
常见问题解决方案
问题1:消息永远无法释放
解决方案:
- 检查释放策略逻辑是否正确
- 添加超时设置:
groupTimeout(30000)
- 实现死锁检测:
kotlin
aggregate {
releaseLockBeforeSend(true) // 避免死锁
}
问题2:性能瓶颈
优化方案:
kotlin
aggregate {
messageGroupStore(redisMessageStore()) // 使用Redis存储
lockRegistry(zookeeperLockRegistry()) // 分布式锁
groupTimeoutExpression("size() ge 1000 ? 100 : 5000") // 动态超时
}
问题3:消息重复处理
预防措施:
kotlin
class DeduplicationReleaseStrategy : ReleaseStrategy {
override fun canRelease(group: MessageGroup): Boolean {
val ids = group.messages.map { it.headers["id"] }
return ids.size == ids.distinct().size // 检查重复ID
}
}
总结
Aggregator 是Spring Integration中处理消息分组聚合的核心组件,关键要点:
- ✅ 组合相关消息:通过CorrelationStrategy定义分组逻辑
- ✅ 灵活释放机制:通过ReleaseStrategy控制何时释放组
- ✅ 状态管理:使用MessageGroupStore持久化消息组
- ⚠️ 必须配置超时:防止内存泄漏和死锁
- ⚡️ 响应式支持:使用FluxAggregator处理高并发场景
TIP
实际应用场景:订单处理系统 - 将同一订单的多个商品消息聚合成完整订单,当所有商品处理完成或超时时发送到下游服务。