Skip to content

Spring Integration DelegatingSessionFactory 详解

NOTE

本文专为 Spring 初学者设计,将使用 Kotlin 语言和注解配置方式,结合现代 Spring 最佳实践讲解 DelegatingSessionFactory。

为什么需要 DelegatingSessionFactory?

现实场景痛点

想象你管理着一个多租户系统,不同客户的数据存储在不同的 FTP 服务器上。传统方式需要为每个客户端单独配置 FTP 适配器,导致配置复杂且难以维护:

CAUTION

传统方式问题:

  • 配置冗余:每新增一个客户端就需要新配置
  • 资源浪费:每个适配器独立维护连接池
  • 动态切换困难:运行时无法灵活切换服务器

DelegatingSessionFactory 解决方案

DelegatingSessionFactorySpring Integration 4.2 引入,核心功能是在运行时动态选择实际使用的 FTP SessionFactory

核心概念与原理

工作机制

  1. 线程键绑定:调用 setThreadKey() 将键与当前线程关联
  2. 工厂委派:根据键查找对应的实际 SessionFactory
  3. 资源清理:操作后调用 clearThreadKey() 释放资源

IMPORTANT

线程键使用 ThreadLocal 存储,确保多线程环境下隔离安全

组件关系

完整配置与使用指南

基础配置(Kotlin DSL)

kotlin
@Configuration
class FtpConfig {

    // 服务器1的SessionFactory
    @Bean
    fun server1Factory(): DefaultFtpSessionFactory {
        return DefaultFtpSessionFactory().apply {
            host = "ftp.server1.com"
            port = 21
            username = "user1"
            password = "pass1"
        }
    }

    // 服务器2的SessionFactory
    @Bean
    fun server2Factory(): DefaultFtpSessionFactory {
        return DefaultFtpSessionFactory().apply {
            host = "ftp.server2.com"
            port = 21
            username = "user2"
            password = "pass2"
        }
    }

    elegatingSessionFactory 主配置
    @Bean
    fun delegatingSessionFactory(
        server1Factory: DefaultFtpSessionFactory,
        server2Factory: DefaultFtpSessionFactory
    ): DelegatingSessionFactory<FTPFile> {
        val factories = mapOf(
            "server1" to server1Factory,
            "server2" to server2Factory
        )
        return DelegatingSessionFactory(factories)
    }
}
kotlin
@Service
class FtpService(
    private val delegatingFactory: DelegatingSessionFactory<FTPFile>
) {

    fun downloadFromServer(serverKey: String, remotePath: String): File {
        try {
            // 1. 设置当前线程使用的服务器键
            delegatingFactory.setThreadKey(serverKey)

            // 2. 执行FTP操作
            val session = delegatingFactory.session
            return session.read(remotePath)

        } finally {
            // 3. 必须清理线程键!
            delegatingFactory.clearThreadKey()
        }
    }
}

TIP

最佳实践:始终在 finally 块中调用 clearThreadKey(),避免线程污染导致后续操作错误

高级用法:结合 ContextHolderRequestHandlerAdvice

kotlin
@Bean
fun contextHolderAdvice(
    delegatingFactory: DelegatingSessionFactory<FTPFile>
): ContextHolderRequestHandlerAdvice<String> {
    return ContextHolderRequestHandlerAdvice<String>().apply {
        setThreadKeyManager(object : ThreadKeyManager<String> {
            override fun setKey(key: String) {
                delegatingFactory.setThreadKey(key)
            }
            override fun clearKey() {
                delegatingFactory.clearThreadKey()
            }
            override fun getCurrentKey(): String? = null
        })
    }
}

// 在消息端点使用
@Bean
fun ftpInboundFlow(
    delegatingFactory: DelegatingSessionFactory<FTPFile>
): IntegrationFlow {
    return IntegrationFlows
        .from(Ftp.inboundAdapter(delegatingFactory)
        .handle({ payload, _ -> processFile(payload) }) {
            it.advice(contextHolderAdvice(delegatingFactory))
        }
        .get()
}

多服务器轮询(5.0.7+)

kotlin
@Bean
fun rotatingServerAdvice(
    delegatingFactory: DelegatingSessionFactory<FTPFile>
): RotatingServerAdvice {
    // 定义要轮询的服务器键列表
    val serverKeys = listOf("server1", "server2", "server3")
    return RotatingServerAdvice(delegatingFactory, serverKeys)
}

@Bean
fun multiServerInboundFlow(): IntegrationFlow {
    return IntegrationFlows
        .from(Ftp.inboundAdapter(delegatingSessionFactory())
        .advice(rotatingServerAdvice(delegatingSessionFactory())) 
        .channel("processChannel")
        .get()
}

TIP

轮询机制适用于:

  • 负载均衡分发
  • 故障转移处理
  • 地理分布式文件采集

关键注意事项

缓存配置(重要!)

WARNING

错误配置缓存是常见陷阱!必须缓存委托的 SessionFactory,而非 DelegatingSessionFactory 本身

kotlin
// 错误:直接缓存了DelegatingSessionFactory
@Bean
fun wrongCachedFactory(
    delegatingFactory: DelegatingSessionFactory<FTPFile>
): SessionFactory<FTPFile> {
    return CachingSessionFactory(delegatingFactory) 
}
kotlin
// 正确:缓存每个委托的SessionFactory
@Bean
fun cachedServer1Factory(
    server1Factory: DefaultFtpSessionFactory
): SessionFactory<FTPFile> {
    return CachingSessionFactory(server1Factory, 10) // 最大10个会话
}

@Bean
fun cachedServer2Factory(
    server2Factory: DefaultFtpSessionFactory
): SessionFactory<FTPFile> {
    return CachingSessionFactory(server2Factory, 10)
}

@Bean
fun delegatingSessionFactory(
    cachedServer1Factory: SessionFactory<FTPFile>,
    cachedServer2Factory: SessionFactory<FTPFile>
): DelegatingSessionFactory<FTPFile> {
    val factories = mapOf(
        "server1" to cachedServer1Factory,
        "server2" to cachedServer2Factory
    )
    return DelegatingSessionFactory(factories)
}

异步环境处理

kotlin
@Async
fun asyncFtpOperation(serverKey: String) {
    // 手动传递线程键到新线程
    val currentKey = delegatingFactory.getThreadKey()

    try {
        delegatingFactory.setThreadKey(serverKey)
        // 执行操作...
    } finally {
        delegatingFactory.setThreadKey(currentKey) // 恢复原始键
    }
}

CAUTION

@Async 或线程池环境中,需手动传递线程键,因为 ThreadLocal 值不会自动继承

最佳实践总结

场景推荐方案优势
多租户隔离ContextHolderAdvice + 消息头请求级别自动路由
服务器轮询RotatingServerAdvice自动负载均衡
高并发访问缓存委托SessionFactory性能优化
错误处理try-finally + clearThreadKey资源安全

典型应用场景

  1. 多租户文件处理:根据租户ID路由到专属FTP
  2. 环境隔离:测试/生产环境使用不同服务器
  3. 地理分布式存储:就近选择区域服务器
  4. 故障转移:主备服务器自动切换

TIP

结合 Spring Cloud Config 可实现动态服务器配置更新,无需重启应用

通过 DelegatingSessionFactory,我们实现了 FTP 资源的动态路由管理,大幅提升了系统的灵活性和可维护性。