信号量semarphore 信号量是荷兰计算机科学家dijstra发明的,P、V两个字母是荷兰语字母缩写,分别表示acquire和release。
P、V操作

  • P:请求分配一个单位资源
  • V:释放一个资源

分布式锁的实现

基于mysql

  1. 表增加唯一索引
  2. 加锁:执行insert语句,若报错,则表明加锁失败
  3. 解锁:执行delete语句

基于zookeeper

基于paxos算法,最正规,API最清晰,不需要实现轮询,只需要实现回调。

package main

import (
    "time"
    "github.com/samuel/go-zookeeper/zk"
)

func main() {
  c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second) //*10)
  if err != nil {
    panic(err)
  }
  l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
  err = l.Lock()
  if err != nil {
    panic(err)
  }
  println("lock succ, do your business logic")

  time.Sleep(time.Second * 10)
 
  // do some thing
  l.Unlock()
  println("unlock succ, finish business logic")
}

如果你有了解过 Zookeeper,基于它实现的分布式锁是这样的:

  1. 客户端 1 和 2 都尝试创建「临时节点」,例如 /lock
  2. 假设客户端 1 先到达,则加锁成功,客户端 2 加锁失败
  3. 客户端 1 操作共享资源
  4. 客户端 1 删除 /lock 节点,释放锁

你应该也看到了,Zookeeper 不像 Redis 那样,需要考虑锁的过期时间问题,它是采用了「临时节点」,保证客户端 1 拿到锁后,只要连接不断,就可以一直持有锁。

而且,如果客户端 1 异常崩溃了,那么这个临时节点会自动删除,保证了锁一定会被释放。
不错,没有锁过期的烦恼,还能在异常时自动释放锁,是不是觉得很完美?

其实不然。

思考一下,客户端 1 创建临时节点后,Zookeeper 是如何保证让这个客户端一直持有锁呢?

原因就在于,客户端 1 此时会与 Zookeeper 服务器维护一个 Session,这个 Session 会依赖客户端「定时心跳」来维持连接。

如果 Zookeeper 长时间收不到客户端的心跳,就认为这个 Session 过期了,也会把这个临时节点删除。
同样地,基于此问题,我们也讨论一下 GC 问题对 Zookeeper 的锁有何影响:

  • 客户端 1 创建临时节点 /lock 成功,拿到了锁
  • 客户端 1 发生长时间 GC
  • 客户端 1 无法给 Zookeeper 发送心跳,Zookeeper 把临时节点「删除」
  • 客户端 2 创建临时节点 /lock 成功,拿到了锁
  • 客户端 1 GC 结束,它仍然认为自己持有锁(冲突)

可见,即使是使用 Zookeeper,也无法保证进程 GC、网络延迟异常场景下的安全性。

这就是前面 Redis 作者在反驳的文章中提到的:如果客户端已经拿到了锁,但客户端与锁服务器发生「失联」(例如 GC),那不止 Redlock 有问题,其它锁服务都有类似的问题,Zookeeper 也是一样!

所以,这里我们就能得出结论了:一个分布式锁,在极端情况下,不一定是安全的。

如果你的业务数据非常敏感,在使用分布式锁时,一定要注意这个问题,不能假设分布式锁 100% 安全。

好,现在我们来总结一下 Zookeeper 在使用分布式锁时优劣: Zookeeper 的优点:

  • 不需要考虑锁的过期时间
  • watch 机制,加锁失败,可以 watch 等待锁释放,实现乐观锁

但它的劣势是:

  • 性能不如 Redis
  • 部署和运维成本高
  • 客户端与 Zookeeper 的长时间失联,锁被释放问题

基于ETCD

package main

import (
    "log"

    "github.com/zieckey/etcdsync"
)

func main() {
  m, err := etcdsync.New("/lock", 10, []string{"http://127.0.0.1:2379"})
  if m == nil || err != nil {
    log.Printf("etcdsync.New failed")
    return
  }
  err = m.Lock()
  if err != nil {
    log.Printf("etcdsync.Lock failed")
    return
  }

  log.Printf("etcdsync.Lock OK")
  log.Printf("Get the lock. Do something here.")

  err = m.Unlock()
  if err != nil {
      log.Printf("etcdsync.Unlock failed")
  } else {
      log.Printf("etcdsync.Unlock OK")
  }
}

基于redis

  • 需要考虑删除(释放锁的过程) 基于redis:setnx+超时+(刷新锁)
package main

import (
"fmt"
"sync"
"time"

    "github.com/go-redis/redis"
)

func incr() {
client := redis.NewClient(&redis.Options{
Addr:     "localhost:6379",
Password: "", // no password set
DB:       0,  // use default DB
})

    var lockKey = "counter_lock"
    var counterKey = "counter"
 
    // lock
    resp := client.SetNX(lockKey, 1, time.Second*5)
    lockSuccess, err := resp.Result()
 
    if err != nil || !lockSuccess {
        fmt.Println(err, "lock result: ", lockSuccess)
        return
    }
 
    // counter ++
    getResp := client.Get(counterKey)
    cntValue, err := getResp.Int64()
    if err == nil || err == redis.Nil {
        cntValue++
        resp := client.Set(counterKey, cntValue, 0)
        _, err := resp.Result()
        if err != nil {
            // log err
            println("set value error!")
        }
    }
    println("current counter is ", cntValue)
 
    delResp := client.Del(lockKey)
    unlockSuccess, err := delResp.Result()
    if err == nil && unlockSuccess > 0 {
        println("unlock success!")
    } else {
        println("unlock failed", err)
    }
}

func main() {
  var wg sync.WaitGroup
  for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
      defer wg.Done()
      incr()
    }()
  }
  wg.Wait()
}

总结

锁的TTL怎么维护:MySQL的实现2与Redis的所有实现都依赖于配置一个大于业务处理逻辑的锁超时时间,并且为了维持锁的安全性,不得不增加续约机制。而在ZK与Etcd的实现中,基础组件使用自身的心跳机制把这部分能力从业务转移到了基础组件内部,提高了安全性,减少了业务的开发负担。