如何利用 redis 分布式锁,解决秒杀场景下的订单超卖问题?

1. 秒杀场景

Controller层:

@RestController
@RequestMapping("/skill")
@Slf4j
public class SecKillController {
    @Autowired
    private SecKillService secKillService;
 
    //查询秒杀活动特价商品的信息
    @GetMapping("/query/{productId}")
    public String query(@PathVariable String productId)throws Exception {
        return secKillService.querySecKillProductInfo(productId);
    }

    //秒杀
    @GetMapping("/order/{productId}")
    public String skill(@PathVariable String productId)throws Exception {
        log.info("@skill request, productId:" + productId);
        secKillService.orderProductMockDiffUser(productId);
        return secKillService.querySecKillProductInfo(productId);
    }
}

Service层:

@Service
public class SecKillServiceImpl implements SecKillService {
    
    private static final int TIMEOUT = 10 * 1000; //超时时间 10s
    
    @Autowired
    private RedisLock redisLock;

   // 雅诗兰黛特价小棕瓶,限量100000份
    static Map<String,Integer> products;
    static Map<String,Integer> stock;
    static Map<String,String> orders;
    static {
        //模拟多个表,商品信息表,库存表,秒杀成功订单表
        products = new HashMap<>();
        stock = new HashMap<>();
        orders = new HashMap<>();
        //商品Id---商品库存
        products.put("123456", 100000);
        //商品id---商品库存
        stock.put("123456", 100000);
    }

    private String queryMap(String productId) {
        return "雅诗兰黛小棕瓶特价,限量份"
                + products.get(productId)
                +" 还剩:" + stock.get(productId)+" 份"
                +" 该商品成功下单用户数目:"
                +  orders.size() +" 人" ;
    }

    @Override
    public String querySecKillProductInfo(String productId) {
        return this.queryMap(productId);
    }

    //秒杀的逻辑:可以在该方法生加上Synchronized解决超卖
    @Override
    public void  orderProductMockDiffUser(String productId) {
        //1.查询该商品库存,为0则活动结束。
        int stockNum = stock.get(productId);
        if(stockNum == 0) {
            throw new SellException(100,"活动结束");
        }else {
            //2.下单(模拟不同用户openid不同)
            orders.put(KeyUtil.genUniqueKey(),productId);
            //3.减库存
            stockNum =stockNum-1;
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //4.更新库存
            stock.put(productId,stockNum);
        }
    }
}

使用压测工具测试结果:雅诗兰黛小棕瓶特价,限量份100000 还剩:99938份 该商品成功下单用户数目:646人

很明显订单超卖了,如何解决?可以在秒杀方法上加上Synchronized,但是只适合单点服务器,且性能低。更多redis面试题见公众号Java精选,回复Java面试,获取Redis相关面试题资料。

2. Redis分布式锁解决订单超卖

2.1 两个命令介绍

业务场景:

天猫双11热卖过程中,对已经售罄的货物追加补货,且补货完成。客户购买热情高涨, 3秒内将所有商品购买完毕。本次补货已经将库存全部清空,如何避免最后一件商品不被多人同时购买?【超卖问题】就是说如果只剩一件商品,但是有5个人要买,如何保证不被超卖???

业务分析:

使用watch监控一个key有没有改变已经不能解决问题,此处要监控的是具体数据 ,我们要监控的是商品的数量什么时候到1,这个商品的数量是一直变化的,不可能别每次变化,都放弃执行。

解决方案:

使用 setnx 设置一个公共锁:setnx lock-key value,操作完毕通过del操作释放锁

利用setnx命令的返回值特征,有值则返回设置失败,无值则返回设置成功

对于返回设置成功的,拥有控制权进行下一步的具体业务操作,对于返回设置失败的,不具有控制权,排队或等待

127.0.0.1:6379> set num 10
OK
127.0.0.1:6379> setnx lock-num 1 -- 加锁
(integer) 1
127.0.0.1:6379> incrby num -1 
(integer) 9
127.0.0.1:6379> del lock-num  -- 释放锁
(integer) 1

127.0.0.1:6379> setnx lock-num 1  --  当前客户端加锁
(integer) 1 

127.0.0.1:6379> setnx lock-num 1  -- 其他客户端获取不到锁
(integer) 0

死锁: 如果加了锁,但是没有释放,就会导致死锁,其他客户端一直获取不到锁。

使用expire为锁key添加时间限定,到时间不释放,放弃锁

由于操作通常都是微秒或毫秒级,因此该锁定时间不宜设置过大。具体时间需要业务测试后确认。

  • expire lock-key second
  • pexpire lock-key milliseconds
127.0.0.1:6379> set name 123
OK
127.0.0.1:6379> setnx lock-name 1 -- 锁的名称key
(integer) 1
127.0.0.1:6379> expire lock-name 20 -- 使用expire为锁key添加时间限定
(integer) 1
127.0.0.1:6379> get name
"123"

GETSET key value

将给定 key 的值设为 value ,并返回 key 的旧值(old value)。

redis> GETSET db mongodb    # 没有旧值,返回 nil
(nil)

redis> GET db
"mongodb"

redis> GETSET db redis      # 返回旧值 mongodb
"mongodb"

redis> GET db
"redis"

2.2 RedisLock

@Component
@Slf4j
public class RedisLock {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * 加锁
     * @param key:productId
     * @param value 当前时间+超时时间
     */
    public boolean lock(String key, String value) {
        //setnx----对应方法 setIfAbsent(key, value),如果可以加锁返回true,不可以加锁返回false
        if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
            return true;
        }
 
        //下面这段代码时为了解决可能出现的死锁情况
        String currentValue = redisTemplate.opsForValue().get(key);
        //如果锁过期
        if (!StringUtils.isEmpty(currentValue)
                && Long.parseLong(currentValue) < System.currentTimeMillis()) {
            //获取上一个锁的时间:重新设置锁的过期时间value,并返回上一个过期时间
            String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
            //currentValue =2020-12-28,两个线程的value=2020-12-29,只会有一个线程拿到锁
            if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
                return true;
            }
        }
        return false;
    }

    //解锁
    public void unlock(String key, String value) {
        try {
            String currentValue = redisTemplate.opsForValue().get(key);
            if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
                redisTemplate.opsForValue().getOperations().delete(key);
            }
        }catch (Exception e) {
            log.error("【redis分布式锁】解锁异常, {}", e);
        }
    }
}

2.3 将redis分布式锁应用于秒杀业务

@Override
public void orderProductMockDiffUser(String productId) {
    //加锁
    //锁的过期时间为当前时间+过期时长
    long time = System.currentTimeMillis()+TIMEOUT;
    if(!redisLock.lock(productId,String.valueOf(time))){
        throw new SellException(101,"人太多,稍后再来");
    }
    //1.查询该商品库存,为0则活动结束。
    int stockNum = stock.get(productId);
    if(stockNum == 0) {
        throw new SellException(100,"活动结束");
    }else {
        //2.下单(模拟不同用户openid不同)
        orders.put(KeyUtil.genUniqueKey(),productId);
        //3.减库存
        stockNum =stockNum-1;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //4.更新库存
        stock.put(productId,stockNum);
    }
    //解锁
    redisLock.unlock(productId,String.valueOf(time));
}

2.4 分析RedisLock

重点分析加锁逻辑,有两个逻辑需要考虑:

public boolean lock(String key, String value) {

    if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
        return true;
    }
    //下面的代码是为了解决可能出现的死锁的情况????
    String currentValue = redisTemplate.opsForValue().get(key);
    if (!StringUtils.isEmpty(currentValue)
        && Long.parseLong(currentValue) < System.currentTimeMillis()) {
        //下面这个逻辑又怎么理解????
        String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
        if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
            return true;
        }
    }
    return false;
}

为了解决可能出现的死锁的情况????

//秒杀业务方法
@Override
public void orderProductMockDiffUser(String productId) {
    //加锁
    //锁的过期时间为当前时间+过期时长
    long time = System.currentTimeMillis()+TIMEOUT;
    if(!redisLock.lock(productId,String.valueOf(time))){
        throw new SellException(101,"人太多,稍后再来");
    }
    //1.查询该商品库存,为0则活动结束。
    int stockNum = stock.get(productId);
    if(stockNum == 0) {
        throw new SellException(100,"活动结束");
    }else {
        //2.下单
        orders.put(KeyUtil.genUniqueKey(),productId);
        //3.减库存
        stockNum =stockNum-1;
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //4.更新库存
        stock.put(productId,stockNum);
    }
    //解锁
    redisLock.unlock(productId,String.valueOf(time));
}

假如我们将中间那段逻辑去掉会出现声明情况???

public boolean lock(String key, String value) {
    if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
        return true;
    }
    return false;
}

① 线程A执行秒杀的业务逻辑方法,并对这个方法加了锁,key=proeuctId,value=加锁时间+过期时长,然后开始执行下单----》减库存-----》更新库存等操作,如果在执行的过程中,这段代码发生了异常,那么线程A是不会释放锁的,导致其他线程都无法获取到锁导致死锁的产生,所以下面的逻辑是很有必要加的,即如果当前时间晚于锁的过期时间,那么就会向下走if()条件:

//下面的代码是为了解决可能出现的死锁的情况????
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue)
    && Long.parseLong(currentValue) < System.currentTimeMillis()) {
        //下面这个逻辑又怎么理解????
        String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
        if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
        return true;
    }
}

② 可是下面的if()条件怎么理解?

currentValue=2020-12-18

假如现在两个线程A和B同时执行lock()方法,也就是这两个线程的value是完全相同的,都为value=2020-12-19,而他们都执行 String oldValue = redisTemplate.opsForValue().getAndSet(key, value);,会有一个先执行一个后执行:

假如线程A先执行,返回的oldValue=2020-12-18,同时设置value = 2020-12-19,由于oldvalue=currentValue返回true,即A线程加了锁;

此时B线程继续执行 ,返回的oldValue=2020-12-19,oldvalue!=currentValue,返回false,加锁失败。

所以这段代码的逻辑是只会让一个线程加锁。推荐公众号Java精选,回复Java面试,获取Redis相关面试题资料。

public boolean lock(String key, String value) {
    if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
        return true;
    }
    String currentValue = redisTemplate.opsForValue().get(key);
    if (!StringUtils.isEmpty(currentValue)
        && Long.parseLong(currentValue) < System.currentTimeMillis()) {
        //下面这个逻辑又怎么理解????
        String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
        if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
            return true;
        }
    }
    return false;
}

注意:这个写法比较适合公平锁,也就是谁先拿到锁,谁就有资格执行,比如job任务的执行。

~阅读全文-人机检测~

关注下方微信公众号“Java精选”(w_z90110),回复关键词领取资料:如Mysql、Hadoop、Dubbo、Spring Boot等,免费领取视频教程、资料文档和项目源码。微信搜索小程序“Java精选面试题”,内涵3000+道Java面试题!

Java精选专注程序员推送一些Java开发知识,包括基础知识、各大流行框架(Mybatis、Spring、Spring Boot等)、大数据技术(Storm、Hadoop、MapReduce、Spark等)、数据库(Mysql、Oracle、NoSQL等)、算法与数据结构、面试专题、面试技巧经验、职业规划以及优质开源项目等。其中一部分由小编总结整理,另一部分来源于网络上优质资源,希望对大家的学习和工作有所帮助。

评论

分享:

支付宝

微信