Redis

Redis

SQL和NoSQL的区别

image-20240122160130354

认识Redis

  • 是一个基于内存的键值型NoSQl数据库
  • value支持多种不同数据结构,功能丰富
  • 单线程,每个命令具备原子性
  • 低延迟,速度快(基于内存、IO多路复用、良好的编码)
  • 支持数据持久化
  • 支持主从集群、分片集群
  • 支持多语言客户端

Redis数据结构

  • image-20240122163329122

Redis命令

Redis通用命令

  • 通过help[commend] 来查看帮助文档

  • 通用指令是部分数据类型的,都可以使用的指令,常见的有:

    • keys:查看符合模板的所有key,不建议在生产环境设备上使用
    • del : 删除一个指定的key
    • exists:判断一个key是否存在
    • expire:给一个key设置有效期,有效期到期时该key会被自动删除
    • TTL: 查看一个key的剩余时间 -1 为无限

String类型命令

  • 也就是字符串类型,是Redis中最简单的存储类型。其value是字符串,不过根据字符串的格式不同,又可以分为3类
    • string:普通字符串
    • int :整形类型,可以做自增、自减操作
    • float:浮点类型,可以做自增、自减操作
  • 底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过512m

String类型的常见命令

  • SET:添加或者修改已经存在的一个string类型的键值对
  • GET:根据key获取String类型的value
  • MSET:批量添加多个String类型的键值对
  • MGET:根据多个key获取多个String类型的value
  • INCR:让一个整型的key自增1
  • INCRBY:让一个整型的key自增并指定步长,例如:incrby num 2让num值自增2
  • INCRBYFLOAT:让一个浮点类型的数字自增并指定步长
  • SETNX:添加一个String类型的键值对,前提是这个key不存在,否则不执行
  • SETEX:添加一个String类型的键值对,并且指定有效期

key的结构

  • Redis的key允许有多个单词形成层级结构,多个单词之间用∵’隔开,格式如下: 项目名:业务名:类型:id

Hash类型命令

  • 其value是一个无序字典,类似于java中的HashMap结构
  • String结构是将对象序列化为json字符串存储,当需要修改某个字段时很不方便
  • image-20240122175035508

Hash类型的常见命令

  • HSET key field value:添加或者修改hash类型key的field的值
  • HGET key field:获取一个hash类型key的field的值
  • HMSET:批量添加多个hash类型key的field的值
  • HMGET:批量获取多个hash类型key的field的值
  • HGETALL:获取一个hash类型的key中的所有的field和value
  • HKEYS:获取一个hash类型的key中的所有的field
  • HVALS:获取一个hash类型的key中的所有的value
  • HINCRBY:让一个hash类型key的字段值自增并指定步长
  • HSETNX:添加一个hash类型的key的field值,前提是这个field不存在,否则不执行

List类型命令

  • Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构。既可以支持正向检索和也可以支持反向检索。

特征

  • 有序
  • 元素可以重复
  • 插入和删除快
  • 查询速度一般

List类型的常见命令

  • LPUSH key element …:向列表左侧插入一个或多个元素

  • LPOP key:移除并返回列表左侧的第一个元素,没有则返回nil

  • RPUSH key element …:向列表右侧插入一个或多个元素

  • RPOP key:移除并返回列表右侧的第一个元素

  • LRANGE key star end:返回一段角标范围内的所有元素

  • BLPOP和BRPOP:与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil

  • image-20240122180139682

  • 如何利用List模拟一个栈?

    • 入口和出口在同一边
  • 如何利用List结构模拟一个队列

    • 入口和出口在不同边
  • 如何利用list结构模拟一个阻塞队列?

    • 入口和出口在不同边
    • 出队时采用Blpop或Brpop

set类型命令

  • Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。因为也是一个hash表,因此具备与HashSet类似的特征:
    1. 无序
    2. 元素不可重复
    3. 查找快
    4. 支持交集、并集、差集等功能

set常见命令

  • SADD key member …:向set中添加一个或多个元素
  • SREM key member …:移除set中的指定元素
  • SCARD key:返回set中元素的个数
  • SISMEMBER key member:判断一个元素是否存在于set中
  • SMEMBERS:获取set中的所有元素
  • SINTER key1 key2 …:求key1与key2的交集
  • SDIFF key1 key2 …:求key1与key2的差集
  • SUNION key1 key2 ..:求key1和key2的并集

SortedSet类型

  • Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对完素排序,底层的实现是一个跳表(SkipList)加hash表。

特性

  • 可排序
  • 元素不重复
  • 查询速度快
  • 可用来实现像排行榜的功能

SortedSet类型的常见命令

  • ZADD key score member:添加一个或多个元素到sorted set,如果已经存在则更新其score值
  • ZREM key member:删除sorted set中的一个指定元素
  • ZSCORE key member:获取sorted set中的指定元素的score值
  • ZRANK key member:获取sorted set中的指定元素的排名
  • ZCARD key:获取sorted set中的元素个数
  • zCOUNT key min max:统计score值在给定范围内的所有元素的个数
  • ZINCRBY key increment member: 让sorted set中的指定元素自增,步长为指定的increment值
  • ZRANGE key min max:按照score排序后,获取指定排名范围内的元素
  • ZRANGEBYSCORE key min max:按照score排序后,获取指定score范围内的元素
  • ZDIFF、ZINTER、ZUNION:求差集、交集、并集
  • 注意:所有排名默认升序,如果要降序则在命令的Z后面添加REV即可

SpringDataRedis

  • SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis
  • 提供了对不同Redis客户端的整合( Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis哨兵和Redis集群支持基于Lettuce的响应式编程
  • 支持基于JDK、JSON、字符串、Spring对象的数据序列化及反序列化支持
  • 基于Redis的JDKcollection实现
  • SpringDataRedis中提供了RedisTemplate工具类,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:
  • image-20240122212305379

使用

  1. 引入依赖
   <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- 连接池依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
  1. 配置连接池
spring:
redis:
host: 127.0.0.1
port: 6379
lettuce:
pool:
max-active: 8
max-idle: 8
min-idle: 0
max-wait: 100ms
  1. 在要使用的地方注入RedisTemplate
@Autowired
private RedisTemplate redisTemplate;
  1. 使用测试
@Test
void testString(){
//写入一条数据
redisTemplate.opsForValue().set("name","yaya");
//获取string数据
Object name = redisTemplate.opsForValue().get("name");
System.out.println("name = " + name);
}
  • 这里写入的字符串变成字节 是因为底层默认用了jdk序列化转化器 将对象传为字节存入到redis中 在set方法打断点 F7一路追溯可以看到是用ObjectOutputStream转成字节流

  • image-20240122220452338

  • 默认的序列化 缺点:可读性差 内存占用较大

RedisTemplate的两种序列化方案

自定义RedisTemplate

  • 所以要自己编写配置类去设置他的序列化方法
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory connectionFactory){
//创建RedisTemplate 对象
RedisTemplate<String,Object> template=new RedisTemplate<>();
//设置连接工厂
template.setConnectionFactory(connectionFactory);
//创建json序列化工具
GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
//设置key的序列化
template.setKeySerializer(RedisSerializer.string());
template.setHashKeySerializer(RedisSerializer.string());
//设置value 的序列化
template.setValueSerializer(jsonRedisSerializer);
template.setHashValueSerializer(jsonRedisSerializer);
return template;
}
  • 注意:如果这里没用引入spring-boot-starter-web 要引入如下依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
  • Json的序列化方式可以满足问题但是还存在问题

    • 测试代码

    •    @Test
          void testSaveUser(){
              redisTemplate.opsForValue().set("user:100",new User("yaya",21));
              User user = (User)redisTemplate.opsForValue().get("user:100");
              System.out.println("user = " + user);
          }
      

      - redis 客户端查看结果

      - `{
      "@class": "com.ya.pojo.User",
      "name": "yaya",
      "age": 21
      }`

      - 为了在反序列化时知道对象的类型,json序列化器会将类的class类型写入json的结果中,存入redis,会带来内存额外的开销
      - 所以为了节省内存空间,我们并不会使用JSON序列化器来处理value,而是统一使用String序列化器,要求只能存储String类型的key和value。当需要存储Java对象时,手动完成对象的序列化和反序列化。

      #### StringRedisTemplate

      - 写入redis 手动把对象序列化为JSON
      - 读取Redis ,要手动把读取到的JSON 反序列化为对象

      ```java
      @SpringBootTest
      class RedisStringTests {
      @Autowired
      private StringRedisTemplate stringRedisTemplate;

      private static final ObjectMapper mapper=new ObjectMapper();//SpringMvc 默认的使用json序列化工具
      @Test
      void testSaveUser() throws JsonProcessingException {
      //创建对象
      User user=new User("鸭鸭",21);
      //手动序列化
      String json=mapper.writeValueAsString(user);
      //写入数据
      stringRedisTemplate.opsForValue().set("user:200",json);
      //获取
      String jsonUser = stringRedisTemplate.opsForValue().get("user:200");
      //手动反序列化
      User user1=mapper.readValue(jsonUser, User.class);
      System.out.println("user1 = " + user1);
      }
      }

Redis 案列

短信登录

基于Session实现登录

image-20240127174239563

  • 使用ThreadLocal 会将数据保存到每一个线程内部,在线程内部,会创建一个Map来保存,这样做的好处就是每一个线程都有自己独立的存储空间,并且每一个线程之间都能做到互不干扰

集群的session共享问题

  • 多台Tomcat并不共享session存储空间,当请求切换到不同tomcat服务时导致数据丢失的问题。
  • session拷贝浪费空间 时间延迟(pass)
  • session 的替代方案应该满足:
    • 数据共享
    • 内存存储
    • key、value结构
  • 正是redis的功能 神中神

基于Redis实现共享session

image-20240128215316586

public Result login(LoginFormDTO loginForm, HttpSession session) {
//1. 校验手机号
String phone=loginForm.getPhone();
if (RegexUtils.isPhoneInvalid(phone)){
//2. 如果不符合,返回错误信息
return Result.fail("手机号格式错误!");
}
// 2.校验验证码
Object cacheCode = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);
String code=loginForm.getCode();
if (cacheCode==null || !cacheCode.equals(code)){
return Result.fail("验证码错误");
}

//3. 一致,根据手机号查询用户 select * from tb_user where phone =?
User user = query().eq("phone", phone).one(); // .one()表示查一个
//4. 判断用户是否存在 不存在 创建新用户
if (user==null){
//创建用户
user= createUserWithPhone(phone);
}
String token= UUID.randomUUID().toString(true); //后面的true表示生成的字符串不带下划线
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String,Object> userMap=BeanUtil.beanToMap(userDTO,new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName,fieldValue)->fieldValue.toString()));
//- `setIgnoreNullValue` 忽略为null的值
//- `setFieldValueEditor` 对字段值的修改器
//- `fieldName`字段名`fieldValue`字段值
//- 将所有的字段名转为string
//5. 保存用户到redis
stringRedisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,userMap);
//设置有效期
stringRedisTemplate.expire(LOGIN_USER_KEY+token,LOGIN_USER_TTL,TimeUnit.MINUTES);
// 生成token 作为登录令牌
return Result.ok(token);
}
  • expire 是redis中设置有效器的方法
  • 这里是设置30分钟过期,但是实际项目中,应该为用户停止操作30分钟后,删除用户登录信息,所以我们在每次请求时,通过拦截器将用户时长刷新
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)){
//不存在token ,拦截,返回401状态码
response.setStatus(401);
return false;
}
//2. 基于token获取redis中的用户
String key = RedisConstants.LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//3. 判断用户是否存在
if (userMap.isEmpty()){
//不存在 ,拦截,返回401状态码
response.setStatus(401);
return false;
}
//4. 将查询到的Hash数据转为userDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 5.存在 保存用户信息到ThreadLocal
UserHolder.saveUser(userDTO);
//刷新token有效期
stringRedisTemplate.expire(key,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
return true;
}
  • 在redis用token表示每一个用户

  • 登录成功后向redis中存并将token返回

  • Redis代替session需要考虑的问题:

    • 选择合适的数据结构
    • 选择合适的key
    • 选择合适的存储粒度

登录拦截器的优化

  • 如果用户一直访问的是未拦截页,拦截器无法刷新用户登录有效期

  • image-20240129205418274

  • 所有请求的拦截器

package com.hmdp.utils;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.hmdp.dto.UserDTO;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* @program: hm-dianping
* @description: 拦截器
* @author: ChestnutDuck
* @create: 2024-01-28 19:04
**/
public class RefreshInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
//这里不能用@Autowired 和 Resource 注入 是因为我们没有通过spring容器创建,而是自己创建的类,spring无法帮助我们进行依赖注入
public RefreshInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1. 获取请求头中的token
String token = request.getHeader("authorization");
if (StrUtil.isBlank(token)){
//不存在token ,拦截,返回401状态码
return true;
}
//2. 基于token获取redis中的用户
String key = RedisConstants.LOGIN_USER_KEY + token;
Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key);
//3. 判断用户是否存在
if (userMap.isEmpty()){
//不存在 ,拦截,返回401状态码
return true;
}
//4. 将查询到的Hash数据转为userDTO对象
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 5.存在 保存用户信息到ThreadLocal
UserHolder.saveUser(userDTO);
//刷新token有效期
stringRedisTemplate.expire(key,RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// afterCompletion 线程处理完后执行

// 移除用户
UserHolder.removeUser();
}
}

  • 不管什么请求都拦截,存在用户则刷新token 否则直接放行到下面的拦截器
  • 刷新token的拦截器
package com.hmdp.utils;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.hmdp.dto.UserDTO;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.HandlerInterceptor;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* @program: hm-dianping
* @description: 拦截器
* @author: ChestnutDuck
* @create: 2024-01-28 19:04
**/
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.判断是否需要拦截(ThreadLocal中是否有用户)
if (UserHolder.getUser()==null){
//没有 拦截 设置状态码
response.setStatus(401);
//拦截
return false;
}
//有用户,则放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// afterCompletion 线程处理完后执行

// 移除用户
UserHolder.removeUser();
}
}

商户查询缓存

什么是缓存

  • 缓存就是数据交换的缓冲区,是贮存数据的临时地方,一般读写性能较高

缓存的作用

  1. 降低后端负载
  2. 提高读写效率,降低响应时间

缓存的成本

  1. 数据一致性成本
  2. 代码维护成本
  3. 运维成本

添加Redis缓存

  • image-20240129214601792
public Result queryById(Long id) {
//1.从redis 查询商铺缓存
String key = RedisConstants.CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2. 判断是否存在
if (StrUtil.isNotBlank(shopJson)){
// 3 存在,直接返回
Shop shop = JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}

// 4. 不存在 根据id查询数据库
Shop shop = getById(id);
// 5.不存在,返回错误
if (shop == null){
return Result.fail("店铺不存在");
}

// 6, 存在,写入redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));
return Result.ok(shop);
}

缓存更新策略

  • image-20240129224414874

业务场景:

  • 低一致性需求:使用内存淘汰机制。例如店铺类型的查询缓存
  • 高一致性需求:主动更新,并以超时剔除作为兜底方案。例如店铺详情查询的缓存

主动更新策略

  • image-20240129224917048
  • 操作缓存和数据库时有三个问题需要考虑:
    • 删除缓存还是更新缓存?
      1. 更新缓存:每次更新数据库都更新缓存,无效写操作较多
      2. 删除缓存:更新数据库时让缓存失效,查询时再更新缓存
    • 如何保证缓存与数据库的操作的同时成功或失败?
      1. 单体系统,将缓存与数据库操作放在一个事务分布式系统,
      2. 利用TCC等分布式事务方案
    • 先操作缓存还是先操作数据库?
    • image-20240129231307849
    • 视频
  • 缓存更新策略的最佳实践方案:
    1. 低一致性需求:使用Redis自带的内存淘汰机制

    2. 高一致性需求:主动更新,并以超时剔除作为兜底方案

    3. 读操作:

      • 缓存命中则直接返回

      • 缓存未命中则查询数据库,并写入缓存,设定超时时间

    4. 写操作:

      • 先写数据库,然后再删除缓存
      • 要确保数据库与缓存操作的原子性

缓存穿透

  • 指客户端请求的数据在缓存中和数据库中都不存在,这样的缓存永远不会生效,请求都会打到数据库,给数据库带来巨大压力

解决方案

  • 增强id的复杂度,避免被猜测id规律
  • 做好数据的基础格式校验
  • 加强用户权限校验
  • 做好热点参数的限流
缓存空对象
  • 优点:实现简单,维护方便

    缺点:

    • 额外的内存消耗
    • 可能造成短期的不一致
  • image-20240131193632309

public Shop queryWithPassThrough(Long id) {
//1.从redis 查询商铺缓存
String key = RedisConstants.CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2. 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3 存在,直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
//判断命中的是否是空值
if (shopJson != null) {
// 返回一个错误信息
return null;
}
// 4. 不存在 根据id查询数据库
Shop shop = getById(id);
// 5.不存在,返回错误
if (shop == null) {
//将空值写入Redis
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 6, 存在,写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
return shop;

}
布隆过滤
  • image-20240131194050556

  • 优点:用2进制保存数据,内存占用较少,没有多余key

  • 缺点: 实现复炸,存在误判可能

缓存雪崩

  • 指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力
  • image-20240131210931614

解决方案

  • 给不同的key的TTL添加随机值 (一个固定的过期时间+随机的一个数,防止缓存同时过期,大量请求直接打到数据库,造成雪崩)
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略 牺牲部分服务,保证数据安全
  • 给业务添加多级缓存 在多个层面添加缓存,nginx ,redis jvm

缓存击穿

  • 缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击
  • image-20240131213025909

解决方案

image-20240131213948262

  • 互斥锁 线程有一个获取到锁,其他线程会循环等待进入阻塞,会降低系统性能

对比

  • 解决方案 优点 缺点
    互斥锁 没有额外的内存消耗
    保持一致性
    实现简单
    线程需要等待,性能受影响
    可能有死锁风险
    逻辑过期 线程无需等待,性能较好 不保证一致性
    有额外内存消耗
    实现复杂

案例

基于互斥锁解决缓存击穿问题
  • 需求 :修改根据id查询商铺的业务,基于互斥锁方式来解决缓存击穿问题
  • image-20240131215052996
public Shop queryWithMutex(Long id) {
//1.从redis 查询商铺缓存
String key = RedisConstants.CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2. 判断是否存在
if (StrUtil.isNotBlank(shopJson)) {
// 3 存在,直接返回
return JSONUtil.toBean(shopJson, Shop.class);
}
//判断命中的是否是空值
if (shopJson != null) {
// 返回一个错误信息
return null;
}
//4. 实现缓存重建
// 4.1 获取互斥锁
String lockKey=RedisConstants.LOCK_SHOP_KEY+id;
Shop shop = null;
try {
boolean isLock=tryLock(lockKey);
// 4.2 判断是否获取成功
if (!isLock){
// 4. 3 失败则休眠重试
Thread.sleep(50);
return queryWithMutex(id);
}
//注意获取锁成功应该再次检测redis缓存是否存在,做双重检测,如果存在则无需重建缓存

// 4.4 成功 根据id查询数据库
shop = getById(id);
//模拟重建的延时
Thread.sleep(200);
// 5.不存在,返回错误
if (shop == null) {
//将空值写入Redis
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 6, 存在,写入redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
//7. 释放锁
unlock(lockKey);
}
return shop;
}
  • 这里的锁是用redis string 的setnx 来模拟
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(key, "1", RedisConstants.LOCK_SHOP_TTL, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag); //这里用工具包 是因为flag是Boolean 不是boolean Boolean可以为空,为了避免空指针异常,
// 用工具包,工具包的isTrue()会将null 和false返回false
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}
基于逻辑过期方式解决缓存击穿问题
  • 需求:修改根据id查询商铺的业务,基于逻辑过期方式来解决缓存击穿问题

  • image-20240201160415220

  • 代码实现

//线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
/**
* 逻辑过期解决缓存击穿问题
* @param id
* @return
*/
public Shop queryWithLogicalExpire(Long id) {
//1.从redis 查询商铺缓存
String key = RedisConstants.CACHE_SHOP_KEY + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2. 判断是否存在
if (StrUtil.isBlank(shopJson)) {
// 3 存在,直接返回
return null;
}
//命中 需要将json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
JSONObject data = (JSONObject) redisData.getData(); //他不知道是什么类型,实际上是JsonObject
Shop shop = JSONUtil.toBean(data, Shop.class);
//如果没过期
if (expireTime.compareTo(LocalDateTime.now())>0){
return shop;
}
//如果过期 缓存重建
//获取锁
String lockKey = RedisConstants.LOCK_SHOP_KEY + id;
boolean flag = tryLock(lockKey);
//注意获取锁成功应该再次检测redis缓存是否过期,做双重检测.如果存在无需重建缓存 如果两个线程同时,其中一个缓存未命中,获取锁时,恰好缓存重建完成,不做第二次验证会重复缓存重建
if (flag){
// 成功 ,开启新线程
CACHE_REBUILD_EXECUTOR.submit(()->{
//重建缓存
try {
this.saveShop2Redis(id,20L);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
//释放锁
unlock(lockKey);
}
});
}
return shop;
}

缓存工具封装

  • 基于StringRedisTemplate封装一个缓存工具类,满足下列需求:
    • 方法1∶将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
    • 方法2∶将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击
      穿问题
    • 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
    • 方法4∶根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
package com.hmdp.utils;

import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.hmdp.entity.Shop;
import com.mysql.cj.util.TimeUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
* @program: hm-dianping
* @description: 缓存工具类
* @author: ChestnutDuck
* @create: 2024-02-01 18:59
**/
@Component
@Slf4j
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;

public CacheClient(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public void set (String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
public void setWithLogicalExpire (String key, Object value, Long time, TimeUnit unit){
//设置逻辑过期
RedisData redisData=new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); //通过传进来的单位转变成秒
//写入Redis
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
}

public <R,ID>R queryWithPassThrough(String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,
Long time, TimeUnit unit) {
//Function 有惨有返回值 R返回值类型 ID参数 这里根据id差商品,你用的时候就要传一个函数进来
//Class你要告诉我是什么类型我才能给你返回对应的类型
//1.从redis 查询商铺缓存
String key = keyPrefix+ id;
String json = stringRedisTemplate.opsForValue().get(key);
//2. 判断是否存在
if (StrUtil.isNotBlank(json)) {
// 3 存在,直接返回
return JSONUtil.toBean(json, type);
}
//判断命中的是否是空值
if (json != null) {
// 返回一个错误信息
return null;
}
// 4. 不存在 根据id查询数据库
R r= dbFallback.apply(id);
// 5.不存在,返回错误
if (r == null) {
//将空值写入Redis
stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
// 6, 存在,写入redis
this.set(key,r,time,unit);
return r;

}

//线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
/**
* 逻辑过期解决缓存击穿问题
* @param id
* @return
*/
public <R,ID>R queryWithLogicalExpire(
String keyPrefix,String lockKeyPrefix,ID id,Class<R> type,Function<ID,R> dbFallback,
Long time, TimeUnit unit) {
//1.从redis 查询商铺缓存
String key =keyPrefix + id;
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2. 判断是否存在
if (StrUtil.isBlank(shopJson)) {
// 3 存在,直接返回
return null;
}
//命中 需要将json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
LocalDateTime expireTime = redisData.getExpireTime();
JSONObject data = (JSONObject) redisData.getData(); //他不知道是什么类型,实际上是JsonObject
R r = JSONUtil.toBean(data, type);
//如果没过期
if (expireTime.compareTo(LocalDateTime.now())>0){
return r;
}
//如果过期 缓存重建
//获取锁
String lockKey = lockKeyPrefix+ id;
boolean flag = tryLock(lockKey);
//注意获取锁成功应该再次检测redis缓存是否过期,做双重检测.如果存在无需重建缓存 如果两个线程同时,其中一个缓存未命中,获取锁时,恰好缓存重建完成,不做第二次验证会重复缓存重建
if (flag){
// 成功 ,开启新线程
//重建缓存
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
R r1=dbFallback.apply(id);
//写入redis
this.setWithLogicalExpire(key,r1,time,unit);
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
unlock(lockKey);
}
});
}
return r;


}

private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue()
.setIfAbsent(key, "1", RedisConstants.LOCK_SHOP_TTL, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag); //这里用工具包 是因为flag是Boolean 不是boolean Boolean可以为空,为了避免空指针异常,
// 用工具包,工具包的isTrue()会将null 和false返回false
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}
}

  • 这是工具类不知道到时侯传什么类型,这里用泛型
  • 定义泛型使用泛型
  • 数据库查询就是一个函数,工具类并不知道用哪个方法和返回值类型,我们要把函数传进去
  • 有参有返回值的函数在Java中叫Function
  • ID是参数类型 R是返回值类型

优惠卷秒杀

全局唯一ID

  • 当用户抢购时,就会生成订单并保存到订单表中,而订单表如果使用数据库自增ID就存在一些问题
    • ID的规律性太明显,会将数据信息泄露给用户
    • 受单表数据量的限制

全局ID生成器

  • 是一种在分布式系统下用来生成全局唯一ID的工具

  • image-20240201203109908

  • 为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:

  • image-20240201203510324

全局唯一ID生成策略

  • uuid
  • Redis自增
  • snowflake算法
Redis自增ID策略
  • 每天一个key,方便统计
  • ID构造是 时间戳+计数器
package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
* @program: hm-dianping
* @description: id 生成器
* @author: ChestnutDuck
* @create: 2024-02-01 20:54
**/
@Component
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP=1704067200L; //2024 1.1. 00:00:00
/**
* 序列号的位数
*/
private static final int Count_BITS=32;
private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

public long nextId(String keyPrefix){
//1. 生成时间戳
LocalDateTime now=LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//2.生成序列号
//获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

//3. 拼接并返回
return timestamp<<Count_BITS|count; //时间戳左移32位,和自增数或运算
}

}

  • 用这个方法生成的id是
- order = 11843071671039270
- order = 11843071671039271
- order = 11843071671039272
- order = 11843071671039273
- order = 11843071671039274
  • 转化为2进制后
- 00101010000100110011011000000000000000000111010100100110
  • 后三十二位为自增号

实现秒杀下单的功能

  • 注意

  • 秒杀是否开始或结束,如果尚未开始或已经接受则无法下单

  • 库存是否充足,不足则无法下单

  • image-20240203105502087

超卖问题

  • image-20240203114138840

  • 多个线程同时操作数据,引发问题

乐观锁和悲观锁

  • image-20240203114451519

  • 乐观锁:会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,

  • 乐观锁的关键是判断之前查询得到的数据是否有被修改过,常见有两种方式

    • 版本号法
    • image-20240203114842104
    • CAS法(用库存)
    • image-20240203115048982
boolean success = seckillVoucherService.update()
.setSql(&quot;stock= stock -1&quot;) //set stock = stock -1
.eq(&quot;voucher_id&quot;, voucherId).eq(&quot;stock&quot;,voucher.getStock()).update(); //where id = ? and stock = ?
  • 以上逻辑的核心含义是:只要我扣减库存时的库存和之前我查询到的库存是一样的,就意味着没有人在中间修改过库存,那么此时就是安全的,但是以上这种方式通过测试发现会有很多失败的情况,失败的原因在于:在使用乐观锁过程中假设100个线程同时都拿到了100的库存,然后大家一起去进行扣减,但是100个人中只有1个人能扣减成功,其他的人在处理时,他们在扣减时,库存已经被修改过了,所以此时其他线程都会失败
  • 但是这样分析过,成功的概率太低,所以乐观锁需要变一下,改成stock大于0 即可
boolean success = seckillVoucherService.update()
.setSql(&quot;stock= stock -1&quot;)
.eq(&quot;voucher_id&quot;, voucherId).update().gt(&quot;stock&quot;,0); //where id = ? and stock &gt; 0
  • 原理 优点 缺点
    乐观锁 不加锁,在更新时判断是否有其他线程在修改 性能好 存在成功率低的问题
    悲观锁 添加同步锁,让线程串行执行 简单 性能差

一人一单

  • 需求:修改秒杀业务,要求同一个优惠券,一个用户只能下一单

现在的问题在于:

  • 优惠卷是为了引流,但是目前的情况是,一个人可以无限制的抢这个优惠卷,所以我们应当增加一层逻辑,让一个用户只能下一个单,而不是让一个用户下多个单

  • 具体操作逻辑如下:比如时间是否充足,如果时间充足,则进一步判断库存是否足够,然后再根据优惠卷id和用户id查询是否已经下过这个订单,如果下过这个订单,则不再下单,否则进行下单

VoucherOrderServiceImpl

初步代码:增加一人一单逻辑

@Override
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
// 尚未开始
return Result.fail("秒杀已经结束!");
}
// 4.判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5.一人一单逻辑
// 5.1.用户id
Long userId = UserHolder.getUser().getId();
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

//6,扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock= stock -1")
.eq("voucher_id", voucherId).update();
if (!success) {
//扣减库存
return Result.fail("库存不足!");
}
//7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);

voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);

}
  • 存在问题:现在的问题还是和之前一样,并发过来,查询数据库,都不存在订单,所以我们还是需要加锁,但是乐观锁比较适合更新数据,而现在是插入数据,所以我们需要使用悲观锁操作

  • 注意:在这里提到了非常多的问题,需要慢慢的来思考,首先我们的初始方案是封装了一个createVoucherOrder方法,同时为了确保他线程安全,在方法上添加了一把synchronized 锁

@Transactional
public synchronized Result createVoucherOrder(Long voucherId) {

Long userId = UserHolder.getUser().getId();
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}
  • 但是这样添加锁,锁的粒度太粗了,在使用锁过程中,控制锁粒度 是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会锁住,这里应该是统一用户进来才加锁所以我们需要去控制锁的粒度,以下这段代码需要修改为:intern() 这个方法是从常量池中拿到数据,如果我们直接使用userId.toString() 他拿到的对象实际上是不同的对象,new出来的对象,我们使用锁必须保证锁是同一把,所以我们需要使用intern()方法
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()){
// 5.1.查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
return Result.fail("用户已经购买过一次!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherId).gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2.用户id
voucherOrder.setUserId(userId);
// 7.3.代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);

// 7.返回订单id
return Result.ok(orderId);
}
}
  • 但是以上代码还是存在问题,问题的原因在于当前方法被spring的事务控制,如果你在方法内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题:如下:

  • 在seckillVoucher 方法中,添加以下逻辑,这样就能保证事务的特性,同时也控制了锁的粒度

  • image-20240203172215969

  • 但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务

Long userId = UserHolder.getUser().getId();
synchronized(userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); // 获取IVoucherOrderService的代理对象
return proxy.createVoucherOrder(voucherId);
}
  • 这里需要引入依赖和在启动类上暴露代理对象@EnableAspectJAutoProxy(exposeProxy = true)
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

集群环境下的并发问题

  • 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。

  • image-20240203180935427

  • 由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。

分布式锁

基本原理和实现方式对比

  • 分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁。
  • 分布式锁的核心思想就是让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路

分布式锁应该满足什么样的条件

  • image-20240203194031405

  • 可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思

  • 互斥:互斥是分布式锁的最基本的条件,使得程序串行执行

  • 高可用:程序不易崩溃,时时刻刻都保证较高的可用性

  • 高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能

  • 安全性:安全也是程序中必不可少的一环

分布式锁的实现

MySQL Redis Zookeeper
互斥 利用MySQL本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放
  • redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁

基于Redis的分布式锁

  • 获取锁:

    • 互斥:确保只能有一个线程获取锁
    • 非阻塞:尝试一次,成功返回true,失败返回false
  • 释放锁:

    • 手动释放
    • 超时释放:获取锁时添加一个超时时间
package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

/**
* @program: hm-dianping
* @description:
* @author: ChestnutDuck
* @create: 2024-02-03 20:08
**/
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;

private String name; //锁名称

public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

private static final String KEY_PREFIX="lock:";
@Override
public boolean tryLock(long timeoutSec) {
//获取线程表示
long threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success); //类型不一样 拆分时可能造成空指针问题 这样解决避免空指针
}

@Override
public void unlock() {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);

}
}

  • 核心思路:
    • 我们利用redis 的setNx 方法,当有多个线程进入时,就利用该方法,第一个线程进入时,redis 中就有这个key 了,返回了1,如果结果是1,则表示他抢到了锁,那么他去执行业务,然后再删除锁,退出锁逻辑,没有抢到锁的哥们,等待一定时间后重试即可

Redis分布式锁误删情况

  • image-20240203203503004

  • 持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明

  • 解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。

  • image-20240203203859414

解决Redis分布式锁误删问题

  • 需求:修改之前的分布式锁实现,满足:
    1. 在获取锁时存入线程标示(可以用UUID表示)
    2. 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
      • 如果一致则释放锁
      • 如果不一致则不释放锁

核心逻辑:在存入锁时,放入自己线程的标识,在删除锁时,判断当前这把锁的标识是不是自己存入的,如果是,则进行删除,如果不是,则不进行删除。

  • 加锁
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
  • 释放锁
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
  • 测试操作
    • 在我们修改完此处代码后,我们重启工程,然后启动两个线程,第一个线程持有锁后,手动释放锁,第二个线程 此时进入到锁内部,再放行第一个线程,此时第一个线程由于锁的value值并非是自己,所以不能释放锁,也就无法删除别人的锁,此时第二个线程能够正确释放锁,通过这个案例初步说明我们解决了锁误删的问题。

分布式锁的原子性

  • 线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生
  • image-20240203210724466

Redis的lua脚本

redis.call('命令名称', 'key', '其它参数', ...)
  • 例如,我们要执行set name jack,则脚本是这样:
# 执行 set name jack
redis.call('set', 'name', 'jack')
  • 例如,我们要先执行set name Rose,再执行get name,则脚本如下:
# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name
  • 写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:

  • image-20240203211536828

  • 例如,我们要执行 redis.call(‘set’, ‘name’, ‘jack’) 这个脚本,语法如下:

  • image-20240203211709971

  • 如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在脚本中可以从KEYS和ARGV数组获取这些参数:

  • image-20240203212320353

用Lua脚本来释放锁

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

利用Java代码调用Lua脚本改造分布式锁

  • RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下

  • image-20240203213328773

  • 代码

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

/**
* @program: hm-dianping
* @description:
* @author: ChestnutDuck
* @create: 2024-02-03 20:08
**/
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;

private String name; //锁名称

public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

private static final String KEY_PREFIX="lock:";
private static final String ID_PREFIX= UUID.randomUUID().toString(true)+"-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT ; //加载lua脚本所在的文件位置
static {
UNLOCK_SCRIPT=new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); // ClassPath就是Resource
UNLOCK_SCRIPT.setResultType(Long.class);
}


@Override
public void unlock(){
//调用线程表示
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX+ Thread.currentThread().getId()
);
}

}

基于Redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程标示
  • 释放锁时先判断线程标示是否与自己一致,一致则删除锁
    • 特性:
      • 利用set nx满足互斥性
      • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
      • 利用Redis集群保证高可用和高并发特性

基于setnx实现的分布式锁存在的问题:

  • 重入问题:重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。

  • 不可重试:是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。

  • 超时释放:我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患

  • 主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

Redisson

  • Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现

Redission快速入门

  1. 引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
  1. 配置Redisson客户端:
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.150.101:6379")
.setPassword("123321");
// 创建RedissonClient对象
return Redisson.create(config);
}
}

  1. 如何使用Redission的分布式锁
@Resource
private RedissionClient redissonClient;

@Test
void testRedisson() throws Exception{
//获取锁(可重入),指定锁的名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
//判断获取锁成功
if(isLock){
try{
System.out.println("执行业务");
}finally{
//释放锁
lock.unlock();
}

}



}

redission可重入锁原理

  • 在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。 8 2