Appearance
Spring Integration FTP 流式入站通道适配器详解
引言:为什么需要流式适配器?🚀
在传统文件传输中,FTP 入站适配器通常需要先将文件完整下载到本地磁盘,然后才能进行处理。这种"下载-处理"模式存在两个痛点:
- 需要额外磁盘空间存储临时文件
- 大文件处理效率低(必须等待完整下载)
FTP 流式入站通道适配器(FtpStreamingMessageSource
)通过直接传输InputStream
解决了这些问题:
核心特性与工作原理
关键特性
- ✅ 零磁盘占用:文件不落盘,直接在内存中处理
- ⚡️ 高效处理大文件:支持流式处理,无需等待完整下载
- 🔒 自动资源管理:框架组件自动关闭会话
- 📦 丰富的元数据:提供远程文件路径、名称等头部信息
工作流程
- 适配器轮询 FTP 服务器(通过配置的 poller)
- 匹配符合条件的远程文件
- 打开文件流并创建消息(
InputStream
作为负载) - 将流和会话资源发送到指定通道
- 消费者负责处理完成后关闭会话
IMPORTANT
关键注意事项:
消费者必须关闭会话!否则会导致连接泄漏。框架提供的FileSplitter
和StreamTransformer
会自动关闭,但自定义处理器需要手动关闭。会话资源可通过消息头IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
获取。
Kotlin 配置详解
基础配置示例(注解方式)
kotlin
@Configuration
@EnableIntegration
class FtpStreamingConfig {
// 创建FTP会话工厂
@Bean
fun ftpSessionFactory(): DefaultFtpSessionFactory {
return DefaultFtpSessionFactory().apply {
host = "ftp.example.com"
port = 21
username = "user"
password = "pass"
}
}
// 流式消息源
@Bean
@InboundChannelAdapter(channel = "ftpStream", poller = [Poller(fixedRate = 5000)])
fun ftpMessageSource(): MessageSource<InputStream> {
return FtpStreamingMessageSource(ftpRemoteFileTemplate()).apply {
setRemoteDirectory("remote/files")
setFilter(AcceptAllFileListFilter()) // [!code warning: 生产环境应替换为更安全的过滤器]
setMaxFetchSize(10)
}
}
// 远程文件模板
@Bean
fun ftpRemoteFileTemplate(): FtpRemoteFileTemplate {
return FtpRemoteFileTemplate(ftpSessionFactory())
}
}
配置选项说明
参数 | 类型 | 说明 | 默认值 |
---|---|---|---|
remoteDirectory | String | 远程目录路径 | 必填 |
filter | FileListFilter | 文件过滤器 | AcceptAllFileListFilter |
maxFetchSize | Int | 单次轮询最大获取文件数 | 全部文件 |
filenamePattern | String | 文件名模式匹配(如*.txt) | null |
comparator | Comparator | 文件排序比较器 | 按修改时间倒序 |
> **过滤器选择指南**:
AcceptAllFileListFilter
:接受所有文件(开发用)SimplePatternFileListFilter
:基于通配符的过滤FtpPersistentAcceptOnceFileListFilter
:生产推荐,避免重复处理
文件处理管道示例
kotlin
@Bean
@Transformer(inputChannel = "ftpStream", outputChannel = "processedData")
fun streamTransformer(): Transformer {
// 自动关闭InputStream并转换为UTF-8字符串
return StreamTransformer("UTF-8")
}
@ServiceActivator(inputChannel = "processedData")
fun fileHandler(message: Message<String>) {
val content = message.payload
val filename = message.headers[FileHeaders.REMOTE_FILE] as String
println("处理文件: $filename")
println("内容: ${content.take(50)}...") // 只打印前50字符
// 自动关闭会话资源(由StreamTransformer处理)
}
生产环境最佳实践
避免重复处理文件
kotlin
// 可能导致重复处理
setFilter(AcceptAllFileListFilter())
kotlin
@Bean
fun persistentFilter(): CompositeFileListFilter<FTPFile> {
return CompositeFileListFilter(
listOf(
FtpPersistentAcceptOnceFileListFilter(
SimpleMetadataStore(),
"filePrefix"
),
AcceptOnceFileListFilter()
)
)
}
// 在消息源中配置
setFilter(persistentFilter())
安全删除已处理文件
kotlin
@Bean
fun removeAdvice(): ExpressionEvaluatingRequestHandlerAdvice {
return ExpressionEvaluatingRequestHandlerAdvice().apply {
setOnSuccessExpressionString(
"@ftpRemoteFileTemplate.remove(headers['file_remoteDirectory'] + headers['file_remoteFile']"
)
setPropagateEvaluationFailures(true)
}
}
@ServiceActivator(inputChannel = "processedData", adviceChain = ["removeAdvice"])
fun safeFileHandler(message: Message<String>) {
// 文件处理逻辑...
// 处理成功后自动删除远程文件
}
版本变更注意事项
版本 5.0+重要变化
- ⚠️ 过滤时机变化:5.1+版本先过滤后排序(影响复合过滤器设计)
- ✅ 元数据增强:可通过
FileHeaders.REMOTE_FILE_INFO
获取完整文件信息
kotlin
// 获取文件信息
val fileInfo = message.headers[FileHeaders.REMOTE_FILE_INFO] as String
println("文件详情: $fileInfo") // JSON格式信息
// 或禁用JSON转换
messageSource.setFileInfoJson(false)
val ftpFileInfo = message.headers[FileHeaders.REMOTE_FILE_INFO] as FtpFileInfo
常见问题解答
Q1:如何处理流式传输中的大文件?
使用分块读取+缓冲区:
kotlin
@ServiceActivator(inputChannel = "ftpStream")
fun handleLargeFile(inputStream: InputStream) {
BufferedReader(InputStreamReader(inputStream)).use { reader ->
reader.lineSequence().chunked(1000) { batch ->
processBatch(batch)
}
} se块自动关闭资源
}
Q2:为什么有时连接会超时?
会话未及时关闭是最常见原因:
kotlin
@ServiceActivator(inputChannel = "ftpStream")
fun handleStream(message: Message<InputStream>) {
try {
// 处理输入流...
} finally {
(message.headers[IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE] as Closeable).close()
}
}
Q3:如何在集群环境中避免重复?
使用共享的持久化元数据存储:
kotlin
@Bean
fun metadataStore(): MetadataStore {
return JdbcMetadataStore(dataSource) // RedisMetadataStore等
}
@Bean
func clusterSafeFilter(): FileListFilter<FTPFile> {
return FtpPersistentAcceptOnceFileListFilter(metadataStore(), "clusterKey")
}
总结
FTP 流式入站通道适配器通过InputStream
传输模式:
- ✅ 消除磁盘 I/O 瓶颈:尤其适合大文件处理
- ⚡️ 实时处理能力:文件下载过程中即可开始处理
- 🔒 资源安全管控:通过消息头管理会话生命周期
> **最佳实践组合**:
FtpPersistentAcceptOnceFileListFilter
+ 持久化存储(防重复)StreamTransformer
自动处理文本文件- 处理完成后自动删除远程文件(安全清理)
- 配置合理的
maxFetchSize
(集群环境建议=1)