Appearance
Spring Integration 示例教程(Kotlin版)
引言
Spring Integration 示例项目是学习企业集成模式的绝佳资源。自 2.0 版本起,这些示例已迁移到独立的 GitHub 仓库,采用更灵活的协作模型,方便社区贡献。本教程将使用 Kotlin 和注解配置方式讲解核心示例,助你快速掌握 Spring Integration 的核心概念。
获取 Spring Integration 示例
克隆仓库
使用 Git 克隆官方示例仓库:
bash
git clone https://github.com/spring-projects/spring-integration-samples.git
项目结构
示例分为四类,适合不同学习阶段:
类别 | 目录 | 目标人群 | 示例特点 |
---|---|---|---|
基础 | samples/basic | 初学者 | 核心概念演示 |
中级 | samples/intermediate | 进阶者 | 处理复杂场景 |
高级 | samples/advanced | 专家 | 扩展框架功能 |
应用 | samples/applications | 架构师 | 业务场景解决方案 |
Loan Broker 示例(借贷经纪人)
业务场景
模拟贷款经纪系统:收集客户信息 → 筛选银行 → 获取报价 → 返回最佳贷款方案。
Kotlin 实现
1. 网关接口
kotlin
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway
@MessagingGateway
interface LoanBrokerGateway {
@Gateway(requestChannel = "loanBrokerPreProcessingChannel")
fun getBestLoanQuote(
@Payload request: LoanRequest,
@Header("RESPONSE_TYPE") responseType: String = "BEST"
): LoanQuote?
}
2. 信用评分服务
kotlin
@Service
class CreditService {
fun getCreditScore(request: LoanRequest): Int {
// [!code highlight] // 实际项目会调用外部信用系统
return (300..850).random()
}
}
3. 银行路由配置
kotlin
@Configuration
class RoutingConfig {
@Bean
fun loanBrokerFlow(
creditService: CreditService
): IntegrationFlow {
return IntegrationFlow.from("loanBrokerPreProcessingChannel")
.enrichHeaders { it.header("CREDIT_SCORE", creditService::getCreditScore) }
.routeToRecipients { router ->
router.recipientFlow("creditScore >= 700", premierBankFlow())
router.recipientFlow("creditScore < 700", standardBankFlow())
}
.get()
}
private fun premierBankFlow(): IntegrationFlow {
return IntegrationFlow.from("premierBankChannel")
.handle(BankService("Premier Bank"), "getQuote")
.channel("quotesAggregationChannel")
}
}
4. 报价聚合器
kotlin
@Aggregator(inputChannel = "quotesAggregationChannel")
fun aggregateQuotes(
messages: List<Message<LoanQuote>>,
@Header("RESPONSE_TYPE") responseType: String
): Any {
val quotes = messages.map { it.payload }
return when (responseType) {
"BEST" -> quotes.minByOrNull { it.interestRate }
else -> quotes
}
}
路由策略建议
使用 @Router
注解实现动态路由:
kotlin
@Router(inputChannel = "bankRoutingChannel")
fun routeBanks(@Header("CREDIT_SCORE") score: Int): List<String> {
return when {
score > 750 -> listOf("premierBankChannel")
score > 650 -> listOf("premierBankChannel", "standardBankChannel")
else -> listOf("standardBankChannel")
}
}
Cafe 示例(咖啡店订单系统)
业务场景
处理咖啡订单:接收订单 → 拆分订单项 → 按冷热饮品路由 → 并行制作 → 聚合配送。
Kotlin 实现
1. 领域模型
kotlin
data class Order(val number: Int, val items: MutableList<OrderItem> = mutableListOf()) {
fun addItem(type: DrinkType, shots: Int, iced: Boolean) {
items.add(OrderItem(type, shots, iced, this))
}
}
data class OrderItem(
val drinkType: DrinkType,
val shots: Int,
val iced: Boolean,
val order: Order
)
enum class DrinkType { LATTE, MOCHA, ESPRESSO }
2. 集成流配置
kotlin
@Configuration
class CafeFlowConfig {
@Bean
fun orderProcessingFlow(
splitter: OrderSplitter,
router: DrinkRouter,
barista: Barista,
waiter: Waiter
): IntegrationFlow {
return IntegrationFlow.from("ordersChannel")
.split(splitter::split)
.route(router::resolveOrderItemChannel)
.get()
}
@Bean
fun hotDrinkFlow(barista: Barista): IntegrationFlow {
return IntegrationFlow.from("hotDrinksChannel")
.handle(barista::prepareHotDrink) { it.poller(Pollers.fixedRate(1000)) }
.channel("preparedDrinksChannel")
}
@Bean
fun coldDrinkFlow(barista: Barista): IntegrationFlow {
return IntegrationFlow.from("coldDrinksChannel")
.handle(barista::prepareColdDrink) {
it.poller(Pollers.fixedRate(500).taskExecutor(taskExecutor())
}
.channel("preparedDrinksChannel")
}
@Bean
fun deliveryFlow(waiter: Waiter): IntegrationFlow {
return IntegrationFlow.from("preparedDrinksChannel")
.aggregate(waiter::prepareDelivery)
.handle { println("配送订单: ${it.payload}") }
}
@Bean
fun taskExecutor() = ThreadPoolTaskExecutor().apply {
corePoolSize = 5
maxPoolSize = 10
}
}
3. 关键组件
kotlin
@Component
class OrderSplitter {
fun split(order: Order): List<OrderItem> = order.items
}
@Component
class DrinkRouter {
fun resolveOrderItemChannel(item: OrderItem): String {
return if (item.iced) "coldDrinksChannel" else "hotDrinksChannel"
}
}
@Service
class Barista {
@ServiceActivator
fun prepareHotDrink(item: OrderItem): Drink {
Thread.sleep(5000) // [!code warning] // 模拟耗时操作
return Drink(item, "热饮")
}
@ServiceActivator
fun prepareColdDrink(item: OrderItem): Drink {
Thread.sleep(1000)
return Drink(item, "冷饮")
}
}
@Component
class Waiter {
@Aggregator
fun prepareDelivery(messages: List<Message<Drink>>): Delivery {
val orderNumber = messages.first().payload.item.order.number
val drinks = messages.map { it.payload }
return Delivery(orderNumber, drinks)
}
}
并发处理注意事项
- 使用
TaskExecutor
避免阻塞事件循环 - 为耗时操作设置合理的线程池大小
- 冷饮制作较快,可配置较小延迟
XML 消息处理示例
业务场景
处理 XML 订单:拆分订单项 → 检查库存 → 路由到仓库或供应商。
Kotlin 实现
1. 配置 XML 处理流
kotlin
@Configuration
class XmlProcessingConfig {
@Bean
fun orderProcessingFlow(): IntegrationFlow {
return IntegrationFlow.from("ordersChannel")
.transform(UnmarshallingTransformer(jaxbMarshaller()))
.split { it.apply {
expression = "/order/orderItem"
outputType = OrderItem::class.java
}}
.enrichHeaders { it.header("IN_STOCK", stockChecker::checkStock) }
.routeToRecipients { router ->
router.recipient("headers['IN_STOCK'] == true", "warehouseChannel")
router.recipient("headers['IN_STOCK'] == false", "supplierChannel")
}
.get()
}
@Bean
fun jaxbMarshaller(): Jaxb2Marshaller {
return Jaxb2Marshaller().apply {
setContextPath("com.example.xml")
}
}
}
2. XSLT 转换配置
kotlin
@Bean
fun supplierFlow(): IntegrationFlow {
return IntegrationFlow.from("supplierChannel")
.transform(XsltPayloadTransformer(
ClassPathResource("transformations/supplierOrder.xslt")
))
.handle(Http.outboundGateway("https://supplier-api.com/orders")
.httpMethod(HttpMethod.POST)
.expectedResponseType(String::class.java))
.get()
}
常见问题解答
Q1:如何处理消息处理失败?
kotlin
@Bean
fun errorHandlingFlow(): IntegrationFlow {
return IntegrationFlow.from("inputChannel")
.handle(service::process) {
it.advice(retryAdvice())
}
.get()
}
private fun retryAdvice() = RequestHandlerRetryAdvice().apply {
setRetryTemplate(RetryTemplate().apply {
setRetryPolicy(SimpleRetryPolicy(3))
setBackOffPolicy(FixedBackOffPolicy().apply { setBackOffPeriod(2000) })
})
}
Q2:如何监控消息流?
启用 Spring Integration 监控:
kotlin
@SpringBootApplication
@EnableIntegrationManagement
class MyApp
// 访问 /actuator/integrationgraph 查看集成流
Q3:何时使用聚合器 vs 网关?
组件 | 适用场景 | 特点 |
---|---|---|
聚合器 | 合并多个相关消息 | 需要关联策略,支持超时 |
网关 | 请求-响应交互 | 简化同步访问,自动处理消息转换 |
最佳实践总结
- 优先使用 DSL 配置:Kotlin DSL 比 XML 更简洁安全
- 合理使用消息头:传递元数据而非修改消息体
- 异步处理:使用
TaskExecutor
提高吞吐量 - 错误处理:为关键操作配置重试和死信队列
- 监控:集成 Spring Boot Actuator 实时观察消息流
实践建议
从 basic
目录的简单示例开始,逐步过渡到 applications
中的复杂场景,边学边练效果最佳!