设计思路
利用消息队列的特性进行消息投递,假设客户端 A 要与客户端 B 进行通信。
客户端 A :创建队列 A-B ,发送的消息推送到 A-B 队列, 绑定 B-A 队列,接收 B-A 队列推送给客户端的消息。
客户端 B :创建队列 B-A ,发送的消息推送到 B-A 队列, 绑定 A-B 队列,接收 A-B 队列推送给客户端的消息。
如果其中某个客户端主动退出或者异常中断时,销毁队列 A-B 以及队列 B-A(是否销毁视实际应用场景来定)。
实现
创建RabbitMQ 工具类,用以获取连接及销毁队列。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ClientUtils {
private static ConnectionFactory DEFAULT_FACTORY_INSTANCE = null;
static {
DEFAULT_FACTORY_INSTANCE = new ConnectionFactory();
DEFAULT_FACTORY_INSTANCE.setHost("127.0.0.1");
DEFAULT_FACTORY_INSTANCE.setUsername("user");
DEFAULT_FACTORY_INSTANCE.setPassword("123456");
DEFAULT_FACTORY_INSTANCE.setVirtualHost("testVirtualHost");
}
public static Connection getConnection() throws IOException, TimeoutException {
return DEFAULT_FACTORY_INSTANCE.newConnection();
}
public static void queueDelete(Channel channel, String queue) throws IOException {
channel.queueDelete(queue);
}
public static void queueDelete(Channel channel, String... queues) throws IOException {
for (int i = 0; i < queues.length; i++) {
channel.queueDelete(queues[i]);
}
}
}
创建聊天客户端核心类,用于处理消息发送逻辑以及消息接收逻辑。
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 需保证 send 与 reciver 的值是唯一的
*/
public class ChatClient {
private String send;
private String reciver;
public ChatClient(String send, String reciver) {
this.send = send;
this.reciver = reciver;
}
public void start() {
try (
Channel channel = ClientUtils.getConnection().createChannel();
Scanner scanner = new Scanner(System.in);
) {
String pub = send + "-" + reciver;
String rec = reciver + "-" + send;
channel.queueDeclare(pub, false, false, false, null);
channel.queueDeclare(rec, false, false, false, null);
System.out.println(String.format("%s连接到%s", send, reciver));
String message = null;
while (true) {
//接收到信息回调接口,目的是当接收到一条信息时,进行一些操作,比如可以在控制台里打印出来
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//todo 这里可以做一个消息类型的识别,使用策略模式不同类型使用不同的解析器
String receiveMessage = new String(delivery.getBody(), "UTF-8");
System.out.println(String.format("收到%s消息:%s", rec, receiveMessage));
};
//取消接收的回调接口,目的是如在接收消息的时候队列被删除掉了,可以进行一些操作。
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息中断");
ClientUtils.queueDelete(channel, pub, rec);
//退出
System.exit(0);
};
//管道接收消息
channel.basicConsume(rec, true, deliverCallback, cancelCallback);
if ((message = scanner.nextLine()) != null && StringUtils.hasText(message)) {
if ("exit".equals(message)) {
System.out.println("终止发送消息");
ClientUtils.queueDelete(channel, pub, rec);
System.exit(0);
break;
}
// todo 对发送的消息进行转换,以支持更多的类型
channel.basicPublish("", pub, null, message.getBytes());
System.out.println(String.format("%s发送消息:%s", pub, message));
message = null;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
实现客户端 A
public class Client1 {
public static void main(String[] args) {
ChatClient client = new ChatClient("A", "B");
client.start();
}
}
实现客户端 B
public class Client2 {
public static void main(String[] args) {
ChatClient client = new ChatClient("B", "A");
client.start();
}
}
测试
客户端 A 上线,发送消息,此时客户端 B 未上线。
由于此时客户端 B 还未上线,消息会积压在RabbitMQ队列中,等待客户端 B 上线消费。
客户端 B 上线,消费积压在MQ的消息。
此时客户端 A 与客户端 B 同时在线,可以进行即时通信。
小结
这里只是讲述了文本消息的传递,输入内容也是使用控制台输入。实际上可以拓展到其它类型,比如图片,音频等,只需要统一消息体,在消息发送及接收时进行消息的封装和解析。
比如,发送图片消息时可以标识此条消息为图片类型,然后将图片转换为二进制文本,封装成一个完整的消息投递。客户端接收到的消息根据消息投递的类型使用不同的解析器去进行解析。