Skip to content

Spring Integration 示例教程(Kotlin版)

引言

Spring Integration 示例项目是学习企业集成模式的绝佳资源。自 2.0 版本起,这些示例已迁移到独立的 GitHub 仓库,采用更灵活的协作模型,方便社区贡献。本教程将使用 Kotlin 和注解配置方式讲解核心示例,助你快速掌握 Spring Integration 的核心概念。

获取 Spring Integration 示例

克隆仓库

使用 Git 克隆官方示例仓库:

bash
git clone https://github.com/spring-projects/spring-integration-samples.git

项目结构

示例分为四类,适合不同学习阶段:

类别目录目标人群示例特点
基础samples/basic初学者核心概念演示
中级samples/intermediate进阶者处理复杂场景
高级samples/advanced专家扩展框架功能
应用samples/applications架构师业务场景解决方案

Loan Broker 示例(借贷经纪人)

业务场景

模拟贷款经纪系统:收集客户信息 → 筛选银行 → 获取报价 → 返回最佳贷款方案。

Kotlin 实现

1. 网关接口

kotlin
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway

@MessagingGateway
interface LoanBrokerGateway {
    
    @Gateway(requestChannel = "loanBrokerPreProcessingChannel")
    fun getBestLoanQuote(
        @Payload request: LoanRequest,
        @Header("RESPONSE_TYPE") responseType: String = "BEST"
    ): LoanQuote?
}

2. 信用评分服务

kotlin
@Service
class CreditService {
    
    fun getCreditScore(request: LoanRequest): Int {
        // [!code highlight] // 实际项目会调用外部信用系统
        return (300..850).random()
    }
}

3. 银行路由配置

kotlin
@Configuration
class RoutingConfig {
    
    @Bean
    fun loanBrokerFlow(
        creditService: CreditService
    ): IntegrationFlow {
        return IntegrationFlow.from("loanBrokerPreProcessingChannel")
            .enrichHeaders { it.header("CREDIT_SCORE", creditService::getCreditScore) }
            .routeToRecipients { router ->
                router.recipientFlow("creditScore >= 700", premierBankFlow())
                router.recipientFlow("creditScore < 700", standardBankFlow())
            }
            .get()
    }
    
    private fun premierBankFlow(): IntegrationFlow {
        return IntegrationFlow.from("premierBankChannel")
            .handle(BankService("Premier Bank"), "getQuote")
            .channel("quotesAggregationChannel")
    }
}

4. 报价聚合器

kotlin
@Aggregator(inputChannel = "quotesAggregationChannel")
fun aggregateQuotes(
    messages: List<Message<LoanQuote>>,
    @Header("RESPONSE_TYPE") responseType: String
): Any {
    val quotes = messages.map { it.payload }
    return when (responseType) {
        "BEST" -> quotes.minByOrNull { it.interestRate }
        else -> quotes
    }
}

路由策略建议

使用 @Router 注解实现动态路由:

kotlin
@Router(inputChannel = "bankRoutingChannel")
fun routeBanks(@Header("CREDIT_SCORE") score: Int): List<String> {
    return when {
        score > 750 -> listOf("premierBankChannel")
        score > 650 -> listOf("premierBankChannel", "standardBankChannel")
        else -> listOf("standardBankChannel")
    }
}

Cafe 示例(咖啡店订单系统)

业务场景

处理咖啡订单:接收订单 → 拆分订单项 → 按冷热饮品路由 → 并行制作 → 聚合配送。

Kotlin 实现

1. 领域模型

kotlin
data class Order(val number: Int, val items: MutableList<OrderItem> = mutableListOf()) {
    fun addItem(type: DrinkType, shots: Int, iced: Boolean) {
        items.add(OrderItem(type, shots, iced, this))
    }
}

data class OrderItem(
    val drinkType: DrinkType,
    val shots: Int,
    val iced: Boolean,
    val order: Order
)

enum class DrinkType { LATTE, MOCHA, ESPRESSO }

2. 集成流配置

kotlin
@Configuration
class CafeFlowConfig {
    
    @Bean
    fun orderProcessingFlow(
        splitter: OrderSplitter,
        router: DrinkRouter,
        barista: Barista,
        waiter: Waiter
    ): IntegrationFlow {
        return IntegrationFlow.from("ordersChannel")
            .split(splitter::split)
            .route(router::resolveOrderItemChannel)
            .get()
    }
    
    @Bean
    fun hotDrinkFlow(barista: Barista): IntegrationFlow {
        return IntegrationFlow.from("hotDrinksChannel")
            .handle(barista::prepareHotDrink) { it.poller(Pollers.fixedRate(1000)) }
            .channel("preparedDrinksChannel")
    }
    
    @Bean
    fun coldDrinkFlow(barista: Barista): IntegrationFlow {
        return IntegrationFlow.from("coldDrinksChannel")
            .handle(barista::prepareColdDrink) { 
                it.poller(Pollers.fixedRate(500).taskExecutor(taskExecutor()) 
            }
            .channel("preparedDrinksChannel")
    }
    
    @Bean
    fun deliveryFlow(waiter: Waiter): IntegrationFlow {
        return IntegrationFlow.from("preparedDrinksChannel")
            .aggregate(waiter::prepareDelivery)
            .handle { println("配送订单: ${it.payload}") }
    }
    
    @Bean
    fun taskExecutor() = ThreadPoolTaskExecutor().apply {
        corePoolSize = 5
        maxPoolSize = 10
    }
}

3. 关键组件

kotlin
@Component
class OrderSplitter {
    fun split(order: Order): List<OrderItem> = order.items
}

@Component
class DrinkRouter {
    fun resolveOrderItemChannel(item: OrderItem): String {
        return if (item.iced) "coldDrinksChannel" else "hotDrinksChannel"
    }
}

@Service
class Barista {
    
    @ServiceActivator
    fun prepareHotDrink(item: OrderItem): Drink {
        Thread.sleep(5000) // [!code warning] // 模拟耗时操作
        return Drink(item, "热饮")
    }
    
    @ServiceActivator
    fun prepareColdDrink(item: OrderItem): Drink {
        Thread.sleep(1000)
        return Drink(item, "冷饮")
    }
}

@Component
class Waiter {
    
    @Aggregator
    fun prepareDelivery(messages: List<Message<Drink>>): Delivery {
        val orderNumber = messages.first().payload.item.order.number
        val drinks = messages.map { it.payload }
        return Delivery(orderNumber, drinks)
    }
}

并发处理注意事项

  1. 使用 TaskExecutor 避免阻塞事件循环
  2. 为耗时操作设置合理的线程池大小
  3. 冷饮制作较快,可配置较小延迟

XML 消息处理示例

业务场景

处理 XML 订单:拆分订单项 → 检查库存 → 路由到仓库或供应商。

Kotlin 实现

1. 配置 XML 处理流

kotlin
@Configuration
class XmlProcessingConfig {
    
    @Bean
    fun orderProcessingFlow(): IntegrationFlow {
        return IntegrationFlow.from("ordersChannel")
            .transform(UnmarshallingTransformer(jaxbMarshaller()))
            .split { it.apply {
                expression = "/order/orderItem"
                outputType = OrderItem::class.java
            }}
            .enrichHeaders { it.header("IN_STOCK", stockChecker::checkStock) }
            .routeToRecipients { router ->
                router.recipient("headers['IN_STOCK'] == true", "warehouseChannel")
                router.recipient("headers['IN_STOCK'] == false", "supplierChannel")
            }
            .get()
    }
    
    @Bean
    fun jaxbMarshaller(): Jaxb2Marshaller {
        return Jaxb2Marshaller().apply {
            setContextPath("com.example.xml")
        }
    }
}

2. XSLT 转换配置

kotlin
@Bean
fun supplierFlow(): IntegrationFlow {
    return IntegrationFlow.from("supplierChannel")
        .transform(XsltPayloadTransformer(
            ClassPathResource("transformations/supplierOrder.xslt")
        ))
        .handle(Http.outboundGateway("https://supplier-api.com/orders")
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(String::class.java))
        .get()
}

常见问题解答

Q1:如何处理消息处理失败?

kotlin
@Bean
fun errorHandlingFlow(): IntegrationFlow {
    return IntegrationFlow.from("inputChannel")
        .handle(service::process) { 
            it.advice(retryAdvice()) 
        }
        .get()
}

private fun retryAdvice() = RequestHandlerRetryAdvice().apply {
    setRetryTemplate(RetryTemplate().apply {
        setRetryPolicy(SimpleRetryPolicy(3))
        setBackOffPolicy(FixedBackOffPolicy().apply { setBackOffPeriod(2000) })
    })
}

Q2:如何监控消息流?

启用 Spring Integration 监控:

kotlin
@SpringBootApplication
@EnableIntegrationManagement
class MyApp

// 访问 /actuator/integrationgraph 查看集成流

Q3:何时使用聚合器 vs 网关?

组件适用场景特点
聚合器合并多个相关消息需要关联策略,支持超时
网关请求-响应交互简化同步访问,自动处理消息转换

最佳实践总结

  1. 优先使用 DSL 配置:Kotlin DSL 比 XML 更简洁安全
  2. 合理使用消息头:传递元数据而非修改消息体
  3. 异步处理:使用 TaskExecutor 提高吞吐量
  4. 错误处理:为关键操作配置重试和死信队列
  5. 监控:集成 Spring Boot Actuator 实时观察消息流

实践建议

basic 目录的简单示例开始,逐步过渡到 applications 中的复杂场景,边学边练效果最佳!