Appearance
Spring Integration 注解配置路由器教程
概述
在消息驱动架构中,路由器(Router) 就像邮局的分拣员,根据消息内容决定将其发送到哪个处理通道。本教程将介绍如何使用注解方式配置 Spring Integration 路由器,采用现代 Kotlin DSL 替代传统 XML 配置。
路由器核心功能
一、基础路由器配置
1.1 @Router 注解基础用法
使用 @Router
注解的方法可以返回以下类型:
kotlin
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.integration.annotation.Router
class OrderRouter {
// 1. 返回单个MessageChannel
@Router
fun routeByMessage(message: Message<Order>): MessageChannel {
return if (message.payload.isPremium) premiumChannel else standardChannel
}
// 2. 返回多个MessageChannel
@Router
fun multicastRoute(message: Message<Order>): List<MessageChannel> {
return listOf(inventoryChannel, billingChannel, notificationChannel)
}
// 3. 返回通道名称(String)
@Router
fun routeByPayload(order: Order): String {
return if (order.amount > 1000) "priorityChannel" else "normalChannel"
}
// 4. 返回多个通道名称
@Router
fun multiRoute(order: Order): List<String> {
val channels = mutableListOf("loggingChannel")
if (order.needsVerification) channels.add("verificationChannel")
return channels
}
}
TIP
返回字符串类型时,Spring 会自动解析通道名称,简化配置
1.2 配置路由器端点
在 @Configuration
类中启用集成配置:
kotlin
@Configuration
@EnableIntegration
class IntegrationConfig {
@Bean
fun orderRouter(): OrderRouter {
return OrderRouter()
}
@Bean
fun priorityChannel(): MessageChannel {
return DirectChannel()
}
@Bean
fun normalChannel(): MessageChannel {
return DirectChannel()
}
// 其他通道定义...
}
二、基于消息头的路由
2.1 使用 @Header 注解
从消息头提取路由依据:
kotlin
import org.springframework.integration.annotation.Header
class StatusRouter {
// // 重点:基于消息头的路由
@Router
fun routeByStatus(@Header("orderStatus") status: OrderStatus): List<String> {
return when (status) {
OrderStatus.PENDING -> listOf("validationChannel")
OrderStatus.APPROVED -> listOf("processingChannel")
OrderStatus.REJECTED -> listOf("rejectionChannel")
else -> listOf("errorChannel") // [!code warning] // 警告:必须处理默认情况
}
}
}
2.2 消息头路由时序
三、实际应用场景:订单处理系统
3.1 业务场景
根据订单特征路由:
- 高金额订单 ➔ 优先通道
- 国际订单 ➔ 海关检查通道
- 高风险用户 ➔ 人工审核通道
完整路由器实现
kotlin
class OrderProcessingRouter {
@Router
fun routeOrder(order: Order): List<String> {
val channels = mutableListOf("loggingChannel")
when {
order.amount > 5000 -> channels.add("priorityChannel")
order.isInternational -> {
channels.add("customsCheckChannel")
channels.add("currencyConversionChannel")
}
order.customer.riskLevel > 7 -> channels.add("manualReviewChannel")
else -> channels.add("standardProcessingChannel")
}
// [!code error] // 错误:缺少空订单检查
if (order.items.isEmpty()) {
throw IllegalArgumentException("空订单") // [!code warning] // 警告:需配置错误处理
}
return channels
}
// 错误处理配置示例
@ServiceActivator(inputChannel = "errorChannel")
fun handleErrors(ex: MessagingException) {
logger.error("路由错误: ${ex.failedMessage}", ex)
}
}
3.2 通道配置最佳实践
kotlin
@Bean
fun orderRouter(): IntegrationFlow {
return IntegrationFlow.from("orderInputChannel")
.route<Order>({ order ->
when {
order.isPriority -> "priorityChannel"
else -> "standardChannel"
}
})
.get()
}
kotlin
@Bean
fun multicastRouter(): IntegrationFlow {
return IntegrationFlow.from("orderInputChannel")
.routeToRecipients { router ->
router.recipient("inventoryChannel", { order -> order.needsStockCheck })
.recipient("billingChannel") // 始终路由
.recipient("notificationChannel", { order -> order.customer.wantsNotifications })
}
}
四、常见问题解决方案
4.1 路由失败处理
CAUTION
当路由器找不到匹配通道时,会抛出 MessageDeliveryException
解决方案:
kotlin
@Bean
fun routerWithDefault() = IntegrationFlow.from("input")
.route<Order>(
{ order -> order.type ?: "unknownType" },
{ mapping ->
mapping.channelMapping("VIP", "vipChannel")
.channelMapping("STANDARD", "standardChannel")
.defaultOutputChannel("unroutedChannel") // [!code highlight] // 关键:默认通道
}
)
4.2 动态路由更新
实现动态路由规则更新:
kotlin
@Router
fun dynamicRoute(order: Order, @Header("routingRules") rules: RoutingRules): String {
return rules.determineChannel(order) ?: "defaultChannel"
}
// 更新路由规则
fun updateRules(newRules: RoutingRules) {
messagingTemplate.convertAndSend("ruleUpdateChannel", newRules)
}
五、最佳实践与注意事项
通道解析策略:
kotlin@Bean fun channelResolver(): HeaderChannelRegistry { return HeaderChannelRegistry() // 启用通道名称解析 }
性能优化:
- 对高频路由使用
RecipientListRouter
替代方法路由 - 使用
@Scope("prototype")
创建无状态路由器实例
- 对高频路由使用
测试策略:
kotlin@SpringBootTest class RouterTests { @Autowired private lateinit var inputChannel: MessageChannel @Test fun testPriorityRouting() { val order = Order(amount = 6000) inputChannel.send(MessageBuilder.withPayload(order).build()) // 验证是否进入priorityChannel } }
IMPORTANT
关键注意事项:
- 路由方法应保持无状态和幂等性
- 使用
@Header
时确保消息头存在,否则会抛出异常 - 多播路由时注意消息顺序保证需求
- 为所有路由场景提供默认处理逻辑
通过本教程,您应该掌握了使用注解配置 Spring Integration 路由器的核心技巧。实际应用中,路由器常与过滤器、转换器组合使用,构建完整的消息处理管道。