Appearance
Spring Integration 通用路由器配置教程
路由器的核心概念
在消息驱动架构中,路由器(Router) 是决定消息流向的关键组件。它像邮局的分拣系统,根据特定规则将消息分发到不同通道。Spring Integration 提供多种专用路由器,而通用路由器(Generic Router) 则适用于自定义路由逻辑的场景。
路由器的关键特性
- 动态决策:基于消息内容、头部或上下文决定目标通道
- 多目标支持:可将消息路由到单个或多个通道
- 错误处理:提供默认通道处理无法路由的消息
- 无状态设计:路由决策不依赖先前消息状态
Kotlin DSL 路由器配置
基础配置示例
kotlin
// 自定义路由器实现
class CustomRouter : AbstractMessageRouter() {
override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
return when (message.payload) {
is String -> listOf(stringChannel())
is Int -> listOf(numberChannel())
else -> listOf(defaultChannel())
}
}
}
// 使用路由器配置集成流
@Bean
fun routerFlow(): IntegrationFlow {
return IntegrationFlow
.from("inputChannel")
.route(CustomRouter()) {
it.defaultOutputChannel("defaultChannel")
}
.get()
}
// 定义目标通道
@Bean
fun stringChannel() = DirectChannel()
@Bean
fun numberChannel() = DirectChannel()
@Bean
fun defaultChannel() = DirectChannel()
注解式路由器配置
kotlin
@Component
class AnnotatedRouter {
// // 注解标记路由方法
@Router(inputChannel = "routingChannel")
fun route(message: Message<*>): String {
return when (message.headers["priority"]) {
"high" -> "priorityChannel"
"normal" -> "normalChannel"
else -> "defaultChannel"
}
}
}
// 通道配置
@Bean
fun priorityChannel() = DirectChannel()
@Bean
fun normalChannel() = DirectChannel()
内联路由逻辑(推荐)
kotlin
@Bean
fun inlineRouterFlow(): IntegrationFlow {
return IntegrationFlow
.from("bookingChannel")
.route<BookingRequest>({ request ->
when {
request.type == BookingType.FLIGHT -> "flightProcessingChannel"
request.type == BookingType.HOTEL -> "hotelProcessingChannel"
request.urgent -> "urgentProcessingChannel"
else -> null // 使用默认通道
}
}) {
it.defaultOutputChannel("defaultProcessingChannel")
it.channelMapping("flight", "flightProcessingChannel") // 备用映射
}
.get()
}
路由器的关键配置选项
1. 默认输出通道
kotlin
.route(router) {
it.defaultOutputChannel("defaultChannel")
}
TIP
始终设置默认通道!当路由逻辑无法确定目标时,防止消息丢失
2. 通道映射表
kotlin
.route<Order>({ order -> order.category }) {
it.channelMapping("ELECTRONICS", "electronicsChannel")
it.channelMapping("CLOTHING", "clothingChannel")
it.channelMapping("FOOD", "foodChannel")
}
3. 解析失败处理
kotlin
.route(router) {
it.resolutionRequired(false) // // 允许返回空集合
it.sendTimeout(5000) // 发送超时设置
it.advice(expressionEvaluatingAdvice()) // 添加处理建议
}
最佳实践与常见问题
✅ 推荐做法
- 保持路由逻辑简单:路由决策应快速无阻塞
- 使用类型安全路由:Kotlin 的
when
表达式完美适配 - 单元测试路由逻辑:独立测试路由决策逻辑
kotlin
@Test
fun `应路由高优先级消息到专用通道`() {
val message = MessageBuilder.withPayload("urgent")
.setHeader("priority", "high")
.build()
val channels = router.determineTargetChannels(message)
assertThat(channels).containsExactly(priorityChannel)
}
⚠️ 避免的陷阱
kotlin
// 反模式:在路由器中执行业务逻辑
override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
// 错误:包含业务处理逻辑
processOrder(message.payload as Order)
// 正确:仅包含路由决策
return when {
(message.payload as Order).amount > 1000 -> listOf(largeOrderChannel)
else -> listOf(smallOrderChannel)
}
}
WARNING
路由器的单一职责原则:路由器应只负责路由决策,不执行业务逻辑。业务处理应在后续服务中完成。
性能优化技巧
kotlin
.route({ message ->
cache.getOrPut(message.headers["type"]) {
when (message.headers["type"]) {
"A" -> "channelA"
"B" -> "channelB"
else -> "default"
}
}
})
路由缓存策略
对于频繁出现的路由键值,使用缓存可显著提升性能。但要注意缓存失效策略,避免过时路由决策
真实场景:电商订单路由系统
Kotlin 实现代码:
kotlin
@Bean
fun orderRoutingFlow(): IntegrationFlow {
return IntegrationFlow
.from("orderInputChannel")
.route<Order> { order ->
when {
order.customerLevel == CustomerLevel.VIP -> "vipOrderChannel"
order.shippingCountry != "CN" -> "internationalOrderChannel"
order.items.any { it.isDangerousGoods } -> "hazardousChannel"
else -> "domesticOrderChannel"
}
}
.get()
}
// 通道配置
@Bean fun vipOrderChannel() = QueueChannel(100)
@Bean fun internationalOrderChannel() = QueueChannel(50)
@Bean fun hazardousChannel() = DirectChannel()
@Bean fun domesticOrderChannel() = ExecutorChannel(TaskExecutor())
常见问题解答
Q:如何处理路由到多个通道?
A:返回通道集合即可实现多播:
kotlin
.route { message ->
listOf("channelA", "channelB", "channelC")
}
Q:路由决策能否访问Spring上下文?
A:可以!路由器也是Spring Bean:
kotlin
.route { message ->
if (featureToggle.isEnabled("new-processing"))
"newChannel"
else
"legacyChannel"
}
Q:如何动态更新路由规则?
A:使用可重载的路由策略:
kotlin
class ReloadableRouter : AbstractMessageRouter() {
var routingRules: Map<String, String> = emptyMap()
override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
val key = message.headers["routeKey"].toString()
return listOf(channelResolver.resolveDestination(routingRules[key] ?: "default"))
}
}
NOTE
版本兼容性提示:从Spring Integration 5.0开始,推荐使用函数式路由器配置代替XML配置,以获得更好的类型安全和可维护性。
总结
Spring Integration 的通用路由器提供了灵活的消息分发机制,通过本教程你已掌握:
- 使用Kotlin DSL配置路由器的三种方式
- 路由决策的最佳实践和常见陷阱
- 真实场景中的路由应用模式
- 性能优化和动态路由技巧
下一步学习:
- 探索专用路由器(头部路由器、收件人列表路由器)
- 学习结合Spring Expression Language(SpEL)的高级路由
- 了解错误处理策略和消息回退机制
kotlin
// 最终提示:保持路由逻辑简洁明确
fun determineRoute(message: Message<*>): String {
// 好的路由:清晰的条件链
return when {
conditionA(message) -> "ChannelA"
conditionB(message) -> "ChannelB"
else -> "DefaultChannel"
}
}