Appearance
什么是消息桥接(Messaging Bridge)?
消息桥是一个连接器,就像现实中的桥梁一样,它的作用是连接两个消息通道,让消息可以从一个通道流向另一个通道。
消息桥的核心作用
- 连接不同类型的通道:轮询通道 ----消息桥----> 订阅通道
- 流量控制:高速输入 ----消息桥(节流)----> 慢速处理
- 系统解耦:系统 A ----消息桥----> 系统 B
消息桥 vs 其他组件的区别
组件 | 作用 | 使用场景 |
---|---|---|
消息桥 | 连接通道,不改变消息内容 | 流量控制、通道连接 |
转换器 | 改变消息格式 | JSON 转 XML、大小写转换 |
网关 | 协议转换 | HTTP 到 JMS、文件到数据库 |
路由器 | 条件分发 | 根据内容路由到不同通道 |
实际应用示例
订单处理系统
kotlin
@Configuration
class FlowControlDemo {
@Bean
fun sourceChannel(): QueueChannel {
// 创建有界队列,最多缓存10000条消息
return QueueChannel(10000) // 消息积压位置
}
@Bean
fun targetChannel(): DirectChannel = DirectChannel()
@Bean
fun throttleBridge() = IntegrationFlow
.from("sourceChannel")
.bridge {
it.poller(
Pollers.fixedDelay(5000) // 每5秒处理一次
.maxMessagesPerPoll(5) // 每次只取5条 流量控制点
)
}
.channel("targetChannel")
.get()
}
关键配置参数
kotlin
poller(
Pollers.fixedDelay(1000) // 间隔时间:每1秒
.maxMessagesPerPoll(50) // 批量大小:每次50条
.taskExecutor(taskExecutor) // 执行器:并发处理
)
- fixedDelay:处理间隔时间
- maxMessagesPerPoll:每次处理的最大消息数
- taskExecutor:用于并发处理的线程池
⚠️ 常见误区
误区 1:用消息桥做数据转换
kotlin
// ❌ 错误:消息桥不应该改变数据
.bridge { /* 不能在这里转换数据 */ }
// ✅ 正确:使用转换器
.transform(String.class, s -> s.toUpperCase())
误区 2:忘记配置输出通道
kotlin
// ❌ 错误:没有指定输出
@Bean
fun badBridge() = IntegrationFlow
.from("input")
.bridge()
.get() // 会报错!
// ✅ 正确:指定输出通道
@Bean
fun goodBridge() = IntegrationFlow
.from("input")
.bridge()
.channel("output") // 必须指定
.get()
背压机制
当队列满时,生产者会被阻塞:
kotlin
@Bean
fun boundedSourceChannel(): QueueChannel {
val channel = QueueChannel(10) // 只能缓存10条消息
return channel
}
// 测试背压
@GetMapping("/test-backpressure")
fun testBackpressure() {
try {
repeat(20) { i -> // 尝试发送20条消息
val sent = sourceChannel.send(
MessageBuilder.withPayload("Message $i").build(),
1000 // 1秒超时
)
println("Message $i sent: $sent")
}
} catch (e: Exception) {
println("Backpressure triggered: ${e.message}")
}
}