Skip to content

Spring Integration 消息路由指南

在现代分布式系统中,消息路由是构建灵活、可扩展应用架构的核心技术。Spring Integration 提供了强大的消息路由能力,让开发者能够轻松实现消息的过滤、拆分、聚合等复杂处理逻辑。

TIP

消息路由就像邮局的分拣系统:根据信件特征(地址、类型等)决定投递路径,确保每封信都能到达正确目的地。

核心路由组件

路由器 (Routers)

路由器根据消息内容或头部信息决定消息流向不同通道。

kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.Router
import org.springframework.integration.router.AbstractMessageRouter
import org.springframework.messaging.Message

@Configuration
class RouterConfig {

    //  // 重点:使用注解配置路由器
    @Bean
    @Router(inputChannel = "inputChannel")
    fun messageRouter(): AbstractMessageRouter {
        return object : AbstractMessageRouter() {
            override fun determineTargetChannels(message: Message<*>): Collection<MessageChannel> {
                val type = message.headers["messageType"] as String
                return when (type) {
                    "order" -> listOf(orderChannel())
                    "payment" -> listOf(paymentChannel())
                    else -> listOf(defaultChannel()) //  警告:必须有默认处理
                }
            }
        }
    }
}

应用场景:电商系统中根据订单类型(普通/加急)路由到不同处理通道

注意事项

  1. 必须设置默认通道处理未知消息类型
  2. 避免路由逻辑过于复杂(超过 3 层判断应考虑其他方案)

🚫 2. 过滤器 (Filter)

过滤器根据预定条件决定是否允许消息通过。

kotlin
import org.springframework.integration.annotation.Filter
import org.springframework.messaging.Message

class MessageFilter {

    //  // 重点:过滤条件实现
    @Filter(inputChannel = "inputChannel", outputChannel = "outputChannel")
    fun isValid(message: Message<String>): Boolean {
        return !message.payload.contains("blocked_keyword") // 错误:实际项目应使用正则匹配
    }
}

最佳实践:在金融系统中过滤包含敏感词的交易请求

增加过滤器之后的消息流向

🪓 3. 分割器 (Splitter)

单个消息拆分为多个独立消息并行处理。

kotlin
import org.springframework.integration.annotation.Splitter
import org.springframework.messaging.Message

class OrderSplitter {

    @Splitter(inputChannel = "orderInput", outputChannel = "itemProcessing")
    fun splitOrder(orderMessage: Message<Order>): List<OrderItem> {
        return orderMessage.payload.items.map { item ->
            OrderItem(item.id, item.name, item.quantity)
        }
    }
}

data class Order(val id: String, val items: List<OrderItem>)
data class OrderItem(val id: String, val name: String, val quantity: Int)

应用场景:电商订单拆分为单个商品项独立处理

🧩 4. 聚合器 (Aggregator)

相关消息组合成单个消息,是分割器的逆操作。

kotlin
import org.springframework.integration.annotation.Aggregator
import org.springframework.integration.annotation.CorrelationStrategy
import org.springframework.integration.annotation.ReleaseStrategy
import org.springframework.messaging.Message

class OrderAggregator {

    //  // 重点:关联策略
    @CorrelationStrategy
    fun correlateByOrderId(item: OrderItem): String {
        return item.orderId
    }

    //  // 重点:释放策略
    @ReleaseStrategy
    fun release(items: List<Message<OrderItem>>): Boolean {
        return items.size == items[0].payload.totalItems
    }

    @Aggregator(inputChannel = "processedItems", outputChannel = "completeOrders")
    fun assembleOrder(items: List<OrderItem>): Order {
        return Order(items.first().orderId, items)
    }
}

IMPORTANT

聚合器必须配置超时策略,防止因消息丢失导致无限等待!

🔢 5. 重排器 (Resequencer)

确保消息按指定顺序处理,常用于分布式系统。

kotlin
import org.springframework.integration.annotation.Resequencer
import org.springframework.messaging.Message
import java.util.Comparator

class MessageResequencer {

    @Resequencer(inputChannel = "unorderedInput", outputChannel = "orderedOutput")
    fun resequenceComparator(): Comparator<Message<*>> {
        return Comparator { m1, m2 ->
            val seq1 = m1.headers["sequenceNumber"] as Int
            val seq2 = m2.headers["sequenceNumber"] as Int
            seq1.compareTo(seq2)
        }
    }
}

应用场景:处理股票交易时确保价格更新顺序正确

⛓️ 6. 消息处理器链 (Message Handler Chain)

多个处理器串联形成处理流水线。

kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows

@Configuration
class HandlerChainConfig {

    @Bean
    fun processingFlow(): IntegrationFlow {
        return IntegrationFlows.from("inputChannel")
            .filter { it.payload.isValid() }
            .transform(MessageTransformer::class, "convertFormat")
            .handle(MessageLogger::class, "logMessage") // [!code ++] // 推荐:添加日志记录
            .channel("outputChannel")
            .get()
    }
}

🌐 7. 分散-聚集 (Scatter-Gather)

并行发送请求到多个服务并聚合结果。

kotlin
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.dsl.MessageChannels

@Configuration
class ScatterGatherConfig {

    @Bean
    fun scatterGatherFlow(): IntegrationFlow {
        return IntegrationFlows.from("inputChannel")
            .scatterGather(
                scatterer = scatterer()
                    .applySequence(true)
                    .recipientFlow(flow1())
                    .recipientFlow(flow2()),
                gatherer = gatherer()
                    .processor(aggregator(), null)
            )
            .channel("resultChannel")
            .get()
    }

    private fun flow1() = IntegrationFlow { it.handle("service1", "process") }
    private fun flow2() = IntegrationFlow { it.handle("service2", "process") }
}

应用场景:从多个供应商 API 获取报价并返回最优价格

🚧 8. 线程屏障 (Thread Barrier)

暂停处理并等待外部事件后再继续。

kotlin
import org.springframework.integration.annotation.BridgeFrom
import org.springframework.integration.annotation.BridgeTo
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.messaging.MessageChannel

class BarrierService {

    @BridgeFrom("pauseChannel")
    @BridgeTo("resumeChannel")
    lateinit var barrierChannel: MessageChannel

    @ServiceActivator(inputChannel = "pauseChannel")
    fun pauseExecution(message: Message<*>) {
        // 存储消息并等待恢复信号
        barrierStore.save(message)
    }

    @ServiceActivator(inputChannel = "resumeSignal")
    fun resumeExecution(signal: Message<*>) {
        // 获取存储的消息并继续处理
        val original = barrierStore.retrieve(signal)
        resumeChannel.send(original)
    }
}

应用场景:支付处理中等待银行确认后再完成订单

最佳实践总结

kotlin
// 使用DSL配置清晰的路由逻辑
IntegrationFlows.from("input")
    .routeByExpression("headers['type']") {
        it.channelMapping("order", "orderChannel")
          .channelMapping("payment", "paymentChannel")
          .defaultOutputChannel("defaultChannel") // [!code highlight] // 重点:默认通道
    }
kotlin
// 不推荐:在路由器中直接写业务逻辑
@Router(inputChannel = "input")
fun complexRouter(): AbstractMessageRouter {
    return object : AbstractMessageRouter() {
        override fun determineTargetChannels(message: Message<*>) {
            // 200+行业务逻辑... // [!code error] // 错误:路由器应保持简单
        }
    }
}

路由设计黄金法则

  1. 保持路由逻辑简单纯粹
  2. 单个路由器决策点不超过 5 个
  3. 为未处理消息配置死信通道
  4. 为聚合器设置合理超时

CAUTION

在分布式系统中使用重排器时需特别注意:

  • 消息序列号必须全局唯一
  • 设置合理的窗口大小和超时
  • 考虑使用持久化存储防止重启后序列丢失

常见问题解决

消息卡在聚合器中怎么办?
kotlin
// 配置聚合器超时释放
@Bean
fun aggregator() = AggregatorSpec().apply {
    processor(CustomAggregator())
    releaseStrategy { group.size == 3 }
    groupTimeout(30_000) // 30秒超时 // [!code highlight] // 重点:设置超时
    expireGroupsUponCompletion(true)
}
如何调试路由问题?
kotlin
// 启用Spring Integration调试日志
logging.level.org.springframework.integration=DEBUG
logging.level.org.springframework.messaging=TRACE

// 添加消息历史跟踪
@Bean
fun messageHistoryConfig() {
    GlobalChannelInterceptor(MessageHistoryWritingInterceptor())
}

掌握这些消息路由模式,你将能构建出高度解耦、弹性伸缩的分布式系统。在实际项目中,建议从简单路由开始,逐步引入更复杂的模式以满足业务需求。

"好的架构不是一次性设计出来的,而是通过恰当的消息路由逐步演化而来。" — Spring Integration 设计哲学