Appearance
Spring Integration AMQP Inbound Channel Adapter 详解
概述
AMQP Inbound Channel Adapter 是 Spring Integration 的核心组件,用于从 AMQP 队列(如 RabbitMQ)消费消息并转换为 Spring 消息。它充当了外部消息系统与 Spring Integration 应用之间的桥梁,支持消息驱动的架构模式。
快速配置示例
Kotlin DSL 配置(推荐)
kotlin
@Bean
fun amqpInboundFlow(connectionFactory: ConnectionFactory) =
integrationFlow(Amqp.inboundAdapter(connectionFactory, "orderQueue")) {
handle { message ->
println("收到订单: ${message.payload}")
// 业务处理逻辑
}
}
注解配置方式
kotlin
@Configuration
@EnableIntegration
class AmqpConfig {
@Bean
fun amqpInputChannel() = DirectChannel()
@Bean
fun container(connectionFactory: ConnectionFactory): SimpleMessageListenerContainer {
return SimpleMessageListenerContainer(connectionFactory).apply {
queueNames = arrayOf("orderQueue")
concurrentConsumers = 2
}
}
@Bean
fun inboundAdapter(
container: SimpleMessageListenerContainer,
@Qualifier("amqpInputChannel") channel: MessageChannel
) = AmqpInboundChannelAdapter(container).apply {
outputChannel = channel
}
@ServiceActivator(inputChannel = "amqpInputChannel")
fun messageHandler() = MessageHandler { message ->
println("处理订单: ${message.payload}")
}
}
核心配置详解
基础配置选项
参数 | 说明 | 默认值 |
---|---|---|
queueNames | 监听的队列名称(逗号分隔) | 必填 |
connectionFactory | RabbitMQ 连接工厂 | 默认 Bean |
concurrentConsumers | 并发消费者数量 | 1 |
prefetchCount | 单次请求预取消息数 | 1 |
acknowledgeMode | 消息确认模式 | AUTO |
消息确认模式
kotlin
@Bean
fun inboundAdapter(container: SimpleMessageListenerContainer) =
AmqpInboundChannelAdapter(container).apply {
acknowledgeMode = AcknowledgeMode.MANUAL
}
IMPORTANT
确认模式详解:
- AUTO(默认):下游流程完成后自动确认
- MANUAL:需手动确认(消息头含
amqp_deliveryTag
和amqp_channel
) - NONE:无确认(类似
autoAck
)
错误处理配置
kotlin
@Bean
fun inboundAdapter(container: SimpleMessageListenerContainer) =
AmqpInboundChannelAdapter(container).apply {
errorChannel = errorChannel()
setMessageRecoverer(RejectAndDontRequeueRecoverer())
}
@Bean
fun errorChannel() = DirectChannel()
@ServiceActivator(inputChannel = "errorChannel")
fun errorHandler() = MessageHandler { message ->
println("错误处理: ${message.payload}")
}
最佳实践
当使用 MANUAL
确认模式时,务必在业务逻辑中处理消息确认:
kotlin
handle { message ->
try {
processMessage(message)
// 手动确认消息
message.headers[AmqpHeaders.CHANNEL]?.let { channel ->
val deliveryTag = message.headers[AmqpHeaders.DELIVERY_TAG] as Long
channel.basicAck(deliveryTag, false)
}
} catch (e: Exception) {
// 拒绝消息(不重新入队)
channel.basicReject(deliveryTag, false)
}
}
批处理消息配置
批量消费配置
kotlin
@Bean
fun container(connectionFactory: ConnectionFactory) =
SimpleMessageListenerContainer(connectionFactory).apply {
queueNames = arrayOf("batch.queue")
batchSize = 50 // 每批处理50条消息
consumerBatchEnabled = true
}
@Bean
fun batchAdapter(container: SimpleMessageListenerContainer) =
AmqpInboundChannelAdapter(container).apply {
batchMode = BatchMode.EXTRACT_PAYLOADS
}
批处理模式对比
kotlin
// 消息负载格式:List<Message<?>>
@ServiceActivator(inputChannel = "inputChannel")
fun handleBatch(messages: List<Message<Order>>) {
messages.forEach { process(it.payload) }
}
kotlin
// 消息负载格式:List<?>
@ServiceActivator(inputChannel = "inputChannel")
fun handleBatch(orders: List<Order>) {
orders.forEach { process(it) }
}
CAUTION
使用批处理时:
batchSize
≤prefetchCount
- 错误恢复需使用
MessageBatchRecoverer
- 排序保证仅在单消费者时有效
高级特性
与 @Publisher
注解集成
kotlin
@RabbitListener(queuesToDeclare = [Queue("publisherQueue")])
@Publisher("processedOrders")
@Payload("#args.payload.toUpperCase()")
fun processAndPublish(payload: String) {
// 处理逻辑
}
事务管理
kotlin
@Bean
fun inboundAdapter(container: SimpleMessageListenerContainer) =
AmqpInboundChannelAdapter(container).apply {
channelTransacted = true
transactionManager = transactionManager()
}
常见问题解决
队列不可用错误
kotlin
@Bean
fun container(connectionFactory: ConnectionFactory) =
SimpleMessageListenerContainer(connectionFactory).apply {
missingQueuesFatal = false // 队列不存在时不报错
recoveryInterval = 3000 // 3秒重试间隔
}
消费者停止问题
kotlin
@Bean
fun container(connectionFactory: ConnectionFactory) =
DirectMessageListenerContainer(connectionFactory).apply {
consumersPerQueue = 2 // 使用Direct容器
forceStop = true // 防止竞态条件
}
头部映射配置
kotlin
@Bean
fun headerMapper() = DefaultAmqpHeaderMapper().apply {
setRequestHeaderNames("customHeader*", "x-*")
}
@Bean
fun inboundAdapter(container: SimpleMessageListenerContainer) =
AmqpInboundChannelAdapter(container).apply {
headerMapper = headerMapper()
}
最佳实践总结
- 优先使用 Kotlin DSL 配置,简洁直观
- 生产环境设置
concurrentConsumers
≥ 2 - 高吞吐场景启用批处理并优化
prefetchCount
- 使用
MANUAL
确认模式确保消息可靠性 - 始终配置错误通道处理异常
- 使用
DirectMessageListenerContainer
获取更精准的流控
WARNING
关键注意事项:
- 避免混合使用
listener-container
引用和其他容器配置属性 - 多消费者会破坏消息顺序
- 事务模式下确认操作会延迟到事务提交
- XML 容器配置需使用
<bean>
而非 AMQP 命名空间