Appearance
SFTP入站通道适配器全面指南
概述
SFTP入站通道适配器是一个监听器组件,它连接到SFTP服务器并监听远程目录事件(如新文件创建)。当检测到事件时,它会自动触发文件传输。这种机制非常适合实现自动化文件处理系统,如日志收集、数据导入等场景。
基础配置
Kotlin DSL配置示例
kotlin
@Bean
fun sftpInboundFlow() = integrationFlow(
Sftp.inboundAdapter(sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("/remote/files")
.regexFilter(".*\\.txt$")
.localFilename { it.toUpperCase() + ".processed" }
.localDirectory(File("local/storage")),
{ poller { fixedDelay(5000) } }
) {
handle { message ->
println("收到文件: ${message.payload}")
// 处理文件内容
}
}
关键配置属性详解
属性 | 说明 | 默认值 |
---|---|---|
remoteDirectory | SFTP服务器上的源目录 | 必填 |
localDirectory | 本地存储目录 | 必填 |
filenamePattern | 文件匹配模式(如*.txt) | * |
preserveTimestamp | 保留文件时间戳 | false |
autoCreateLocalDirectory | 自动创建本地目录 | false |
deleteRemoteFiles | 传输后删除远程文件 | false |
TIP
最佳实践建议:
- 始终设置
autoCreateLocalDirectory=true
避免目录不存在错误 - 生产环境中设置
preserveTimestamp=true
便于审计追踪 - 使用
localFilename
表达式自定义本地文件名增强可读性
高级文件过滤策略
1. 基础过滤方式
kotlin
// 基于文件扩展名过滤
.filter(SftpSimplePatternFileListFilter("*.csv"))
// 使用正则表达式过滤
.filter(SftpRegexPatternFileListFilter(".*_\\d{8}\\.txt"))
2. 复合过滤器
kotlin
val compositeFilter = CompositeFileListFilter<ChannelSftp.LsEntry>().apply {
addFilter(SftpSimplePatternFileListFilter("*.log")) // 基本模式
addFilter(SftpPersistentAcceptOnceFileListFilter(metadataStore(), "fileFilter")) // 持久化
addFilter(SftpLastModifiedFileListFilter(Duration.ofMinutes(5))) // 时间过滤
}
3. 处理不完整文件
kotlin
// 只处理带有.done标记的完整文件
.filter(SftpSystemMarkerFilePresentFileListFilter(".done"))
CAUTION
重要注意事项:
AcceptOnceFileListFilter
仅内存有效,重启后状态丢失- 集群环境中必须使用
SftpPersistentAcceptOnceFileListFilter
- 大文件传输需设置足够长的
SftpLastModifiedFileListFilter
时间
大文件处理与错误恢复
事务性处理配置
kotlin
@Bean
fun sftpInboundAdapter(): MessageSource<File> {
val source = SftpInboundFileSynchronizingMessageSource(synchronizer())
source.setLocalDirectory(File("local/dir"))
source.setAutoCreateLocalDirectory(true)
// 设置可重置过滤器
source.setLocalFilter(AcceptOnceFileListFilter<File>().apply {
setSupportReset(true)
})
return source
}
@Bean
fun transactionSynchronizationFactory(): TransactionSynchronizationFactory {
return DefaultTransactionSynchronizationFactory().apply {
afterRollback { message ->
// 事务回滚时删除本地文件
(message.payload as File).delete()
}
}
}
错误处理机制
kotlin
.handle({ message ->
try {
processFile(message.payload as File)
} catch (e: Exception) {
// 自定义错误处理逻辑
errorHandler.recover(message)
throw e
}
}, { it.advice(recoveryAdvice()) })
事务支持配置
完整事务配置示例
kotlin
@Bean
fun sftpFlow() = integrationFlow(
Sftp.inboundAdapter(sessionFactory)
.remoteDirectory("/sftp")
.localDirectory(File("local"))
.filter(filter()),
{
poller {
fixedRate(1000)
transactional {
synchronizationFactory(transactionSynchronizationFactory())
}
}
}
) {
// ...处理逻辑
}
@Bean
fun transactionSynchronizationFactory() = DefaultTransactionSynchronizationFactory().apply {
afterRollback { (it.payload as File).delete() } // 回滚时清理
}
@Bean
fun filter() = SftpPersistentAcceptOnceFileListFilter(metadataStore(), "fileKey")
事务设计要点
- 幂等性处理:确保同一文件重复处理不会产生副作用
- 状态管理:使用持久化存储记录处理状态
- 补偿机制:回滚时需清理本地文件和状态标记
性能优化技巧
1. 分批获取策略
kotlin
// 限制每次轮询获取的文件数量
source.maxFetchSize = 50
2. 目录扫描优化
kotlin
source.scanner = object : DirectoryScanner() {
override fun listFiles(dir: File): Array<File> {
// 自定义扫描逻辑
return dir.listFiles { file ->
file.lastModified() > System.currentTimeMillis() - 60000
} ?: emptyArray()
}
}
3. 集群环境配置
kotlin
@Bean
fun metadataStore(): ConcurrentMetadataStore {
// 使用Redis共享状态
return RedisMetadataStore(redisConnectionFactory)
}
IMPORTANT
性能关键指标:
- 轮询间隔:根据业务需求平衡实时性与负载
- 最大获取量:防止单次轮询处理过多文件
- 过滤效率:复杂过滤器可能影响性能
常见问题解决方案
问题1:文件重复处理
解决方案:
kotlin
// 使用持久化过滤器
.filter(SftpPersistentAcceptOnceFileListFilter(metadataStore(), "fileFilter"))
问题2:不完整文件处理
解决方案:
kotlin
// 设置临时文件后缀
.temporaryFileSuffix(".tmp")
问题3:权限问题
解决方案:
kotlin
@Bean
fun sftpSessionFactory(): SessionFactory<LsEntry> {
return DefaultSftpSessionFactory().apply {
host = "sftp.example.com"
port = 22
user = "user"
// 使用密钥认证更安全
privateKey = File("path/to/key")
privateKeyPassphrase = "passphrase"
allowUnknownKeys = true
}
}
问题4:连接不稳定
解决方案:
kotlin
@Bean
fun sftpSessionFactory(): SessionFactory<LsEntry> {
val factory = DefaultSftpSessionFactory()
// 配置重试机制
factory.setSessionConfig(mapOf(
"StrictHostKeyChecking" to "no",
"MaxAttempts" to 5,
"RetryInterval" to 5000
))
return CachingSessionFactory(factory)
}
总结
SFTP入站通道适配器提供了强大的文件获取能力,关键要点包括:
- 智能过滤:使用复合过滤器精确控制文件选择
- 可靠传输:通过事务支持确保数据处理完整性
- 弹性设计:内置错误恢复和重试机制
- 扩展能力:支持自定义逻辑注入
最佳实践清单
- [✅] 使用持久化过滤器防止重复处理
- [✅] 配置事务回滚清理机制
- [✅] 设置合理的轮询间隔和获取量
- [✅] 实施完整的错误监控和报警
- [✅] 定期审计文件处理状态
通过合理配置和遵循最佳实践,您可以构建稳定高效的SFTP文件处理系统,满足各类企业级文件传输需求。