Appearance
Spring AMQP 延迟消息交换实战指南
概述
本教程将详细介绍如何在 Spring AMQP 中使用 RabbitMQ 的延迟消息交换插件实现消息的定时投递。延迟消息交换就像邮件系统中的"定时发送"功能,允许你指定消息在特定时间后才被消费者处理,非常适合预约任务、重试机制等场景。
TIP
在开始前,请确保 RabbitMQ 已安装延迟消息交换插件:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
核心概念
消息头说明
消息头常量 | 描述 | 方向 |
---|---|---|
AmqpHeaders.RECEIVED_DELAY | 接收到的延迟时间(毫秒) | 入站 |
AmqpHeaders.DELAY | 设置发送延迟时间(毫秒) | 出站 |
环境配置
添加依赖 (build.gradle.kts)
kotlin
dependencies {
implementation("org.springframework.boot:spring-boot-starter-amqp")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
}
配置延迟交换器
kotlin
@Configuration
class RabbitConfig {
@Bean
fun delayedExchange(): Exchange {
return ExchangeBuilder.directExchange("delayed.exchange")
.delayed() // [!code highlight] // 关键:声明为延迟交换器
.durable(true)
.build()
}
@Bean
fun queue(): Queue {
return QueueBuilder.durable("delayed.queue").build()
}
@Bean
fun binding(): Binding {
return BindingBuilder.bind(queue())
.to(delayedExchange())
.with("delayed.routingKey")
.noargs()
}
}
发送延迟消息
方式1:通过消息头设置延迟
kotlin
@Service
class MessageSender(private val rabbitTemplate: RabbitTemplate) {
fun sendWithHeader(delayMillis: Long, message: String) {
val message = Message(
message.toByteArray(),
MessageProperties().apply {
// 设置延迟时间(毫秒)
setHeader(AmqpHeaders.DELAY, delayMillis)
}
)
rabbitTemplate.send("delayed.exchange", "delayed.routingKey", message)
}
}
方式2:通过端点属性设置延迟(推荐)
kotlin
@Service
class MessageSender(private val rabbitTemplate: RabbitTemplate) {
// 使用Spring Integration的@SendTo注解
@SendTo("delayed.exchange::delayed.routingKey")
fun sendWithProperty(message: String): Message<*> {
return MessageBuilder.withPayload(message)
.setHeader("delay", 5000) // [!code highlight] // 延迟5秒
.build()
}
}
kotlin
// 固定延迟3秒
@RabbitListener(queues = ["delayed.queue"])
@SendTo("delayed.exchange::delayed.routingKey")
fun processAndResend(message: String): Message<*> {
return MessageBuilder.withPayload("Processed: $message")
.setHeader("delay", 3000)
.build()
}
kotlin
// 根据消息内容动态设置延迟
@RabbitListener(queues = ["delayed.queue"])
@SendTo("delayed.exchange::delayed.routingKey")
fun dynamicDelay(message: String): Message<*> {
val delay = when {
message.contains("urgent") -> 1000
message.contains("low") -> 10000
else -> 5000
}
return MessageBuilder.withPayload(message)
.setHeader("delay", delay)
.build()
}
IMPORTANT
优先级说明:
端点属性(delay
) > 消息头(AmqpHeaders.DELAY
)
推荐使用端点属性设置,避免消息头被意外覆盖
接收延迟消息
kotlin
@Service
class MessageReceiver {
@RabbitListener(queues = ["delayed.queue"])
fun handleMessage(
message: String,
@Header(AmqpHeaders.RECEIVED_DELAY) delay: Long? // [!code highlight] // 获取实际延迟时间
) {
println("""
|✅ 收到延迟消息:
|内容:$message
|实际延迟:${delay ?: 0}ms
|处理时间:${LocalDateTime.now()}
""".trimMargin())
}
}
最佳实践与常见问题
1. 延迟时间限制
kotlin
fun sendMessage() {
// [!code warning] // 不推荐:超过最大延迟(2147483647ms≈24天)
MessageBuilder.withPayload("long-delay")
.setHeader("delay", 3_000_000_000)
.build()
// 推荐:添加边界检查
val safeDelay = delay.coerceIn(0, Int.MAX_VALUE.toLong())
}
2. 插件未启用错误处理
CAUTION
如果忘记启用插件,会抛出如下异常:ERROR: Exchange type 'x-delayed-message' not supported
解决方案:
bash
# 在RabbitMQ容器中执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
3. 时间单位混淆
DANGER
延迟时间单位为毫秒(ms),常见错误使用秒:
kotlin
// 错误:实际延迟10秒(应该是10000)
.setHeader("delay", 10)
4. 消息可见性管理
完整示例场景
订单超时取消实现
kotlin
// 下单服务
@Service
class OrderService(private val rabbitTemplate: RabbitTemplate) {
fun placeOrder(order: Order) {
// 保存订单到数据库...
// 发送30分钟延迟消息
rabbitTemplate.convertAndSend("order.exchange", "order.cancel", order.id) {
it.messageProperties.setHeader("delay", 30 * 60 * 1000) // 30分钟
it
}
}
}
// 订单服务
@Service
class OrderListener {
@RabbitListener(queues = ["order.cancel.queue"])
fun checkOrderTimeout(orderId: String) {
val order = orderRepository.findById(orderId)
if (order.status == UNPAID) {
order.status = CANCELLED
orderRepository.save(order)
println("⏱ 订单 $orderId 超时取消")
}
}
}
总结
通过本教程,您已掌握:
- ✅ 配置 RabbitMQ 延迟消息交换器
- ✅ 三种设置消息延迟的方法
- ✅ 接收和处理延迟消息
- ⚠️ 避免常见错误的最佳实践
实际应用场景:
- 电商订单超时取消
- 预约提醒通知
- 失败任务的重试机制
- 定时批量处理任务
TIP
使用延迟消息时,建议在消息体中加入时间戳和延迟原因字段,便于后续追踪和调试
通过合理使用延迟消息交换,您可以构建更加灵活可靠的异步处理系统,有效解耦时间敏感型业务逻辑!