学习视频链接:B站 遇见狂神说
一、简介
Redis 是完全开源的,遵守 BSD 协议,是一个高性能的 key-value 数据库。
Redis 是当前互联网世界最为流行的 NoSQL(Not Only SQL)数据库。NoSQL 在互联网系统中的作用很大,因为它可以在很大程度上提高互联网系统的性能。
Redis 具备一定持久层的功能,也可以作为一种缓存工具。对于 NoSQL 数据库而言,作为持久层,它存储的数据是半结构化的,这就意味着计算机在读入内存中有更少的规则,读入速度更快。
Redis和MongoDB是当前使用最广泛的NoSQL,而就Redis技术而言,它的性能十分优越,可以支持每秒十几万次的读/写操作,其性能远超数据库,并且还支持集群、分布式、主从同步等配置,原则上可以无限扩展,让更多的数据存储在内存中,更让人欣慰的是它还支持一定的事务能力,这保证了高并发的场景下数据的安全和一致性。
1-1、什么是NoSQL
NoSQL泛指非关系型数据库。
关系型数据库:表格,行,列。(POI,一个开源的Java读写Excel、WORD等微软OLE2组件文档的项目。)
NoSQL特点
1、方便扩展(数据之间没有关系,很好扩展)。
2、大数据量、高性能(Redis,一秒读写8万次,读取11万次,NoSQL的缓存记录级,是一种细粒度的缓存,性能会比较高)。
3、数据是多类型的(不需要事先设计数据库,随取随用)。
4、传统RDBMS(一般指关系数据库管理系统)和NoSQL
传统的RDBMS:
- 结构化组织
- SQL
- 数据和关系都存在单独的表中(row col)
- 数据操作,数据定义语言
- 严格的一致性
- 基础的事务
- ...
NoSQL
- 不仅仅是数据
- 没有固定的查询语言
- 键值对存储,列存储,文件存储,图形数据库
- 最终一致性
- CAP定理和BASE
- 高性能,高可用,高可扩展
- ...
了解3V+3高
大数据时代的3V:主要是描述问题的
- 海量Volume
- 多样Variety
- 实时Velocity
互联网需求的3高:主要是对程序的基本要求
- 高并发
- 高可扩
- 高性能
1-2、NoSQL四大分类
KV键值对
- 新浪:Redis
- 美团:Redis+Tair
- 阿里、百度:Redis+memecache
文档型数据库(bson格式,与json一样)
- MongoDB
- MongoDB是一个基于分布式文件存储的数据库(C++编写),主要用来处理大量的文档。
- MongoDB是一个介于关系型数据库和菲关系型数据库中间的产品。
- MongoDB是菲关系型数据库中功能最丰富的,最像关系型数据库。
- ConthDB
列存储数据库
- HBase
- 分布式文件系统
图形关系数据库
不是存图形,存放的是关系。
- Neo4j,InfoGrid…
分类 | Examples举例 | 典型应用场景 | 数据模型 | 优点 | 缺点 |
---|---|---|---|---|---|
键值(key-value) | Tokyo Cabinet/Tyrant, Redis, Voldemort, Oracle BDB | 内容缓存,主要用于处理大量数据的高访问负载,也用于一些日志系统等等。 | Key 指向 Value 的键值对,通常用hash table来实现 | 查找速度快 | 数据无结构化,通常只被当作字符串或者二进制数据 |
列存储数据库 | Cassandra, HBase, Riak | 分布式的文件系统 | 以列簇式存储,将同一列数据存在一起 | 查找速度快,可扩展性强,更容易进行分布式扩展 | 功能相对局限 |
文档型数据库 | CouchDB, MongoDb | Web应用(与Key-Value类似,Value是结构化的,不同的是数据库能够了解Value的内容) | Key-Value对应的键值对,Value为结构化数据 | 数据结构要求不严格,表结构可变,不需要像关系型数据库一样需要预先定义表结构 | 查询性能不高,而且缺乏统一的查询语法。 |
图形(Graph)数据库 | Neo4J, InfoGrid, Infinite Graph | 社交网络,推荐系统等。专注于构建关系图谱 | 图结构 | 利用图结构相关算法。比如最短路径寻址,N度关系查找等 | 很多时候需要对整个图做计算才能得出需要的信息,而且这种结构不太好做分布式的集群方案。 |
二、Redis
2-1、概述
什么是Redis?
Redis(Remote Dictionary Server ),即远程字典服务,是一个开源的,使用C语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value数据库,并提供多种语言的API。
redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Redis有什么用?
1、内存存储、持久化,内存中是断电即失。(持久化,rdb、aof)
2、效率高,可以用于高速缓存。
3、发布订阅系统。
4、地图信息分析。
5、计时器、计数器(浏览量)。
6、……
特性
1、多样的数据类型。
2、持久化。
3、集群。
4、事务。
5、……
中文官网:http://www.redis.cn/
下载地址:https://redis.io/download
2-2、window下安装
1、下载地址:https://github.com/microsoftarchive/redis/releases
2、解压压缩包。
3、进入解压后的目录,开启Redis(双击运行redis-server.exe)
4、使用Redis客户端连接Redis,打开客户端(运行redis-cli.exe)。
127.0.0.1:6379> ping #测试连接
PONG
127.0.0.1:6379> set name lskj # set基本值(key value)
OK
127.0.0.1:6379> get name # get key获取值
"lskj"
127.0.0.1:6379>
Windows下使用简单,但redis推荐使用Linux开发。
2-3、Linux下安装
1、下载地址:
2、将下载好的压缩包解压(usr/local/)。
tar -zxvf redis-6.0.9.tar.gz
3、进入解压后的文件目录,可以看到redis的配置文件redis.conf
。
4、基本环境安装。
yum install gcc-c++
gcc -v
命令进行gcc版本查看,版本为4.4.7。
redis6.0+需要的gcc版本为5.3及以上,所以需要升级gcc版本。
# 升级gcc到9以上
yum -y install centos-release-scl
yum -y install devtoolset-9-gcc devtoolset-9-gcc-c++ devtoolset-9-binutils
# 临时将此时的gcc版本改为9,退出xshell或者重启就会恢复到原来的gcc版本。
scl enable devtoolset-9 bash
# 永久改变
echo "source /opt/rh/devtoolset-9/enable" >>/etc/profile
# 此时查看gcc版本,便已升级
gcc -v
继续执行以下命令:
make
make install
清除之前编译的可执行文件及配置文件
make clean
清除所有生成的文件
make distclean
5、redis的默认安装路径:usr/local/bin
。
前台启动
redis-server
后台启动
redis-server &
根据配置文件启动:启动命令 配置文件 &
redis-server redis.conf &
6、将redis配置文件,复制到usr/local/bin/config
目录下。
mkdir config
cp /usr/local/redis-6.0.9/redis.conf config
以后,可以使用复制的这个配置文件(redis.conf)进行启动。
7、redis默认不是后台启动的,需修改配置文件redis.conf
。
################################# GENERAL #####################################
# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
daemonize yes
将daemonize no
改为daemonize yes
。
8、启动redis服务(回到bin目录下执行)。
redis-server config/redis.conf
通过指定的配置文件启动服务。
9、使用redis-cli进行连接测试。
redis-cli -p 6379
10、查看redis的进程是否开启。
ps -ef|grep redis
11、关闭redis服务。
shutdown:关闭redis。
exit:退出。
12、再次查看进程是否存在。
三、测试性能
redis-benchmark是一个压力测试工具。
redis 性能测试工具可选参数如下所示:
序号 | 选项 | 描述 | 默认值 |
---|---|---|---|
1 | -h | 指定服务器主机名 | 127.0.0.1 |
2 | -p | 指定服务器端口 | 6379 |
3 | -s | 指定服务器 socket | |
4 | -c | 指定并发连接数 | 50 |
5 | -n | 指定请求数 | 10000 |
6 | -d | 以字节的形式指定 SET/GET 值的数据大小 | 2 |
7 | -k | 1=keep alive 0=reconnect | 1 |
8 | -r | SET/GET/INCR 使用随机 key, SADD 使用随机值 | |
9 | -P | 通过管道传输 |
1 |
10 | -q | 强制退出 redis。仅显示 query/sec 值 | |
11 | –csv | 以 CSV 格式输出 | |
12 | -l | 生成循环,永久执行测试 | |
13 | -t | 仅运行以逗号分隔的测试命令列表。 | |
14 | -I | Idle 模式。仅打开 N 个 idle 连接并等待。 |
redis-benchmark 命令参数
测试:
# 测试100个并发连接 100000个请求
redis-benchmark -h localhost -p 6379 -c 100 -n 100000
====== PING_INLINE ======
100000 requests completed in 1.30 seconds #对10万个请求进行写入测试
100 parallel clients # 100个并发客户端
3 bytes payload # 每次写入3个字节
keep alive: 1 # 只有一台服务器来处理这些请求,单机性能
host configuration "save": 900 1 300 10 60 10000
host configuration "appendonly": no
multi-thread: no
...
99.72% <= 1.8 milliseconds
99.78% <= 1.9 milliseconds
99.85% <= 2 milliseconds
100.00% <= 2 milliseconds # 所有请求在3毫秒内处理完成
76804.91 requests per second #每秒处理76804.91个请求
四、基础知识
redis默认有16个数据库。默认使用的是第0个数据库。
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases'-1
databases 16
使用select进行切换数据库。
127.0.0.1:6379> select 1 # 切换数据库
OK
127.0.0.1:6379[1]> dbsize # 查看数据库大小
(integer) 0
127.0.0.1:6379[1]>
查看数据库所有的key
127.0.0.1:6379> keys * # 查看数据库所有的key
1) "key:__rand_int__"
2) "mylist"
3) "counter:__rand_int__"
4) "name"
5) "myhash"
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> keys *
(empty array)
127.0.0.1:6379[1]>
清除当前数据库
127.0.0.1:6379[1]> keys *
(empty array)
127.0.0.1:6379[1]> set name lskj
OK
127.0.0.1:6379[1]> keys *
1) "name"
127.0.0.1:6379[1]> flushdb # 清除当前数据库
OK
127.0.0.1:6379[1]> keys *
(empty array)
127.0.0.1:6379[1]>
清除全部数据库的内容
127.0.0.1:6379[1]> select 0
OK
127.0.0.1:6379> keys *
1) "key:__rand_int__"
2) "mylist"
3) "counter:__rand_int__"
4) "name"
5) "myhash"
127.0.0.1:6379> select 1
OK
127.0.0.1:6379[1]> flushall # 清除全部数据库的内容
OK
127.0.0.1:6379[1]> select 0
OK
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>
redis是单线程的
redis是基于内存操作,CPU不是redis的性能瓶颈,redis的瓶颈是根据机器的内存和网络带宽,既然可以使用单线程来实现,就使用单线程了。
redis是用C语言写的。
redis为什么是单线程,还怎样快?
1、误区一:高性能的服务器一定是多线程的。
2、误区二:多线程(CPU上下文会切换)一定比单线程效率高。
redis是将所有的数据全部放在内存中的,所以使用单线程去操作就是最高的。多线程(CPU上下文切换:是耗时的操作),对于内存系统而言,如果没有上下文切换效率最高,多次读写都是在一个CPU上,就内存情况来看,这个是最佳的方案。
五、五大数据类型
Redis 是一个开源(BSD许可)的,内存中的数据结构存储系统,它可以用作数据库
、缓存
和消息中间件
。 它支持多种类型的数据结构,如 字符串(strings), 散列(hashes), 列表(lists), 集合(sets), 有序集合(sorted sets) 与范围查询, bitmaps, hyperloglogs 和 地理空间(geospatial) 索引半径查询。 Redis 内置了 复制(replication),LUA脚本(Lua scripting), LRU驱动事件(LRU eviction),事务(transactions) 和不同级别的 磁盘持久化(persistence), 并通过 Redis哨兵(Sentinel)和自动 分区(Cluster)提供高可用性(high availability)。
关于Key的操作命令
1、查看数据库中的key
*
匹配0个或多个字符,?
匹配1个字符,[]
匹配[]里面的1个字符
keys k*
查看所有以k开头的key
keys k*y
查看所有以h开头、以y结尾的key
keys k?y
查看所有以h开头、以o结尾的、并且中间只有一个字符的key
keys h[abce]llo
查看所有以h开头、以llo结尾,并且h后面只能读取abc中的一个字符的key
2、判断key在数据库中是否存在
命令:
exists key #如果存在,则返回1;如果不存在,则返回0
exists key [key key ...] #返回值是存在的key数量
3、移动指定key到指定的数据库实例
命令:
move key index
4、查看指定key的剩余生存时间
命令:
ttl key
如果key没有设置生存时间,返回-1;如果key不存在,返回-2。
5、设置key的最大生存时间(过期时间)
命令:
expire key seconds
6、查看指定key的数据类型
命令:
type key
7、重命名key
命令:
rename key newkey
8、删除key
命令:
del key [key key ...]
返回值是实际删除key的数量
127.0.0.1:6379> set name lskj
OK
127.0.0.1:6379> keys * #查看所有的key的名字
1) "name"
127.0.0.1:6379> set age 20 #set key
OK
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> exists name #判断当前的key是否存在
(integer) 1
127.0.0.1:6379> exists test
(integer) 0
127.0.0.1:6379> move name 1 #移除当前的key
(integer) 1
127.0.0.1:6379> keys *
1) "age"
127.0.0.1:6379> set name lskj
OK
127.0.0.1:6379> keys *
1) "age"
2) "name"
127.0.0.1:6379> get name
"lskj"
127.0.0.1:6379> expire name 10 # 设置key的过期时间,单位为秒
(integer) 1
127.0.0.1:6379> ttl name #查看当前key的剩余时间
(integer) 7
127.0.0.1:6379> ttl name
(integer) 2
127.0.0.1:6379> ttl name
(integer) -2
127.0.0.1:6379> get name
(nil)
127.0.0.1:6379> type name #查看当前key对应的value的类型
string
127.0.0.1:6379> type age
string
127.0.0.1:6379>
官网命令大全:http://www.redis.cn/commands.html
string
字符串类型。
1、将string类型的数据设置到redis中
如果key已经存在,则后来的value会将之前的value覆盖掉。
set key value
2、从redis中获取string类型的数据
get key
3、追加字符串
append key value
返回追加之后的字符串长度。如果key不存在,则新创建一个key,并且把value值设置为value。
4、获取字符串数据的长度
strlen key
5、将字符串数值进行加1运算
incr key
要求key所表示的value必须是数值,否则报错。
返回加1运算之后的数据。如果key不存在,首先设置一个key,值初始化为0,然后进行incr运算。
6、将字符串数值进行减1运算
decr key
要求key所表示的value必须是数值,否则报错。
返回减1运算之后的数据。如果key不存在,首先设置一个key,值初始化为0,然后进行decr运算。
7、将字符串数值进行加offset运算
incrby key offset
要求key所表示的value必须是数值,否则报错。
返回加offset运算之后的数据。如果key不存在,首先设置一个key,值初始化为0,然后进行incrby运算。
8、将字符串数值进行减offset运算
decrby key offset
要求key所表示的value必须是数值,否则报错。
返回减offset运算之后的数据。如果key不存在,首先设置一个key,值初始化为0,然后进行decrby运算。
9、闭区间获取字符串中从startIndex到endIndex的字符组成的子字符串
getrange key startIndex endIndex
下标自左向右,从0开始,依次往后。闭区间截取。
字符串中每一个下标也可以是负数,负数下标表示自右向左,从-1开始,一次往前,最右边的字符的下标为-1。
10、用value从下标为startIndex开始的字符串
命令:
setrange key startIndex value
11、设置字符串数据的同时,设置它的最大生命周期
命令:
setex key seconds value
12、设置字符串类型的数据value到redis数据库中,当key不存在时设置成功,否则,放弃设置。
命令:
setnx key value
13、批量将string类型的数据设置到redis中
命令:
mset key1 value1 key2 value2 ...
14、批量从redis中获取string类型的数据
命令:
mget key1 key2 key3 ...
15、批量设置string类型的数据value到redis数据库中,当所有key都不存在时设置成功,否则(只有一个已经存在)全部放弃设置
命令:
msetnx key1 value1 key2 value2 ...
127.0.0.1:6379> set key1 v1 #设置值
OK
127.0.0.1:6379> get key1 #获取值
"v1"
127.0.0.1:6379> exists key1 #判断某一个key是否存在
(integer) 1
127.0.0.1:6379> append key1 "hello" #追加字符串,如果当前key不存在,就相当于setkey
(integer) 7
127.0.0.1:6379> get key1
"v1hello"
127.0.0.1:6379> strlen key1 #获取字符串的长度
(integer) 7
127.0.0.1:6379> append key1 ",test!"
(integer) 13
127.0.0.1:6379> append key1 ",test!"
(integer) 13
127.0.0.1:6379> append name "test"
(integer) 4
127.0.0.1:6379> keys *
1) "key1"
2) "name"
127.0.0.1:6379> get name
"test"
127.0.0.1:6379>
#####################################################
#步长 i+=
127.0.0.1:6379> set views 0 #初始浏览量为0
OK
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> incr views #自增1 浏览量+1
(integer) 1
127.0.0.1:6379> incr views
(integer) 2
127.0.0.1:6379> get views
"2"
127.0.0.1:6379> decr views #自减1 浏览量—1
(integer) 1
127.0.0.1:6379> decr views
(integer) 0
127.0.0.1:6379> get views
"0"
127.0.0.1:6379> incrby views 10 #设置步长,指定增量
(integer) 10
127.0.0.1:6379> decrby views 5
(integer) 5
127.0.0.1:6379>
#####################################################
#字符串范围 range
127.0.0.1:6379> flushdb
OK
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> set key1 "hello,test" #设置key1的值
OK
127.0.0.1:6379> get key1
"hello,test"
127.0.0.1:6379> getrange key1 0 3 #截取字符串[0,3]
"hell"
127.0.0.1:6379> getrange key1 0 -1 #获取全部的字符串,与get key一样
"hello,test"
127.0.0.1:6379>
#替换
127.0.0.1:6379> set key2 abcdefg
OK
127.0.0.1:6379> get key2
"abcdefg"
127.0.0.1:6379> setrange key2 1 xx #替换指定位置开始的字符串
(integer) 7
127.0.0.1:6379> get key2
"axxdefg"
127.0.0.1:6379>
#####################################################
# setex(set with expire) #设置过期时间
# setnx (set if not exist) #不存在再设置(在分布式锁中常用)
127.0.0.1:6379> setex key3 30 "hello" #设置key3的值为hello,30秒后过期
OK
127.0.0.1:6379> ttl key3
(integer) 24
127.0.0.1:6379> ttl key3
(integer) 19
127.0.0.1:6379> get key3
"hello"
127.0.0.1:6379> setnx mykey "redis" #若mykey不存在,创建mykey
(integer) 1
127.0.0.1:6379> keys *
1) "key2"
2) "key1"
3) "mykey"
127.0.0.1:6379> ttl key3
(integer) -2
127.0.0.1:6379> setnx mykey "MongoDB" #若mykey存在,创建失败
(integer) 0
127.0.0.1:6379> get mykey
"redis"
127.0.0.1:6379>
#####################################################
# mset
# mget
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 #同时设置多个值
OK
127.0.0.1:6379> keys *
1) "k3"
2) "k2"
3) "k1"
127.0.0.1:6379> mget k1 k2 k3 #同时获取多个值
1) "v1"
2) "v2"
3) "v3"
127.0.0.1:6379> msetnx k1 v1 k4 v4 #msetnx是一个原子性操作,要么一起成功,要么一起失败
(integer) 0
127.0.0.1:6379> get k4
(nil)
127.0.0.1:6379>
#对象
# set user:1 {name:zhangsan,age:20} #设置一个user:1对象 值为json字符来保存一个对象。
# user:{id}:{filed} #这样的设计在redis中是可以的
127.0.0.1:6379> mset user:1:name zhangsan user:1:age 20
OK
127.0.0.1:6379> mget user:1:name user:1:age
1) "zhangsan"
2) "20"
127.0.0.1:6379>
#####################################################
# getset #先get后再set
127.0.0.1:6379> getset db redis #若不存在值,则返回nil
(nil)
127.0.0.1:6379> get db
"redis"
127.0.0.1:6379> getset db mongdb #若存在值,则获取原来的值,并设置新值
"redis"
127.0.0.1:6379> get db
"mongdb"
127.0.0.1:6379>
String类型的使用:value除了是字符串,还可以是数字。(计数器、统计多单位的数量、粉丝数、对象缓存存储)
list
基本的数据类型,列表。在redis中,可以把list当成栈、队列、阻塞队列,元素可以重复。
- 一个key有多个value
- 多个value之间有顺序,最左侧是表头,最右侧是表尾
- 每一个元素都有下标,表头元素的下标是0,依次往后排序,最后一个元素的下标是列表长度-1
- 每一个元素的下标又可以用负数表示,负下标表示从表尾计算,最后一个元素下标用-1表示
- 元素只在列表中的顺序或者下标由放入的顺序来决定
所有的list命令都是以l
开头的。redis命令不区分大小写。
1、将一个或多个值一次插入到列表的表头(左侧)
命令:
lpush key value [value value ...]
2、获取指定列表中指定下标区间的元素
命令:
lrange key startIndex endIndex
3、将一个或者多个值依次插入到列表的表尾(右侧)
命令:
rpush key value [value value ...]
4、从指定列表中移除并且返回表头元素
命令:
lpop key
5、从指定列表中移除并且返回表尾元素
命令:
rpop key
6、获取指定列表中指定下标的元素
命令:
lindex key index
7、获取指定列表的长度
命令:
llen key
8、根据count的值移除指定列表中与value相等的数据
命令:
lrem key count value
如果count大于0:从列表的左侧移除count个与value相等的数据。
如果count小于0:从列表的右侧移除count个与value相等的数据。
127.0.0.1:6379> lpush list one #将一个值或多个值,插入到列表头部(左)
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> lrange list 0 -1 #获取list中的值
1) "three"
2) "two"
3) "one"
127.0.0.1:6379> lrange list 0 1 #通过区间获取具体的值
1) "three"
2) "two"
127.0.0.1:6379> rpush list righr #将一个值或多个值,插入到列表尾部(右)
(integer) 4
127.0.0.1:6379> lrange list 0 1
1) "three"
2) "two"
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "righr"
127.0.0.1:6379>
#####################################################
# lpop
# rpop
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
3) "one"
4) "righr"
127.0.0.1:6379> lpop list #移除list的第一个元素
"three"
127.0.0.1:6379> rpop list #移除list的最后一个元素
"righr"
127.0.0.1:6379> lrange list 0 -1
1) "two"
2) "one"
127.0.0.1:6379>
#####################################################
# lindex
127.0.0.1:6379> lindex list 1 #通过下标获得list中的某一个值
"one"
127.0.0.1:6379> lindex list 0
"two"
127.0.0.1:6379>
#####################################################
# llen
127.0.0.1:6379> lpush list one
(integer) 1
127.0.0.1:6379> lpush list two
(integer) 2
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> llen list #返回列表的长度
(integer) 3
127.0.0.1:6379>
#####################################################
# 移除指定的值
# lrem
127.0.0.1:6379> lpush list three
(integer) 4
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "three"
3) "two"
4) "one"
127.0.0.1:6379> lrem list 1 one #移除list集合中指定的个数的value,精确匹配
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "three"
3) "two"
127.0.0.1:6379> lrem list 1 three
(integer) 1
127.0.0.1:6379> lrange list 0 -1
1) "three"
2) "two"
127.0.0.1:6379> lpush list three
(integer) 3
127.0.0.1:6379> lrem list 2 three
(integer) 2
127.0.0.1:6379> lrange list 0 -1
1) "two"
127.0.0.1:6379>
#####################################################
# trim
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "hello1"
(integer) 2
127.0.0.1:6379> rpush mylist "hello2"
(integer) 3
127.0.0.1:6379> rpush mylist "hello3"
(integer) 4
127.0.0.1:6379> ltrim mylist 1 2 #通过下标截取指定的长度,这个list已经被改变了,截断了,只剩下截取的元素
OK
127.0.0.1:6379> lrange mylist 0 -1
1) "hello1"
2) "hello2"
127.0.0.1:6379>
#####################################################
# rpoplpush #移除列表的最后一个元素,将它移动到心得列表中
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "hello1"
(integer) 2
127.0.0.1:6379> rpush mylist "hello2"
(integer) 3
127.0.0.1:6379> rpoplpush mylist myotherlist #移除列表的最后一个元素,将它移动到心得列表中
"hello2"
127.0.0.1:6379> lrange mylist 0 -1 #查看原来的列表
1) "hello"
2) "hello1"
127.0.0.1:6379> lrange myotherlist 0 -1 #查看目标列表
1) "hello2"
127.0.0.1:6379>
#####################################################
# lset 将列表中指定下标的值替换为另外一个值,更新操作
127.0.0.1:6379> exists list #判断这个列表是否存在
(integer) 0
127.0.0.1:6379> lset list 0 item #若不存在列表,更新就会报错
(error) ERR no such key
127.0.0.1:6379> lpush list value
(integer) 1
127.0.0.1:6379> lrange list 0 0
1) "value"
127.0.0.1:6379> lset list 0 item #若存在,更新当前下标的值
OK
127.0.0.1:6379> lrange list 0 0
1) "item"
127.0.0.1:6379> lset list 1 other #若不存在,则会报错
(error) ERR index out of range
127.0.0.1:6379>
#####################################################
# linsety 将某一个具体的value插入到列表中某个元素的前面或后面
127.0.0.1:6379> rpush mylist "hello"
(integer) 1
127.0.0.1:6379> rpush mylist "redis"
(integer) 2
127.0.0.1:6379> linsert mylist before "redis" ",test"
(integer) 3
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) ",test"
3) "redis"
127.0.0.1:6379> linsert mylist after "redis" new
(integer) 4
127.0.0.1:6379> lrange mylist 0 -1
1) "hello"
2) ",test"
3) "redis"
4) "new"
127.0.0.1:6379>
小结
- list实际上是一个链表,before Node after,left,right都可以插入值
- 若key不存在,创建新的链表
- 若key存在,新增内容
- 若移除了所有的值,空链表,也代表不存在
- 在两边插入或改动值,效率最高。中间元素,相对来说,效率会低一点
消息排队,消息队列(Lpush Rpop),栈(Lpush Lpop)
set
集合类型。set中的值不能重复。单key多value(无序)。
- 一个key对应多个value
- value之间没有顺序,并且不能重复
- 元素没有下标
- 通过业务数据直接操作集合
1、将一个或多个元素添加到集合中
命令:member也就是value
sadd key member [member member ...]
- 如果元素已经存在,则会忽略
- 返回成功加入的元素个数
2、获取指定集合中所有的元素
命令:
smembers key
3、判断指定元素在指定集合中是否存在
命令:
sismember key member
- 存在,返回1
- 不存在,返回0
4、获取指定集合的长度
命令:
scard key
5、移除指定集合中一个或多个元素
命令:
srem key member [member member ...]
- 不存在,则会忽略
- 返回成功移除的个数
6、随机获取指定集合中的一个或多个元素
命令:
srandmember key [count]
- 如果count大于0:随机获取的多个元素之间不能重复
- 如果count小于0:随机获取的多个元素之间可能重复
7、从指定集合中随机移除一个或多个元素
命令:
spop key [count]
返回移除的元素。
8、将指定集合中的指定元素移动到另一个元素
命令:
smove source destination member
9、获取一个集合中,但是其它集合中没有的元素组成的新集合
命令:
sdiff key [key ...]
10、获取所有指定集合中都有的元素组成的集合
命令:
sinter key [key ...]
11、获取所有指定集合中所有元素组成的大集合
命令:
sunion key [key ...]
127.0.0.1:6379> sadd myset "hello" #set集合中添加元素
(integer) 1
127.0.0.1:6379> sadd myset "test"
(integer) 1
127.0.0.1:6379> sadd myset "lskj"
(integer) 1
127.0.0.1:6379> smembers myset #查看指定set的所有值
1) "lskj"
2) "hello"
3) "test"
127.0.0.1:6379> sismember myset hello #判断某一个值是否在set集合中
(integer) 1
127.0.0.1:6379> sismember myset redis
(integer) 0
#####################################################
127.0.0.1:6379> scard myset #获取set集合中的内容元素个数
(integer) 3
127.0.0.1:6379> sadd myset "test" #将一个或多个成员元素加入到集合中,已经存在于集合的成员元素将被忽略。
(integer) 0
127.0.0.1:6379> sadd myset "test_2"
(integer) 1
127.0.0.1:6379> scard myset
(integer) 4
127.0.0.1:6379>
#####################################################
127.0.0.1:6379> srem myset "hello" #移除set集合中的指定元素
(integer) 1
127.0.0.1:6379> scard myset
(integer) 3
127.0.0.1:6379> smembers myset
1) "lskj"
2) "test"
3) "test_2"
127.0.0.1:6379>
#####################################################
# set无序不重复集合,抽随机
127.0.0.1:6379> srandmember myset #随机抽选出一个元素
"lskj"
127.0.0.1:6379> srandmember myset
"test"
127.0.0.1:6379> srandmember myset 2 #随机抽选出指定个数的袁术
1) "lskj"
2) "test"
127.0.0.1:6379>
#####################################################
#删除指定的key,随机删除一个key
127.0.0.1:6379> smembers myset
1) "lskj"
2) "test"
3) "test_2"
127.0.0.1:6379> spop myset #随机删除set集合中的元素
"test"
127.0.0.1:6379> spop myset
"lskj"
127.0.0.1:6379> smembers myset
1) "test_2"
127.0.0.1:6379>
#####################################################
# 将一个指定的值,移动到另一个set集合中
127.0.0.1:6379> sadd myset "hello"
(integer) 1
127.0.0.1:6379> sadd myset "test"
(integer) 1
127.0.0.1:6379> sadd myset "test_2"
(integer) 1
127.0.0.1:6379> sadd myset2 "t"
(integer) 1
127.0.0.1:6379> sadd myset2 "t2"
(integer) 1
127.0.0.1:6379> smove myset myset2 "test_2" # 将一个指定的值,移动到另一个set集合中
(integer) 1
127.0.0.1:6379> smembers myset
1) "hello"
2) "test"
127.0.0.1:6379> smembers myset2
1) "test_2"
2) "t2"
3) "t"
127.0.0.1:6379>
#####################################################
# 一些视频网站中,存在共同关注(并集)
# 数字集合类:差集、交集、并集
127.0.0.1:6379> sadd key1 a
(integer) 1
127.0.0.1:6379> sadd key1 b
(integer) 1
127.0.0.1:6379> sadd key1 c
(integer) 1
127.0.0.1:6379> sadd key2 c
(integer) 1
127.0.0.1:6379> sadd key2 d
(integer) 1
127.0.0.1:6379> sadd key2 e
(integer) 1
127.0.0.1:6379> sdiff key1 key2 #差集
1) "b"
2) "a"
127.0.0.1:6379> sinter key1 key2 #交集(共同关注、好友就可以这样实现)
1) "c"
127.0.0.1:6379> sunion key1 key2 #并集
1) "e"
2) "a"
3) "c"
4) "b"
5) "d"
127.0.0.1:6379>
B站,A用户将所有关注的人放在一个set集合中,将粉丝也放在一个set集合中。
共同关注、共同爱好、二度好友、推荐好友(六度分割理论)
hash
哈希,map集合,key-map集合=>key-<key-value>
。
hash本质和String类型无太大区别,还是一个key-value。
1、将一个field-value对设置到哈希表中
命令:
hset key field value
如果key field已经存在,则value会把以前的值覆盖掉。
2、获取指定哈希表中指定field的值
命令:
hget key field
3、批量将多个field-value对设置到哈希表中
命令:
hmset key field value [field value ...]
4、批量获取指定哈希表中的field的值
命令:
hmget key field [field ...]
5、获取指定哈希表中所有的field和value
命令:
hgetall key
6、从指定哈希表中删除一个或者多个field
命令:
hdel key field [field ...]
7、获取指定哈希表中所有的field个数
命令:
hlen key
8、判断指定哈希表中是否存在某一个field
命令:
hexists key field
9、获取指定哈希表中所有的field列表
命令:
hkeys key
10、获取指定哈希表中所有的value列表
命令:
hvals key
11、对指定哈希表中指定的field的值进行整数加法运算
命令:
hincrby key field int
12、对指定哈希表中指定的field的值进行浮点数加入运算
命令:
hincrbyfloat key field float
13、将一个field-value对设置到哈希表中,当key-field已经存在时,则放弃设置;否则,设置filed-value
命令:
hsetnx key field value
127.0.0.1:6379> hset myhash field1 test #set一个具体key-value
(integer) 1
127.0.0.1:6379> hget myhash field1 #获取一个字段值
"test"
127.0.0.1:6379> hmset myhash field1 "t1" field2 "t" #set多个key-value
OK
127.0.0.1:6379> hmget myhash field1 field2 #获取多个字段值
1) "t1"
2) "t"
127.0.0.1:6379> hgetall myhash #获取全部的数据
1) "field1"
2) "t1"
3) "field2"
4) "t"
127.0.0.1:6379> hdel myhash field1 #删除hash指定key字段,对应的value值也一同删除了
(integer) 1
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "t"
127.0.0.1:6379>
#####################################################
127.0.0.1:6379> hmset myhash field1 hello field2 world
OK
127.0.0.1:6379> hgetall myhash
1) "field2"
2) "world"
3) "field1"
4) "hello"
127.0.0.1:6379> hlen myhash #获取hash表的字段数量
(integer) 2
127.0.0.1:6379>
#####################################################
127.0.0.1:6379> hexists myhash field1 #判断hash中指定字段是否存在
(integer) 1
127.0.0.1:6379> hexists myhash field3
(integer) 0
127.0.0.1:6379>
#####################################################
# 只获取所有的field
# 只获取所有的value
127.0.0.1:6379> hkeys myhash # 只获取所有的field
1) "field2"
2) "field1"
127.0.0.1:6379> hvals myhash # 只获取所有的value
1) "world"
2) "hello"
127.0.0.1:6379>
#####################################################
127.0.0.1:6379> hset myhash field3 5 #指定增量为5
(integer) 1
127.0.0.1:6379> hincrby myhash field3 1
(integer) 6
127.0.0.1:6379> hincrby myhash field3 -1
(integer) 5
127.0.0.1:6379> hsetnx myhash field4 hello #若不存在,可以设置
(integer) 1
127.0.0.1:6379> hsetnx myhash field4 redis #若存在,则不能设置
(integer) 0
127.0.0.1:6379>
hash可以存储变更的数据,特别是用户信息之类的,以及变动的信息。hash更适合于对象的存储,String更加适合字符串存储。
zset
有序集合。在set的基础上,增加了一个值(set k1 v1,zset k1 score1 v1)
有序集合zset和集合set一样也是string类型元素的集合,且不允许重复的成员。不同的是zset的每个元素都会关联一个分数(分数可以重复),redis通过份数来为集合中的成员进行从小到大的排序。
有序集合中每一个元素都有顺序,所以也有下标。但有序集合中元素的排序规则又与列表中元素的排序规则不一样。
1、将一个或多个member及其score值加入到有序集合
命令:
zadd key score member [score member ...]
如果元素已经存在,则把分数覆盖。
2、获取指定有序集合中指定下标区间的元素
命令:
zrange key startIndex endIndex [withscores]
3、获取指定有序集合中指定分数区间(闭区间)的元素
命令:
zrangebyscore key mix max [withscores]
4、删除指定有序集合中一个或多个元素
命令:
zrem key member [member ...]
5、获取指定有序集合中所有元素的个数
命令:
zcard key
6、获取指定有序集合中指定元素的排名(排名从0开始)
命令:
zrank key member
7、获取指定有序集合中分数在指定区间内(闭区间)的元素的个数
命令:
zcount key min max
8、获取指定有序集合中指定元素的分数
命令:
zscore key member
9、获取指定有序集合中指定元素的排名(按照分数从大到小的排名)
命令:
zrevrank key member
127.0.0.1:6379> zadd myset 1 one #添加一个值
(integer) 1
127.0.0.1:6379> zadd myset 2 two 3 three #添加多个值
(integer) 2
127.0.0.1:6379> zrange myset 0 -1
1) "one"
2) "two"
3) "three"
127.0.0.1:6379>
#####################################################
# 排序如何实现
127.0.0.1:6379> zadd salary 2500 zhangsan #添加三个员工
(integer) 1
127.0.0.1:6379> zadd salary 5000 lisi
(integer) 1
127.0.0.1:6379> zadd salary 1500 wanger
(integer) 1
127.0.0.1:6379> zrangebyscore salary -inf +inf #显示全部的员工,从小到大
1) "wanger"
2) "zhangsan"
3) "lisi"
127.0.0.1:6379> zrangebyscore salary -inf +inf withscores #显示全部的员工,并附带成绩score
1) "wanger"
2) "1500"
3) "zhangsan"
4) "2500"
5) "lisi"
6) "5000"
127.0.0.1:6379> zrevrange salary 0 -1 #从大到小进行排序
1) "lisi"
2) "zhangsan"
127.0.0.1:6379> zrangebyscore salary -inf 2500 withscores #显示工资小于2500的员工(升序)
1) "wanger"
2) "1500"
3) "zhangsan"
4) "2500"
127.0.0.1:6379>
#####################################################
# rem移除元素
127.0.0.1:6379> zrange salary 0 -1
1) "wanger"
2) "zhangsan"
3) "lisi"
127.0.0.1:6379> zrem salary wanger #移除有序集合中指定的元素
(integer) 1
127.0.0.1:6379> zrange salary 0 -1
1) "zhangsan"
2) "lisi"
127.0.0.1:6379> zcard salary #获取有序集合中的个数
(integer) 2
127.0.0.1:6379>
#####################################################
127.0.0.1:6379> zadd myset 1 hello 2 redis 3 test
(integer) 3
127.0.0.1:6379> zcount myset 1 3 #获取指定区间的成员数量
(integer) 3
127.0.0.1:6379> zcount myset 1 2
(integer) 2
127.0.0.1:6379>
排行榜应用实现,存储班级成绩表、员工工资表排序。
六、三种特殊数据类型
geospatial
地理位置。redis的Geo在redis3.2版本就推了。这个功能可以推算地理位置的信息、两地之间的距离以及方圆几里的人。
城市经纬度查询:
http://www.gjw123.com/tools-gps
geoadd
- 有效的经度从-180度到180度。
- 有效的纬度从-85.05112878度到85.05112878度。
当坐标位置超出上述指定范围时,该命令将会返回一个错误。
# getadd 添加地理位置
# 规则:两极无法直接添加,一般会下载城市数据,直接通过java程序一次性导入。
# 参数 key 值(经度、纬度、名称)先经度再纬度
127.0.0.1:6379> geoadd china:city 116.40 39.90 beijing
(integer) 1
127.0.0.1:6379> geoadd china:city 121.47 31.23 shanghai
(integer) 1
127.0.0.1:6379> geoadd china:city 106.50 29.53 chongqing
(integer) 1
127.0.0.1:6379> geoadd china:city 114.05 22.54 shenzhen
(integer) 1
127.0.0.1:6379> geoadd china:city 120.16 30.24 hangzhou
(integer) 1
127.0.0.1:6379> geoadd china:city 108.96 34.26 xian
(integer) 1
127.0.0.1:6379>
geopos
获得当前定位:一定是一个坐标值。
127.0.0.1:6379> geopos china:city beijing #获取指定城市的经度和纬度
1) 1) "116.39999896287918091"
2) "39.90000009167092543"
127.0.0.1:6379> geopos china:city beijing chongqing
1) 1) "116.39999896287918091"
2) "39.90000009167092543"
2) 1) "106.49999767541885376"
2) "29.52999957900659211"
127.0.0.1:6379>
geodist
返回两个给定位置之间的距离。
如果两个位置之间的其中一个不存在, 那么命令返回空值。
指定单位的参数 unit 必须是以下单位的其中一个:
- m 表示单位为米。
- km 表示单位为千米。
- mi 表示单位为英里。
- ft 表示单位为英尺。
127.0.0.1:6379> geodist china:city beijing chongqing #查看北京到重庆的直线距离
"1464070.8051"
127.0.0.1:6379> geodist china:city beijing chongqing km
"1464.0708"
127.0.0.1:6379> geodist china:city beijing shanghai km #查看北京到上海的直线距离
"1067.3788"
127.0.0.1:6379>
georadius
以给定的经纬度为中心, 返回键包含的位置元素当中, 与中心的距离不超过给定最大距离的所有位置元素。
附近的人,通过半径来查询。(所有的数据应都录入,结果才会更精确)
127.0.0.1:6379> georadius china:city 110 30 1000 km #获取经纬度为100 30为中心,寻找方圆1000km内的城市
1) "chongqing"
2) "xian"
3) "shenzhen"
4) "hangzhou"
127.0.0.1:6379> georadius china:city 110 30 500 km
1) "chongqing"
2) "xian"
127.0.0.1:6379> georadius china:city 110 30 500 km withdist #显示到中心距离的位置
1) 1) "chongqing"
2) "341.9374"
2) 1) "xian"
2) "483.8340"
127.0.0.1:6379> georadius china:city 110 30 500 km withcoord #显示其它的位置信息
1) 1) "chongqing"
2) 1) "106.49999767541885376"
2) "29.52999957900659211"
2) 1) "xian"
2) 1) "108.96000176668167114"
2) "34.25999964418929977"
127.0.0.1:6379> georadius china:city 110 30 500 km withcoord count 1 #筛选出1条查询的结果
1) 1) "chongqing"
2) 1) "106.49999767541885376"
2) "29.52999957900659211"
127.0.0.1:6379>
georadiusbymember
找出位于指定元素周围的其它元素。
127.0.0.1:6379> georadiusbymember china:city beijing 1000 km
1) "beijing"
2) "xian"
127.0.0.1:6379> georadiusbymember china:city shanghai 500 km
1) "hangzhou"
2) "shanghai"
127.0.0.1:6379>
geohash
返回一个或多个位置元素的 Geohash 表示。该命令将返回11个字符的Geohash字符串
# 将二维的经纬度转换成一维的字符串,如果两个字符串与接近,则距离越远。
127.0.0.1:6379> geohash china:city beijing chongqing
1) "wx4fbxxfke0"
2) "wm5xzrybty0"
127.0.0.1:6379>
geo底层的实现原理其实就是Zset。可以使用Zset命令来操作geo。
127.0.0.1:6379> zrange china:city 0 -1 #查看地图中全部元素
1) "chongqing"
2) "xian"
3) "shenzhen"
4) "hangzhou"
5) "shanghai"
6) "beijing"
127.0.0.1:6379> zrem china:city chongqing #移除指定元素
(integer) 1
127.0.0.1:6379> zrange china:city 0 -1
1) "xian"
2) "shenzhen"
3) "hangzhou"
4) "shanghai"
5) "beijing"
127.0.0.1:6379>
hyperloglog
什么是基数?
不重复的元素,可以接受误差。
例如:A{1,3,5,7,9,5},B{1,5,3} 基数为2
redis2.8.9版本就更新了Hyperloglog数据结构。
Redis Hyperloglog基数统计的算法。(优点:占用的内存是固定的。若从内存角度来比较的话,Hyperloglog首选)
网页的UV(一个人访问一个网站多次,但还是算作一个人)
127.0.0.1:6379> pfadd mykey a b c d e f g h i j #创建第1组元素mykey
(integer) 1
127.0.0.1:6379> pfcount mykey #统计mykey元素的基数数量
(integer) 10
127.0.0.1:6379> pfadd mykey2 i j z x c v b n m #创建第1组元素mykey2
(integer) 1
127.0.0.1:6379> pfcount mykey2
(integer) 9
127.0.0.1:6379> pfmerge mykey3 mykey mykey2 #合并两组元素mykey mykey2 -> mykey3并集
OK
127.0.0.1:6379> pfcount mykey3
(integer) 15
127.0.0.1:6379>
如果允许容错,那么一定可以使用Hyperloglog。
若不允许容错,就使用set或自定义的数据类型。
Bitmap
位存储
统计用户信息:活跃与不活跃,登录与未登录,打卡与未打卡。两个状态的,都可以使用bitmap。
bitmap位图,数据结构。都是操作二进制位来进行记录,就只有0和1两个状态。
测试
使用bitmap来记录周一到周日的打卡。
127.0.0.1:6379> setbit sign 0 0
(integer) 0
127.0.0.1:6379> setbit sign 1 0
(integer) 0
127.0.0.1:6379> setbit sign 2 1
(integer) 0
127.0.0.1:6379> setbit sign 3 1
(integer) 0
127.0.0.1:6379> setbit sign 4 0
(integer) 0
127.0.0.1:6379> setbit sign 5 1
(integer) 0
127.0.0.1:6379> setbit sign 6 0
(integer) 0
127.0.0.1:6379>
查看某一天是否打卡
127.0.0.1:6379> getbit sign 3
(integer) 1
127.0.0.1:6379> getbit sign 1
(integer) 0
127.0.0.1:6379>
统计操作,统计打卡的天数
127.0.0.1:6379> bitcount sign #统计一周的打卡记录,可以看出是否有全勤
(integer) 3
127.0.0.1:6379>
七、事务
要么同时成功,要么同时失败。=>原子性
Redis单条命令是保证原子性的,但是事务不保证原子性。
Redis事务本质:一组命令的集合。一个事务中的所有命令都会被序列化,在事务执行过程中,会按照顺序执行。(一次性、顺序性、排他性 =>执行一系列的命令)
Redis事务没有隔离级别的概念。
所有的命令在事务中,并没有直接被执行。只有发起执行命令时才会执行,Exex。
redis事务:
- 开启事务(multi)
- 命令入队(…)
- 执行事务(exec)
正常执行事务
127.0.0.1:6379> multi #开启事务
OK
#命令入队
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> get k2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> exec #执行事务
1) OK
2) OK
3) "v2"
4) OK
127.0.0.1:6379>
放弃事务(discard)
清除所有已经压入队列中的命令,并且结束整个事务。
127.0.0.1:6379> multi #开启事务
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> discard #取消事务,事务队列中的命令都不会被执行
OK
127.0.0.1:6379> get k4
(nil)
127.0.0.1:6379>
编译型异常
(代码有问题,命令有错),事务中所有的命令都不会被执行,能够保证事务的原子性。
127.0.0.1:6379> multi
OK
127.0.0.1:6379> set k1 v1
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> getset k3 #错误的命令
(error) ERR wrong number of arguments for 'getset' command
127.0.0.1:6379> set k4 v4
QUEUED
127.0.0.1:6379> set k5 v5
QUEUED
127.0.0.1:6379> exec #执行事务报错,所有的命令都不会被执行
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379> get k5
(nil)
127.0.0.1:6379>
运行时异常
如果事务队列中代码逻辑存在错误,那么执行命令时,其它命令是可以正常执行的,错误命令抛出异常,不能够保证事务的原子性。
127.0.0.1:6379> set k1 "v1"
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> incr k1 #会执行失败
QUEUED
127.0.0.1:6379> set k2 v2
QUEUED
127.0.0.1:6379> set k3 v3
QUEUED
127.0.0.1:6379> get k3
QUEUED
127.0.0.1:6379> exec
1) (error) ERR value is not an integer or out of range #虽然第一条命令报错,但是依旧能正常执行
2) OK
3) OK
4) "v3"
127.0.0.1:6379> get k2
"v2"
127.0.0.1:6379> get k1
"v1"
127.0.0.1:6379>
监控:Watch(面试常问)
悲观锁:
- 非常悲观,认为什么时候都会出问题,无论做什么都会加锁
乐观锁:
- 非常乐观,认为什么时候都不会出问题,所以不会上锁。更新数据时,判断一下,在此期间,是否有人修改过这个数据。
- 获取version
- 更新时,比较version
Redis的监控测试
正常执行成功:
127.0.0.1:6379> set money 100
OK
127.0.0.1:6379> set out 0
OK
127.0.0.1:6379> watch money #监视money对象
OK
127.0.0.1:6379> multi #事务正常结束,数据期间没有发生变动,这时可正常执行成功。
OK
127.0.0.1:6379> decrby money 20
QUEUED
127.0.0.1:6379> incrby out 20
QUEUED
127.0.0.1:6379> exec
1) (integer) 80
2) (integer) 20
127.0.0.1:6379>
测试多线程修改值,使用watch可以当做redis的乐观锁操作。
127.0.0.1:6379> watch money #监视money
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby money 10
QUEUED
127.0.0.1:6379> incrby out 10
QUEUED
#先别执行exec 因为此时watch后的值还是原来的值
127.0.0.1:6379> exec #执行之前,另外一个线程,修改了值,这时就会导致事务执行失败
(nil)
127.0.0.1:6379>
#另外修改money的线程
127.0.0.1:6379> set money 1000
若修改失败,获取最新的值即可。
127.0.0.1:6379> unwatch #解锁
OK
127.0.0.1:6379> watch money #获取最新的值,再次监视
OK
127.0.0.1:6379> multi
OK
127.0.0.1:6379> decrby money 10
QUEUED
127.0.0.1:6379> incrby out 10
QUEUED
127.0.0.1:6379> exec #比对监视的值是否发生了变化
1) (integer) 990
2) (integer) 30
127.0.0.1:6379>
每次提交执行exec后都会自动释放锁,不管是否成功
1、如果发现事务执行失败,就先解锁。
2、获取最新的值,再次监视,select version
3、比对监视的值,是否发生了变化,若无变化,可执行成功。若有变化,则执行失败。
单独的隔离操作:事务中的所有命令都会序列化、顺序地执行。事务在执行过程中,不会被其它客户端发来的命令请求所打断,除非使用watch命令监控某些键。
不保证事务的原子性:redis同一个事务中如果一条命令执行失败,其后的命令仍然可能会执行,redis的事务没有回滚。redis已经在系统内部进行功能简化,这样可以确保更快的运行速度,因为redis不需要事务回滚的能力。
八、Jedis
使用java来操作Redis。
Jedis是Redis官方推荐的java连接开发工具。使用java操作Redis中间件。
测试
1、新建一个空项目redis,并创建一个普通maven。
2、导入依赖。
<dependencies>
<!--导入Jedis包-->
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.62</version>
</dependency>
</dependencies>
3、编码测试。
- 连接数据库
- 操作命令
- 断开连接
package com.lskj;
import redis.clients.jedis.Jedis;
public class TestPing {
public static void main(String[] args) {
//1、new Jedis对象
Jedis jedis = new Jedis("127.0.0.1",6379);
System.out.println(jedis.ping());
}
}
运行测试,输出PONG
。
常用API
key操作
public class TestKey {
public static void main(String[] args) {
//1、new Jedis对象
Jedis jedis = new Jedis("127.0.0.1",6379);
//System.out.println(jedis.ping());
System.out.println("清空数据:"+jedis.flushDB());
System.out.println("判断某个键是否存在:"+jedis.exists("username"));
System.out.println("新增<'username','lskj'>的键值对:"+jedis.set("username","lskj"));
System.out.println("新增<'password','lskj'>的键值对:"+jedis.set("password","lskj"));
System.out.print("系统中所有的键如下:");
Set<String> keys = jedis.keys("*");
System.out.println(keys);
System.out.println("删除键password:"+jedis.del("password"));
System.out.println("判断建password是否存在:"+jedis.exists("password"));
System.out.println("查看键username所存储的值的类型:"+jedis.type("username"));
System.out.println("随即返回key空间的一个key:"+jedis.randomKey());
System.out.println("重命名key:"+jedis.rename("username","name"));
System.out.println("取出改名后的name:"+jedis.get("name"));
System.out.println("按索引查询:"+jedis.select(0));
System.out.println("删除当前选择数据库中的所有key:"+jedis.flushDB());
System.out.println("返回当前数据库中key的数目:"+jedis.dbSize());
System.out.println("删除所有数据库中的所有key:"+jedis.flushAll());
}
}
String
public class TestString {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1",6379);
jedis.flushDB();
System.out.println("增加数据<k1,v1>:"+jedis.set("k1","v1"));
System.out.println("增加数据<k2,v2>:"+jedis.set("k2","v2"));
System.out.println("增加数据<k3,v3>:"+jedis.set("k3","v3"));
System.out.println("删除键k2:"+jedis.del("k2"));
System.out.println("获取键k2:"+jedis.get("k2"));
System.out.println("修改k1:"+jedis.set("k1","v1_1"));
System.out.println("获取k1的值:"+jedis.get("k1"));
System.out.println("在k3后面加入值:"+jedis.append("k3","End"));
System.out.println("获取k3的值:"+jedis.get("k3"));
System.out.println("增加多个健值对:"+jedis.mset("k01","v01","k02","v02","k03","v03"));
System.out.println("获取多个健值对:"+jedis.mget("k01","k02","k03"));
System.out.println("获取多个健值对:"+jedis.mget("k01","k02","k03","k04"));
System.out.println("删除多个键值对:"+jedis.del("k01","k02"));
System.out.println("获取多个键值对:"+jedis.mget("k01","k02","k03"));
jedis.flushDB();
System.out.println("==========新增键值对,防止覆盖原来的值==========");
System.out.println("新增键值对<k1,v1>:"+jedis.setnx("k1","v1"));
System.out.println("新增键值对<k2,v2>:"+jedis.setnx("k2","v2"));
System.out.println("新增键值对<k2,v2>:"+jedis.setnx("k2","v2_new"));
System.out.println("k1 = " +jedis.get("k1"));
System.out.println("k2 = " +jedis.get("k2"));
System.out.println("==========新增键值对,并设置有效时间==========");
System.out.println("新增键值对<k3,v3>:"+jedis.setex("k3",5,"v3"));
System.out.println("k3 = "+jedis.get("k3"));
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("6秒过后,k3 = "+jedis.get("k3"));
System.out.println("==========获取原值,更新为新值==========");
System.out.println("获取k2的值,并将其设置为v2_getset:"+jedis.getSet("k2","v2_getset"));
System.out.println("此时,k2 = "+jedis.get("k2"));
System.out.println("获得k2的值的字串[2-4]:"+jedis.getrange("k2",2,4));
}
}
List
public class TestList {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
System.out.println("==========添加一个list:collections==========");
jedis.lpush("collections","ArrayList","Vector","Stack","HashMap","WeakHashMap","LinkedHashMap");
jedis.lpush("collections","HashSet");
jedis.lpush("collections","TreeSet");
jedis.lpush("collections","TreeMap");
System.out.println("collections的内容:"+jedis.lrange("collections",0,-1));
System.out.println("collections中0-3区间的元素:"+jedis.lrange("collections",0,3));
System.out.println("====================");
//删除列表指定的值,第二个参数为删除的个数(有重复时),后面add进去的值先被删除,类似于出栈
System.out.println("删除指定元素个数:"+jedis.lrem("collections",2,"HashMap"));
System.out.println("collections的内容:"+jedis.lrange("collections",0,-1));
System.out.println("删除下标0-3区间之外的元素:"+jedis.ltrim("collections",0,3));
System.out.println("collections的内容:"+jedis.lrange("collections",0,-1));
System.out.println("collections列表出栈(左端):"+jedis.lpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections",0,-1));
System.out.println("collections添加元素,从列表右端,与lpush相对应:"+jedis.rpush("collections","test","collection","redis"));
System.out.println("collections的内容:"+jedis.lrange("collections",0,-1));
System.out.println("collections列表出栈(右端):"+jedis.rpop("collections"));
System.out.println("collections的内容:"+jedis.lrange("collections",0,-1));
System.out.println("修改collections指定下标1的内容:"+jedis.lset("collections",1,"hello"));
System.out.println("collections的内容:"+jedis.lrange("collections",0,-1));
System.out.println("====================");
System.out.println("collections的长度:"+jedis.llen("collections"));
System.out.println("获取collections下标为2的元素:"+jedis.lindex("collections",2));
System.out.println("==========添加一个list:sortedList==========");
System.out.println(jedis.lpush("sortedList","5","1","7","9","3","0"));
System.out.println("sortedList排序前:"+jedis.lrange("sortedList",0,-1));
System.out.println("sortedList进行排序:"+jedis.sort("sortedList"));
}
}
Set
public class TestSet {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
System.out.println("==========向集合eleSet中添加元素(不重复)==========");
System.out.println(jedis.sadd("eleSet","e5","e3","e1","e4","e2","e0","e7","e8"));
System.out.println(jedis.sadd("eleSet","e6"));
System.out.println(jedis.sadd("eleSet","e6"));
System.out.println("eleSet的所有元素:"+jedis.smembers("eleSet"));
System.out.println("删除一个元素e0:"+jedis.srem("eleSet","e0"));
System.out.println("eleSet的所有元素:"+jedis.smembers("eleSet"));
System.out.println("删除两个元素e7和e6:"+jedis.srem("eleSet","e7","e6"));
System.out.println("eleSet的所有元素:"+jedis.smembers("eleSet"));
System.out.println("随机移除集合eleSet中的一个元素:"+jedis.spop("eleSet"));
System.out.println("随机移除集合eleSet中的一个元素:"+jedis.spop("eleSet"));
System.out.println("eleSet的所有元素:"+jedis.smembers("eleSet"));
System.out.println("eleSet中包含所有的元素的个数为:"+jedis.scard("eleSet"));
System.out.println("e3是否在eleSet中:"+jedis.sismember("eleSet","e3"));
System.out.println("e1是否在eleSet中:"+jedis.sismember("eleSet","e2"));
System.out.println("e6是否在eleSet中:"+jedis.sismember("eleSet","e6"));
System.out.println("====================");
System.out.println("添加集合eleSet1:"+jedis.sadd("eleSet1","e1","e2","e3","e4","e5"));
System.out.println("添加集合eleSet2:"+jedis.sadd("eleSet2","e3","e1","e0","e6"));
System.out.println("将eleSet1中e4删除,并存入eleSet3中:"+jedis.smove("eleSet1","eleSet3","e4"));
System.out.println("将eleSet1中e2删除,并存入eleSet3中:"+jedis.smove("eleSet1","eleSet3","e2"));
System.out.println("eleSet1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleSet3中的元素:"+jedis.smembers("eleSet3"));
System.out.println("===========集合运算=========");
System.out.println("eleS1中的元素:"+jedis.smembers("eleSet1"));
System.out.println("eleS2中的元素:"+jedis.smembers("eleSet2"));
System.out.println("eleSet1与eleSet2的交集:"+jedis.sinter("eleSet1","eleSet2"));
System.out.println("eleSet1与eleSet2的并集:"+jedis.sunion("eleSet1","eleSet2"));
System.out.println("eleSet1与eleSet2的差集:"+jedis.sdiff("eleSet1","eleSet2"));
System.out.println("求交集,并将交集保存到eleS4集合中:"+jedis.sinterstore("eleSet4","eleSet1","eleSet2"));
System.out.println("eleSet4中的元素:"+jedis.smembers("eleSet4"));
}
}
Hash
public class TestHash {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
HashMap<String,String> map = new HashMap<>();
map.put("k1","v1");
map.put("k2","v2");
map.put("k3","v3");
map.put("k4","v4");
System.out.println("添加名称(key)为hash的hash元素:"+jedis.hmset("hash",map));
System.out.println("向hash中添加<k5,v5>:"+jedis.hset("hash","k5","v5"));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("散列hash的所有键为:"+jedis.hkeys("hash"));
System.out.println("散列hash的所有值为:"+jedis.hvals("hash"));
System.out.println("将k6保存的值加上一个整数,如果k6不存在,则添加k6:"+jedis.hincrBy("hash","k6",1));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("将k6保存的值加上一个整数,如果k6不存在,则添加k6:"+jedis.hincrBy("hash","k6",2));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("删除一个或者多个键值对:"+jedis.hdel("hash","k2"));
System.out.println("散列hash的所有键值对为:"+jedis.hgetAll("hash"));
System.out.println("散列hash的所有键值对的个数为:"+jedis.hlen("hash"));
System.out.println("判断hash中是否存在k2:"+jedis.hexists("hash","k2"));
System.out.println("判断hash中是否存在k3:"+jedis.hexists("hash","k3"));
System.out.println("获取hash中k3的值:"+jedis.hmget("hash","k3"));
System.out.println("获取hash中k3,k4的值:"+jedis.hmget("hash","k3","k4"));
}
}
事务
public class TestTransaction {
public static void main(String[] args) {
Jedis jedis = new Jedis("127.0.0.1", 6379);
jedis.flushDB();
JSONObject jsonObject = new JSONObject();
jsonObject.put("test","hello");
jsonObject.put("name","lskj");
//开启事务
Transaction multi = jedis.multi();
String result = jsonObject.toJSONString();
//jedis.watch(result);
try {
multi.set("user1",result);
multi.set("user2",result);
//int i = 1/0;
multi.exec();
}catch (Exception e){
multi.discard(); //放弃事务
e.printStackTrace();
}finally {
System.out.println(jedis.get("user1"));
System.out.println(jedis.get("user2"));
jedis.close(); //关闭连接
}
}
}
九、SpringBoot整合
SpringBoot操作数据:spring-data jpa jdbc MongoDB redis。
在SpringBoot2.x后,原来使用的Jedis被替换成了lettuce。
Jedis:底层采用直连,多个线程操作的话,不安全。若想要避免不安全,须使用jedis pool连接池,更像BIO模式。
lettuce:底层采用netty,实例可以在多个线程中进行共享,不存在线程不安全的情况。可以减少x线程数量,更像NIO模式。
RedisAutoConfiguration源码查看
public class RedisAutoConfiguration {
public RedisAutoConfiguration() {
}
@Bean
@ConditionalOnMissingBean(
name = {"redisTemplate"}
) //可以自定义一个redisTemplate来替换这个默认的
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
//默认的RedisTemplate没有过多的设置,redis对象都是需要序列化
//两个泛型都是Object类型的,后面使用需要强制转换为<String,Object>
RedisTemplate<Object, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
@Bean
@ConditionalOnMissingBean //由于String是redis中最常使用的类型,所以单独提出来一个bean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
return template;
}
}
整合测试
导入依赖
配置连接
测试
1、新建一个Spring Initializr Module。
2、选中Developer=>Spring Boot DevTools、Lombok、Spring Configuration Processor
Web=>Spring Web
NoSQL=>Spring Data Redis [Access+Driver]。
依赖(已导入):
<!--操作redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
3、配置连接。
application.properties
# 配置Redis
spring.redis.host=127.0.0.1
spring.redis.port=6379
4、测试。
@Autowired
private RedisTemplate redisTemplate;
@Test
void contextLoads() {
//redisTemplate 操作不同的数据类型
//opsForValue操作字符串,类似String
//redisTemplate.opsForValue()
//除了基本的操作,常用的方法也可直接通过redisTemplate操作,如事务、基本的CRUD
//获取redis的连接对象
/*RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
connection.flushDb();*/
redisTemplate.opsForValue().set("mykey","test");
System.out.println(redisTemplate.opsForValue().get("mykey"));
}
RedisTemplate.java
RedisTemplate.java中序列化配置。
@Nullable
private RedisSerializer keySerializer = null;
@Nullable
private RedisSerializer valueSerializer = null;
@Nullable
private RedisSerializer hashKeySerializer = null;
@Nullable
private RedisSerializer hashValueSerializer = null;
RedisTemplate.java默认序列化方式。
if (defaultSerializer == null) {
//默认序列化方式是JDK序列化,后面可能会使用json来序列化
defaultSerializer = new JdkSerializationRedisSerializer(
classLoader != null ? classLoader : this.getClass().getClassLoader());
}
自定义redisTemplate
关于对象的保存需要序列化。
package com.lskj.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnSingleCandidate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
//自定义redisTemplate
@Bean
@SuppressWarnings("all")
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
//为了方便开发,一般直接使用<String,Object>
RedisTemplate<String, Object> template = new RedisTemplate<String,Object>();
template.setConnectionFactory(redisConnectionFactory);
//序列化配置
Jackson2JsonRedisSerializer<Object> objectJackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
objectJackson2JsonRedisSerializer.setObjectMapper(mapper);
//String的序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
//key采用String的序列化方式
template.setKeySerializer(stringRedisSerializer);
//hash的key也采用String的序列化方式
template.setHashKeySerializer(stringRedisSerializer);
//value序列化方式采用jackson
template.setValueSerializer(objectJackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
Redis工具类
package com.lskj.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Component
public final class RedisUtil {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
* @param key 键
* @param time 时间(秒)
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据key 获取过期时间
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 判断key是否存在
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
* @param key 可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
* @param key 键
* @return 值
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 普通缓存放入
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 递增
* @param key 键
* @param delta 要增加几(大于0)
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}
/**
* 递减
* @param key 键
* @param delta 要减少几(小于0)
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}
// ================================Map=================================
/**
* HashGet
* @param key 键 不能为null
* @param item 项 不能为null
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
* @param key 键
* @param map 对应多个键值
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
* @param key 键
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
* @param key 键
* @param value 值
* @param time 时间(秒)
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}
十、Redis.conf
启动redis时,是通过redis.conf这个配置文件来启动的。
如果不使用配置文件,redis会按照默认的参数运行。
如果使用配置文件,在启动redis服务时必须指定所使用的配置文件。
单位
# Note on units: when memory size is needed, it is possible to specify
# it in the usual form of 1k 5GB 4M and so forth:
#
# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes
#
# units are case insensitive so 1GB 1Gb 1gB are all the same.
# 配置文件unit单位对大小写不敏感。
包含
################################## INCLUDES ###################################
# Include one or more other config files here. This is useful if you
# have a standard template that goes to all Redis servers but also need
# to customize a few per-server settings. Include files can include
# other files, so use this wisely.
#
# Note that option "include" won't be rewritten by command "CONFIG REWRITE"
# from admin or Redis Sentinel. Since Redis always uses the last processed
# line as value of a configuration directive, you'd better put includes
# at the beginning of this file to avoid overwriting config change at runtime.
#
# If instead you are interested in using includes to override configuration
# options, it is better to use include as the last line.
#
# include /path/to/local.conf
# include /path/to/other.conf
网络
bind 127.0.0.1 #绑定ip
protected-mode yes #保护模式
port 6379 #端口设置,默认使用6379
tcp-keepalive:TCP连接保活策略,可以通过tcp-keepalive配置项来进行设置,单位为秒,加入设置为60秒,则server端会没60秒向连接空闲的客户端发起一次ACK请求,以检查客户端是否已经挂掉,对于无响应的客户端则会关闭其连接。如果设置为0,则不会进行保活检测。
如果配置了port和bind,则客户端连接redis服务时,必须指定端口和ip。
redis-cli -h ip -p port
redis-cli -h ip -p port shutdown
通用GENERAL
daemonize yes #以守护进程的方式运行,默认是no,需要手动设置为yes
pidfile /var/run/redis_6379.pid #若以后台方式运行,就需要指定一个pid文件
# 日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably)
# warning (only very important / critical messages are logged)
loglevel notice
logfile "" #日志文件位置名
database 16 #数据库的数量,默认是16个数据库
always-show-logo yes #是否总是显示logo
loglevel:配置日志级别,开发阶段配置debug,上线阶段配置为notice或warning。
logfile:指定日志文件名,如果不指定,redis只进行标准输出。需要保证日志文件所在的目录必须存在,文件可以不存在。redis在运行过程中,会输出一些日志信息。默认情况下,这些日志信息会输出到控制台。可以使用logfile配置日志文件,使redis把日志信息输出到指定文件中。
databases:配置redis服务默认创建的数据库实例个数,默认值是16。
快照
持久化,在规定时间内,执行了多少次操作,则会持久化到文件.rdb .aof。
redis是内存数据库,若没有持久化,那么数据断电即失。
save 900 1 #若在900秒内,如果至少有1个key进行修改,便进行持久化操作
save 300 10 #若在300秒内,如果至少有10个key进行修改,便进行持久化操作
save 60 10000 #若在60秒内,如果至少有10000个key进行修改,便进行持久化操作
stop-writes-on-bgsave-error yes #持久化若出错,是否还需要继续工作
rdbcompression yes #是否压缩 rdb 文件,需要消耗一些CPU资源
rdbchecksum yes #保存rdb文件时,进行错误的检查校验
dir ./ #rdb文件保存目录
SECURITY安全
requirepass:配置redis的访问密码。默认不配置密码,即访问不需要密码验证。此配置项需要在protected-mode=yes时起作用。
redis默认没有密码。
127.0.0.1:6379> config get requirepass #获取redis的密码
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "redis" #设置redis密码
OK
#重启redis后,测试ping
[root@iZ2zehx0enix3ec7e1pfuqZ bin]# redis-server config/redis.conf
[root@iZ2zehx0enix3ec7e1pfuqZ bin]# redis-cli
127.0.0.1:6379> ping #执行命令没有权限
(error) NOAUTH Authentication required.
127.0.0.1:6379> auth redis 使用密码授权(登录)
OK
127.0.0.1:6379> ping
PONG
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) "redis"
127.0.0.1:6379>
CLIENTS 限制
# maxclients 10000 #设置能连接上redis的最大客户端数量
# maxmemory <bytes> #redis配置最大的内存数量
# maxmemory-policy noeviction #内存达到上限之后的处理策略
maxmemory-policy 六种方式
volatile-lru:只对设置了过期时间的key进行LRU(默认值)
allkeys-lru : 删除lru算法的key
volatile-random:随机删除即将过期key
allkeys-random:随机删除
volatile-ttl : 删除即将过期的
noeviction : 永不过期,返回错误
APPEND ONLY模式 aof配置
appendonly no #默认是不开启aof模式,默认使用的rdb方式持久化,多数情况下,rdb完全够用。
appendfilename "appendonly.aof" #持久化的文件名字 .rdb
# appendfsync always #每次修改都会sync,消耗性能
appendfsync everysec #每秒执行一次sync,可能会丢失这1秒的数据
# appendfsync no #不执行sync,这时操作系统自已同步数据,速度最快
十一、Redis持久化
Redis是内存数据库,如果不将内存中的数据库状态保存到磁盘,一旦服务器进程退出,服务器中的数据库状态也会消失。故Redis提供了持久化功能。
RDB(Redis DataBase)
在指定的时间间隔内将内存中的数据集快照写入磁盘(Snapshot快照),它恢复时是将快照文件直接读到内存里。
在指定时间间隔内,redis服务执行指定次数的写操作,会自动触发一次持久化操作。
Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换上次持久化好的文件。整个过程中,主进程是不进行任何IO操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,RDB方式要比AOF方式更加高效。RDB的缺点是最后一次持久化后的数据可能会丢失。
在主从复制中,rdb用来备用的,置放于从机上。
rdb保存的文件是dump.rdb(有时在生产环境中,会将这个文件进行备份)
# The filename where to dump the DB
dbfilename dump.rdb
dbfilename:redis持久化数据生成的文件名,默认是dump.rdb,可自行配置。
dir:redis持久化数据生成文件保存的目录,默认是./
即redis的启动目录,可自行配置。
save <seconds> <changes>
:配置持久化策略。
测试:只要60s内修改5次key,就会触发rdb操作。
# save ""
#save 900 1
#save 300 10
#save 60 10000
sava 60 5
触发机制
- save的规则满足的情况下,会自动触发rdb规则。
- 执行flushall命令,也会触发rdb规则。
- 退出redis,也会产生rdb文件。
备份就自动生成一个dump.rdb
文件。
怎样恢复rdb文件?
1、只需要将rdb文件存放在redis启动目录,redis启动时,会自动检查dump.rdb文件,并恢复其中的数据。
2、查看需要存放的位置。
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin" #若这个目录下存在dump.rdb文件,启动时就会自动恢复其中的数据
127.0.0.1:6379>
优点:
- 适合大规模的数据恢复。
- 对数据的完整性要求不高。
缺点:
- 需要一定的时间间隔进行操作,若redis意外宕机,最后一次修改的数据就没有了。
- fork进程时,会占用一定的内存空间。
AOF(Append Only File)
将所有写操作的命令进行记录,恢复时,把这个文件全部执行一遍。
以日志的形式来记录每个写操作,将Redis执行过的所有指令记录下来(读操作不记录),只允许追加文件,不可以改写文件,redis启动时,会读取该文件,并重新构建数据。换句话来说,redis重启的话,就根据日志文件的内容将写指令从头到尾执行一次,以完成数据的恢复工作。
aof保存的是appendonly.aof文件
appendonly no #默认不开启,需手动配置
只需将appendonly的no改为yes,就开启了aof。重启redis,便生效。
appendonly.aof
文件记录了所有写操作。若这个aof文件有错误,此时redis是无法启动的,需要修复这个aof文件。
redis提供了redis-check-aof --fix
工具。
[root@iZ2zehx0enix3ec7e1pfuqZ bin]#redis-check-aof --fix appendonly.aof
若文件正常,重启redis即可恢复。
重写规则
no-appendfsync-on-rewrite no #重写时是否可以运用appendsync,默认no,可以保证数据的安全性
auto-aof-rewrite-percentage 100 #设置重写的基准百分比
auto-aof-rewrite-min-size 64mb #设置重写的基准值
aof默认是文件的无限制追加,文件会越来越大。
若aof文件大于64M,fork一个新的进程将文件进行重写。
优缺点:
appendonly no #默认是不开启aof模式,默认使用的rdb方式持久化,多数情况下,rdb完全够用。
appendfilename "appendonly.aof" #持久化的文件名字 .rdb
# appendfsync always #同步持久化,每次发生数据变化会立刻写入到磁盘中,消耗性能但数据完整性比较好(慢,安全)
appendfsync everysec #默认值,每秒执行一次sync,可能会丢失这1秒的数据
# appendfsync no #不执行sync,不及时同步,这时操作系统自已同步数据,速度最快
优点:
- 每次修改都同步,文件的完整性更好。
- 每秒同步一次,可能会丢失一秒的数据。
- 从不同步,效率最高。
缺点:
- 相对于数据来说,aof远大于rdb,修复速度也比rdb慢。
- aof运行效率比rdb慢,故redis默认的配置是rdb持久化。
总结
1、RDB持久化方式能够在指定的时间间隔内对数据进行快照保存。
2、AOF持久化方式记录每次对服务器的写操作,当服务器重启时,会重新执行这些命令来恢复原始的数据,AOF命令以redis协议追加保存每次写的操作到文件末尾,redis还能对aof文件进行后台重写,使得aof文件的体积不至于过大。
3、只做缓存,如果只希望数据在服务器运行时存在,可不使用任何持久化。
4、同时开启两种持久化方式:
- 在任何情况下,当redis重启时,会优先载入aof文件来恢复原始数据,因为通常情况下,aof文件保存的数据要比rdb文件保存的数据集更完整。
- RDB的数据不实时,同时使用两者时,服务器重启也只会找aof文件,那是否只使用aof呢?rdb更适合用于备份数据库(AOF在不断变化,不便于备份),快速重启,而且不会有aof可能潜在的bug,留着作为一个万一的手段。
5、性能建议:
- 因为RDB文件只用作后备用途,建议只在Slave上持久化RDB文件,而且只要15分钟备份一次即可,只保留
save 900 1
这条规则。 - 如果Enable AOF,好处是在最恶劣情况下也只会丢失不超过两秒的数据,启动脚本较简单,只load自己的aof文件就可以了,代价一是带来了持续的IO,二是AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小,可以设到5G以上,默认超过原大小100%大小重写可以改到适当的数值。
- 如果不Enable AOF,仅靠Master-Slave Repllcation实现高可用性也可以,能省调一大笔IO,也减少了rewrite时带来的系统波动和。代价是如果Master/Slave同时倒掉,会丢失十几分钟的数据,启动脚本也要比较两个Master/.Slave中的RDB文件,载入较新的那个,微博就是这种架构。
十二、Redis发布订阅
Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
Redis客户端可以订阅任意数量的频道。消息的发布者往频道上发布消息,所有订阅此频道的客户端都能接收到消息。
订阅发布命令
序号 | 命令及描述 |
---|---|
1 | PSUBSCRIBE pattern [pattern …] 订阅一个或多个符合给定模式的频道。 |
2 | [PUBSUB subcommand [argument [argument …]]](https://www.runoob.com/redis/pub-sub-pubsub.html) 查看订阅与发布系统状态。 |
3 | PUBLISH channel message 将信息发送到指定的频道。 |
4 | [PUNSUBSCRIBE [pattern [pattern …]]](https://www.runoob.com/redis/pub-sub-punsubscribe.html) 退订所有给定模式的频道。 |
5 | SUBSCRIBE channel [channel …] 订阅给定的一个或多个频道的信息。 |
6 | [UNSUBSCRIBE [channel [channel …]]](https://www.runoob.com/redis/pub-sub-unsubscribe.html) 指退订给定的频道。 |
测试
订阅端:
127.0.0.1:6379> subscribe test #订阅一个频道 test
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "test"
3) (integer) 1
#等待读取推送的消息
1) "message" #消息
2) "test" #那个频道的消息
3) "hello" #消息的具体内容
发送端:
127.0.0.1:6379> publish test "hello" #发布者发布消息到频道
(integer) 1
127.0.0.1:6379>
原理
Redis是使用C实现的。Redis通过publish、subscribe和psubscribe等命令实现发布和订阅功能。
通过subscribe命令订阅某频道后,redis-server里维护了一个字典,字典的键就是一个个channel(频道),而字典的值则是一个链表,链表中保存了所有订阅这个channel的客户端。subscribe命令的关键,就是将客户端添加到给定channel的订阅链表中。
通过publish命令向订阅者发送消息,redis-server会使用给定的频道作为键,在它所维护的channel字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
pub/sub从字面上理解就是发布(publish)与订阅(subscribe),在Redis中,可以设定对某一个key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。这一功能最明显的用法就是用作实时消息系统,比如普通的及时聊天,群聊等功能。
使用场景
- 实时消息系统
- 实时聊天。(频道当做聊天室,将信息回显给所有人)
十三、Redis主从复制
主从复制,是指将一台Redis服务器的数据,复制到其他的Redis服务器(主机数据更新后根据配置和策略,自动同步到从机的master/slave机制)。前者称为主节点(master/leader),后者称为从节点(slave/follower)。
主少从多,主写从读,读写分离,主写同步复制到从。
数据的复制是单向的,只能由主节点到从节点。Master以写为主,Slave以读为主。
默认情况下,每台Redis服务器都是主节点,且一个主节点可以有多个从节点(或没有从节点),但一个从节点只能有一个主节点。
主从复制的作用主要包括:
- 数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。
- 故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复,实际上是一种服务的冗余。
- 负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写Redis数据时,应用连接主节点,读Redis数据时应用连接从节点),分担服务器负载。尤其是在写少读多的情况下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。
- 高可用基石:除了上述作用外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis高可用的基础。
一般来说,要将Redis运用于工程项目中,只使用一台Redis是不可行的的(可能会宕机),原因如下:
- 从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大。
- 从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G,也不能将所有的内存用作Redis存储内存,一般来说,单台Redis最大使用内存不应该超过20G。
电商网站上的商品,一般都是一次上传,无数次浏览(多读多写)。
环境配置
只配置从库,不需要配置主库。
127.0.0.1:6379> info replication #查看当前库的信息
# Replication
role:master #角色 master
connected_slaves:0 #无从机
master_replid:0dca53cfb9da266d2e3fe67cae0ff67afb2ab72e
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:0
second_repl_offset:-1
repl_backlog_active:0
repl_backlog_size:1048576
repl_backlog_first_byte_offset:0
repl_backlog_histlen:0
127.0.0.1:6379>
复制三个配置文件,然后修改对应的信息。
port 6380
pidfile /var/run/redis_6380.pid
logfile "6380.log"
dbfilename dump6380.rdb
修改完成后,使用3个配置文件分别启动3个redis服务器,可通过进程信息查看。
[root@iZ2zehx0enix3ec7e1pfuqZ bin]# ps -ef|grep redis
root 25557 1 0 10:32 ? 00:00:47 redis-server 127.0.0.1:6379
root 26302 1 0 21:27 ? 00:00:00 redis-server 127.0.0.1:6380
root 26308 1 0 21:27 ? 00:00:00 redis-server 127.0.0.1:6381
root 26316 26242 0 21:29 pts/4 00:00:00 grep redis
[root@iZ2zehx0enix3ec7e1pfuqZ bin]#
一主二从
默认情况下,每台Redis服务器都是主节点。一般情况下只用配置从机即可。
一主(79)二从(80、81)
#从机80
127.0.0.1:6380> slaveof 127.0.0.1 6379 #slaveof host 6379 找谁作为主机
OK
127.0.0.1:6380> info replication
# Replication
role:slave #当前角色是从机
master_host:127.0.0.1 #主机信息
master_port:6379
master_link_status:up
master_last_io_seconds_ago:5
master_sync_in_progress:0
slave_repl_offset:14
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:988a7eba9f0c5df73c20486eb2ce0efba8275880
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:14
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:14
127.0.0.1:6380>
#从机81
127.0.0.1:6381> slaveof 127.0.0.1 6379
OK
127.0.0.1:6381> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6379
master_link_status:up
master_last_io_seconds_ago:2
master_sync_in_progress:0
slave_repl_offset:294
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:988a7eba9f0c5df73c20486eb2ce0efba8275880
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:294
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:281
repl_backlog_histlen:14
127.0.0.1:6381>
#在主机中查看
# Replication
role:master
#多了从机的配置
connected_slaves:2
slave0:ip=127.0.0.1,port=6380,state=online,offset=308,lag=0
slave1:ip=127.0.0.1,port=6381,state=online,offset=308,lag=1
master_replid:988a7eba9f0c5df73c20486eb2ce0efba8275880
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:308
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:308
127.0.0.1:6379>
真实的主从配置应在配置文件中配置,才能实现永久配置,现在使用的命令配置,只是暂时的。
################################# REPLICATION #################################
# Master-Replica replication. Use replicaof to make a Redis instance a copy of
# another Redis server. A few things to understand ASAP about Redis replication.
#
# +------------------+ +---------------+
# | Master | ---> | Replica |
# | (receive writes) | | (exact copy) |
# +------------------+ +---------------+
#
# 1) Redis replication is asynchronous, but you can configure a master to
# stop accepting writes if it appears to be not connected with at least
# a given number of replicas.
# 2) Redis replicas are able to perform a partial resynchronization with the
# master if the replication link is lost for a relatively small amount of
# time. You may want to configure the replication backlog size (see the next
# sections of this file) with a sensible value depending on your needs.
# 3) Replication is automatic and does not need user intervention. After a
# network partition replicas automatically try to reconnect to masters
# and resynchronize with them.
#
# replicaof <masterip> <masterport>
# If the master is password protected (using the "requirepass" configuration
# directive below) it is possible to tell the replica to authenticate before
# starting the replication synchronization process, otherwise the master will
# refuse the replica request.
#
# masterauth <master-password>
主机可以写,从机不能写只能读。主机中的所有信息和数据,都会自动被从机保存。
主机写:
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> set k1 v1
OK
127.0.0.1:6379> get k1
"v1"
127.0.0.1:6379>
从机只能读:
127.0.0.1:6380> keys *
1) "k1"
127.0.0.1:6380> get k1
"v1"
127.0.0.1:6380> set k2 v2
(error) READONLY You can't write against a read only replica.
127.0.0.1:6380>
测试
主机断开连接,从机依旧连接到主机,但是没有写操作,若主机恢复连接,从机依旧可以获取到主机写的信息。
如果是使用命令配置的主从,这时如果重启,从机就变回主机了。只要变为从机(连接上主机),立马就可以从主机中获取值。
从机宕机,从机少一个从机,其它从机不受影响。从机恢复,需要重新设置其主从关系。
复制原理
Slave启动成功连接到master后会发送一个sync同步命令。
Master接到命令,启动后台的存盘进程,同时收集所有接收的用于修改数据库集命令,在后台进程执行完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。一旦主从关系确定,会自动把主库上已有的数据同步复制到从库。
增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步。主库写数据会自动同步到从库。
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行。数据在从机中一定可以被看到。
上一个主机连接下一个从机,这样也可以完成主从复制。
若现在主机断开了,这时,能否选择一个主机出来?
如果主机断开了,可以使用命令slaveof no one
让本机(从机)成为主机(断开原来的主从关系),其它的结点就可以手动连接到最新的这个主节点(手动)。
127.0.0.1:6381> slave no one
(error) ERR unknown command `slave`, with args beginning with: `no`, `one`,
127.0.0.1:6381> clear
127.0.0.1:6381> slaveof no one
OK
127.0.0.1:6381> info replication
# Replication
role:master
connected_slaves:0
master_replid:5e38c48495d98c1c1b51234103752e1915d10c85
master_replid2:988a7eba9f0c5df73c20486eb2ce0efba8275880
master_repl_offset:3024
second_repl_offset:3025
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:281
repl_backlog_histlen:2744
127.0.0.1:6381>
一台主机可以配置多台从机,一台从机又可以配置多台从机,从而形成一个庞大的集群结构。
这样减轻了一台主机的压力,但是增加了服务间的延迟时间。
哨兵模式
自动选择主机的模式。
主从切换技术的方法是,当主服务器宕机后,需要手动把一台从服务器切换为主服务器,这就需要人工干预,费事费力,还会造成一段时间内不可用。这不是一种推荐的方式,更多时候,优先考虑哨兵模式。Redis从2.8开始正式提供了Sentinel(哨兵)架构来解决这个问题。
谋权篡位的自动版,能够后台监控主机是否故障,如果故障了就根据投票数自动将从库转换为主库。
哨兵模式是一种特殊的模式,首先Redis提供了哨兵的命令,哨兵是一个独立的进程,作为进程,它会独立运行。其原理是哨兵通过发送命令,等待Redis服务器响应,从而监控运行的多个Redis实例。
这里的哨兵有两个作用:
- 通过发送命令,让Redis服务器返回监控其运行状态,包括主服务器和从服务器。
- 当哨兵监测到master宕机,会自动将slave切换成master,然后通过发布订阅模式通知其它从服务器,修改配置文件,让它们切换主机。
然而一个哨兵进程对Redis服务器进行监控,可能会出现问题,为此,可以使用多个哨兵进行监控。各个哨兵之间还会进行监控,这样就形成了多哨兵模式。
假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观认为主服务器不可用,这个现象称为主观下线。当后面的哨兵也检测主服务器不可用,并且数量达到一定值时,哨兵之间就会进行一次投票,投票的结果又一个哨兵发起,进行failover【故障转移】操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。
测试
1、配置哨兵配置文件sentinel.conf(redis安装目录下,可创建其它命名的配置文件)。
#sentinel monitor 被监控的名称 host port 数字
sentinel monitor myredis 127.0.0.1 6379 1
后面的数字,代表主机挂了,slave投票看谁接替称为主机,票数最多者,就会成为主机。
2、启动哨兵。
[root@iZ2zehx0enix3ec7e1pfuqZ config]# vim sentinel.conf
[root@iZ2zehx0enix3ec7e1pfuqZ config]# cd ..
[root@iZ2zehx0enix3ec7e1pfuqZ bin]# redis-sentinel config/sentinel.conf
27294:X 20 Nov 2020 10:04:01.922 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
27294:X 20 Nov 2020 10:04:01.922 # Redis version=6.0.9, bits=64, commit=00000000, modified=0, pid=27294, just started
27294:X 20 Nov 2020 10:04:01.922 # Configuration loaded
_._
_.-``__ ''-._
_.-`` `. `_. ''-._ Redis 6.0.9 (00000000/0) 64 bit
.-`` .-```. ```\/ _.,_ ''-._
( ' , .-` | `, ) Running in sentinel mode
|`-._`-...-` __...-.``-._|'` _.-'| Port: 26379
| `-._ `._ / _.-' | PID: 27294
`-._ `-._ `-./ _.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' | http://redis.io
`-._ `-._`-.__.-'_.-' _.-'
|`-._`-._ `-.__.-' _.-'_.-'|
| `-._`-._ _.-'_.-' |
`-._ `-._`-.__.-'_.-' _.-'
`-._ `-.__.-' _.-'
`-._ _.-'
`-.__.-'
27294:X 20 Nov 2020 10:04:01.923 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
27294:X 20 Nov 2020 10:04:01.927 # Sentinel ID is 70676cdcefbce6df0e7dfc918d3a43f73f1d8544
27294:X 20 Nov 2020 10:04:01.927 # +monitor master myredis 127.0.0.1 6379 quorum 1
27294:X 20 Nov 2020 10:04:01.928 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:04:01.931 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
如果master节点断开,这时就会从从机中随机选择一个服务器。(这里有一个投票算法)
127.0.0.1:6381> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6380,state=online,offset=80383,lag=1
master_replid:1cefe2fe33f522f486f8f0acc0d2c2a21243ec72
master_replid2:988a7eba9f0c5df73c20486eb2ce0efba8275880
master_repl_offset:80383
second_repl_offset:77679
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:3459
repl_backlog_histlen:76925
127.0.0.1:6381>
哨兵日志:
27294:X 20 Nov 2020 10:04:01.927 # Sentinel ID is 70676cdcefbce6df0e7dfc918d3a43f73f1d8544
27294:X 20 Nov 2020 10:04:01.927 # +monitor master myredis 127.0.0.1 6379 quorum 1
27294:X 20 Nov 2020 10:04:01.928 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:04:01.931 * +slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:30.573 # +sdown master myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:30.573 # +odown master myredis 127.0.0.1 6379 #quorum 1/1
27294:X 20 Nov 2020 10:08:30.573 # +new-epoch 1
27294:X 20 Nov 2020 10:08:30.573 # +try-failover master myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:30.577 # +vote-for-leader 70676cdcefbce6df0e7dfc918d3a43f73f1d8544 1
27294:X 20 Nov 2020 10:08:30.577 # +elected-leader master myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:30.577 # +failover-state-select-slave master myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:30.654 # +selected-slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:30.654 * +failover-state-send-slaveof-noone slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:30.725 * +failover-state-wait-promotion slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:31.096 # +promoted-slave slave 127.0.0.1:6381 127.0.0.1 6381 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:31.096 # +failover-state-reconf-slaves master myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:31.182 * +slave-reconf-sent slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:32.105 * +slave-reconf-inprog slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:32.105 * +slave-reconf-done slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:32.160 # +failover-end master myredis 127.0.0.1 6379
27294:X 20 Nov 2020 10:08:32.160 # +switch-master myredis 127.0.0.1 6379 127.0.0.1 6381
27294:X 20 Nov 2020 10:08:32.160 * +slave slave 127.0.0.1:6380 127.0.0.1 6380 @ myredis 127.0.0.1 6381
27294:X 20 Nov 2020 10:08:32.160 * +slave slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6381
27294:X 20 Nov 2020 10:09:02.244 # +sdown slave 127.0.0.1:6379 127.0.0.1 6379 @ myredis 127.0.0.1 6381
如果主机重新连接回来,只能归并到新的主机下作为从机,这就是哨兵模式的规则。
哨兵模式
优点:
- 哨兵集群,基于主从复制模式,所有的主从配置优点,它全有。
- 主从可以切换,故障可以转移,系统的可用性更好。
- 哨兵模式就是主从模式的升级,手动到自动,更加健壮。
缺点:
- Redis不易在线扩容,集群容量一旦达到上限,在线扩容十分麻烦。
- 实现哨兵模式的配置比较繁琐,里面有很多配置项。
哨兵模式的全部配置
# Example sentinel.conf
# 哨兵sentinel实例运行的端口 默认26379
port 26379
# 哨兵sentinel的工作目录
dir /tmp
# 哨兵sentinel监控的redis主节点的 ip port
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum>
sentinel monitor mymaster 127.0.0.1 6379 1
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供密码
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码
# sentinel auth-pass <master-name> <password>
sentinel auth-pass mymaster MySUPER--secret-0123passw0rd
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒
# sentinel down-after-milliseconds <master-name> <milliseconds>
sentinel down-after-milliseconds mymaster 30000
# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步,
这个数字越小,完成failover所需的时间就越长,
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面:
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那里同步数据时。
#3.当想要取消一个正在进行的failover所需要的时间。
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了
# 默认三分钟
# sentinel failover-timeout <master-name> <milliseconds>
sentinel failover-timeout mymaster 180000
# SCRIPTS EXECUTION
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知相关人员。
#对于脚本的运行结果有以下规则:
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),将会去调用这个脚本,
#这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传给脚本两个参数,
#一个是事件的类型,
#一个是事件的描述。
#如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执行的,否则sentinel无法正常启动成功。
#通知脚本
# sentinel notification-script <master-name> <script-path>
sentinel notification-script mymaster /var/redis/notify.sh
# 客户端重新配置主节点参数脚本
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已经发生改变的信息。
# 以下参数将会在调用脚本时传给脚本:
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port>
# 目前<state>总是“failover”,
# <role>是“leader”或者“observer”中的一个。
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通信的
# 这个脚本应该是通用的,能被多次调用,不是针对性的。
# sentinel client-reconfig-script <master-name> <script-path>
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh
如果有哨兵集群,还需配置每个哨兵端口。
十四、Redis缓存穿透和雪崩
Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。
缓存穿透
在默认情况下,用户请求数据时,会先在缓存(Redis)中查找,若没找到即缓存未命中,再到数据库中进行查找。数量少可能问题不大,可是一旦大量的请求数据(例如秒杀场景)缓存都没有命中的话,就会全部转移到数据库上,造成数据库极大的压力,就有可能导致数据库崩溃。
网络安全中有人恶意使用这种手段进行攻击被称为洪水攻击。
缓存穿透是因数据查找不到导致的。
解决方案
布隆过滤器(BloomFilter)
布隆过滤器是一种数据结构,对所有可能查询的参数一hash形式存储,在控制层先进行校验,不符合则丢弃,从而避免了对底层存储系统的查询压力。
将数据库中所有的查询条件,放入布隆过滤器中,当一个查询请求过来时,先经过布隆过滤器进行查,如果判断请求查询值存在,则继续查;如果判断请求查询不存在,直接丢弃。
缓存空对象
将数据库中的空值也缓存到缓存层中,这样查询该空值就不会再访问DB,而是直接在缓存层访问就行。
但是这样有个弊端就是缓存太多空值占用了更多的空间,可以通过给缓存层空值设立一个较短的过期时间来解决,例如60s。
使用该方法存在以下两个问题:
- 如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很多的空值的键。
- 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对于需要保持一致性的业务会有影响。
缓存击穿
缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。
简单来说,缓存击穿就是指缓存中没有但数据库中有的数据(一般是缓存时间到期),这时由于并发用户特别多,同时读缓存没读到数据,又同时去数据库去取数据,引起数据库压力瞬间增大,造成过大压力。
缓存击穿是由于一个数据(热点数据)找的人太多导致的。
解决方案
设置热点数据永不过期
从缓存层面来看,没有设置过期时间,所以就不会出现热点key过期后产生的问题。
这样就不会出现热点数据过期的情况,但是当Redis内存空间满的时候也会清理部分数据,而且此种方案会占用空间,一旦热点数据多了起来,就会占用部分空间。
加互斥锁
在访问key之前,采用SETNX(set if not exists)来设置另一个短期key来锁住当前key的访问,访问结束再删除该短期key。保证同时刻只有一个线程访问。这样对锁的要求就十分高。
分布式锁:使用分布式锁,保证对于每个key,同时只有一个线程去查询后端服务,其它线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。
缓存雪崩
缓存雪崩是指缓存中数据大批量到过期时间,而查询数据量巨大,引起数据库压力过大甚至宕机。
换句话说,大量的key设置了相同的过期时间 或者 Redis 服务器宕机,导致在缓存在同一时刻全部失效,造成瞬时数据库请求量大、压力骤增,引起雪崩。
与缓存击穿不同的是,缓存击穿指并发查同一条数据,缓存雪崩是不同数据都过期了,很多数据都查不到,从而查数据库。
其中集中过期,倒不是最致命的,比较致命的缓存雪崩是缓存服务器某个结点宕机或断网。因为自然形成的缓存雪崩,一定是在某个时间段集中创建缓存,此时,数据库也是可以顶住压力的,无非就是对数据库产生周期性的压力而已。而缓存服务器结点的宕机,对数据库造成的压力是不可预知的,很有可能瞬间就把数据库压垮。
解决方案
redis高可用
这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis,这样一台挂掉之后其他的还可以继续工作,其实就是搭建的集群。
例如,在双十一那天,多增加几台redis(会或者停掉一些服务),保证主要的服务可用。
限流降级
这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个key只允许一个线程查询数据和写缓存,其他线程等待。
数据预热
数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就会加载到缓存中。在即将发生大并发访问前手动触发加载缓存不同的key,设置不同的过期时间,让缓存失效的时间点尽量均匀。