接上文 redis基本类型
HyperLogLog
简介
- HyperLogLog是用于「统计基数」的数据集合类型,基数统计就是指统计一个集合中不重复的元素个数, 但是准确率不是百分百,即他可以提供不精确的去重计数。
- HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的内存空间总是固定的、并且是很小的。
- 每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数,和 Set 和 Hash 类型相比,HyperLogLog 非常节省空间
- 因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。
使用
// 添加元素到HyperLogLog中
127.0.0.1:6379> pfadd me qian male 23
(integer) 1
// 返回HyperLogLog的元素数量
127.0.0.1:6379> pfcount me
(integer) 3
127.0.0.1:6379> pfadd you zhou female 24
(integer) 1
// 合并两个HyperLogLog
127.0.0.1:6379> pfmerge me you
OK
127.0.0.1:6379> pfcount me
(integer) 6
应用场景
1、百万级网页 UV 计数
在统计 UV 时,你可以用 PFADD 命令(用于向 HyperLogLog 中添加新元素)把访问页面的每个用户都添加到 HyperLogLog 中。接下来,就可以用 PFCOUNT 命令直接获得 page1 的 UV 值了,这个命令的作用就是返回 HyperLogLog 的统计结果。(如果需要精确统计还是使用set/hash)
PFADD page1:uv user1 user2 user3 user4 user5
PFCOUNT page1:uv
GEO
简介
- 主要用于存储地理位置信息,并对存储的信息进行操作
- 常用于LBS应用(位置信息服务:Location-Based Service)
使用
// 将位置信息添加到指定的key中
redis> GEOADD Sicily 13.361389 38.115556 "Palermo" 15.087269 37.502669 "Catania"
(integer) 2
// 从key中返回所有指定名称的位置
redis> GEOPOS Sicily Palermo Catania NonExisting
1) 1) "13.36138933897018433"
2) "38.11555639549629859"
2) 1) "15.08726745843887329"
2) "37.50266842333162032"
3) (nil)
// 返回两个给定位置之间的距离
redis> GEODIST Sicily Palermo Catania
"166274.1516"
// GEO使用geohash存储位置。获取若干个位置元素的geohash值
redis> GEOHASH Sicily Palermo Catania
1) "sqc8b49rny0"
2) "sqdtr74hyu0"
// 以给定的经纬度为中心,返回key的元素中与中心的距离不超过给定距离的所有位置元素。
// WITHDIST:在返回位置元素的同时, 将位置元素与中心之间的距离也一并返回。
// WITHCOORD: 将位置元素的经度和纬度也一并返回。
// WITHHASH: 以 52 位有符号整数的形式, 返回位置元素经过原始 geohash 编码的有序集合分值。 这个选项主要用于底层应用或者调试, 实际中的作用并不大。
// COUNT 限定返回的记录数。
// ASC: 查找结果根据距离从近到远排序。
// DESC: 查找结果根据从远到近排序。
redis> GEORADIUS Sicily 15 37 200 km WITHDIST
1) 1) "Palermo"
2) "190.4424"
2) 1) "Catania"
2) "56.4413"
应用场景
1、打车/导航
把 ID 号为 33 的车辆的当前经纬度位置存入 GEO 集合中,当用户想要寻找自己附近的网约车时,LBS 应用就可以使用 GEORADIUS 命令,查找以这个经纬度为中心的 5 公里内的车辆信息,并返回给 LBS 应用。
GEOADD cars:locations 116.034579 39.030452 33
GEORADIUS cars:locations 116.054579 39.030452 5 km ASC COUNT 10
Stream
简介
- 主要用于消息队列(MQ,Message Queue)
- Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它的缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。并且对于离线重连的客户端不能读取历史消息。
- List 实现消息队列的方式不能重复消费,一个消息消费完就会被删除,而且生产者需要自行实现全局唯一 ID
- stream支持消息的持久化、支持自动生成全局唯一 ID、支持 ack 确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠
- 每个stream都有一个唯一的key
- Consumer Group:消费组,一个消费组有多个消费者(Consumer)。
- last_delivered_id:游标,每个消费组会有个游标last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。
- pending_ids:消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。pending_ids记录了当前已经被客户端读取的消息,但是还没有ack (Acknowledge character:确认字符)。
使用
// 创建一个消息队列,并向其中添加消息
// * 的意思是让redis自动生成一个唯一的id
127.0.0.1:6379> XADD mystream * field1 value1 field2 value2 field3 value3
"1683964950002-0"
127.0.0.1:6379> xadd mystream * field1 value1 field2 value2 field3 value3
"1683964991845-0"
// 查看消息队列中消息数量
127.0.0.1:6379> xlen mystream
(integer) 2
// 获取消息列表, - 表示最小值, + 表示最大值
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1683964950002-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
5) "field3"
6) "value3"
2) 1) "1683964991845-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
5) "field3"
6) "value3"
// 对流进行修剪,限制最大长度
127.0.0.1:6379> XTRIM mystream MAXLEN 1
(integer) 2
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1683964995353-0"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
5) "field3"
6) "value3"
// 删除消息
127.0.0.1:6379> xdel mystream 1683964995353-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
(empty array)
// xread读取消息
// 读取从1683968637731-0这个id向后的两条,并且是阻塞读(没有数据时等待100ms)
127.0.0.1:6379> xread count 2 block 100 streams mystream mystream2 1683968637731-0 0-0
1) 1) "mystream"
2) 1) 1) "1683968643002-0"
2) 1) "field2"
2) "value2"
2) 1) "1683968647251-0"
2) 1) "field3"
2) "value3"
2) 1) "mystream2"
2) 1) 1) "1683969214535-0"
2) 1) "field1"
2) "value1"
consumer group:
// 创建消费者组,从头部开始消费
127.0.0.1:6379> XGROUP CREATE mystream consumer-group-name 0-0
OK
// 创建消费者组2,从尾部开始消费
127.0.0.1:6379> XGROUP CREATE mystream consumer-group-name2 $
OK
// 读取消费组中的信息,>表示从第一条尚未消费的消息开始读取
// 消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了
// 即同一个消费组里的消费者不能消费同一条消息。
// 不同消费组的消费者可以消费同一条消息(但是有前提条件,创建消息组的时候,不同消费组指定了相同位置开始读取消息)
// 消费组名,消费者名,读取消息的数量,队列名
127.0.0.1:6379> XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1683968637731-0"
2) 1) "field1"
2) "value1"
127.0.0.1:6379> XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1683968643002-0"
2) 1) "field2"
2) "value2"
// 查看已读取、但尚未确认处理完成的消息。
// 会显示读取了的消息的最小和最大
127.0.0.1:6379> xpending mystream consumer-group-name
1) (integer) 3
2) "1683968637731-0"
3) "1683968647251-0"
4) 1) 1) "consumer-name"
2) "3"
// 使用ack确认读取
127.0.0.1:6379> xack mystream consumer-group-name 1683968637731-0
(integer) 1
127.0.0.1:6379> xpending mystream consumer-group-name
1) (integer) 2
2) "1683968643002-0"
3) "1683968647251-0"
4) 1) 1) "consumer-name"
2) "2"
127.0.0.1:6379> xack mystream consumer-group-name 1683968647251-0
(integer) 1
127.0.0.1:6379> xpending mystream consumer-group-name
1) (integer) 1
2) "1683968643002-0"
3) "1683968643002-0"
4) 1) 1) "consumer-name"
2) "1"
127.0.0.1:6379> xack mystream consumer-group-name 1683968643002-0
(integer) 1
127.0.0.1:6379> xpending mystream consumer-group-name
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
应用场景
1、消息队列
生产者通过xadd向stream中放消息,消费者通过xread读取stream中的消息。
通常让同一个消费者组的每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。
# 让 group2 中的 consumer1 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
# 让 group2 中的 consumer2 从 mymq 消息队列中消费一条消息
> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
> # consumer3读取一条
> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
另外,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”。
消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 XACK 命令确认消息已经被消费完成。
一个合格的消息队列应该做到消息不丢、消息可堆积
消息队列一般包含三部分:生产者、消费者、队列中间件:
- 消息不丢:
- redis生产者产生了消息,然后提交给mq,只要能正常收到(MQ中间件)的ack确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的
- redis消费者会保存已读取未提交的消息。Stream(mq中间件)会自动使用内部队列(也称为PENDING List)留存消费组里每个消费者读取但是未被确认的消息。消费者可以在重启后,用XPENDING 命令查看已读取、但尚未确认处理完成的消息。等到消费者执行完业务逻辑后,再发送消费确认XACK命令,也能保证消息的不丢失。
- 以下两种情况下Redis的队列中间件会导致消息丢失:
1、AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
2、主从复制也是异步的,主从切换时,也存在丢失数据的可能 (opens new window)。
RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
- 消息可堆积:
- redis的数据都存储在内存中,这意味着一旦发生消息积压,会导致redis的内存持续增长,如果超过机器内存上限,就会面临OOM的危险。所以redis的stream提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。
- 当指定队列最大长度时,队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息,又会产生消息丢失
- Kafka、RabbitMQ专业的消息队列它们的数据都是存储在磁盘上,当消息积压时,无非就是多占用一些磁盘空间。