Appearance
Spring Integration FTP 入站通道适配器详解
1. FTP 入站通道适配器概述
FTP 入站通道适配器是一个特殊的监听器,它连接 FTP 服务器并监听远程目录事件(如新文件创建),然后自动触发文件传输。它的核心功能是将远程 FTP 服务器上的文件下载到本地系统,并将每个文件作为消息的有效负载发送到指定通道。
2. 基础配置与工作原理
2.1 核心组件配置(Kotlin DSL)
kotlin
@Configuration
@EnableIntegration
class FtpInboundConfig {
@Bean
fun ftpSessionFactory(): SessionFactory<FTPFile> {
return DefaultFtpSessionFactory().apply {
host = "ftp.example.com"
port = 21
username = "user"
password = "pass"
}
}
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlow.from(
Ftp.inboundAdapter(ftpSessionFactory())
.localDirectory(File("local/dir"))
.remoteDirectory("remote/dir")
.autoCreateLocalDirectory(true)
.deleteRemoteFiles(false)
.filter(CompositeFileListFilter<FTPFile>().apply {
addFilter(FtpRegexPatternFileListFilter(".*\\.txt"))
addFilter(FtpPersistentAcceptOnceFileListFilter(metadataStore(), "ftpMetadata"))
}),
{ poller -> poller.poller(Pollers.fixedRate(5000)) }
)
.handle { file: File ->
println("处理文件: ${file.name}")
}
.get()
}
@Bean
fun metadataStore(): ConcurrentMetadataStore {
return RedisMetadataStore(redisConnectionFactory)
}
}
配置说明:
localDirectory
:本地存储目录路径remoteDirectory
:远程FTP服务器目录autoCreateLocalDirectory
:自动创建本地目录deleteRemoteFiles
:下载后是否删除远程文件filter
:组合过滤器(正则+持久化)poller
:每5秒轮询一次
最佳实践
优先使用 FtpPersistentAcceptOnceFileListFilter
替代内存过滤器,确保系统重启后仍能正确跟踪已处理文件
3. 文件过滤策略
3.1 常用过滤器类型
过滤器类型 | 说明 | 适用场景 |
---|---|---|
FtpRegexPatternFileListFilter | 正则表达式匹配 | 复杂文件名匹配 |
FtpSimplePatternFileListFilter | 简单模式匹配 | 基本通配符匹配 |
FtpPersistentAcceptOnceFileListFilter | 持久化过滤器 | 避免重复处理文件 |
FtpLastModifiedFileListFilter | 最后修改时间过滤 | 处理稳定文件 |
FtpSystemMarkerFilePresentFileListFilter | 标记文件检查 | 确保文件完整 |
3.2 过滤器配置示例
kotlin
// 组合过滤器配置
fun customFilter(): CompositeFileListFilter<FTPFile> {
return CompositeFileListFilter<FTPFile>().apply {
// 只处理.txt文件
addFilter(FtpRegexPatternFileListFilter(".*\\.txt"))
// 防止重复处理(使用Redis持久化)
addFilter(FtpPersistentAcceptOnceFileListFilter(
metadataStore(),
"ftpFilterKey"
))
// 只处理30秒前的文件(避免处理中文件)
addFilter(FtpLastModifiedFileListFilter(Duration.ofSeconds(30)))
}
}
注意事项
处理正在写入的文件可能导致数据不完整。务必结合 FtpLastModifiedFileListFilter
或标记文件策略确保文件完整性
4. 轮询器配置要点
4.1 轮询行为控制
kotlin
@Bean
fun ftpPoller(): PollerSpec {
return Pollers.fixedRate(5000) // 每5秒轮询一次
.maxMessagesPerPoll(-1) // 每次处理所有可用文件
.errorChannel(errorChannel()) // 错误处理通道
}
关键参数:
maxMessagesPerPoll
:-1
:处理所有可用文件(推荐)1
:每次只处理一个文件(默认)N
:每次最多处理N个文件
::: important 性能建议 对于大文件处理场景,建议设置 maxMessagesPerPoll=1
避免内存溢出,同时配合 maxFetchSize
限制单次获取数量 :::
5. 错误恢复机制
5.1 事务性错误处理
kotlin
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlow.from(
Ftp.inboundAdapter(ftpSessionFactory())
.localDirectory(File("local/dir"))
.remoteDirectory("remote/dir"),
{ poller ->
poller.poller(Pollers.fixedRate(5000)
.transactional(transactionManager())
}
)
// 事务同步配置
.handle({ file: File ->
// 文件处理逻辑
}, { endpoint ->
endpoint.advice(afterRollback {
(it.payload as File).delete() // 回滚时删除本地文件
})
})
.get()
}
@Bean
fun transactionManager(): PlatformTransactionManager {
return DataSourceTransactionManager(dataSource)
}
错误处理流程:
- 文件下载到本地临时目录(后缀如
.writing
) - 事务中处理文件
- 成功:重命名文件为最终名称
- 失败:回滚事务 → 删除本地文件 → 重置过滤器状态
6. 现代配置方式(Kotlin DSL)
6.1 完整配置示例
kotlin
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlow.from(
Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true) // 保留远程文件时间戳
.remoteDirectory("uploads") // 远程目录
.regexFilter(".*\\.csv$") // 只处理CSV文件
.localFilename { remoteFileName ->
"${remoteFileName.toUpperCase()}_PROCESSED"
} // 自定义本地文件名
.localDirectory(File("data/inbound")) // 本地目录
.maxFetchSize(10) // 每次最多获取10个文件
.temporaryFileSuffix(".tmp") // 临时文件后缀
.localFilter(AcceptOnceFileListFilter()) // 本地过滤器
) { e ->
e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)))
e.id("ftpInboundAdapter")
}
.transform(File::class) { file ->
processCsvFile(file) // 自定义CSV处理逻辑
}
.handle { payload, _ ->
logger.info("文件处理完成: ${(payload as File).name}")
}
.get()
}
6.2 关键特性说明
kotlin
// 动态远程目录(每次轮询时计算)
.remoteDirectoryExpression("@remoteDirService.getCurrentDir()")
// 自定义目录扫描器
.scanner(CustomFtpDirectoryScanner())
// 保留时间戳
.preserveTimestamp(true)
// 自定义临时文件后缀
.temporaryFileSuffix(".downloading")
动态目录技巧
使用 remoteDirectoryExpression
实现动态目录切换,非常适合多租户或日期分区场景
7. 处理不完整数据
7.1 文件完整性保障策略
kotlin
// 标记文件过滤器
@Bean
fun markerFileFilter(): FileListFilter<FTPFile> {
return FtpSystemMarkerFilePresentFileListFilter(
".COMPLETE", // 标记文件后缀
ftpSessionFactory()
)
}
// 在适配器中应用
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlow.from(
Ftp.inboundAdapter(ftpSessionFactory())
.filter(markerFileFilter())
.remoteDirectory("incoming")
.localDirectory(File("processed"))
)
// ...
.get()
}
工作原理:
- 源系统上传
data.csv
- 源系统创建
data.csv.COMPLETE
标记文件 - 适配器只下载带有标记文件的CSV
- 处理完成后删除标记文件
关键注意
不完整文件处理是生产环境必须考虑的问题,缺少此机制可能导致部分数据处理失败
8. 常见问题解决方案
8.1 典型问题排查表
问题现象 | 可能原因 | 解决方案 |
---|---|---|
文件重复处理 | 过滤器状态未持久化 | 使用 FtpPersistentAcceptOnceFileListFilter |
处理部分写入文件 | 轮询太快 | 添加 FtpLastModifiedFileListFilter |
文件下载后未删除 | 权限不足 | 检查FTP账户删除权限 |
中文文件名乱码 | 编码不一致 | 配置 sessionFactory.setControlEncoding("UTF-8") |
连接频繁断开 | 防火墙限制 | 调整 sessionFactory.setClientMode(FTPClient.PASSIVE_MODE) |
8.2 性能优化技巧
kotlin
// 连接池配置
@Bean
fun ftpSessionFactory(): SessionFactory<FTPFile> {
val factory = DefaultFtpSessionFactory().apply {
// ...基础配置
}
return CachingSessionFactory(factory).apply {
poolSize = 10 // 连接池大小
sessionWaitTimeout = 30_000 // 等待超时(ms)
testSession = true // 验证会话有效性
}
}
// 适配器配置
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlow.from(
Ftp.inboundAdapter(ftpSessionFactory())
.maxFetchSize(50) // 每次最多获取50个文件
.remoteDirectory("large-files")
) { e ->
e.poller(Pollers.fixedRate(60_000) // 降低轮询频率
}
// ...
}
生产环境建议
- 大文件场景:设置
maxFetchSize=1
避免内存溢出 - 高并发场景:使用
CachingSessionFactory
连接池 - 网络不稳定:增加
sessionWaitTimeout
并启用testSession
9. 高级特性:元数据存储
9.1 分布式状态管理
kotlin
@Configuration
@EnableIntegration
class MetadataConfig {
// Redis元数据存储
@Bean
fun metadataStore(): ConcurrentMetadataStore {
return RedisMetadataStore(redisConnectionFactory).apply {
setPrefix("ftp::state::") // 键前缀
}
}
// 持久化过滤器
@Bean
fun persistentFilter(): FileListFilter<FTPFile> {
return FtpPersistentAcceptOnceFileListFilter(
metadataStore(),
"fileFilter"
).apply {
flushOnUpdate = true // 每次更新立即持久化
}
}
}
集群部署优势:
- 多个适配器实例共享处理状态
- 重启后自动恢复处理位置
- 避免跨实例重复处理文件
存储选择
除了Redis,还可选择:
JdbcMetadataStore
(数据库存储)ZookeeperMetadataStore
(分布式协调)GemfireMetadataStore
(内存网格)
总结
Spring Integration FTP 入站通道适配器提供了强大的文件获取能力,关键实践包括:
- 优先使用Kotlin DSL进行声明式配置
- 必须实施完整性检查(时间戳或标记文件)
- 生产环境务必使用持久化过滤器
- 根据文件大小调整
maxFetchSize
参数 - 使用连接池优化性能
通过合理配置,FTP入站适配器能够可靠地处理各种文件传输场景,成为企业集成解决方案中的重要组件。
CAUTION
切勿在生产环境使用 deleteRemoteFiles=true
而不做备份,重要文件删除前应增加确认机制