Appearance
📖 Spring Integration 文件读取教程
本教程专为 Spring 初学者设计,使用 Kotlin 和现代 Spring 最佳实践(注解配置 + Kotlin DSL),将系统讲解文件读取的核心机制。
🧩 一、核心概念:FileReadingMessageSource
作用:从文件系统目录消费文件并转换为消息的组件,实现 MessageSource
接口。
基础配置示例 (Kotlin DSL)
kotlin
@Configuration
class FileConfig {
@Bean
fun fileSource() = FileReadingMessageSource().apply {
setDirectory(File("/input"))
// 默认过滤器:忽略隐藏文件 + 防止重复处理
filter = CompositeFileListFilter(
listOf(IgnoreHiddenFileListFilter(), AcceptOnceFileListFilter())
)
}
}
过滤器作用说明
IgnoreHiddenFileListFilter
:跳过系统隐藏文件(如 Linux 的.file
)AcceptOnceFileListFilter
:内存中记录已处理文件(重启失效)
🔍 二、文件完整性保障方案
解决文件写入未完成时被读取的问题:
方案 1:临时文件重命名
方案 2:标记文件检测(推荐)
kotlin
// 仅当完成标记存在时才处理文件
val filter = CompositeFileListFilter<File>().apply {
addFilter(
FileSystemMarkerFilePresentFileListFilter(".complete")
)
}
重要选择
- 内存敏感场景:用
FileSystemPersistentAcceptOnceFileListFilter
替代默认过滤器 - 分布式系统:搭配
RedisMetadataStore
实现跨节点状态共享
⚙️ 三、目录扫描机制优化
1. 递归扫描配置
kotlin
Files.inboundAdapter(File("/deep/dir"))
.recursive(true) // [!code ++] // 启用递归扫描
.scanner(RecursiveDirectoryScanner())
2. 文件处理顺序控制
kotlin
FileReadingMessageSource().apply {
setScanner(CustomDirectoryScanner())
setComparator(
Comparator { f1, f2 ->
f1.lastModified().compareTo(f2.lastModified())
}
) // 按修改时间排序
}
性能警告
避免对超大型目录开启
scanEachPoll=true
!
每次轮询全量扫描会显著降低性能,推荐使用HeadDirectoryScanner
限制内存队列大小。 :::
🚀 四、实战配置模板
注解配置方案
kotlin
@Bean
@InboundChannelAdapter(
channel = "fileChannel",
poller = [Poller(fixedRate = 5000)]
)
fun fileSource() = FileReadingMessageSource().apply {
setDirectory(File("/inbound"))
setFilter(RegexPatternFileListFilter(".*\\.csv"))
}
Kotlin DSL 方案
kotlin
@Bean
fun fileFlow() = integrationFlow(
Files.inboundAdapter(File("/realtime"))
.useWatchService(true) // 启用文件系统事件监听
.watchEvents(FileReadingMessageSource.WatchEventType.CREATE)
) {
handle { file: File ->
logger.info("新文件到达: ${file.name}")
}
}
🐌 五、文件追踪(Tailing)技术
实时监控文件尾部变化:
1. 原生 tail
命令集成
kotlin
@Bean
fun tailFlow() = IntegrationFlow.from(
FileTailingMessageProducer().apply {
file = File("/logs/app.log")
options = "-F -n 0" // 持续追踪最新内容
setDelay(1000) // 检查间隔
}
) { ... }
2. Apache Commons 实现
kotlin
FileTailingMessageProducer().apply {
file = File("/data/stream.txt")
isEnd = false // 从文件头开始读
reopen = true // 每次读取后重新打开
}
事件处理示例
kotlin
@EventListener
fun handleTailEvent(event: FileTailingIdleEvent) {
if (event.idleTime > 60000) {
logger.warn("文件 ${event.file} 超过60秒无更新")
}
}
⚡ 六、常见问题解决方案
❌ 问题 1:文件被部分读取
场景:文件写入未完成时已被处理
方案:双重检测策略
kotlin
CompositeFileListFilter<File>().apply {
addFilter(LastModifiedFileListFilter().apply {
age = 120 // 文件需存在超过120秒
})
addFilter(RegexPatternFileListFilter(".*\\.ready")) // 标记文件
}
❌ 问题 2:分布式环境重复消费
场景:多实例同时读取共享目录
方案:Redis 状态存储
kotlin
val filter = FileSystemPersistentAcceptOnceFileListFilter(
RedisMetadataStore(redisConnectionFactory),
"fileCache"
)
✅ 最佳实践总结
- 生产环境必用:
FileSystemPersistentAcceptOnceFileListFilter
+ Redis - 大文件处理:结合
LastModifiedFileListFilter
避免半成品 - 实时监控:优先用
WatchServiceDirectoryScanner
替代定时轮询 - 安全隔离:通过
FileLocker
防止多进程冲突
完整示例代码:
GitHub 文件处理示例库