Skip to content

Spring Integration SFTP 远程文件信息详解

⚠️ 注意:本文适用于 Spring Integration 5.2+ 版本

目录

一、远程文件信息概述

在分布式系统中,获取远程文件的完整信息至关重要。Spring Integration 5.2+ 为SFTP组件新增了远程文件元数据支持,让开发者能够:

支持的组件:

  1. SftpStreamingMessageSource - 流式入站通道适配器
  2. SftpInboundFileSynchronizingMessageSource - 入站通道适配器
  3. SftpOutboundGateway 的读取命令 - 出站网关

TIP

这些元数据对文件审计错误追踪分布式事务管理特别有用

二、新增消息头详解

1. 核心消息头

消息头常量描述适用组件
FileHeaders.REMOTE_HOST_PORT远程主机端口 (host:port)所有组件
FileHeaders.REMOTE_DIRECTORY远程目录路径所有组件
FileHeaders.REMOTE_FILE远程文件名单文件操作

2. 消息头使用示例

kotlin
@ServiceActivator
fun processFile(message: Message<File>) {
    val headers = message.headers

    // 获取远程文件信息
    val hostPort = headers[FileHeaders.REMOTE_HOST_PORT] as String? 
    val directory = headers[FileHeaders.REMOTE_DIRECTORY] as String? 
    val filename = headers[FileHeaders.REMOTE_FILE] as String? 

    logger.info("从 $hostPort 接收文件: $directory/$filename")
}

IMPORTANT

REMOTE_FILE 仅适用于单文件操作,批量操作中不可用

三、同步适配器的特殊处理

SftpInboundFileSynchronizingMessageSource 的处理流程与其他适配器不同:

关键配置点

kotlin
@Bean
fun sftpInboundFlow(): IntegrationFlow {
    return IntegrationFlows.from(
        Sftp.inboundAdapter(sftpSessionFactory())
        .remoteDirectory("/remote/files")
        .localDirectory(File("/local"))
        .metadataStore(metadataStore()) 
        .metadataStorePrefix("sync_")   
        .get()
    )
    .handle { message -> /* 处理逻辑 */ }
    .get()
}

元数据管理最佳实践

关键注意事项

  1. 本地文件删除后必须清除元数据

    kotlin
    @ServiceActivator
    fun deleteFile(file: File, synchronizer: SftpInboundFileSynchronizer) {
        file.delete()
        synchronizer.removeRemoteFileMetadata(file.name) 
    }
  2. 避免键冲突

    • 同步器使用 setMetadataStorePrefix("sync_")
    • 文件过滤器使用不同前缀(如 filter_
    kotlin
    @Bean
    fun metadataStore(): ConcurrentMetadataStore {
        // 使用相同的存储实例但不同前缀
        return ConcurrentMetadataStore(RedisConnectionFactory())
    }
    
    @Bean
    fun filter(): FileListFilter<File> {
        return SftpPersistentAcceptOnceFileListFilter(metadataStore(), "filter_") 
    }

四、完整配置示例

1. SFTP 流式入站适配器

kotlin
@Configuration
class SftpConfig {

    @Bean
    fun sftpSessionFactory(): SessionFactory<LsEntry> {
        return DefaultSftpSessionFactory().apply {
            host = "sftp.example.com"
            port = 22
            user = "user"
            password = "pass"
        }
    }

    @Bean
    fun sftpStreamingFlow(): IntegrationFlow {
        return IntegrationFlows.from(
            Sftp.inboundStreamingAdapter(sftpSessionFactory())
                .remoteDirectory("/remote")
                .remoteFileSeparator("/")
        )
        .enrichHeaders { it.header(FileHeaders.REMOTE_FILE, "payload.name") }
        .handle { payload, headers ->
            processStream(payload.inputStream, headers)
        }
        .get()
    }

    fun processStream(stream: InputStream, headers: Map<String, Any>) {
        // 处理流数据并访问元数据头
    }
}
kotlin
fun processFile(stream: InputStream, headers: MessageHeaders) {
    val host = headers[FileHeaders.REMOTE_HOST_PORT] ftp.example.com:22
    val file = headers[FileHeaders.REMOTE_FILE]     xample.txt
    val dir = headers[FileHeaders.REMOTE_DIRECTORY] // /remote

    // 使用元数据记录审计日志
    auditService.logAccess(host, "$dir/$file")
}

2. SFTP 出站网关(读取文件)

kotlin
@Bean
fun sftpOutboundGateway(sessionFactory: SessionFactory<LsEntry>): IntegrationFlow {
    return IntegrationFlows.from("sftpGetChannel")
        .handle(
            Sftp.outboundGateway(sessionFactory, Command.GET, "payload")
                .options(Option.STREAM)
        )
        .enrichHeaders { 
            it.header(FileHeaders.REMOTE_HOST_PORT, true)
              .header(FileHeaders.REMOTE_DIRECTORY, true)
        }
        .channel("processedFiles")
        .get()
}

@ServiceActivator(inputChannel = "processedFiles")
fun processFile(message: Message<InputStream>) {
    val metadata = mapOf(
        "host" to message.headers[FileHeaders.REMOTE_HOST_PORT],
        "path" to "${message.headers[FileHeaders.REMOTE_DIRECTORY]}/${
                    message.headers[FileHeaders.REMOTE_FILE]}"
    )
    fileService.process(message.payload, metadata)
}

五、常见问题解决

问题1:元数据头缺失

WARNING

症状:消息头中缺少 REMOTE_* 头信息
原因

  • 使用旧版本(<5.2)
  • 未正确配置适配器

解决方案

  1. 升级到 Spring Integration 5.2+
  2. 检查适配器配置:
kotlin
Sftp.inboundAdapter(sessionFactory)
   .preserveTimestamp(true) // 确保使用新特性
   .remoteDirectory("/remote")

问题2:元数据存储冲突

CAUTION

症状:文件过滤器覆盖同步器的元数据
解决方案

kotlin
@Bean
fun synchronizer(): SftpInboundFileSynchronizer {
    return SftpInboundFileSynchronizer(sftpSessionFactory()).apply {
        setMetadataStorePrefix("sync_") 
    }
}

@Bean
fun filter(): FileListFilter<File> {
    return SftpPersistentAcceptOnceFileListFilter(metadataStore(), "filter_") 
}

问题3:批量操作获取不到文件名

NOTE

原因REMOTE_FILE 只适用于单文件操作
替代方案

kotlin
// 使用出站网关的LS命令
val files = sftpGateway.ls("/remote/dir")
files.forEach {
    val fileInfo = it as LsEntry
    println("文件名: ${fileInfo.filename}")
}

最佳实践总结

  1. 优先使用注解配置:避免 XML,采用 Kotlin DSL
  2. 始终清理元数据:文件删除后调用 removeRemoteFileMetadata()
  3. 前缀隔离:不同组件使用不同的 metadataStorePrefix
  4. 审计日志:关键操作记录远程文件信息
  5. 版本检查:确保使用 Spring Integration 5.2+
kotlin
// 安全删除文件示例
fun safeDelete(file: File, synchronizer: SftpInboundFileSynchronizer) {
    try {
        if (file.delete()) {
            synchronizer.removeRemoteFileMetadata(file.name) 
        }
    } catch (e: Exception) {
        logger.error("删除文件元数据失败: ${file.name}", e)
    }
}

✅ 正确使用远程文件信息可以显著提升系统的可观测性故障排查效率