Skip to content

Spring Integration FTP 输出通道适配器教程

概述

FTP Outbound Channel Adapter 是 Spring Integration 的核心组件,用于将本地文件传输到远程 FTP 服务器。它充当消息处理程序,监听输入通道,当接收到包含文件的消息时,自动执行 FTP 传输操作。

核心功能

  • ✅ 自动创建远程目录结构
  • ⚡️ 支持多种文件表示形式(File, byte[], String 等)
  • 🔄 提供文件覆盖策略控制
  • 🛡️ 防止部分写入文件的安全机制
  • 🔐 支持文件权限设置(chmod)

支持的 Payload 类型

适配器支持以下类型的消息负载:

负载类型描述适用场景
java.io.File文件对象本地文件系统已有文件
byte[]字节数组动态生成的文件内容
String文本内容文本文件传输
InputStream输入流大文件或流式传输
ResourceSpring 资源类路径资源等

基础配置(Kotlin DSL)

依赖配置

kotlin
// build.gradle.kts
dependencies {
    implementation("org.springframework.integration:spring-integration-ftp:6.2.1")
    implementation("org.springframework.boot:spring-boot-starter-integration")
}

基础配置示例

kotlin
@Configuration
@EnableIntegration
class FtpConfig {

    // 1. 创建FTP会话工厂
    @Bean
    fun ftpSessionFactory(): SessionFactory<FTPFile> {
        return DefaultFtpSessionFactory().apply {
            host = "ftp.example.com"
            port = 21
            username = "user"
            password = "pass"
            // [!code tip: 启用会话测试确保连接有效]
            isTestSession = true
        }.let { CachingSessionFactory(it) }
    }

    // 2. 配置FTP输出适配器
    @Bean
    fun ftpOutboundFlow(): IntegrationFlow {
        return IntegrationFlow.from("ftpChannel")
            .handle(
                Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.REPLACE)
                    .useTemporaryFileName(true)  // 启用临时文件机制
                    .remoteDirectory("/uploads") // 远程目录
                    .fileNameGenerator { message ->

                        message.headers["file_name"]?.toString() ?: "default.txt"
                    }
                    .chmod(600)  // 设置文件权限
            ).get()
    }
}

关键配置详解

文件存在处理模式

kotlin
ileExistsMode 枚举选项
enum class FileExistsMode {
    REPLACE,         // 默认:覆盖现有文件
    REPLACE_IF_MODIFIED, // 仅当文件修改时间不同时覆盖
    APPEND,          // 追加到现有文件
    APPEND_NO_FLUSH, // 追加但不立即刷新
    IGNORE,          // 忽略传输(静默)
    FAIL             // 抛出异常
}

// 使用示例
Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)

防止部分写入文件

CAUTION

文件传输中断可能导致部分写入的损坏文件。强烈建议启用临时文件机制

kotlin
Ftp.outboundAdapter(ftpSessionFactory())
    .useTemporaryFileName(true) // 启用临时文件
    .temporaryFileSuffix(".tmp") // 自定义临时文件后缀
配置项默认值说明
useTemporaryFileNametrue使用临时文件机制
temporaryFileSuffix.writing临时文件后缀
autoCreateDirectorytrue自动创建目录

文件权限设置

kotlin
Ftp.outboundAdapter(ftpSessionFactory())
    .chmod(600)  // 设置文件权限为 rw-------

> `chmod` 设置仅在 FTP 服务器支持 `SITE CHMOD` 命令时生效

完整应用示例

网关接口定义

kotlin
@MessagingGateway
interface FtpUploadGateway {

    @Gateway(requestChannel = "ftpUploadChannel")
    fun uploadFile(
        @Payload file: File,
        @Header("remote_dir") remoteDir: String
    )
}

服务激活器配置

kotlin
@Bean
@ServiceActivator(inputChannel = "ftpUploadChannel")
fun ftpMessageHandler(): MessageHandler {
    return FtpMessageHandler(ftpSessionFactory()).apply {

        remoteDirectoryExpression = SpelExpressionParser()
            .parseExpression("headers['remote_dir']")


        autoCreateDirectory = true

        // 设置文件名生成器
        fileNameGenerator = object : FileNameGenerator {
            override fun generateFileName(message: Message<*>): String {
                val file = message.payload as File
                return "processed_${file.name}"
            }
        }
    }
}

文件传输时序图

高级配置技巧

动态目录配置

kotlin
Ftp.outboundAdapter(ftpSessionFactory())
    .remoteDirectoryExpression("headers['remote_dir'] ?: '/default'")
    .temporaryRemoteDirectoryExpression("headers['temp_dir'] ?: '/tmp'")

多文件类型处理

kotlin
@Bean
fun fileUploadFlow(): IntegrationFlow {
    return IntegrationFlow.from("fileUploadChannel")
        .handle(Ftp.outboundAdapter(ftpSessionFactory()))
}
kotlin
@Bean
fun byteArrayUploadFlow(): IntegrationFlow {
    return IntegrationFlow.from("byteUploadChannel")
        .transform<ByteArray, File> { bytes ->
            // 将字节数组转换为临时文件
            val tempFile = File.createTempFile("upload", ".tmp")
            Files.write(tempFile.toPath(), bytes)
            tempFile
        }
        .handle(Ftp.outboundAdapter(ftpSessionFactory()))
}

常见问题解决

文件权限问题

WARNING

如果遇到权限错误,请检查:

  1. FTP 用户是否有目标目录写权限
  2. 服务器是否支持 SITE CHMOD 命令
  3. chmod 值是否为有效的八进制数

目录创建失败

kotlin
Ftp.outboundAdapter(ftpSessionFactory())
    .autoCreateDirectory(true)
    .remoteDirectory("/uploads/") // [!code ++: 添加尾部斜杠解决某些FTP服务器问题]

连接池配置

kotlin
@Bean
fun ftpSessionFactory(): SessionFactory<FTPFile> {
    val factory = DefaultFtpSessionFactory().apply {
        // ... 基础配置
    }

    return CachingSessionFactory(factory).apply {
        // [!code tip: 配置连接池大小]
        poolSize = 10
        sessionWaitTimeout = 5000
    }
}

最佳实践建议

  1. 启用临时文件机制 - 防止部分写入文件
  2. 配置合理超时 - 避免连接长时间挂起
    kotlin
    factory.controlTimeout = Duration.ofSeconds(30)
    factory.dataTimeout = Duration.ofSeconds(60)
  3. 使用连接池 - 提高性能和资源利用率
  4. 启用日志记录 - 监控文件传输状态
    properties
    logging.level.org.springframework.integration.ftp=DEBUG
  5. 添加错误处理 - 使用错误通道捕获异常
    kotlin
    .handle(Ftp.outboundAdapter(...),
        ConsumerEndpointSpec<FtpMessageHandler>().apply {
            errorChannel = "ftpErrorChannel"
        })

通过本教程,您应该能够掌握 Spring Integration FTP Outbound Channel Adapter 的核心配置和使用方法。实际应用中,请根据具体需求调整配置参数,并结合监控和日志确保文件传输的可靠性。