Skip to content

Spring Integration TCP网关详解

本教程使用Kotlin实现,采用Spring Boot 3.x + Spring Integration 6.x,优先使用注解配置

一、TCP网关基础概念

1.1 TCP网关的作用

TCP网关是Spring Integration中处理TCP通信的核心组件,分为入站网关(TcpInboundGateway)和出站网关(TcpOutboundGateway):

1.2 核心组件关系

组件作用对应工厂
TcpInboundGateway处理入站TCP请求TcpNetServerConnectionFactory
TcpOutboundGateway处理出站TCP请求TcpNetClientConnectionFactory

IMPORTANT

关键区别:入站网关监听端口等待请求,出站网关主动发起连接

二、入站TCP网关配置

2.1 基础配置(服务端模式)

kotlin
@Configuration
class TcpInboundConfig {

    // 1. 创建TCP服务器连接工厂
    @Bean
    fun serverConnectionFactory(): TcpNetServerConnectionFactory {
        return TcpNetServerConnectionFactory(8080).apply {
            setSerializer(TcpCodecs.crlf())  // 使用CRLF分隔符
            setDeserializer(TcpCodecs.crlf())
            isSingleUse = false             // 保持长连接
        }
    }

    // 2. 创建入站网关
    @Bean
    fun inboundGateway(
        cf: TcpNetServerConnectionFactory,
        channels: IntegrationFlowChannels
    ): TcpInboundGateway {
        return TcpInboundGateway().apply {
            connectionFactory = cf
            requestChannel = channels.tcpRequestChannel()
            replyChannel = channels.tcpReplyChannel()
            replyTimeout = 10000            // 10秒超时
        }
    }

    // 3. 定义消息处理流程
    @Bean
    fun integrationFlow(): IntegrationFlow {
        return IntegrationFlow
            .from("tcpRequestChannel")
            .transform(Transformers.objectToString()) 
            .handle { payload, _ ->
                "处理结果: ${payload.toString().uppercase()}"
            }
            .channel("tcpReplyChannel")
            .get()
    }
}

TIP

连接ID处理要点
当手动构造回复消息时,必须包含原始消息的ip_connectionId头信息,否则网关无法正确路由回复

2.2 客户端模式配置

当网关需要主动连接外部服务时:

kotlin
@Bean
fun clientModeConnectionFactory(): TcpNetClientConnectionFactory {
    return TcpNetClientConnectionFactory("remote-server", 8080).apply {
        setSerializer(TcpCodecs.crlf())
        setDeserializer(TcpCodecs.crlf())
        isSingleUse = false
    }
}

@Bean
fun clientModeGateway(cf: TcpNetClientConnectionFactory): TcpInboundGateway {
    return TcpInboundGateway().apply {
        connectionFactory = cf
        isClientMode = true
        retryInterval = 5000                 // 5秒重连间隔
        scheduler = ThreadPoolTaskScheduler() // 任务调度器
        // ...其他配置
    }
}

客户端模式注意事项

  1. 必须设置singleUse=false保持长连接
  2. 使用retryInterval控制重连频率
  3. 通过@gatewayBeanName.retryConnection()可手动触发重连

三、出站TCP网关配置

3.1 基础配置

kotlin
@Configuration
class TcpOutboundConfig {

    // 1. 创建TCP客户端连接工厂
    @Bean
    fun clientConnectionFactory(): TcpNetClientConnectionFactory {
        return TcpNetClientConnectionFactory("remote-host", 8080).apply {
            setSerializer(TcpCodecs.lengthHeader4())
            setDeserializer(TcpCodecs.lengthHeader4())
        }
    }

    // 2. 创建出站网关
    @Bean
    fun outboundGateway(cf: TcpNetClientFactory): TcpOutboundGateway {
        return TcpOutboundGateway().apply {
            connectionFactory = cf
            requestTimeout = 10000          // 10秒超时
            remoteTimeout = 10000
        }
    }

    // 3. 定义出站流程
    @Bean
    fun outboundFlow(gateway: TcpOutboundGateway): IntegrationFlow {
        return IntegrationFlow
            .from("outboundRequestChannel")
            .handle(gateway)                 
            .channel("outboundReplyChannel")
            .get()
    }
}

3.2 高级特性配置

kotlin
@Bean
fun asyncOutboundGateway(cf: TcpNetClientFactory): TcpOutboundGateway {
    return TcpOutboundGateway().apply {
        connectionFactory = cf
        isAsync = true // 启用异步
    }
}
kotlin
@Bean
fun singleUseFactory(): TcpNetClientConnectionFactory {
    return TcpNetClientConnectionFactory("host", 8080).apply {
        isSingleUse = true
    }
}

CAUTION

并发请求警告:当使用共享连接(singleUse=false)时,多个并发请求会被阻塞直到前一个请求完成

四、实战应用场景

4.1 金融交易网关示例

kotlin
@Bean
fun tradingIntegrationFlow(
    inboundGW: TcpInboundGateway,
    outboundGW: TcpOutboundGateway
): IntegrationFlow {
    return IntegrationFlow
        .from(inboundGW)
        .enrichHeaders {
            header("transactionId", UUID.randomUUID())
            header("timestamp", System.currentTimeMillis())
        }
        .handle { payload, headers ->
            TradingRequest(
                id = headers["transactionId"] as UUID,
                data = payload.toString()
            )
        }
        .handle(outboundGW)
        .transform { response ->
            processTradingResponse(response)
        }
        .get()
}

private fun processTradingResponse(response: Message<*>) {
    // 验证签名/解密等操作
}

4.2 设备监控系统

五、常见问题解决方案

5.1 连接管理问题

问题:客户端意外断开导致连接泄漏
解决方案:配置心跳检测

kotlin
@Bean
fun connectionFactoryWithHeartbeat(): TcpNetServerConnectionFactory {
    return TcpNetServerConnectionFactory(8080).apply {
        setSoKeepAlive(true) 
        setMapper(object : TcpMessageMapper() {
            override fun toMessage(payload: Any): Message<ByteArray> {
                // 添加心跳检测逻辑
            }
        })
    }
}

5.2 消息乱码问题

问题:中文内容传输乱码
解决方案:统一字符编码

kotlin
@Bean
fun utf8ConnectionFactory(): TcpNetClientConnectionFactory {
    return TcpNetClientConnectionFactory("host", 8080).apply {
        setSerializer(TcpCodecs.crlf(Charset.forName("UTF-8"))) 
        setDeserializer(TcpCodecs.crlf(Charset.forName("UTF-8")))
    }
}

5.3 性能优化建议

  1. 使用连接池提高并发能力
kotlin
@Bean
fun cachedConnectionFactory(): CachingClientConnectionFactory {
    return CachingClientConnectionFactory(
        TcpNetClientConnectionFactory("host", 8080),
        10 // 连接池大小
    )
}
  1. 启用NIO提升吞吐量
kotlin
@Bean
fun nioConnectionFactory(): TcpNioClientConnectionFactory {
    return TcpNioClientConnectionFactory("host", 8080).apply {
        setUsingDirectBuffers(true) // 使用直接内存
    }
}

六、最佳实践总结

  1. 连接策略选择

    • 短连接:singleUse=true(每次请求新建连接)
    • 长连接:singleUse=false + 心跳检测
  2. 超时配置原则

    kotlin
    TcpInboundGateway().apply {
        replyTimeout = 15000 // 略大于外部服务最大响应时间
    }
    
    TcpOutboundGateway().apply {
        requestTimeout = 5000  // 连接超时
        remoteTimeout = 10000  // 响应超时
    }
  3. 异常处理机制

    kotlin
    @Bean
    fun errorHandlingFlow(): IntegrationFlow {
        return IntegrationFlow
            .from("errorChannel")
            .handle { message ->
                logger.error("TCP处理异常: ${message.payload}")
                // 发送告警/重试等
            }
            .get()
    }

完整示例代码: GitHub示例项目