目录
- 一、什么是发布和订阅?
- 二、Redis的发布和订阅
- 三、发布和订阅的命令行实现
- 四、发布和订阅命令
- 1、subscribe:订阅一个或者多个频道
- 2、publish:发布消息到指定的频道
- 3、psubscribe:订阅一个或多个符合给定模式的频道
- 4、pubsub:查看订阅与发布系统状态
- 5、punsubscribe:退订所有给定模式的频道
- 6、unsubscribe:指退订给定的频道
- 五、Redis发布和订阅缺点
- 六、Jedis当中实现发布和订阅
一、什么是发布和订阅?
官网介绍:https://redis.com.cn/redis-pub-sub.html
Redis 发布/订阅是一种消息传模式,其中发送者(在Redis术语中称为发布者)发送消息,而接收者(订阅者)接收消息。传递消息的通道称为channel。
Redis的发布和订阅最大的缺点是消息不能持久化!也就是我们通过Redis发送的消息,压根在Redis当中根本没有存。
二、Redis的发布和订阅
在Redis中,客户端可以订阅任意数量的频道。
1、客户端可以订阅频道如下图
2、当给这个频道发布消息后,消息就会发送给订阅的客户端
三、发布和订阅的命令行实现
1、打开一个客户端订阅channel1:
订阅命令:
subscribe channel1 channel2
… ,可以订阅多个频道。当执行命令过后只要不终止会一直处于订阅监听状态。
2、打开另一个客户端,给channel1发布消息hello
发标消息命令:
publish channel 消息
,返回值表示有几个订阅者
3、切换到订阅者窗口,可以看到收到信息了
四、发布和订阅命令
1、subscribe:订阅一个或者多个频道
SUBSCRIBE channel [channel ...]
返回值:接收到的信息(请参见下面的代码说明)。
127.0.0.1:6379> subscribe channel1 channel2 # 订阅了两个频道channel1和channel2
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 返回值的类型:显示订阅成功
2) "channel1" # 订阅的频道名字
3) (integer) 1 # 目前已订阅的频道数量
1) "subscribe" # 返回值的类型:显示订阅成功
2) "channel2" # 订阅的频道名字
3) (integer) 2 # 目前已订阅的频道数量
1) "message" # 返回值的类型:信息
2) "channel1" # 来源(从那个频道发送过来)
3) "hello" # 信息内容
1) "message"
2) "channel2"
3) "gagaga"
2、publish:发布消息到指定的频道
PUBLISH channel message
返回值:接收到信息 message 的订阅者数量。
127.0.0.1:6379> publish channel1 hello # 向有一个订阅者的频道发送信息
(integer) 1
127.0.0.1:6379> publish channel2 gagaga
(integer) 1
127.0.0.1:6379> publish channel3 aaaa # 对没有订阅者的频道发送信息,返回的就是0
(integer) 0
3、psubscribe:订阅一个或多个符合给定模式的频道
PSUBSCRIBE pattern [pattern ...]
每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、it.tweets 等等), news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
# 订阅 news.* 和 tweet.* 两个模式
redis> psubscribe news.* tweet.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" # 返回值的类型:显示订阅成功
2) "news.*" # 订阅的模式
3) (integer) 1 # 目前已订阅的模式的数量
1) "psubscribe"
2) "tweet.*"
3) (integer) 2
1) "pmessage" # 返回值的类型:信息
2) "news.*" # 信息匹配的模式
3) "news.it" # 信息本身的目标频道
4) "Google buy Motorola" # 信息的内容
1) "pmessage"
2) "tweet.*"
3) "tweet.huangz"
4) "hello"
1) "pmessage"
2) "tweet.*"
3) "tweet.joe"
4) "@huangz morning"
1) "pmessage"
2) "news.*"
3) "news.life"
4) "An apple a day, keep doctors away"
4、pubsub:查看订阅与发布系统状态
(1)列出当前的活跃频道:PUBSUB CHANNELS [pattern]
活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。也就是通过psubscribe订阅的不在内。
pattern 参数是可选的:
- 如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。
- 如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。
127.0.0.1:6379> PUBSUB CHANNELS
1) "channel2"
2) "channel1"
127.0.0.1:6379> PUBSUB CHANNELS channel3 # 频道没有人订阅返回就是空
(empty list or set)
127.0.0.1:6379> PUBSUB CHANNELS channel1 # 只要有人订阅就返回频道名称
1) "channel1"
127.0.0.1:6379> PUBSUB CHANNELS channel* # 返回channel相关的活跃频道
1) "channel2"
2) "channel1"
(2)返回给定频道的订阅者数量, 通过PSUBSCRIBE 订阅的不统计在内:PUBSUB NUMSUB [channel-1 ... channel-N]
127.0.0.1:6379> pubsub numsub channel1
1) "channel1" # 频道名称
2) (integer) 2 # 订阅数量
127.0.0.1:6379> pubsub numsub aa1
1) "aa1"
2) (integer) 0 # 不存在返回0
(3)返回订阅模式的数量,也就是使用PSUBSCRIBE 订阅的:PUBSUB NUMPAT
5、punsubscribe:退订所有给定模式的频道
punsubscribe其实就是对应的psubscribe。
PUNSUBSCRIBE [pattern [pattern ...]]
如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
6、unsubscribe:指退订给定的频道
UNSUBSCRIBE [channel [channel ...]]
如果没有频道被指定,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
像unsubscribe和punsubscribe两个退订的命令是只退订当前客户端的,我们通过windows的客户端其实是不好测的,因为我们一旦开启订阅后,客户端会进入监听状态,我们根本没法输入别的命令。所以这些命令往往都是框架当中才可以测试。
五、Redis发布和订阅缺点
PubSub 的生产者传递过来一个消息,Redis 会直接找到相应的消费者传递过去。如果一个消费者都没有,那么消息直接丢弃。如果开始有三个消费者,一个消费者突然挂掉了,生产者会继续发送消息,另外两个消费者可以持续收到消息。但是挂掉的消费者重新连上的时候,这断连期间生产者发送的消息,对于这个消费者来说就是彻底丢失了。
如果 Redis 停机重启,PubSub 的消息是不会持久化的,毕竟 Redis 宕机就相当于一个消费者都没有,所有的消息直接被丢弃。
正是因为 PubSub 有这些缺点,它几乎找不到合适的应用场景。所以 Redis 的作者单独开启了一个项目 Disque 专门用来做多播消息队列。该项目目前没有成熟,一直长期处于Beta 版本,但是相应的客户端 sdk 已经非常丰富了,就待 Redis 作者临门一脚发布一个Release 版本。关于 Disque 的更多细节,本小册不会多做详细介绍,感兴趣的同学可以去阅读相关文档。
六、Jedis当中实现发布和订阅
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.1</version>
</dependency>
实际开发的时候发送者和接受者肯定不会在一个线程内,我这里也只是做简单的练习。
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
public class RedisPubSub {
public static void main(String[] args) {
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
// 连接池最大连接数
genericObjectPoolConfig.setMaxTotal(20);
// 创建redis连接池
JedisPool pool = new JedisPool(genericObjectPoolConfig, "127.0.0.1", 6379, 2000, "123456");
// JedisPubSub是jedis提供的一个消息处理抽象类,在通过Jedis.subscribe的时候可以绑定
JedisPubSub pubSub = new JedisPubSub() {
// 接受到消息的时候会访问
@Override
public void onMessage(String channel, String message) {
System.out.println("received message:" + channel + " -" + message);
}
// 开启监听的时候会访问
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("subscribed channel:" + channel);
}
};
Thread t = new Thread(() -> {
Jedis jedis = pool.getResource();
// 监听cece通道的消息
jedis.subscribe(pubSub, "cece");
jedis.close();
});
t.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Jedis jedis = pool.getResource();
// 向cece通道发送消息
Long publish = jedis.publish("cece", "test_info");
System.out.println(publish);
}
}