Skip to content

🌟 Spring Integration RSocket 支持教程

本教程将介绍如何在 Spring Integration 中使用 RSocket 实现高效的异步通信。通过清晰的 Kotlin 示例和现代配置方式,即使是 Spring 初学者也能快速掌握核心概念。


🚀 一、RSocket 基础与项目配置

1. RSocket 简介

RSocket 是面向反应式应用的二进制协议,支持四种交互模式:

  • Request-Response:传统请求/响应
  • Fire-and-Forget:单向发送
  • Request-Stream:单请求多响应流
  • Channel:双向流通信

2. 添加依赖

build.gradle.kts 中添加:

kotlin
dependencies {
    implementation("org.springframework.integration:spring-integration-rsocket:6.5.1")
    implementation("io.rsocket:rsocket-core:1.1.3")
}

协议选择建议

优先使用 TCP 传输层,WebSocket 适合浏览器集成场景


🔌 二、建立 RSocket 连接

1. 服务端连接器配置

kotlin
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
    .decoder(StringDecoder.textPlainOnly())
    .encoder(CharSequenceEncoder.allMimeTypes())
    .build()

@Bean
fun serverConnector(): ServerRSocketConnector {
    return ServerRSocketConnector("localhost", 7000).apply {
        rSocketStrategies = rsocketStrategies()
        metadataMimeType = MimeType("message", "x.rsocket.routing.v0")
        clientRSocketKeyStrategy = BiFunction { headers, _ ->
            headers[DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER].toString()
        }
    }
}

// 监听连接事件
@EventListener
fun handleConnection(event: RSocketConnectedEvent) {
    println("✅ 客户端连接成功: ${event.requester}")
}

2. 客户端连接器配置

kotlin
@Bean
fun clientConnector(serverPort: Int): ClientRSocketConnector {
    return ClientRSocketConnector("localhost", serverPort).apply {
        setupRoute = "clientConnect/{user}"
        setupRouteVariables = "springUser"
    }
}

连接注意事项

确保服务端端口未被防火墙拦截,生产环境建议使用 TLS 加密


📥 三、入站网关 (RSocketInboundGateway)

1. 基础配置

处理 RSocket 请求并返回响应:

kotlin
@Bean
fun inboundGateway(): RSocketInboundGateway {
    return RSocketInboundGateway("echo").apply {
        requestChannelName = "requestChannel"
        decodeFluxAsUnit = true //  // 重要配置:是否将Flux作为整体解码
    }
}

@Transformer(inputChannel = "requestChannel")
fun processRequest(payload: Flux<String>): Mono<String> {
    return payload.next().map { it.uppercase() }
}

2. 交互模式控制

限制支持的交互类型:

kotlin
@Bean
fun streamGateway(): RSocketInboundGateway {
    return RSocketInboundGateway("/stream").apply {
        interactionModels = setOf(RSocketInteractionModel.REQUEST_STREAM)
        requestChannelName = "streamChannel"
    }
}

3. Kotlin DSL 配置

kotlin
@Bean
fun rsocketFlow() = integrationFlow(
    RSockets.inboundGateway("/uppercase")
        .interactionModels(RSocketInteractionModel.REQUEST_CHANNEL)
) {
    transform<Flux<String>, Mono<String>> { flux ->
        flux.next().map(String::uppercase)
    }
}

📤 四、出站网关 (RSocketOutboundGateway)

1. 请求响应模式

kotlin
@Bean
@ServiceActivator(inputChannel = "outboundChannel")
fun outboundGateway(connector: ClientRSocketConnector): RSocketOutboundGateway {
    return RSocketOutboundGateway(Expression {
        it.headers["targetRoute"] // [!code highlight] // 动态路由
    }).apply {
        clientRSocketConnector = connector
        interactionModel = RSocketInteractionModel.REQUEST_RESPONSE
        expectedResponseType = String::class.java
    }
}

2. 流处理配置

kotlin
@Bean
fun streamFlow(connector: ClientRSocketConnector) = integrationFlow("streamRequest") {
    handle(RSockets.outboundGateway("/dataStream")
        .interactionModel(RSocketInteractionModel.REQUEST_STREAM)
        .expectedResponseType(String::class.java)
        .clientRSocketConnector(connector)
}

3. 元数据附加

kotlin
@Bean
fun metadataGateway(): RSocketOutboundGateway {
    return RSocketOutboundGateway(Expression { "fixedRoute" }).apply {
        metadataExpression = Expression { mapOf(
            "authToken" to "Bearer ${it.headers["token"]}",
            "priority" to "high"
        ) }
    }
}

⚙️ 五、高级配置技巧

1. 编解码优化

自定义复杂对象处理:

kotlin
@Bean
fun customStrategies(): RSocketStrategies {
    return RSocketStrategies.builder()
        .encoders { it.add(Jackson2JsonEncoder()) }
        .decoders { it.add(Jackson2JsonDecoder()) }
        .build()
}

2. 错误处理

kotlin
@Bean
fun errorFlow() = integrationFlow(RSockets.inboundGateway("/safe")) {
    handle({ payload ->
        if (payload == "error") throw IllegalStateException()
        else payload
    }, ConsumerEndpointSpec::transactional)
}

关键注意事项

  1. 使用 decodeFluxAsUnit=true 时确保数据有明确边界标识(如换行符)
  2. 流式响应需手动处理背压
  3. 始终验证客户端路由权限

💡 六、实战场景示例

场景:实时股票报价系统

网关配置

kotlin
@Bean
fun stockQuoteFlow() = integrationFlow(
    RSockets.inboundGateway("/quotes/{symbol}")
) {
    enrichHeaders { it.header("symbol", it.headers["symbol"]) }
    handle(RSockets.outboundGateway()
        .routeExpression("'stockService/' + headers.symbol")
        .interactionModel(RSocketInteractionModel.REQUEST_STREAM)
    )
}

❓ 七、常见问题解答

Q1: 如何获取所有已连接客户端?

kotlin
val clients = (serverConnector.getClientRSocketRequester() as? ConcurrentMap<*, *>)

Q2: 响应超时如何处理?

kotlin
outboundGateway.apply {
    replyTimeout = 5000 // 设置5秒超时
}

Q3: 如何调试 RSocket 通信?

添加日志配置:

properties
logging.level.io.rsocket=DEBUG
logging.level.org.springframework.integration.rsocket=TRACE

✅ 总结要点

  1. 连接器是通信基础:优先配置 ClientRSocketConnector/ServerRSocketConnector
  2. 入站网关处理请求:注意 decodeFluxAsUnit 配置
  3. 出站网关发起请求:灵活使用动态路由和元数据
  4. 优先选择 Kotlin DSL 配置:简洁且类型安全
  5. 生产环境务必添加 TLS 加密身份验证

通过本教程,您已掌握在 Spring Integration 中使用 RSocket 构建高效异步系统的核心技能。实际开发中建议结合 Spring Boot RSocket Starter 简化配置。