Skip to content

Spring Integration JDBC 消息存储全面指南

概述

JDBC 消息存储是 Spring Integration 中核心持久化机制,提供两种实现:

  • JdbcMessageStore:适用于聚合器Claim Check模式
  • JdbcChannelMessageStore:专为消息通道优化,提供更高性能

数据库初始化

在开始前需初始化数据库表结构

初始化步骤

  1. spring-integration-jdbc JAR 中获取 SQL 脚本
    • 路径:org.springframework.integration.jdbc
  2. 使用 Spring JDBC 初始化器执行脚本
kotlin
@Bean
fun dataSourceInitializer(dataSource: DataSource): DataSourceInitializer {
    val initializer = DataSourceInitializer()
    initializer.setDataSource(dataSource)
    
    val schema = ClassPathResource("org/springframework/integration/jdbc/schema-h2.sql")
    initializer.setDatabasePopulator(ResourceDatabasePopulator(schema))
    
    return initializer
}

IMPORTANT

从版本 6.2 开始,所有 JDBC 存储组件在启动时会自动检查表是否存在,可通过 setCheckDatabaseOnStart(false) 禁用该检查

通用消息存储 (JdbcMessageStore)

适用于聚合器持久化Claim Check模式

基本配置

kotlin
@Configuration
class JdbcStoreConfig {

    @Bean
    fun messageStore(dataSource: DataSource): JdbcMessageStore {
        return JdbcMessageStore(dataSource).apply {
            tablePrefix = "MY_INT_" // 自定义表前缀
        }
    }
}

在聚合器中使用

kotlin
@Bean
fun aggregator() = AggregatingMessageHandler(
    DefaultAggregatingMessageGroupProcessor(),
    JdbcMessageStore(dataSource).apply {
        setGroupTimeout(5000)
        setExpireGroupsUponCompletion(true)
    }
)

通道消息存储 (JdbcChannelMessageStore)

专为消息通道优化,支持队列和优先级通道

基本配置

kotlin
@Bean
fun channelMessageStore(dataSource: DataSource): JdbcChannelMessageStore {
    return JdbcChannelMessageStore(dataSource).apply {
        // 必须设置数据库特定的查询提供器
        setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
        region = "ORDER_PROCESSING" // 数据分区标识
    }
}

@Bean
fun orderChannel(): PollableChannel {
    return MessageChannels.queue(channelMessageStore(dataSource), "orders").get()
}

支持的数据库

数据库查询提供器类
PostgreSQLPostgresChannelMessageStoreQueryProvider
MySQLMysqlChannelMessageStoreQueryProvider
OracleOracleChannelMessageStoreQueryProvider
H2H2ChannelMessageStoreQueryProvider

TIP

若使用未列出的数据库,需实现 ChannelMessageStoreQueryProvider 接口提供自定义查询

高级功能

自定义消息插入

可覆盖默认的序列化策略,如存储为 JSON:

kotlin
class JsonPreparedStatementSetter : ChannelMessageStorePreparedStatementSetter() {

    override fun setValues(
        ps: PreparedStatement, 
        message: Message<*>,
        groupId: Any, 
        region: String,
        priorityEnabled: Boolean
    ) {
        super.setValues(ps, message, groupId, region, priorityEnabled)
        // 存储为 JSON 字符串
        ps.setString(6, objectMapper.writeValueAsString(message.payload))
    }
}

// 配置使用自定义插入器
@Bean
fun customStore(dataSource: DataSource): JdbcChannelMessageStore {
    return JdbcChannelMessageStore(dataSource).apply {
        setPreparedStatementSetter(JsonPreparedStatementSetter())
    }
}

并发轮询优化

当使用多线程轮询时:

kotlin
@Configuration
class ConcurrentPollingConfig {

    @Bean
    fun syncFactory() = DefaultTransactionSynchronizationFactory().apply {
        setAfterCommitExpression("""
            @store.removeFromIdCache(headers.id.toString())
        """.trimIndent().toSpelExpression())
    }

    @Bean
    fun store(dataSource: DataSource) = JdbcChannelMessageStore(dataSource).apply {
        setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
        region = "TX_TIMEOUT"
        isUsingIdCache = true // 启用ID缓存
    }

    @Bean
    fun inputChannel(store: JdbcChannelMessageStore) = MessageChannels.queue(store, "input").get()

    @Bean
    fun pollingBridge(inputChannel: PollableChannel, outputChannel: MessageChannel) {
        return IntegrationFlow.from(
            MessageChannels.queue("inputChannel", store)
        ).bridge { spec ->
            spec.poller(Pollers.fixedDelay(500)
                .transactional(TransactionInterceptor(transactionManager))
        }.channel(outputChannel)
        .get()
    }
}

并发注意事项

对于不支持 MVCC 的数据库(如 Derby),必须启用 ID 缓存 (usingIdCache=true) 以避免重复消费问题

优先级通道

启用优先级支持:

kotlin
@Bean
fun priorityStore(dataSource: DataSource): JdbcChannelMessageStore {
    return JdbcChannelMessageStore(dataSource).apply {
        setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
        isPriorityEnabled = true // 启用优先级
    }
}

@Bean
fun priorityChannel(): PollableChannel {
    return MessageChannels.priorityQueue(priorityStore(dataSource), "urgent").get()
}

CAUTION

避免在同一个 JdbcChannelMessageStore 实例中混用优先级和非优先级通道

PostgreSQL 推送通知

利用 PostgreSQL 的 LISTEN/NOTIFY 机制实现实时消息推送

配置步骤

  1. 创建数据库触发器 (参考 schema-postgresql.sql)
  2. 配置消息订阅器
kotlin
@Bean
fun postgresSubscriber(
    @Value("\${spring.datasource.url}") url: String,
    @Value("\${spring.datasource.username}") user: String,
    @Value("\${spring.datasource.password}") password: String
): PostgresChannelMessageTableSubscriber {
    return PostgresChannelMessageTableSubscriber {
        DriverManager.getConnection(url, user, password)
            .unwrap(PgConnection::class.java)
    }
}

@Bean
fun jdbcChannelStore(dataSource: DataSource) = JdbcChannelMessageStore(dataSource).apply {
    setChannelMessageStoreQueryProvider(PostgresChannelMessageStoreQueryProvider())
}

@Bean
fun subscribableChannel(
    store: JdbcChannelMessageStore,
    subscriber: PostgresChannelMessageTableSubscriber
): PostgresSubscribableChannel {
    return PostgresSubscribableChannel(store, "notifications", subscriber).apply {
        setTransactionManager(transactionManager) // 启用事务支持
        setRetryTemplate(RetryTemplate()) // 配置重试机制
    }
}

连接注意事项

PostgresChannelMessageTableSubscriber独占一个数据库连接,请勿使用连接池提供的连接

最佳实践

消息存储分区

通过 region 属性实现逻辑分区

kotlin
@Bean
fun regionStore(dataSource: DataSource): JdbcMessageStore {
    return JdbcMessageStore(dataSource).apply {
        region = "REGION_A" // 分区标识
    }
}

性能优化建议

  1. 为常用查询字段添加索引:
    sql
    CREATE INDEX INT_CHANNEL_MSG_DATE_IDX 
    ON INT_CHANNEL_MESSAGE (CREATED_DATE, REGION, GROUP_KEY);
  2. 避免使用数据库作为队列的替代品(考虑使用 JMS/AMQP)
  3. 定期清理已完成的消息组
kotlin
@Scheduled(fixedRate = 3600000)
fun cleanCompletedGroups() {
    (messageStore as MessageGroupStore).apply {
        messageGroups.forEach { group ->
            if (group.isComplete) {
                removeMessageGroup(group.groupId)
            }
        }
    }
}

常见问题解决

错误:消息重复消费

解决方案

  1. 确认数据库支持 MVCC
  2. 设置 usingIdCache=true
  3. 配置事务同步工厂

错误:启动时表不存在

解决方案

kotlin
@Bean
fun messageStore(dataSource: DataSource) = JdbcMessageStore(dataSource).apply {
    setCheckDatabaseOnStart(false) // 禁用启动检查
}

优先级通道不生效

确认事项

  1. priorityEnabled=true 已设置
  2. 消息包含 priority 头部
    kotlin
    MessageBuilder.withPayload(data)
        .setHeader("priority", 9) 
        .build()

通过本指南,您应能高效配置和使用 Spring Integration JDBC 消息存储,实现可靠的消息持久化和通道管理