新手问题 关于获取互斥锁超时后的处理问题?

Dounx · 2019年12月25日 · 最后由 Dounx 回复于 2019年12月27日 · 3021 次阅读

Discourse 中用 Redis 实现互斥锁有这么一段:

distributed_mutex.rb

if current_expire_time && now <= current_expire_time.to_i
  redis.unwatch

  got_lock = false
else
  result =
    redis.multi do
      redis.set key, expire_time.to_s
      redis.expire key, validity
    end

  got_lock = !result.nil?
end

当 A 线程还在执行代码时(但是已经超过超时时间),B 线程发现锁已经超过超时时间,然后 B 设置了新的超时时间并获取了锁(仅仅在 A 线程执行完毕后在 log 中记录一条 warning),这种做法是否合理? 感觉上即使一个线程的执行时间如果超过了超时时间,其他线程也不应该获取到这个锁,而是获取锁失败。

另外,也是在 distributed_mutex.rb:

# NOTE wrapped in mutex to maintain its semantics
def synchronize
  @mutex.synchronize do
    expire_time = get_lock

    begin
      yield
    ensure
      current_time = redis.time[0]
      if current_time > expire_time
        warn("held for too long, expected max: #{@validity} secs, took an extra #{current_time - expire_time} secs")
      end

      if !unlock(expire_time) && current_time <= expire_time
        warn("the redis key appears to have been tampered with before expiration")
      end
    end
  end
end

这里注释说使用 @mutex(Mutex.new)来保持互斥,是为了在下面这种情况下保持互斥吗?

mutex = DistributedMutex.new("foo_key", foo_redis)
10.times do
  Thread.new do
    mutex.synchronize do
      # some code
    end
  end
end

但是在 Discourse 中的使用方式都是下面这样,所以有点迷惑,这种方式使用的话是不是就不需要 Mutex.new.synchronize 来保证互斥语义了:

DistributedMutex.synchronize("model_#{id}") do
  # some code
end

附:Rails 中,关于多线程方面的知识可以在哪里接触到?是一个或 n 个请求就会对应一个线程,又或者是一个 session 会对应一个线程?

当 A 线程还在执行代码时(但是已经超过超时时间),B 线程发现锁已经超过超时时间,然后 B 设置了新的超时时间并获取了锁(仅仅在 A 线程执行完毕后在 log 中记录一条 warning),这种做法是否合理?感觉上即使一个线程的执行时间如果超过了超时时间,其他线程也不应该获取到这个锁,而是获取锁失败。

A 线程要负责释放锁,否则如果 A 异常退出那么就是死锁了。如果 A 在锁过期时间内执行不完,A 可以 extend 锁。

但是在 Discourse 中的使用方式都是下面这样,所以有点迷惑,这种方式使用的话是不是就不需要 Mutex.new.synchronize 来保证原子性了:

DistributedMutex.synchronize借助 redis 实现了跨进程/机器的锁,Mutex#synchronize实现了本进程起的线程级别的锁。的确可以只用 redis 实现线程级别的锁,但是毕竟有额外的 redis 读写操作,不如直接用内置的Mutex#synchronize

附:Rails 中,关于多线程方面的知识可以在哪里接触到?是一个或 n 个请求就会对应一个线程,又或者是一个 session 会对应一个线程?

Rails 用的多线程知识可能并不多,顶多是如何在多线程环境下初始化一个单例。多线程可以看 puma 或者 sidekiq。

piecehealth 回复

如果 A 在锁过期时间内执行不完,A 可以 extend 锁。

意思是 A 在执行的过程中,需要不断检查当前时间是否大于锁的过期时间吗?如果需要这么做的话应该怎么实现?

首先加锁后的操作如果占用太长时间会成为瓶颈,即第一优先级是尽量快的释放锁。

如果实在完不成,比如所有操作有 5 步,可以每步都 extend 一下锁,不用检查还有多少时间。

piecehealth 回复

那获得锁的线程怎么知道自己的操作时间是否太长呢?正常情况下临界区的操作并不会超时,但是在某些特殊情况下临界区操作会超时(比如临界区在操作 Redis 或者数据库),这种情况下这个线程并不知道自己到底有没有超时吧。

这个 DistributedMutex 的实现是 A 运行超时之后,B 会获取到这个锁,A 运行完毕之后会记录一个 warning 提示持有锁的时间过长。 这样的话 A 和 B 不就都进入到了同一个临界区,这样会出现问题吗?

还是说这个时间其实是一个死锁的时间,只要大于这个时间,任务就基本没有可能执行完毕?

看了一下是有些问题呢,也许 discourse 觉得他们的场景 A 超时处理,B 把锁抢过来也没问题,严肃的场景就不能这么搞了。

可以看看 https://github.com/ClosureTree/with_advisory_lock ,用数据库的锁。

Dounx 回复

DistributedMutex的实现是允许超时自动释放锁的,是一个不严谨的悲观锁。超时有 warning log,可以方便追踪,或者监控报警。第一优先级还是不要让操作超时。

真的要用悲观锁,可以参考 https://redis.io/topics/distlock

piecehealth 回复

看了一下 DistributedMutex 的测试,确实是允许超时自动释放锁。 @Rei @piecehealth 感谢!

distributed_mutex_spec.rb

it "handles auto cleanup correctly" do
  m = DistributedMutex.new(key)

  Discourse.redis.setnx key, Time.now.to_i - 1

  start = Time.now.to_i
  m.synchronize do
    "nop"
  end

  # no longer than a second
  expect(Time.now.to_i).to be <= start + 1
end
Dounx 关闭了讨论。 01月02日 11:20
需要 登录 后方可回复, 如果你还没有账号请 注册新账号