Skip to content

Spring Integration Kafka 支持教程

1. 概述

Spring Integration Kafka 是基于 Spring for Apache Kafka 项目构建的,提供了与 Kafka 集成的组件。它简化了消息发布/订阅、请求/响应等常见消息模式。

依赖配置

kotlin
// build.gradle.kts
dependencies {
    implementation("org.springframework.integration:spring-integration-kafka:6.5.1")
}

核心组件

Spring Integration Kafka 提供以下组件:

  • 出站通道适配器:发布消息到 Kafka
  • 消息驱动通道适配器:消费 Kafka 消息
  • 入站通道适配器:轮询式消息源
  • 出站网关:请求/响应生产者
  • 入站网关:请求/响应消费者
  • Kafka 主题通道:基于 Kafka 的持久化通道

TIP

架构类比 想象 Kafka 是邮政系统,适配器是邮局:

  • 出站适配器 = 寄信窗口
  • 入站适配器 = 收信箱
  • 网关 = 挂号信服务(需要回执)

2. 出站通道适配器 (Outbound Channel Adapter)

功能说明

将 Spring Integration 通道的消息发布到 Kafka 主题,自动将消息转换为 Kafka 记录。

核心配置

kotlin
@Configuration
class KafkaOutboundConfig {

    @Bean
    fun producerFactory(): ProducerFactory<String, String> {
        val props = mutableMapOf<String, Any>(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
        )
        return DefaultKafkaProducerFactory(props)
    }

    @Bean
    fun kafkaTemplate() = KafkaTemplate(producerFactory())

    @Bean
    @ServiceActivator(inputChannel = "toKafka")
    fun kafkaOutboundHandler(): MessageHandler {
        return KafkaProducerMessageHandler(kafkaTemplate()).apply {
            topicExpression = LiteralExpression("orders")
            messageKeyExpression = SpelExpressionParser().parseExpression("headers.orderId")
        }
    }
}

关键特性

  • 动态主题选择:通过表达式动态设置目标主题
  • 消息键生成:支持基于消息头的键生成策略
  • 错误处理:可配置失败/成功回调通道

WARNING

事务注意事项

kotlin
// 事务ID前缀必须唯一
kafkaTemplate.transactionIdPrefix = "tx-${UUID.randomUUID()}"

3. 消息驱动通道适配器 (Message-driven Channel Adapter)

功能说明

监听 Kafka 主题并自动将消息传递到 Spring Integration 通道。

批处理 vs 记录模式

模式消息负载适用场景
记录模式单个 ConsumerRecord逐条处理
批处理模式多个 ConsumerRecord批量处理

配置示例

kotlin
@Bean
fun kafkaMessageDrivenAdapter(
    consumerFactory: ConsumerFactory<String, String>
): KafkaMessageDrivenChannelAdapter<String, String> {

    val containerProps = ContainerProperties("orders-topic")
    val container = KafkaMessageListenerContainer(consumerFactory, containerProps)

    return KafkaMessageDrivenChannelAdapter(container, ListenerMode.batch).apply {
        outputChannel = MessageChannels.direct("processedOrders").get()
        errorChannel = MessageChannels.direct("errorChannel").get()
        setMessageConverter(StringJsonMessageConverter())
    }
}

错误处理机制

DANGER

消费者线程安全 Consumer 对象不是线程安全的!跨线程使用时禁止直接调用其方法。

4. 入站通道适配器 (Inbound Channel Adapter)

功能说明

提供可轮询的消息源,适用于需要控制消费节奏的场景。

配置示例

kotlin
@Bean
@InboundChannelAdapter(channel = "polledMessages", poller = [Poller(fixedRate = "5000")])
fun kafkaMessageSource(consumerFactory: ConsumerFactory<String, String>): MessageSource<Any> {

    val consumerProps = ConsumerProperties("orders-topic").apply {
        groupId = "order-processor-group"
    }

    return KafkaMessageSource(consumerFactory, consumerProps).apply {
        setPayloadType(Order::class.java)
    }
}

关键参数

参数说明默认值
max.poll.records每次拉取最大记录数1
allowMultiFetch允许批量获取false
raw-header保留原始头信息false

TIP

性能优化 启用批量获取可提升吞吐量:

kotlin
consumerProps.allowMultiFetch = true

但需确保在 max.poll.interval.ms 内处理完所有记录!

5. 出站网关 (Outbound Gateway)

功能说明

实现请求/响应模式:发送消息到 Kafka 并等待回复。

配置流程

kotlin
@Bean
fun replyingKafkaTemplate(
    producerFactory: ProducerFactory<String, String>,
    replyContainer: ConcurrentMessageListenerContainer<String, String>
): ReplyingKafkaTemplate<String, String, String> {
    return ReplyingKafkaTemplate(producerFactory, replyContainer).apply {
        defaultReplyTimeout = Duration.ofSeconds(30)
    }
}

@Bean
@ServiceActivator(inputChannel = "requests")
fun outboundGateway(template: ReplyingKafkaTemplate<String, String, String>): MessageHandler {
    return KafkaProducerMessageHandler(template)
}

回复主题解析

  1. 检查消息头的 KafkaHeaders.REPLY_TOPIC
  2. 使用网关配置的默认回复主题

WARNING

超时设置

kotlin
// 设置合理的超时防止线程阻塞
defaultReplyTimeout = Duration.ofSeconds(30)

6. 入站网关 (Inbound Gateway)

功能说明

作为请求/响应服务端:消费请求消息并发送响应。

配置示例

kotlin
@Bean
fun inboundGateway(
    container: ConcurrentMessageListenerContainer<String, String>,
    replyTemplate: KafkaTemplate<String, String>
): KafkaInboundGateway<String, String, String> {

    return KafkaInboundGateway(container, replyTemplate).apply {
        requestChannel = MessageChannels.direct("requests").get()
        replyTimeout = 30_000
    }
}

@Bean
fun serviceFlow(): IntegrationFlow {
    return IntegrationFlow.from("requests")
        .transform<String, String> { it.uppercase() }
        .channel("replies")
        .get()
}

7. 基于 Kafka 的通道

持久化消息通道

kotlin
@Bean
fun kafkaBackedChannel(
    template: KafkaTemplate<String, String>,
    factory: ConcurrentKafkaListenerContainerFactory<String, String>
): SubscribableChannel {

    return Kafka.channel(template, factory, "channel-topic")
        .groupId("channel-group")
        .get()
}

通道类型对比

通道类型特点适用场景
SubscribableChannel发布/订阅广播消息
PollableChannel轮询消费流量控制
PublishSubscribeChannel多订阅者并行处理

8. 消息转换

JSON 转换配置

kotlin
@Bean
fun messageConverter(): StringJsonMessageConverter {
    return StringJsonMessageConverter().apply {
        // 配置类型映射
        typeMapper.addTrustedPackages("com.example.model")
    }
}

// 在适配器中指定负载类型
kafkaMessageDrivenAdapter.setMessageConverter(messageConverter())
kafkaMessageDrivenAdapter.setPayloadType(Order::class.java)

9. Tombstone 记录处理

处理逻辑删除记录

kotlin
@ServiceActivator(inputChannel = "kafkaInput")
fun processOrder(
    @Header(KafkaHeaders.RECEIVED_KEY) key: String,
    @Payload(required = false) order: Order?
) {
    if (order == null) {
        // 处理逻辑删除
        logger.info("Received tombstone for key $key")
    } else {
        // 处理正常订单
        process(order)
    }
}

TIP

什么是 Tombstone? Kafka 中的逻辑删除标记,值为 null 的特殊记录,用于日志压缩。

10. KStream 集成

从 KStream 调用 Spring Integration 流

kotlin
@Bean
fun kStream(streamsBuilder: StreamsBuilder): KStream<String, String> {

    val transformer = MessagingTransformer<String, String, String>(messagingFunction())
        .apply {
            messageConverter = MessagingMessageConverter().apply {
                headerMapper = SimpleKafkaHeaderMapper("*")
            }
        }

    return streamsBuilder.stream("input-topic")
        .transform { transformer }
        .to("output-topic")
}

@Bean
fun messagingFunction(): MessagingFunction {
    // 映射到名为 "flow.gateway" 的集成流
    return MessagingFunction("flow.gateway")
}

@Bean
fun flow(): IntegrationFlow {
    return IntegrationFlow.from(MessagingFunction::class.java)
        .transform<String, String> { it.uppercase() }
        .get()
}

11. 读/处理/写性能优化

批量处理优化

kotlin
@Bean
fun futuresChannel() = QueueChannel()

@Bean
fun outboundAdapter(template: KafkaTemplate<String, String>): MessageHandler {
    return KafkaProducerMessageHandler(template).apply {
        futuresChannel = futuresChannel()
    }
}

@ServiceActivator(inputChannel = "batchProcessor")
fun handleBatch(messages: List<String>) {
    messages.forEach { message ->
        outboundGateway.send(message.toUpperCase())
    }

    // 等待所有发送完成
    repeat(messages.size) {
        futuresChannel.receive(10_000)?.payload?.let { future ->
            (future as Future<*>).get(10, TimeUnit.SECONDS)
        }
    }
}

TIP

性能对比

方式吞吐量延迟可靠性
单条同步发送
批量异步发送
Future 等待

常见问题解决

问题1: 消费者组再平衡

症状:消费者频繁断开连接
解决:增加 max.poll.interval.ms

kotlin
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
    val props = mutableMapOf<String, Any>(
        ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to "300000" // 5分钟
    )
    return DefaultKafkaConsumerFactory(props)
}

问题2: 消息顺序错乱

症状:分区内消息处理顺序不一致
解决

  1. 确保使用单线程消费者
  2. 禁用生产者重试
kotlin
producerFactory.setProperty(ProducerConfig.RETRIES_CONFIG, "0")

问题3: Tombstone 记录处理异常

症状:收到 KafkaNull 而非 null
解决:正确配置方法参数

kotlin
fun process(@Payload(required = false) payload: Any?)

总结

Spring Integration Kafka 提供了强大的集成能力,关键实践包括:

  1. 优先使用注解配置Kotlin DSL
  2. 根据场景选择合适的消息处理模式
  3. 对关键操作配置错误处理通道
  4. 高吞吐场景使用批量处理优化
  5. 使用 Future 等待机制平衡性能与可靠性