Appearance
Spring Integration JDBC 消息存储全面指南
概述
JDBC 消息存储是 Spring Integration 中核心持久化机制,提供两种实现:
JdbcMessageStore
:适用于聚合器和Claim Check模式JdbcChannelMessageStore
:专为消息通道优化,提供更高性能
数据库初始化
在开始前需初始化数据库表结构
初始化步骤
- 从
spring-integration-jdbc
JAR 中获取 SQL 脚本- 路径:
org.springframework.integration.jdbc
- 路径:
- 使用 Spring JDBC 初始化器执行脚本
kotlin
@Bean
fun dataSourceInitializer(dataSource: DataSource): DataSourceInitializer {
val initializer = DataSourceInitializer()
initializer.setDataSource(dataSource)
val schema = ClassPathResource("org/springframework/integration/jdbc/schema-h2.sql")
initializer.setDatabasePopulator(ResourceDatabasePopulator(schema))
return initializer
}
IMPORTANT
从版本 6.2 开始,所有 JDBC 存储组件在启动时会自动检查表是否存在,可通过 setCheckDatabaseOnStart(false)
禁用该检查
通用消息存储 (JdbcMessageStore)
适用于聚合器持久化和Claim Check模式
基本配置
kotlin
@Configuration
class JdbcStoreConfig {
@Bean
fun messageStore(dataSource: DataSource): JdbcMessageStore {
return JdbcMessageStore(dataSource).apply {
tablePrefix = "MY_INT_" // 自定义表前缀
}
}
}
在聚合器中使用
kotlin
@Bean
fun aggregator() = AggregatingMessageHandler(
DefaultAggregatingMessageGroupProcessor(),
JdbcMessageStore(dataSource).apply {
setGroupTimeout(5000)
setExpireGroupsUponCompletion(true)
}
)
通道消息存储 (JdbcChannelMessageStore)
专为消息通道优化,支持队列和优先级通道
基本配置
kotlin
@Bean
fun channelMessageStore(dataSource: DataSource): JdbcChannelMessageStore {
return JdbcChannelMessageStore(dataSource).apply {
// 必须设置数据库特定的查询提供器
setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
region = "ORDER_PROCESSING" // 数据分区标识
}
}
@Bean
fun orderChannel(): PollableChannel {
return MessageChannels.queue(channelMessageStore(dataSource), "orders").get()
}
支持的数据库
数据库 | 查询提供器类 |
---|---|
PostgreSQL | PostgresChannelMessageStoreQueryProvider |
MySQL | MysqlChannelMessageStoreQueryProvider |
Oracle | OracleChannelMessageStoreQueryProvider |
H2 | H2ChannelMessageStoreQueryProvider |
TIP
若使用未列出的数据库,需实现 ChannelMessageStoreQueryProvider
接口提供自定义查询
高级功能
自定义消息插入
可覆盖默认的序列化策略,如存储为 JSON:
kotlin
class JsonPreparedStatementSetter : ChannelMessageStorePreparedStatementSetter() {
override fun setValues(
ps: PreparedStatement,
message: Message<*>,
groupId: Any,
region: String,
priorityEnabled: Boolean
) {
super.setValues(ps, message, groupId, region, priorityEnabled)
// 存储为 JSON 字符串
ps.setString(6, objectMapper.writeValueAsString(message.payload))
}
}
// 配置使用自定义插入器
@Bean
fun customStore(dataSource: DataSource): JdbcChannelMessageStore {
return JdbcChannelMessageStore(dataSource).apply {
setPreparedStatementSetter(JsonPreparedStatementSetter())
}
}
并发轮询优化
当使用多线程轮询时:
kotlin
@Configuration
class ConcurrentPollingConfig {
@Bean
fun syncFactory() = DefaultTransactionSynchronizationFactory().apply {
setAfterCommitExpression("""
@store.removeFromIdCache(headers.id.toString())
""".trimIndent().toSpelExpression())
}
@Bean
fun store(dataSource: DataSource) = JdbcChannelMessageStore(dataSource).apply {
setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
region = "TX_TIMEOUT"
isUsingIdCache = true // 启用ID缓存
}
@Bean
fun inputChannel(store: JdbcChannelMessageStore) = MessageChannels.queue(store, "input").get()
@Bean
fun pollingBridge(inputChannel: PollableChannel, outputChannel: MessageChannel) {
return IntegrationFlow.from(
MessageChannels.queue("inputChannel", store)
).bridge { spec ->
spec.poller(Pollers.fixedDelay(500)
.transactional(TransactionInterceptor(transactionManager))
}.channel(outputChannel)
.get()
}
}
并发注意事项
对于不支持 MVCC 的数据库(如 Derby),必须启用 ID 缓存 (usingIdCache=true
) 以避免重复消费问题
优先级通道
启用优先级支持:
kotlin
@Bean
fun priorityStore(dataSource: DataSource): JdbcChannelMessageStore {
return JdbcChannelMessageStore(dataSource).apply {
setChannelMessageStoreQueryProvider(H2ChannelMessageStoreQueryProvider())
isPriorityEnabled = true // 启用优先级
}
}
@Bean
fun priorityChannel(): PollableChannel {
return MessageChannels.priorityQueue(priorityStore(dataSource), "urgent").get()
}
CAUTION
避免在同一个 JdbcChannelMessageStore
实例中混用优先级和非优先级通道
PostgreSQL 推送通知
利用 PostgreSQL 的 LISTEN/NOTIFY 机制实现实时消息推送
配置步骤
- 创建数据库触发器 (参考
schema-postgresql.sql
) - 配置消息订阅器
kotlin
@Bean
fun postgresSubscriber(
@Value("\${spring.datasource.url}") url: String,
@Value("\${spring.datasource.username}") user: String,
@Value("\${spring.datasource.password}") password: String
): PostgresChannelMessageTableSubscriber {
return PostgresChannelMessageTableSubscriber {
DriverManager.getConnection(url, user, password)
.unwrap(PgConnection::class.java)
}
}
@Bean
fun jdbcChannelStore(dataSource: DataSource) = JdbcChannelMessageStore(dataSource).apply {
setChannelMessageStoreQueryProvider(PostgresChannelMessageStoreQueryProvider())
}
@Bean
fun subscribableChannel(
store: JdbcChannelMessageStore,
subscriber: PostgresChannelMessageTableSubscriber
): PostgresSubscribableChannel {
return PostgresSubscribableChannel(store, "notifications", subscriber).apply {
setTransactionManager(transactionManager) // 启用事务支持
setRetryTemplate(RetryTemplate()) // 配置重试机制
}
}
连接注意事项
PostgresChannelMessageTableSubscriber
会独占一个数据库连接,请勿使用连接池提供的连接
最佳实践
消息存储分区
通过 region
属性实现逻辑分区:
kotlin
@Bean
fun regionStore(dataSource: DataSource): JdbcMessageStore {
return JdbcMessageStore(dataSource).apply {
region = "REGION_A" // 分区标识
}
}
性能优化建议
- 为常用查询字段添加索引:sql
CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE, REGION, GROUP_KEY);
- 避免使用数据库作为队列的替代品(考虑使用 JMS/AMQP)
- 定期清理已完成的消息组
kotlin
@Scheduled(fixedRate = 3600000)
fun cleanCompletedGroups() {
(messageStore as MessageGroupStore).apply {
messageGroups.forEach { group ->
if (group.isComplete) {
removeMessageGroup(group.groupId)
}
}
}
}
常见问题解决
错误:消息重复消费
解决方案:
- 确认数据库支持 MVCC
- 设置
usingIdCache=true
- 配置事务同步工厂
错误:启动时表不存在
解决方案:
kotlin
@Bean
fun messageStore(dataSource: DataSource) = JdbcMessageStore(dataSource).apply {
setCheckDatabaseOnStart(false) // 禁用启动检查
}
优先级通道不生效
确认事项:
priorityEnabled=true
已设置- 消息包含
priority
头部kotlinMessageBuilder.withPayload(data) .setHeader("priority", 9) .build()
通过本指南,您应能高效配置和使用 Spring Integration JDBC 消息存储,实现可靠的消息持久化和通道管理。