背景

在分布式环境下,需要控制多个节点对同一个资源的并发访问,此时本地的加锁已经不能满足需要。为了实现在分布式环境下的锁。

使用场景

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

特点

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
  • 支持阻塞和非阻塞:和ReentrantLock一样支持lock和trylock以及tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

常见实现方式

  • MySql
  • ZK/etcd
  • Redis
  • Chubby

mysql

事务

利用mysql的事务可以实现分布式资源的互斥访问,一般可以建立一个锁表。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
CREATE TABLE `lock_table` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `resourcce_name` varchar(128) DEFAULT '''''' COMMENT '资源名称',
  `node_info` varchar(128) DEFAULT NULL COMMENT '机器信息',
  `count` int(11) NOT NULL DEFAULT '0' COMMENT '锁的次数,统计可重入锁',
  `desc` varchar(128) DEFAULT NULL COMMENT '额外的描述信息',
  `utime` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '更新时间戳',
  `ctime` int(11) unsigned NOT NULL DEFAULT '0' COMMENT '创建时间戳',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uniq_resource` (`resourcce_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Lock:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
@Transcation
public void lock(resourceName){
  if (select * from lock_table where resource_name = resourceName;有数据){
    if(node_info == currentNode){
      update lock_table set count=count+1 where resource_name = resourceName;
      return true;
    }else{
      return false;
    }
  }else{
    insert into lock_table;
  }
}

利用了sql的事务,获取锁时,先去查询锁是否存在,如果存在则比较node_info【ip+线程id】是否一致。如果一致就加可重入锁count的值,如果不一致那么就返回false。如果没有值,则可以创建这个锁。

tryLock:

1
2
3
public boolean tryLock(){
  return mysqlLock.lock();
}

tryLock(timeout):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public boolean tryLock(long timeout){
  long endtime = timenow + timeout;
  while(true){
    if mysqlLock.lock(){
      return true;
    }
    if(timenow > endtime){
      return false;
    }
  }
}

unlock():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
@Transcation
public boolean unlock(){
  if(select * from lock_table 有数据){
    if(currentNode == node_info){
      if (count>1){
        update count = count-1;
      }else{
        delete;
      }
    }else{
      return false;
    }
  }else{
    return false;
  }
}

锁超时:如果持有锁的节点挂了,那么该锁就不会被释放,我们可以启动一个定时任务来认定节点挂了之后释放锁。

etcd

事务

etcd3中可以序列化多个操作为一个条件性的迷你事务。每个事务包含一组条件守护的组合,当所有提交满足时一组操作被执行,并且任何条件不满足时另外一组操作被执行。事务可以保证分布式锁的安全。

1
Txn().If(cond1, cond2, ...).Then(op1, op2, ...,).Else(op1, op2, )

etcd3中的事务有三个部分,条件块,If(cond1,cond2,...) ,成功块 Then(op1,op2,...),失败块Else(op1,op2,...) 条件块中的所有条件都满足的情况下就执行Then,否则就执行Else。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func doTxnXfer(etcd *v3.Client, from, to string, amount uint) (bool, error) {
    getresp, err := etcd.Txn(ctx.TODO()).Then(OpGet(from), OpGet(to)).Commit()	//事务查出from和get
    if err != nil {
         return false, err
    }
    fromKV := getresp.Responses[0].GetRangeResponse().Kvs[0]
    toKV := getresp.Responses[1].GetRangeResponse().Kvs[1]
    fromV, toV := toUInt64(fromKV.Value), toUint64(toKV.Value)
    if fromV < amount {
        return false, fmt.Errorf("insufficient value")
    }
    txn := etcd.Txn(ctx.TODO()).If(
        v3.Compare(v3.ModRevision(from), "=", fromKV.ModRevision),
        v3.Compare(v3.ModRevision(to), "=", toKV.ModRevision))			//如果from 和 get还没被修改
    txn = txn.Then(
        OpPut(from, fromUint64(fromV - amount)),					//那么久同时更新这两个key
        OpPut(to, fromUint64(toV + amount))
    putresp, err := txn.Commit()												//提交事务
    if err != nil {
        return false, err
    }
    return putresp.Succeeded, nil
}

而利用etcd的事务实现分布式锁的代码如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
package concurrency

import (
	"context"
	"fmt"
	"sync"

	v3 "go.etcd.io/etcd/clientv3"
	pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
)

// Mutex implements the sync Locker interface with etcd
type Mutex struct {
	s *Session

	pfx   string
	myKey string
	myRev int64
	hdr   *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
	return &Mutex{s, pfx + "/", "", -1, nil}
}

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
	s := m.s
	client := m.s.Client()

	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
	// put self in lock waiters via myKey; oldest waiter holds lock
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
  //类似于原子操作cas,compare and swap
	if err != nil {
		return err
	}
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		return nil
	}

	// wait for deletion revisions prior to myKey
	hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
	} else {
		m.hdr = hdr
	}
	return werr
}

func (m *Mutex) Unlock(ctx context.Context) error {
	client := m.s.Client()
	if _, err := client.Delete(ctx, m.myKey); err != nil {
		return err
	}
	m.myKey = "\x00"
	m.myRev = -1
	return nil
}

func (m *Mutex) IsOwner() v3.Cmp {
	return v3.Compare(v3.CreateRevision(m.myKey), "=", m.myRev)
}

func (m *Mutex) Key() string { return m.myKey }

// Header is the response header received from etcd on acquiring the lock.
func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }

type lockerMutex struct{ *Mutex }

func (lm *lockerMutex) Lock() {
	client := lm.s.Client()
	if err := lm.Mutex.Lock(client.Ctx()); err != nil {
		panic(err)
	}
}
func (lm *lockerMutex) Unlock() {
	client := lm.s.Client()
	if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
		panic(err)
	}
}

// NewLocker creates a sync.Locker backed by an etcd mutex.
func NewLocker(s *Session, pfx string) sync.Locker {
	return &lockerMutex{NewMutex(s, pfx)}
}

redis

单节点redis锁

SET resource_name my_random_value NX PX 30000

在上面的SET命令中:

  • my_random_value是由客户端生成的一个随机字符串,它要保证在足够长的一段时间内在所有客户端的所有获取锁的请求中都是唯一的。
  • NX表示只有当resource_name对应的key值不存在的时候才能SET成功。这保证了只有第一个请求的客户端才能获得锁,而其它客户端在锁被释放之前都无法获得锁。
  • PX 30000表示这个锁有一个30秒的自动过期时间。当然,这里30秒只是一个例子,客户端可以选择合适的过期时间。

最后,当客户端完成了对共享资源的操作之后,执行下面的Redis Lua脚本来释放锁

1
2
3
4
5
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

这段Lua脚本在执行的时候要把前面的my_random_value作为ARGV[1]的值传进去,把resource_name作为KEYS[1]的值传进去.

有几个点要注意下:

  1. 锁的过期时间,如果一个客户端获取到锁只有崩溃了,那么其他客户端再也不能获取到这个锁。所以要设置这个有效时间。

  2. 设置一个随机字符串 my_random_value很有必要,这保证一个客户端释放的锁是自己持有的那个锁。假如获取锁时SET的不是一个随机字符串,而是一个固定值,那么可能会发生下面的执行序列:

    1. 客户端1获取锁成功。
    2. 客户端1在某个操作上阻塞了很长时间。
    3. 过期时间到了,锁自动释放了。
    4. 客户端2获取到了对应同一个资源的锁。
    5. 客户端1从阻塞中恢复过来,释放掉了客户端2持有的锁。

    之后,客户端2在访问共享资源的时候,就没有锁为它提供保护了。

  3. 释放锁的过程:释放锁的过程必须使用lua脚本或(multi)来实现。释放锁其实包含三步操作:‘GET’、判断和’DEL’,用Lua脚本来实现能保证这三步的原子性。

redlock

我们想象一个这样的场景当机器A申请到一把锁之后,如果Redis主宕机了,这个时候从机并没有同步到这一把锁,那么机器B再次申请的时候就会再次申请到这把锁,为了解决这个问题Redis作者提出了RedLock红锁的算法,在Redission中也对RedLock进行了实现。

1
2
3
4
5
6
7
8
9
//三个redis的集群
Rlock lock1 = redissonInstance1.getLock("lock1");
Rlock lock2 = redissonInstance2.getLock("lock2");
Rlock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new REdissonRedLock(lock1,lock2,lock3);
lock.lock();
  ...
lock.unlock();

通过上面的代码,我们需要实现多个Redis集群,然后进行红锁的加锁,解锁。具体的步骤如下:

  1. 首先生成多个Redis集群的Rlock,并将其构造成RedLock。
  2. 依次循环对三个集群进行加锁,加锁的过程和5.2里面一致。
  3. 如果循环加锁的过程中加锁失败,那么需要判断加锁失败的次数是否超出了最大值,这里的最大值是根据集群的个数,比如三个那么只允许失败一个,五个的话只允许失败两个,要保证多数成功。
  4. 加锁的过程中需要判断是否加锁超时,有可能我们设置加锁只能用3ms,第一个集群加锁已经消耗了3ms了。那么也算加锁失败。
  5. 3,4步里面加锁失败的话,那么就会进行解锁操作,解锁会对所有的集群在请求一次解锁。

可以看见RedLock基本原理是利用多个Redis集群,用多数的集群加锁成功,减少Redis某个集群出故障,造成分布式锁出现问题的概率。

redis锁小结

  • 优点:对于Redis实现简单,性能对比ZK和Mysql较好。如果不需要特别复杂的要求,那么自己就可以利用setNx进行实现,如果自己需要复杂的需求的话那么可以利用或者借鉴Redission。对于一些要求比较严格的场景来说的话可以使用RedLock。
  • 缺点:需要维护Redis集群,如果要实现RedLock那么需要维护更多的集群。