Appearance
Spring Integration TCP网关详解
本教程使用Kotlin实现,采用Spring Boot 3.x + Spring Integration 6.x,优先使用注解配置
一、TCP网关基础概念
1.1 TCP网关的作用
TCP网关是Spring Integration中处理TCP通信的核心组件,分为入站网关(TcpInboundGateway
)和出站网关(TcpOutboundGateway
):
1.2 核心组件关系
组件 | 作用 | 对应工厂 |
---|---|---|
TcpInboundGateway | 处理入站TCP请求 | TcpNetServerConnectionFactory |
TcpOutboundGateway | 处理出站TCP请求 | TcpNetClientConnectionFactory |
IMPORTANT
关键区别:入站网关监听端口等待请求,出站网关主动发起连接
二、入站TCP网关配置
2.1 基础配置(服务端模式)
kotlin
@Configuration
class TcpInboundConfig {
// 1. 创建TCP服务器连接工厂
@Bean
fun serverConnectionFactory(): TcpNetServerConnectionFactory {
return TcpNetServerConnectionFactory(8080).apply {
setSerializer(TcpCodecs.crlf()) // 使用CRLF分隔符
setDeserializer(TcpCodecs.crlf())
isSingleUse = false // 保持长连接
}
}
// 2. 创建入站网关
@Bean
fun inboundGateway(
cf: TcpNetServerConnectionFactory,
channels: IntegrationFlowChannels
): TcpInboundGateway {
return TcpInboundGateway().apply {
connectionFactory = cf
requestChannel = channels.tcpRequestChannel()
replyChannel = channels.tcpReplyChannel()
replyTimeout = 10000 // 10秒超时
}
}
// 3. 定义消息处理流程
@Bean
fun integrationFlow(): IntegrationFlow {
return IntegrationFlow
.from("tcpRequestChannel")
.transform(Transformers.objectToString())
.handle { payload, _ ->
"处理结果: ${payload.toString().uppercase()}"
}
.channel("tcpReplyChannel")
.get()
}
}
TIP
连接ID处理要点:
当手动构造回复消息时,必须包含原始消息的ip_connectionId
头信息,否则网关无法正确路由回复
2.2 客户端模式配置
当网关需要主动连接外部服务时:
kotlin
@Bean
fun clientModeConnectionFactory(): TcpNetClientConnectionFactory {
return TcpNetClientConnectionFactory("remote-server", 8080).apply {
setSerializer(TcpCodecs.crlf())
setDeserializer(TcpCodecs.crlf())
isSingleUse = false
}
}
@Bean
fun clientModeGateway(cf: TcpNetClientConnectionFactory): TcpInboundGateway {
return TcpInboundGateway().apply {
connectionFactory = cf
isClientMode = true
retryInterval = 5000 // 5秒重连间隔
scheduler = ThreadPoolTaskScheduler() // 任务调度器
// ...其他配置
}
}
客户端模式注意事项
- 必须设置
singleUse=false
保持长连接 - 使用
retryInterval
控制重连频率 - 通过
@gatewayBeanName.retryConnection()
可手动触发重连
三、出站TCP网关配置
3.1 基础配置
kotlin
@Configuration
class TcpOutboundConfig {
// 1. 创建TCP客户端连接工厂
@Bean
fun clientConnectionFactory(): TcpNetClientConnectionFactory {
return TcpNetClientConnectionFactory("remote-host", 8080).apply {
setSerializer(TcpCodecs.lengthHeader4())
setDeserializer(TcpCodecs.lengthHeader4())
}
}
// 2. 创建出站网关
@Bean
fun outboundGateway(cf: TcpNetClientFactory): TcpOutboundGateway {
return TcpOutboundGateway().apply {
connectionFactory = cf
requestTimeout = 10000 // 10秒超时
remoteTimeout = 10000
}
}
// 3. 定义出站流程
@Bean
fun outboundFlow(gateway: TcpOutboundGateway): IntegrationFlow {
return IntegrationFlow
.from("outboundRequestChannel")
.handle(gateway)
.channel("outboundReplyChannel")
.get()
}
}
3.2 高级特性配置
kotlin
@Bean
fun asyncOutboundGateway(cf: TcpNetClientFactory): TcpOutboundGateway {
return TcpOutboundGateway().apply {
connectionFactory = cf
isAsync = true // 启用异步
}
}
kotlin
@Bean
fun singleUseFactory(): TcpNetClientConnectionFactory {
return TcpNetClientConnectionFactory("host", 8080).apply {
isSingleUse = true
}
}
CAUTION
并发请求警告:当使用共享连接(singleUse=false
)时,多个并发请求会被阻塞直到前一个请求完成
四、实战应用场景
4.1 金融交易网关示例
kotlin
@Bean
fun tradingIntegrationFlow(
inboundGW: TcpInboundGateway,
outboundGW: TcpOutboundGateway
): IntegrationFlow {
return IntegrationFlow
.from(inboundGW)
.enrichHeaders {
header("transactionId", UUID.randomUUID())
header("timestamp", System.currentTimeMillis())
}
.handle { payload, headers ->
TradingRequest(
id = headers["transactionId"] as UUID,
data = payload.toString()
)
}
.handle(outboundGW)
.transform { response ->
processTradingResponse(response)
}
.get()
}
private fun processTradingResponse(response: Message<*>) {
// 验证签名/解密等操作
}
4.2 设备监控系统
五、常见问题解决方案
5.1 连接管理问题
问题:客户端意外断开导致连接泄漏
解决方案:配置心跳检测
kotlin
@Bean
fun connectionFactoryWithHeartbeat(): TcpNetServerConnectionFactory {
return TcpNetServerConnectionFactory(8080).apply {
setSoKeepAlive(true)
setMapper(object : TcpMessageMapper() {
override fun toMessage(payload: Any): Message<ByteArray> {
// 添加心跳检测逻辑
}
})
}
}
5.2 消息乱码问题
问题:中文内容传输乱码
解决方案:统一字符编码
kotlin
@Bean
fun utf8ConnectionFactory(): TcpNetClientConnectionFactory {
return TcpNetClientConnectionFactory("host", 8080).apply {
setSerializer(TcpCodecs.crlf(Charset.forName("UTF-8")))
setDeserializer(TcpCodecs.crlf(Charset.forName("UTF-8")))
}
}
5.3 性能优化建议
- 使用连接池提高并发能力
kotlin
@Bean
fun cachedConnectionFactory(): CachingClientConnectionFactory {
return CachingClientConnectionFactory(
TcpNetClientConnectionFactory("host", 8080),
10 // 连接池大小
)
}
- 启用NIO提升吞吐量
kotlin
@Bean
fun nioConnectionFactory(): TcpNioClientConnectionFactory {
return TcpNioClientConnectionFactory("host", 8080).apply {
setUsingDirectBuffers(true) // 使用直接内存
}
}
六、最佳实践总结
连接策略选择:
- 短连接:
singleUse=true
(每次请求新建连接) - 长连接:
singleUse=false
+ 心跳检测
- 短连接:
超时配置原则:
kotlinTcpInboundGateway().apply { replyTimeout = 15000 // 略大于外部服务最大响应时间 } TcpOutboundGateway().apply { requestTimeout = 5000 // 连接超时 remoteTimeout = 10000 // 响应超时 }
异常处理机制:
kotlin@Bean fun errorHandlingFlow(): IntegrationFlow { return IntegrationFlow .from("errorChannel") .handle { message -> logger.error("TCP处理异常: ${message.payload}") // 发送告警/重试等 } .get() }
完整示例代码: GitHub示例项目