Appearance
Spring Integration FTP 输出通道适配器教程
概述
FTP Outbound Channel Adapter 是 Spring Integration 的核心组件,用于将本地文件传输到远程 FTP 服务器。它充当消息处理程序,监听输入通道,当接收到包含文件的消息时,自动执行 FTP 传输操作。
核心功能
- ✅ 自动创建远程目录结构
- ⚡️ 支持多种文件表示形式(File, byte[], String 等)
- 🔄 提供文件覆盖策略控制
- 🛡️ 防止部分写入文件的安全机制
- 🔐 支持文件权限设置(chmod)
支持的 Payload 类型
适配器支持以下类型的消息负载:
负载类型 | 描述 | 适用场景 |
---|---|---|
java.io.File | 文件对象 | 本地文件系统已有文件 |
byte[] | 字节数组 | 动态生成的文件内容 |
String | 文本内容 | 文本文件传输 |
InputStream | 输入流 | 大文件或流式传输 |
Resource | Spring 资源 | 类路径资源等 |
基础配置(Kotlin DSL)
依赖配置
kotlin
// build.gradle.kts
dependencies {
implementation("org.springframework.integration:spring-integration-ftp:6.2.1")
implementation("org.springframework.boot:spring-boot-starter-integration")
}
基础配置示例
kotlin
@Configuration
@EnableIntegration
class FtpConfig {
// 1. 创建FTP会话工厂
@Bean
fun ftpSessionFactory(): SessionFactory<FTPFile> {
return DefaultFtpSessionFactory().apply {
host = "ftp.example.com"
port = 21
username = "user"
password = "pass"
// [!code tip: 启用会话测试确保连接有效]
isTestSession = true
}.let { CachingSessionFactory(it) }
}
// 2. 配置FTP输出适配器
@Bean
fun ftpOutboundFlow(): IntegrationFlow {
return IntegrationFlow.from("ftpChannel")
.handle(
Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.REPLACE)
.useTemporaryFileName(true) // 启用临时文件机制
.remoteDirectory("/uploads") // 远程目录
.fileNameGenerator { message ->
message.headers["file_name"]?.toString() ?: "default.txt"
}
.chmod(600) // 设置文件权限
).get()
}
}
关键配置详解
文件存在处理模式
kotlin
ileExistsMode 枚举选项
enum class FileExistsMode {
REPLACE, // 默认:覆盖现有文件
REPLACE_IF_MODIFIED, // 仅当文件修改时间不同时覆盖
APPEND, // 追加到现有文件
APPEND_NO_FLUSH, // 追加但不立即刷新
IGNORE, // 忽略传输(静默)
FAIL // 抛出异常
}
// 使用示例
Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
防止部分写入文件
CAUTION
文件传输中断可能导致部分写入的损坏文件。强烈建议启用临时文件机制
kotlin
Ftp.outboundAdapter(ftpSessionFactory())
.useTemporaryFileName(true) // 启用临时文件
.temporaryFileSuffix(".tmp") // 自定义临时文件后缀
配置项 | 默认值 | 说明 |
---|---|---|
useTemporaryFileName | true | 使用临时文件机制 |
temporaryFileSuffix | .writing | 临时文件后缀 |
autoCreateDirectory | true | 自动创建目录 |
文件权限设置
kotlin
Ftp.outboundAdapter(ftpSessionFactory())
.chmod(600) // 设置文件权限为 rw-------
> `chmod` 设置仅在 FTP 服务器支持 `SITE CHMOD` 命令时生效
完整应用示例
网关接口定义
kotlin
@MessagingGateway
interface FtpUploadGateway {
@Gateway(requestChannel = "ftpUploadChannel")
fun uploadFile(
@Payload file: File,
@Header("remote_dir") remoteDir: String
)
}
服务激活器配置
kotlin
@Bean
@ServiceActivator(inputChannel = "ftpUploadChannel")
fun ftpMessageHandler(): MessageHandler {
return FtpMessageHandler(ftpSessionFactory()).apply {
remoteDirectoryExpression = SpelExpressionParser()
.parseExpression("headers['remote_dir']")
autoCreateDirectory = true
// 设置文件名生成器
fileNameGenerator = object : FileNameGenerator {
override fun generateFileName(message: Message<*>): String {
val file = message.payload as File
return "processed_${file.name}"
}
}
}
}
文件传输时序图
高级配置技巧
动态目录配置
kotlin
Ftp.outboundAdapter(ftpSessionFactory())
.remoteDirectoryExpression("headers['remote_dir'] ?: '/default'")
.temporaryRemoteDirectoryExpression("headers['temp_dir'] ?: '/tmp'")
多文件类型处理
kotlin
@Bean
fun fileUploadFlow(): IntegrationFlow {
return IntegrationFlow.from("fileUploadChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory()))
}
kotlin
@Bean
fun byteArrayUploadFlow(): IntegrationFlow {
return IntegrationFlow.from("byteUploadChannel")
.transform<ByteArray, File> { bytes ->
// 将字节数组转换为临时文件
val tempFile = File.createTempFile("upload", ".tmp")
Files.write(tempFile.toPath(), bytes)
tempFile
}
.handle(Ftp.outboundAdapter(ftpSessionFactory()))
}
常见问题解决
文件权限问题
WARNING
如果遇到权限错误,请检查:
- FTP 用户是否有目标目录写权限
- 服务器是否支持
SITE CHMOD
命令 chmod
值是否为有效的八进制数
目录创建失败
kotlin
Ftp.outboundAdapter(ftpSessionFactory())
.autoCreateDirectory(true)
.remoteDirectory("/uploads/") // [!code ++: 添加尾部斜杠解决某些FTP服务器问题]
连接池配置
kotlin
@Bean
fun ftpSessionFactory(): SessionFactory<FTPFile> {
val factory = DefaultFtpSessionFactory().apply {
// ... 基础配置
}
return CachingSessionFactory(factory).apply {
// [!code tip: 配置连接池大小]
poolSize = 10
sessionWaitTimeout = 5000
}
}
最佳实践建议
- 启用临时文件机制 - 防止部分写入文件
- 配置合理超时 - 避免连接长时间挂起kotlin
factory.controlTimeout = Duration.ofSeconds(30) factory.dataTimeout = Duration.ofSeconds(60)
- 使用连接池 - 提高性能和资源利用率
- 启用日志记录 - 监控文件传输状态properties
logging.level.org.springframework.integration.ftp=DEBUG
- 添加错误处理 - 使用错误通道捕获异常kotlin
.handle(Ftp.outboundAdapter(...), ConsumerEndpointSpec<FtpMessageHandler>().apply { errorChannel = "ftpErrorChannel" })
通过本教程,您应该能够掌握 Spring Integration FTP Outbound Channel Adapter 的核心配置和使用方法。实际应用中,请根据具体需求调整配置参数,并结合监控和日志确保文件传输的可靠性。