Skip to content

Spring Integration 通用路由器配置教程

路由器的核心概念

在消息驱动架构中,路由器(Router) 是决定消息流向的关键组件。它像邮局的分拣系统,根据特定规则将消息分发到不同通道。Spring Integration 提供多种专用路由器,而通用路由器(Generic Router) 则适用于自定义路由逻辑的场景。

路由器的关键特性

  1. 动态决策:基于消息内容、头部或上下文决定目标通道
  2. 多目标支持:可将消息路由到单个或多个通道
  3. 错误处理:提供默认通道处理无法路由的消息
  4. 无状态设计:路由决策不依赖先前消息状态

Kotlin DSL 路由器配置

基础配置示例

kotlin
// 自定义路由器实现
class CustomRouter : AbstractMessageRouter() {
    override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
        return when (message.payload) {
            is String -> listOf(stringChannel())
            is Int -> listOf(numberChannel())
            else -> listOf(defaultChannel())
        }
    }
}

// 使用路由器配置集成流
@Bean
fun routerFlow(): IntegrationFlow {
    return IntegrationFlow
        .from("inputChannel")
        .route(CustomRouter()) {
            it.defaultOutputChannel("defaultChannel")
        }
        .get()
}

// 定义目标通道
@Bean
fun stringChannel() = DirectChannel()
@Bean
fun numberChannel() = DirectChannel()
@Bean
fun defaultChannel() = DirectChannel()

注解式路由器配置

kotlin
@Component
class AnnotatedRouter {
    //  // 注解标记路由方法
    @Router(inputChannel = "routingChannel")
    fun route(message: Message<*>): String {
        return when (message.headers["priority"]) {
            "high" -> "priorityChannel"
            "normal" -> "normalChannel"
            else -> "defaultChannel"
        }
    }
}

// 通道配置
@Bean
fun priorityChannel() = DirectChannel()
@Bean
fun normalChannel() = DirectChannel()

内联路由逻辑(推荐)

kotlin
@Bean
fun inlineRouterFlow(): IntegrationFlow {
    return IntegrationFlow
        .from("bookingChannel")
        .route<BookingRequest>({ request ->
            when {
                request.type == BookingType.FLIGHT -> "flightProcessingChannel"
                request.type == BookingType.HOTEL -> "hotelProcessingChannel"
                request.urgent -> "urgentProcessingChannel"
                else -> null // 使用默认通道
            }
        }) {
            it.defaultOutputChannel("defaultProcessingChannel")
            it.channelMapping("flight", "flightProcessingChannel") // 备用映射
        }
        .get()
}

路由器的关键配置选项

1. 默认输出通道

kotlin
.route(router) {
    it.defaultOutputChannel("defaultChannel") 
}

TIP

始终设置默认通道!当路由逻辑无法确定目标时,防止消息丢失

2. 通道映射表

kotlin
.route<Order>({ order -> order.category }) {
    it.channelMapping("ELECTRONICS", "electronicsChannel") 
    it.channelMapping("CLOTHING", "clothingChannel")
    it.channelMapping("FOOD", "foodChannel")
}

3. 解析失败处理

kotlin
.route(router) {
    it.resolutionRequired(false) //  // 允许返回空集合
    it.sendTimeout(5000) // 发送超时设置
    it.advice(expressionEvaluatingAdvice()) // 添加处理建议
}

最佳实践与常见问题

✅ 推荐做法

  1. 保持路由逻辑简单:路由决策应快速无阻塞
  2. 使用类型安全路由:Kotlin 的 when 表达式完美适配
  3. 单元测试路由逻辑:独立测试路由决策逻辑
kotlin
@Test
fun `应路由高优先级消息到专用通道`() {
    val message = MessageBuilder.withPayload("urgent")
        .setHeader("priority", "high")
        .build()

    val channels = router.determineTargetChannels(message)

    assertThat(channels).containsExactly(priorityChannel)
}

⚠️ 避免的陷阱

kotlin
// 反模式:在路由器中执行业务逻辑
override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
    // 错误:包含业务处理逻辑
    processOrder(message.payload as Order)

    // 正确:仅包含路由决策
    return when {
        (message.payload as Order).amount > 1000 -> listOf(largeOrderChannel)
        else -> listOf(smallOrderChannel)
    }
}

WARNING

路由器的单一职责原则:路由器应只负责路由决策,不执行业务逻辑。业务处理应在后续服务中完成。

性能优化技巧

kotlin
.route({ message ->
    cache.getOrPut(message.headers["type"]) {
        when (message.headers["type"]) {
            "A" -> "channelA"
            "B" -> "channelB"
            else -> "default"
        }
    }
})

路由缓存策略

对于频繁出现的路由键值,使用缓存可显著提升性能。但要注意缓存失效策略,避免过时路由决策

真实场景:电商订单路由系统

Kotlin 实现代码

kotlin
@Bean
fun orderRoutingFlow(): IntegrationFlow {
    return IntegrationFlow
        .from("orderInputChannel")
        .route<Order> { order ->
            when {
                order.customerLevel == CustomerLevel.VIP -> "vipOrderChannel"
                order.shippingCountry != "CN" -> "internationalOrderChannel"
                order.items.any { it.isDangerousGoods } -> "hazardousChannel"
                else -> "domesticOrderChannel"
            }
        }
        .get()
}

// 通道配置
@Bean fun vipOrderChannel() = QueueChannel(100)
@Bean fun internationalOrderChannel() = QueueChannel(50)
@Bean fun hazardousChannel() = DirectChannel()
@Bean fun domesticOrderChannel() = ExecutorChannel(TaskExecutor())

常见问题解答

Q:如何处理路由到多个通道?

A:返回通道集合即可实现多播:

kotlin
.route { message ->
    listOf("channelA", "channelB", "channelC")
}

Q:路由决策能否访问Spring上下文?

A:可以!路由器也是Spring Bean:

kotlin
.route { message ->
    if (featureToggle.isEnabled("new-processing")) 
        "newChannel"
    else
        "legacyChannel"
}

Q:如何动态更新路由规则?

A:使用可重载的路由策略:

kotlin
class ReloadableRouter : AbstractMessageRouter() {
    var routingRules: Map<String, String> = emptyMap()

    override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
        val key = message.headers["routeKey"].toString()
        return listOf(channelResolver.resolveDestination(routingRules[key] ?: "default"))
    }
}

NOTE

版本兼容性提示:从Spring Integration 5.0开始,推荐使用函数式路由器配置代替XML配置,以获得更好的类型安全和可维护性。

总结

Spring Integration 的通用路由器提供了灵活的消息分发机制,通过本教程你已掌握:

  1. 使用Kotlin DSL配置路由器的三种方式
  2. 路由决策的最佳实践和常见陷阱
  3. 真实场景中的路由应用模式
  4. 性能优化和动态路由技巧

下一步学习

  • 探索专用路由器(头部路由器、收件人列表路由器)
  • 学习结合Spring Expression Language(SpEL)的高级路由
  • 了解错误处理策略和消息回退机制
kotlin
// 最终提示:保持路由逻辑简洁明确
fun determineRoute(message: Message<*>): String {
    // 好的路由:清晰的条件链
    return when {
        conditionA(message) -> "ChannelA"
        conditionB(message) -> "ChannelB"
        else -> "DefaultChannel"
    }
}