python-redis-lock
python-redis-lock
- 使用方法
- 导入包
- 阻塞锁
- 超时阻塞锁
- 非阻塞锁
- 释放锁
- 查看锁是否已被占用
- 使用上下文管理器 with
- 锁 id
- 日志
- 工作原理
- redis 命令
- 获取锁原理
- 释放锁原理
- 其他
- 参考文档
python-redis-lock 是基于 redis
SETNX和
BLPOP命令实现的锁上下文管理器,其接口类似于
threading.Lock。
使用方法
导入包
import redis_lock
from redis import StrictRedis
阻塞锁
线程拿不到锁时将一直阻塞等待。具体是控制acquire方法的blocking参数,缺省默认为True,即阻塞。
conn = StrictRedis()
lock = redis_lock.Lock(conn, "lock-name"):
if lock.acquire():print("Got the lock. Doing some work ...")time.sleep(5)
超时阻塞锁
线程拿不到锁时将一直阻塞等待(blocking=True),直到超时(timeout=5)返回拿锁失败,代码将走到else分支。具体是控制acquire方法的timeout参数,缺省默认为None,即无任何超时。
conn = StrictRedis()
lock = redis_lock.Lock(conn, "lock-name"):
if lock.acquire(timeout=5):print("Got the lock. Doing some work ...")time.sleep(5)
else:print("Someone else has the lock.")
非阻塞锁
线程不阻塞blocking=False,拿不到锁时立即返回拿锁失败,代码将走到else分支。
conn = StrictRedis()
lock = redis_lock.Lock(conn, "lock-name"):
if lock.acquire(blocking=False):print("Got the lock. Doing some work ...")time.sleep(5)
else:print("Someone else has the lock.")
释放锁
正常释放锁
conn = StrictRedis()
lock = redis_lock.Lock(conn, "lock-name")
lock.acquire()
print("Got the lock. Doing some work ...")
time.sleep(5)
lock.release()
释放从其他地方获得的锁
# 从其他地方获得锁
lock1 = Lock(conn, "lock-name")
lock1.acquire()
lock_id = lock1.id# 把 lock_id 传参到此处释放锁
lock2 = Lock(conn, "lock-name", id=lock_id)
lock2.release()
强制释放指定名称的锁
lock = Lock(conn, "lock-name")
lock.reset()
强制释放所有锁,一般用于应用程序启动或结束时。
import redis-lock
from redis import StrictRedisconn = StrictRedis()
reset_all(conn)
查看锁是否已被占用
具体实现是调用了redis-cli EXISTS lock-name命令查看"lock-name"这个key是否存在。如果存在,说明有人已经占用了该锁。
is_locked = Lock(conn, "lock-name").locked()
使用上下文管理器 with
注意:python-redis-lock 源码的 with 语句使用的是阻塞锁,如需其他类型的锁,可以自己定义 Lock 的子类,覆盖__enter__方法。
class Lock(object):...def __enter__(self):acquired = self.acquire(blocking=True)assert acquired, "Lock wasn't acquired, but blocking=True"return selfdef __exit__(self, exc_type=None, exc_value=None, traceback=None):self.release()
上面的例子可以使用上下文管理器重写
conn = StrictRedis()
with redis_lock.Lock(conn, "lock-name"):print("Got the lock. Doing some work ...")time.sleep(5)
锁 id
可以对锁设置 id,以便以后可以由同一进程或不同的进程检索它。在应用程序需要标识锁所有者(找出当前拥有该锁的人)的情况下,这非常有用。
import socket
host_id = "owned-by-%s" % socket.gethostname()
lock = redis_lock.Lock(conn, "lock-name", id=host_id)
if lock.acquire(blocking=False):assert lock.locked() is Trueprint("Got the lock.")lock.release()
else:if lock.get_owner_id() == host_id:print("I already acquired this in another process.")else:print("The lock is held on another machine.")
日志
您可以通过修改各种记录器来控制日志输出
logging.getLogger("redis_lock.thread").disabled = True
logging.getLogger("redis_lock").disable(logging.DEBUG)
工作原理

redis 命令
SETNX:SET if Not eXists,格式 SETNX key value。当key不存在时才能设置成功,如果key已存在则设置失败。
127.0.0.1:6379> SETNX lock-name 123456
(integer) 1
127.0.0.1:6379> SETNX lock-name 123456
(integer) 0
BLPOP:BLocking POP,阻塞式弹出,格式 BLPOP key timeout 。当timeout=0时,表示一直阻塞等待,直到有其他客户端执行rpush或lpush命令,插入数据后,阻塞才解除,并且弹出头部的第一个元素。如果设置了timeout,则阻塞超时会返回弹出失败。当多个客户端同时在一个key处出现阻塞时,如果插入一个新元素,则最先阻塞的客户端会解除阻塞;再插入一个新元素,第二个阻塞的客户端才解除阻塞,以此类推。
# 当列表lock-signal-name中没有任何元素时,redis-client将阻塞等待5秒
127.0.0.1:6379> BLPOP lock-signal-name 5
(nil)
(5.07s)# 向列表lock-signal-name的表头插入一个元素1
127.0.0.1:6379> LPUSH lock-signal-name 1
(integer) 1# 当列表lock-signal-name中有元素时,redis-client将弹出该元素并返回
127.0.0.1:6379> BLPOP lock-signal-name 5
1) "lock-signal-name"
2) "1"
获取锁原理
def acquire(self, blocking=True, timeout=None):# ...# 已经拿到锁的 Lock 实例不能重复拿锁if self._held: raise AlreadyAcquired("Already acquired from this Lock instance.")# ... busy = Trueblpop_timeout = timeout or self._expire or 0timed_out = Falsewhile busy:# SETNX失败表示锁已被占用,busy = not 0 = True# SETNX成功表示拿锁成功,使用SETEX设置超时,busy = not 1 = Falsebusy = not self._client.set(self._name, self._id, nx=True, ex=self._expire)if busy:if timed_out:return False# 如果是阻塞锁,使用BLPOP阻塞等待通知elif blocking: # BLPOP成功表示可以去拿锁了,timed_out = not 1 and None = False# BLPOP失败表示阻塞超时,timed_out = not 0 and 5 = 5timed_out = not self._client.blpop(self._signal, blpop_timeout) and timeout# 如果是非阻塞锁,SETNX失败就立即返回拿锁失败else:logger.debug("Failed to get %r.", self._name)return Falselogger.debug("Got lock for %r.", self._name)# 如果设置了自动刷新锁(auto_renewal=True, expire=5),则启动一个daemon thread进行自动刷新if self._lock_renewal_interval is not None: self._start_lock_renewer()return True
能拿到锁
1、调用redis-cli SETNX lock-name id命令设置一个名为"lock-name"的key,其值为 id。当lock-name这个key不存在时,SETNX才能设置成功,表示拿到了锁。
2、如果SETNX返回成功,则调用redis-cli SETEX lock-name timeout id命令设置锁的超时时间。
拿不到锁
1、当lock-name这个key已存在时,SETNX返回失败,表示锁已经被其他人占用。
2、如果是阻塞锁,则进入等锁阶段,调用redis-cli BLPOP lock-signal-name timeout命令加入阻塞等待的队列。
3、如果是非阻塞锁或超时,则返回拿锁失败。
释放锁原理
执行释放锁的Lua脚本。
def release(self):# 停止自动刷线线程if self._lock_renewal_thread is not None:self._stop_lock_renewer()logger.debug("Releasing %r.", self._name)# 执行释放锁的Lua脚本error = _eval_script(self._client, UNLOCK, self._name, self._signal, args=(self._id,))if error == 1:raise NotAcquired("Lock %s is not acquired or it already expired." % self._name)elif error:raise RuntimeError("Unsupported error code %s from EXTEND script." % error)else:self._delete_signal()def _delete_signal(self):self._client.delete(self._signal)
1、调用redis-cli GET lock-name命令得到当前锁的id,与传入的id值进行比较。如果不等,说明要求释放锁的人并不持有该锁,或者该锁已过期失效。
2、如果相等,说明持有该锁,则先调用redis-cli DEL lock-signal-name命令删除signal列表,再调用redis-cli LPUSH lock-signal-name 1命令向signal列表表头插入一个元素,用于通知阻塞队列中的下一个人,最后调用redis-cli DEL lock-name命令删除锁。
# KEYS[1]:lock-name
# ARGV[1]:id
# KEYS[2]:lock-signal-name# Check if the id match. If not, return an error code.
UNLOCK_SCRIPT = b"""if redis.call("get", KEYS[1]) ~= ARGV[1] thenreturn 1elseredis.call("del", KEYS[2])redis.call("lpush", KEYS[2], 1)redis.call("del", KEYS[1])return 0end
"""
UNLOCK_SCRIPT_HASH = sha1(UNLOCK_SCRIPT).hexdigest()
强制释放指定名称的锁
def reset(self):"""Forcibly deletes the lock. Use this with care."""_eval_script(self._client, RESET, self._name, self._signal, self._signal_expire)RESET_SCRIPT = b"""redis.call("del", KEYS[2])redis.call("lpush", KEYS[2], 1)redis.call("pexpire", KEYS[2], KEYS[3])return redis.call("del", KEYS[1])
"""
RESET_SCRIPT_HASH = sha1(RESET_SCRIPT).hexdigest()
强制释放所有锁
def reset(redis_client):"""Forcibly deletes all locks if its remains (like a crash reson). Use this with care."""_eval_script(redis_client, RESET_ALL)RESET_ALL_SCRIPT = b"""local locks = redis.call('keys', 'lock:*')local signalfor _, lock in pairs(locks) dosignal = 'lock-signal:' .. string.sub(lock, 6)redis.call("del", signal)redis.call("lpush", signal, 1)redis.call("expire", signal, 1)return redis.call("del", lock)endreturn #locks
"""
RESET_SCRIPT_HASH = sha1(RESET_SCRIPT).hexdigest()
其他
((UNLOCK, _, _, # noqaEXTEND, _, _,RESET, _, _,RESET_ALL, _, _,DELETE_ALL_SIGNAL_KEYS, _, _),SCRIPTS) = zip(*enumerate([UNLOCK_SCRIPT_HASH, UNLOCK_SCRIPT, 'UNLOCK_SCRIPT',EXTEND_SCRIPT_HASH, EXTEND_SCRIPT, 'EXTEND_SCRIPT',RESET_SCRIPT_HASH, RESET_SCRIPT, 'RESET_SCRIPT',RESET_ALL_SCRIPT_HASH, RESET_ALL_SCRIPT, 'RESET_ALL_SCRIPT',DELETE_ALL_SIGNAL_KEYS_SCRIPT_HASH, DELETE_ALL_SIGNAL_KEYS_SCRIPT, 'DELETE_ALL_SIGNAL_KEYS_SCRIPT'
]))
enumerate([...])得到了长度为15的枚举对象,其元素为tuple:(0, UNLOCK_SCRIPT_HASH),…,(14, 'DELETE_ALL_SIGNAL_KEYS_SCRIPT')。
*把这15个tuple变成传参,zip收到15个入参,把15个tuple[0]组合成(0, ..., 14),把15个tuple[1]组成(UNLOCK_SCRIPT_HASH, ..., 'DELETE_ALL_SIGNAL_KEYS_SCRIPT'),将两者一起返回。在接收第一个返回值时,使用变量保存以后需要的序号,其他序号用_表示丢弃。
参考文档
- python-redis-lock官方文档
- Redis分布式锁的Python实现[python-redis-lock]
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
