Appearance
Spring Integration 消息路由指南
在现代分布式系统中,消息路由是构建灵活、可扩展应用架构的核心技术。Spring Integration 提供了强大的消息路由能力,让开发者能够轻松实现消息的过滤、拆分、聚合等复杂处理逻辑。
TIP
消息路由就像邮局的分拣系统:根据信件特征(地址、类型等)决定投递路径,确保每封信都能到达正确目的地。
核心路由组件
路由器 (Routers)
路由器根据消息内容或头部信息决定消息流向不同通道。
kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.Router
import org.springframework.integration.router.AbstractMessageRouter
import org.springframework.messaging.Message
@Configuration
class RouterConfig {
// // 重点:使用注解配置路由器
@Bean
@Router(inputChannel = "inputChannel")
fun messageRouter(): AbstractMessageRouter {
return object : AbstractMessageRouter() {
override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
val type = message.headers["messageType"] as String
return when (type) {
"order" -> listOf(orderChannel())
"payment" -> listOf(paymentChannel())
else -> listOf(defaultChannel()) // 警告:必须有默认处理
}
}
}
}
}
应用场景:电商系统中根据订单类型(普通/加急)路由到不同处理通道
注意事项
- 必须设置默认通道处理未知消息类型
- 避免路由逻辑过于复杂(超过 3 层判断应考虑其他方案)
🚫 2. 过滤器 (Filter)
过滤器根据预定条件决定是否允许消息通过。
kotlin
import org.springframework.integration.annotation.Filter
import org.springframework.messaging.Message
class MessageFilter {
// // 重点:过滤条件实现
@Filter(inputChannel = "inputChannel", outputChannel = "outputChannel")
fun isValid(message: Message<String>): Boolean {
return !message.payload.contains("blocked_keyword") // 错误:实际项目应使用正则匹配
}
}
最佳实践:在金融系统中过滤包含敏感词的交易请求
增加过滤器之后的消息流向
🪓 3. 分割器 (Splitter)
将单个消息拆分为多个独立消息并行处理。
kotlin
import org.springframework.integration.annotation.Splitter
import org.springframework.messaging.Message
class OrderSplitter {
@Splitter(inputChannel = "orderInput", outputChannel = "itemProcessing")
fun splitOrder(orderMessage: Message<Order>): List<OrderItem> {
return orderMessage.payload.items.map { item ->
OrderItem(item.id, item.name, item.quantity)
}
}
}
data class Order(val id: String, val items: List<OrderItem>)
data class OrderItem(val id: String, val name: String, val quantity: Int)
应用场景:电商订单拆分为单个商品项独立处理
🧩 4. 聚合器 (Aggregator)
将相关消息组合成单个消息,是分割器的逆操作。
kotlin
import org.springframework.integration.annotation.Aggregator
import org.springframework.integration.annotation.CorrelationStrategy
import org.springframework.integration.annotation.ReleaseStrategy
import org.springframework.messaging.Message
class OrderAggregator {
// // 重点:关联策略
@CorrelationStrategy
fun correlateByOrderId(item: OrderItem): String {
return item.orderId
}
// // 重点:释放策略
@ReleaseStrategy
fun release(items: List<Message<OrderItem>>): Boolean {
return items.size == items[0].payload.totalItems
}
@Aggregator(inputChannel = "processedItems", outputChannel = "completeOrders")
fun assembleOrder(items: List<OrderItem>): Order {
return Order(items.first().orderId, items)
}
}
IMPORTANT
聚合器必须配置超时策略,防止因消息丢失导致无限等待!
🔢 5. 重排器 (Resequencer)
确保消息按指定顺序处理,常用于分布式系统。
kotlin
import org.springframework.integration.annotation.Resequencer
import org.springframework.messaging.Message
import java.util.Comparator
class MessageResequencer {
@Resequencer(inputChannel = "unorderedInput", outputChannel = "orderedOutput")
fun resequenceComparator(): Comparator<Message<*>> {
return Comparator { m1, m2 ->
val seq1 = m1.headers["sequenceNumber"] as Int
val seq2 = m2.headers["sequenceNumber"] as Int
seq1.compareTo(seq2)
}
}
}
应用场景:处理股票交易时确保价格更新顺序正确
⛓️ 6. 消息处理器链 (Message Handler Chain)
将多个处理器串联形成处理流水线。
kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
@Configuration
class HandlerChainConfig {
@Bean
fun processingFlow(): IntegrationFlow {
return IntegrationFlows.from("inputChannel")
.filter { it.payload.isValid() }
.transform(MessageTransformer::class, "convertFormat")
.handle(MessageLogger::class, "logMessage") // [!code ++] // 推荐:添加日志记录
.channel("outputChannel")
.get()
}
}
🌐 7. 分散-聚集 (Scatter-Gather)
并行发送请求到多个服务并聚合结果。
kotlin
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.MessageChannels
@Configuration
class ScatterGatherConfig {
@Bean
fun scatterGatherFlow(): IntegrationFlow {
return IntegrationFlows.from("inputChannel")
.scatterGather(
scatterer = scatterer()
.applySequence(true)
.recipientFlow(flow1())
.recipientFlow(flow2()),
gatherer = gatherer()
.processor(aggregator(), null)
)
.channel("resultChannel")
.get()
}
private fun flow1() = IntegrationFlow { it.handle("service1", "process") }
private fun flow2() = IntegrationFlow { it.handle("service2", "process") }
}
应用场景:从多个供应商 API 获取报价并返回最优价格
🚧 8. 线程屏障 (Thread Barrier)
暂停处理并等待外部事件后再继续。
kotlin
import org.springframework.integration.annotation.BridgeFrom
import org.springframework.integration.annotation.BridgeTo
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.messaging.MessageChannel
class BarrierService {
@BridgeFrom("pauseChannel")
@BridgeTo("resumeChannel")
lateinit var barrierChannel: MessageChannel
@ServiceActivator(inputChannel = "pauseChannel")
fun pauseExecution(message: Message<*>) {
// 存储消息并等待恢复信号
barrierStore.save(message)
}
@ServiceActivator(inputChannel = "resumeSignal")
fun resumeExecution(signal: Message<*>) {
// 获取存储的消息并继续处理
val original = barrierStore.retrieve(signal)
resumeChannel.send(original)
}
}
应用场景:支付处理中等待银行确认后再完成订单
最佳实践总结
kotlin
// 使用DSL配置清晰的路由逻辑
IntegrationFlows.from("input")
.routeByExpression("headers['type']") {
it.channelMapping("order", "orderChannel")
.channelMapping("payment", "paymentChannel")
.defaultOutputChannel("defaultChannel") // [!code highlight] // 重点:默认通道
}
kotlin
// 不推荐:在路由器中直接写业务逻辑
@Router(inputChannel = "input")
fun complexRouter(): AbstractMessageRouter {
return object : AbstractMessageRouter() {
override fun determineTargetChannels(message: Message<*>) {
// 200+行业务逻辑... // [!code error] // 错误:路由器应保持简单
}
}
}
✅ 路由设计黄金法则:
- 保持路由逻辑简单纯粹
- 单个路由器决策点不超过 5 个
- 为未处理消息配置死信通道
- 为聚合器设置合理超时
CAUTION
在分布式系统中使用重排器时需特别注意:
- 消息序列号必须全局唯一
- 设置合理的窗口大小和超时
- 考虑使用持久化存储防止重启后序列丢失
常见问题解决
消息卡在聚合器中怎么办?
kotlin
// 配置聚合器超时释放
@Bean
fun aggregator() = AggregatorSpec().apply {
processor(CustomAggregator())
releaseStrategy { group.size == 3 }
groupTimeout(30_000) // 30秒超时 // [!code highlight] // 重点:设置超时
expireGroupsUponCompletion(true)
}
如何调试路由问题?
kotlin
// 启用Spring Integration调试日志
logging.level.org.springframework.integration=DEBUG
logging.level.org.springframework.messaging=TRACE
// 添加消息历史跟踪
@Bean
fun messageHistoryConfig() {
GlobalChannelInterceptor(MessageHistoryWritingInterceptor())
}
掌握这些消息路由模式,你将能构建出高度解耦、弹性伸缩的分布式系统。在实际项目中,建议从简单路由开始,逐步引入更复杂的模式以满足业务需求。
"好的架构不是一次性设计出来的,而是通过恰当的消息路由逐步演化而来。" — Spring Integration 设计哲学