登陆

不必找了,根据 Redis 的分布式锁实战来了

admin 2019-11-01 187人围观 ,发现0个评论

前语:在分布式环境中,咱们常常运用锁来进行并发操控,锁可分为达观锁和失望锁,

依据数据库版别戳的完成是达观锁,依据redis或zookeeper的完成可认为是失望锁了。达观锁和失望锁最底子的差异在于线程之间是否彼此堵塞。

那么,本文首要来评论依据redis的分布式锁算法问题。

从2.6.12版别开端,redis为SET指令增加了一系列选项(set [key] NX/XX EX/PX [expiration]):

  • EX seconds – 设置键key的过期时刻,单位时秒
  • PX milliseconds – 设置键key的过期时刻,单位时毫秒
  • NX – 只需键key不存在的时分才会设置key的值
  • XX – 只需键key存在的时分才会设置key的值

中文地址:http://redis.cn/commands/set.html

留意: 由于SET指令加上选项现已能够彻底替代SETNX, SETEX, PSETEX的功用,所以在将来的版别中,redis或许会不引荐运用并且终究扔掉这几个指令。

这儿简略提一下,在旧版别的redis中(指2.6.12版别之前),运用redis完成分布式锁一般需求setNX、expire、getSet、del等指令。并且会发现这种完成有许多逻辑判别的原子操作以及本地时刻等并没有操控好。

而在旧版别的redis中,redis的超时时刻很难操控,用户迫切需求把setNX和expiration结合为一体的指令,把他们作为一个原子操作,这样新版别的多选项set指令诞生了。然而这并没有彻底处理杂乱的超时操控带来的问题。

接下来,咱们的全部评论都依据新版redis。

在这儿,我先提出几个在完成redis分布式锁中需求考虑的关键问题

1、死锁问题;

1.1、为了防止死锁,redis至少需求设置一个超时时刻;

1.2、由1.1引申出来,当锁主动开释了,可是程序并没有履行结束,这时分其他线程又获取到锁履行相同的程序,或许会形成并发问题,这个问题咱们需求考虑一下是否归归于分布式锁带来问题的领域。

2、锁开释问题,这儿会有两个问题;

2.1、每个获取redis锁的线程应该开释自己获取到的锁,而不是其他线程的,所以咱们需求在每个线程获取锁的时分给锁做上不同的符号以示区别;

2.2、由2.1带来的问题是线程在开释锁的时分需求判别当时锁是否归于自己,假如归于自己才开释,这儿涉及到逻辑判别句子,至少是两个操作在进行,那么不必找了,根据 Redis 的分布式锁实战来了咱们需求考虑这两个操作要在一个原子内履行,否者在两个行为之间或许会有其他线程刺进履行,导致程序紊乱。

3、更牢靠的锁;

单实例的redis(这儿指只需一个master节点)往往是不牢靠的,尽管完成起来相对简略一些,可是会晤临着宕机等不可用的场景,即使在主从复制的时分也显得并不牢靠(由于redis的主从复制往往是异步的)。

关于Martin Kleppmann的Redlock的剖析

原文地址:https://redis.io/topics/distlock

中文地址:http://redis.cn/topics/distlock.html

文章剖析得出,这种算法只需具有3个特性就能够完成一个最低确保的分布式锁。

  • 安全特点(Safety property): 独享(彼此排挤)。在恣意一个时刻,只需一个客户端持有锁。
  • 活性A(Liveness property A): 无死锁。即使持有锁的客户端溃散(crashed)或许网络被割裂(gets partitioned),锁依然能够被获取。
  • 活性B(Liveness property B): 容错。只需大部分Redis节点都活着,客户端就能够获取和开释锁.

咱们来剖析一下:

第一点安全特点意味着失望锁(互斥锁)是咱们做redis分布式锁的条件,否者将或许形成并发;

第二点标明为了防止死锁,咱们需求设置锁超时时刻,确保在必定的时刻往后,锁能够从头被运用;

第三点是说关于客户端来说,获取锁和手动开释锁能够有更高的牢靠性。

更进一步剖析,结合上文说到的关键问题,这儿能够引申出别的的两个问题:

  • 怎样才干合理判别程序真实处理的有用时刻规模?(这儿有个时刻偏移的问题)
  • redis Master节点宕机后康复(或许还没有耐久化到磁盘)、主从节点切换,(N/2)+1这儿的N应该怎样动态核算更合理?

接下来再看,redis之父antirez对Redlock的点评

原文地址:http://antirez.com/news/101

文中首要说到了网络推迟和本地时钟的修正(不管是时刻服务器或人为修正)对这种算法或许形成的影响。

最终,来点实践吧

I、传统的单实例redis分布式锁完成(关键步骤)

获取锁(含主动开释锁):

SET resource_name my_random_value NX PX 30000
手动删去锁(Lua脚本):
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end

II、分布式环境的redis(多master节点)的分布式锁完成

为了确保在尽或许短的时刻内获取到(N/2)+1个节点的锁,能够并行去获取各个节点的锁(当然,并行或许需求耗费更多的资源,由于串行只需求count到满足数量的锁就能够中止获取了);

别的,怎样动态实时一致获取redis mast红茶的功效与作用er nodes需求更进一步去考虑了。

QA,弥补一下阐明(以下为我与朋友交流的状况,以阐明文中我们或许不行理解的当地):

1、在关键问题2.1中,删去就删去了,会形成什么问题?

线程A超时,预备删去锁;但此刻的锁归于线程B;线程B还没履行完,线程A把锁删去了,这时线程C获取到锁,一起履行程序;所以不能乱删。

2、在关键问题2.2中,只需在key生成时,跟线程相关就不必考虑这个问题了吗?

不同的线程履行程序,线程之间肯尽管有差异呀,然后在redis锁的value设置有线程信息,比方线程id或线程称号,是分布式环境的话加个机器id前缀咯(类似于twitter的snowflake算法!),可是在del指令只会涉及到key,不会再次查看value,所以仍是需求lua脚本操控if(condition){xxx}的原子性。

3、那要不要考虑锁的重入性?

不需求重入;try…finally 没得重入的场景;关于单个线程来说,履行是串行的,获取锁之后必定会开释,由于finally的代码必定会履行啊(只需进入了try块,finally必定会履行)。

4、为什么两个线程都会去删去锁?(形似重复的问题。不管怎样,仍是耐性答复吧)

每个线程只能办理自己的锁,不能办理他人线程的锁啊。这儿能够联想一下ThreadLocal。

5、假如加锁的线程挂了怎样办?只能等候主动超时?

看你怎样写程序的了,一种是问题3的答复;别的,那就主动超时咯。这种状况也适用于网络over了。

6、时刻太长,程序反常就会蛋疼,时刻太短,就会呈现程序还没有处理完就超时了,这岂不是很为难?

是呀,所以需求更好的衡量这个超时时刻的设置。

实践部分首要代码:

RedisLock东西类:

package com.caiya.cms.web.component;
import com.caiya.cache.CacheException;
import com.caiya.cache.redis.JedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* redis完成分布式锁
* 可完成特性:
* 1、使多线程无序排队获取和开释锁;
* 2、丢掉未成功取得锁的线程处理;
* 3、只开释线程自身加持的锁;
* 4、防止死锁
*
* @author wangnan
* @since 1.0
*/
public final class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
/**
* 测验加锁(仅一次)
*
* @param lockKey 锁key
* @param lockValue 锁value
* @param expireSeconds 锁超时时刻(秒)
* @return 是否加锁成功
* @throws CacheException
*/
public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException {
JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds);
return Objects.equals(response, "OK");
} finally {
jedisCache.close();
}
}
/**
* 加锁(指定最大测验次数规模内)
*
* @param lockKey 锁key
* @param lockValue 锁value
* @param expireSeconds 锁超时时刻(秒)
* @param tryTimes 最大测验次数
* @param sleepMillis 每两次测验之间休眠时刻(毫秒)
* @return 是否加锁成功
* @throws CacheException
*/
public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException {
boolean result;
int count = 0;
do {
count++;
result = tryLock(lockKey, lockValue, expireSeconds);
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
} while (!result && count <= tryTimes);
return result;
}
/**
* 开释锁
*
* @param lockKey 锁key
* @param lockValue 锁value
*/
public static void unlock(String lockKey, String lockValue) {
JedisC不必找了,根据 Redis 的分布式锁实战来了ache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);
// Objects.equals(result, 1L);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
jedisCache.close();
}不必找了,根据 Redis 的分布式锁实战来了
// return false;
}
private RedisLock() {
}
}

运用东西类的代码片段1:

 ...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟事务相关的仅有拼接键
String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + Sy不必找了,根据 Redis 的分布式锁实战来了stem.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成集群环境中的仅有值
boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只测验一次,在本次处理过程中直接回绝其他线程的恳求
if (!locked) {
throw new IllegalAccessException("您的操作太频频了,歇息一下再来吧~");
}
try {
// 开端处理中心事务逻辑
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);// 在finally块中开释锁
}

运用东西类的代码片段2:

 ...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
String lockValue = Constant.DEFAULT_CACHE_NAME + ":机器编号:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公正锁,无序竞赛(这儿需求合理依据事务处理状况设置最大测验次数和每次休眠时刻)
if (!locked) {
throw new IllegalAccessException("体系太忙,本次操作失利");// 一般来说,不会走到这一步;假如真的有这种状况,并且在合理设置锁测验次数和等候呼应时刻之后依然处理不过来,或许需求不必找了,根据 Redis 的分布式锁实战来了考虑优化程序呼应时刻或许用音讯行列排队履行了
}
try {
// 开端处理中心事务逻辑
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);
}
...

附加:

依据redis的分布式锁完成客户端Redisson:

https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers

依据zookeeper的分布式锁完成:

http://curator.apache.org/curator-recipes/shared-reentrant-lock.html

请关注微信公众号
微信二维码
不容错过
Powered By Z-BlogPHP