Appearance
DelegatingSessionFactory 详解:动态选择 SFTP 会话工厂
✅ 关键知识点:DelegatingSessionFactory 是 Spring Integration 4.2+ 提供的动态会话工厂解决方案,允许在运行时根据业务需求切换不同的 SFTP 连接配置。
一、为什么需要 DelegatingSessionFactory?
在实际业务场景中,我们常遇到这些需求:
- 同时连接多个不同认证信息的 SFTP 服务器
- 根据租户 ID 动态切换服务器配置(多租户系统)
- 实现 SFTP 服务器的故障转移和负载均衡
传统方案的痛点
kotlin
// 传统方案:为每个服务器创建独立适配器
@Bean
fun sftpAdapterA(): SftpOutboundGateway {
return SftpOutboundGateway(sessionFactoryA(), ...)
}
@Bean
fun sftpAdapterB(): SftpOutboundGateway {
return SftpOutboundGateway(sessionFactoryB(), ...)
}
// 问题:需要预先配置所有适配器,无法动态切换
传统方案需要为每个服务器创建独立组件,导致:
- 配置冗余且难以维护
- 无法根据请求上下文动态选择服务器
- 系统扩展性差(新增服务器需修改代码)
二、核心概念解析
1. DelegatingSessionFactory 工作原理
2. 关键组件说明
组件 | 作用 | 生命周期 |
---|---|---|
DelegatingSessionFactory | 会话工厂代理 | 应用级单例 |
实际 SessionFactory | 真实 SFTP 连接工厂 | 通常为 Bean |
线程键 (Thread Key) | 路由标识符 | 线程绑定 |
三、完整实现方案(Kotlin DSL)
1. 基础配置
kotlin
// 配置多个真实会话工厂
@Bean
fun sftpFactoryA(): DefaultSftpSessionFactory {
return DefaultSftpSessionFactory().apply {
host = "sftp.serverA.com"
port = 22
user = "userA"
password = "passA"
// [!code tip] 重要:启用会话缓存提升性能
isSessionCache = true
}
}
@Bean
fun sftpFactoryB(): DefaultSftpSessionFactory {
return DefaultSftpSessionFactory().apply {
host = "sftp.serverB.com"
port = 22
user = "userB"
privateKey = File("/path/to/key.pem")
// [!code tip] 使用密钥认证更安全
}
}
// 创建代理工厂
@Bean
fun delegatingFactory(
factories: Map<String, SessionFactory<ChannelSftp.LsEntry>>
): DelegatingSessionFactory<ChannelSftp.LsEntry> {
return DelegatingSessionFactory { contextKey ->
factories[contextKey] ?: throw IllegalStateException("未知工厂键: $contextKey")
}
}
2. 动态路由实现
kotlin
@Bean
fun sftpFlow(dsf: DelegatingSessionFactory<ChannelSftp.LsEntry>) = integrationFlow {
// 步骤1:设置线程键(根据消息头选择工厂)
handle {
val factoryKey = it.headers["sftpTarget"] as? String ?: "default"
dsf.setThreadKey(factoryKey)
}
// 步骤2:SFTP操作
handle(Sftp.outboundGateway(dsf, Sftp.Command.GET, "payload.remotePath"))
// 步骤3:清除线程键
handle { dsf.clearThreadKey() }
// 异常处理
errorChannel("errorFlow.input")
}
3. 使用示例
kotlin
// 发送请求到不同服务器
val messageA = MessageBuilder.withPayload("fileA.txt")
.setHeader("sftpTarget", "serverA")
.build()
val messageB = MessageBuilder.withPayload("fileB.txt")
.setHeader("sftpTarget", "serverB")
.build()
sftpChannel.send(messageA)
sftpChannel.send(messageB)
四、关键注意事项
会话缓存重要规则
WARNING
缓存配置警示:当启用会话缓存时,必须缓存每个委托工厂而非 DelegatingSessionFactory
本身
kotlin
// 正确做法:缓存实际工厂
@Bean
fun cachedFactoryA(): CachingSessionFactory {
return CachingSessionFactory(sftpFactoryA(), 10)
}
// 错误做法:缓存代理工厂
@Bean
fun cachedDelegatingFactory(): CachingSessionFactory {
return CachingSessionFactory(delegatingFactory, 10)
}
错误配置会导致连接泄露和不可预知的行为
最佳实践
- 键清除策略:务必在操作后调用
clearThreadKey()
,推荐在finally
块中执行 - 线程安全:避免在异步场景中使用,或配合
ThreadLocal
扩展机制 - 键管理:使用枚举或常量管理工厂键,避免魔法字符串
五、高级应用场景
1. 多服务器轮询(5.0.7+)
kotlin
@Bean
fun rotatingAdvice() = RotatingServerAdvice().apply {
add("serverA", sftpFactoryA())
add("serverB", sftpFactoryB())
}
@Bean
fun poller() = IntegrationFlow.from(
Sftp.inboundAdapter(delegatingFactory)
.localDirectory(File("/local"))
.remoteDirectory("/remote"),
{ it.poller(Pollers.fixedDelay(5000).advice(rotatingAdvice)) }
)
2. 多租户隔离方案
kotlin
// 根据租户ID选择工厂
handle {
val tenantId = SecurityContextHolder.getContext().authentication.tenantId
dsf.setThreadKey(tenantId)
}
六、常见问题排查
问题现象 | 可能原因 | 解决方案 |
---|---|---|
No qualifying bean 异常 | 未找到键对应的工厂 | 检查工厂映射配置 |
连接未切换 | 未调用 setThreadKey() | 确认消息头是否正确传递 |
连接泄露 | 未清除线程键 | 添加 clearThreadKey() 调用 |
性能低下 | 未启用会话缓存 | 为委托工厂配置 CachingSessionFactory |
TIP
调试技巧:启用 Spring Integration 调试日志可观察键设置过程:
properties
logging.level.org.springframework.integration=DEBUG
七、架构优势总结
✅ 动态路由:运行时按需选择 SFTP 服务器
✅ 配置简化:统一入口管理多服务器配置
✅ 资源优化:连接池按需分配,避免资源浪费
✅ 扩展灵活:新增服务器无需修改核心逻辑
适用场景推荐
- 多环境切换(开发/测试/生产)
- 多租户文件隔离系统
- SFTP 集群负载均衡
- 合规性要求的区域化存储
⚡️ 性能提示:对于高频访问场景,建议结合
CachingSessionFactory
使用,将连接池大小设置为预期并发量的 120%。