登录
首页 >  数据库 >  Redis

Redis实现分布式锁(setnx、getset、incr)以及如何处理超时情况

来源:脚本之家

时间:2023-01-07 12:09:34 222浏览 收藏

亲爱的编程学习爱好者,如果你点开了这篇文章,说明你对《Redis实现分布式锁(setnx、getset、incr)以及如何处理超时情况》很感兴趣。本篇文章就来给大家详细解析一下,主要介绍一下超时、Redis分布式锁,希望所有认真读完的童鞋们,都有实质性的提高。

如果你通过网络搜索分布式锁,最多的就是基于redis的了。基于redis的分布式锁得益于redis的单线程执行机制,单线程在执行上就保证了指令的顺序化,所以很大程度上降低了开发人员的思考设计成本。

一、通过setnx实现

1、setnx key value

当且仅当key不存在,将key的值设置为value,并且返回1;若是给定的key已经存在,则setnx不做任何动作,返回0。

public static Boolean setnx(final String key, final String value, final long seconds) {
        return getShardedJedisClient().execute(new ShardedJedisAction() {
            public Boolean doAction(ShardedJedis shardedJedis) {
                Jedis jedis = (Jedis) shardedJedis.getShard(key);
                String result = jedis.set(key, value, "NX", "EX", seconds);
                return "OK".equalsIgnoreCase(result);
            }
        });
    }

2、get key

获取key对应的value值,如果不存在该key,返回0。

public String get(final String key) {
        this.checkIsInMulti();
        return (String)this.execute(new SmartJedis.Action() {
            public String doAction(Jedis jedis) {
                return jedis.get(key);
            }
        }, SmartJedis.RW.R, key);
    }

3、getset key value

获取key的旧值,将新value放入

public static String getset(final String key, final String value) {
        return getShardedJedisClient().execute(new ShardedJedisAction() {
            @Override
            public String doAction(ShardedJedis shardedJedis) {
                return shardedJedis.getSet(key, value);
            }
        });
    }

至此,我们先举个手机三要素验证的列子:(A渠道系统,业务B系统,外部厂商C系统)
(1)B业务系统调用A渠道系统,验证传入的手机、身份证、号码三要素是否一一致。
(2)A渠道系统再调用外部厂商C系统。
(3)A渠道系统将结果返回给B业务系统。
这3个过程中,(2)过程,外部厂商的调用时是需要计费的。
当B业务系统并发量很高时,有100笔相同的三要素校验,由于是相同的三要素,A渠道只要调用一次厂商即可知道结果。那么A渠道系统如何控制不让100笔请求全部去访问外部厂商C系统呢?

小明提出了方案一:

在A系统中,
当100个线程同时请求过来,进行redis.setnx(“LOCK_KEY_phone&idNo&name”,”demo”),这样第一笔线程率先拿到锁,其他的线程等待,当thread(0)处理结束后,thread(0)进行delete(“LOCK_KEY_phone&idNo&name”),把锁放开,thread(i)进行get(“LOCK_KEY_phone&idNo&name”)拿到0,说明上一笔已经处理完成,这个时候,我们可以去查询上一笔的记录。

RedisUtils.setnx("LOCK_KEY_phone&idNo&name","demo");
JSONObject result = A.request(B);
AssetUtils.notNull(result,ResponseCodeEnum.Success,"拿到结果");
ResultDmo resultDmo = (ResultDmo)BeanUtils.maptoBean(result);
resultDao.insert(resultDmo);
if(result!=0){
  //上一笔同样的请求还未处理完成,轮训等待(具体如何轮训在此不展开)
}else{
  //上一笔同样的请求处理完成,进行查库操作
  resultDao.select("参数");
}

小宏说:小明的思想不严谨

问题:当100笔线程中一些线程超时或者系统宕机等意外情况发现,锁会一直被某些线程持有,造成死锁状态。
应该给缓存key设置一个超时时间。比如:200ms

RedisUtils.setnx("LOCK_KEY_phone&idNo&name","demo",200);

这种情况是,大致判断了外部厂商C系统业务处理时间大概为200ms,

网上看还有一种方式(B):

RedisUtils.setnx("LOCK_KEY_phone&idNo&name",currentTime,200);
Long old = RedisUtils.get("LOCK_KEY_phone&idNo&name");
Long new = System.currentTimeMillis();
Long time = new - old;
if(time>0){
//处理已经超时
RedisUtils.delete("LOCK_KEY_phone&idNo&name");
}

(B)这种情况不严谨:当a获取setnx锁,a线程崩溃或超时,b、c线程同时get到old,且判断超时,可能出现b线程delete a线程的锁,并且setnx后;c线程又将b线程的锁delete,并且setnx。这种情况完全锁不住线程了。

(B)方案的升级版—->>(C)方案:

当a获取setnx锁,a线程崩溃或超时,b线程getset,获取old且判断超时,c线程getset,获取old(此时这个值是b刚刚set进去的),判断未超时,c继续等待。b线程delete a线程的锁,并且setnx后。这种情况是安全的。

需要注意的地方:
①不要轻易将get和getset混用,笔者认为getset单独使用比较好。
有一种情况,a、b、c、三个线程,a、b同时get,a立即返回了old,突然来了个c,卡在b之前getset了,且删除锁,那么b的get只能返回nil了。此时再根据时间戳对比:
a.get != (a.set)
b.get ! = (b.set)
这样a、b都没拿到锁,但是a其实已经获取到了锁。
②多个服务器时间的同步问题。

总结: 锁超时了该如何处理,通过getset方式判断时间戳差的方式,多比同时getset都得到超时,同时去setnx。总会有一个更快地去setnx。

二、通过incr抢占资源实现

1、incr

将 key 中储存的数字值增一。如果 key 不存在,那么 key 的值会先被初始化为 0 ,然后再执行 INCR 操作。如果值包含错误的类型,或字符串类型的值不能表示为数字,那么返回一个错误。

public static Long incr(final String key) {
        return shardedClient.execute(new ShardedJedisAction() {
            @Override
            public Long doAction(ShardedJedis shardedJedis) {
                shardedJedis.expire(key, 200);
                return shardedJedis.incr(key);
            }
        });
    }

还是上面的三要素的例子

Long result = RedisUtils.incr("LOCK_KEY_phone&idNo&name");
        if (result > 1) {
            //如果计数器>1,说明已经有请求进来
            throw new AppException(ResponseCode.FAIL.getCode(), "操作频繁");
        }


 JSONObject result = A.request(B);
 Long endTime = System.currentTimeMillis();
 Long time = endTime - startTime;
  //如果处理时间大于incr的key存活时间,说明该笔请求已经超时
  if (time > 200) {
      //全局ID,统计超时次数
      String key = "LOCK_KEY_phone&idNo&name" + source;
      RedisUtils.incr(key);
      int total = Integer.valueOf(RedisUtils.get(key));
      //断言若超时10次,进行报警(报警不在次展开)
      AssertUtils.isTrue(total 

这里设置了计数器的超时时间为200ms,如果请求超时,会有大量的线程同时访问,笔者这里有10笔同时过来,就启动报警。人为排查渠道。和setnx的不同是,某个线程超时,setnx的方式需要手动去判断,再去加锁,防止大量线程进入(这里可以通过轮训实现);而incr的方式超时了,大量线程进来,我不做处理,但是这里的time>200是具有误差的。

本篇关于《Redis实现分布式锁(setnx、getset、incr)以及如何处理超时情况》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于数据库的相关知识,请关注golang学习网公众号!

声明:本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
相关阅读
更多>
最新阅读
更多>
课程推荐
更多>
评论列表