Skip to content

Spring Integration TCP/UDP 配置详解

1. TCP/UDP 核心概念

1.1 连接工厂(Connection Factory)

连接工厂是 Spring Integration 中 TCP/UDP 通信的核心组件,负责创建和管理网络连接:

kotlin
@Configuration
class TcpConfig {
    @Bean
    fun clientConnectionFactory(): TcpNetClientConnectionFactory {
        return TcpNetClientConnectionFactory("localhost", 1234).apply {
            //  // 关键配置示例
            serializer = ByteArrayCrLfSerializer()
            deserializer = ByteArrayCrLfSerializer()
            isSingleUse = false
        }
    }

    @Bean
    fun serverConnectionFactory(): TcpNetServerConnectionFactory {
        return TcpNetServerConnectionFactory(1234).apply {
            //  // 服务器特有配置
            usingNio = true
            backlog = 100
            localAddress = "192.168.1.100"
        }
    }
}

TIP

客户端工厂需要指定 hostport,而服务器工厂只需指定 port

1.2 通道适配器 vs 网关

组件类型通信方向是否支持请求/响应适用场景
入站通道适配器接收数据❌ 单向接收日志收集、事件通知
出站通道适配器发送数据❌ 单向发送告警通知、数据上报
入站网关接收数据✅ 支持响应API服务、命令处理
出站网关发送数据✅ 支持响应服务调用、数据查询

2. 关键配置属性详解

2.1 通用连接属性

常用属性说明:

属性默认值说明
type-必须指定:clientserver
host-客户端专用 目标主机地址
port-通信端口号
serializerByteArrayCrLfSerializer消息序列化实现
deserializerByteArrayCrLfSerializer消息反序列化实现
using-niofalse是否使用NIO(非阻塞I/O)
so-timeout0(无限)Socket超时时间(毫秒)
so-keep-alivefalse是否启用TCP keep-alive

CAUTION

using-direct-buffers 仅在 using-nio=true 时有效,否则必须设为 false

2.2 高级配置选项

2.2.1 NIO 配置

kotlin
@Bean
fun nioConnectionFactory(): TcpNioServerConnectionFactory {
    return TcpNioServerConnectionFactory(8080).apply {
        //  // NIO最佳实践配置
        usingDirectBuffers = true
        readDelay = 200 // 读取重试延迟(ms)
        applySequence = true // 启用消息顺序控制
        taskExecutor = SimpleAsyncTaskExecutor()
    }
}

IMPORTANT

当启用NIO时,建议设置 applySequence=true 以保证消息顺序

2.2.2 SSL/TLS 安全配置

kotlin
@Bean
fun sslContextSupport(): DefaultTcpNetSSLSocketFactorySupport {
    val sslContext = SSLContext.getInstance("TLS").apply {
        // 实际应用中应使用正式证书
        init(null, null, null)
    }
    return DefaultTcpNetSSLSocketFactorySupport(sslContext)
}

@Bean
fun secureConnectionFactory(): TcpNetClientConnectionFactory {
    return TcpNetClientConnectionFactory("secure.example.com", 443).apply {
        // SL配置
        sslContextSupport = sslContextSupport()
    }
}

2.3 UDP 特定配置

UDP 配置示例代码
kotlin
DP入站适配器
@Bean
fun udpInboundAdapter(): UdpInboundChannelAdapter {
    return UdpInboundChannelAdapter(
        "udpAdapter",
        InetSocketAddress(9999)
    ).apply {
        //  DP特有配置
        multicast = true
        multicastAddress = "230.0.0.1"
        receiveBufferSize = 65507 // 最大UDP包大小
        checkLength = true
        errorChannel = MessageChannels.direct("udpErrorChannel").channel
    }
}

DP出站适配器
@Bean
fun udpOutboundAdapter(): UnicastSendingMessageHandler {
    return UnicastSendingMessageHandler("localhost", 9998).apply {
        //  // 出站配置
        acknowledge = true
        ackHost = "local.host"
        ackPort = 10000
        ackTimeout = 5000
    }
}

3. 完整配置示例

3.1 TCP 客户端/服务端实现

kotlin
@Configuration
class TcpServerConfig {
    @Bean
    fun serverConnectionFactory(): TcpNetServerConnectionFactory {
        return TcpNetServerConnectionFactory(12345).apply {
            deserializer = ByteArraySingleTerminatorSerializer('\n')
            serializer = ByteArraySingleTerminatorSerializer('\n')
            soTimeout = 30000 // 30秒超时
        }
    }

    @Bean
    fun inboundAdapter(): TcpInboundGateway {
        return TcpInboundGateway().apply {
            connectionFactory = serverConnectionFactory()
            requestChannel = MessageChannels.direct("tcpRequestChannel").channel
        }
    }

    @Bean
    fun requestChannel(): MessageChannel {
        return DirectChannel()
    }

    @Bean
    fun serviceActivator(): ServiceActivator {
        return ServiceActivator({ message ->
            "Echo: ${String(message.payload as ByteArray)}"
        }).apply {
            inputChannel = requestChannel()
        }
    }
}
kotlin
@Configuration
class TcpClientConfig {
    @Bean
    fun clientConnectionFactory(): TcpNetClientConnectionFactory {
        return TcpNetClientConnectionFactory("localhost", 12345).apply {
            deserializer = ByteArraySingleTerminatorSerializer('\n')
            serializer = ByteArraySingleTerminatorSerializer('\n')
            soTimeout = 10000 // 10秒超时
        }
    }

    @Bean
    fun outboundGateway(): TcpOutboundGateway {
        return TcpOutboundGateway().apply {
            connectionFactory = clientConnectionFactory()
            replyTimeout = 5000 // 5秒响应超时
        }
    }

    @Bean
    fun tcpGatewayChannel(): MessageChannel {
        return DirectChannel()
    }

    @Bean
    @ServiceActivator(inputChannel = "tcpGatewayChannel")
    fun gatewayActivator(): MessageHandler {
        return outboundGateway()
    }
}

3.2 使用示例

kotlin
@Service
class TcpClientService(
    @Qualifier("tcpGatewayChannel") private val tcpChannel: MessageChannel
) {
    fun sendMessage(message: String): String {
        val message = MessageBuilder
            .withPayload(message.toByteArray())
            .build()

        val reply = tcpChannel.sendAndReceive(message) as Message<*>

        return String(reply.payload as ByteArray)
    }
}

4. 最佳实践与常见问题

4.1 配置建议

  1. 连接管理

    • 长连接:singleUse = false + 合理设置 soTimeout
    • 短连接:singleUse = true (每次请求新建连接)
  2. 性能优化

    kotlin
    connectionFactory.apply {
        usingNio = true // 高并发场景启用NIO
        soSendBufferSize = 16384 // 16KB发送缓冲区
        soReceiveBufferSize = 32768 // 32KB接收缓冲区
        taskExecutor = ThreadPoolTaskExecutor().apply {
            corePoolSize = 10
            maxPoolSize = 50
        }
    }
  3. 错误处理

    kotlin
    @Bean
    fun tcpInboundAdapter(): TcpInboundChannelAdapter {
        return TcpInboundChannelAdapter().apply {
            // ...
            errorChannel = MessageChannels.direct("tcpErrorChannel").channel
        }
    }
    
    @ServiceActivator(inputChannel = "tcpErrorChannel")
    fun handleError(message: Message<Exception>) {
        logger.error("TCP通信错误: ${message.payload.message}")
        // 实现重试或告警逻辑
    }

4.2 常见问题解决

问题现象可能原因解决方案
连接超时so-timeout 设置过小适当增加超时时间
消息不完整/截断缓冲区大小不足增加 so-receive-buffer-size
高并发时性能下降未启用NIO设置 using-nio=true
消息顺序错乱未启用消息序列控制设置 apply-sequence=true
UDP包丢失未启用确认机制设置 acknowledge=true + 配置确认参数
多宿主系统绑定错误未指定 local-address明确设置本地IP地址

WARNING

生产环境必须配置SSL/TLS!明文通信会导致数据泄露风险

5. 高级技巧

5.1 自定义序列化

kotlin
class JsonSerializer : AbstractByteArraySerializer() {
    override fun serialize(payload: Any): ByteArray {
        //  // 自定义JSON序列化
        return objectMapper.writeValueAsBytes(payload)
    }

    override fun deserialize(bytes: ByteArray): Any {
        // [!code highlight] // 自定义JSON反序列化
        return objectMapper.readValue(bytes, Map::class.java)
    }
}

// 使用自定义序列化
connectionFactory.serializer = JsonSerializer()
connectionFactory.deserializer = JsonSerializer()

5.2 客户端模式(自动重连)

kotlin
@Bean
fun clientModeAdapter(): TcpInboundChannelAdapter {
    return TcpInboundChannelAdapter().apply {
        connectionFactory = clientConnectionFactory()
        //  // 客户端模式配置
        clientMode = true
        retryInterval = 5000 // 5秒重试
        scheduler = taskScheduler()
    }
}

@Bean
fun taskScheduler(): ThreadPoolTaskScheduler {
    return ThreadPoolTaskScheduler().apply {
        poolSize = 5
    }
}

6. 总结

Spring Integration 提供了完善的 TCP/UDP 通信支持,关键点在于:

  1. 正确配置连接工厂(TcpNetClientConnectionFactory/TcpNetServerConnectionFactory
  2. 合理选择序列化方案(ByteArrayCrLfSerializer 为通用选择)
  3. 根据场景选用适配器(单向通信)或网关(请求/响应)
  4. 生产环境务必启用SSL/TLS加密
  5. 高并发场景推荐启用NIO模式

TIP

实际开发中,建议使用Spring Boot Starter简化配置:

gradle
implementation 'org.springframework.integration:spring-integration-ip'

通过本文的详细讲解和示例代码,您应该能够: ✅ 配置各种类型的TCP/UDP通信组件
✅ 理解关键参数的作用和配置方法
✅ 实现安全可靠的网络通信
✅ 解决常见的连接和性能问题