Skip to content

SFTP入站通道适配器全面指南

概述

SFTP入站通道适配器是一个监听器组件,它连接到SFTP服务器并监听远程目录事件(如新文件创建)。当检测到事件时,它会自动触发文件传输。这种机制非常适合实现自动化文件处理系统,如日志收集、数据导入等场景。

基础配置

Kotlin DSL配置示例

kotlin
@Bean
fun sftpInboundFlow() = integrationFlow(
    Sftp.inboundAdapter(sftpSessionFactory)
        .preserveTimestamp(true)
        .remoteDirectory("/remote/files")
        .regexFilter(".*\\.txt$")
        .localFilename { it.toUpperCase() + ".processed" }
        .localDirectory(File("local/storage")),
    { poller { fixedDelay(5000) } }
) {
    handle { message ->
        println("收到文件: ${message.payload}")
        // 处理文件内容
    }
}

关键配置属性详解

属性说明默认值
remoteDirectorySFTP服务器上的源目录必填
localDirectory本地存储目录必填
filenamePattern文件匹配模式(如*.txt)*
preserveTimestamp保留文件时间戳false
autoCreateLocalDirectory自动创建本地目录false
deleteRemoteFiles传输后删除远程文件false

TIP

最佳实践建议

  1. 始终设置autoCreateLocalDirectory=true避免目录不存在错误
  2. 生产环境中设置preserveTimestamp=true便于审计追踪
  3. 使用localFilename表达式自定义本地文件名增强可读性

高级文件过滤策略

1. 基础过滤方式

kotlin
// 基于文件扩展名过滤
.filter(SftpSimplePatternFileListFilter("*.csv"))

// 使用正则表达式过滤
.filter(SftpRegexPatternFileListFilter(".*_\\d{8}\\.txt"))

2. 复合过滤器

kotlin
val compositeFilter = CompositeFileListFilter<ChannelSftp.LsEntry>().apply {
    addFilter(SftpSimplePatternFileListFilter("*.log")) // 基本模式
    addFilter(SftpPersistentAcceptOnceFileListFilter(metadataStore(), "fileFilter")) // 持久化
    addFilter(SftpLastModifiedFileListFilter(Duration.ofMinutes(5))) // 时间过滤
}

3. 处理不完整文件

kotlin
// 只处理带有.done标记的完整文件
.filter(SftpSystemMarkerFilePresentFileListFilter(".done"))

CAUTION

重要注意事项

  • AcceptOnceFileListFilter仅内存有效,重启后状态丢失
  • 集群环境中必须使用SftpPersistentAcceptOnceFileListFilter
  • 大文件传输需设置足够长的SftpLastModifiedFileListFilter时间

大文件处理与错误恢复

事务性处理配置

kotlin
@Bean
fun sftpInboundAdapter(): MessageSource<File> {
    val source = SftpInboundFileSynchronizingMessageSource(synchronizer())
    source.setLocalDirectory(File("local/dir"))
    source.setAutoCreateLocalDirectory(true)

    // 设置可重置过滤器
    source.setLocalFilter(AcceptOnceFileListFilter<File>().apply {
        setSupportReset(true)
    })
    return source
}

@Bean
fun transactionSynchronizationFactory(): TransactionSynchronizationFactory {
    return DefaultTransactionSynchronizationFactory().apply {
        afterRollback { message ->
            // 事务回滚时删除本地文件
            (message.payload as File).delete()
        }
    }
}

错误处理机制

kotlin
.handle({ message ->
    try {
        processFile(message.payload as File)
    } catch (e: Exception) {
        // 自定义错误处理逻辑
        errorHandler.recover(message)
        throw e
    }
}, { it.advice(recoveryAdvice()) })

事务支持配置

完整事务配置示例

kotlin
@Bean
fun sftpFlow() = integrationFlow(
    Sftp.inboundAdapter(sessionFactory)
        .remoteDirectory("/sftp")
        .localDirectory(File("local"))
        .filter(filter()),
    {
        poller {
            fixedRate(1000)
            transactional {
                synchronizationFactory(transactionSynchronizationFactory())
            }
        }
    }
) {
    // ...处理逻辑
}

@Bean
fun transactionSynchronizationFactory() = DefaultTransactionSynchronizationFactory().apply {
    afterRollback { (it.payload as File).delete() } // 回滚时清理
}

@Bean
fun filter() = SftpPersistentAcceptOnceFileListFilter(metadataStore(), "fileKey")

事务设计要点

  1. 幂等性处理:确保同一文件重复处理不会产生副作用
  2. 状态管理:使用持久化存储记录处理状态
  3. 补偿机制:回滚时需清理本地文件和状态标记

性能优化技巧

1. 分批获取策略

kotlin
// 限制每次轮询获取的文件数量
source.maxFetchSize = 50

2. 目录扫描优化

kotlin
source.scanner = object : DirectoryScanner() {
    override fun listFiles(dir: File): Array<File> {
        // 自定义扫描逻辑
        return dir.listFiles { file ->
            file.lastModified() > System.currentTimeMillis() - 60000
        } ?: emptyArray()
    }
}

3. 集群环境配置

kotlin
@Bean
fun metadataStore(): ConcurrentMetadataStore {
    // 使用Redis共享状态
    return RedisMetadataStore(redisConnectionFactory)
}

IMPORTANT

性能关键指标

  • 轮询间隔:根据业务需求平衡实时性与负载
  • 最大获取量:防止单次轮询处理过多文件
  • 过滤效率:复杂过滤器可能影响性能

常见问题解决方案

问题1:文件重复处理

解决方案

kotlin
// 使用持久化过滤器
.filter(SftpPersistentAcceptOnceFileListFilter(metadataStore(), "fileFilter"))

问题2:不完整文件处理

解决方案

kotlin
// 设置临时文件后缀
.temporaryFileSuffix(".tmp")

问题3:权限问题

解决方案

kotlin
@Bean
fun sftpSessionFactory(): SessionFactory<LsEntry> {
    return DefaultSftpSessionFactory().apply {
        host = "sftp.example.com"
        port = 22
        user = "user"
        // 使用密钥认证更安全
        privateKey = File("path/to/key")
        privateKeyPassphrase = "passphrase"
        allowUnknownKeys = true
    }
}

问题4:连接不稳定

解决方案

kotlin
@Bean
fun sftpSessionFactory(): SessionFactory<LsEntry> {
    val factory = DefaultSftpSessionFactory()
    // 配置重试机制
    factory.setSessionConfig(mapOf(
        "StrictHostKeyChecking" to "no",
        "MaxAttempts" to 5,
        "RetryInterval" to 5000
    ))
    return CachingSessionFactory(factory)
}

总结

SFTP入站通道适配器提供了强大的文件获取能力,关键要点包括:

  1. 智能过滤:使用复合过滤器精确控制文件选择
  2. 可靠传输:通过事务支持确保数据处理完整性
  3. 弹性设计:内置错误恢复和重试机制
  4. 扩展能力:支持自定义逻辑注入

最佳实践清单

  1. [✅] 使用持久化过滤器防止重复处理
  2. [✅] 配置事务回滚清理机制
  3. [✅] 设置合理的轮询间隔和获取量
  4. [✅] 实施完整的错误监控和报警
  5. [✅] 定期审计文件处理状态

通过合理配置和遵循最佳实践,您可以构建稳定高效的SFTP文件处理系统,满足各类企业级文件传输需求。