Appearance
Spring Integration JDBC 出站通道适配器教程
引言
JDBC出站通道适配器是Spring Integration的核心组件,用于将消息转换为SQL操作。它接收消息后自动执行数据库写入操作,适用于订单处理、日志记录等场景。与XML配置不同,本教程全程使用Kotlin注解配置实现现代化开发。
一、基础用法
1.1 核心功能
1.2 最小化配置示例
kotlin
@Configuration
class JdbcConfig {
@Bean
fun outboundAdapter(dataSource: DataSource) = integrationFlow("inputChannel") {
// // 核心适配器配置
handle(JdbcMessageHandler(dataSource,
"INSERT INTO items (id, status, name) VALUES (:headers[id], 0, :payload)"))
}
}
TIP
:headers[id]
自动从消息头提取值,:payload
使用消息主体,符合Spring的命名参数规范
1.3 参数绑定原理
组件 | 作用 | 默认行为 |
---|---|---|
SqlParameterSource | 参数映射 | 自动转换消息头和负载 |
BeanPropertySqlParameterSourceFactory | 参数工厂 | 支持POJO字段映射 |
usePayloadAsParameterSource | 参数模式 | true 时整个消息作为参数源 |
二、高级参数控制
2.1 SpEL动态参数
注意
当需要动态计算参数值(如日期函数、条件判断)时,必须显式配置参数工厂
kotlin
@Bean
fun spelAdapter(dataSource: DataSource) = integrationFlow("spelChannel") {
handle(JdbcMessageHandler(dataSource,
"INSERT INTO messages (id, payload, created_date) VALUES (:id, :payload, :createdDate)").apply {
sqlParameterSourceFactory = spelSource()
}
)
}
@Bean
fun spelSource() = ExpressionEvaluatingSqlParameterSourceFactory().apply {
parameterExpressions = mapOf(
"id" to "headers['id'].toString()",
"createdDate" to "T(java.util.Date)()", // 调用构造函数
"payload" to "payload"
)
}
2.2 手动参数绑定
处理二进制数据流(如图片/文件)时需自定义回调:
kotlin
@Bean
@ServiceActivator(inputChannel = "blobChannel")
fun blobHandler(dataSource: DataSource) = JdbcMessageHandler(
dataSource,
"INSERT INTO imagedb (name, content) VALUES (?, ?)"
).apply {
// 重点:流式处理BLOB
setPreparedStatementSetter { ps, message ->
ps.setString(1, message.headers["filename"] as String)
ps.setBlob(2, (message.payload as File).inputStream())
}
}
三、批量操作优化
3.1 批量写入配置
kotlin
@Bean
fun batchAdapter(dataSource: DataSource) = integrationFlow("batchChannel") {
handle(JdbcMessageHandler(dataSource,
"INSERT INTO orders (id, amount) VALUES (:id, :amount)").apply {
setBatchSize(50) // [!code highlight] // 每批50条
}
)
}
IMPORTANT
当消息负载为List
类型时自动触发批量模式,需确保SQL语句支持批处理
3.2 性能对比
模式 | 10条耗时 | 1000条耗时 | 内存占用 |
---|---|---|---|
单条提交 | 120ms | 12s | 低 |
批量提交 | 40ms | 800ms | 中等 |
四、事务管理
4.1 同步事务控制
kotlin
@Bean
fun transactionalFlow(dataSource: DataSource) = integrationFlow("txChannel") {
handle(JdbcMessageHandler(dataSource, "UPDATE accounts SET balance = balance - :amount"),
e -> e.transactional()) // [!code highlight] // 启用事务
}
4.2 事务传播机制
kotlin
@Transactional(propagation = Propagation.REQUIRED)
fun processOrder(message: Message<Order>) {
// 数据库操作将加入当前事务
jdbcTemplate.update("INSERT INTO orders...")
}
五、常见问题解决
5.1 参数绑定错误
CAUTION
问题现象:抛出InvalidDataAccessApiUsageException
原因:参数占位符与实际参数不匹配
解决方案:
kotlin
// 启用严格模式验证参数
BeanPropertySqlParameterSourceFactory().apply {
isStrict = true
}
5.2 批量写入失效
WARNING
问题现象:批量消息被拆分为单条执行
检查清单:
- 消息负载是否为
List
或Iterable
- SQL语句是否支持批处理(无
RETURNING
等语法) - 是否错误启用了
keysGenerated
模式
六、最佳实践总结
配置选择原则:
性能优化技巧:
- 批量操作设置
batchSize=100-500
- 使用连接池(如HikariCP)
- 避免在SpEL中执行复杂计算
- 批量操作设置
事务边界建议:
kotlin// 在入口方法声明事务 @Transactional fun receiveAndProcess(message: Message) { inputChannel.send(message) }
TIP
完整示例项目可在 GitHub示例库 的jdbc-kotlin-dsl
模块找到