跳转到内容

Redis 分布式锁

一、什么是分布式锁

1.1 从单体锁到分布式锁

单体锁(JVM 锁)synchronized / ReentrantLock 只能控制单个 JVM 进程内的线程安全。单体应用部署在一台机器上,一个锁就能保证数据一致性。

// 单体锁 — 单机时代没问题
public synchronized void deduct() {
    int stock = getStock();
    if (stock > 0) {
        stock--;
        updateStock(stock);
    }
}

为什么单体锁在分布式环境下失效?

假设商品服务部署了 3 个节点,用户请求经过 Nginx 负载均衡分发:

用户A → Nginx → 节点1 (JVM锁_1)  ─┐
用户B → Nginx → 节点2 (JVM锁_2)  ─┤  同一行数据库记录
用户C → Nginx → 节点3 (JVM锁_3)  ─┘  ⚠️ 三个锁互不感知!

三个 JVM 各自持有独立的锁,互不影响。用户 A、B、C 可以同时进入 deduct() 方法,库存扣减出现并发问题——超卖。

1.2 分布式锁的定义

分布式锁是跨 JVM、跨进程的互斥锁,保证同一时刻只有一个节点能执行临界区代码。它需要一个所有节点都能访问到的共享存储作为协调中心。

节点1 ────┐
节点2 ────┤  →  共享存储(Redis / MySQL / Zookeeper)← 只有一个人能拿到锁
节点3 ────┘

二、为什么选择 Redis 做分布式锁

方案

优点

缺点

Redis

性能极高(单机10w+ QPS),天然支持过期,API简单

集群模式下可能有脑裂问题(用 RedLock 缓解)

Zookeeper

强一致性,临时节点自动释放

性能差(基于磁盘),运维复杂

MySQL

无需额外组件

性能差,没有自动过期,容易死锁

Etcd

强一致性,Raft 协议,TTL

生态不如 Redis 丰富

结论:99% 的分布式锁场景首选 Redis。只有对一致性要求极高的金融场景考虑 ZK/Etcd。


三、Redis 分布式锁的演进过程

3.1 第一版:SETNX(最简陋,有缺陷)

# 加锁
SETNX lock:order "value"
# 返回 1 表示获取成功,0 表示已被别人持有

# 释放锁
DEL lock:order

致命问题:如果持有锁的线程崩溃,锁永远不会释放 → 死锁

3.2 第二版:SETNX + EXPIRE(不是原子操作)

SETNX lock:order "value"
EXPIRE lock:order 30   # 设置30秒过期

致命问题:SETNX 和 EXPIRE 不是原子操作。如果在 SETNX 成功之后、EXPIRE 执行之前进程崩溃,仍然死锁。

3.3 第三版:SET ... NX EX(正确的加锁姿势)

# SET key value NX EX seconds — 原子操作,一条命令完成
SET lock:order "unique_value" NX EX 30
# NX = Not eXists(不存在才设置)
# EX 30 = 30秒后自动过期

解决了死锁问题,但引出了新问题:锁被误删

3.4 第四版:加锁设置唯一标识(防止误删别人的锁)

场景:线程 A 获取锁,执行业务耗时 35 秒(锁 30 秒就过期了)。锁过期后线程 B 获取锁,此时线程 A 完成业务去释放锁,结果把 B 的锁删了。

# 正确做法:释放前验证是否是自己的锁
# 加锁时用唯一标识(UUID + 线程ID)
SET lock:order "thread-A-uuid" NX EX 30

# 释放时先 GET 验证再 DEL(但 GET + DEL 不是原子的!)

新问题:GET 判断 + DEL 删除不是原子操作。判断通过后、删除前,锁过期了被别人获取,仍然误删。

3.5 第五版:Lua 脚本实现原子性释放(生产可用版)

-- unlock.lua
if redis.call("GET", KEYS[1]) == ARGV[1] then
    return redis.call("DEL", KEYS[1])
else
    return 0
end
# 执行
EVAL "if redis.call('GET',KEYS[1])==ARGV[1] then return redis.call('DEL',KEYS[1]) else return 0 end" 1 lock:order "thread-A-uuid"

Lua 脚本保证了 GET + 判断 + DEL 的原子性。


四、生产级 Redis 分布式锁完整代码

4.1 Java 版本(Redisson 框架 — 推荐)

// ============ 依赖 ============
// <dependency>
//     <groupId>org.redisson</groupId>
//     <artifactId>redisson-spring-boot-starter</artifactId>
//     <version>3.24.3</version>
// </dependency>

// ============ Redisson 配置 ============
@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer()
              .setAddress("redis://127.0.0.1:6379")
              .setPassword("your_password")
              .setConnectionPoolSize(64)
              .setConnectionMinimumIdleSize(10);
        return Redisson.create(config);
    }
}

// ============ 使用分布式锁 ============
@Service
public class OrderService {
    @Autowired
    private RedissonClient redissonClient;

    public void deductStock(Long orderId) {
        String lockKey = "lock:order:" + orderId;
        RLock lock = redissonClient.getLock(lockKey);

        try {
            // 尝试加锁,最多等待 10 秒,锁 30 秒后自动释放
            boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
            if (!locked) {
                throw new RuntimeException("系统繁忙,请稍后重试");
            }

            // ===== 临界区:执行业务逻辑 =====
            int stock = getStock(orderId);
            if (stock > 0) {
                stock--;
                updateStock(orderId, stock);
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("获取锁被中断", e);
        } finally {
            // ⚠️ 必须判断锁是否由当前线程持有,才能释放
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

4.2 Redisson 的 Watch Dog(看门狗)机制

Redisson 解决了锁过期而业务未完成的问题:

默认锁过期时间 30 秒
     │
     ▼
看门狗每 10 秒检查一次:线程还活着? → 续期到 30 秒
                                    → 线程挂了? → 不再续期,锁自动释放

关键:如果你显式指定了 leaseTime(如 tryLock(10, 15, TimeUnit.SECONDS)),看门狗不会启动,锁在 15 秒后必定过期。不指定 leaseTime 时才启动看门狗。

4.3 Java 版本(手写 RedisTemplate 实现)

@Component
public class RedisLock {
    @Autowired
    private StringRedisTemplate redisTemplate;

    // 随机唯一标识
    private final String lockValue = UUID.randomUUID().toString()
                                      + ":" + Thread.currentThread().getId();

    // ============ 加锁 ============
    public boolean lock(String key, long expireSeconds) {
        Boolean result = redisTemplate.opsForValue()
            .setIfAbsent(key, lockValue, Duration.ofSeconds(expireSeconds));
        return Boolean.TRUE.equals(result);
    }

    // ============ 释放锁(Lua 脚本保证原子性) ============
    public void unlock(String key) {
        String script = "if redis.call('GET', KEYS[1]) == ARGV[1] then " +
                        "return redis.call('DEL', KEYS[1]) " +
                        "else return 0 end";
        redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(key),
            lockValue
        );
    }

    // ============ 使用示例 ============
    public void executeWithLock(String lockKey) {
        try {
            if (!lock(lockKey, 30)) {
                throw new RuntimeException("获取锁失败");
            }
            // 执行业务逻辑...
        } finally {
            unlock(lockKey);
        }
    }
}

4.4 Go 版本

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
    "github.com/go-redis/redis/v8"
    "github.com/google/uuid"
)

type RedisLock struct {
    client    *redis.Client
    key       string
    value     string // 唯一标识,用于安全释放
    ttl       time.Duration
}

func NewRedisLock(client *redis.Client, key string, ttl time.Duration) *RedisLock {
    return &RedisLock{
        client: client,
        key:    key,
        value:  uuid.New().String(),
        ttl:    ttl,
    }
}

// Lock 获取锁(自旋重试)
func (l *RedisLock) Lock(ctx context.Context, maxWait time.Duration) error {
    deadline := time.Now().Add(maxWait)
    for {
        ok, err := l.client.SetNX(ctx, l.key, l.value, l.ttl).Result()
        if err != nil {
            return err
        }
        if ok {
            return nil // 获取成功
        }
        if time.Now().After(deadline) {
            return errors.New("获取锁超时")
        }
        time.Sleep(50 * time.Millisecond) // 轮询间隔
    }
}

// Unlock 安全释放锁(Lua 脚本确保原子性)
func (l *RedisLock) Unlock(ctx context.Context) error {
    script := `
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end`
    result, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Int()
    if err != nil {
        return err
    }
    if result == 0 {
        return errors.New("锁已过期或不属于当前持有者")
    }
    return nil
}

// ============ 使用示例 ============
func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
    lock := NewRedisLock(rdb, "lock:order:123", 30*time.Second)

    ctx := context.Background()
    if err := lock.Lock(ctx, 10*time.Second); err != nil {
        panic(err)
    }
    defer lock.Unlock(ctx)

    // 执行业务逻辑...
    fmt.Println("do something...")
}

4.5 Python 版本

import uuid
import time
import redis

class RedisLock:
    def __init__(self, client, key, ttl=30):
        self.client = client
        self.key = key
        self.value = str(uuid.uuid4())
        self.ttl = ttl

    def acquire(self, max_wait=10):
        """获取锁,支持超时等待"""
        deadline = time.time() + max_wait
        while time.time() < deadline:
            if self.client.set(self.key, self.value, nx=True, ex=self.ttl):
                return True
            time.sleep(0.05)  # 50ms 轮询
        return False

    def release(self):
        """原子性释放锁(Lua 脚本)"""
        script = """
        if redis.call("GET", KEYS[1]) == ARGV[1] then
            return redis.call("DEL", KEYS[1])
        else
            return 0
        end"""
        return self.client.eval(script, 1, self.key, self.value)

    def __enter__(self):
        if not self.acquire():
            raise Exception("获取锁超时")
        return self

    def __exit__(self, *args):
        self.release()


# ============ 使用示例 ============
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

with RedisLock(r, 'lock:order:123', ttl=30) as lock:
    # 执行业务逻辑
    stock = int(r.get('stock') or 0)
    if stock > 0:
        r.decr('stock')

五、分布式锁的正确实现关键点(必看)

关键点

要求

错误做法

正确做法

互斥性

同一时刻只能有一个客户端持有锁

SETNX 和 EXPIRE 分开

SET NX EX 原子命令

防死锁

持有锁的客户端崩溃后锁必须释放

不设过期时间

必须设置过期时间

解铃还须系铃人

只能释放自己持有的锁

直接 DEL

Lua 脚本 GET + 判断 + DEL

锁续期

业务未完成时自动续期

固定过期时间(要么太长要么太短)

Watch Dog 机制自动续期

重入性

同一线程可重复获取

不支持重入 (SETNX 返回 0)

Redisson 的 RLock 支持可重入


六、分布式锁的常见坑

6.1 坑一:锁过期,业务未完成

场景:锁设置了 30 秒过期,但业务执行了 40 秒。锁在第 30 秒过期,其他线程获取锁,两个线程同时执行。

解决:使用 Redisson 的 Watch Dog 自动续期,或显式设置足够长的过期时间。

6.2 坑二:主从切换导致锁丢失

场景:Redis 主从架构。客户端 A 在 Master 上获取锁 → Master 还没来得及同步到 Slave 就宕机 → Slave 提升为新 Master → 新 Master 上没有锁数据 → 客户端 B 获取锁成功 → 两个客户端同时持有锁

Master(持有 A 的锁)
  │
  │  宕机!数据未同步
  ▼
Slave → 提升为新 Master(没有锁数据)
  │
  │  B 成功获取锁
  ▼
A 和 B 同时持有锁 → 💥

解决:使用 RedLock 算法(见下节),或使用 Zookeeper/Etcd 这类强一致性的存储。

6.3 坑三:集群脑裂

场景:Redis Cluster 或哨兵模式下,发生网络分区。Master 与多数节点断开连接,但客户端仍然能访问 Master。哨兵选举出新 Master,两个 Master 同时存在,锁在两个 Master 上都能获取。

6.4 坑四:可重入问题

场景:一个方法获取锁后调用了另一个也需要同一把锁的方法,SETNX 返回 0,死锁在自身的等待中。

解决:使用 Redisson 的 RLock(基于 Redis Hash 实现,通过 hincrby 记录重入次数)。

6.5 坑五:finally 中的解锁检查

// ❌ 错误
finally {
    lock.unlock(); // 如果锁已过期被释放,这里可能抛异常
}

// ✅ 正确
finally {
    if (lock.isHeldByCurrentThread()) {
        lock.unlock();
    }
}

七、RedLock(红锁)详解

7.1 什么是 RedLock

RedLock 是 Redis 作者 antirez 提出的算法,通过多个独立的 Redis 实例来解决主从切换导致的锁丢失问题。

核心思想:不依赖单个 Redis 实例,而是向 N 个独立 Redis Master(通常是 5 个)依次获取锁,获得大多数(N/2 + 1 = 3 个)才算成功。

7.2 RedLock 加锁流程

客户端
  │
  ├──→ Redis-1 (Master)   请求加锁 ──→ 成功 ✅
  ├──→ Redis-2 (Master)   请求加锁 ──→ 成功 ✅
  ├──→ Redis-3 (Master)   请求加锁 ──→ 超时 ❌
  ├──→ Redis-4 (Master)   请求加锁 ──→ 成功 ✅
  └──→ Redis-5 (Master)   请求加锁 ──→ 成功 ✅

    结果: 4/5 成功 ≥ 3 → 加锁成功
    即使 Redis-3 宕机,锁仍然安全

7.3 RedLock 的关键约束

  1. 每个 Redis 实例独立运行(不是主从/集群,是 5 个独立的 Master)
  2. 获取多数锁即成功(&gt; N/2)
  3. 加锁总耗时必须远小于锁 TTL(防止网络延迟导致加锁期间锁就过期)
  4. 失败时要在所有实例上释放(即使加锁失败的实例,也要尝试释放)
  5. 重试间隔用随机值(防止多个客户端同时竞争)

7.4 RedLock 的争议

Martin Kleppmann(剑桥大学研究员)的质疑

  • RedLock 依赖时钟(TTL),而分布式系统中时钟不可靠
  • 客户端 GC 停顿可能导致锁过期而客户端不知道
  • 用共识算法(Raft/Paxos)的 ZK/Etcd 才是正确的解法

antirez 的回应

  • 没有完全安全的分布式锁,RedLock 在工程上足够
  • 如果真的需要强一致性,应该优化业务设计而不是依赖锁

工程建议

  • 一般业务用 Redis 单实例/哨兵 + Redisson Watch Dog 就足够了
  • 金融/支付等对一致性要求极高的场景,用 ZK/Etcd 或数据库乐观锁
  • RedLock 在普通业务中投入产出比不高(要维护 5 个 Redis 实例)

7.5 Redisson 使用 RedLock

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonRedLock redLock(RedissonClient client1,
                                    RedissonClient client2,
                                    RedissonClient client3) {
        RLock lock1 = client1.getLock("lock:order");
        RLock lock2 = client2.getLock("lock:order");
        RLock lock3 = client3.getLock("lock:order");
        return new RedissonRedLock(lock1, lock2, lock3);
    }
}

// 使用
@Autowired
private RedissonRedLock redLock;

public void process() {
    try {
        redLock.tryLock(10, 30, TimeUnit.SECONDS);
        // 业务逻辑...
    } finally {
        redLock.unlock();
    }
}

八、面试高频问题

Q1: 什么是分布式锁?为什么要用分布式锁?

:分布式锁是跨进程的互斥锁,保证在分布式系统中同一时刻只有一个节点能执行临界区代码。原因:单体锁(synchronized/Lock)只能控制单个 JVM 内的线程安全,多节点部署时各 JVM 独立持有锁,无法保证全局互斥。

Q2: Redis 做分布式锁有什么优缺点?

  • 优点:性能极高(单机 10w+ QPS)、天然支持过期时间防死锁、API 简单、生态完善(Redisson)
  • 缺点:单实例不可靠(宕机则锁全丢)、主从切换可能丢锁(脑裂)、依赖过期时间(时钟漂移可能出问题)

Q3: SETNX 和 SET NX 有什么区别?

:SETNX 是早期命令,不能同时设置过期时间,需要配合 EXPIRE,两次操作不原子,可能死锁。SET NX EX 是一条原子命令,推荐使用。

Q4: 锁过期了怎么办?

:使用 Watch Dog 自动续期机制(Redisson 默认)。锁持有期间,Watch Dog 定时检查线程状态,存活则续期。如果线程崩溃,不再续期,锁在 TTL 后自动释放。

Q5: 释放锁时为什么要用 Lua 脚本?不能直接 DEL 吗?

:不能。必须先 GET 验证锁是否属于自己,再 DEL。但 GET + DEL 不是原子的:验证通过后、DEL 前,锁刚好过期被其他线程获取,就会误删别人的锁。Lua 脚本在 Redis 内部原子执行,保证了判断和删除的原子性。

Q6: 什么是 RedLock?解决了什么问题?

:RedLock 是 Redis 作者提出的多实例分布式锁算法。向 N 个独立 Redis Master(通常 5 个)分别获取锁,获得大多数(≥3个)才算成功。解决了主从架构中主节点宕机导致锁丢失的问题。缺点是运维成本高(需维护 5 个独立实例)。

Q7: 分布式锁的实现方式有哪些?如何选型?

实现

一致性

性能

运维成本

适用场景

Redis

极高

绝大多数业务场景

Zookeeper

金融/支付

Etcd

云原生环境

MySQL

极低

无其他组件的简单场景

Q8: 分布式锁一定是安全的吗?

:不是。所有分布式锁都可能因为 GC 停顿、网络延迟、时钟漂移而失效。Martin Kleppmann 的论文《How to do distributed locking》指出:分布式锁无法保证绝对的互斥。工程上要做好防御性设计(如加乐观锁版本号兜底)。

Q9: 你用 Redisson 遇到过什么问题?

  • 高并发下 tryLock 的等待队列堆积,大量线程阻塞等待
  • Redis 主从切换瞬间,出现短暂的双锁持有
  • 锁的粒度问题:锁太粗(整个方法加锁)影响并发;锁太细(每行数据加锁)Redis key 过多
  • 解决:预估业务耗时设合理 TTL,配置哨兵监控,合适的锁粒度

Q10: 分布式锁和数据库乐观锁什么关系?

:不是替代关系,是互补关系。分布式锁是悲观锁(先锁再操作),数据库乐观锁是先操作再校验(version 字段)。最佳实践:用分布式锁防止热点并发,用数据库乐观锁做最终兜底。


九、总结

分布式锁选择路径:

你的业务属于哪类?
    │
    ├── 普通业务(电商、社交、SaaS)
    │   → Redis 单实例/哨兵 + Redisson(简单够用)
    │
    ├── 对可用性要求高,但对一致性容忍度较高
    │   → Redis 哨兵/集群 + Redisson + Watch Dog
    │
    ├── 对一致性要求极高(金融、支付)
    │   → Zookeeper / Etcd 分布式锁
    │   → 或者:Redis 锁 + 数据库乐观锁兜底
    │
    └── 只有 1-2 个节点,不想引入 Redis
        → MySQL 基于行锁实现(性能较差,不推荐高并发场景)