Appearance
Spring Integration SMB 支持教程
概述
NOTE
SMB 协议简介
SMB(Server Message Block)是一种简单的网络协议,用于在共享文件服务器之间传输文件。Spring Integration 提供了对 SMB 协议的全面支持,使开发者能够轻松实现文件传输操作。
核心概念
- SMB 适配器:提供入站(接收文件)和出站(发送文件)功能
- 会话管理:通过
SmbSessionFactory
管理 SMB 连接 - 文件处理:支持同步、流式处理等多种文件处理模式
依赖配置
在项目中添加以下依赖:
kotlin
dependencies {
implementation("org.springframework.integration:spring-integration-smb:6.5.1")
}
xml
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-smb</artifactId>
<version>6.5.1</version>
</dependency>
SMB 会话管理
创建 SessionFactory
kotlin
@Bean
fun smbSessionFactory(): SmbSessionFactory {
return SmbSessionFactory().apply {
host = "myHost"
port = 445
domain = "myDomain"
username = "myUser"
password = "myPassword"
shareAndDir = "myShareAndDir"
smbMinVersion = DialectVersion.SMB210
smbMaxVersion = DialectVersion.SMB311
}
}
TIP
设置 SMB 协议版本范围可提高兼容性,确保与不同版本的 SMB 服务器正常通信
会话缓存优化
kotlin
@Bean
fun cachingSessionFactory(smbSessionFactory: SmbSessionFactory): CachingSessionFactory {
return CachingSessionFactory(smbSessionFactory, 10).apply {
sessionWaitTimeout = 1000
}
}
性能提示
在高并发场景中,合理设置 sessionWaitTimeout
可避免线程阻塞,建议值在 500-2000ms 之间
入站通道适配器
文件同步适配器
kotlin
@Bean
fun smbInboundFileSynchronizer(): SmbInboundFileSynchronizer {
return SmbInboundFileSynchronizer(smbSessionFactory()).apply {
filter = compositeFileListFilter()
remoteDirectory = "mySharedDirectoryPath"
deleteRemoteFiles = true
}
}
@Bean
fun compositeFileListFilter(): CompositeFileListFilter<SmbFile> {
return CompositeFileListFilter<SmbFile>().apply {
addFilter(SmbRegexPatternFileListFilter("^(?i).+((\\.txt))$"))
}
}
@Bean
@InboundChannelAdapter(value = "smbFileInputChannel", poller = [Poller(fixedDelay = "2000")])
fun smbMessageSource(): MessageSource<File> {
return SmbInboundFileSynchronizingMessageSource(smbInboundFileSynchronizer()).apply {
localDirectory = File("myLocalDirectoryPath")
autoCreateLocalDirectory = true
}
}
Java DSL 配置
kotlin
@Bean
fun smbInboundFlow(): IntegrationFlow {
return IntegrationFlow.from(
Smb.inboundAdapter(smbSessionFactory())
.preserveTimestamp(true)
.remoteDirectory("smbSource")
.regexFilter(".*\\.txt$")
.localFilename { f -> f.uppercase() + ".a" }
.localDirectory(File("d:\\smb_files")),
{ e -> e.id("smbInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)) }
)
.handle { m -> println(m.payload) }
.build()
}
流式入站通道适配器
配置示例
kotlin
@Bean
@InboundChannelAdapter(channel = "stream")
fun smbMessageSource(): MessageSource<InputStream> {
return SmbStreamingMessageSource(smbRemoteFileTemplate()).apply {
remoteDirectory = "smbSource/"
filter = AcceptAllFileListFilter()
maxFetchSize = 1
}
}
@Bean
fun smbRemoteFileTemplate(): SmbRemoteFileTemplate {
return SmbRemoteFileTemplate(smbSessionFactory())
}
文件处理流程
IMPORTANT
使用流式处理时,必须确保在处理完成后关闭会话资源,否则会导致连接泄漏
出站通道适配器
基本配置
kotlin
@Bean
@ServiceActivator(inputChannel = "storeToSmbShare")
fun smbMessageHandler(smbSessionFactory: SmbSessionFactory): MessageHandler {
return SmbMessageHandler(smbSessionFactory).apply {
remoteDirectoryExpression = LiteralExpression("remote-target-dir")
fileNameGenerator = { m ->
m.headers[FileHeaders.FILENAME] as String + ".test"
}
autoCreateDirectory = true
}
}
Java DSL 配置
kotlin
@Bean
fun smbOutboundFlow(): IntegrationFlow {
return IntegrationFlow.from("toSmbChannel")
.handle(Smb.outboundAdapter(smbSessionFactory(), FileExistsMode.REPLACE)
.useTemporaryFileName(false)
.fileNameExpression("headers['${FileHeaders.FILENAME}']")
.remoteDirectory("smbTarget"))
.get()
}
@MessagingGateway
interface MyGateway {
@Gateway(requestChannel = "toSmbChannel")
fun sendToSmb(file: File)
}
SMB 出站网关
支持的命令
命令 | 功能描述 | 常用选项 |
---|---|---|
ls | 列出文件 | -1 , -a , -R |
get | 获取单个文件 | -P , -stream |
mget | 获取多个文件 | -R , -x |
put | 发送单个文件 | - |
mput | 发送多个文件 | -R |
rm | 删除文件 | - |
mv | 移动/重命名文件 | - |
网关配置示例
kotlin
@Bean
@ServiceActivator(inputChannel = "smbChannel")
fun handler(): MessageHandler {
return SmbOutboundGateway(smbSessionFactory(), "'my_remote_dir/'").apply {
outputChannelName = "replyChannel"
}
}
部分成功处理
kotlin
// 处理mget/mput部分成功场景
try {
gateway.mget("remoteDir/*")
} catch (e: PartialSuccessException) {
println("成功传输的文件: ${e.partialResults}")
println("失败的文件: ${e.derivedInput - e.partialResults}")
}
最佳实践与常见问题
性能优化技巧
- 连接池配置:使用
CachingSessionFactory
减少连接创建开销 - 批量处理:设置
maxFetchSize
控制每次获取文件数量 - 文件过滤:使用
SmbRegexPatternFileListFilter
减少不必要传输
常见问题解决
连接超时问题
症状:频繁出现连接超时异常
解决方案:
kotlin
@Bean
fun smbSessionFactory(): SmbSessionFactory {
return SmbSessionFactory().apply {
// ...
setTimeout(30000) // 增加超时时间至30秒
}
}
文件锁定问题
症状:无法删除或覆盖正在使用的文件
解决方案:
kotlin
Smb.outboundAdapter(smbSessionFactory(), FileExistsMode.REPLACE)
.useTemporaryFileName(true) // 使用临时文件
.renameExpression("headers['file_remoteFile']") // 完成后重命名
集群环境注意事项
CAUTION
在集群环境中部署时:
- 设置
maxFetchSize=1
避免单个节点获取所有文件 - 使用共享的
MetadataStore
实现分布式文件跟踪 - 为每个节点配置唯一的工作目录
远程文件信息处理
获取文件元数据
kotlin
@Transformer(inputChannel = "fileInfoChannel")
fun processFileInfo(message: Message<*>) {
val remoteHost = message.headers[FileHeaders.REMOTE_HOST_PORT]
val remoteDir = message.headers[FileHeaders.REMOTE_DIRECTORY]
val remoteFile = message.headers[FileHeaders.REMOTE_FILE]
println("从 $remoteHost 接收文件: $remoteDir/$remoteFile")
}
元数据存储配置
kotlin
@Bean
fun metadataStore(): MetadataStore {
return RedisMetadataStore(redisConnectionFactory) // 使用Redis共享存储
}
@Bean
fun smbInboundFileSynchronizer(): SmbInboundFileSynchronizer {
return SmbInboundFileSynchronizer(smbSessionFactory()).apply {
setMetadataStore(metadataStore())
metadataStorePrefix = "smbSync:" // 设置唯一前缀
}
}
总结
Spring Integration 的 SMB 支持提供了强大而灵活的文件传输能力。通过本教程,您已经学习到:
✅ 核心组件配置:会话工厂、入站/出站适配器、网关
✅ 最佳实践:连接池管理、集群配置、错误处理
✅ 高级技巧:流式处理、元数据管理、性能优化
下一步学习建议
- 尝试在Spring Boot应用中集成SMB支持
- 探索与Spring Cloud Stream的集成方案
- 实现自定义文件过滤器处理复杂业务场景