Skip to content

什么是消息桥接(Messaging Bridge)?

消息桥是一个连接器,就像现实中的桥梁一样,它的作用是连接两个消息通道,让消息可以从一个通道流向另一个通道。

消息桥的核心作用

    1. 连接不同类型的通道:轮询通道 ----消息桥----> 订阅通道
    1. 流量控制:高速输入 ----消息桥(节流)----> 慢速处理
    1. 系统解耦:系统 A ----消息桥----> 系统 B

消息桥 vs 其他组件的区别

组件作用使用场景
消息桥连接通道,不改变消息内容流量控制、通道连接
转换器改变消息格式JSON 转 XML、大小写转换
网关协议转换HTTP 到 JMS、文件到数据库
路由器条件分发根据内容路由到不同通道

实际应用示例

订单处理系统

kotlin
@Configuration
class FlowControlDemo {

    @Bean
    fun sourceChannel(): QueueChannel {
        // 创建有界队列,最多缓存10000条消息
        return QueueChannel(10000)  //  消息积压位置
    }

    @Bean
    fun targetChannel(): DirectChannel = DirectChannel()

    @Bean
    fun throttleBridge() = IntegrationFlow
        .from("sourceChannel")
        .bridge {
            it.poller(
                Pollers.fixedDelay(5000)      // 每5秒处理一次
                    .maxMessagesPerPoll(5)    // 每次只取5条 流量控制点
            )
        }
        .channel("targetChannel")
        .get()
}

关键配置参数

kotlin
poller(
    Pollers.fixedDelay(1000)          // 间隔时间:每1秒
        .maxMessagesPerPoll(50)       // 批量大小:每次50条
        .taskExecutor(taskExecutor)   // 执行器:并发处理
)
  • fixedDelay:处理间隔时间
  • maxMessagesPerPoll:每次处理的最大消息数
  • taskExecutor:用于并发处理的线程池

⚠️ 常见误区

误区 1:用消息桥做数据转换

kotlin
// ❌ 错误:消息桥不应该改变数据
.bridge { /* 不能在这里转换数据 */ }

// ✅ 正确:使用转换器
.transform(String.class, s -> s.toUpperCase())

误区 2:忘记配置输出通道

kotlin
// ❌ 错误:没有指定输出
@Bean
fun badBridge() = IntegrationFlow
    .from("input")
    .bridge()
    .get()  // 会报错!

// ✅ 正确:指定输出通道
@Bean
fun goodBridge() = IntegrationFlow
    .from("input")
    .bridge()
    .channel("output")  // 必须指定
    .get()

背压机制

当队列满时,生产者会被阻塞:

kotlin
@Bean
fun boundedSourceChannel(): QueueChannel {
    val channel = QueueChannel(10)  // 只能缓存10条消息
    return channel
}

// 测试背压
@GetMapping("/test-backpressure")
fun testBackpressure() {
    try {
        repeat(20) { i ->  // 尝试发送20条消息
            val sent = sourceChannel.send(
                MessageBuilder.withPayload("Message $i").build(),
                1000  // 1秒超时
            )
            println("Message $i sent: $sent")
        }
    } catch (e: Exception) {
        println("Backpressure triggered: ${e.message}")
    }
}