Skip to content

Spring Integration 分布式锁完全指南

🔍 前置知识:本教程假设您已掌握 Spring Boot 基础,了解并发编程基本概念

🌟 一、分布式锁核心概念

1.1 为什么需要分布式锁

在分布式系统中,多个服务实例可能同时访问共享资源(如数据库、缓存、文件系统等)。分布式锁确保在同一时间只有一个实例能执行关键操作:

1.2 典型应用场景

场景示例锁键设计
订单处理防止重复支付order_${orderId}
库存扣减避免超卖inventory_${skuId}
定时任务集群任务调度job_${jobName}
聚合器处理消息分组处理group_${correlationId}

分布式锁 ≠ 数据库事务

分布式锁解决跨进程/服务的并发问题,数据库事务解决单进程内的数据一致性

🧩 二、Spring LockRegistry 抽象

2.1 核心接口解析

kotlin
interface LockRegistry {
    // 获取指定键的锁对象
    fun obtain(lockKey: Any): Lock

    // 6.2+ 新增:带超时的锁定执行(推荐)
    fun <T> executeLocked(
        lockKey: Any,
        timeout: Duration,
        action: Callable<T>
    ): T

    // 6.2+ 新增:无限等待的锁定执行
    fun <T> executeLocked(
        lockKey: Any,
        action: Callable<T>
    ): T
}

2.2 内存锁实现(单机环境)

kotlin

// 创建内存锁注册表(基于ReentrantLock)
val registry = DefaultLockRegistry()

// 执行锁定操作
registry.executeLocked("resource_123") {
    // 受保护的临界区代码
    updateSharedResource()
}

TIP

DefaultLockRegistry 仅适用于单机环境,分布式系统需要使用下文介绍的分布式实现

⚡️ 三、分布式锁实战(Kotlin)

3.1 Redis 分布式锁配置

kotlin
@Configuration
class RedisLockConfig {


    @Bean
    fun lockRegistry(redisConnectionFactory: RedisConnectionFactory): LockRegistry {
        return RedisLockRegistry(
            connectionFactory = redisConnectionFactory,
            registryKey = "my_app_locks", // 锁前缀
            expireAfter = Duration.ofSeconds(30) // 锁过期时间
        )
    }
}

3.2 服务层使用示例

kotlin
@Service
class OrderService(
    private val lockRegistry: LockRegistry
) {

    @Transactional
    fun processOrder(orderId: String) {

        lockRegistry.executeLocked("order_$orderId", Duration.ofSeconds(5)) {
            // 受保护的订单处理逻辑
            val order = orderRepository.findById(orderId)
                ?: throw OrderNotFoundException(orderId)

            if (order.status != PENDING) {
                throw IllegalStateException("订单状态无效")
            }

            order.status = PROCESSING
            orderRepository.save(order)
        }
    }
}
完整异常处理示例
kotlin
try {
    lockRegistry.executeLocked("key", Duration.ofSeconds(10)) {
        criticalOperation()
    }
} catch (e: TimeoutException) {
    // 处理获取锁超时
    logger.error("获取锁超时", e)
    throw BusinessException("系统繁忙,请稍后重试")
} catch (e: InterruptedException) {
    // 处理线程中断
    logger.error("操作被中断", e)
    Thread.currentThread().interrupt()
} catch (e: Exception) {
    // 处理业务异常
    logger.error("业务操作失败", e)
    throw e
}

3.3 最佳实践

  1. 锁粒度:选择细粒度锁键(如order_123global_order_lock更优)
  2. 超时设置:始终设置合理超时,避免死锁
  3. 异常处理:区分锁异常和业务异常
  4. 锁释放:使用executeLocked自动管理锁生命周期

🌐 四、分布式锁实现方案对比

kotlin
// 高性能,AP系统首选
@Bean
fun redisLockRegistry(): LockRegistry = RedisLockRegistry(
    redisConnectionFactory,
    "app_locks",
    Duration.ofSeconds(30)
kotlin
// 基于数据库,CP系统适用
@Bean
fun jdbcLockRegistry(dataSource: DataSource): LockRegistry {
    return JdbcLockRegistry(
        DefaultLockRepository(dataSource).apply {
            timeToLive = 30000 // TTL 30秒
        }
    )
}
kotlin
// 强一致性场景
@Bean
fun zkLockRegistry(curator: CuratorFramework): LockRegistry {
    return ZookeeperLockRegistry(curator)
}
实现方案一致性模型性能适用场景依赖
RedisAP⚡️高高并发、最终一致Redis 服务
JDBCCP🐢低已有DB、低并发强一致关系型数据库
ZookeeperCP🐢中强一致、协调服务Zookeeper 集群
HazelcastAP/CP⚡️高IMDG用户、内存数据网格Hazelcast 集群

IMPORTANT

CAP选择原则:根据业务场景选择AP(高可用)或CP(强一致)实现

🛠 五、常见问题解决方案

5.1 死锁预防

kotlin
// 错误示范:未设置超时时间 ❌
registry.executeLocked("key") { /* 长时间操作 */ }

// 正确做法:设置合理超时 ✅
registry.executeLocked("key", Duration.ofSeconds(5)) {
    /* 受控操作 */
}

5.2 锁续期问题

TIP

对于长时间操作,需要实现锁续期机制(Redis可用Redisson的看门狗)

5.3 锁重入问题

kotlin
class OrderService {

    @Lockable(key = "#orderId") // 自定义注解
    fun placeOrder(orderId: String) {
        // 调用链中的其他受保护方法
        deductInventory(orderId)
    }

    @Lockable(key = "#orderId")
    fun deductInventory(orderId: String) {
        // 可重入锁允许同一线程再次获取锁
    }
}

🚀 六、进阶技巧

6.1 自定义锁注解

kotlin
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.FUNCTION)
annotation class Lockable(
    val key: String,        pEL表达式
    val timeout: Long = 5,  // 默认超时5秒
    val unit: TimeUnit = TimeUnit.SECONDS
)

@Aspect
@Component
class LockAspect(private val lockRegistry: LockRegistry) {

    @Around("@annotation(lockable)")
    fun aroundLock(joinPoint: ProceedingJoinPoint, lockable: Lockable): Any? {
        val key = parseSpel(lockable.key, joinPoint)
        return lockRegistry.executeLocked(key,
            Duration.of(lockable.timeout, lockable.unit.toChronoUnit())) {
            joinPoint.proceed()
        }
    }

    private fun parseSpel(expr: String, jp: ProceedingJoinPoint): String {
        // 解析SpEL表达式逻辑
    }
}

6.2 锁监控方案

kotlin
@Bean
fun lockRegistry(): LockRegistry {
    return ObservableLockRegistry(RedisLockRegistry(...)).apply {
        registerListener { event ->
            when (event) {
                is LockAcquiredEvent ->
                    metrics.increment("locks.acquired")
                is LockReleasedEvent ->
                    metrics.increment("locks.released")
                is LockFailedEvent ->
                    logger.error("锁操作失败", event.cause)
            }
        }
    }
}

📚 七、总结与最佳实践

  1. 选择合适实现:根据业务需求选择Redis/JDBC/Zookeeper
  2. 设置合理超时:避免死锁,推荐5-30秒范围
  3. 细粒度锁键业务前缀_资源ID格式
  4. 异常处理:区分锁超时、中断和业务异常
  5. 监控报警:跟踪锁获取成功率、等待时间
kotlin
// 终极最佳实践模板
fun safeCriticalOperation(resourceId: String) {
    try {
        lockRegistry.executeLocked("res_$resourceId", Duration.ofSeconds(10)) {
            // 1. 状态校验
            validateState()

            // 2. 核心业务逻辑
            processBusiness()

            // 3. 结果持久化
            saveResult()
        }
    } catch (e: TimeoutException) {
        // 处理锁获取超时
        throw RetryableException("操作繁忙,请重试")
    } catch (e: BusinessException) {
        // 业务异常特殊处理
        handleBusinessError(e)
    }
}

黄金法则锁内操作应轻量、快速、幂等,避免长时间阻塞