Appearance
Spring Integration SFTP 远程文件信息详解
⚠️ 注意:本文适用于 Spring Integration 5.2+ 版本
目录
一、远程文件信息概述
在分布式系统中,获取远程文件的完整信息至关重要。Spring Integration 5.2+ 为SFTP组件新增了远程文件元数据支持,让开发者能够:
支持的组件:
SftpStreamingMessageSource
- 流式入站通道适配器SftpInboundFileSynchronizingMessageSource
- 入站通道适配器SftpOutboundGateway
的读取命令 - 出站网关
TIP
这些元数据对文件审计、错误追踪和分布式事务管理特别有用
二、新增消息头详解
1. 核心消息头
消息头常量 | 描述 | 适用组件 |
---|---|---|
FileHeaders.REMOTE_HOST_PORT | 远程主机端口 (host:port) | 所有组件 |
FileHeaders.REMOTE_DIRECTORY | 远程目录路径 | 所有组件 |
FileHeaders.REMOTE_FILE | 远程文件名 | 单文件操作 |
2. 消息头使用示例
kotlin
@ServiceActivator
fun processFile(message: Message<File>) {
val headers = message.headers
// 获取远程文件信息
val hostPort = headers[FileHeaders.REMOTE_HOST_PORT] as String?
val directory = headers[FileHeaders.REMOTE_DIRECTORY] as String?
val filename = headers[FileHeaders.REMOTE_FILE] as String?
logger.info("从 $hostPort 接收文件: $directory/$filename")
}
IMPORTANT
REMOTE_FILE
仅适用于单文件操作,批量操作中不可用
三、同步适配器的特殊处理
SftpInboundFileSynchronizingMessageSource
的处理流程与其他适配器不同:
关键配置点
kotlin
@Bean
fun sftpInboundFlow(): IntegrationFlow {
return IntegrationFlows.from(
Sftp.inboundAdapter(sftpSessionFactory())
.remoteDirectory("/remote/files")
.localDirectory(File("/local"))
.metadataStore(metadataStore())
.metadataStorePrefix("sync_")
.get()
)
.handle { message -> /* 处理逻辑 */ }
.get()
}
元数据管理最佳实践
关键注意事项
本地文件删除后必须清除元数据:
kotlin@ServiceActivator fun deleteFile(file: File, synchronizer: SftpInboundFileSynchronizer) { file.delete() synchronizer.removeRemoteFileMetadata(file.name) }
避免键冲突:
- 同步器使用
setMetadataStorePrefix("sync_")
- 文件过滤器使用不同前缀(如
filter_
)
kotlin@Bean fun metadataStore(): ConcurrentMetadataStore { // 使用相同的存储实例但不同前缀 return ConcurrentMetadataStore(RedisConnectionFactory()) } @Bean fun filter(): FileListFilter<File> { return SftpPersistentAcceptOnceFileListFilter(metadataStore(), "filter_") }
- 同步器使用
四、完整配置示例
1. SFTP 流式入站适配器
kotlin
@Configuration
class SftpConfig {
@Bean
fun sftpSessionFactory(): SessionFactory<LsEntry> {
return DefaultSftpSessionFactory().apply {
host = "sftp.example.com"
port = 22
user = "user"
password = "pass"
}
}
@Bean
fun sftpStreamingFlow(): IntegrationFlow {
return IntegrationFlows.from(
Sftp.inboundStreamingAdapter(sftpSessionFactory())
.remoteDirectory("/remote")
.remoteFileSeparator("/")
)
.enrichHeaders { it.header(FileHeaders.REMOTE_FILE, "payload.name") }
.handle { payload, headers ->
processStream(payload.inputStream, headers)
}
.get()
}
fun processStream(stream: InputStream, headers: Map<String, Any>) {
// 处理流数据并访问元数据头
}
}
kotlin
fun processFile(stream: InputStream, headers: MessageHeaders) {
val host = headers[FileHeaders.REMOTE_HOST_PORT] ftp.example.com:22
val file = headers[FileHeaders.REMOTE_FILE] xample.txt
val dir = headers[FileHeaders.REMOTE_DIRECTORY] // /remote
// 使用元数据记录审计日志
auditService.logAccess(host, "$dir/$file")
}
2. SFTP 出站网关(读取文件)
kotlin
@Bean
fun sftpOutboundGateway(sessionFactory: SessionFactory<LsEntry>): IntegrationFlow {
return IntegrationFlows.from("sftpGetChannel")
.handle(
Sftp.outboundGateway(sessionFactory, Command.GET, "payload")
.options(Option.STREAM)
)
.enrichHeaders {
it.header(FileHeaders.REMOTE_HOST_PORT, true)
.header(FileHeaders.REMOTE_DIRECTORY, true)
}
.channel("processedFiles")
.get()
}
@ServiceActivator(inputChannel = "processedFiles")
fun processFile(message: Message<InputStream>) {
val metadata = mapOf(
"host" to message.headers[FileHeaders.REMOTE_HOST_PORT],
"path" to "${message.headers[FileHeaders.REMOTE_DIRECTORY]}/${
message.headers[FileHeaders.REMOTE_FILE]}"
)
fileService.process(message.payload, metadata)
}
五、常见问题解决
问题1:元数据头缺失
WARNING
症状:消息头中缺少 REMOTE_*
头信息
原因:
- 使用旧版本(<5.2)
- 未正确配置适配器
解决方案:
- 升级到 Spring Integration 5.2+
- 检查适配器配置:
kotlin
Sftp.inboundAdapter(sessionFactory)
.preserveTimestamp(true) // 确保使用新特性
.remoteDirectory("/remote")
问题2:元数据存储冲突
CAUTION
症状:文件过滤器覆盖同步器的元数据
解决方案:
kotlin
@Bean
fun synchronizer(): SftpInboundFileSynchronizer {
return SftpInboundFileSynchronizer(sftpSessionFactory()).apply {
setMetadataStorePrefix("sync_")
}
}
@Bean
fun filter(): FileListFilter<File> {
return SftpPersistentAcceptOnceFileListFilter(metadataStore(), "filter_")
}
问题3:批量操作获取不到文件名
NOTE
原因:REMOTE_FILE
只适用于单文件操作
替代方案:
kotlin
// 使用出站网关的LS命令
val files = sftpGateway.ls("/remote/dir")
files.forEach {
val fileInfo = it as LsEntry
println("文件名: ${fileInfo.filename}")
}
最佳实践总结
- 优先使用注解配置:避免 XML,采用 Kotlin DSL
- 始终清理元数据:文件删除后调用
removeRemoteFileMetadata()
- 前缀隔离:不同组件使用不同的
metadataStorePrefix
- 审计日志:关键操作记录远程文件信息
- 版本检查:确保使用 Spring Integration 5.2+
kotlin
// 安全删除文件示例
fun safeDelete(file: File, synchronizer: SftpInboundFileSynchronizer) {
try {
if (file.delete()) {
synchronizer.removeRemoteFileMetadata(file.name)
}
} catch (e: Exception) {
logger.error("删除文件元数据失败: ${file.name}", e)
}
}
✅ 正确使用远程文件信息可以显著提升系统的可观测性和故障排查效率