Appearance
Spring Integration STOMP 支持教程
1. STOMP 协议简介
STOMP(Simple Text Orientated Messaging Protocol)是一种简单的基于文本的消息协议,设计用于在客户端与消息代理之间进行异步消息传递。
1.1 STOMP 工作原理
TIP
STOMP 类似于 HTTP 的消息版 - 使用简单的文本命令如 CONNECT
, SUBSCRIBE
, SEND
等,非常适合 Web 应用的消息通信。
2. 添加 STOMP 依赖
2.1 Gradle 配置 (Kotlin DSL)
kotlin
// build.gradle.kts
dependencies {
implementation("org.springframework.integration:spring-integration-stomp:6.5.1")
// 服务器端额外依赖
implementation("org.springframework:spring-websocket")
implementation("io.projectreactor.netty:reactor-netty")
}
2.2 客户端类型
Spring 提供两种 STOMP 客户端实现:
WebSocketStompClient
:基于 WebSocket 标准ReactorNettyTcpStompClient
:基于 Reactor Netty
3. 配置 STOMP 会话管理器
3.1 创建 STOMP 客户端
kotlin
@Configuration
class StompConfig {
@Bean
fun stompClient(): ReactorNettyTcpStompClient {
return ReactorNettyTcpStompClient("127.0.0.1", 61613).apply {
messageConverter = PassThruMessageConverter()
taskScheduler = ThreadPoolTaskScheduler().apply { afterPropertiesSet() }
receiptTimeLimit = 5000 // 回执超时5秒
}
}
}
NOTE
PassThruMessageConverter
用于直接传递消息内容而不进行转换,适用于简单文本消息。
3.2 会话管理器配置
kotlin
@Bean
fun stompSessionManager(): StompSessionManager {
return ReactorNettyTcpStompSessionManager(stompClient()).apply {
autoReceipt = true // 自动生成RECEIPT头部
}
}
最佳实践
会话管理器是 STOMP 集成的核心组件:
- 管理客户端与代理的连接
- 处理会话生命周期
- 提供线程安全的会话访问
4. 接收 STOMP 消息(入站适配器)
4.1 基本配置
kotlin
@Bean
fun stompInputChannel(): PollableChannel {
return QueueChannel()
}
@Bean
fun stompInboundAdapter(): StompInboundChannelAdapter {
return StompInboundChannelAdapter(stompSessionManager(), "/topic/orders").apply {
outputChannel = stompInputChannel()
payloadType = String::class.java // 消息体类型
}
}
4.2 动态订阅目的地
kotlin
@Service
class SubscriptionService(
private val adapter: StompInboundChannelAdapter
) {
// 动态添加订阅
fun addSubscription(destination: String) {
adapter.addDestination(destination)
}
// 移除订阅
fun removeSubscription(destination: String) {
adapter.removeDestination(destination)
}
}
CAUTION
动态订阅时需确保:
- 目标路径格式正确
- 客户端有相应权限
- 避免频繁变更导致性能问题
5. 发送 STOMP 消息(出站适配器)
5.1 基础消息发送
kotlin
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
fun stompMessageHandler(): MessageHandler {
return StompMessageHandler(stompSessionManager()).apply {
destination = "/topic/notifications" // 固定目的地
}
}
5.2 动态目的地设置
kotlin
// 使用SpEL表达式动态确定目的地
@Bean
@ServiceActivator(inputChannel = "dynamicStompChannel")
fun dynamicDestinationHandler(): MessageHandler {
return StompMessageHandler(stompSessionManager()).apply {
destinationExpression = "headers['stomp_destination']"
}
}
// 发送示例
fun sendDynamicMessage(destination: String, payload: Any) {
val message = MessageBuilder.withPayload(payload)
.setHeader("stomp_destination", destination)
.build()
dynamicStompChannel.send(message)
}
6. STOMP 头部映射
6.1 默认头部映射
Spring Integration 自动处理标准 STOMP 头部:
STOMP 头部 | Spring 头部 | 说明 |
---|---|---|
destination | stomp_destination | 消息目标 |
content-type | stomp_content-type | 内容类型 |
receipt | stomp_receipt | 回执ID |
6.2 自定义头部映射
kotlin
@Bean
fun customHeaderMapper(): StompHeaderMapper {
return object : StompHeaderMapper() {
override fun toHeaders(source: StompHeaders): Map<String, Any> {
val headers = super.toHeaders(source)
// 添加自定义映射
headers["custom-header"] = source["x-custom-header"]
return headers
}
}
}
// 在适配器中使用
@Bean
fun stompInboundAdapter(): StompInboundChannelAdapter {
return StompInboundChannelAdapter(stompSessionManager(), "/topic/data").apply {
headerMapper = customHeaderMapper()
}
}
7. 处理 STOMP 事件
7.1 事件类型
Spring Integration 提供的事件类型:
StompExceptionEvent
:连接/处理错误StompReceiptEvent
:消息回执事件StompConnectionFailedEvent
:连接失败
7.2 事件监听配置
kotlin
@Bean
fun stompEventChannel(): PollableChannel {
return QueueChannel()
}
@Bean
fun stompEventListener(): ApplicationListener<ApplicationEvent> {
return ApplicationEventListeningMessageProducer().apply {
eventTypes = arrayOf(StompIntegrationEvent::class.java)
outputChannel = stompEventChannel()
}
}
@Bean
@ServiceActivator(inputChannel = "stompEventChannel")
fun eventHandler(): MessageHandler {
return MessageHandler { message ->
when (val event = message.payload) {
is StompExceptionEvent ->
logger.error("STOMP错误: ${event.exception.message}")
is StompReceiptEvent ->
logger.info("消息回执: ${event.receiptId}")
}
}
}
重要提示
回执处理注意事项:
- 确保
autoReceipt = true
启用回执 - 设置合理的
receiptTimeLimit
(默认15秒) - 处理超时未收到的回执事件
8. 完整配置示例
8.1 Kotlin 配置类
kotlin
@Configuration
@EnableIntegration
class FullStompConfig {
TOMP客户端配置
@Bean
fun stompClient(): ReactorNettyTcpStompClient {
return ReactorNettyTcpStompClient("broker.example.com", 61613).apply {
messageConverter = MappingJackson2MessageConverter()
taskScheduler = ThreadPoolTaskScheduler().apply { afterPropertiesSet() }
receiptTimeLimit = 10000 // 10秒回执超时
}
}
// 会话管理器
@Bean
fun stompSessionManager(): StompSessionManager {
return ReactorNettyTcpStompSessionManager(stompClient()).apply {
autoReceipt = true
}
}
// 入站适配器
@Bean
fun stompInboundAdapter(): StompInboundChannelAdapter {
return StompInboundChannelAdapter(stompSessionManager(), "/topic/data").apply {
outputChannel = stompInputChannel()
payloadType = DataModel::class.java // 自定义数据类型
}
}
// 出站适配器
@Bean
@ServiceActivator(inputChannel = "stompOutChannel")
fun stompOutHandler(): MessageHandler {
return StompMessageHandler(stompSessionManager()).apply {
destination = "/topic/notifications"
}
}
// 事件监听
@Bean
fun stompEventListener(): ApplicationListener<ApplicationEvent> {
return ApplicationEventListeningMessageProducer().apply {
eventTypes = arrayOf(StompIntegrationEvent::class.java)
outputChannel = stompEventChannel()
}
}
}
8.2 领域模型示例
kotlin
// 消息数据模型
data class DataModel(
val id: String,
val timestamp: Instant,
val content: String
)
9. 常见问题解答
9.1 连接不稳定问题
症状:频繁触发 StompConnectionFailedEvent
✅ 解决方案:
kotlin
@Bean
fun stompClient(): ReactorNettyTcpStompClient {
return ReactorNettyTcpStompClient(...).apply {
// 启用自动重连
isAutoStartup = true
// 设置心跳
setDefaultHeartbeat(arrayOf(10000L, 10000L)) // 发送/接收心跳间隔
}
}
9.2 消息序列化问题
症状:接收端无法解析消息
✅ 解决方案:
kotlin
@Bean
fun stompClient(): ReactorNettyTcpStompClient {
return ReactorNettyTcpStompClient(...).apply {
// 配置合适的消息转换器
messageConverter = MappingJackson2MessageConverter().apply {
objectMapper = Jackson2ObjectMapperBuilder.json()
.failOnUnknownProperties(false)
.build()
}
}
}
// 在入站适配器指定类型
@Bean
fun stompInboundAdapter(): StompInboundChannelAdapter {
return StompInboundChannelAdapter(...).apply {
payloadType = MyDataClass::class.java
}
}
9.3 性能优化技巧
连接池配置:
kotlin@Bean fun stompSessionManager(): StompSessionManager { val pool = object : StompSessionManager { // 实现连接池逻辑 } return pool }
批量消息处理:
kotlin@Bean @ServiceActivator(inputChannel = "stompInputChannel") fun batchProcessor(): MessageHandler { return MessageHandler { message -> // 实现批量处理逻辑 } }
异步处理:
kotlin@Bean fun stompInputChannel(): ExecutorChannel { return ExecutorChannel(Executors.newFixedThreadPool(10)) }
总结
通过本教程,您已掌握在 Spring Integration 中使用 STOMP 的关键技术:
- ✅ STOMP 基础:理解协议工作原理和消息格式
- ✅ 客户端配置:使用 Reactor Netty 或 WebSocket 客户端
- ✅ 消息处理:实现入站/出站适配器
- ✅ 头部映射:自定义 STOMP 与 Spring 消息头的转换
- ✅ 事件处理:监听和处理连接、回执等事件
- ✅ 最佳实践:动态订阅、性能优化和错误处理
下一步学习建议
- 结合 Spring Security 实现安全的 STOMP 连接
- 探索与 Spring Boot 的自动配置集成
- 尝试在云原生环境中部署 STOMP 应用
本教程完整代码示例可在 GitHub 仓库 获取