Skip to content

Spring Integration SFTP 出站通道适配器详解

🚀 学习目标:掌握如何使用 Spring Integration SFTP 出站通道适配器高效安全地传输文件到远程服务器

一、SFTP 出站通道适配器概述

SFTP 出站通道适配器是一个特殊的 MessageHandler,它负责:

  • 连接到远程 SFTP 服务器目录
  • 将传入消息的有效负载作为文件传输
  • 支持多种文件表示形式

支持的 Payload 类型

类型说明适用场景
java.io.File文件对象本地文件传输
byte[]字节数组动态生成内容
java.lang.String文本内容文本文件传输
java.io.InputStream输入流大文件或流式处理
org.springframework.core.io.ResourceSpring资源类路径资源传输

二、核心功能详解

1. SpEL 动态表达式

使用 Spring 表达式语言动态生成远程目录和文件名:

kotlin
@Bean
fun sftpOutboundAdapter(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpMessageHandler(sessionFactory).apply {
        remoteDirectoryExpression = "headers['remoteDir']"
        fileNameGenerator = MessageFileNameGenerator().apply {
            expression = "payload.name + '-processed'"
        }
    }
}

TIP

表达式支持访问消息的 payloadheaders,实现高度动态的文件命名策略

2. 文件传输模式

通过 FileExistsMode 控制文件覆盖行为:

kotlin
enum class FileExistsMode {
    REPLACE,          // 覆盖现有文件(默认)
    REPLACE_IF_MODIFIED, // 仅当源文件较新时覆盖
    APPEND,           // 追加到现有文件
    APPEND_NO_FLUSH,   // 追加但不立即刷新
    IGNORE,            // 忽略传输(记录DEBUG日志)
    FAIL               // 抛出异常
}

WARNING

使用 APPEND 模式时需确保文件格式兼容,否则可能破坏文件结构

3. 文件权限设置

传输后自动修改远程文件权限:

kotlin
Sftp.outboundAdapter(sessionFactory)
    .chmod(0o600)  nix权限格式:rw-------

权限说明:

  • 600:所有者读写
  • 644:所有者读写,其他只读
  • 700:所有者读写执行

4. 避免部分写入文件

通过临时文件机制确保传输原子性:

配置选项:

kotlin
Sftp.outboundAdapter(sessionFactory)
    .useTemporaryFileName(true) // 默认启用
    .temporaryFileSuffix(".writing") // 自定义临时后缀

DANGER

禁用临时文件(useTemporaryFileName=false)时,应用程序需自行确保文件完整性

三、Kotlin 配置实战

1. 注解配置方式

kotlin
@Configuration
@IntegrationComponentScan
class SftpConfig {

    @Bean
    fun sftpSessionFactory(): SessionFactory<SftpClient.DirEntry> {
        return DefaultSftpSessionFactory(true).apply {
            host = "sftp.example.com"
            port = 22
            user = "user"
            password = "pass"
            allowUnknownKeys = true
        }.let { CachingSessionFactory(it) }
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    fun sftpHandler(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
        return SftpMessageHandler(sessionFactory).apply {
            remoteDirectoryExpression = "headers['targetDir'] ?: '/uploads'"
            setFileNameGenerator { message ->
                message.headers["fileName"] as? String ?: "file-${System.currentTimeMillis()}.txt"
            }
            chmod = 0o640
            fileExistsMode = FileExistsMode.REPLACE_IF_MODIFIED
        }
    }
}

2. Kotlin DSL 配置

kotlin
@Bean
fun sftpOutboundFlow(sessionFactory: SessionFactory<SftpClient.DirEntry>) =
    integrationFlow("sftpChannel") {
        handle(Sftp.outboundAdapter(sessionFactory)
            .useTemporaryFileName(false) // [!code warning] // 禁用临时文件需确保网络稳定
            .remoteDirectory("/inbound")
            .fileNameExpression("payload.name")
            .chmodOctal("600")
            .mode(FileExistsMode.FAIL)
        )
    }

3. 网关接口示例

kotlin
@MessagingGateway
interface SftpGateway {
    @Gateway(requestChannel = "sftpChannel")
    fun uploadFile(@Payload file: File,
                  @Header("targetDir") directory: String)
}

// 使用示例
sftpGateway.uploadFile(File("data.txt"), "/client-uploads")

四、最佳实践与常见问题

1. 连接池优化

kotlin
@Bean
fun sftpSessionFactory(): SessionFactory<SftpClient.DirEntry> {
    val factory = DefaultSftpSessionFactory(true).apply {
        // ... 基础配置
    }
    return CachingSessionFactory(factory).apply {
        poolSize = 10  // [!code highlight] // 连接池大小
        sessionWaitTimeout = 5000 // 等待超时(ms)
    }
}

2. 错误处理策略

kotlin
@Bean
fun sftpHandler(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
    return SftpMessageHandler(sessionFactory).apply {
        // ... 其他配置
        remoteFileTemplate.remoteFileSeparator = "/"
        remoteFileTemplate.afterSend { session, path, file ->
            if (!session.exists("$path.sent")) {
                session.mkdir("$path.sent")
            }
            session.rename(path, "$path.sent/${file.name}")
        }
    }
}

3. 常见问题解决

问题现象可能原因解决方案
连接超时网络问题/防火墙检查端口22是否开放
认证失败凭证错误/密钥不匹配验证用户名密码或密钥
权限拒绝目录不可写检查远程目录权限设置
文件不完整传输中断启用临时文件机制
文件覆盖未设置FileExistsMode明确指定REPLACE/FAIL策略

IMPORTANT

生产环境务必配置 SSH 主机密钥验证,避免中间人攻击:

kotlin
factory.setKnownHosts("classpath:known_hosts")

五、总结与进阶

SFTP 出站通道适配器提供了强大的文件传输能力,关键要点:

  • ✅ 优先使用 Kotlin DSL 配置,简洁易读
  • ✅ 启用 临时文件机制 确保传输原子性
  • ✅ 设置 chmod 权限 保证文件安全
  • ✅ 使用 SpEL 表达式 实现动态路径

下一步学习

  • 结合 Spring Integration 的 文件过滤器 实现智能传输
  • 探索 断点续传 功能处理大文件
  • 集成 Spring Cloud Stream 构建文件处理管道
kotlin
// 完整工作流示例
fun integrationFlow() = integrationFlow(File.inboundAdapter(File("local-dir"))) {
    filter<File> { it.extension == "csv" }
    transform<File, ByteArray> { it.readBytes() }
    handle(Sftp.outboundAdapter(sessionFactory)
        .remoteDirectory("/remote")
        .temporaryFileSuffix(".tmp")
    )
}