Skip to content

Spring Integration FTP 入站通道适配器详解

1. FTP 入站通道适配器概述

FTP 入站通道适配器是一个特殊的监听器,它连接 FTP 服务器并监听远程目录事件(如新文件创建),然后自动触发文件传输。它的核心功能是将远程 FTP 服务器上的文件下载到本地系统,并将每个文件作为消息的有效负载发送到指定通道。

2. 基础配置与工作原理

2.1 核心组件配置(Kotlin DSL)

kotlin
@Configuration
@EnableIntegration
class FtpInboundConfig {

    @Bean
    fun ftpSessionFactory(): SessionFactory<FTPFile> {
        return DefaultFtpSessionFactory().apply {
            host = "ftp.example.com"
            port = 21
            username = "user"
            password = "pass"
        }
    }

    @Bean
    fun ftpInboundFlow(): IntegrationFlow {
        return IntegrationFlow.from(
            Ftp.inboundAdapter(ftpSessionFactory())
                .localDirectory(File("local/dir"))
                .remoteDirectory("remote/dir")
                .autoCreateLocalDirectory(true)
                .deleteRemoteFiles(false)
                .filter(CompositeFileListFilter<FTPFile>().apply {
                    addFilter(FtpRegexPatternFileListFilter(".*\\.txt"))
                    addFilter(FtpPersistentAcceptOnceFileListFilter(metadataStore(), "ftpMetadata"))
                }),
            { poller -> poller.poller(Pollers.fixedRate(5000)) }
        )
        .handle { file: File ->
            println("处理文件: ${file.name}")
        }
        .get()
    }

    @Bean
    fun metadataStore(): ConcurrentMetadataStore {
        return RedisMetadataStore(redisConnectionFactory)
    }
}

配置说明

  • localDirectory:本地存储目录路径
  • remoteDirectory:远程FTP服务器目录
  • autoCreateLocalDirectory:自动创建本地目录
  • deleteRemoteFiles:下载后是否删除远程文件
  • filter:组合过滤器(正则+持久化)
  • poller:每5秒轮询一次

最佳实践

优先使用 FtpPersistentAcceptOnceFileListFilter 替代内存过滤器,确保系统重启后仍能正确跟踪已处理文件

3. 文件过滤策略

3.1 常用过滤器类型

过滤器类型说明适用场景
FtpRegexPatternFileListFilter正则表达式匹配复杂文件名匹配
FtpSimplePatternFileListFilter简单模式匹配基本通配符匹配
FtpPersistentAcceptOnceFileListFilter持久化过滤器避免重复处理文件
FtpLastModifiedFileListFilter最后修改时间过滤处理稳定文件
FtpSystemMarkerFilePresentFileListFilter标记文件检查确保文件完整

3.2 过滤器配置示例

kotlin
// 组合过滤器配置
fun customFilter(): CompositeFileListFilter<FTPFile> {
    return CompositeFileListFilter<FTPFile>().apply {
        // 只处理.txt文件
        addFilter(FtpRegexPatternFileListFilter(".*\\.txt"))

        // 防止重复处理(使用Redis持久化)
        addFilter(FtpPersistentAcceptOnceFileListFilter(
            metadataStore(),
            "ftpFilterKey"
        ))

        // 只处理30秒前的文件(避免处理中文件)
        addFilter(FtpLastModifiedFileListFilter(Duration.ofSeconds(30)))
    }
}

注意事项

处理正在写入的文件可能导致数据不完整。务必结合 FtpLastModifiedFileListFilter 或标记文件策略确保文件完整性

4. 轮询器配置要点

4.1 轮询行为控制

kotlin
@Bean
fun ftpPoller(): PollerSpec {
    return Pollers.fixedRate(5000)  // 每5秒轮询一次
        .maxMessagesPerPoll(-1)    // 每次处理所有可用文件
        .errorChannel(errorChannel()) // 错误处理通道
}

关键参数

  • maxMessagesPerPoll
    • -1:处理所有可用文件(推荐)
    • 1:每次只处理一个文件(默认)
    • N:每次最多处理N个文件

::: important 性能建议 对于大文件处理场景,建议设置 maxMessagesPerPoll=1 避免内存溢出,同时配合 maxFetchSize 限制单次获取数量 :::

5. 错误恢复机制

5.1 事务性错误处理

kotlin
@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlow.from(
        Ftp.inboundAdapter(ftpSessionFactory())
            .localDirectory(File("local/dir"))
            .remoteDirectory("remote/dir"),
        { poller ->
            poller.poller(Pollers.fixedRate(5000)
                .transactional(transactionManager())
        }
    )
    // 事务同步配置
    .handle({ file: File ->
        // 文件处理逻辑
    }, { endpoint ->
        endpoint.advice(afterRollback {
            (it.payload as File).delete() // 回滚时删除本地文件
        })
    })
    .get()
}

@Bean
fun transactionManager(): PlatformTransactionManager {
    return DataSourceTransactionManager(dataSource)
}

错误处理流程

  1. 文件下载到本地临时目录(后缀如 .writing
  2. 事务中处理文件
  3. 成功:重命名文件为最终名称
  4. 失败:回滚事务 → 删除本地文件 → 重置过滤器状态

6. 现代配置方式(Kotlin DSL)

6.1 完整配置示例

kotlin
@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlow.from(
        Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)    // 保留远程文件时间戳
            .remoteDirectory("uploads") // 远程目录
            .regexFilter(".*\\.csv$")   // 只处理CSV文件
            .localFilename { remoteFileName ->
                "${remoteFileName.toUpperCase()}_PROCESSED"
            }  // 自定义本地文件名
            .localDirectory(File("data/inbound")) // 本地目录
            .maxFetchSize(10) // 每次最多获取10个文件
            .temporaryFileSuffix(".tmp") // 临时文件后缀
            .localFilter(AcceptOnceFileListFilter()) // 本地过滤器
    ) { e ->
        e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
        e.id("ftpInboundAdapter")
    }
    .transform(File::class) { file ->
        processCsvFile(file) // 自定义CSV处理逻辑
    }
    .handle { payload, _ ->
        logger.info("文件处理完成: ${(payload as File).name}")
    }
    .get()
}

6.2 关键特性说明

kotlin
// 动态远程目录(每次轮询时计算)
.remoteDirectoryExpression("@remoteDirService.getCurrentDir()")

// 自定义目录扫描器
.scanner(CustomFtpDirectoryScanner())

// 保留时间戳
.preserveTimestamp(true)

// 自定义临时文件后缀
.temporaryFileSuffix(".downloading")

动态目录技巧

使用 remoteDirectoryExpression 实现动态目录切换,非常适合多租户或日期分区场景

7. 处理不完整数据

7.1 文件完整性保障策略

kotlin
// 标记文件过滤器
@Bean
fun markerFileFilter(): FileListFilter<FTPFile> {
    return FtpSystemMarkerFilePresentFileListFilter(
        ".COMPLETE", // 标记文件后缀
        ftpSessionFactory()
    )
}

// 在适配器中应用
@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlow.from(
        Ftp.inboundAdapter(ftpSessionFactory())
            .filter(markerFileFilter())
            .remoteDirectory("incoming")
            .localDirectory(File("processed"))
    )
    // ...
    .get()
}

工作原理

  1. 源系统上传 data.csv
  2. 源系统创建 data.csv.COMPLETE 标记文件
  3. 适配器只下载带有标记文件的CSV
  4. 处理完成后删除标记文件

关键注意

不完整文件处理是生产环境必须考虑的问题,缺少此机制可能导致部分数据处理失败

8. 常见问题解决方案

8.1 典型问题排查表

问题现象可能原因解决方案
文件重复处理过滤器状态未持久化使用 FtpPersistentAcceptOnceFileListFilter
处理部分写入文件轮询太快添加 FtpLastModifiedFileListFilter
文件下载后未删除权限不足检查FTP账户删除权限
中文文件名乱码编码不一致配置 sessionFactory.setControlEncoding("UTF-8")
连接频繁断开防火墙限制调整 sessionFactory.setClientMode(FTPClient.PASSIVE_MODE)

8.2 性能优化技巧

kotlin
// 连接池配置
@Bean
fun ftpSessionFactory(): SessionFactory<FTPFile> {
    val factory = DefaultFtpSessionFactory().apply {
        // ...基础配置
    }

    return CachingSessionFactory(factory).apply {
        poolSize = 10          // 连接池大小
        sessionWaitTimeout = 30_000 // 等待超时(ms)
        testSession = true     // 验证会话有效性
    }
}

// 适配器配置
@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlow.from(
        Ftp.inboundAdapter(ftpSessionFactory())
            .maxFetchSize(50)  // 每次最多获取50个文件
            .remoteDirectory("large-files")
    ) { e ->
        e.poller(Pollers.fixedRate(60_000) // 降低轮询频率
    }
    // ...
}

生产环境建议

  1. 大文件场景:设置 maxFetchSize=1 避免内存溢出
  2. 高并发场景:使用 CachingSessionFactory 连接池
  3. 网络不稳定:增加 sessionWaitTimeout 并启用 testSession

9. 高级特性:元数据存储

9.1 分布式状态管理

kotlin
@Configuration
@EnableIntegration
class MetadataConfig {

    // Redis元数据存储
    @Bean
    fun metadataStore(): ConcurrentMetadataStore {
        return RedisMetadataStore(redisConnectionFactory).apply {
            setPrefix("ftp::state::") // 键前缀
        }
    }

    // 持久化过滤器
    @Bean
    fun persistentFilter(): FileListFilter<FTPFile> {
        return FtpPersistentAcceptOnceFileListFilter(
            metadataStore(),
            "fileFilter"
        ).apply {
            flushOnUpdate = true // 每次更新立即持久化
        }
    }
}

集群部署优势

  • 多个适配器实例共享处理状态
  • 重启后自动恢复处理位置
  • 避免跨实例重复处理文件

存储选择

除了Redis,还可选择:

  • JdbcMetadataStore(数据库存储)
  • ZookeeperMetadataStore(分布式协调)
  • GemfireMetadataStore(内存网格)

总结

Spring Integration FTP 入站通道适配器提供了强大的文件获取能力,关键实践包括:

  1. 优先使用Kotlin DSL进行声明式配置
  2. 必须实施完整性检查(时间戳或标记文件)
  3. 生产环境务必使用持久化过滤器
  4. 根据文件大小调整 maxFetchSize 参数
  5. 使用连接池优化性能

通过合理配置,FTP入站适配器能够可靠地处理各种文件传输场景,成为企业集成解决方案中的重要组件。

CAUTION

切勿在生产环境使用 deleteRemoteFiles=true 而不做备份,重要文件删除前应增加确认机制