Skip to content

Spring Integration FTP 流式入站通道适配器详解

引言:为什么需要流式适配器?🚀

在传统文件传输中,FTP 入站适配器通常需要先将文件完整下载到本地磁盘,然后才能进行处理。这种"下载-处理"模式存在两个痛点:

  1. 需要额外磁盘空间存储临时文件
  2. 大文件处理效率低(必须等待完整下载)

FTP 流式入站通道适配器(FtpStreamingMessageSource)通过直接传输InputStream解决了这些问题:

核心特性与工作原理

关键特性

  • 零磁盘占用:文件不落盘,直接在内存中处理
  • ⚡️ 高效处理大文件:支持流式处理,无需等待完整下载
  • 🔒 自动资源管理:框架组件自动关闭会话
  • 📦 丰富的元数据:提供远程文件路径、名称等头部信息

工作流程

  1. 适配器轮询 FTP 服务器(通过配置的 poller)
  2. 匹配符合条件的远程文件
  3. 打开文件流并创建消息(InputStream作为负载)
  4. 将流和会话资源发送到指定通道
  5. 消费者负责处理完成后关闭会话

IMPORTANT

关键注意事项
消费者必须关闭会话!否则会导致连接泄漏。框架提供的FileSplitterStreamTransformer会自动关闭,但自定义处理器需要手动关闭。会话资源可通过消息头IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE获取。

Kotlin 配置详解

基础配置示例(注解方式)

kotlin
@Configuration
@EnableIntegration
class FtpStreamingConfig {

    // 创建FTP会话工厂
    @Bean
    fun ftpSessionFactory(): DefaultFtpSessionFactory {
        return DefaultFtpSessionFactory().apply {
            host = "ftp.example.com"
            port = 21
            username = "user"
            password = "pass"
        }
    }

    // 流式消息源
    @Bean
    @InboundChannelAdapter(channel = "ftpStream", poller = [Poller(fixedRate = 5000)])
    fun ftpMessageSource(): MessageSource<InputStream> {
        return FtpStreamingMessageSource(ftpRemoteFileTemplate()).apply {
            setRemoteDirectory("remote/files")
            setFilter(AcceptAllFileListFilter()) // [!code warning: 生产环境应替换为更安全的过滤器]
            setMaxFetchSize(10)
        }
    }

    // 远程文件模板
    @Bean
    fun ftpRemoteFileTemplate(): FtpRemoteFileTemplate {
        return FtpRemoteFileTemplate(ftpSessionFactory())
    }
}

配置选项说明

参数类型说明默认值
remoteDirectoryString远程目录路径必填
filterFileListFilter文件过滤器AcceptAllFileListFilter
maxFetchSizeInt单次轮询最大获取文件数全部文件
filenamePatternString文件名模式匹配(如*.txt)null
comparatorComparator文件排序比较器按修改时间倒序

> **过滤器选择指南**:

  • AcceptAllFileListFilter:接受所有文件(开发用)
  • SimplePatternFileListFilter:基于通配符的过滤
  • FtpPersistentAcceptOnceFileListFilter生产推荐,避免重复处理

文件处理管道示例

kotlin
@Bean
@Transformer(inputChannel = "ftpStream", outputChannel = "processedData")
fun streamTransformer(): Transformer {
    // 自动关闭InputStream并转换为UTF-8字符串
    return StreamTransformer("UTF-8")
}

@ServiceActivator(inputChannel = "processedData")
fun fileHandler(message: Message<String>) {
    val content = message.payload
    val filename = message.headers[FileHeaders.REMOTE_FILE] as String

    println("处理文件: $filename")
    println("内容: ${content.take(50)}...") // 只打印前50字符

    // 自动关闭会话资源(由StreamTransformer处理)
}

生产环境最佳实践

避免重复处理文件

kotlin
// 可能导致重复处理
setFilter(AcceptAllFileListFilter())
kotlin
@Bean
fun persistentFilter(): CompositeFileListFilter<FTPFile> {
    return CompositeFileListFilter(
        listOf(
            FtpPersistentAcceptOnceFileListFilter(
                SimpleMetadataStore(),
                "filePrefix"
            ),
            AcceptOnceFileListFilter()
        )
    )
}

// 在消息源中配置
setFilter(persistentFilter())

安全删除已处理文件

kotlin
@Bean
fun removeAdvice(): ExpressionEvaluatingRequestHandlerAdvice {
    return ExpressionEvaluatingRequestHandlerAdvice().apply {
        setOnSuccessExpressionString(
            "@ftpRemoteFileTemplate.remove(headers['file_remoteDirectory'] + headers['file_remoteFile']"
        )
        setPropagateEvaluationFailures(true)
    }
}

@ServiceActivator(inputChannel = "processedData", adviceChain = ["removeAdvice"])
fun safeFileHandler(message: Message<String>) {
    // 文件处理逻辑...
    // 处理成功后自动删除远程文件
}

版本变更注意事项

版本 5.0+重要变化

  • ⚠️ 过滤时机变化:5.1+版本先过滤后排序(影响复合过滤器设计)
  • 元数据增强:可通过FileHeaders.REMOTE_FILE_INFO获取完整文件信息
kotlin
// 获取文件信息
val fileInfo = message.headers[FileHeaders.REMOTE_FILE_INFO] as String
println("文件详情: $fileInfo") // JSON格式信息

// 或禁用JSON转换
messageSource.setFileInfoJson(false)
val ftpFileInfo = message.headers[FileHeaders.REMOTE_FILE_INFO] as FtpFileInfo

常见问题解答

Q1:如何处理流式传输中的大文件?

使用分块读取+缓冲区:

kotlin
@ServiceActivator(inputChannel = "ftpStream")
fun handleLargeFile(inputStream: InputStream) {
    BufferedReader(InputStreamReader(inputStream)).use { reader ->
        reader.lineSequence().chunked(1000) { batch ->
            processBatch(batch)
        }
    } se块自动关闭资源
}

Q2:为什么有时连接会超时?

会话未及时关闭是最常见原因:

kotlin
@ServiceActivator(inputChannel = "ftpStream")
fun handleStream(message: Message<InputStream>) {
    try {
        // 处理输入流...
    } finally {
        (message.headers[IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE] as Closeable).close()
    }
}

Q3:如何在集群环境中避免重复?

使用共享的持久化元数据存储:

kotlin
@Bean
fun metadataStore(): MetadataStore {
    return JdbcMetadataStore(dataSource) // RedisMetadataStore等
}

@Bean
func clusterSafeFilter(): FileListFilter<FTPFile> {
    return FtpPersistentAcceptOnceFileListFilter(metadataStore(), "clusterKey")
}

总结

FTP 流式入站通道适配器通过InputStream传输模式:

  • 消除磁盘 I/O 瓶颈:尤其适合大文件处理
  • ⚡️ 实时处理能力:文件下载过程中即可开始处理
  • 🔒 资源安全管控:通过消息头管理会话生命周期

> **最佳实践组合**:

  1. FtpPersistentAcceptOnceFileListFilter + 持久化存储(防重复)
  2. StreamTransformer自动处理文本文件
  3. 处理完成后自动删除远程文件(安全清理)
  4. 配置合理的maxFetchSize(集群环境建议=1)