Skip to content

SFTP流式入站通道适配器详解

概述

SFTP流式入站通道适配器(Streaming Inbound Channel Adapter)是Spring Integration 4.3引入的强大功能,它允许直接从SFTP服务器获取文件内容,而无需将文件写入本地文件系统。这种流式处理方式特别适合处理大文件或内存敏感的场景。

TIP

与传统SFTP适配器的关键区别:流式适配器产生的消息负载是InputStream类型,文件内容直接从SFTP服务器流式传输,避免本地磁盘IO开销。

核心机制

关键特性

  1. 资源管理:会话保持打开状态,消费应用必须关闭会话(通过IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE头)
  2. 自动关闭FileSplitterStreamTransformer等组件会自动关闭会话
  3. 避免重复:默认使用SftpPersistentAcceptOnceFileListFilter防止重复处理
  4. 头部信息
    • FileHeaders.REMOTE_DIRECTORY:远程目录
    • FileHeaders.REMOTE_FILE:文件名
    • FileHeaders.REMOTE_FILE_INFO:文件元数据(5.0+)

Kotlin配置实现

基础配置

kotlin
@Configuration
@EnableIntegration
class SftpConfig {

    FTP会话工厂
    @Bean
    fun sftpSessionFactory(): DefaultSftpSessionFactory {
        return DefaultSftpSessionFactory().apply {
            host = "sftp.example.com"
            port = 22
            user = "user"
            password = "pass"
            allowUnknownKeys = true
        }
    }

    // 远程文件模板
    @Bean
    fun sftpRemoteFileTemplate() = SftpRemoteFileTemplate(sftpSessionFactory())

    // 流式消息源
    @Bean
    @InboundChannelAdapter(channel = "sftpChannel")
    fun sftpMessageSource(): MessageSource<InputStream> {
        return SftpStreamingMessageSource(sftpRemoteFileTemplate()).apply {
            setRemoteDirectory("remote/files")
            setFilter(AcceptAllFileListFilter())
            setMaxFetchSize(1)
        }
    }

    // 轮询器
    @Bean
    fun poller() = Pollers.fixedRate(1000).get()
}

流数据处理管道

kotlin
@Bean
@Transformer(inputChannel = "sftpChannel", outputChannel = "dataChannel")
fun streamTransformer(): Transformer {
    //  // 自动关闭输入流
    return StreamTransformer("UTF-8")
}

@ServiceActivator(inputChannel = "dataChannel")
fun handleMessage(payload: String) {
    println("处理文件内容: $payload")
}

高级配置选项

文件过滤策略

kotlin
// 组合过滤器示例
val compositeFilter = CompositeFileListFilter<ChannelSftp.LsEntry>().apply {
    addFilter(SftpRegexPatternFileListFilter(".*\\.txt".toRegex()))
    addFilter(SftpPersistentAcceptOnceFileListFilter(metadataStore, "sftpAdapter"))
    addFilter(SftpSimplePatternFileListFilter("*.data"))
}

// [!code warning:3] // 注意:过滤表达式会覆盖其他过滤器
messageSource.setFilterExpression("@customFilter.filter(#root)")

集群环境优化

kotlin
messageSource.apply {
    setMaxFetchSize(1)  // [!code highlight] // 每次只取一个文件
    setFilter(
        SftpPersistentAcceptOnceFileListFilter(
            JdbcMetadataStore(dataSource),  // [!code highlight] // 使用数据库存储状态
            "sftpAdapter"
        )
    )
}

文件处理后删除

kotlin
@Bean
fun deleteAdvice(): ExpressionEvaluatingRequestHandlerAdvice {
    return ExpressionEvaluatingRequestHandlerAdvice().apply {
        setOnSuccessExpressionString(
            "@sftpRemoteFileTemplate.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])"
        )
        setPropagateEvaluationFailures(true)
    }
}

@ServiceActivator(inputChannel = "dataChannel", adviceChain = ["deleteAdvice"]) 
fun processingService() = ...

最佳实践与常见问题

注意事项

WARNING

资源泄漏风险:如果消费组件未能正确关闭输入流,会导致SFTP连接保持打开状态,最终耗尽连接池资源。请确保:

  1. 使用StreamTransformer等自动关闭资源的组件
  2. 手动处理时在finally块中调用inputStream.close()
  3. 通过message.headers[IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE]获取资源句柄

性能调优

kotlin
// 优化配置示例
messageSource.apply {
    setRemoteDirectoryExpression("'backup/' + T(java.time.LocalDate).now().toString()") // [!code highlight] // 动态目录
    setComparator(Comparator { f1, f2 -> f1.attributes.mtime.compareTo(f2.attributes.mtime) }) // 按修改时间排序
    setRemoteFileSeparator("/")
}

错误处理策略

kotlin
@Bean
fun errorChannel() = DirectChannel()

@ServiceActivator(inputChannel = "errorChannel")
fun errorHandler() = MessageHandler {
    val failedMessage = (it.payload as MessagingException).failedMessage
    println("处理失败: ${failedMessage.headers[FileHeaders.REMOTE_FILE]}")
    // 重试或移动失败文件逻辑
}

// 在适配器配置
@Bean
fun sftpInboundFlow() = IntegrationFlows
    .from(sftpMessageSource()) { e ->
        e.poller(Pollers.fixedRate(1000))
        e.errorChannel("errorChannel")  
    }
    .channel("sftpChannel")
    .get()

完整应用示例

查看完整Spring Boot应用
kotlin
@SpringBootApplication
class SftpApplication {

    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            runApplication<SftpApplication>(*args)
        }
    }

    @Bean
    fun sftpSessionFactory() = DefaultSftpSessionFactory().apply {
        host = "sftp.example.com"
        port = 22
        user = "user"
        password = "pass"
    }

    @Bean
    fun sftpTemplate() = SftpRemoteFileTemplate(sftpSessionFactory())

    @Bean
    @InboundChannelAdapter(channel = "streamChannel")
    fun sftpSource(): MessageSource<InputStream> {
        return SftpStreamingMessageSource(sftpTemplate()).apply {
            setRemoteDirectory("inbound")
            setFilter(SimplePatternFileListFilter("*.csv"))
            setMaxFetchSize(2)
        }
    }

    @Bean
    @Transformer(inputChannel = "streamChannel", outputChannel = "dataChannel")
    fun transformer() = StreamTransformer("UTF-8")

    @Bean
    fun deleteAdvice() = ExpressionEvaluatingRequestHandlerAdvice().apply {
        setOnSuccessExpressionString(
            "@sftpTemplate.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])"
        )
    }

    @ServiceActivator(inputChannel = "dataChannel", adviceChain = ["deleteAdvice"])
    fun dataHandler() = MessageHandler {
        println("处理内容: ${it.payload}")
        // 实际业务处理逻辑
    }

    // 错误处理
    @Bean
    fun errorChannel() = DirectChannel()

    @ServiceActivator(inputChannel = "errorChannel")
    fun errorHandler() = MessageHandler {
        val ex = it.payload as MessagingException
        println("处理失败: ${ex.failedMessage.headers[FileHeaders.REMOTE_FILE]}")
    }
}

常见问题解决

Q1:文件被重复处理怎么办?

TIP

解决方案

  1. 确认使用SftpPersistentAcceptOnceFileListFilter
  2. 检查元数据存储(如JDBC)是否正常工作
  3. 集群环境中确保所有节点使用共享的MetadataStore
kotlin
@Bean
fun metadataStore(): ConcurrentMetadataStore {
    return JdbcMetadataStore(DataSourceInitializer().apply {
        setDataSource(dataSource)
    })
}

@Bean
fun fileListFilter() = SftpPersistentAcceptOnceFileListFilter(
    metadataStore(),
    "sftpFilter"
).apply {
    setFlushOnUpdate(true)
}

Q2:如何处理大文件避免内存溢出?

kotlin
@Bean
@Transformer(inputChannel = "streamChannel", outputChannel = "chunkChannel")
fun fileSplitter() = FileSplitter(true).apply {
    setApplySequence(true)
    setMarkersJson(true)
}

@ServiceActivator(inputChannel = "chunkChannel")
fun processChunk() = MessageHandler {
    // 分块处理逻辑
}

Q3:如何获取文件元数据信息?

kotlin
@ServiceActivator(inputChannel = "dataChannel")
fun handleWithMetadata(payload: String,
                      @Header(FileHeaders.REMOTE_FILE_INFO) fileInfo: String) {
    val json = JSONObject(fileInfo)
    println("文件名: ${json.getString("filename")}")
    println("大小: ${json.getLong("size")}字节")
    println("修改时间: ${json.getLong("modified")}")
}

通过流式SFTP适配器,您可以构建高效的文件处理流水线,避免本地存储瓶颈,特别适合云原生和容器化环境。实际应用中,请根据业务需求选择合适的文件过滤策略和错误处理机制。