Appearance
Spring Integration SFTP 出站通道适配器详解
🚀 学习目标:掌握如何使用 Spring Integration SFTP 出站通道适配器高效安全地传输文件到远程服务器
一、SFTP 出站通道适配器概述
SFTP 出站通道适配器是一个特殊的 MessageHandler
,它负责:
- 连接到远程 SFTP 服务器目录
- 将传入消息的有效负载作为文件传输
- 支持多种文件表示形式
支持的 Payload 类型
类型 | 说明 | 适用场景 |
---|---|---|
java.io.File | 文件对象 | 本地文件传输 |
byte[] | 字节数组 | 动态生成内容 |
java.lang.String | 文本内容 | 文本文件传输 |
java.io.InputStream | 输入流 | 大文件或流式处理 |
org.springframework.core.io.Resource | Spring资源 | 类路径资源传输 |
二、核心功能详解
1. SpEL 动态表达式
使用 Spring 表达式语言动态生成远程目录和文件名:
kotlin
@Bean
fun sftpOutboundAdapter(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpMessageHandler(sessionFactory).apply {
remoteDirectoryExpression = "headers['remoteDir']"
fileNameGenerator = MessageFileNameGenerator().apply {
expression = "payload.name + '-processed'"
}
}
}
TIP
表达式支持访问消息的 payload 和 headers,实现高度动态的文件命名策略
2. 文件传输模式
通过 FileExistsMode
控制文件覆盖行为:
kotlin
enum class FileExistsMode {
REPLACE, // 覆盖现有文件(默认)
REPLACE_IF_MODIFIED, // 仅当源文件较新时覆盖
APPEND, // 追加到现有文件
APPEND_NO_FLUSH, // 追加但不立即刷新
IGNORE, // 忽略传输(记录DEBUG日志)
FAIL // 抛出异常
}
WARNING
使用 APPEND
模式时需确保文件格式兼容,否则可能破坏文件结构
3. 文件权限设置
传输后自动修改远程文件权限:
kotlin
Sftp.outboundAdapter(sessionFactory)
.chmod(0o600) nix权限格式:rw-------
权限说明:
600
:所有者读写644
:所有者读写,其他只读700
:所有者读写执行
4. 避免部分写入文件
通过临时文件机制确保传输原子性:
配置选项:
kotlin
Sftp.outboundAdapter(sessionFactory)
.useTemporaryFileName(true) // 默认启用
.temporaryFileSuffix(".writing") // 自定义临时后缀
DANGER
禁用临时文件(useTemporaryFileName=false
)时,应用程序需自行确保文件完整性
三、Kotlin 配置实战
1. 注解配置方式
kotlin
@Configuration
@IntegrationComponentScan
class SftpConfig {
@Bean
fun sftpSessionFactory(): SessionFactory<SftpClient.DirEntry> {
return DefaultSftpSessionFactory(true).apply {
host = "sftp.example.com"
port = 22
user = "user"
password = "pass"
allowUnknownKeys = true
}.let { CachingSessionFactory(it) }
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
fun sftpHandler(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpMessageHandler(sessionFactory).apply {
remoteDirectoryExpression = "headers['targetDir'] ?: '/uploads'"
setFileNameGenerator { message ->
message.headers["fileName"] as? String ?: "file-${System.currentTimeMillis()}.txt"
}
chmod = 0o640
fileExistsMode = FileExistsMode.REPLACE_IF_MODIFIED
}
}
}
2. Kotlin DSL 配置
kotlin
@Bean
fun sftpOutboundFlow(sessionFactory: SessionFactory<SftpClient.DirEntry>) =
integrationFlow("sftpChannel") {
handle(Sftp.outboundAdapter(sessionFactory)
.useTemporaryFileName(false) // [!code warning] // 禁用临时文件需确保网络稳定
.remoteDirectory("/inbound")
.fileNameExpression("payload.name")
.chmodOctal("600")
.mode(FileExistsMode.FAIL)
)
}
3. 网关接口示例
kotlin
@MessagingGateway
interface SftpGateway {
@Gateway(requestChannel = "sftpChannel")
fun uploadFile(@Payload file: File,
@Header("targetDir") directory: String)
}
// 使用示例
sftpGateway.uploadFile(File("data.txt"), "/client-uploads")
四、最佳实践与常见问题
1. 连接池优化
kotlin
@Bean
fun sftpSessionFactory(): SessionFactory<SftpClient.DirEntry> {
val factory = DefaultSftpSessionFactory(true).apply {
// ... 基础配置
}
return CachingSessionFactory(factory).apply {
poolSize = 10 // [!code highlight] // 连接池大小
sessionWaitTimeout = 5000 // 等待超时(ms)
}
}
2. 错误处理策略
kotlin
@Bean
fun sftpHandler(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpMessageHandler(sessionFactory).apply {
// ... 其他配置
remoteFileTemplate.remoteFileSeparator = "/"
remoteFileTemplate.afterSend { session, path, file ->
if (!session.exists("$path.sent")) {
session.mkdir("$path.sent")
}
session.rename(path, "$path.sent/${file.name}")
}
}
}
3. 常见问题解决
问题现象 | 可能原因 | 解决方案 |
---|---|---|
连接超时 | 网络问题/防火墙 | 检查端口22是否开放 |
认证失败 | 凭证错误/密钥不匹配 | 验证用户名密码或密钥 |
权限拒绝 | 目录不可写 | 检查远程目录权限设置 |
文件不完整 | 传输中断 | 启用临时文件机制 |
文件覆盖 | 未设置FileExistsMode | 明确指定REPLACE/FAIL策略 |
IMPORTANT
生产环境务必配置 SSH 主机密钥验证,避免中间人攻击:
kotlin
factory.setKnownHosts("classpath:known_hosts")
五、总结与进阶
SFTP 出站通道适配器提供了强大的文件传输能力,关键要点:
- ✅ 优先使用 Kotlin DSL 配置,简洁易读
- ✅ 启用 临时文件机制 确保传输原子性
- ✅ 设置 chmod 权限 保证文件安全
- ✅ 使用 SpEL 表达式 实现动态路径
下一步学习
- 结合 Spring Integration 的 文件过滤器 实现智能传输
- 探索 断点续传 功能处理大文件
- 集成 Spring Cloud Stream 构建文件处理管道
kotlin
// 完整工作流示例
fun integrationFlow() = integrationFlow(File.inboundAdapter(File("local-dir"))) {
filter<File> { it.extension == "csv" }
transform<File, ByteArray> { it.readBytes() }
handle(Sftp.outboundAdapter(sessionFactory)
.remoteDirectory("/remote")
.temporaryFileSuffix(".tmp")
)
}