理论Redis 初识Redis Redis是一种键值型的NoSql数据库
键值型:以key,value形式存储
Nosql:
Nosql:非关系型数据库 结构化与非结构化
可以是键值型key-value,文档型Document,图形Graph等
关联和非关联 传统的数据库表之际,往往存在外键,进行关联
而非关系型数据库不存在关联关系,维护要么靠代码中的业务实现逻辑,要么靠数据之间的耦合
查询方式 传统的查询方式:基于SQL查询
不同的非关系型数据库,查询方法五花八门
事物 传统的数据库能满足事物的四个特性:原子性,一致性,隔离性,持久性
非关系型:不支持事物,只能实现基本的一致性
总结
扩展性
关系型数据库集群模式一般是主从,主从数据一致,起到数据备份的作用,称为垂直扩展。
非关系型数据库可以将数据拆分,存储在不同机器上,可以保存海量数据,解决内存大小有限的问题。称为水平扩展。
关系型数据库因为表之间存在关联关系,如果做水平扩展会给数据查询带来很多麻烦
认识Redis Redis诞生于2009年全称是Re mote D ictionary S erver 远程词典服务器,是一个基于内存的键值型NoSQL数据库。
特征 :
键值(key-value)型,value支持多种不同数据结构,功能丰富
单线程,每个命令具备原子性
低延迟,速度快(基于内存、IO多路复用、良好的编码)。
支持数据持久化
支持主从集群、分片集群
支持多语言客户端
作者 :Antirez
Redis的官方网站地址:https://redis.io/
安装Redis 1.Linux下安装 麻烦,但是跟着教程学一遍
1.3.安装Redis 大多数企业都是基于Linux服务器来部署项目,而且Redis官方也没有提供Windows版本的安装包。因此课程中我们会基于Linux系统来安装Redis.
此处选择的Linux版本为CentOS 7.
1.3.1.依赖库 Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖:
1.3.2.上传安装包并解压 然后将课前资料提供的Redis安装包上传到虚拟机的任意目录:
例如,我放到了/usr/local/src 目录:
解压缩:
1 tar -xzf redis-6.2.6.tar.gz
进入redis目录:
运行编译命令:
如果没有出错,应该就安装成功了。
默认的安装路径是在 /usr/local/bin
目录下:
该目录已经默认配置到环境变量,因此可以在任意目录下运行这些命令。其中:
redis-cli:是redis提供的命令行客户端
redis-server:是redis的服务端启动脚本
redis-sentinel:是redis的哨兵启动脚本
1.3.3.启动 redis的启动方式有很多种,例如:
1.3.4.默认启动 安装完成后,在任意目录输入redis-server命令即可启动Redis:
如图:
这种启动属于前台启动
,会阻塞整个会话窗口,窗口关闭或者按下CTRL + C
则Redis停止。不推荐使用。
1.3.5.指定配置启动 如果要让Redis以后台
方式启动,则必须修改Redis配置文件,就在我们之前解压的redis安装包下(/usr/local/src/redis-6.2.6
),名字叫redis.conf:
我们先将这个配置文件备份一份:
1 cp redis.conf redis.conf.bck
然后修改redis.conf文件中的一些配置:
1 2 3 4 5 6 bind 0.0.0.0 daemonize yes requirepass 123321
Redis的其它常见配置:
1 2 3 4 5 6 7 8 9 10 port 6379 dir . databases 1 maxmemory 512mb logfile "redis.log"
启动Redis:
1 2 3 4 cd /usr/local/src/redis-6.2.6redis-server redis.conf
停止服务:
1 2 3 redis-cli -u 123321 shutdown
1.3.6.开机自启 我们也可以通过配置来实现开机自启。
首先,新建一个系统服务文件:
1 vi /etc/systemd/system/redis.service
内容如下:
1 2 3 4 5 6 7 8 9 10 11 [Unit] Description=redis-server After=network.target [Service] Type=forking ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf PrivateTmp=true [Install] WantedBy=multi-user.target
然后重载系统服务:
现在,我们可以用下面这组命令来操作redis了:
1 2 3 4 5 6 7 8 systemctl start redis systemctl stop redis systemctl restart redis systemctl status redis
执行下面的命令,可以让redis开机自启:
2.Docker镜像安装 傻瓜式一键安装,但怕出问题,按教程走一遍
安装遇到的问题 问题 :Xftp7想Linux内传输文件的时候,传输报错
解决 :我已经sudo按照root用户登陆了,不知道为什么还是没有权限
1 sudo chmod 777 /usr/local/src
解释:charGpt
sudo
: 以超级用户权限执行命令。
chmod
: 是 change mode 的缩写,用于修改文件或文件夹的权限。
777
: 权限表示法,其中每个数字代表一组用户:第一个数字表示所有者的权限,第二个数字表示所属组的权限,第三个数字表示其他用户的权限。每个数字可以取值范围是 0 到 7,其中 0 表示没有权限,1 表示执行权限,2 表示写权限,4 表示读权限。而数字 7 表示读、写、执行权限的组合,因此 777
表示所有用户都拥有读、写、执行的权限。
/usr/local/src
: 指定要修改权限的文件夹路径。
问题 :使用图像化界面Resp连接Linux,Redis的时候,连接不成功
解决: 端口被防火墙阻止了。关闭防火墙或者开放这个6379端口即可
命令顾名思义:
1 2 3 4 5 6 7 systemctl stop firewalld.service systemctl start firewalld.service firewall-cmd --zone=public --add-port=80/tcp --permanent firewall-cmd --list-port firewall-cmd --reload
Redis常见命令 Redis是典型的key-value数据库,key一般是字符串,value包含很多不同的数据类型
Redis通用命令 部分通用数据类型,都可以使用的指令
KEYS:查看符合模板的所有的key
DEL:删除一个指定的key
EXISTS:判断key是否存在
EXPIRE:给key设置一个有效期,到期时该key会被自动删除
TTL:查看一个KEY的剩余有效期
String类型 String类型,也就是字符串类型,是Redis中最简单的存储类型。
其value是字符串,不过根据字符串的格式不同,又可以分为3类:
string:普通字符串
int:整数类型,可以做自增、自减操作
float:浮点类型,可以做自增、自减操作
不管是哪种格式,底层都是字节数组形式存储,只不过是编码方式不同。字符串类型的最大空间不能超过512m.
2.2.1.String的常见命令
String的常见命令有:
SET:添加或者修改已经存在的一个String类型的键值对
GET:根据key获取String类型的value
MSET:批量添加多个String类型的键值对
MGET:根据多个key获取多个String类型的value
INCR:让一个整型的key自增1
INCRBY:让一个整型的key自增并指定步长,例如:incrby num 2 让num值自增2
INCRBYFLOAT:让一个浮点类型的数字自增并指定步长
SETNX:添加一个String类型的键值对,前提是这个key不存在,否则不执行
SETEX:添加一个String类型的键值对,并且指定有效期
key结构 通过给key添加前缀加以区分
Redis的key允许有多个单词形成层级结构,多个单词之间用’:’隔开,格式如下:
这个格式并非固定,也可以根据自己的需求来删除或添加词条。这样以来,我们就可以把不同类型的数据区分开了。从而避免了key的冲突问题。
例如我们的项目名称叫 heima,有user和product两种不同类型的数据,我们可以这样定义key:
如果Value是一个Java对象,例如一个User对象,则可以将对象序列化为JSON字符串后存储:
KEY
VALUE
heima:user:1
{“id”:1, “name”: “Jack”, “age”: 21}
heima:product:1
{“id”:1, “name”: “小米11”, “price”: 4999}
并且,在Redis的桌面客户端中,还会以相同前缀作为层级结构,让数据看起来层次分明,关系清晰:
Hash类型 hash类型,散列,value是一个无序列表,类似于HashMap
即:key里面存的value,又是一个key-value
String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便:
Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD:
Hash的常见命令有:
HSET key field value:添加或者修改hash类型key的field的值
HGET key field:获取一个hash类型key的field的值
HMSET:批量添加多个hash类型key的field的值
HMGET:批量获取多个hash类型key的field的值
HGETALL:获取一个hash类型的key中的所有的field和value
HKEYS:获取一个hash类型的key中的所有的field
HINCRBY:让一个hash类型key的字段值自增并指定步长
HSETNX:添加一个hash类型的key的field值,前提是这个field不存在,否则不执行
List类型 Redis中的类型与Java中的LinkedList类似,可以看做是一个双向链表,可以正向检索,也可以反向检索
特点也与LinkedList相似:
List的常见命令有:
LPUSH key element … :向列表左侧插入一个或多个元素
LPOP key:移除并返回列表左侧的第一个元素,没有则返回nil
RPUSH key element … :向列表右侧插入一个或多个元素
RPOP key:移除并返回列表右侧的第一个元素
LRANGE key star end:返回一段角标范围内的所有元素
BLPOP和BRPOP:与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil
思考:如何用List模拟一个栈?
入口和出口在同一边,只用LPUSH和LPOP
如何用List模拟一个队列?
入口出口在不同边,LPUSH,RPOP 或者 RPUSH,LPOP
如何用List模拟一个阻塞队列?
入口出口在不同边,出队采用BLPOP,BRPOP
Set结构 与Java中的HashSet类似,可以看做一个value为null的HashMap,也是hash表,具备和HashSet类似的特性:
无序
元素不可重复
查找快
支持交集,并集,差集的等功能
Set的常见命令有:
SADD key member … :向set中添加一个或多个元素
SREM key member … : 移除set中的指定元素
SCARD key: 返回set中元素的个数
SISMEMBER key member:判断一个元素是否存在于set中
SMEMBERS:获取set中的所有元素
SINTER key1 key2 … :求key1与key2的交集
SDIFF key1 key2…:求差集
SUNION key1 key2…:就并集
例如两个集合:s1和s2:
求交集:SINTER s1 s2
求s1与s2的不同:SDIFF s1 s2
SortedSet类型 Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表。
SortedSet具备下列特性:
因为SortedSet的可排序特性,经常被用来实现排行榜 这样的功能。
SortedSet的常见命令有:
ZADD key score member:添加一个或多个元素到sorted set ,如果已经存在则更新其score值
ZREM key member:删除sorted set中的一个指定元素
ZSCORE key member : 获取sorted set中的指定元素的score值
ZRANK key member:获取sorted set 中的指定元素的排名
ZCARD key:获取sorted set中的元素个数
ZCOUNT key min max:统计score值在给定范围内的所有元素的个数
ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
ZRANGE key min max:按照score排序后,获取指定排名范围内的元素
ZRANGEBYSCORE key min max:按照score排序后,获取指定score范围内的元素
ZDIFF、ZINTER、ZUNION:求差集、交集、并集
注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可,例如:
Redis的Java客户端
标记为*的就是推荐使用的java客户端,包括:
Jedis和Lettuce :这两个主要是提供了Redis命令对应的API,方便我们操作Redis,而SpringDataRedis 又对这两种做了抽象和封装,因此我们后期会直接以SpringDataRedis来学习。
Redisson :是在Redis基础上实现了分布式的可伸缩的java数据结构,例如Map、Queue等,而且支持跨进程的同步机制:Lock、Semaphore等待,比较适合用来实现特殊的功能需求。
Jedis客户端 引入依赖
1 2 3 4 5 <dependency > <groupId > redis.clients</groupId > <artifactId > jedis</artifactId > <version > 5.0.0</version > </dependency >
创建jedis对象,建立连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private Jedis jedis;@BeforeEach public void setUp () { jedis = new Jedis ("192.168.239.129" ,6379 ); jedis.auth("123456" ); jedis.select(0 ); } @Test public void testString () { String set = jedis.set("name" , "FangYuan" ); System.out.println("result = " + set); String name = jedis.get("name" ); System.out.println("name = " + name); } @AfterEach public void close () { if (jedis != null ){ jedis.close(); } }
调用Redis的命令去操作jedis
释放资源
Jedis连接池 jedis是线程不安全的,频繁创建和销毁连接会有性能的损耗,因此,使用jedis线程连接池的方式,代替上面的jedis直连方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.heima.jedis.util;import redis.clients.jedis.*;public class JedisConnectionFactory { private static JedisPool jedisPool; static { JedisPoolConfig poolConfig = new JedisPoolConfig (); poolConfig.setMaxTotal(8 ); poolConfig.setMaxIdle(8 ); poolConfig.setMinIdle(0 ); poolConfig.setMaxWaitMillis(1000 ); jedisPool = new JedisPool (poolConfig, "192.168.239.129" , 6379 , 1000 , "123456" ); } public static Jedis getJedis () { return jedisPool.getResource(); } }
SpringDataRedis客户端 SpringData是Spring中数据操作的模块,包含对各种数据库的集成,其中对Redis的集成模块就叫做SpringDataRedis,官网地址:https://spring.io/projects/spring-data-redis
提供了对不同Redis客户端的整合(Lettuce和Jedis)
提供了RedisTemplate统一API来操作Redis
支持Redis的发布订阅模型
支持Redis哨兵和Redis集群
支持基于Lettuce的响应式编程
支持基于JDK、JSON、字符串、Spring对象的数据序列化及反序列化
支持基于Redis的JDKCollection实现
SpringDataRedis中提供了RedisTemplate工具类 ,其中封装了各种对Redis的操作。并且将不同数据类型的操作API封装到了不同的类型中:
引入依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-redis</artifactId > </dependency > <dependency > <groupId > org.apache.commons</groupId > <artifactId > commons-pool2</artifactId > </dependency > <dependency > <groupId > com.fasterxml.jackson.core</groupId > <artifactId > jackson-databind</artifactId > </dependency >
配置Redis
1 2 3 4 5 6 7 8 9 10 11 spring: redis: host: 192.168 .150 .101 port: 6379 password: 123321 lettuce: pool: max-active: 8 max-idle: 8 min-idle: 0 max-wait: 100ms
再去@Autowired自动注入RedisTemplate 即可
自定义序列化 RedisTemplate可以接收任意Object作为值写入Redis
但是写入的时候,RedisTemplate会把Object序列化,写入Redis
只不过,默认采用的是JDk序列化,我们写入的汉字会转成这样
可读性差,并且内存占用大
这就需要我们自定义RedisTemplate的序列化方式
我们可以自定义RedisTemplate的序列化方式,代码如下:
自定义Redisconfig类,添加@Configuration标记为配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate (RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate <>(); template.setConnectionFactory(connectionFactory); GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer (); template.setKeySerializer(RedisSerializer.string()); template.setHashKeySerializer(RedisSerializer.string()); template.setValueSerializer(jsonRedisSerializer); template.setHashValueSerializer(jsonRedisSerializer); return template; } }
这里采用了JSON序列化来代替默认的JDK序列化方式 。最终结果如图:
整体可读性有了很大提升,并且能将Java对象自动的序列化为JSON字符串,并且查询时能自动把JSON反序列化为Java对象。
不过,其中记录了序列化时对应的class名称,目的是为了查询时实现自动反序列化。这会带来额外的内存开销。
StringRedisTemplate 为了节省内存空间,我们可以不使用JSON序列化来处理value,而使用统一的String序列化器,要求只能存储String类型的key和value,当我们要存储Java对象的时候,手动完成对象的序列化和反序列化
如图所示,先用mapper或者fastJSON手动吧对对象序列化为JSON字符串,存入Redis
读取结果是JSON字符串,在手动反序列化,放入对象中
因为存入和读取时的序列化及反序列化都是我们自己实现的,SpringDataRedis就不会将class信息写入Redis了 。
这种用法比较普遍,因此SpringDataRedis就提供了RedisTemplate的子类:StringRedisTemplate,它的key和value的序列化方式默认就是String方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Autowired private StringRedisTemplate stringRedisTemplate;private static final ObjectMapper mapper = new ObjectMapper ();@Test void testSaveUser () throws JsonProcessingException { User user = new User ("虎哥" , 21 ); String json = mapper.writeValueAsString(user); stringRedisTemplate.opsForValue().set("user:200" , json); String jsonUser = stringRedisTemplate.opsForValue().get("user:200" ); User user1 = mapper.readValue(jsonUser, User.class); System.out.println("user1 = " + user1); }
实战篇Redis
短信登陆:Redis共享session来实现
商户缓存查询:了解缓存击穿,缓存穿透,缓存雪崩问题
优惠券秒杀 :Redis的计数功能,结合Lua完成高性能的Redis操作,同时学会Redis分布式锁的原理,包括Redis的三种消息队列
附近的商户:使用Redis的GEOHash来完成对地理坐标的操作
UV统计:主要使用Redis来实现计数功能
用户签到:使用Redis的BitMap数据统计功能
好友关注:基于Set集合的关注,关注取消,共同关注等功能
达人探店:基于List完成点赞列表的操作,同时基于SortedSet来完成点赞排行傍的功能
导入项目框架 问题1:mysql建表错误
1 2 3 4 5 6 7 8 9 10 DROP TABLE IF EXISTS `tb_seckill_voucher`;CREATE TABLE `tb_seckill_voucher` ( `voucher_id` bigint (20 ) UNSIGNED NOT NULL COMMENT '关联的优惠券的id' , `stock` int (8 ) NOT NULL COMMENT '库存' , `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' , `begin_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '生效时间' , `end_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '失效时间' , `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' , PRIMARY KEY (`voucher_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '秒杀优惠券表,与优惠券是一对一关系' ROW_FORMAT = Compact;
错误分析:
错误分析: DEFAULT ‘0000-00-00 00:00:00’(零时间戳),这不满足sql_mode中的NO_ZERO_DATE而报错。
注:sql_mode有两种,一种是空值,一种是严格模式,会给出很多默认设置。在MySQL5.7之后默认使用严格模式。
解决方式:1.修改不为0时间戳,
2.或者在mysql安装目录下,打开my.ini文件,修改配置文件,去除sql_mode下的NO_ZERO_DATE
(不建议2,我修改配置文件后,MySQL80直接重启失败了,吓我一身冷汗,以为又要重装MySQL了)
问题2:导入别人初始框架的时候
JDK9~17+Springboot3 @Resource常见问题和解决方案
因为JDK版本升级的改动,在Jdk9~17环境下,搭建Springboot项目,会出现原有@Resource(javax.annotation.Resource)不存在的问题,导致项目从Jdk8 迁移到高版本时遇到的问题
1 2 3 4 5 6 7 java EE 即 java Enterprise Edition,企业级应用,目标是制定一系列企业级应用的标准服务。常见的 javax.servlet, javax.annotation。 Oracle 收购了创造 java 的 SUN 公司,Oracle 又不想发展 java EE 了, 就把 java EE 交给 Eclipse 社区了,但是又因为不知名的原因,禁止社区使用 javax 这个名字。 所以,javax.servlet 就变成了 jakarta.servlet, jakarta.annotation。api无法向前兼容。 java ee 的最后一个版本也是 8,以后就再也没有 java ee 的新版本
解决:使用Jakarta包下的类
1.短信登陆 基于session的流程 短信验证码登录、注册:
用户将验证码和手机号进行输入,后台从session中拿到当前验证码,然后和用户输入的验证码进行校验,如果不一致,则无法通过校验,如果一致,则后台根据手机号查询用户,如果用户不存在,则为用户创建账号信息,保存到数据库,无论是否存在,都会将用户信息保存到session中,方便后续获得当前登录信息
校验登录状态:
用户在请求时候,会从cookie中携带者JsessionId到后台,后台通过JsessionId从session中拿到用户信息,如果没有session信息,则进行拦截,如果有session信息,则将用户信息保存到threadLocal中,并且放行
基于session的实现流程:
我看不懂的地方全部写满了注释 !!!
发送验证码接口:/user/code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Resource StringRedisTemplate stringRedisTemplate; @Override public Result sendCode (String phone, HttpSession session) { if (RegexUtils.isPhoneInvalid(phone)){ return Result.fail("手机号格式错误" ); } String code = RandomUtil.randomNumbers(6 ); session.setAttribute("code" ,code); log.info("验证码发送成功,验证码:{}" ,code); return Result.ok(); }
短信认证接口:/user/login
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Override public Result login (LoginFormDTO loginForm, HttpSession session) { String phone = loginForm.getPhone(); if (RegexUtils.isPhoneInvalid(phone)){ return Result.fail("手机号格式错误" ); } Object code = session.getAttribute("code" ); String loginFormCode = loginForm.getCode(); if (code == null || !code.toString().equals(loginFormCode)){ return Result.fail("验证码错误" ); } User user = query().eq("phone" , phone).one(); if (user == null ){ user = createWithPhone(phone); } UserDTO userDTO = BeanUtil.copyProperties(user,UserDTO.class); session.setAttribute("user" ,userDTO); return Result.ok(); }
校验登录状态:
Interception
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { HttpSession session = request.getSession(); Object user = session.getAttribute("user" ); if (user == null ){ response.setStatus(401 ); return false ; } UserHolder.saveUser((UserDTO) user); return true ; }
webconfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Resource private LoginInterceptor loginInterceptor;@Resource private RefreshTokenInterceptor refreshTokenInterceptor;@Override public void addInterceptors (InterceptorRegistry registry) { registry.addInterceptor(loginInterceptor) .excludePathPatterns( "/shop/**" , "/voucher/**" , "/shop-type/**" , "/upload/**" , "/blog/hot" , "/user/code" , "/user/login" ).order(1 ); registry.addInterceptor(refreshTokenInterceptor).addPathPatterns("/**" ).order(0 ); }
线程ThreadLocal Tomcat运行原理
Tomcat是一个开源的运行Java应用程序的Web服务器,springboot自带Tomcat服务器,我们启动Web服务就会自带
当用户发起请求的时候,会访问我们Tomcat注册的端口
当监听线程直到用户想与Tomcat建立链接的时候,会由监听线程创建socket链接
socket都是成对出现的,用户通过socket相互传送数据
当Tomcat中的socket接受到数据后,此时监听线程会从Tomcat的线程池取出一个线程来执行用户的请求
线程会找到我们用户想要访问的工程,用这个线程转到工程中测controller,service,mapper层,并访问数据库完成相应的操作
执行完毕后,再统一返回,找到Tomcat端的socket,将数据写会到用户端的socket,完成请求相应
我们可以得知 每个用户其实对应都是去找tomcat线程池中的一个线程来完成工作的, 使用完成后再进行回收,既然每个请求都是独立的,所以在每个用户去访问我们的工程时,我们可以使用threadlocal来做到线程隔离,每个线程操作自己的一份数据
温馨小贴士:关于threadlocal
如果小伙伴们看过threadLocal的源码,你会发现在threadLocal中,无论是他的put方法和他的get方法, 都是先从获得当前用户的线程,然后从线程中取出线程的成员变量map,只要线程不一样,map就不一样,所以可以通过这种方式来做到线程隔离
session共享的问题 每个tomcat中都有一份属于自己的session,假设用户第一次访问第一台tomcat,并且把自己的信息存放到第一台服务器的session中,但是第二次这个用户访问到了第二台tomcat,那么在第二台服务器上,肯定没有第一台服务器存放的session,所以此时 整个登录拦截功能就会出现问题,我们能如何解决这个问题呢?早期的方案是session拷贝,就是说虽然每个tomcat上都有不同的session,但是每当任意一台服务器的session修改时,都会同步给其他的Tomcat服务器的session,这样的话,就可以实现session的共享了
但是这种方案具有两个大问题
1、每台服务器中都有完整的一份session数据,服务器压力过大。
2、session拷贝数据时,可能会出现延迟
所以咱们后来采用的方案都是基于redis来完成,我们把session换成redis,redis数据本身就是共享的,就可以避免session共享的问题了
Redis的实现思路 1.设计存储的key和value的结构
key用字串,value用map<k,v>结构
在设计这个key的时候,我们之前讲过需要满足两点
1、key要具有唯一性
2、key要方便携带
如果我们采用phone:手机号这个的数据来存储当然是可以的,但是如果把这样的敏感数据存储到redis中并且从页面中带过来毕竟不太合适,所以我们在后台生成一个随机串token,然后让前端带来这个token就能完成我们的整体逻辑了
整体的访问流程
发送短信验证码端不变,短信登录注册端和登录校验端,有变化,拦截器优化
主要是涉及到从session中获取数据,到Redis中获取存入和获取数据
当注册完成后, 用户登录会去校验手机号和验证码,
如果一致,用MB查询用户信息,不存在则创建,
最后将用户信息保存到Redis中,并且生成随机token,作为Redis的key,设置key的有效期
当我们拦截器校验用户是否登录的时候,前端会携带token去访问
拦截器1拦截所有,从Redis中取出token对应的value,判断是否有这个数据
直接将其保存到ThreadLocal线程当中,刷新token有效期,再放行到拦截器2
拦截器2,拦截校验请求,判断ThreadLocal之中是否有用户,没有拦截,有则放行
我敲了一遍,主要的难点在于,使用stringRedisTemplate访问Redis的api不熟悉
将实体类转为hash用的工具类不熟悉
敲多了就熟悉了
短信验证码的登录
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Override public Result login (LoginFormDTO loginForm, HttpSession session) { String phone = loginForm.getPhone(); if (RegexUtils.isPhoneInvalid(phone)) { return Result.fail("手机号格式错误" ); } String code = stringRedisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone); String loginFormCode = loginForm.getCode(); if (code == null || !code.equals(loginFormCode)) { return Result.fail("验证码错误" ); } User user = query().eq("phone" , phone).one(); if (user == null ){ user = createWithPhone(phone); } String token = UUID.randomUUID().toString(true ); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> usermap = BeanUtil.beanToMap(userDTO, new HashMap <>(), CopyOptions.create() .setIgnoreNullValue(true ) .setFieldValueEditor((Name, Value) -> Value.toString()) ); String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey,usermap); stringRedisTemplate.expire(tokenKey,LOGIN_USER_TTL,TimeUnit.MINUTES); return Result.ok(token); } private User createWithPhone (String phone) { User user = new User (); user.setPhone(phone); user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10 )); save(user); return user; }
interceptor1 :拦截所有
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 @Component public class RefreshTokenInterceptor implements HandlerInterceptor { @Resource StringRedisTemplate stringRedisTemplate; @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { String token = request.getHeader("authorization" ); if (StrUtil.isBlank(token)) { return true ; } String key = LOGIN_USER_KEY + token; Map<Object, Object> userMap = stringRedisTemplate.opsForHash().entries(key); if (userMap.isEmpty()) { return true ; } UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO (), false ); UserHolder.saveUser(userDTO); stringRedisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES); return true ; } @Override public void afterCompletion (HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception { UserHolder.removeUser(); } }
interceptor2 :拦截校验请求
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Component public class LoginInterceptor implements HandlerInterceptor { @Resource private StringRedisTemplate stringRedisTemplate; @Override public boolean preHandle (HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (UserHolder.getUser() == null ) { response.setStatus(401 ); return false ; } return true ; } }
2.商品查询缓存 缓存(Cache),就是数据交换的 缓冲区 ,俗称的缓存就是缓冲区内的数据 ,一般从数据库中获取,存储于本地代码(例如:
1 2 3 4 5 例1 :Static final ConcurrentHashMap<K,V> map = new ConcurrentHashMap <>(); 本地用于高并发 例2 :static final Cache<K,V> USER_CACHE = CacheBuilder.newBuilder().build(); 用于redis等缓存 例3 :Static final Map<K,V> map = new HashMap (); 本地缓存
由于其被Static 修饰,所以随着类的加载而被加载到内存之中 ,作为本地缓存,由于其又被final 修饰,所以其引用(例3:map)和对象(例3:new HashMap())之间的关系是固定的,不能改变,因此不用担心赋值(=)导致缓存失效;
吗
添加商品缓存 标准的操作方式,是我们再查询商户信息的时候,直接从数据库中去查询,比较慢,所以添加缓存
1 2 3 4 5 @GetMapping("/{id}") public Result queryShopById (@PathVariable("id") Long id) { return shopService.queryById(id); }
缓存的思路: 在查数据库之前,如果缓存存在,则直接从缓存中返回,如果不存在,则查询数据库,然后再将缓存写入Redis
service层实现逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Service public class ShopServiceImpl extends ServiceImpl <ShopMapper, Shop> implements IShopService { @Resource StringRedisTemplate stringRedisTemplate; @Override public Result queryById (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (!StrUtil.isBlank(shopJson)){ Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } Shop shop = getById(id); if (shop == null ){ return Result.fail("店铺不存在!" ); } stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop)); return Result.ok(shop); } }
练习:给店铺类型查询业务添加缓存 比较:只是由单个的String字符串在Redis中的查取,转为了list链表在Redis中的查取
解决:多记几个api函数(无奈!)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Resource StringRedisTemplate stringRedisTemplate; @Override public Result queryList () { String key = "cache:shop:type:key" ; List<String> shopTtpes = stringRedisTemplate.opsForList().range(key, 0 , -1 ); if (!shopTtpes.isEmpty()){ ArrayList<ShopType> list = new ArrayList <>(); for (String s : shopTtpes){ ShopType type = JSONUtil.toBean(s, ShopType.class); list.add(type); } return Result.ok(list); } List<ShopType> list = query().orderByAsc("sort" ).list(); if (list == null ){ return Result.fail("店铺类型不存在!" ); } for (ShopType s : list){ String str = JSONUtil.toJsonStr(s); shopTtpes.add(str); } stringRedisTemplate.opsForList().leftPushAll(key,shopTtpes); return Result.ok(shopTtpes); }
Redis缓存更新策略 内存数据宝贵,我们向Redis插入太多数据,可能会导致缓存中的数据过多,多以Redis对部分数据更新,删除
内存淘汰 :Redis自动进行,当内存达到设定的max-memory后,会自动触发淘汰机制,淘汰一些不重要的数据
超时剔除 : 当我们给Redis中的数据插入过期时间ttl后,Redis会把过期的数据删除,方便继续有空间使用缓存
主动更新 :我们可以设置手动调用的方式 ,通常用于解决,缓存和数据库数据不一致的问题
数据缓存不一致解决方案
由于我们的缓存的数据源来自于数据库 ,而数据库的数据是会发生变化的 ,因此,如果当数据库中数据发生变化,而缓存却没有同步 ,此时就会有一致性问题存在 ,其后果是:
用户使用缓存中的过时数据,就会产生类似多线程数据安全问题,从而影响业务,产品口碑等;怎么解决呢?有如下几种方案
Cache Aside Pattern 人工编码方式:缓存调用者在更新完数据库后再去更新缓存,也称之为双写方案
Read/Write Through Pattern : 由系统本身完成,数据库与缓存的问题交由系统本身去处理
Write Behind Caching Pattern :调用者只操作缓存,其他线程去异步处理数据库,实现最终一致
方案一 :手动调用更新缓存,需要自己去编写代码开发,但是一致性好
方案二 :调用服务,想维护比较复杂,找不到现成这样的服务
方案三 :简化调用者的开发,只关心缓存,由一个线程异步将缓存数据持久化到数据库中去,批量从缓存中插入数据库,保存最终一致,一致性差
使用方案一问题:
1.更新缓存还是删除缓存?
更新缓存:每次更新数据库都更新缓存的话,不读取缓存,无效的写入缓存过多了
删除缓存:更新数据库的时候,让缓存失效,查询的时候,再去更新缓存
2.如何保证缓存与数据库操作的同时成功或者失败
单体系统,@Transaction,事物
分布式系统,利用TTC等分布式事物解决方案
3.先操作缓存还是先操作数据库?
应当先操作数据库,再删除缓存
如下图所示,原因是,写入缓存的数据快,而写入数据库的数据慢,这就导致先删后写的空间大,容易被其他线程操作,而带来数据的不一致,所以需要先操作数据库,再删除缓存
实现商铺缓存与数据库缓存双写一致 核心思路如下:
修改ShopController中的业务逻辑,满足下面的需求:
根据id查询店铺时,如果缓存未命中,则查询数据库,将数据库结果写入缓存,并设置超时时间
根据id修改店铺时,先修改数据库,再删除缓存
1 2 3 4 5 6 7 8 9 10 11 @Transactional @Override public Result update (Shop shop) { String key = RedisConstants.CACHE_SHOP_KEY + shop.getId(); updateById(shop); stringRedisTemplate.delete(key); return Result.ok(); }
缓存穿透 缓存穿透 :客户端请求的数据在数据库和缓存中都不存在,这样缓存永远都不会说生效,这写请求都会传到数据库中
如果恶意用户通过查询一个数据库中不存在的数据去直接访问数据库,导致缓存无效,直接打在数据库中,从而达到给数据库造成巨大压力的情况
缓存空对象思路分析: 当我们客户端访问不存在的数据时,先请求redis,但是此时redis中没有数据,此时会访问到数据库,但是数据库中也没有数据,这个数据穿透了缓存,直击数据库,我们都知道数据库能够承载的并发不如redis这么高,如果大量的请求同时过来访问这种不存在的数据,这些请求就都会访问到数据库,简单的解决方案就是哪怕这个数据在数据库中也不存在,我们也把这个数据存入到redis中去,这样,下次用户过来访问这个不存在的数据,那么在redis中也能找到这个数据就不会进入到缓存了
布隆过滤: 布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在 ,如果布隆过滤器判断存在,则放行,这个请求会去访问redis,哪怕此时redis中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到redis中,
假设布隆过滤器判断这个数据不存在,则直接返回
这种方式优点在于节约内存空间,存在误判,误判原因在于:布隆过滤器走的是哈希思想,只要哈希思想,就可能存在哈希冲突
编码解决缓存穿透问题 核心思路如下:
在原来的逻辑中,我们如果发现这个数据在mysql中不存在,直接就返回404了,这样是会存在缓存穿透问题的
现在的逻辑中:如果这个数据不存在,我们不会返回404 ,还是会把这个数据写入到Redis中,并且将value设置为空,欧当再次发起查询时,我们如果发现命中之后,判断这个value是否是null,如果是null,则是之前写入的数据,证明是缓存穿透数据,如果不是,则直接返回数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public Result queryById (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (!StrUtil.isBlank(shopJson)){ Shop shop = JSONUtil.toBean(shopJson, Shop.class); return Result.ok(shop); } if (shopJson != null ){ return Result.fail("店铺不存在!" ); } Shop shop = getById(id); if (shop == null ){ stringRedisTemplate.opsForValue() .set(key,"" ,RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES); return Result.fail("店铺不存在!" ); } stringRedisTemplate.opsForValue() .set(key,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); return Result.ok(shop); }
小总结:
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求,给数据库带来巨大压力
缓存穿透的解决方案有哪些?
缓存null值
布隆过滤
增强id的复杂度,避免被猜测id规律
做好数据的基础格式校验
加强用户权限校验
做好热点参数的限流
缓存雪崩 问题以及解决思路
缓存雪崩 :同地段大量的缓存key同时失效 或者Redis宕机 ,导致大量请求直接到达数据库,带来巨大压力
解决方案:
给不同的Key的TTL添加随机值
利用Redis集群提高服务的可用性
给缓存业务添加降级限流策略
给业务添加多级缓存
缓存击穿 缓存击穿问题,也叫热点击穿 问题,就是被一个高并发并且缓存重建业务复杂的key突然失效了 ,无数的访问请求会瞬间给数据库带来巨大冲击
解决方法 :
假设线程1在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程1走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程1没有走完的时候,后续的线程2,线程3,线程4同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大
解决方案一、使用锁来解决:
因为锁能实现互斥性。假设线程过来,只能一个人一个人的来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询的性能从并行变成了串行,我们可以采用tryLock方法 + double check来解决这样的问题。
假设现在线程1过来访问,他查询缓存没有命中,但是此时他获得到了锁的资源,那么线程1就会一个人去执行逻辑,假设现在线程2过来,线程2在执行过程中,并没有获得到锁,那么线程2就可以进行到休眠,直到线程1把锁释放后,线程2获得到锁,然后再来执行逻辑,此时就能够从缓存中拿到数据了。
解决方案二、逻辑过期方案
方案分析:我们之所以会出现这个缓存击穿问题,主要原因是在于我们对key设置了过期时间,假设我们不设置过期时间,其实就不会有缓存击穿的问题,但是不设置过期时间,这样数据不就一直占用我们内存了吗,我们可以采用逻辑过期方案。
我们把过期时间设置在 redis的value中,注意:这个过期时间并不会直接作用于redis,而是我们后续通过逻辑去处理 。假设线程1去查询缓存,然后从value中判断出来当前的数据已经过期了,此时线程1去获得互斥锁,那么其他线程会进行阻塞,获得了锁的线程他会开启一个 线程去进行 以前的重构数据的逻辑,直到新开的线程完成这个逻辑后,才释放锁, 而线程1直接进行返回,假设现在线程3过来访问,由于线程线程2持有着锁,所以线程3无法获得锁,线程3也直接返回数据,只有等到新开的线程2把重建数据构建完后,其他线程才能走返回正确的数据。
这种方案巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。
进行对比
互斥锁方案: 由于保证了互斥性,所以数据一致,且实现简单,因为仅仅只需要加一把锁而已,也没其他的事情需要操心,所以没有额外的内存消耗,缺点在于有锁就有死锁问题的发生,且只能串行执行性能肯定受到影响
逻辑过期方案: 线程读取过程中不需要等待,性能好,有一个额外的线程持有锁去进行重构数据,但是在重构数据完成前,其他的线程只能返回之前的数据,且实现起来麻烦
用互斥锁解决缓存击穿问题 核心思路:相较于原先从缓存中查不到数据后直接查询数据库而言,现在的方案是查询之后,如果缓存中没有查到数据,就进行互斥锁的获取,获取互斥锁后,判断是否获得到了锁,如果没有活的到,则说明有线程已经去访问了,则先休眠,过一会儿再去访问,知道获取到了锁为止,才能进行查询
如果有线程获取到了锁,再去进行查询,查询后将数据写入Redis,再释放锁,返回数据,利用互斥锁就保证只能有一个线程去执行数据库操作的逻辑,防止缓存击穿
核心思路就是利用redis的setnx方法来表示获取锁,该方法含义是redis中如果没有这个key,则插入成功,返回1,在stringRedisTemplate中返回true, 如果有这个key则插入失败,则返回0,在stringRedisTemplate返回false,我们可以通过true,或者是false,来表示是否有线程成功插入key,成功插入的key的线程我们认为他就是获得到锁的线程。
1 2 3 4 5 6 7 8 9 10 11 12 @Override public Result queryById (Long id) { Shop shop = queryWithMutex(id); if (shop == null ){ return Result.fail("店铺不存在" ); } return Result.ok(shop); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public Shop queryWithMutex (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (!StrUtil.isBlank(shopJson)){ return JSONUtil.toBean(shopJson, Shop.class); } if (shopJson != null ){ return null ; } String lockKey = RedisConstants.LOCK_SHOP_KEY + id; Shop shop = null ; try { boolean isLock = tryLock(lockKey); if (!isLock){ log.info("未获取到互斥锁,休眠,再重试" ); Thread.sleep(50 ); return queryWithMutex(id); } shopJson = stringRedisTemplate.opsForValue().get(key); if (!StrUtil.isBlank(shopJson)){ return JSONUtil.toBean(shopJson, Shop.class); } if (shopJson != null ){ return null ; } shop = getById(id); Thread.sleep(500 ); if (shop == null ){ stringRedisTemplate.opsForValue() .set(key,"" ,RedisConstants.CACHE_NULL_TTL,TimeUnit.MINUTES); return null ; } stringRedisTemplate.opsForValue() .set(key,JSONUtil.toJsonStr(shop),RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { unlock(lockKey); } return shop; } private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock (String key) { stringRedisTemplate.delete(key); }
测试 :删了缓存,加了500毫秒的延时,在每秒100次的请求冲击下,有大量的线程未获取到互斥锁,休眠再重试
当500毫秒过去后,就可以正常从缓存中读取程序了
查询数据库的操作,夹在大量线程的中间,数据库仅仅被查询了一次,达到了用互斥解决缓存击穿的目的
用逻辑过期解决缓存击穿问题 思路:当用户开始查询Redis的时候,判断是否命中,如果没有命中则直接返回空数据,不查询数据库,而一旦命中后,将value取出,判断value中的过期时间是否满足,如果没有过期,则直接返回Redis中的数据,如果过期了,则在开启独立线程后直接返回之前的数据,独立线程去重构数据,重构完成后释放互斥锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 ); public Shop queryWithLogicalExpire (Long id) { String key = RedisConstants.CACHE_SHOP_KEY + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(json)){ return null ; } RedisData redisData = JSONUtil.toBean(json, RedisData.class); JSONObject data = (JSONObject)redisData.getData(); Shop shop = JSONUtil.toBean(data, Shop.class); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())){ return shop; } String lockKey = RedisConstants.LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (isLock){ CACHE_REBUILD_EXECUTOR.submit(() -> { try { saveShopToRedis(id,20L ); } catch (Exception e) { throw new RuntimeException (e); } finally { unlock(lockKey); } }); } log.info("逻辑过期时间:{}" ,redisData.getExpireTime()); return shop; } private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock (String key) { stringRedisTemplate.delete(key); } public void saveShopToRedis (Long id, Long expireSeconds) throws InterruptedException { Shop shop = getById(id); Thread.sleep(500 ); RedisData redisData = new RedisData (); redisData.setData(shop); redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); stringRedisTemplate.opsForValue(). set(RedisConstants.CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData)); }
测试:我们先在用test测试在缓存中预处理了一个RedisData对象,并且设置了逻辑过期时间为当前时间+10秒
再10秒后,我们去测试100个线程同时访问,看看已经过期的情况下,会有多少个数据库查询?
可以看到,在查询后的一段时间内,我设置的是10秒,逻辑过期时间由845毫秒改为069毫秒,实现了缓存重建
封装Redis工具类 泛型。函数式编程,lambda表达式,这块上强度了,先copy,有时间再看
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 @Slf4j @Component public class CacheClient { @Resource private StringRedisTemplate stringRedisTemplate; private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10 ); public void set (String key, Object value, Long time, TimeUnit unit) { stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value), time, unit); } public void setWithLogicalExpire (String key, Object value, Long time, TimeUnit unit) { RedisData redisData = new RedisData (); redisData.setData(value); redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time))); stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData)); } public <R,ID> R queryWithPassThrough ( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(json)) { return JSONUtil.toBean(json, type); } if (json != null ) { return null ; } R r = dbFallback.apply(id); if (r == null ) { stringRedisTemplate.opsForValue().set(key, "" , CACHE_NULL_TTL, TimeUnit.MINUTES); return null ; } this .set(key, r, time, unit); return r; } public <R, ID> R queryWithLogicalExpire ( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; String json = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isBlank(json)) { return null ; } RedisData redisData = JSONUtil.toBean(json, RedisData.class); R r = JSONUtil.toBean((JSONObject) redisData.getData(), type); LocalDateTime expireTime = redisData.getExpireTime(); if (expireTime.isAfter(LocalDateTime.now())) { return r; } String lockKey = LOCK_SHOP_KEY + id; boolean isLock = tryLock(lockKey); if (isLock){ CACHE_REBUILD_EXECUTOR.submit(() -> { try { R newR = dbFallback.apply(id); this .setWithLogicalExpire(key, newR, time, unit); } catch (Exception e) { throw new RuntimeException (e); }finally { unlock(lockKey); } }); } return r; } public <R, ID> R queryWithMutex ( String keyPrefix, ID id, Class<R> type, Function<ID, R> dbFallback, Long time, TimeUnit unit) { String key = keyPrefix + id; String shopJson = stringRedisTemplate.opsForValue().get(key); if (StrUtil.isNotBlank(shopJson)) { return JSONUtil.toBean(shopJson, type); } if (shopJson != null ) { return null ; } String lockKey = LOCK_SHOP_KEY + id; R r = null ; try { boolean isLock = tryLock(lockKey); if (!isLock) { Thread.sleep(50 ); return queryWithMutex(keyPrefix, id, type, dbFallback, time, unit); } r = dbFallback.apply(id); if (r == null ) { stringRedisTemplate.opsForValue().set(key, "" , CACHE_NULL_TTL, TimeUnit.MINUTES); return null ; } this .set(key, r, time, unit); } catch (InterruptedException e) { throw new RuntimeException (e); }finally { unlock(lockKey); } return r; } private boolean tryLock (String key) { Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1" , 10 , TimeUnit.SECONDS); return BooleanUtil.isTrue(flag); } private void unlock (String key) { stringRedisTemplate.delete(key); } }
优惠券秒杀 全局唯一ID: 当用户抢购时,就会生成订单并保存到tb_voucher_order这张表中,而订单表如果使用数据库自增ID就存在一些问题:
场景分析:如果我们的id具有太明显的规则,用户或者说商业对手很容易猜测出来我们的一些敏感信息,比如商城在一天时间内,卖出了多少单,这明显不合适。
场景分析二:随着我们商城规模越来越大,mysql的单表的容量不宜超过500W,数据量过大之后,我们要进行拆库拆表,但拆分表了之后,他们从逻辑上讲他们是同一张表,所以他们的id是不能一样的, 于是乎我们需要保证id的唯一性。
全局ID生成器 :是一种在分布式系统下用来生成全局唯一ID的工具,一般要满足下列特性:
为了增加ID的安全性,我们不可以直接使用Redis自增的数值,拼接其他一些信息
ID的组成部分:符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
Redis实现全局唯一ID 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Component public class RedisIdWorker { private static final long BEGIN_TIMESTAMP = 1640995200L ; private static final int COUNT_BITS = 32 ; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker (StringRedisTemplate stringRedisTemplate) { this .stringRedisTemplate = stringRedisTemplate; } public long nextId (String keyPrefix) { LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd" )); long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); return timestamp << COUNT_BITS | count; } }
多线程测试 :给300个线程,每个线程生成100个ID,一共三万个ID,在多线程的情况下,如何等全部线程执行完了再去查询方法的执行时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Test void testIdWorker () throws InterruptedException { CountDownLatch latch = new CountDownLatch (300 ); Runnable task = () -> { for (int i = 0 ; i < 100 ; i++) { long id = redisIdWorker.nextId("order" ); System.out.println("id = " + id); } latch.countDown(); }; long begin = System.currentTimeMillis(); for (int i = 0 ; i < 300 ; i++) { es.submit(task); } latch.await(); long end = System.currentTimeMillis(); System.out.println("time = " + (end - begin)); }
多线程下测试时间的解释:
countdownlatch名为信号枪:主要的作用是同步协调在多线程的等待于唤醒问题
我们如果没有CountDownLatch ,那么由于程序是异步的,当异步程序没有执行完时,主线程就已经执行完了,然后我们期望的是分线程全部走完之后,主线程再走,所以我们此时需要使用到CountDownLatch
CountDownLatch 中有两个最重要的方法
1、countDown
2、await
await 方法 是阻塞方法,我们担心分线程没有执行完时,main线程就先执行,所以使用await可以让main线程阻塞,那么什么时候main线程不再阻塞呢?当CountDownLatch 内部维护的 变量变为0时,就不再阻塞,直接放行,那么什么时候CountDownLatch 维护的变量变为0 呢,我们只需要调用一次countDown ,内部变量就减少1,我们让分线程和变量绑定, 执行完一个分线程就减少一个变量,当分线程全部走完,CountDownLatch 维护的变量就是0,此时await就不再阻塞,统计出来的时间也就是所有分线程执行完后的时间。
添加优惠券
tb_voucher:优惠券的基本信息,优惠金额、使用规则等 tb_seckill_voucher:优惠券的库存、开始抢购时间,结束抢购时间。特价优惠券才需要填写这些信息
平价卷由于优惠力度并不是很大,所以是可以任意领取
而代金券由于优惠力度大,所以像第二种卷,就得限制数量,从表结构上也能看出,特价卷除了具有优惠卷的基本信息以外,还具有库存,抢购时间,结束时间等等字段
**新增普通卷代码: **VoucherController
1 2 3 4 5 @PostMapping public Result addVoucher (@RequestBody Voucher voucher) { voucherService.save(voucher); return Result.ok(voucher.getId()); }
新增秒杀卷代码:
VoucherController
1 2 3 4 5 @PostMapping("seckill") public Result addSeckillVoucher (@RequestBody Voucher voucher) { voucherService.addSeckillVoucher(voucher); return Result.ok(voucher.getId()); }
VoucherServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
在postman或者apiair接口传入以下body
1 2 3 4 5 6 7 8 9 10 11 12 { "shopId" : 1 , "title" : "100元代金券" , "subTitle" : "周一至周五可用" , "rules" : "全场通用\\n无需预约\\n可无限叠加" , "payValue" : 8000 , "actualValue" : 10000 , "type" : 1 , "stock" : 100 , "beginTime" : "2022-01-01T00:00:00" , "endTime" : "2022-10-31T23:59:59" }
实现下单秒杀 下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
下单核心逻辑分析:
当用户开始进行下单,我们应当去查询优惠卷信息,查询到优惠卷信息,判断是否满足秒杀条件
比如时间是否充足,如果时间充足,则进一步判断库存是否足够,如果两者都满足,则扣减库存,创建订单,然后返回订单id ,如果有一个条件不满足则直接结束。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Override public Result seckikkVoucher (Long voucherId) { SeckillVoucher voucher = seckillVoucherService.getById(voucherId); if (voucher.getBeginTime().isAfter(LocalDateTime.now())){ return Result.fail("秒杀尚未开始" ); } if (voucher.getEndTime().isBefore(LocalDateTime.now())){ return Result.fail("秒杀已经结束" ); } if (voucher.getStock() < 1 ){ return Result.fail("库存不足!" ); } boolean success = seckillVoucherService .update().setSql("stock = stock -1" ) .eq("voucher_id" , voucherId).update(); if (!success){ return Result.fail("库存不足!" ); } VoucherOrder voucherOrder = new VoucherOrder (); long orderId = redisIdWorker.nextId("order" ); voucherOrder.setId(orderId); Long userId = UserHolder.getUser().getId(); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); save(voucherOrder); return Result.ok(orderId); }
库存超卖问题 模拟100个线程同时抢券1分钟
数据库中卖出了109条记录
秒杀券库存为-9
分析原因:
假设线程1过来查询库存,判断出来库存大于1,正准备去扣减库存,但是还没有来得及去扣减,此时线程2过来,线程2也去查询库存,发现这个数量一定也大于1,那么这两个线程都会去扣减库存,最终多个线程相当于一起去扣减库存,此时就会出现库存的超卖问题。
典型的多线程安全问题:
针对这一问题,我们就是要加锁,
悲观锁 :对数据串行化执行,synchroniedz,lock都是悲观锁,直接在我们的方法上加注解@Synchronized就行
乐观锁: 会有一个版本号,每次操作数据会对版本号+1,再提交回数据时,会去校验是否比之前的版本大1 ,如果大1 ,则进行操作成功,这套机制的核心逻辑在于,如果在操作过程中,版本号只比原来大1 ,那么就意味着操作过程中没有人对他进行过修改,他的操作就是安全的,如果不大1,则数据被修改过,
可以用stock来代替version,只需要在后面的查询条件里加上,and stock = stock就行
修改之后的代码
1 2 3 4 5 6 7 8 boolean success = seckillVoucherService .update().setSql("stock = stock -1" ) .eq("voucher_id" , voucherId) .eq("stock" ,voucher.getStock()).update(); if (!success){ return Result.fail("库存不足!" ); }
mybatisPlus中的语句,等效于
1 update seckill_tab set stock = stock - 1 where voucher_id = #{vocherID} and stock = #{stock}
来来来,上1秒200个线程的压力测试 ,我32G内存要吃满了
嗯?怎么结果只有23条?
一看库存,200人1秒抢,是不会库存超卖了,但是没卖完呐,这也太安全了吧
问题:不一定要相等,只要有票,就接着卖
1 2 3 4 5 6 7 8 boolean success = seckillVoucherService .update().setSql("stock = stock -1" ) .eq("voucher_id" , voucherId) .gt("stock" ,0 ).update(); if (!success){ return Result.fail("库存不足!" ); }
可以了,卖的很干净
知识小扩展:
针对cas中的自旋压力过大,我们可以使用Longaddr这个类去解决
Java8 提供的一个对AtomicLong改进后的一个类,LongAdder
大量线程并发更新一个原子性的时候,天然的问题就是自旋,会导致并发性问题,当然这也比我们直接使用syn来的好
所以利用这么一个类,LongAdder来进行优化
如果获取某个值,则会对cell和base的值进行递增,最后返回一个完整的值
一人一单 要求:修改秒杀业务,要求同一个优惠券,一个用户,只能下一单
具体的操作逻辑:在扣减库存前,判断订单人的ID和优惠券ID相等的个数,如果个数>1,则失败
1 2 3 4 5 6 7 Long userId = UserHolder.getUser().getId();Long count = query().eq("user_id" , userId).eq("voucher_id" , voucherId).count();if (count > 0 ){ return Result.fail("用户已经购买过一次了" ); }
测试:200个线程,同一用户压力测试走起
结果失败:还是有三个线程同一ID的用户,抢购了三张用户券
分析:还是线程安全问题,对于我们的代码,多个线程执行的时候,查出的数据都是为0 ,可以插入,插入需要时间,在这个时间内进入的线程,还是会执行插入
解决问题:加锁!!!
初步代码:把一人一单的逻辑代码都提交到一个方法中去,然后给这个方法加锁
不管哪一个线程(例如线程A),运行到这个方法时,都要检查有没有其它线程B(或者C、 D等)正在用这个方法(或者该类的其他同步方法),有的话要等正在使用synchronized方法的线程B(或者C 、D)运行完这个方法后再运行此线程A,没有的话,锁定调用者,然后直接运行。
但是! (我这里看不懂了)就用老师的吧
事物+多线程+锁,处理的方法
但是这样加锁,锁的细粒度太粗了,在使用锁的过程中,控制锁粒度是一个非常重要的事情,因为如果锁的粒度太大,会导致每个线程进来都会被锁住
,现在的情况就是所有用户都公用这一把锁,串行执行,效率很低,我们现在要完成的业务是一人一单
,所以这个锁,应该只加在单个用户上,用户标识可以用userId
由于toString的源码是new String,所以如果我们只用userId.toString()
拿到的也不是同一个用户,需要使用intern()
,如果字符串常量池中已经包含了一个等于这个string对象的字符串(由equals(object)方法确定),那么将返回池中的字符串。否则,将此String对象添加到池中,并返回对此String对象的引用。
(其实以上就可以用了,但是存在事物问题
)
但是以上代码还是存在问题,问题的原因在于当前方法被Spring的事务控制,如果你在内部加锁,可能会导致当前方法事务还没有提交,但是锁已经释放了,这样也会导致问题,所以我们选择将当前方法整体包裹起来,确保事务不会出现问题
但是以上做法依然有问题,因为你调用的方法,其实是this.的方式调用的,事务想要生效,还得利用代理来生效,所以这个地方,我们需要获得原始的事务对象, 来操作事务,这里可以使用AopContext.currentProxy()
来获取当前对象的代理对象,然后再用代理对象调用方法,记得要去IVoucherOrderService
中创建createVoucherOrder
方法
1 2 3 4 5 Long userId = UserHolder.getUser().getId();synchronized (userId.toString().intern()) { IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); return proxy.createVoucherOrder(voucherId); }
集群环境下的并发问题 通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式
下就不行了
1.复制服务,将服务启动两份,搭建集群
2.然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡:
修改配置文件的时候,一定把所有的nginx关闭,因为我手贱多点了几下,启动了太多nginx服务器了,所以一直配置不成功,因为一直有以前的nginx启动
有关锁失效原因分析
由于现在我们部署了多个tomcat,每个tomcat都有一个属于自己的jvm,那么假设在服务器A的tomcat内部,有两个线程,这两个线程由于使用的是同一份代码,那么他们的锁对象是同一个,是可以实现互斥的,但是如果现在是服务器B的tomcat内部,又有两个线程,但是他们的锁对象写的虽然和服务器A一样,但是锁对象却不是同一个,所以线程3和线程4可以实现互斥,但是却无法和线程1和线程2实现互斥,这就是 集群环境下,syn锁失效的原因,在这种情况下,我们就需要使用分布式锁来解决这个问题。
集群模式下,有多个jvm的存在,每个jvm内都有自己的锁,导致每一个锁都可以有一个线程获取,于是就出现了并行运行
分布式锁 分布式锁
:满足分布式系统或者集群系统模式下多进程可见并且互斥的锁
核心思想
:让大家都使用同一把锁,只要大家使用的是同一把锁,那么我们就能锁住线程,不让线程进行,让程序串行执行,这就是分布式锁的核心思路
那么分布式锁他应该满足一些什么样的条件呢?
可见性:多个线程都能看到相同的结果,注意:这个地方说的可见性并不是并发编程中指的内存可见性,只是说多个进程之间都能感知到变化的意思
互斥:互斥是分布式锁的最基本的条件,使得程序串行执行
高可用:程序不易崩溃,时时刻刻都保证较高的可用性
高性能:由于加锁本身就让性能降低,所有对于分布式锁本身需要他就较高的加锁性能和释放锁性能
安全性:安全也是程序中必不可少的一环
常见的分布式锁有三种
Mysql
:mysql本身就带有锁机制,但是由于mysql性能本身一般,所以采用分布式锁的情况下,其实使用mysql作为分布式锁比较少见
Redis
:redis作为分布式锁是非常常见的一种使用方式,现在企业级开发中基本都使用redis或者zookeeper作为分布式锁,利用setnx这个方法,如果插入key成功,则表示获得到了锁,如果有人插入成功,其他人插入失败则表示无法获得到锁,利用这套逻辑来实现分布式锁
Zookeeper:zookeeper也是企业级开发中较好的一个实现分布式锁的方案,由于本套视频并不讲解zookeeper的原理和分布式锁的实现,所以不过多阐述
基于Redis的分布式锁 我们虽然有多台服务器集群,但是我们都是在同一个Redis中获取锁,所以可以加锁
自定义锁的接口
1 2 3 4 5 6 7 8 9 10 11 12 package com.hmdp.utils;public interface ILock { boolean tryLock (long timeoutSec) ; void unlock () ; }
在工具类中实现接口,定义锁的一般方法
底层是Redis中的set NX互斥和set EX设置超时时间方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public boolean tryLock (long timeoutSec) { long threadId = Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId + "" , timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { stringRedisTemplate.delete(KEY_PREFIX + name); }
分布式锁被误删的情况 逻辑说明:
持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放
,这时其他线程,线程2来尝试获得锁,就拿到了这把锁,然后线程2在持有锁执行过程中,线程1反应过来,继续执行,而线程1执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程2的锁进行删除,这就是误删别人锁的情况说明
解决方案:解决方案就是在每个线程释放锁的时候,去判断一下当前这把锁是否属于自己,如果属于自己,则不进行锁的删除 ,假设还是上边的情况,线程1卡顿,锁自动释放,线程2进入到锁的内部执行逻辑,此时线程1反应过来,然后删除锁,但是线程1,一看当前这把锁不是属于自己,于是不进行删除锁逻辑,当线程2走到删除锁逻辑时,如果没有卡过自动释放锁的时间点,则判断当前这把锁是属于自己的,于是删除这把锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private static final String KEY_PREFIX = "lock:" ; private static final String ID_PREFIX = UUID.randomUUID().toString(true ); @Override public boolean tryLock (long timeoutSec) { String threadId = ID_PREFIX + Thread.currentThread().getId(); Boolean success = stringRedisTemplate.opsForValue() .setIfAbsent(KEY_PREFIX + name, threadId , timeoutSec, TimeUnit.SECONDS); return Boolean.TRUE.equals(success); } @Override public void unlock () { String threadId = ID_PREFIX + Thread.currentThread().getId(); String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); if (threadId.equals(id)){ stringRedisTemplate.delete(KEY_PREFIX + name); }
分布式锁的原子性的问题 更为极端的误删逻辑说明 :
线程1现在持有锁之后,在执行业务逻辑过程中,他正准备删除锁,而且已经走到了条件判断的过程中,比如他已经拿到了当前这把锁确实是属于他自己的,正准备删除锁,但是此时他的锁到期了,那么此时线程2进来,但是线程1他会接着往后执行,当他卡顿结束后,他直接就会执行删除锁那行代码,相当于条件判断并没有起到作用,这就是删锁时的原子性问题,之所以有这个问题,是因为线程1的拿锁,比锁,删锁,实际上并不是原子性的,我们要防止刚才的情况发生,
也就是代码中的,判断和删除,两行代码,执行就是会有时间,哪怕几微妙,几纳秒
就在这时,线程阻塞了,JVM中full gc了,又误删了
Lua脚本解决多条命令的原子性问题 Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html,这里重点介绍Redis提供的调用函数,我们可以使用lua去操作redis,又能保证他的原子性,这样就可以实现拿锁比锁删锁是一个原子性动作了。
Lua脚本在Redis中是原子性执行的,即使是由多个命令组成的脚本,Redis会将其作为一个整体进行执行,不会被其他客户端的操作打断,从而确保了操作的原子性。
这里重点介绍Redis提供的调用函数,语法如下:
1 redis.call('命令名称' , 'key' , '其它参数' , ...)
例如,我们要执行set name jack,则脚本是这样:
1 2 # 执行 set name jack redis.call('set' , 'name' , 'jack' )
例如,我们要先执行set name Rose,再执行get name,则脚本如下:
1 2 3 4 5 6 # 先执行 set name jack redis.call('set' , 'name' , 'Rose' ) # 再执行 get name local name = redis.call('get' , 'name' )# 返回 return name
写好脚本以后,需要用Redis命令来调用脚本,调用脚本的常见命令如下:EVAL
释放锁的业务流程是这样的
1、获取锁中的线程标示
2、判断是否与指定的标示(当前线程标示)一致
3、如果一致则释放锁(删除)
4、如果不一致则什么都不做
如果用Lua脚本来表示则是这样的:
最终我们操作redis的拿锁比锁删锁的lua脚本就会变成这样
1 2 3 4 5 6 7 8 if (redis.call('GET' , KEYS[1 ]) == ARGV[1 ]) then return redis.call('DEL' , KEYS[1 ]) end return 0
利用Java调用lua脚本 我们的RedisTemplate中,可以利用execute方法去执行lua脚本,参数对应关系就如下图股
初始化
1 2 3 4 5 6 private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;static { UNLOCK_SCRIPT = new DefaultRedisScript <>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource ("unlock.lua" )); UNLOCK_SCRIPT.setResultType(Long.class);
调用execute方法去删除锁
1 2 3 4 5 6 String threadId = ID_PREFIX + Thread.currentThread().getId();stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), threadId );
总结 基于Redis实现分布式锁的实现思路
选择合适的锁键(Lock Key) :
锁键是用于标识和获取锁的唯一标识符。它应该是具有唯一性的,以防止不同的应用程序或线程之间发生冲突。通常,锁键可以是一个字符串,可以包含命名空间、资源名称、唯一标识符等信息。
获取锁 :
通过在 Redis 中设置一个键值对来获取锁。键表示锁键,值表示锁的持有者标识符或者一个随机生成的唯一标识符。
可以使用 Redis 的 SETNX 命令(SET if Not eXists)来尝试设置锁,如果键不存在则设置成功,表示获取锁成功。
设置锁的超时时间 (可选):
可以为锁设置一个超时时间,防止锁被永久持有。可以使用 Redis 的 SETEX 或者 EXPIRE 命令来设置键的过期时间。
释放锁 :
当持有锁的线程完成了工作,或者锁超时时,需要将锁释放。可以使用 DEL 命令来删除键,释放锁。
在实现分布式锁时,还需要考虑以下几点:
锁的互斥性 :确保同一时刻只有一个线程或者进程能够持有锁。
锁的可重入性 :允许同一线程或者进程多次获取同一个锁。
锁的可靠性 :确保在各种异常情况下,锁都能够正确地释放,避免死锁或者锁失效。
锁的性能 :尽量减小锁操作对性能的影响,避免锁竞争和频繁的锁释放操作。
分布式锁redission 基于setnx实现的分布式锁存在下面的问题:
重入问题 :重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中,可重入锁的意义在于防止死锁,比如HashTable这样的代码中,他的方法都是使用synchronized修饰的,假如他在一个方法内,调用另一个方法,那么此时如果是不可重入的,不就死锁了吗?所以可重入锁他的主要意义是防止死锁,我们的synchronized和Lock锁都是可重入的。
不可重试 :是指目前的分布式只能尝试一次,我们认为合理的情况是:当线程在获得锁失败后,他应该能再次尝试获得锁。
超时释放: 我们在加锁时增加了过期时间,这样的我们可以防止死锁,但是如果卡顿的时间超长,虽然我们采用了lua表达式防止删锁的时候,误删别人的锁,但是毕竟没有锁住,有安全隐患
主从一致性: 如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
那么什么是Redission呢
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
Redission提供了分布式锁的多种多样的功能
快速入门 引入依赖:
1 2 3 4 5 <dependency > <groupId > org.redisson</groupId > <artifactId > redisson</artifactId > <version > 3.28.0</version > </dependency >
定义配置类,配置Redis地址
1 2 3 4 5 6 7 8 9 10 11 12 13 @Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient () { Config config = new Config (); config.useSingleServer().setAddress("redis://192.168.239.129:6379" ) .setPassword("123456" ); return Redisson.create(config); } }
然后直接注入对象,调用他的getClock和unlock方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Resource private RedissionClient redissonClient;@Test void testRedisson () throws Exception{ RLock lock = redissonClient.getLock("anyLock" ); boolean isLock = lock.tryLock(1 ,10 ,TimeUnit.SECONDS); if (isLock){ try { System.out.println("执行业务" ); }finally { lock.unlock(); } } }
分布式锁-redisson可重入锁的原理 可重入锁:以前的锁是不可冲重入的,所以当一个线程,里去调用多把锁的时候,就会出现第一把锁获取成功,其他锁获取失败的情况
为了解决锁的可重入性质。将以前的key值为lock,value属性只有线程自己的ID,改为用hashmap存储,存储当先线程的ID和线程获取锁的次数
调用锁的时候,判断是不是自己,统计数+1,释放的时候,判断是不是自己,统计数-1,知道统计数为0,再释放锁
具体的流程如下
为了保证原子性,就是执行获取锁和删除锁的过程是一起的,不会被其他线程阻挡
使用lua脚本实现
获取锁的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 local key = KEYS[1 ]; local threadId = ARGV[1 ]; local releaseTime = ARGV[2 ]; if (redis.call('exists' , key) == 0 ) then redis.call('hset' , key, threadId, '1' ); redis.call('expire' , key, releaseTime); return 1 ; end ;if (redis.call('hexists' , key, threadId) == 1 ) then redis.call('hincrby' , key, thread, 1 ); redis.call('expire' , key, releaseTime); return 1 ; end ;return 0 ;
释放锁的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 local key = KEYS[1 ];local threadId = ARGV[1 ];local releaseTime = ARGV[2 ];if (redis.call('HEXISTS' , key, threadId) == 0 ) then return nil ; end ;local count = redis.call('hincrby' , key, threadId, -1 );if (count > 0 ) then redis.call('expire' , key, releaseTime); return nil ; else redis.call('del' , key); return nil ; end ;
分布式锁redisson重试和看门狗机制
重试机制 :
在使用 Redisson 实现分布式锁时,为了应对网络异常、Redis 服务器故障等情况,通常会配置重试机制。重试机制可以在获取锁失败后自动进行多次重试,直到获取锁成功或达到最大重试次数为止。
Redisson 的重试机制是通过循环调用 Redis 的 SETNX(SET if Not eXists)命令来实现的。当某个节点在获取锁时失败后,会在指定的时间间隔内进行重试,直到获取锁成功或者超过最大重试次数。
看门狗机制 :
看门狗机制是用于防止锁持有者在执行业务逻辑时出现异常导致锁没有被释放而造成死锁
的情况。看门狗机制会在获取锁成功后启动一个定时任务,定时更新锁的过期时间。如果锁持有者因为异常或其他原因未能及时释放锁,锁的过期时间会被更新,确保锁在一定时间内仍然有效,避免死锁情况的发生。
在 Redisson 中,看门狗机制是通过 Redis 的 PEXPIRE(设置过期时间,带毫秒单位)命令实现的。当获取锁成功后,会启动一个定时任务定时更新锁的过期时间,保持锁的有效性。
综上所述,重试机制和看门狗机制是 Redisson 保证分布式锁可靠性和稳定性的重要机制。重试机制用于处理获取锁失败的情况,而看门狗机制用于防止锁的持有者出现异常导致锁未被释放而造成死锁的情况。这两种机制结合起来,可以有效地确保分布式锁的正确使用。
Redisson分布式锁的原理
可重入:利用hash结构记录线程ID和重入次数
可重试:利用信号量和PubSUb实现等待,唤醒,获取锁失败的重试机制
超时续约:利用看门狗,每隔一段时间(释放时间的1/3),重置超时时间
Redisson主从一致性问题–multiLock 为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
为了解决这个问题,redission提出来了MutiLock锁,使用这把锁咱们就不使用主从了,每个节点的地位都是一样的, 这把锁加锁的逻辑需要写入到每一个主丛节点上,只有所有的服务器都写入成功,此时才是加锁成功,假设现在某个节点挂了,那么他去获得锁的时候,只要有一个节点拿不到,都不能算是加锁成功,就保证了加锁的可靠性。
那么MutiLock 加锁原理是什么呢?笔者画了一幅图来说明
当我们去设置了多个锁时,redission会将多个锁添加到一个集合中,然后用while循环去不停去尝试拿锁,但是会有一个总共的加锁时间,这个时间是用需要加锁的个数 * 1500ms ,假设有3个锁,那么时间就是4500ms,假设在这4500ms内,所有的锁都加锁成功, 那么此时才算是加锁成功,如果在4500ms有线程加锁失败,则会再次去进行重试.
本章小结:
不可重入Redis分布式锁
原理:利用SETNX的互斥性;利用EX避免死锁;释放锁时判断线程标识
缺陷:不可重入、无法重试、锁超时失效
可重入Redis分布式锁
原理:利用Hash结构,记录线程标识与重入次数;利用WatchDog延续锁时间;利用信号量控制锁重试等待
缺陷:Redis宕机引起锁失效问题
Redisson的multiLock
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
秒杀优化 测试:在测试类中,将数据库中的所有用户,分发令牌的token写入Redis中,并且把token写入文件里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Autowired private IUserService userService; @Autowired private StringRedisTemplate stringRedisTemplate; @Test public void tokenWriter () throws IOException { long start = System.currentTimeMillis(); BufferedWriter writer = new BufferedWriter (new FileWriter ("tokens.txt" )); List<User> list = userService.list(); for (User user : list) { String token = UUID.randomUUID().toString(true ); UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); Map<String, Object> usermap = BeanUtil.beanToMap(userDTO, new HashMap <>(), CopyOptions.create() .setIgnoreNullValue(true ) .setFieldValueEditor((Name, Value) -> Value.toString()) ); String tokenKey = LOGIN_USER_KEY + token; stringRedisTemplate.opsForHash().putAll(tokenKey, usermap); writer.write(token); writer.newLine(); } writer.close(); long end = System.currentTimeMillis(); System.out.println("测试方法用时" + (end-start) + "ms" ); }
在jmeter中并发测试
断言就是我们判断返回值Jason中的success,如果等于true,判断成功,其他判断失败
并发1000次,我发现,我们的自己定义的锁,平均相应3222ms,太慢了
先尝试用redisson的锁
平均值由3222ms提升到了2107ms
但还是太慢,还得优化
异步秒杀思路 我们来回顾一下下单流程
当用户发起请求,此时会请求nginx,nginx会访问到tomcat,而tomcat中的程序,会进行串行操作,分成如下几个步骤
1、查询优惠卷
2、判断秒杀库存是否足够
3、查询订单
4、校验是否是一人一单
5、扣减库存
6、创建订单
在这六步操作中,又有很多操作是要去操作数据库的,而且还是一个线程串行执行, 这样就会导致我们的程序执行的很慢,所以我们需要异步程序执行
优化方案:我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息,这样程序不就超级快了吗?而且也不用担心线程池消耗殆尽的问题,因为这里我们的程序中并没有手动使用任何线程池,当然这里边有两个难点
第一个难点
是我们怎么在redis中去快速校验一人一单,还有库存判断
第二个难点
是由于我们校验和tomct下单是两个线程,那么我们如何知道到底哪个单他最后是否成功,或者是下单完成,为了完成这件事我们在redis操作完之后,我们会将一些信息返回给前端,同时也会把这些信息丢到异步queue中去,后续操作中,可以通过这个id来查询我们tomcat中的下单逻辑是否完成了。
我们现在来看看整体思路:当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作
当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
需求:
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
VoucherServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override @Transactional public void addSeckillVoucher (Voucher voucher) { save(voucher); SeckillVoucher seckillVoucher = new SeckillVoucher (); seckillVoucher.setVoucherId(voucher.getId()); seckillVoucher.setStock(voucher.getStock()); seckillVoucher.setBeginTime(voucher.getBeginTime()); seckillVoucher.setEndTime(voucher.getEndTime()); seckillVoucherService.save(seckillVoucher); stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString()); }
完整lua表达式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local orderId = ARGV[3 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) redis.call('xadd' , 'stream.orders' , '*' , 'userId' , userId, 'voucherId' , voucherId, 'id' , orderId) return 0
当以上lua表达式执行完毕后,剩下的就是根据步骤3,4来执行我们接下来的任务了
基于阻塞队列实现秒杀优化 没能实现数据库的事物,一加事物就报错,不知道为什么
一行行敲的代码,不懂的全写在注释里面了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 @Resource private ISeckillVoucherService seckillVoucherService; @Resource private RedisIdWorker redisIdWorker; @Resource private StringRedisTemplate stringRedisTemplate; @Resource private RedissonClient redissonClient; private static final DefaultRedisScript<Long> SECKILL_SCRIPT; static { SECKILL_SCRIPT = new DefaultRedisScript <>(); SECKILL_SCRIPT.setLocation(new ClassPathResource ("seckill.lua" )); SECKILL_SCRIPT.setResultType(Long.class); } private BlockingQueue<VoucherOrder> orderTasks =new ArrayBlockingQueue <>(1024 * 1024 ); private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor(); @PostConstruct private void init () { SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler ()); } private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { VoucherOrder voucherOrder = orderTasks.take(); handleVoucherOrder(voucherOrder); } catch (Exception e) { log.error("处理订单异常" , e); } } } } private void handleVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); RLock redisLock = redissonClient.getLock("lock:order:" + userId); boolean isLock = redisLock.tryLock(); if (!isLock) { log.error("不允许重复下单!" ); return ; } try { createVoucherOrder(voucherOrder); } finally { redisLock.unlock(); } } @Override public Result seckikkVoucher (Long voucherId) { Long userId = UserHolder.getUser().getId(); Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), voucherId.toString(), userId.toString() ); if (result != 0 ){ return Result.fail(result == 1 ? "库存不足" :"不能重复下单" ); } long orderId = redisIdWorker.nextId("order" ); VoucherOrder voucherOrder = new VoucherOrder (); voucherOrder.setId(orderId); voucherOrder.setUserId(userId); voucherOrder.setVoucherId(voucherId); orderTasks.add(voucherOrder); return Result.ok(orderId); } @Transactional public void createVoucherOrder (VoucherOrder voucherOrder) { Long userId = voucherOrder.getUserId(); Long count = query().eq("user_id" , userId).eq("voucher_id" , voucherOrder.getVoucherId()).count(); if (count > 0 ){ return ; } boolean success = seckillVoucherService .update().setSql("stock = stock -1" ) .eq("voucher_id" , voucherOrder.getVoucherId()) .gt("stock" ,0 ).update(); if (!success){ return ; } save(voucherOrder); }
本质就是:创建优惠券的时候,将优惠券的信息存入redis中,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功
,将要操作数据库的操作提取出来,放入阻塞队列
,后慢慢执行,将其他操作,在redis中执行,秒杀库存的判断,使用lua脚本,lua脚本执行是单线程的,确保操作是原子性的,不会有并发问题。从而达到优化的目的。
来,上压力测试
1000个用户并发抢200个,优化到了平均13ms每秒
第二次压力测试。74ms
秒杀业务的优化思路是什么?
先利用Redis完成库存余量、一人一单判断,完成抢单业务
再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
阻塞队列基于虚拟机JVM,内存限制问题
数据安全问题
快把消息队列端上来吧
Redis消息队列 什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理
(Message Broker)
生产者:发送消息到消息队列
消费者:从消息队列获取消息并处理消息
消息队列就类比于快递柜
,生产者把快递放在快递柜子,快递柜子作为消息队列,通知我们去拿快递,相当于异步处理,解除了耦合,大大提高了效率
这里我们可以使用一些现成的mq,比如kafka,rabbitmq等等 ,但是呢,如果没有安装mq,我们也可以直接使用redis提供的mq方案,降低我们的部署和学习成本。
Redis提供了三种不同的方式来实现消息队列:
list结构:
PubSub:
Stream:比较完善的消息队列模型
List实现消息队列 消息队列(Message Queue),字面意思就是存放消息的队列。而Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
队列是入口和出口不在一边,因此我们可以利用:LPUSH 结合 RPOP、或者 RPUSH 结合 LPOP来实现。 不过要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
基于List的消息队列有哪些优缺点? 优点:
利用Redis存储,不受限于JVM内存上限
基于Redis的持久化机制,数据安全性有保证
可以满足消息有序性
缺点:
基于PubSub消息队列 PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel] :订阅一个或多个频道 PUBLISH channel msg :向一个频道发送消息 PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道
基于PubSub的消息队列有哪些优缺点? 优点:
缺点:
不支持数据持久化
无法避免消息丢失
消息堆积有上限,超出时数据丢失
基于Stream的消息队列 (没听懂,就跟着敲了)
需求:
创建一个Stream类型的消息队列,名为stream.orders
修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
步骤一:在Linux中Redis创建一个Stream类型的消息队列,名为stream.orders
1 XGROUP CREATE stream.orders g1 0 MKSTREAM
步骤二:修改Lua脚本,新增orderId参数,并将订单信息加入到消息队列中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 local voucherId = ARGV[1 ]local userId = ARGV[2 ]local orderId = ARGV[3 ]local stockKey = 'seckill:stock:' .. voucherIdlocal orderKey = 'seckill:order:' .. voucherIdif (tonumber (redis.call('get' , stockKey)) <= 0 ) then return 1 end if (redis.call('sismember' , orderKey, userId) == 1 ) then return 2 end redis.call('incrby' , stockKey, -1 ) redis.call('sadd' , orderKey, userId) redis.call('xadd' , 'stream.orders' , '*' , 'userId' , userId, 'voucherId' , voucherId, 'id' , orderId) return 0
步骤三:修改秒杀逻辑(这stream真心看不懂啊)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 private class VoucherOrderHandler implements Runnable { @Override public void run () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ).block(Duration.ofSeconds(2 )), StreamOffset.create("stream.orders" , ReadOffset.lastConsumed()) ); if (list == null || list.isEmpty()) { continue ; } MapRecord<String, Object, Object> record = list.get(0 ); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder (), true ); createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge("s1" , "g1" , record.getId()); } catch (Exception e) { log.error("处理订单异常" , e); handlePendingList(); } } } } private void handlePendingList () { while (true ) { try { List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1" , "c1" ), StreamReadOptions.empty().count(1 ), StreamOffset.create("stream.orders" , ReadOffset.from("0" )) ); if (list == null || list.isEmpty()) { break ; } MapRecord<String, Object, Object> record = list.get(0 ); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder (), true ); createVoucherOrder(voucherOrder); stringRedisTemplate.opsForStream().acknowledge("s1" , "g1" , record.getId()); } catch (Exception e) { log.error("处理pendding订单异常" , e); try { Thread.sleep(20 ); }catch (Exception s){ s.printStackTrace(); } } } }
压力一测试,从阻塞队列的13ms,到stream消息队列的203毫秒,真服了 ,反向优化是吧,而且我代码还看不懂,去学其他的消息队列吧MQ吧
RabbitMQ消息队列实现 花了一天去学习了RabbitMQ,只是浅浅了解了安装和基础的使用方法
接下来,用RabbitMQ实现消息队列
我只谈代码修改的部分吧
详细的RabbitMQ我还记了一篇RabbitMQ笔记
把以前放入阻塞队列的消息,放入消息队列RabbitMQ中
引入依赖,注入RabbitTemplate对象
其中,convertAndSend的传入的参数为,交换机,关键字,和对象
1 2 3 4 5 6 7 8 9 10 11 12 @Resource private RabbitTemplate rabbitTemplate; try { rabbitTemplate.convertAndSend("voucherOrder.topic" ,"secKill.success" ,voucherOrder); } catch (AmqpException e) { log.info("消息发送失败,用户ID:{}" ,voucherOrder.getUserId()); }
新建一个Listener包,定义MQListener类,用于监听消息
注入之前的service层对象,直接调用其方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Slf4j @Component public class MqListener { @Resource private VoucherOrderServiceImpl voucherOrderService; @RabbitListener(queues = "hello.queue1") public void listensSimpleQueue (String msg) { System.out.println("消费者收到了hello.queue1的消息:" + msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "voucherOrder.queue"), exchange = @Exchange(name = "voucherOrder.topic",type = ExchangeTypes.TOPIC), key = "secKill.success" )) public void voucherOrderListener (VoucherOrder voucherOrder) { voucherOrderService.createVoucherOrder(voucherOrder); } }
1 2 3 4 5 @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "voucherOrder.queue"), exchange = @Exchange(name = "voucherOrder.topic",type = ExchangeTypes.TOPIC), key = "secKill.success" ))
使用注解嵌套注解的方式,来实现
具体解释如下:
@RabbitListener
: 这是一个监听器注解,用于声明一个方法作为 RabbitMQ 的消息消费者。当 RabbitMQ 中有消息到达时,被注解的方法将被调用。
bindings
: 这是 @RabbitListener
注解的一个属性,用于指定队列和交换机之间的绑定关系 。在这里,通过 @QueueBinding
注解指定了队列与交换机之间的绑定关系。
@Queue
: 在 @QueueBinding
注解中,@Queue
用于声明一个队列,并设置队列的属性。在这里,使用了 name
属性指定了队列的名称为 “voucherOrder.queue”。
@Exchange
: 在 @QueueBinding
注解中,@Exchange
用于声明一个交换机,并设置交换机的属性。在这里,使用了 name
属性指定了交换机的名称为 “voucherOrder.topic”,type
属性指定了交换机的类型为 “topic”。
key
: 在 @QueueBinding
注解中,key
属性用于指定路由键 ,表示消息从交换机发送到队列时所需匹配的路由规则。在这里,设置了 “secKill.success” 作为路由键。
综上所述,这段代码的作用是声明了一个 RabbitMQ 的消息监听器,监听名为 “voucherOrder.topic” 的交换机上的 “secKill.success” 路由键对应的队列 “voucherOrder.queue”,当有消息通过该路由键到达时,会触发被注解的方法进行消费。
来,上压力测试,好快啊,平均26ms
来三轮求取平均值 82ms
第三轮68ms
好像用了lua脚本后,就确保了一人一单不会被抢了,加Redisson分布式锁还有没有用了?
加锁,也是给后续写入数据库的操作加锁,感觉会没用,试试
果然,平均只有1s了
达人探店 发布探店笔记 探店笔记类似点评网站的评价,往往是图文结合。对应的表有两个: tb_blog:探店笔记表,包含笔记中的标题、文字、图片等 tb_blog_comments:其他用户对探店笔记的评价
一共两个接口 ,上传 和发布
上传接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Slf4j @RestController @RequestMapping("upload") public class UploadController { @PostMapping("blog") public Result uploadImage (@RequestParam("file") MultipartFile image) { try { String originalFilename = image.getOriginalFilename(); String fileName = createNewFileName(originalFilename); image.transferTo(new File (SystemConstants.IMAGE_UPLOAD_DIR, fileName)); log.debug("文件上传成功,{}" , fileName); return Result.ok(fileName); } catch (IOException e) { throw new RuntimeException ("文件上传失败" , e); } } }
需要修改SystemConstants.IMAGE_UPLOAD_DIR 自己图片所在的地址,在实际开发中图片一般会放在nginx上或者是云存储上。
BlogController
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @RestController @RequestMapping("/blog") public class BlogController { @Resource private IBlogService blogService; @PostMapping public Result saveBlog (@RequestBody Blog blog) { UserDTO user = UserHolder.getUser(); blog.setUpdateTime(user.getId()); blogService.saveBlog(blog); return Result.ok(blog.getId()); } }
达人探店点赞功能
完善点赞功能
需求:
同一个用户只能点赞一次,再次点击则取消点赞
如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段Blog类的isLike属性)
实现步骤:
给Blog类中添加一个isLike字段,标示是否被当前用户点赞
修改点赞功能,利用Redis的set集合判断是否点赞过,未点赞过则点赞数+1,已点赞过则点赞数-1
修改根据id查询Blog的业务,判断当前登录用户是否点赞过,赋值给isLike字段
修改分页查询Blog业务,判断当前登录用户是否点赞过,赋值给isLike字段
思路就是:用一个set集合去存储已经点赞的用户的列表集合
我们先去Redis中查询,
查询不到,数据库点赞++,保存用户到Redis说明已经点过赞
查询到,数据库点赞–,删除Redis中set保存的用户
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Override public Result likeBlog (Long id) { Long userId = UserHolder.getUser().getId(); String key = BLOG_LIKED_KEY + id; Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString()); if (BooleanUtil.isFalse(isMember)) { boolean isSuccess = update().setSql("liked = liked + 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForSet().add(key, userId.toString()); } } else { boolean isSuccess = update().setSql("liked = liked - 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForSet().remove(key, userId.toString()); } } return Result.ok(); } @Override public Result queryHotBlog (Integer current) { Page<Blog> page = query() .orderByDesc("liked" ) .page(new Page <>(current, SystemConstants.MAX_PAGE_SIZE)); List<Blog> records = page.getRecords(); records.forEach(blog ->{ Long userId = blog.getUserId(); User user = userService.getById(userId); blog.setName(user.getNickName()); blog.setIcon(user.getIcon()); this .isBlogLiked(blog); }); return Result.ok(records); }
点赞排行傍 在探店笔记的详情页面,应该把给该笔记点赞的人显示出来,比如最早点赞的TOP5,形成点赞排行榜:
之前的点赞是放到set集合,但是set集合是不能排序的,所以这个时候,咱们可以采用一个可以排序的set集合,就是咱们的sortedSet
思路:使用zset代替set集合,按照保存时间的毫秒值,获取排名前五的用户
做一些转换,封装到DTO中即可返回
controller层
1 2 3 4 @GetMapping("/likes/{id}") public Result likesBlog (@PathVariable("id") Long id) { return blogService.likesBlog(id); }
service层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Override public Result likeBlog (Long id) { Long userId = UserHolder.getUser().getId(); String key = BLOG_LIKED_KEY + id; Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString()); if (score == null ) { boolean isSuccess = update().setSql("liked = liked + 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForZSet().add(key, userId.toString(),System.currentTimeMillis()); } } else { boolean isSuccess = update().setSql("liked = liked - 1" ).eq("id" , id).update(); if (isSuccess) { stringRedisTemplate.opsForZSet().remove(key, userId.toString()); } } return Result.ok(); } @Override public Result likesBlog (Long id) { String key = BLOG_LIKED_KEY + id; Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0 , 4 ); if (top5 == null || top5.isEmpty()){ return Result.ok(Collections.emptyList()); } List<Long> ids = top5.stream().map(Long::valueOf).toList(); List<User> users = userService.listByIds(ids); List<UserDTO> userDTOS = new ArrayList <>(); for (User u : users){ UserDTO userDTO = BeanUtil.copyProperties(u, UserDTO.class); userDTOS.add(userDTO); } return Result.ok(userDTOS); } }
有好多用流的处理方法,我流使用不习惯,用传统for集合遍历也行
好友关注 好有关注和取关
基于该表数据结构,实现两个接口
关注是User之间的关系,是博主和粉丝的关系,是多对多的关系 ,数据库中有一张表,tb_follow表示
思路:
尝试关注思路:判断是关注还是取关,关注的话就定义对象,存到表中,取关就删除表中字段
是否关注用户:查询,符合就行
controller层
1 2 3 4 5 6 7 8 9 10 11 12 @Resource IFollowService followService; @PutMapping("/{id}/{isFollow}") public Result follow (@PathVariable("id") Long followUserId, @PathVariable("isFollow") Boolean isFollow) { return followService.follow(followUserId, isFollow); } @GetMapping("/or/not/{id}") public Result isFollow (@PathVariable("id") Long followUserId) { return followService.isFollow(followUserId); }
service层
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Resource private BlogMapper blogMapper; @Override public Result follow (Long followUserId, Boolean isFollow) { Long userId = UserHolder.getUser().getId(); if (isFollow){ Follow follow = new Follow (); follow.setUserId(userId); follow.setFollowUserId(followUserId); save(follow); }else { blogMapper.deleteFollow(userId,followUserId); } return Result.ok(); } @Override public Result isFollow (Long followUserId) { Long userId = UserHolder.getUser().getId(); Long count = query().eq("user_id" , userId).eq("follow_user_id" , followUserId).count(); return Result.ok(count > 0 ); }
共同关注 想要去看共同关注的好友,需要首先进入到这个页面,这个页面会发起两个请求
1、去查询用户的详情
2、去查询用户的笔记
1.查询用户详情
1 2 3 4 5 6 7 8 9 @GetMapping("/{id}") public Result queryUserById (@PathVariable("id") Long userId) { User user = userService.getById(userId); if (user == null ){ return Result.ok(); } UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class); return Result.ok(userDTO); }
2.查询用户笔记
blogService.page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE))
:
blogService
是一个博客服务对象,通过该对象调用 page
方法来查询博客信息。
page
方法接受一个 Page
对象作为参数,用于指定查询的分页信息。
new Page<>(current, SystemConstants.MAX_PAGE_SIZE)
创建了一个分页对象,其中 current
表示当前页数,SystemConstants.MAX_PAGE_SIZE
表示每页的记录数。
通过传入当前页数和每页记录数,实现了分页查询。
List<Blog> records = page.getRecords();
:
page.getRecords()
方法用于获取当前页的记录列表。
将获取到的记录列表赋值给 List<Blog> records
,以便后续返回给前端。
1 2 3 4 5 6 7 8 9 10 11 @GetMapping("/of/user") public Result queryBlogByUserId ( @RequestParam("id") Long id, @RequestParam(value = "current",defaultValue = "1") Integer current) { Page<Blog> objectPage = new Page <>(current, SystemConstants.MAX_PAGE_SIZE); Page<Blog> page = blogService.page(objectPage); List<Blog> records = page.getRecords(); return Result.ok(records); }
共同关注的实现
思路:在关注的时候,放入数据库的同时,指定key把当前ID的关注列表
以set的形式放入缓存中,
取消关注的时候,删除数据库的时候,删除缓存
最后在用set的求交集的api,就可以查找到共同关注
注意查到的的数据为String,转化为Long类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Override public Result follow (Long followUserId, Boolean isFollow) { Long userId = UserHolder.getUser().getId(); String key = "follows:" + userId; if (isFollow){ Follow follow = new Follow (); follow.setUserId(userId); follow.setFollowUserId(followUserId); boolean ifSuccess = save(follow); if (ifSuccess){ stringRedisTemplate.opsForSet().add(key,followUserId.toString()); } }else { blogMapper.deleteFollow(userId,followUserId); stringRedisTemplate.opsForSet().remove(key,followUserId.toString()); } return Result.ok(); } @Override public Result isFollow (Long followUserId) { Long userId = UserHolder.getUser().getId(); Long count = query().eq("user_id" , userId).eq("follow_user_id" , followUserId).count(); return Result.ok(count > 0 ); } @Override public Result followCommons (Long id) { Long userId = UserHolder.getUser().getId(); String key1 = "follows:" + userId; String key2 = "follows:" + id; Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2); if (intersect == null ){ return Result.ok(Collections.emptyList()); } List<Long> ids = intersect.stream().map(Long::valueOf).toList(); List<UserDTO> userDTOS = userService.listByIds(ids).stream() .map(user -> BeanUtil.copyProperties(user, UserDTO.class)) .toList(); return Result.ok(userDTOS); }
关注推送Feed流 当我们关注了用户后,这个用户发了动态,那么我们应该把这些数据推送给用户,这个需求,其实我们又把他叫做Feed流
关注推送也叫做Feed流
,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
对于新型的Feed流
的的效果:不需要我们用户再去推送信息,而是系统分析用户到底想要什么,然后直接把内容推送给用户,从而使用户能够更加的节约时间,不用主动去寻找。
Feed流产品有两种常见模式:Timeline
:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用
该模式的实现方案有三种:
我们本次针对好友的操作,采用的就是Timeline的方式,只需要拿到我们关注用户的信息,然后按照时间排序即可
,因此采用Timeline的模式。该模式的实现方案有三种:
拉模式 :也叫是,读扩散
该模式的核心含义就是:当张三和李四和王五发了消息后,都会保存在自己的邮箱中假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序
优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。
缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。
推模式 :也叫做写扩散。
推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了
优点:时效快,不用临时拉取
缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去
推拉结合模式 :也叫做读写混合
,兼具推和拉两种模式的优点。
推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式
,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去
,然后再直接写一份到活跃粉丝
收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。
大V:发件箱+活跃粉丝收件箱
普通人:收件箱
推送到粉丝邮件箱 需求:
修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
查询收件箱数据时,可以实现分页查询
Feed流中的数据会不断更新,所以数据的角标也在变化,因此不能采用传统的分页模式。
传统了分页在feed流是不适用的,因为我们的数据会随时发生变化
传统了分页在feed流是不适用的,因为我们的数据会随时发生变化
假设在t1 时刻,我们去读取第一页,此时page = 1 ,size = 5 ,那么我们拿到的就是106 这几条记录,假设现在t2时候又发布了一条记录,此时t3 时刻,我们来读取第二页,读取第二页传入的参数是page=2 ,size=5 ,那么此时读取到的第二页实际上是从6 开始,然后是62 ,那么我们就读取到了重复的数据,所以feed流的分页,不能采用原始方案来做。
Feed流的滚动分页
我们需要记录每次操作的最后一条,然后从这个位置开始去读取数据
举个例子:我们从t1时刻开始,拿第一页数据,拿到了10~6,然后记录下当前最后一次拿取的记录,就是6,t2时刻发布了新的记录,此时这个11放到最顶上,但是不会影响我们之前记录的6,此时t3时刻来拿第二页,第二页这个时候拿数据,还是从6后一点的5去拿,就拿到了5-1的记录。我们这个地方可以采用sortedSet来做,可以进行范围查询,并且还可以记录当前获取数据时间戳最小值,就可以实现滚动分页了
核心的意思:就是我们在保存完探店笔记后,获得到当前笔记的粉丝,然后把数据推送到粉丝的redis中去。
好友关注:实现分页查询收邮箱 需求:在个人主页的“关注”卡片中,查询并展示推送的Blog信息:
具体操作如下:
1、每次查询完成后,我们要分析出查询出数据的最小时间戳,这个值会作为下一次查询的条件
2、我们需要找到与上一次查询相同的查询个数作为偏移量,下次查询时,跳过这些查询过的数据,拿到我们需要的数据
综上:我们的请求参数中就需要携带 lastId:上一次查询的最小时间戳 和偏移量这两个参数。
这两个参数第一次会由前端来指定,以后的查询就根据后台结果作为条件,再次传递到后台。
controller层
1 2 3 4 5 @GetMapping("/of/follow") public Result queryBlogOfFollow ( @RequestParam("lastId") Long max, @RequestParam(value = "offset", defaultValue = "0") Integer offset) { return blogService.queryBlogOfFollow(max, offset); }
service层
思路:使用zset中的方法,
tringRedisTemplate.opsForZSet()
:通过 RedisTemplate 获取操作有序集合的接口。
reverseRangeByScoreWithScores()
:这是有序集合操作的一个方法,用于按照分数范围反向获取指定数量的成员和分数。
key
:指定要查询的有序集合的键。
0
:指定分数的最小值,这里为 0。
max
:指定分数的最大值,具体数值由变量 max
决定。
offset
:指定结果集的偏移量,即从符合条件的结果中的第几个开始取值。
2
:指定要返回的成员数量。
该方法返回一个包含成员及其分数的 TypedTuple
集合,其中每个 TypedTuple
对象包含了一个成员和对应的分数。从有序集合中按照分数范围查询指定数量的成员,并按照分数从高到低进行排序,然后返回成员及其分数的集合。
难点:在于偏移量的处理
让我回想起刷leecode的那些夜晚
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Override public Result queryBlogOfFollow (Long max, Integer offset) { Long userId = UserHolder.getUser().getId(); String key = RedisConstants.FEED_KEY + userId; Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet() .reverseRangeByScoreWithScores(key, 0 , max, offset, 2 ); if (typedTuples == null || typedTuples.isEmpty()) { return Result.ok(); } List<Long> ids = new ArrayList <>(typedTuples.size()); long minTime = 0 ; int os = 1 ; for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { ids.add(Long.valueOf(tuple.getValue())); long time = tuple.getScore().longValue(); if (time == minTime){ os++; }else { minTime = time; os = 1 ; } } os = minTime == max ? os : os + offset; String idStr = StrUtil.join("," , ids); List<Blog> blogs = query().in("id" , ids).last("ORDER BY FIELD(id," + idStr + ")" ).list(); for (Blog blog : blogs) { queryBlogUser(blog); isBlogLiked(blog); } ScrollResult r = new ScrollResult (); r.setList(blogs); r.setOffset(os); r.setMinTime(minTime); return Result.ok(r); }
附近商户 GEO数据结构的基本用法 GEO就是Geolocation的简写形式,代表地理坐标。Redis在3.2版本中加入了对GEO的支持,允许存储地理坐标信息,帮助我们根据经纬度来检索数据。常见的命令有:
GEOADD:添加一个地理空间信息,包含:经度(longitude)、纬度(latitude)、值(member)
GEODIST:计算指定的两个点之间的距离并返回
GEOHASH:将指定member的坐标转为hash字符串形式并返回
GEOPOS:返回指定member的坐标
GEORADIUS:指定圆心、半径,找到该圆内包含的所有member,并按照与圆心之间的距离排序后返回。6.以后已废弃
GEOSEARCH:在指定范围内搜索member,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。6.2.新功能
GEOSEARCHSTORE:与GEOSEARCH功能一致,不过可以把结果存储到一个指定的key。 6.2.新功能
我们要做的事情是:将数据库表中的数据导入到redis中去,redis中的GEO,GEO在redis中就一个menber和一个经纬度,我们把x和y轴传入到redis做的经纬度位置去,但我们不能把所有的数据都放入到menber中去,毕竟作为redis是一个内存级数据库,如果存海量数据,redis还是力不从心,所以我们在这个地方存储他的id即可。
但是这个时候还有一个问题,就是在redis中并没有存储type,所以我们无法根据type来对数据进行筛选,所以我们可以按照商户类型做分组,类型相同的商户作为同一组,以typeId为key存入同一个GEO集合中即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Test public void loadShopData () { List<Shop> list = shopService.list(); Map<Long, List<Shop>> shopList = list.stream().collect(Collectors.groupingBy(Shop::getTypeId)); for (Map.Entry<Long, List<Shop>> entry : shopList.entrySet()) { Long typeId = entry.getKey(); String key = SHOP_GEO_KEY + typeId; List<Shop> value = entry.getValue(); List<RedisGeoCommands.GeoLocation<String>> locations = new ArrayList <>(value.size()); for (Shop shop : value) { locations.add(new RedisGeoCommands .GeoLocation<>( shop.getId().toString(), new Point (shop.getX(), shop.getY()) )); } stringRedisTemplate.opsForGeo().add(key, locations); } } }
存入后的数据
难点: 不是每次都与Redis链接,去插入数据,而是把数据封装成List<RedisGeoCommands.GeoLocation>集合,一次插入,减小与Redis链接的次数,提升效率
分页查询:
附近商户的实现功能 controller层
ShopController
1 2 3 4 5 6 7 8 9 @GetMapping("/of/type") public Result queryShopByType ( @RequestParam("typeId") Integer typeId, @RequestParam(value = "current", defaultValue = "1") Integer current, @RequestParam(value = "x", required = false) Double x, @RequestParam(value = "y", required = false) Double y ) { return shopService.queryShopByType(typeId, current, x, y); }
ShopServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Override public Result queryShopByType (Integer typeId, Integer current, Double x, Double y) { if (x == null || y == null ) { Page<Shop> page = query() .eq("type_id" , typeId) .page(new Page <>(current, SystemConstants.DEFAULT_PAGE_SIZE)); return Result.ok(page.getRecords()); } int from = (current - 1 ) * SystemConstants.DEFAULT_PAGE_SIZE; int end = current * SystemConstants.DEFAULT_PAGE_SIZE; String key = SHOP_GEO_KEY + typeId; GeoResults<RedisGeoCommands.GeoLocation<String>> results = stringRedisTemplate.opsForGeo() .search( key, GeoReference.fromCoordinate(x, y), new Distance (5000 ), RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end) ); if (results == null ) { return Result.ok(Collections.emptyList()); } List<GeoResult<RedisGeoCommands.GeoLocation<String>>> list = results.getContent(); if (list.size() <= from) { return Result.ok(Collections.emptyList()); } List<Long> ids = new ArrayList <>(list.size()); Map<String, Distance> distanceMap = new HashMap <>(list.size()); list.stream().skip(from).forEach(result -> { String shopIdStr = result.getContent().getName(); ids.add(Long.valueOf(shopIdStr)); Distance distance = result.getDistance(); distanceMap.put(shopIdStr, distance); }); String idStr = StrUtil.join("," , ids); List<Shop> shops = query().in("id" , ids).last("ORDER BY FIELD(id," + idStr + ")" ).list(); for (Shop shop : shops) { shop.setDistance(distanceMap.get(shop.getId().toString()).getValue()); } return Result.ok(shops); }
1 2 3 4 5 6 7 通过 opsForGeo() 方法获取了 Redis 的 Geo 操作对象。 调用 search() 方法执行地理位置的搜索操作。 search() 方法接收了一系列参数: key:指定了要搜索的 GeoKey(地理位置的键)。 GeoReference.fromCoordinate(x, y):指定了搜索的中心坐标,其中 x 和 y 分别表示了经度和纬度。 new Distance(5000):指定了搜索的半径范围,这里设置为 5000 米。 RedisGeoCommands.GeoSearchCommandArgs.newGeoSearchArgs().includeDistance().limit(end):这是可选的参数,用于配置搜索的行为。includeDistance() 表示在搜索结果中包含距离信息,limit(end) 表示限制搜索结果的数量为 end。
用户签到BitMap 我们针对签到功能完全可以通过mysql来完成,比如说以下这张表
用户一次签到,就是一条记录,假如有1000万用户,平均每人每年签到次数为10次,则这张表一年的数据量为 1亿条
每签到一次需要使用(8 + 8 + 1 + 1 + 3 + 1)共22 字节的内存,一个月则最多需要600多字节
我们可以采用类似这样的方案来实现我们的签到需求。
我们按月来统计用户签到信息,签到记录为1,未签到则记录为0.
把每一个bit位对应当月的每一天,形成了映射关系。用0和1标示业务状态,这种思路就称为位图(BitMap)。这样我们就用极小的空间,来实现了大量数据的表示
Redis中是利用string类型数据结构实现BitMap
,因此最大上限是512M,转换为bit则是 2^32个bit位。
BitMap的操作命令有:
SETBIT:向指定位置(offset)存入一个0或1
GETBIT :获取指定位置(offset)的bit值
BITCOUNT :统计BitMap中值为1的bit位的数量
BITFIELD :操作(查询、修改、自增)BitMap中bit数组中的指定位置(offset)的值
BITFIELD_RO :获取BitMap中bit数组,并以十进制形式返回
BITOP :将多个BitMap的结果做位运算(与 、或、异或)
BITPOS :查找bit数组中指定范围内第一个0或1出现的位置
需求:实现签到接口,将当前用户当天签到信息保存到Redis中
思路:我们可以把年和月作为bitMap的key,然后保存到一个bitMap中,每次签到就到对应的位上把数字从0变成1,只要对应是1,就表明说明这一天已经签到了,反之则没有签到。
我们通过接口文档发现,此接口并没有传递任何的参数,没有参数怎么确实是哪一天签到呢?这个很容易,可以通过后台代码直接获取即可,然后到对应的地址上去修改bitMap。
代码
UserController
1 2 3 4 @PostMapping("/sign") public Result sign () { return userService.sign(); }
UserServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Override public Result sign () { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM" )); String key = USER_SIGN_KEY + userId + keySuffix; int dayOfMonth = now.getDayOfMonth(); stringRedisTemplate.opsForValue().setBit(key, dayOfMonth - 1 , true ); return Result.ok(); }
用户签到-签到统计 问题1: 什么叫做连续签到天数? 从最后一次签到开始向前统计,直到遇到第一次未签到为止,计算总的签到次数,就是连续签到天数。
Java逻辑代码:获得当前这个月的最后一次签到数据,定义一个计数器,然后不停的向前统计,直到获得第一个非0的数字即可,每得到一个非0的数字计数器+1,直到遍历完所有的数据,就可以获得当前月的签到总天数了
问题2: 如何得到本月到今天为止的所有签到数据?
BITFIELD key GET u[dayOfMonth] 0
假设今天是10号,那么我们就可以从当前月的第一天开始,获得到当前这一天的位数,是10号,那么就是10位,去拿这段时间的数据,就能拿到所有的数据了,那么这10天里边签到了多少次呢?统计有多少个1即可。
问题3:如何从后向前遍历每个bit位?
注意:bitMap返回的数据是10进制,哪假如说返回一个数字8,那么我哪儿知道到底哪些是0,哪些是1呢?我们只需要让得到的10进制数字和1做与运算就可以了,因为1只有遇见1 才是1,其他数字都是0 ,我们把签到结果和1进行与操作,每与一次,就把签到结果向右移动一位,依次内推,我们就能完成逐个遍历的效果了。
需求:实现下面接口,统计当前用户截止当前时间在本月的连续签到天数
有用户有时间我们就可以组织出对应的key,此时就能找到这个用户截止这天的所有签到记录,再根据这套算法,就能统计出来他连续签到的次数了
代码
UserController
1 2 3 4 @GetMapping("/sign/count") public Result signCount () { return userService.signCount(); }
UserServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Override public Result signCount () { Long userId = UserHolder.getUser().getId(); LocalDateTime now = LocalDateTime.now(); String keySuffix = now.format(DateTimeFormatter.ofPattern(":yyyyMM" )); String key = USER_SIGN_KEY + userId + keySuffix; int dayOfMonth = now.getDayOfMonth(); List<Long> result = stringRedisTemplate.opsForValue().bitField( key, BitFieldSubCommands.create() .get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0 ) ); if (result == null || result.isEmpty()) { return Result.ok(0 ); } Long num = result.get(0 ); if (num == null || num == 0 ) { return Result.ok(0 ); } int count = 0 ; while (true ) { if ((num & 1 ) == 0 ) { break ; }else { count++; } num >>>= 1 ; } return Result.ok(count); }
额外加餐 -关于使用bitmap来解决缓存穿透的方案
回顾缓存穿透 :
发起了一个数据库不存在的,redis里边也不存在的数据,通常你可以把他看成一个攻击
解决方案:
第一种解决方案:遇到的问题是如果用户访问的是id不存在的数据,则此时就无法生效
第二种解决方案:遇到的问题是:如果是不同的id那就可以防止下次过来直击数据
所以我们如何解决呢?
我们可以将数据库的数据,所对应的id写入到一个list集合中,当用户过来访问的时候,我们直接去判断list中是否包含当前的要查询的数据,如果说用户要查询的id数据并不在list集合中,则直接返回,如果list中包含对应查询的id数据,则说明不是一次缓存穿透数据,则直接放行。
现在的问题是这个主键其实并没有那么短,而是很长的一个 主键
哪怕你单独去提取这个主键,但是在11年左右,淘宝的商品总量就已经超过10亿个
所以如果采用以上方案,这个list也会很大,所以我们可以使用bitmap来减少list的存储空间
我们可以把list数据抽象成一个非常大的bitmap,我们不再使用list,而是将db中的id数据利用哈希思想,比如:
id % bitmap.size = 算出当前这个id对应应该落在bitmap的哪个索引上,然后将这个值从0变成1,然后当用户来查询数据时,此时已经没有了list,让用户用他查询的id去用相同的哈希算法, 算出来当前这个id应当落在bitmap的哪一位,然后判断这一位是0,还是1,如果是0则表明这一位上的数据一定不存在, 采用这种方式来处理,需要重点考虑一个事情,就是误差率,所谓的误差率就是指当发生哈希冲突的时候,产生的误差。
UV统计 UV统计-HyperLogLog
通常来说UV会比PV大很多,所以衡量同一个网站的访问量,我们需要综合考虑很多因素,所以我们只是单纯的把这两个值作为一个参考值
UV统计在服务端做会比较麻烦,因为要判断该用户是否已经统计过了,需要将统计过的用户信息保存。但是如果每个访问的用户都保存到Redis中,数据量会非常恐怖,那怎么处理呢?
Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。相关算法原理大家可以参考:https://juejin.cn/post/6844903785744056333#heading-0 Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb ,内存占用低 的令人发指!作为代价,其测量结果是概率性的,有小于0.81%的误差 。不过对于UV统计来说,这完全可以忽略。
1 2 3 4 5 6 7 8 PFADD key element [element...] summary: Adds the specified elements to the specified HyperLogLog PFCOUNT key [key ...] Return the approximated cardinality of the set (s) observed by the HyperLogLog at key(s). PFMERGE destkey sourcekey [sourcekey ...] lnternal commands for debugging HyperLogLog values
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Test public void testHyperLogLog () { String[] users = new String [1000 ]; int j = 0 ; for (int i = 0 ; i < 1000000 ; i++) { j = i % 1000 ; users[j] = "user_" + i; if (j == 999 ) { stringRedisTemplate.opsForHyperLogLog().add("HLL" , users); } } Long count = stringRedisTemplate.opsForHyperLogLog().size("HLL" ); System.out.println("count = " + count); }
1000000存入997593,误差率约为0.2%
实战篇结束,完结撒花