Appearance
Spring Integration Cassandra 支持教程
概述
Spring Integration 提供了与 Apache Cassandra 集成的组件,允许开发者通过消息通道执行 Cassandra 数据库操作。本教程将介绍如何使用这些组件,特别关注CassandraMessageHandler的使用。
TIP
Spring Integration Cassandra 基于 Spring Data for Apache Cassandra 项目,提供了反应式和同步两种操作模式。
环境配置
添加依赖
在 build.gradle.kts
中添加依赖:
kotlin
dependencies {
implementation("org.springframework.integration:spring-integration-cassandra:6.5.1")
}
CassandraMessageHandler 详解
核心功能
CassandraMessageHandler
是 AbstractReplyProducingMessageHandler
的实现,支持:
- ✅ 单向操作(默认模式)
- ✅ 请求-响应模式(需设置
producesReply = true
) - ✅ 同步/异步执行(默认异步)
操作类型
支持四种数据库操作:
操作类型 | 配置方法 | 输入数据类型 |
---|---|---|
INSERT | ingestQuery | List<List<?>> |
UPDATE | 自动检测 | 实体对象 |
DELETE | 自动检测 | 实体对象 |
STATEMENT | query 或 statementExpression | Statement 对象 |
配置示例
kotlin
@Bean
fun cassandraFlow(cassandraOperations: ReactiveCassandraOperations) =
integrationFlow {
handle(
Cassandra.outboundGateway(cassandraOperations)
.query("SELECT * FROM book WHERE author = :author LIMIT :size")
.parameter("author", "payload")
.parameter("size", { it.headers["limit"] })
) {
async = false // [!code warning: 设置为同步模式]
outputChannel = MessageChannels.flux("resultChannel").get()
}
}
kotlin
@ServiceActivator(inputChannel = "cassandraChannel")
@Bean
fun cassandraHandler(template: ReactiveCassandraOperations): MessageHandler {
val handler = CassandraMessageHandler(template).apply {
query = "INSERT INTO book (isbn, title) VALUES (:isbn, :title)"
parameterExpressions = mapOf(
"isbn" to PARSER.parseExpression("payload.isbn"),
"title" to PARSER.parseExpression("payload.title")
)
async = true // 默认异步
producesReply = true
}
return handler
}
实战示例
1. 插入数据 (INSERT)
kotlin
// 实体类定义
@Table("book")
data class Book(
@PrimaryKey val isbn: String,
val title: String,
val author: String,
val pages: Int
)
// 消息处理器配置
@Bean
fun insertHandler(template: ReactiveCassandraOperations) =
CassandraMessageHandler(template).apply {
ingestQuery = """
INSERT INTO book
(isbn, title, author, pages)
VALUES (?, ?, ?, ?)
""".trimIndent()
}
IMPORTANT
使用 ingestQuery
时,消息负载必须是 List<List<*>>
格式
2. 执行查询 (STATEMENT)
kotlin
@Bean
fun queryHandler(template: ReactiveCassandraOperations) =
CassandraMessageHandler(template).apply {
statementExpression = """
T(QueryBuilder).selectFrom("book")
.all()
.whereColumn("author")
.isEqualTo(T(QueryBuilder).bindMarker())
""".trimIndent()
parameterExpressions = mapOf(
"author" to PARSER.parseExpression("payload")
)
producesReply = true
}
3. 批量操作
kotlin
@ServiceActivator(inputChannel = "batchChannel")
@Bean
fun batchHandler(template: ReactiveCassandraOperations) =
CassandraMessageHandler(template).apply {
// 自动检测操作类型
mode = CassandraMessageHandler.Type.INSERT
}
// 发送批量消息
fun sendBatch(books: List<Book>) {
messagingTemplate.convertAndSend("batchChannel", books)
}
最佳实践
同步 vs 异步模式
- 异步模式(默认):kotlin
handler.async = true // 返回 Mono<WriteResult>
- 同步模式:kotlin
handler.async = false // 阻塞等待结果
WARNING
在同步模式下使用 Mono.block()
可能导致线程阻塞,在高并发场景谨慎使用
参数绑定技巧
kotlin
parameterExpressions = mapOf(
"author" to PARSER.parseExpression("payload.author"),
"minPages" to PARSER.parseExpression("headers['min-pages'] ?: 100")
)
反应式结果处理
kotlin
@Bean
fun resultFlow() = integrationFlow("resultChannel") {
handle { message: Message<Flux<Book>> ->
message.payload.subscribe { book ->
logger.info("Received book: ${book.title}")
}
}
}
常见问题解决
问题1:Unsupported parameter type [java.util.ArrayList]
CAUTION
当使用 ingestQuery
时,确保负载格式为 List<List<*>>
解决方案:
kotlin
fun convertToIngestFormat(books: List<Book>): List<List<Any>> {
return books.map { book ->
listOf(book.isbn, book.title, book.author, book.pages)
}
}
问题2:反应式结果未正确处理
解决方案:
kotlin
// 配置正确的返回类型
handler.outputChannel = MessageChannels.flux("resultChannel").get()
// 消费者端
@ServiceActivator(inputChannel = "resultChannel")
fun handleResults(flux: Flux<Book>) {
flux.subscribe { book -> /* 处理每本书 */ }
}
总结
Spring Integration 的 Cassandra 支持提供了强大且灵活的方式与 Cassandra 数据库交互:
- 支持 四种操作类型:INSERT/UPDATE/DELETE/STATEMENT
- 提供同步和异步两种处理模式
- 支持参数绑定和表达式语言
- 完美集成 Spring 的响应式编程模型
通过本教程,您应该能够:
- ✅ 配置 Cassandra 出站组件
- ✅ 执行各类数据库操作
- ✅ 正确处理同步/异步结果
- ✅ 解决常见集成问题
TIP
实际开发中,建议优先使用反应式异步模式以获得最佳性能和资源利用率