Skip to content

Spring Integration MongoDB 支持教程

引言

Spring Integration 的 MongoDB 支持提供了强大的集成能力,允许开发者轻松连接 MongoDB 文档数据库并实现消息驱动架构。本教程将循序渐进地讲解如何在 Spring 应用中使用 MongoDB 集成功能,特别适合 Spring 初学者。我们将使用 Kotlin 语言和注解配置方式,避免 XML 配置,采用现代 Spring 最佳实践。

MongoDB 是什么?

MongoDB 是一个高性能、开源的文档导向数据库,以 JSON-like 格式存储数据,非常适合处理半结构化数据和高扩展性场景。

kotlin

// 添加 Spring Integration MongoDB 依赖
dependencies {
    implementation("org.springframework.integration:spring-integration-mongodb:6.5.1")
}

一、连接 MongoDB

1.1 阻塞式 vs 响应式连接

Spring Integration 支持两种 MongoDB 连接方式:

阻塞式连接(适合传统应用)

kotlin

import org.springframework.data.mongodb.core.SimpleMongoClientDatabaseFactory

@Configuration
class MongoConfig {
    @Bean
    fun mongoDbFactory(): MongoDatabaseFactory {
        return SimpleMongoClientDatabaseFactory(
            com.mongodb.client.MongoClients.create(),
            "test"
        )
    }
}

响应式连接(适合高并发应用)

kotlin

import org.springframework.data.mongodb.core.ReactiveMongoDatabaseFactory
import org.springframework.data.mongodb.core.SimpleReactiveMongoDatabaseFactory

@Configuration
class ReactiveMongoConfig {
    @Bean
    fun reactiveMongoDbFactory(): ReactiveMongoDatabaseFactory {
        return SimpleReactiveMongoDatabaseFactory(
            com.mongodb.reactivestreams.client.MongoClients.create(),
            "test"
        )
    }
}

TIP

选择建议:

  • 使用阻塞式连接处理传统同步任务
  • 使用响应式连接处理高并发、低延迟需求

二、MongoDB 消息存储

2.1 消息存储基础

消息存储(Message Store)用于持久化消息,在需要缓冲消息的组件中特别有用(如队列通道、聚合器等)。

kotlin

@Configuration
class MessageStoreConfig {
    @Bean
    fun mongoDbMessageStore(factory: MongoDatabaseFactory): MongoDbMessageStore {
        return MongoDbMessageStore(factory)
    }

    @Bean
    fun persistentChannel(store: MongoDbMessageStore): MessageChannel {
        return MessageChannels.queue(store).get()
    }
}

2.2 优先级通道存储

支持优先级队列的消息存储:

kotlin

@Configuration
class PriorityStoreConfig {
    @Bean
    fun channelStore(factory: MongoDatabaseFactory): MongoDbChannelMessageStore {
        return MongoDbChannelMessageStore(factory)
    }

    @Bean
    fun priorityStore(channelStore: MongoDbChannelMessageStore): MongoDbChannelMessageStore {
        channelStore.isPriorityEnabled = true
        return channelStore
    }
}

重要限制

MongoDbMessageStore 在存储消息时使用自定义映射转换器,对消息头和负载的数据类型有以下限制:

  • 不支持复杂嵌套对象
  • 二进制数据需要特殊处理

2.3 元数据存储

用于跨应用重启维护状态:

kotlin

@Bean
fun metadataStore(factory: MongoDatabaseFactory): MetadataStore {
    return MongoDbMetadataStore(factory, "integrationMetadata")
}

NOTE

元数据存储常用于适配器状态管理:

  • 文件适配器记录最后读取位置
  • Feed 适配器跟踪最新条目

三、入站通道适配器

3.1 基本配置

从 MongoDB 读取数据并发送到通道:

kotlin

@Bean
fun mongoInboundFlow(factory: MongoDatabaseFactory): IntegrationFlow {
    return IntegrationFlow.from(
        MongoDb.inboundChannelAdapter(factory)
            .query("{ name: 'Bob' }")
            .entityClass(Person::class.java),
        { c -> c.poller(Pollers.fixedRate(1000)) }
    ).channel("outputChannel")
     .get()
}

3.2 事务处理

读取后自动删除文档的完整示例:

kotlin

@Bean
fun transactionalInboundFlow(factory: MongoDatabaseFactory): IntegrationFlow {
    return IntegrationFlow.from(
        MongoDb.inboundChannelAdapter(factory)
            .queryExpression("new BasicQuery('{''name'':''Bob''}').limit(100)")
            .entityClass(Person::class.java),
        { c ->
            c.poller(Pollers.fixedRate(200)
                .transactional(transactionManager())
        }
    ).handle { payload, _ ->
        // 处理消息
        documentCleaner().remove(payload)
    }.get()
}

@Bean
fun documentCleaner() = DocumentCleaner()

class DocumentCleaner {
    fun remove(operations: MongoOperations, target: Any) {
        // 删除处理过的文档
    }
}
事务同步机制
kotlin
@Bean
fun syncFactory(): TransactionSynchronizationFactory {
    return DefaultTransactionSynchronizationFactory().apply {
        setAfterCommitExpression(
            ExpressionParser().parseExpression(
                "@documentCleaner.remove(#mongoTemplate, payload)"
            )
        )
    }
}

四、变更流入站适配器

监听 MongoDB 变更流(Change Stream):

kotlin

@Bean
fun changeStreamFlow(template: ReactiveMongoTemplate): IntegrationFlow {
    return IntegrationFlow.from(
        MongoDb.changeStreamInboundChannelAdapter(template)
            .domainType(Person::class.java)
            .collection("person"),
        { c -> c.autoStartup(true) }
    ).channel(MessageChannels.flux())
     .get()
}

IMPORTANT

变更流适配器特点:

  • 实时响应数据库变化
  • 基于响应式编程模型
  • 自动处理断线重连

五、出站通道适配器

将消息写入 MongoDB:

kotlin

@Bean
fun mongoOutboundFlow(factory: MongoDatabaseFactory): IntegrationFlow {
    return IntegrationFlow { flow ->
        flow.handle(
            MongoDb.outboundChannelAdapter(factory)
                .collectionName("orders")
        )
    }
}

六、出站网关

查询 MongoDB 并返回结果:

kotlin

@Bean
fun singleQueryGateway(factory: MongoDatabaseFactory): IntegrationFlow {
    return IntegrationFlow.from("queryChannel")
        .handle(
            MongoDb.outboundGateway(factory, mongoConverter())
                .query("{ status: 'PENDING' }")
                .expectSingleResult(true)
                .entityClass(Order::class.java)
        )
        .channel("resultChannel")
        .get()
}
kotlin

@Bean
fun countOperationGateway(factory: MongoDatabaseFactory): IntegrationFlow {
    return IntegrationFlow.from("countChannel")
        .handle(
            MongoDb.outboundGateway(factory)
                .collectionCallback { collection, _ ->
                    collection.countDocuments()
                }
        )
        .get()
}

七、响应式适配器

7.1 响应式出站适配器

kotlin

@Bean
fun reactiveOutboundFlow(factory: ReactiveMongoDatabaseFactory): IntegrationFlow {
    return IntegrationFlow.from(MessageChannels.flux())
        .handle(MongoDb.reactiveOutboundChannelAdapter(factory))
        .get()
}

7.2 响应式入站适配器

kotlin

@Bean
fun reactiveInboundFlow(factory: ReactiveMongoDatabaseFactory): IntegrationFlow {
    return IntegrationFlow.from(
        MongoDb.reactiveInboundChannelAdapter(factory, "{'status':'ACTIVE'}")
            .entityClass(User::class.java),
        { c -> c.poller(Pollers.fixedDelay(500)) }
    )
    .split()
    .channel { c -> c.flux("reactiveOutput") }
    .get()
}

常见问题解决

Q1: 连接超时问题

kotlin
// 确保 MongoDB 服务器可访问
// 检查防火墙设置
mongoDbFactory.setWriteConcern(WriteConcern.ACKNOWLEDGED)

Q2: 序列化错误

kotlin
// 实体类必须有无参构造函数
data class Person(val id: String, val name: String) {
    constructor() : this("", "") // 添加无参构造
}

Q3: 性能优化

kotlin

// 批量处理提高性能
MongoDb.inboundChannelAdapter(factory)
    .maxDocuments(100) // 每次拉取100条

总结

通过本教程,您已掌握 Spring Integration 的 MongoDB 集成核心功能:

  1. 连接管理:阻塞式和响应式连接配置
  2. 消息存储:持久化消息和元数据管理
  3. 入站适配器:轮询查询和变更流监听
  4. 出站操作:数据写入和查询网关
  5. 响应式支持:非阻塞 IO 处理

最佳实践建议

  1. 优先使用响应式适配器处理高并发场景
  2. 实体类保持无参构造函数
  3. 批量操作提升数据库交互效率
  4. 使用变更流替代轮询获取实时数据

下一步学习:

  • Spring Data MongoDB 高级查询技巧
  • 响应式编程深入实践
  • 分布式事务管理方案