Skip to content

Spring AMQP 延迟消息交换实战指南

概述

本教程将详细介绍如何在 Spring AMQP 中使用 RabbitMQ 的延迟消息交换插件实现消息的定时投递。延迟消息交换就像邮件系统中的"定时发送"功能,允许你指定消息在特定时间后才被消费者处理,非常适合预约任务、重试机制等场景。

TIP

在开始前,请确保 RabbitMQ 已安装延迟消息交换插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

核心概念

消息头说明

消息头常量描述方向
AmqpHeaders.RECEIVED_DELAY接收到的延迟时间(毫秒)入站
AmqpHeaders.DELAY设置发送延迟时间(毫秒)出站

环境配置

添加依赖 (build.gradle.kts)

kotlin
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-amqp")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
}

配置延迟交换器

kotlin
@Configuration
class RabbitConfig {

    @Bean
    fun delayedExchange(): Exchange {
        return ExchangeBuilder.directExchange("delayed.exchange")
            .delayed()  // [!code highlight] // 关键:声明为延迟交换器
            .durable(true)
            .build()
    }

    @Bean
    fun queue(): Queue {
        return QueueBuilder.durable("delayed.queue").build()
    }

    @Bean
    fun binding(): Binding {
        return BindingBuilder.bind(queue())
            .to(delayedExchange())
            .with("delayed.routingKey")
            .noargs()
    }
}

发送延迟消息

方式1:通过消息头设置延迟

kotlin
@Service
class MessageSender(private val rabbitTemplate: RabbitTemplate) {

    fun sendWithHeader(delayMillis: Long, message: String) {
        val message = Message(
            message.toByteArray(),
            MessageProperties().apply {
                // 设置延迟时间(毫秒)
                setHeader(AmqpHeaders.DELAY, delayMillis)  
            }
        )
        
        rabbitTemplate.send("delayed.exchange", "delayed.routingKey", message)
    }
}

方式2:通过端点属性设置延迟(推荐)

kotlin
@Service
class MessageSender(private val rabbitTemplate: RabbitTemplate) {

    // 使用Spring Integration的@SendTo注解
    @SendTo("delayed.exchange::delayed.routingKey")
    fun sendWithProperty(message: String): Message<*> {
        return MessageBuilder.withPayload(message)
            .setHeader("delay", 5000)  // [!code highlight] // 延迟5秒
            .build()
    }
}
kotlin
// 固定延迟3秒
@RabbitListener(queues = ["delayed.queue"])
@SendTo("delayed.exchange::delayed.routingKey")
fun processAndResend(message: String): Message<*> {
    return MessageBuilder.withPayload("Processed: $message")
        .setHeader("delay", 3000)  
        .build()
}
kotlin
// 根据消息内容动态设置延迟
@RabbitListener(queues = ["delayed.queue"])
@SendTo("delayed.exchange::delayed.routingKey")
fun dynamicDelay(message: String): Message<*> {
    val delay = when {
        message.contains("urgent") -> 1000
        message.contains("low") -> 10000
        else -> 5000
    }
    
    return MessageBuilder.withPayload(message)
        .setHeader("delay", delay)  
        .build()
}

IMPORTANT

优先级说明
端点属性(delay) > 消息头(AmqpHeaders.DELAY)
推荐使用端点属性设置,避免消息头被意外覆盖

接收延迟消息

kotlin
@Service
class MessageReceiver {

    @RabbitListener(queues = ["delayed.queue"])
    fun handleMessage(
        message: String,
        @Header(AmqpHeaders.RECEIVED_DELAY) delay: Long?  // [!code highlight] // 获取实际延迟时间
    ) {
        println("""
        |✅ 收到延迟消息:
        |内容:$message
        |实际延迟:${delay ?: 0}ms
        |处理时间:${LocalDateTime.now()}
        """.trimMargin())
    }
}

最佳实践与常见问题

1. 延迟时间限制

kotlin
fun sendMessage() {
    // [!code warning] // 不推荐:超过最大延迟(2147483647ms≈24天)
    MessageBuilder.withPayload("long-delay")
        .setHeader("delay", 3_000_000_000) 
        .build()
    
    // 推荐:添加边界检查
    val safeDelay = delay.coerceIn(0, Int.MAX_VALUE.toLong())
}

2. 插件未启用错误处理

CAUTION

如果忘记启用插件,会抛出如下异常:
ERROR: Exchange type 'x-delayed-message' not supported

解决方案:

bash
# 在RabbitMQ容器中执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3. 时间单位混淆

DANGER

延迟时间单位为毫秒(ms),常见错误使用秒:

kotlin
// 错误:实际延迟10秒(应该是10000)
.setHeader("delay", 10)  

4. 消息可见性管理

完整示例场景

订单超时取消实现
kotlin
// 下单服务
@Service
class OrderService(private val rabbitTemplate: RabbitTemplate) {

    fun placeOrder(order: Order) {
        // 保存订单到数据库...
        
        // 发送30分钟延迟消息
        rabbitTemplate.convertAndSend("order.exchange", "order.cancel", order.id) {
            it.messageProperties.setHeader("delay", 30 * 60 * 1000)  // 30分钟
            it
        }
    }
}

// 订单服务
@Service
class OrderListener {

    @RabbitListener(queues = ["order.cancel.queue"])
    fun checkOrderTimeout(orderId: String) {
        val order = orderRepository.findById(orderId)
        if (order.status == UNPAID) {
            order.status = CANCELLED
            orderRepository.save(order)
            println("⏱ 订单 $orderId 超时取消")
        }
    }
}

总结

通过本教程,您已掌握:

  1. ✅ 配置 RabbitMQ 延迟消息交换器
  2. ✅ 三种设置消息延迟的方法
  3. ✅ 接收和处理延迟消息
  4. ⚠️ 避免常见错误的最佳实践

实际应用场景

  • 电商订单超时取消
  • 预约提醒通知
  • 失败任务的重试机制
  • 定时批量处理任务

TIP

使用延迟消息时,建议在消息体中加入时间戳延迟原因字段,便于后续追踪和调试

通过合理使用延迟消息交换,您可以构建更加灵活可靠的异步处理系统,有效解耦时间敏感型业务逻辑!