Skip to content

Spring Integration Cassandra 支持教程

概述

Spring Integration 提供了与 Apache Cassandra 集成的组件,允许开发者通过消息通道执行 Cassandra 数据库操作。本教程将介绍如何使用这些组件,特别关注CassandraMessageHandler的使用。

TIP

Spring Integration Cassandra 基于 Spring Data for Apache Cassandra 项目,提供了反应式同步两种操作模式。

环境配置

添加依赖

build.gradle.kts 中添加依赖:

kotlin
dependencies {
    implementation("org.springframework.integration:spring-integration-cassandra:6.5.1")
}

CassandraMessageHandler 详解

核心功能

CassandraMessageHandlerAbstractReplyProducingMessageHandler 的实现,支持:

  • 单向操作(默认模式)
  • 请求-响应模式(需设置 producesReply = true
  • 同步/异步执行(默认异步)

操作类型

支持四种数据库操作:

操作类型配置方法输入数据类型
INSERTingestQueryList<List<?>>
UPDATE自动检测实体对象
DELETE自动检测实体对象
STATEMENTquerystatementExpressionStatement 对象

配置示例

kotlin
@Bean
fun cassandraFlow(cassandraOperations: ReactiveCassandraOperations) =
    integrationFlow {
        handle(
            Cassandra.outboundGateway(cassandraOperations)
                .query("SELECT * FROM book WHERE author = :author LIMIT :size")
                .parameter("author", "payload")
                .parameter("size", { it.headers["limit"] })
        ) {
            async = false  // [!code warning: 设置为同步模式]
            outputChannel = MessageChannels.flux("resultChannel").get()
        }
    }
kotlin
@ServiceActivator(inputChannel = "cassandraChannel")
@Bean
fun cassandraHandler(template: ReactiveCassandraOperations): MessageHandler {
    val handler = CassandraMessageHandler(template).apply {
        query = "INSERT INTO book (isbn, title) VALUES (:isbn, :title)"
        parameterExpressions = mapOf(
            "isbn" to PARSER.parseExpression("payload.isbn"),
            "title" to PARSER.parseExpression("payload.title")
        )
        async = true  // 默认异步
        producesReply = true
    }
    return handler
}

实战示例

1. 插入数据 (INSERT)

kotlin
// 实体类定义
@Table("book")
data class Book(
    @PrimaryKey val isbn: String,
    val title: String,
    val author: String,
    val pages: Int
)

// 消息处理器配置
@Bean
fun insertHandler(template: ReactiveCassandraOperations) = 
    CassandraMessageHandler(template).apply {
        ingestQuery = """
            INSERT INTO book 
            (isbn, title, author, pages) 
            VALUES (?, ?, ?, ?)
        """.trimIndent()
    }

IMPORTANT

使用 ingestQuery 时,消息负载必须是 List<List<*>> 格式

2. 执行查询 (STATEMENT)

kotlin
@Bean
fun queryHandler(template: ReactiveCassandraOperations) = 
    CassandraMessageHandler(template).apply {
        statementExpression = """
            T(QueryBuilder).selectFrom("book")
                .all()
                .whereColumn("author")
                .isEqualTo(T(QueryBuilder).bindMarker())
            """.trimIndent()  
        parameterExpressions = mapOf(
            "author" to PARSER.parseExpression("payload")
        )
        producesReply = true
    }

3. 批量操作

kotlin
@ServiceActivator(inputChannel = "batchChannel")
@Bean
fun batchHandler(template: ReactiveCassandraOperations) = 
    CassandraMessageHandler(template).apply {
        // 自动检测操作类型
        mode = CassandraMessageHandler.Type.INSERT
    }

// 发送批量消息
fun sendBatch(books: List<Book>) {
    messagingTemplate.convertAndSend("batchChannel", books)
}

最佳实践

同步 vs 异步模式

  • 异步模式(默认):
    kotlin
    handler.async = true  // 返回 Mono<WriteResult>
  • 同步模式
    kotlin
    handler.async = false // 阻塞等待结果

WARNING

在同步模式下使用 Mono.block() 可能导致线程阻塞,在高并发场景谨慎使用

参数绑定技巧

kotlin
parameterExpressions = mapOf(
    "author" to PARSER.parseExpression("payload.author"),
    "minPages" to PARSER.parseExpression("headers['min-pages'] ?: 100")
)

反应式结果处理

kotlin
@Bean
fun resultFlow() = integrationFlow("resultChannel") {
    handle { message: Message<Flux<Book>> ->  
        message.payload.subscribe { book ->
            logger.info("Received book: ${book.title}")
        }
    }
}

常见问题解决

问题1:Unsupported parameter type [java.util.ArrayList]

CAUTION

当使用 ingestQuery 时,确保负载格式为 List<List<*>>

解决方案

kotlin
fun convertToIngestFormat(books: List<Book>): List<List<Any>> {
    return books.map { book ->
        listOf(book.isbn, book.title, book.author, book.pages)
    }
}

问题2:反应式结果未正确处理

解决方案

kotlin
// 配置正确的返回类型
handler.outputChannel = MessageChannels.flux("resultChannel").get()

// 消费者端
@ServiceActivator(inputChannel = "resultChannel")
fun handleResults(flux: Flux<Book>) {
    flux.subscribe { book -> /* 处理每本书 */ }
}

总结

Spring Integration 的 Cassandra 支持提供了强大且灵活的方式与 Cassandra 数据库交互:

  1. 支持 四种操作类型:INSERT/UPDATE/DELETE/STATEMENT
  2. 提供同步和异步两种处理模式
  3. 支持参数绑定表达式语言
  4. 完美集成 Spring 的响应式编程模型

通过本教程,您应该能够:

  • ✅ 配置 Cassandra 出站组件
  • ✅ 执行各类数据库操作
  • ✅ 正确处理同步/异步结果
  • ✅ 解决常见集成问题

TIP

实际开发中,建议优先使用反应式异步模式以获得最佳性能和资源利用率