Appearance
SFTP流式入站通道适配器详解
概述
SFTP流式入站通道适配器(Streaming Inbound Channel Adapter)是Spring Integration 4.3引入的强大功能,它允许直接从SFTP服务器获取文件内容,而无需将文件写入本地文件系统。这种流式处理方式特别适合处理大文件或内存敏感的场景。
TIP
与传统SFTP适配器的关键区别:流式适配器产生的消息负载是InputStream
类型,文件内容直接从SFTP服务器流式传输,避免本地磁盘IO开销。
核心机制
关键特性
- 资源管理:会话保持打开状态,消费应用必须关闭会话(通过
IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
头) - 自动关闭:
FileSplitter
和StreamTransformer
等组件会自动关闭会话 - 避免重复:默认使用
SftpPersistentAcceptOnceFileListFilter
防止重复处理 - 头部信息:
FileHeaders.REMOTE_DIRECTORY
:远程目录FileHeaders.REMOTE_FILE
:文件名FileHeaders.REMOTE_FILE_INFO
:文件元数据(5.0+)
Kotlin配置实现
基础配置
kotlin
@Configuration
@EnableIntegration
class SftpConfig {
FTP会话工厂
@Bean
fun sftpSessionFactory(): DefaultSftpSessionFactory {
return DefaultSftpSessionFactory().apply {
host = "sftp.example.com"
port = 22
user = "user"
password = "pass"
allowUnknownKeys = true
}
}
// 远程文件模板
@Bean
fun sftpRemoteFileTemplate() = SftpRemoteFileTemplate(sftpSessionFactory())
// 流式消息源
@Bean
@InboundChannelAdapter(channel = "sftpChannel")
fun sftpMessageSource(): MessageSource<InputStream> {
return SftpStreamingMessageSource(sftpRemoteFileTemplate()).apply {
setRemoteDirectory("remote/files")
setFilter(AcceptAllFileListFilter())
setMaxFetchSize(1)
}
}
// 轮询器
@Bean
fun poller() = Pollers.fixedRate(1000).get()
}
流数据处理管道
kotlin
@Bean
@Transformer(inputChannel = "sftpChannel", outputChannel = "dataChannel")
fun streamTransformer(): Transformer {
// // 自动关闭输入流
return StreamTransformer("UTF-8")
}
@ServiceActivator(inputChannel = "dataChannel")
fun handleMessage(payload: String) {
println("处理文件内容: $payload")
}
高级配置选项
文件过滤策略
kotlin
// 组合过滤器示例
val compositeFilter = CompositeFileListFilter<ChannelSftp.LsEntry>().apply {
addFilter(SftpRegexPatternFileListFilter(".*\\.txt".toRegex()))
addFilter(SftpPersistentAcceptOnceFileListFilter(metadataStore, "sftpAdapter"))
addFilter(SftpSimplePatternFileListFilter("*.data"))
}
// [!code warning:3] // 注意:过滤表达式会覆盖其他过滤器
messageSource.setFilterExpression("@customFilter.filter(#root)")
集群环境优化
kotlin
messageSource.apply {
setMaxFetchSize(1) // [!code highlight] // 每次只取一个文件
setFilter(
SftpPersistentAcceptOnceFileListFilter(
JdbcMetadataStore(dataSource), // [!code highlight] // 使用数据库存储状态
"sftpAdapter"
)
)
}
文件处理后删除
kotlin
@Bean
fun deleteAdvice(): ExpressionEvaluatingRequestHandlerAdvice {
return ExpressionEvaluatingRequestHandlerAdvice().apply {
setOnSuccessExpressionString(
"@sftpRemoteFileTemplate.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])"
)
setPropagateEvaluationFailures(true)
}
}
@ServiceActivator(inputChannel = "dataChannel", adviceChain = ["deleteAdvice"])
fun processingService() = ...
最佳实践与常见问题
注意事项
WARNING
资源泄漏风险:如果消费组件未能正确关闭输入流,会导致SFTP连接保持打开状态,最终耗尽连接池资源。请确保:
- 使用
StreamTransformer
等自动关闭资源的组件 - 手动处理时在finally块中调用
inputStream.close()
- 通过
message.headers[IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE]
获取资源句柄
性能调优
kotlin
// 优化配置示例
messageSource.apply {
setRemoteDirectoryExpression("'backup/' + T(java.time.LocalDate).now().toString()") // [!code highlight] // 动态目录
setComparator(Comparator { f1, f2 -> f1.attributes.mtime.compareTo(f2.attributes.mtime) }) // 按修改时间排序
setRemoteFileSeparator("/")
}
错误处理策略
kotlin
@Bean
fun errorChannel() = DirectChannel()
@ServiceActivator(inputChannel = "errorChannel")
fun errorHandler() = MessageHandler {
val failedMessage = (it.payload as MessagingException).failedMessage
println("处理失败: ${failedMessage.headers[FileHeaders.REMOTE_FILE]}")
// 重试或移动失败文件逻辑
}
// 在适配器配置
@Bean
fun sftpInboundFlow() = IntegrationFlows
.from(sftpMessageSource()) { e ->
e.poller(Pollers.fixedRate(1000))
e.errorChannel("errorChannel")
}
.channel("sftpChannel")
.get()
完整应用示例
查看完整Spring Boot应用
kotlin
@SpringBootApplication
class SftpApplication {
companion object {
@JvmStatic
fun main(args: Array<String>) {
runApplication<SftpApplication>(*args)
}
}
@Bean
fun sftpSessionFactory() = DefaultSftpSessionFactory().apply {
host = "sftp.example.com"
port = 22
user = "user"
password = "pass"
}
@Bean
fun sftpTemplate() = SftpRemoteFileTemplate(sftpSessionFactory())
@Bean
@InboundChannelAdapter(channel = "streamChannel")
fun sftpSource(): MessageSource<InputStream> {
return SftpStreamingMessageSource(sftpTemplate()).apply {
setRemoteDirectory("inbound")
setFilter(SimplePatternFileListFilter("*.csv"))
setMaxFetchSize(2)
}
}
@Bean
@Transformer(inputChannel = "streamChannel", outputChannel = "dataChannel")
fun transformer() = StreamTransformer("UTF-8")
@Bean
fun deleteAdvice() = ExpressionEvaluatingRequestHandlerAdvice().apply {
setOnSuccessExpressionString(
"@sftpTemplate.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])"
)
}
@ServiceActivator(inputChannel = "dataChannel", adviceChain = ["deleteAdvice"])
fun dataHandler() = MessageHandler {
println("处理内容: ${it.payload}")
// 实际业务处理逻辑
}
// 错误处理
@Bean
fun errorChannel() = DirectChannel()
@ServiceActivator(inputChannel = "errorChannel")
fun errorHandler() = MessageHandler {
val ex = it.payload as MessagingException
println("处理失败: ${ex.failedMessage.headers[FileHeaders.REMOTE_FILE]}")
}
}
常见问题解决
Q1:文件被重复处理怎么办?
TIP
解决方案:
- 确认使用
SftpPersistentAcceptOnceFileListFilter
- 检查元数据存储(如JDBC)是否正常工作
- 集群环境中确保所有节点使用共享的MetadataStore
kotlin
@Bean
fun metadataStore(): ConcurrentMetadataStore {
return JdbcMetadataStore(DataSourceInitializer().apply {
setDataSource(dataSource)
})
}
@Bean
fun fileListFilter() = SftpPersistentAcceptOnceFileListFilter(
metadataStore(),
"sftpFilter"
).apply {
setFlushOnUpdate(true)
}
Q2:如何处理大文件避免内存溢出?
kotlin
@Bean
@Transformer(inputChannel = "streamChannel", outputChannel = "chunkChannel")
fun fileSplitter() = FileSplitter(true).apply {
setApplySequence(true)
setMarkersJson(true)
}
@ServiceActivator(inputChannel = "chunkChannel")
fun processChunk() = MessageHandler {
// 分块处理逻辑
}
Q3:如何获取文件元数据信息?
kotlin
@ServiceActivator(inputChannel = "dataChannel")
fun handleWithMetadata(payload: String,
@Header(FileHeaders.REMOTE_FILE_INFO) fileInfo: String) {
val json = JSONObject(fileInfo)
println("文件名: ${json.getString("filename")}")
println("大小: ${json.getLong("size")}字节")
println("修改时间: ${json.getLong("modified")}")
}
通过流式SFTP适配器,您可以构建高效的文件处理流水线,避免本地存储瓶颈,特别适合云原生和容器化环境。实际应用中,请根据业务需求选择合适的文件过滤策略和错误处理机制。