Appearance
Spring Integration SFTP 网关操作全解
概述
SFTP Outbound Gateway 是 Spring Integration 提供的关键组件,用于与远程 SFTP 服务器进行双向交互。它支持多种文件操作命令,让开发者能像操作本地文件一样管理远程文件。本教程将深入解析各命令用法,并提供现代化 Kotlin 实现方案。
核心命令详解
1. 文件列表命令
ls
- 详细文件列表
kotlin
@Bean
@ServiceActivator(inputChannel = "lsChannel")
fun sftpLsGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "ls", "payload").apply {
// 命令选项:递归列出+包含隐藏文件+不排序
setCommandOptions("-R -a -f")
setFilter(SftpRegexPatternFileListFilter(".*\\.txt"))
}
}
命令选项说明:
-1
→ 仅返回文件名 (List<String>
)-a
→ 包含隐藏文件 (默认排除)-R
→ 递归列出子目录-dirs
→ 包含目录 (默认排除)
TIP
使用 -R
递归时,返回的文件路径是相对路径。建议配合 -dirs
选项区分文件/目录类型
nlst
- 简单文件名列表
kotlin
@Bean
@ServiceActivator(inputChannel = "nlstChannel")
fun sftpNlstGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "nlst", "payload")
}
等效于 ls -1
,返回 List<String>
类型文件名列表
2. 文件获取命令
get
- 单文件下载
kotlin
@Bean
@ServiceActivator(inputChannel = "getChannel")
fun sftpGetGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "get", "payload").apply {
setCommandOptions("-P -D") // 保留时间戳+传输后删除
setLocalDirectoryExpression("'./downloads/' + #remoteDirectory")
}
}
关键选项:
-P
→ 保留文件时间戳-stream
→ 返回InputStream
(非File对象)-D
→ 传输成功后删除远程文件
WARNING
使用 -stream
时需手动关闭会话:
kotlin
@ServiceActivator(inputChannel = "closeChannel")
fun closeSession(message: Message<*>) {
message.headers["closeableResource"]?.close()
}
mget
- 多文件下载
kotlin
@Bean
@ServiceActivator(inputChannel = "mgetChannel")
fun sftpMgetGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "mget", "payload").apply {
setCommandOptions("-R -x") // 递归+无匹配时抛异常
setFilter(SftpRegexPatternFileListFilter(".*\\.csv").apply {
alwaysAcceptDirectories = true // 关键:允许递归目录
})
setLocalFilenameGeneratorExpression("#remoteFileName.toUpperCase()")
}
}
递归下载最佳实践
kotlin
// 创建支持递归的过滤器
val filter = SftpRegexPatternFileListFilter(".*\\.(csv|txt)").apply {
alwaysAcceptDirectories = true
forRecursion = true // 启用递归元数据存储
}
// 配置网关
SftpOutboundGateway(sessionFactory, "mget", "sftpFiles/*").apply {
setCommandOptions("-R")
setFilter(filter)
setLocalDirectoryExpression("'./local/' + #remoteDirectory")
}
3. 文件上传命令
put
- 单文件上传
kotlin
@Bean
@ServiceActivator(inputChannel = "putChannel")
fun sftpPutGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "put", "payload").apply {
setRemoteDirectoryExpression("headers['remoteDir']")
setTemporaryRemoteDirectoryExpression("headers['tempDir']")
setChmod(0x600) // 设置文件权限: rw-------
}
}
支持上传类型:
java.io.File
ByteArray
String
(文本内容)
mput
- 多文件上传
kotlin
@Bean
@ServiceActivator(inputChannel = "mputChannel")
fun sftpMputGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "mput", "payload").apply {
setCommandOptions("-R") // 递归上传
setMputFilter(SftpSimplePatternFileListFilter("*.pdf"))
setChmod(0o644) // 八进制权限表示
}
}
IMPORTANT
当消息负载为集合时,会批量上传所有文件:
kotlin
val files = listOf(File("report.pdf"), File("data.xlsx"))
messagingTemplate.convertAndSend("mputChannel", files)
4. 文件管理命令
rm
- 删除文件
kotlin
@Bean
@ServiceActivator(inputChannel = "rmChannel")
fun sftpRmGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "rm", "payload")
}
返回值:成功时 true
,失败时 false
mv
- 移动/重命名
kotlin
@Bean
@ServiceActivator(inputChannel = "mvChannel")
fun sftpMvGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>): MessageHandler {
return SftpOutboundGateway(sessionFactory, "mv", "payload").apply {
setRenameExpression("headers['newPath']") // 新路径表达式
}
}
kotlin
// 使用示例
val headers = mapOf("newPath" to "/backup/${file.name}")
messagingTemplate.send(MessageBuilder.createMessage(file, headers))
高级配置技巧
注解驱动配置
kotlin
@Configuration
@EnableIntegration
class SftpConfig {
@Bean
fun sftpSessionFactory(): SessionFactory<SftpClient.DirEntry> {
return DefaultSftpSessionFactory().apply {
host = "sftp.example.com"
port = 22
user = "admin"
password = "pass@123"
isTestSession = true
}
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
fun sftpGateway(sessionFactory: SessionFactory<SftpClient.DirEntry>) =
SftpOutboundGateway(sessionFactory, "ls", "'/reports/'").apply {
commandOptions = "-1 -R"
}
}
Kotlin DSL 配置
kotlin
@Bean
fun sftpFlow(sessionFactory: SessionFactory<SftpClient.DirEntry>): IntegrationFlow {
return IntegrationFlow.from("sftpInputChannel")
.handle(
Sftp.outboundGateway(sessionFactory, Command.MGET, "payload")
.options(Option.RECURSIVE)
.regexFileNameFilter("(invoices/.*\\.pdf)")
.localDirectoryExpression("'./archive/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replace('temp_', '')")
)
.handle { payload, _ ->
logger.info("下载文件数: ${(payload as List<*>).size}")
}
.get()
}
异常处理策略
部分成功处理
当 mget/mput
部分操作失败时,会抛出 PartialSuccessException
:
kotlin
@ServiceActivator(inputChannel = "errorChannel")
fun handlePartialFailure(ex: MessagingException) {
when (val cause = ex.cause) {
is PartialSuccessException -> {
logger.error("部分操作失败!")
logger.info("成功项: ${cause.partialResults}")
logger.info("原始文件列表: ${cause.derivedInput}")
}
}
}
错误处理最佳实践
常见问题排查
问题现象 | 可能原因 | 解决方案 |
---|---|---|
连接超时 | 网络问题/防火墙 | 检查端口22是否开放 |
认证失败 | 密码错误/密钥不匹配 | 验证凭证或使用交互式登录测试 |
文件传输中断 | 会话超时 | 增加 sessionTimeout 配置 |
递归操作卡死 | 目录循环链接 | 使用 -max-depth 限制层级 |
权限不足 | 服务器ACL限制 | 检查目标目录写权限 |
CAUTION
重要安全提示:生产环境请勿硬编码凭证!使用Spring Cloud Config或Kubernetes Secrets管理敏感信息
最佳实践总结
连接管理:使用
CachingSessionFactory
复用连接kotlin@Bean fun sftpSessionFactory() = CachingSessionFactory(DefaultSftpSessionFactory().apply { // 配置基础参数 }, 10) // 最大10个会话
目录遍历防护:递归操作时设置深度限制
kotlinsetFilter(SftpRegexPatternFileListFilter(".*").apply { maxDepth = 5 // 限制递归深度 })
传输校验:大文件使用校验和验证
kotlin.handle(Ftp.outboundGateway(...)) .handle(File.checksum()) // 添加校验步骤
监控集成:暴露操作指标
kotlin@Bean fun sftpMetrics(registry: MeterRegistry) = IntegrationMetrics(registry)
::: success ✅ 关键收获
- 优先使用
mget/mput
进行批量操作,减少连接开销 - 流式传输(
-stream
)适合处理大文件,避免内存溢出 - 递归操作配合
alwaysAcceptDirectories=true
确保完整遍历 - 生产环境务必实现
PartialSuccessException
处理逻辑 :::
通过本教程,您应已掌握SFTP网关的核心操作。在实际应用中,请根据业务需求选择合适的命令和配置策略。