Skip to content

Spring Integration STOMP 支持教程

1. STOMP 协议简介

STOMP(Simple Text Orientated Messaging Protocol)是一种简单的基于文本的消息协议,设计用于在客户端与消息代理之间进行异步消息传递。

1.1 STOMP 工作原理

TIP

STOMP 类似于 HTTP 的消息版 - 使用简单的文本命令如 CONNECT, SUBSCRIBE, SEND 等,非常适合 Web 应用的消息通信。

2. 添加 STOMP 依赖

2.1 Gradle 配置 (Kotlin DSL)

kotlin
// build.gradle.kts
dependencies {
    implementation("org.springframework.integration:spring-integration-stomp:6.5.1")
    // 服务器端额外依赖
    implementation("org.springframework:spring-websocket")
    implementation("io.projectreactor.netty:reactor-netty")
}

2.2 客户端类型

Spring 提供两种 STOMP 客户端实现:

  • WebSocketStompClient:基于 WebSocket 标准
  • ReactorNettyTcpStompClient:基于 Reactor Netty

3. 配置 STOMP 会话管理器

3.1 创建 STOMP 客户端

kotlin
@Configuration
class StompConfig {

    @Bean
    fun stompClient(): ReactorNettyTcpStompClient {
        return ReactorNettyTcpStompClient("127.0.0.1", 61613).apply {
            messageConverter = PassThruMessageConverter()
            taskScheduler = ThreadPoolTaskScheduler().apply { afterPropertiesSet() }
            receiptTimeLimit = 5000  // 回执超时5秒
        }
    }
}

NOTE

PassThruMessageConverter 用于直接传递消息内容而不进行转换,适用于简单文本消息。

3.2 会话管理器配置

kotlin
@Bean
fun stompSessionManager(): StompSessionManager {
    return ReactorNettyTcpStompSessionManager(stompClient()).apply {
        autoReceipt = true  // 自动生成RECEIPT头部
    }
}

最佳实践

会话管理器是 STOMP 集成的核心组件:

  • 管理客户端与代理的连接
  • 处理会话生命周期
  • 提供线程安全的会话访问

4. 接收 STOMP 消息(入站适配器)

4.1 基本配置

kotlin
@Bean
fun stompInputChannel(): PollableChannel {
    return QueueChannel()
}

@Bean
fun stompInboundAdapter(): StompInboundChannelAdapter {
    return StompInboundChannelAdapter(stompSessionManager(), "/topic/orders").apply {
        outputChannel = stompInputChannel()
        payloadType = String::class.java  // 消息体类型
    }
}

4.2 动态订阅目的地

kotlin
@Service
class SubscriptionService(
    private val adapter: StompInboundChannelAdapter
) {
    // 动态添加订阅
    fun addSubscription(destination: String) {
        adapter.addDestination(destination)
    }

    // 移除订阅
    fun removeSubscription(destination: String) {
        adapter.removeDestination(destination)
    }
}

CAUTION

动态订阅时需确保:

  1. 目标路径格式正确
  2. 客户端有相应权限
  3. 避免频繁变更导致性能问题

5. 发送 STOMP 消息(出站适配器)

5.1 基础消息发送

kotlin
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
fun stompMessageHandler(): MessageHandler {
    return StompMessageHandler(stompSessionManager()).apply {
        destination = "/topic/notifications"  // 固定目的地
    }
}

5.2 动态目的地设置

kotlin
// 使用SpEL表达式动态确定目的地
@Bean
@ServiceActivator(inputChannel = "dynamicStompChannel")
fun dynamicDestinationHandler(): MessageHandler {
    return StompMessageHandler(stompSessionManager()).apply {
        destinationExpression = "headers['stomp_destination']"
    }
}

// 发送示例
fun sendDynamicMessage(destination: String, payload: Any) {
    val message = MessageBuilder.withPayload(payload)
        .setHeader("stomp_destination", destination)
        .build()
    dynamicStompChannel.send(message)
}

6. STOMP 头部映射

6.1 默认头部映射

Spring Integration 自动处理标准 STOMP 头部:

STOMP 头部Spring 头部说明
destinationstomp_destination消息目标
content-typestomp_content-type内容类型
receiptstomp_receipt回执ID

6.2 自定义头部映射

kotlin
@Bean
fun customHeaderMapper(): StompHeaderMapper {
    return object : StompHeaderMapper() {
        override fun toHeaders(source: StompHeaders): Map<String, Any> {
            val headers = super.toHeaders(source)
            // 添加自定义映射
            headers["custom-header"] = source["x-custom-header"]
            return headers
        }
    }
}

// 在适配器中使用
@Bean
fun stompInboundAdapter(): StompInboundChannelAdapter {
    return StompInboundChannelAdapter(stompSessionManager(), "/topic/data").apply {
        headerMapper = customHeaderMapper()  
    }
}

7. 处理 STOMP 事件

7.1 事件类型

Spring Integration 提供的事件类型:

  • StompExceptionEvent:连接/处理错误
  • StompReceiptEvent:消息回执事件
  • StompConnectionFailedEvent:连接失败

7.2 事件监听配置

kotlin
@Bean
fun stompEventChannel(): PollableChannel {
    return QueueChannel()
}

@Bean
fun stompEventListener(): ApplicationListener<ApplicationEvent> {
    return ApplicationEventListeningMessageProducer().apply {
        eventTypes = arrayOf(StompIntegrationEvent::class.java)
        outputChannel = stompEventChannel()
    }
}

@Bean
@ServiceActivator(inputChannel = "stompEventChannel")
fun eventHandler(): MessageHandler {
    return MessageHandler { message ->
        when (val event = message.payload) {
            is StompExceptionEvent ->
                logger.error("STOMP错误: ${event.exception.message}")
            is StompReceiptEvent ->
                logger.info("消息回执: ${event.receiptId}")
        }
    }
}

重要提示

回执处理注意事项

  1. 确保 autoReceipt = true 启用回执
  2. 设置合理的 receiptTimeLimit(默认15秒)
  3. 处理超时未收到的回执事件

8. 完整配置示例

8.1 Kotlin 配置类

kotlin
@Configuration
@EnableIntegration
class FullStompConfig {

    TOMP客户端配置
    @Bean
    fun stompClient(): ReactorNettyTcpStompClient {
        return ReactorNettyTcpStompClient("broker.example.com", 61613).apply {
            messageConverter = MappingJackson2MessageConverter()
            taskScheduler = ThreadPoolTaskScheduler().apply { afterPropertiesSet() }
            receiptTimeLimit = 10000 // 10秒回执超时
        }
    }

    // 会话管理器
    @Bean
    fun stompSessionManager(): StompSessionManager {
        return ReactorNettyTcpStompSessionManager(stompClient()).apply {
            autoReceipt = true
        }
    }

    // 入站适配器
    @Bean
    fun stompInboundAdapter(): StompInboundChannelAdapter {
        return StompInboundChannelAdapter(stompSessionManager(), "/topic/data").apply {
            outputChannel = stompInputChannel()
            payloadType = DataModel::class.java // 自定义数据类型
        }
    }

    // 出站适配器
    @Bean
    @ServiceActivator(inputChannel = "stompOutChannel")
    fun stompOutHandler(): MessageHandler {
        return StompMessageHandler(stompSessionManager()).apply {
            destination = "/topic/notifications"
        }
    }

    // 事件监听
    @Bean
    fun stompEventListener(): ApplicationListener<ApplicationEvent> {
        return ApplicationEventListeningMessageProducer().apply {
            eventTypes = arrayOf(StompIntegrationEvent::class.java)
            outputChannel = stompEventChannel()
        }
    }
}

8.2 领域模型示例

kotlin
// 消息数据模型
data class DataModel(
    val id: String,
    val timestamp: Instant,
    val content: String
)

9. 常见问题解答

9.1 连接不稳定问题

症状:频繁触发 StompConnectionFailedEvent

解决方案

kotlin
@Bean
fun stompClient(): ReactorNettyTcpStompClient {
    return ReactorNettyTcpStompClient(...).apply {
        // 启用自动重连
        isAutoStartup = true
        // 设置心跳
        setDefaultHeartbeat(arrayOf(10000L, 10000L))  // 发送/接收心跳间隔
    }
}

9.2 消息序列化问题

症状:接收端无法解析消息

解决方案

kotlin
@Bean
fun stompClient(): ReactorNettyTcpStompClient {
    return ReactorNettyTcpStompClient(...).apply {
        // 配置合适的消息转换器
        messageConverter = MappingJackson2MessageConverter().apply {
            objectMapper = Jackson2ObjectMapperBuilder.json()
                .failOnUnknownProperties(false)
                .build()
        }
    }
}

// 在入站适配器指定类型
@Bean
fun stompInboundAdapter(): StompInboundChannelAdapter {
    return StompInboundChannelAdapter(...).apply {
        payloadType = MyDataClass::class.java  
    }
}

9.3 性能优化技巧

  1. 连接池配置

    kotlin
    @Bean
    fun stompSessionManager(): StompSessionManager {
        val pool = object : StompSessionManager {
            // 实现连接池逻辑
        }
        return pool
    }
  2. 批量消息处理

    kotlin
    @Bean
    @ServiceActivator(inputChannel = "stompInputChannel")
    fun batchProcessor(): MessageHandler {
        return MessageHandler { message ->
            // 实现批量处理逻辑
        }
    }
  3. 异步处理

    kotlin
    @Bean
    fun stompInputChannel(): ExecutorChannel {
        return ExecutorChannel(Executors.newFixedThreadPool(10))
    }

总结

通过本教程,您已掌握在 Spring Integration 中使用 STOMP 的关键技术:

  1. STOMP 基础:理解协议工作原理和消息格式
  2. 客户端配置:使用 Reactor Netty 或 WebSocket 客户端
  3. 消息处理:实现入站/出站适配器
  4. 头部映射:自定义 STOMP 与 Spring 消息头的转换
  5. 事件处理:监听和处理连接、回执等事件
  6. 最佳实践:动态订阅、性能优化和错误处理

下一步学习建议

  1. 结合 Spring Security 实现安全的 STOMP 连接
  2. 探索与 Spring Boot 的自动配置集成
  3. 尝试在云原生环境中部署 STOMP 应用

本教程完整代码示例可在 GitHub 仓库 获取