Appearance
Spring Integration Kafka 支持教程
1. 概述
Spring Integration Kafka 是基于 Spring for Apache Kafka 项目构建的,提供了与 Kafka 集成的组件。它简化了消息发布/订阅、请求/响应等常见消息模式。
依赖配置
kotlin
// build.gradle.kts
dependencies {
implementation("org.springframework.integration:spring-integration-kafka:6.5.1")
}
核心组件
Spring Integration Kafka 提供以下组件:
- ✅ 出站通道适配器:发布消息到 Kafka
- ✅ 消息驱动通道适配器:消费 Kafka 消息
- ✅ 入站通道适配器:轮询式消息源
- ✅ 出站网关:请求/响应生产者
- ✅ 入站网关:请求/响应消费者
- ✅ Kafka 主题通道:基于 Kafka 的持久化通道
TIP
架构类比 想象 Kafka 是邮政系统,适配器是邮局:
- 出站适配器 = 寄信窗口
- 入站适配器 = 收信箱
- 网关 = 挂号信服务(需要回执)
2. 出站通道适配器 (Outbound Channel Adapter)
功能说明
将 Spring Integration 通道的消息发布到 Kafka 主题,自动将消息转换为 Kafka 记录。
核心配置
kotlin
@Configuration
class KafkaOutboundConfig {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val props = mutableMapOf<String, Any>(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
)
return DefaultKafkaProducerFactory(props)
}
@Bean
fun kafkaTemplate() = KafkaTemplate(producerFactory())
@Bean
@ServiceActivator(inputChannel = "toKafka")
fun kafkaOutboundHandler(): MessageHandler {
return KafkaProducerMessageHandler(kafkaTemplate()).apply {
topicExpression = LiteralExpression("orders")
messageKeyExpression = SpelExpressionParser().parseExpression("headers.orderId")
}
}
}
关键特性
- 动态主题选择:通过表达式动态设置目标主题
- 消息键生成:支持基于消息头的键生成策略
- 错误处理:可配置失败/成功回调通道
WARNING
事务注意事项
kotlin
// 事务ID前缀必须唯一
kafkaTemplate.transactionIdPrefix = "tx-${UUID.randomUUID()}"
3. 消息驱动通道适配器 (Message-driven Channel Adapter)
功能说明
监听 Kafka 主题并自动将消息传递到 Spring Integration 通道。
批处理 vs 记录模式
模式 | 消息负载 | 适用场景 |
---|---|---|
记录模式 | 单个 ConsumerRecord | 逐条处理 |
批处理模式 | 多个 ConsumerRecord | 批量处理 |
配置示例
kotlin
@Bean
fun kafkaMessageDrivenAdapter(
consumerFactory: ConsumerFactory<String, String>
): KafkaMessageDrivenChannelAdapter<String, String> {
val containerProps = ContainerProperties("orders-topic")
val container = KafkaMessageListenerContainer(consumerFactory, containerProps)
return KafkaMessageDrivenChannelAdapter(container, ListenerMode.batch).apply {
outputChannel = MessageChannels.direct("processedOrders").get()
errorChannel = MessageChannels.direct("errorChannel").get()
setMessageConverter(StringJsonMessageConverter())
}
}
错误处理机制
DANGER
消费者线程安全 Consumer 对象不是线程安全的!跨线程使用时禁止直接调用其方法。
4. 入站通道适配器 (Inbound Channel Adapter)
功能说明
提供可轮询的消息源,适用于需要控制消费节奏的场景。
配置示例
kotlin
@Bean
@InboundChannelAdapter(channel = "polledMessages", poller = [Poller(fixedRate = "5000")])
fun kafkaMessageSource(consumerFactory: ConsumerFactory<String, String>): MessageSource<Any> {
val consumerProps = ConsumerProperties("orders-topic").apply {
groupId = "order-processor-group"
}
return KafkaMessageSource(consumerFactory, consumerProps).apply {
setPayloadType(Order::class.java)
}
}
关键参数
参数 | 说明 | 默认值 |
---|---|---|
max.poll.records | 每次拉取最大记录数 | 1 |
allowMultiFetch | 允许批量获取 | false |
raw-header | 保留原始头信息 | false |
TIP
性能优化 启用批量获取可提升吞吐量:
kotlin
consumerProps.allowMultiFetch = true
但需确保在 max.poll.interval.ms
内处理完所有记录!
5. 出站网关 (Outbound Gateway)
功能说明
实现请求/响应模式:发送消息到 Kafka 并等待回复。
配置流程
kotlin
@Bean
fun replyingKafkaTemplate(
producerFactory: ProducerFactory<String, String>,
replyContainer: ConcurrentMessageListenerContainer<String, String>
): ReplyingKafkaTemplate<String, String, String> {
return ReplyingKafkaTemplate(producerFactory, replyContainer).apply {
defaultReplyTimeout = Duration.ofSeconds(30)
}
}
@Bean
@ServiceActivator(inputChannel = "requests")
fun outboundGateway(template: ReplyingKafkaTemplate<String, String, String>): MessageHandler {
return KafkaProducerMessageHandler(template)
}
回复主题解析
- 检查消息头的
KafkaHeaders.REPLY_TOPIC
- 使用网关配置的默认回复主题
WARNING
超时设置
kotlin
// 设置合理的超时防止线程阻塞
defaultReplyTimeout = Duration.ofSeconds(30)
6. 入站网关 (Inbound Gateway)
功能说明
作为请求/响应服务端:消费请求消息并发送响应。
配置示例
kotlin
@Bean
fun inboundGateway(
container: ConcurrentMessageListenerContainer<String, String>,
replyTemplate: KafkaTemplate<String, String>
): KafkaInboundGateway<String, String, String> {
return KafkaInboundGateway(container, replyTemplate).apply {
requestChannel = MessageChannels.direct("requests").get()
replyTimeout = 30_000
}
}
@Bean
fun serviceFlow(): IntegrationFlow {
return IntegrationFlow.from("requests")
.transform<String, String> { it.uppercase() }
.channel("replies")
.get()
}
7. 基于 Kafka 的通道
持久化消息通道
kotlin
@Bean
fun kafkaBackedChannel(
template: KafkaTemplate<String, String>,
factory: ConcurrentKafkaListenerContainerFactory<String, String>
): SubscribableChannel {
return Kafka.channel(template, factory, "channel-topic")
.groupId("channel-group")
.get()
}
通道类型对比
通道类型 | 特点 | 适用场景 |
---|---|---|
SubscribableChannel | 发布/订阅 | 广播消息 |
PollableChannel | 轮询消费 | 流量控制 |
PublishSubscribeChannel | 多订阅者 | 并行处理 |
8. 消息转换
JSON 转换配置
kotlin
@Bean
fun messageConverter(): StringJsonMessageConverter {
return StringJsonMessageConverter().apply {
// 配置类型映射
typeMapper.addTrustedPackages("com.example.model")
}
}
// 在适配器中指定负载类型
kafkaMessageDrivenAdapter.setMessageConverter(messageConverter())
kafkaMessageDrivenAdapter.setPayloadType(Order::class.java)
9. Tombstone 记录处理
处理逻辑删除记录
kotlin
@ServiceActivator(inputChannel = "kafkaInput")
fun processOrder(
@Header(KafkaHeaders.RECEIVED_KEY) key: String,
@Payload(required = false) order: Order?
) {
if (order == null) {
// 处理逻辑删除
logger.info("Received tombstone for key $key")
} else {
// 处理正常订单
process(order)
}
}
TIP
什么是 Tombstone? Kafka 中的逻辑删除标记,值为 null
的特殊记录,用于日志压缩。
10. KStream 集成
从 KStream 调用 Spring Integration 流
kotlin
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<String, String> {
val transformer = MessagingTransformer<String, String, String>(messagingFunction())
.apply {
messageConverter = MessagingMessageConverter().apply {
headerMapper = SimpleKafkaHeaderMapper("*")
}
}
return streamsBuilder.stream("input-topic")
.transform { transformer }
.to("output-topic")
}
@Bean
fun messagingFunction(): MessagingFunction {
// 映射到名为 "flow.gateway" 的集成流
return MessagingFunction("flow.gateway")
}
@Bean
fun flow(): IntegrationFlow {
return IntegrationFlow.from(MessagingFunction::class.java)
.transform<String, String> { it.uppercase() }
.get()
}
11. 读/处理/写性能优化
批量处理优化
kotlin
@Bean
fun futuresChannel() = QueueChannel()
@Bean
fun outboundAdapter(template: KafkaTemplate<String, String>): MessageHandler {
return KafkaProducerMessageHandler(template).apply {
futuresChannel = futuresChannel()
}
}
@ServiceActivator(inputChannel = "batchProcessor")
fun handleBatch(messages: List<String>) {
messages.forEach { message ->
outboundGateway.send(message.toUpperCase())
}
// 等待所有发送完成
repeat(messages.size) {
futuresChannel.receive(10_000)?.payload?.let { future ->
(future as Future<*>).get(10, TimeUnit.SECONDS)
}
}
}
TIP
性能对比
方式 | 吞吐量 | 延迟 | 可靠性 |
---|---|---|---|
单条同步发送 | 低 | 高 | 高 |
批量异步发送 | 高 | 低 | 中 |
Future 等待 | 高 | 中 | 高 |
常见问题解决
问题1: 消费者组再平衡
症状:消费者频繁断开连接
解决:增加 max.poll.interval.ms
kotlin
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val props = mutableMapOf<String, Any>(
ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to "300000" // 5分钟
)
return DefaultKafkaConsumerFactory(props)
}
问题2: 消息顺序错乱
症状:分区内消息处理顺序不一致
解决:
- 确保使用单线程消费者
- 禁用生产者重试
kotlin
producerFactory.setProperty(ProducerConfig.RETRIES_CONFIG, "0")
问题3: Tombstone 记录处理异常
症状:收到 KafkaNull
而非 null
解决:正确配置方法参数
kotlin
fun process(@Payload(required = false) payload: Any?)
总结
Spring Integration Kafka 提供了强大的集成能力,关键实践包括:
- 优先使用注解配置和 Kotlin DSL
- 根据场景选择合适的消息处理模式
- 对关键操作配置错误处理通道
- 高吞吐场景使用批量处理优化
- 使用 Future 等待机制平衡性能与可靠性