Skip to content

🌐 使用 Kotlin DSL 配置 Spring Integration TCP 组件

目录

  1. TCP 通信基础
  2. 配置 TCP 服务器与客户端
  3. TCP 适配器模式详解
  4. TCP 网关模式详解
  5. 常见问题与解决方案

TCP 通信基础

TCP 在 Spring Integration 中的作用

Spring Integration 的 TCP 模块提供了基于流的双向通信能力,支持同步/异步交互模式。它抽象了底层的 Socket 操作,开发者可以专注于业务逻辑。

核心组件关系图

必要依赖

kotlin
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-integration")
    implementation("org.springframework.integration:spring-integration-ip")
}

配置 TCP 服务器与客户端

基础配置类

kotlin
@Configuration
class TcpConfig {

    //  // 使用 lengthHeader1 编解码器
    @Bean
    fun serverConnectionFactory(): TcpNetServerConnectionFactory {
        return Tcp.netServer(1234)
            .apply {
                deserializer = TcpCodecs.lengthHeader1() //  // 消息头指定长度
                backlog = 30 // 最大等待连接数
            }
    }

    @Bean
    fun clientConnectionFactory(): TcpNioClientConnectionFactory {
        return Tcp.nioClient("localhost", 1234)
            .apply {
                serializer = TCodecs.lengthHeader1() //  // 与服务器匹配
                isSingleUse = false // 保持长连接
            }
    }
}

TIP

编解码器选择

  • lengthHeader1:1字节长度头(最大255字节)
  • lengthHeader2:2字节长度头(最大65K字节)
  • crlf:回车换行分隔符
  • stxetx:STX/ETX控制字符分隔

TCP 适配器模式详解

单向通信模型:仅发送或接收数据,无响应返回

服务器入站适配器

kotlin
@Bean
fun serverAdapterFlow(): IntegrationFlow {
    return IntegrationFlow
        .from(
            Tcp.inboundAdapter(serverConnectionFactory())
                .errorChannel("tcpErrorChannel") //  // 错误处理通道
                .id("tcpInAdapter")
        )
        .transform(Transformers.objectToString()) //  // 字节转字符串
        .channel("inboundChannel")
        .get()
}

客户端出站适配器

kotlin
@Bean
fun clientAdapterFlow(): IntegrationFlow {
    return IntegrationFlow
        .from("outboundChannel")
        .handle(
            Tcp.outboundAdapter(clientConnectionFactory())
        )
        .get()
}

适配器交互时序

注意事项

  1. 适配器模式适用于单向数据流场景(如日志采集)
  2. 客户端发送后不会等待服务器响应
  3. 需要额外配置错误处理通道

TCP 网关模式详解

双向通信模型:请求/响应完整交互

服务器网关

kotlin
@Bean
fun serverGatewayFlow(): IntegrationFlow {
    return IntegrationFlow
        .from(
            Tcp.inboundGateway(serverConnectionFactory())
                .apply {
                    errorChannel = "tcpErrorChannel"
                    id = "tcpInGateway"
                }
        )
        .transform(Transformers.objectToString())
        .<String, String>transform { it.uppercase() } //  // 业务处理
        .channel("gatewayChannel")
        .get()
}

客户端网关

kotlin
@Bean
fun clientGatewayFlow(): IntegrationFlow {
    return IntegrationFlow
        .from("requestChannel")
        .handle(
            Tcp.outboundGateway(clientConnectionFactory())
        )
        .get()
}

网关交互时序

kotlin
// 单向通信
.handle(Tcp.outboundAdapter(factory))
kotlin
// 双向通信
.handle(Tcp.outboundGateway(factory))

IMPORTANT

网关关键配置

  1. 必须同时设置 serializerdeserializer
  2. 服务器和客户端需使用相同编解码器
  3. 默认超时时间为 10 秒(可通过 replyTimeout 调整)

常见问题与解决方案

⚠️ 消息截断问题

现象:接收不完整数据包
原因:编解码器配置不一致
解决方案

kotlin
// 服务器和客户端保持相同配置
TcpCodecs.lengthHeader1() 

⚠️ 连接资源耗尽

现象Too many open files 错误
解决方案

kotlin
Tcp.nioServer(port).apply {
    backlog = 50 // 控制最大等待队列
    isSingleUse = true // 短连接模式
    soTimeout = 30000 // 30秒超时
}

⚠️ 性能调优建议

kotlin
Tcp.nioClient(host, port).apply {
    usingDirectBuffers = true // 使用直接缓冲区
    soSendBufferSize = 1024 * 1024 // 1MB发送缓冲区
    soReceiveBufferSize = 1024 * 1024 // 1MB接收缓冲区
}

❌ 编解码器选择错误

kotlin
// 错误:服务器使用长度头,客户端使用行分隔
TcpCodecs.lengthHeader1() // 服务器
TcpCodecs.crlf() // 客户端 ❌ 不匹配!

// 正确:双方使用相同编解码器 ✅
TcpCodecs.lengthHeader1() // 双方一致

CAUTION

生产环境建议

  1. 使用 TcpNioConnectionFactory 替代 TcpNetConnectionFactory 获得更好性能
  2. 对于高并发场景,配置连接池管理 TCP 连接
  3. 始终实现错误处理通道记录异常信息

通过以上配置,您可以快速构建可靠的 TCP 通信系统。实际开发中,建议结合 Spring Integration 的监控端点实时观察消息流状态。