Skip to content

Spring Integration SFTP 网关操作全解

概述

SFTP Outbound Gateway 是 Spring Integration 提供的关键组件,用于与远程 SFTP 服务器进行双向交互。它支持多种文件操作命令,让开发者能像操作本地文件一样管理远程文件。本教程将深入解析各命令用法,并提供现代化 Kotlin 实现方案。

核心命令详解

1. 文件列表命令

ls - 详细文件列表

kotlin
@Bean
@ServiceActivator(inputChannel = "lsChannel")
fun sftpLsGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "ls", "payload").apply {
        // 命令选项:递归列出+包含隐藏文件+不排序
        setCommandOptions("-R -a -f") 
        setFilter(SftpRegexPatternFileListFilter(".*\\.txt"))
    }
}

命令选项说明:

  • -1 → 仅返回文件名 (List<String>)
  • -a → 包含隐藏文件 (默认排除)
  • -R递归列出子目录
  • -dirs → 包含目录 (默认排除)

TIP

使用 -R 递归时,返回的文件路径是相对路径。建议配合 -dirs 选项区分文件/目录类型

nlst - 简单文件名列表

kotlin
@Bean
@ServiceActivator(inputChannel = "nlstChannel")
fun sftpNlstGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "nlst", "payload")
}

等效于 ls -1,返回 List<String> 类型文件名列表

2. 文件获取命令

get - 单文件下载

kotlin
@Bean
@ServiceActivator(inputChannel = "getChannel")
fun sftpGetGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "get", "payload").apply {
        setCommandOptions("-P -D") // 保留时间戳+传输后删除
        setLocalDirectoryExpression("'./downloads/' + #remoteDirectory")
    }
}

关键选项:

  • -P → 保留文件时间戳
  • -stream → 返回 InputStream (非File对象)
  • -D → 传输成功后删除远程文件

WARNING

使用 -stream 时需手动关闭会话:

kotlin
@ServiceActivator(inputChannel = "closeChannel")
fun closeSession(message: Message<*>) {
    message.headers["closeableResource"]?.close()
}

mget - 多文件下载

kotlin
@Bean
@ServiceActivator(inputChannel = "mgetChannel")
fun sftpMgetGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "mget", "payload").apply {
        setCommandOptions("-R -x") // 递归+无匹配时抛异常
        setFilter(SftpRegexPatternFileListFilter(".*\\.csv").apply {
            alwaysAcceptDirectories = true // 关键:允许递归目录
        })
        setLocalFilenameGeneratorExpression("#remoteFileName.toUpperCase()")
    }
}
递归下载最佳实践
kotlin
// 创建支持递归的过滤器
val filter = SftpRegexPatternFileListFilter(".*\\.(csv|txt)").apply {
    alwaysAcceptDirectories = true
    forRecursion = true // 启用递归元数据存储
}

// 配置网关
SftpOutboundGateway(sessionFactory, "mget", "sftpFiles/*").apply {
    setCommandOptions("-R")
    setFilter(filter)
    setLocalDirectoryExpression("'./local/' + #remoteDirectory")
}

3. 文件上传命令

put - 单文件上传

kotlin
@Bean
@ServiceActivator(inputChannel = "putChannel")
fun sftpPutGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "put", "payload").apply {
        setRemoteDirectoryExpression("headers['remoteDir']")
        setTemporaryRemoteDirectoryExpression("headers['tempDir']")
        setChmod(0x600) // 设置文件权限: rw-------
    }
}

支持上传类型:

  • java.io.File
  • ByteArray
  • String (文本内容)

mput - 多文件上传

kotlin
@Bean
@ServiceActivator(inputChannel = "mputChannel")
fun sftpMputGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "mput", "payload").apply {
        setCommandOptions("-R") // 递归上传
        setMputFilter(SftpSimplePatternFileListFilter("*.pdf"))
        setChmod(0o644) // 八进制权限表示
    }
}

IMPORTANT

当消息负载为集合时,会批量上传所有文件:

kotlin
val files = listOf(File("report.pdf"), File("data.xlsx"))
messagingTemplate.convertAndSend("mputChannel", files)

4. 文件管理命令

rm - 删除文件

kotlin
@Bean
@ServiceActivator(inputChannel = "rmChannel")
fun sftpRmGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "rm", "payload")
}

返回值:成功时 true,失败时 false

mv - 移动/重命名

kotlin
@Bean
@ServiceActivator(inputChannel = "mvChannel")
fun sftpMvGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpOutboundGateway(sessionFactory, "mv", "payload").apply {
        setRenameExpression("headers['newPath']") // 新路径表达式
    }
}
kotlin
// 使用示例
val headers = mapOf("newPath" to "/backup/${file.name}")
messagingTemplate.send(MessageBuilder.createMessage(file, headers))

高级配置技巧

注解驱动配置

kotlin
@Configuration
@EnableIntegration
class SftpConfig {

    @Bean
    fun sftpSessionFactory(): SessionFactory<SftpClient.DirEntry> {
        return DefaultSftpSessionFactory().apply {
            host = "sftp.example.com"
            port = 22
            user = "admin"
            password = "pass@123"
            isTestSession = true
        }
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    fun sftpGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>) = 
        SftpOutboundGateway(sessionFactory, "ls", "'/reports/'").apply {
            commandOptions = "-1 -R"
        }
}

Kotlin DSL 配置

kotlin
@Bean
fun sftpFlow(sessionFactory: SessionFactory<SftpClient.DirEntry>): IntegrationFlow {
    return IntegrationFlow.from("sftpInputChannel")
        .handle(
            Sftp.outboundGateway(sessionFactory, Command.MGET, "payload")
                .options(Option.RECURSIVE)
                .regexFileNameFilter("(invoices/.*\\.pdf)")
                .localDirectoryExpression("'./archive/' + #remoteDirectory")
                .localFilenameExpression("#remoteFileName.replace('temp_', '')")
        )
        .handle { payload, _ -> 
            logger.info("下载文件数: ${(payload as List<*>).size}")
        }
        .get()
}

异常处理策略

部分成功处理

mget/mput 部分操作失败时,会抛出 PartialSuccessException

kotlin
@ServiceActivator(inputChannel = "errorChannel")
fun handlePartialFailure(ex: MessagingException) {
    when (val cause = ex.cause) {
        is PartialSuccessException -> {
            logger.error("部分操作失败!")
            logger.info("成功项: ${cause.partialResults}")
            logger.info("原始文件列表: ${cause.derivedInput}")
        }
    }
}

错误处理最佳实践

常见问题排查

问题现象可能原因解决方案
连接超时网络问题/防火墙检查端口22是否开放
认证失败密码错误/密钥不匹配验证凭证或使用交互式登录测试
文件传输中断会话超时增加 sessionTimeout 配置
递归操作卡死目录循环链接使用 -max-depth 限制层级
权限不足服务器ACL限制检查目标目录写权限

CAUTION

重要安全提示:生产环境请勿硬编码凭证!使用Spring Cloud Config或Kubernetes Secrets管理敏感信息

最佳实践总结

  1. 连接管理:使用 CachingSessionFactory 复用连接

    kotlin
    @Bean
    fun sftpSessionFactory() = CachingSessionFactory(DefaultSftpSessionFactory().apply {
        // 配置基础参数
    }, 10) // 最大10个会话
  2. 目录遍历防护:递归操作时设置深度限制

    kotlin
    setFilter(SftpRegexPatternFileListFilter(".*").apply {
        maxDepth = 5 // 限制递归深度
    })
  3. 传输校验:大文件使用校验和验证

    kotlin
    .handle(Ftp.outboundGateway(...))
    .handle(File.checksum()) // 添加校验步骤
  4. 监控集成:暴露操作指标

    kotlin
    @Bean
    fun sftpMetrics(registry: MeterRegistry) = IntegrationMetrics(registry)

::: success ✅ 关键收获

  • 优先使用 mget/mput 进行批量操作,减少连接开销
  • 流式传输(-stream)适合处理大文件,避免内存溢出
  • 递归操作配合 alwaysAcceptDirectories=true 确保完整遍历
  • 生产环境务必实现 PartialSuccessException 处理逻辑 :::

通过本教程,您应已掌握SFTP网关的核心操作。在实际应用中,请根据业务需求选择合适的命令和配置策略。