Skip to content

Spring Integration 注解配置路由器教程

概述

在消息驱动架构中,路由器(Router) 就像邮局的分拣员,根据消息内容决定将其发送到哪个处理通道。本教程将介绍如何使用注解方式配置 Spring Integration 路由器,采用现代 Kotlin DSL 替代传统 XML 配置。

路由器核心功能

一、基础路由器配置

1.1 @Router 注解基础用法

使用 @Router 注解的方法可以返回以下类型:

kotlin
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.integration.annotation.Router

class OrderRouter {

    // 1. 返回单个MessageChannel
    @Router
    fun routeByMessage(message: Message<Order>): MessageChannel {
        return if (message.payload.isPremium) premiumChannel else standardChannel
    }

    // 2. 返回多个MessageChannel
    @Router
    fun multicastRoute(message: Message<Order>): List<MessageChannel> {
        return listOf(inventoryChannel, billingChannel, notificationChannel)
    }

    // 3. 返回通道名称(String)
    @Router
    fun routeByPayload(order: Order): String {
        return if (order.amount > 1000) "priorityChannel" else "normalChannel"
    }

    // 4. 返回多个通道名称
    @Router
    fun multiRoute(order: Order): List<String> {
        val channels = mutableListOf("loggingChannel")
        if (order.needsVerification) channels.add("verificationChannel")
        return channels
    }
}

TIP

返回字符串类型时,Spring 会自动解析通道名称,简化配置

1.2 配置路由器端点

@Configuration 类中启用集成配置:

kotlin
@Configuration
@EnableIntegration
class IntegrationConfig {

    @Bean
    fun orderRouter(): OrderRouter {
        return OrderRouter()
    }

    @Bean
    fun priorityChannel(): MessageChannel {
        return DirectChannel()
    }

    @Bean
    fun normalChannel(): MessageChannel {
        return DirectChannel()
    }

    // 其他通道定义...
}

二、基于消息头的路由

2.1 使用 @Header 注解

从消息头提取路由依据:

kotlin
import org.springframework.integration.annotation.Header

class StatusRouter {

    //  // 重点:基于消息头的路由
    @Router
    fun routeByStatus(@Header("orderStatus") status: OrderStatus): List<String> {
        return when (status) {
            OrderStatus.PENDING -> listOf("validationChannel")
            OrderStatus.APPROVED -> listOf("processingChannel")
            OrderStatus.REJECTED -> listOf("rejectionChannel")
            else -> listOf("errorChannel") // [!code warning] // 警告:必须处理默认情况
        }
    }
}

2.2 消息头路由时序

三、实际应用场景:订单处理系统

3.1 业务场景

根据订单特征路由:

  • 高金额订单 ➔ 优先通道
  • 国际订单 ➔ 海关检查通道
  • 高风险用户 ➔ 人工审核通道
完整路由器实现
kotlin
class OrderProcessingRouter {

    @Router
    fun routeOrder(order: Order): List<String> {
        val channels = mutableListOf("loggingChannel")

        when {
            order.amount > 5000 -> channels.add("priorityChannel")
            order.isInternational -> {
                channels.add("customsCheckChannel")
                channels.add("currencyConversionChannel")
            }
            order.customer.riskLevel > 7 -> channels.add("manualReviewChannel")
            else -> channels.add("standardProcessingChannel")
        }

        // [!code error] // 错误:缺少空订单检查
        if (order.items.isEmpty()) {
            throw IllegalArgumentException("空订单") // [!code warning] // 警告:需配置错误处理
        }

        return channels
    }

    // 错误处理配置示例
    @ServiceActivator(inputChannel = "errorChannel")
    fun handleErrors(ex: MessagingException) {
        logger.error("路由错误: ${ex.failedMessage}", ex)
    }
}

3.2 通道配置最佳实践

kotlin
@Bean
fun orderRouter(): IntegrationFlow {
    return IntegrationFlow.from("orderInputChannel")
        .route<Order>({ order ->
            when {
                order.isPriority -> "priorityChannel"
                else -> "standardChannel"
            }
        })
        .get()
}
kotlin
@Bean
fun multicastRouter(): IntegrationFlow {
    return IntegrationFlow.from("orderInputChannel")
        .routeToRecipients { router ->
            router.recipient("inventoryChannel", { order -> order.needsStockCheck })
                   .recipient("billingChannel") // 始终路由
                   .recipient("notificationChannel", { order -> order.customer.wantsNotifications })
        }
}

四、常见问题解决方案

4.1 路由失败处理

CAUTION

当路由器找不到匹配通道时,会抛出 MessageDeliveryException

解决方案

kotlin
@Bean
fun routerWithDefault() = IntegrationFlow.from("input")
    .route<Order>(
        { order -> order.type ?: "unknownType" },
        { mapping ->
            mapping.channelMapping("VIP", "vipChannel")
                   .channelMapping("STANDARD", "standardChannel")
                   .defaultOutputChannel("unroutedChannel") // [!code highlight] // 关键:默认通道
        }
    )

4.2 动态路由更新

实现动态路由规则更新

kotlin
@Router
fun dynamicRoute(order: Order, @Header("routingRules") rules: RoutingRules): String {
    return rules.determineChannel(order) ?: "defaultChannel"
}

// 更新路由规则
fun updateRules(newRules: RoutingRules) {
    messagingTemplate.convertAndSend("ruleUpdateChannel", newRules)
}

五、最佳实践与注意事项

  1. 通道解析策略

    kotlin
    @Bean
    fun channelResolver(): HeaderChannelRegistry {
        return HeaderChannelRegistry() // 启用通道名称解析
    }
  2. 性能优化

    • 对高频路由使用 RecipientListRouter 替代方法路由
    • 使用 @Scope("prototype") 创建无状态路由器实例
  3. 测试策略

    kotlin
    @SpringBootTest
    class RouterTests {
    
        @Autowired
        private lateinit var inputChannel: MessageChannel
    
        @Test
        fun testPriorityRouting() {
            val order = Order(amount = 6000)
            inputChannel.send(MessageBuilder.withPayload(order).build())
            // 验证是否进入priorityChannel
        }
    }

IMPORTANT

关键注意事项

  1. 路由方法应保持无状态幂等性
  2. 使用 @Header 时确保消息头存在,否则会抛出异常
  3. 多播路由时注意消息顺序保证需求
  4. 为所有路由场景提供默认处理逻辑

通过本教程,您应该掌握了使用注解配置 Spring Integration 路由器的核心技巧。实际应用中,路由器常与过滤器、转换器组合使用,构建完整的消息处理管道。