Appearance
Spring Integration DelegatingSessionFactory 详解
NOTE
本文专为 Spring 初学者设计,将使用 Kotlin 语言和注解配置方式,结合现代 Spring 最佳实践讲解 DelegatingSessionFactory。
为什么需要 DelegatingSessionFactory?
现实场景痛点
想象你管理着一个多租户系统,不同客户的数据存储在不同的 FTP 服务器上。传统方式需要为每个客户端单独配置 FTP 适配器,导致配置复杂且难以维护:
CAUTION
传统方式问题:
- 配置冗余:每新增一个客户端就需要新配置
- 资源浪费:每个适配器独立维护连接池
- 动态切换困难:运行时无法灵活切换服务器
DelegatingSessionFactory 解决方案
DelegatingSessionFactory
在 Spring Integration 4.2 引入,核心功能是在运行时动态选择实际使用的 FTP SessionFactory:
核心概念与原理
工作机制
- 线程键绑定:调用
setThreadKey()
将键与当前线程关联 - 工厂委派:根据键查找对应的实际 SessionFactory
- 资源清理:操作后调用
clearThreadKey()
释放资源
IMPORTANT
线程键使用 ThreadLocal
存储,确保多线程环境下隔离安全
组件关系
完整配置与使用指南
基础配置(Kotlin DSL)
kotlin
@Configuration
class FtpConfig {
// 服务器1的SessionFactory
@Bean
fun server1Factory(): DefaultFtpSessionFactory {
return DefaultFtpSessionFactory().apply {
host = "ftp.server1.com"
port = 21
username = "user1"
password = "pass1"
}
}
// 服务器2的SessionFactory
@Bean
fun server2Factory(): DefaultFtpSessionFactory {
return DefaultFtpSessionFactory().apply {
host = "ftp.server2.com"
port = 21
username = "user2"
password = "pass2"
}
}
elegatingSessionFactory 主配置
@Bean
fun delegatingSessionFactory(
server1Factory: DefaultFtpSessionFactory,
server2Factory: DefaultFtpSessionFactory
): DelegatingSessionFactory<FTPFile> {
val factories = mapOf(
"server1" to server1Factory,
"server2" to server2Factory
)
return DelegatingSessionFactory(factories)
}
}
kotlin
@Service
class FtpService(
private val delegatingFactory: DelegatingSessionFactory<FTPFile>
) {
fun downloadFromServer(serverKey: String, remotePath: String): File {
try {
// 1. 设置当前线程使用的服务器键
delegatingFactory.setThreadKey(serverKey)
// 2. 执行FTP操作
val session = delegatingFactory.session
return session.read(remotePath)
} finally {
// 3. 必须清理线程键!
delegatingFactory.clearThreadKey()
}
}
}
TIP
最佳实践:始终在 finally 块中调用 clearThreadKey(),避免线程污染导致后续操作错误
高级用法:结合 ContextHolderRequestHandlerAdvice
kotlin
@Bean
fun contextHolderAdvice(
delegatingFactory: DelegatingSessionFactory<FTPFile>
): ContextHolderRequestHandlerAdvice<String> {
return ContextHolderRequestHandlerAdvice<String>().apply {
setThreadKeyManager(object : ThreadKeyManager<String> {
override fun setKey(key: String) {
delegatingFactory.setThreadKey(key)
}
override fun clearKey() {
delegatingFactory.clearThreadKey()
}
override fun getCurrentKey(): String? = null
})
}
}
// 在消息端点使用
@Bean
fun ftpInboundFlow(
delegatingFactory: DelegatingSessionFactory<FTPFile>
): IntegrationFlow {
return IntegrationFlows
.from(Ftp.inboundAdapter(delegatingFactory)
.handle({ payload, _ -> processFile(payload) }) {
it.advice(contextHolderAdvice(delegatingFactory))
}
.get()
}
多服务器轮询(5.0.7+)
kotlin
@Bean
fun rotatingServerAdvice(
delegatingFactory: DelegatingSessionFactory<FTPFile>
): RotatingServerAdvice {
// 定义要轮询的服务器键列表
val serverKeys = listOf("server1", "server2", "server3")
return RotatingServerAdvice(delegatingFactory, serverKeys)
}
@Bean
fun multiServerInboundFlow(): IntegrationFlow {
return IntegrationFlows
.from(Ftp.inboundAdapter(delegatingSessionFactory())
.advice(rotatingServerAdvice(delegatingSessionFactory()))
.channel("processChannel")
.get()
}
TIP
轮询机制适用于:
- 负载均衡分发
- 故障转移处理
- 地理分布式文件采集
关键注意事项
缓存配置(重要!)
WARNING
错误配置缓存是常见陷阱!必须缓存委托的 SessionFactory,而非 DelegatingSessionFactory 本身
kotlin
// 错误:直接缓存了DelegatingSessionFactory
@Bean
fun wrongCachedFactory(
delegatingFactory: DelegatingSessionFactory<FTPFile>
): SessionFactory<FTPFile> {
return CachingSessionFactory(delegatingFactory)
}
kotlin
// 正确:缓存每个委托的SessionFactory
@Bean
fun cachedServer1Factory(
server1Factory: DefaultFtpSessionFactory
): SessionFactory<FTPFile> {
return CachingSessionFactory(server1Factory, 10) // 最大10个会话
}
@Bean
fun cachedServer2Factory(
server2Factory: DefaultFtpSessionFactory
): SessionFactory<FTPFile> {
return CachingSessionFactory(server2Factory, 10)
}
@Bean
fun delegatingSessionFactory(
cachedServer1Factory: SessionFactory<FTPFile>,
cachedServer2Factory: SessionFactory<FTPFile>
): DelegatingSessionFactory<FTPFile> {
val factories = mapOf(
"server1" to cachedServer1Factory,
"server2" to cachedServer2Factory
)
return DelegatingSessionFactory(factories)
}
异步环境处理
kotlin
@Async
fun asyncFtpOperation(serverKey: String) {
// 手动传递线程键到新线程
val currentKey = delegatingFactory.getThreadKey()
try {
delegatingFactory.setThreadKey(serverKey)
// 执行操作...
} finally {
delegatingFactory.setThreadKey(currentKey) // 恢复原始键
}
}
CAUTION
在 @Async
或线程池环境中,需手动传递线程键,因为 ThreadLocal
值不会自动继承
最佳实践总结
场景 | 推荐方案 | 优势 |
---|---|---|
多租户隔离 | ContextHolderAdvice + 消息头 | 请求级别自动路由 |
服务器轮询 | RotatingServerAdvice | 自动负载均衡 |
高并发访问 | 缓存委托SessionFactory | 性能优化 |
错误处理 | try-finally + clearThreadKey | 资源安全 |
典型应用场景
- 多租户文件处理:根据租户ID路由到专属FTP
- 环境隔离:测试/生产环境使用不同服务器
- 地理分布式存储:就近选择区域服务器
- 故障转移:主备服务器自动切换
TIP
结合 Spring Cloud Config 可实现动态服务器配置更新,无需重启应用
通过 DelegatingSessionFactory,我们实现了 FTP 资源的动态路由管理,大幅提升了系统的灵活性和可维护性。