文章目录
- 一、设计核心类
- Connection 类
- Channel 类
- ConnectionFactory 类
- 二、代码编写
- Connection 类
- Connection 类
- Channel 类
一、设计核心类
Connection 类
Connection类有以下特点与功能
- 表示一个TCP连接
- 该类持有 Socket对象
- 可以写入请求,读取响应
- 管理多个 Channel 对象
Channel 类
Channel类有以下特点与功能
- 表示一个逻辑上的连接
- 内部有多个方法去构造请求调用服务器端对应的API
ConnectionFactory 类
ConnectionFactory类有以下特点与功能
该类持有服务器的ip地址与端口号,主要功能是 实例化Connection 类.
二、代码编写
Connection 类
/**
* 一个TCP连接类
*/
public class Connection {
private Socket socket = null;
// 需要管理多个 channel 使用一个 hash表 把若干个 channel 组织起来
private ConcurrentHashMap<String,Channel> channelMap = new ConcurrentHashMap<>();
private InputStream inputStream;
private OutputStream outputStream;
private DataInputStream dataInputStream;
private DataOutputStream dataOutputStream;
// 线程池
private ExecutorService callbackPool = null;
public Connection(String host,int port) throws IOException {
socket = new Socket(host, port);
inputStream = socket.getInputStream();
outputStream = socket.getOutputStream();
dataInputStream = new DataInputStream(inputStream);
dataOutputStream = new DataOutputStream(outputStream);
callbackPool = Executors.newFixedThreadPool(5);
// 创建一个扫描线程,由这个线程负责不停的从 socket 中读取响应数据,把这个响应数据再交给对应的 channel 负责处理
Thread thread = new Thread(() -> {
try {
while (!socket.isClosed()) {
Response response = readResponse();
dispatchResponse(response);
}
} catch (SocketException e) {
// 连接正常断开的,此时这个异常直接忽略
System.out.println("[Connection] 连接正常断开!");
} catch (IOException | ClassNotFoundException | MqException e) {
System.out.println("[Connection] 连接异常断开!");
e.printStackTrace();
}
});
thread.start();
}
public void close() {
// 关闭 Connection 释放上述资源
try {
callbackPool.shutdownNow();
channelMap.clear();
inputStream.close();
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
// 使用这个方法来分别处理,当前这个响应是一个针对控制请求的响应,还是服务器推送的消息
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
if (response.getType() == 0xc) {
// 服务器推送来的消息数据
SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
// 根据 channelId 找到对应的 channel 对象
// 执行该 channel 对象内部的回调函数
Channel channel = channelMap.get(subScribeReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + subScribeReturns.getChannelId());
}
// 执行该 channel 内部的回调函数
callbackPool.execute(() -> {
try {
channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(),subScribeReturns.getBasicProperties(),
subScribeReturns.getBody());
} catch (MqException | IOException e) {
e.printStackTrace();
}
});
} else {
// 当前响应是针对刚才的控制请求的响应
BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
// 把这个结果给他放到对应的 channel 的 哈希表中
Channel channel = channelMap.get(basicReturns.getChannelId());
if (channel == null) {
throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + basicReturns.getChannelId());
}
channel.putReturns(basicReturns);
}
}
// 发送请求
public void writeRequest(Request request) throws IOException {
dataOutputStream.writeInt(request.getType());
dataOutputStream.writeInt(request.getLength());
dataOutputStream.write(request.getPayload());
dataOutputStream.flush();
System.out.println("[Connection 发送请求! type=" + request.getType() + ",length=" + request.getLength());
}
// 读取响应
public Response readResponse() throws IOException {
Response response = new Response();
response.setType(dataInputStream.readInt());
response.setLength(dataInputStream.readInt());
byte[] payload = new byte[response.getLength()];
int length = dataInputStream.read(payload);
if (response.getLength() != length) {
throw new IOException("读取的数据格式异常");
}
response.setPayload(payload);
System.out.println("[Connection 接收响应! type=" + response.getType() + ",length=" + response.getLength());
return response;
}
// 创建Channel(信道)
public Channel createChannel() {
String channelId = "C-" + UUID.randomUUID().toString();
Channel channel = new Channel(channelId,this);
// 同时也需要把 "创建 channel" 的这个消息也告诉服务器
boolean ok = false;
// 把这个 channel 对象 放到 Connection 管理 channel 的 哈希表 中
channelMap.put(channelId,channel);
System.out.println("Channel 创建成功!");
try {
ok = channel.createChannel();
} catch (IOException e) {
e.printStackTrace();
}
if (!ok) {
// 服务器这里创建失败了,整个这次创建 channel 操作不顺利
channelMap.remove(channelId);
return null;
}
return channel;
}
}
Connection 类
/**
* Connection工厂类
*/
@Data
public class ConnectionFactory {
// broker server 的 ip 地址
private String host;
// broker server 的端口号
private int port;
// // 访问 broker server 的哪个虚拟主机
// private String virtualHostName;
// private String username;
// private String password;
public Connection newConnection() throws IOException {
Connection connection = new Connection(host,port);
return connection;
}
}
Channel 类
以下方法中对应服务器的API中的请求类型,一定要与之前定义的协议保持一致.
/**
* 逻辑连接类
*/
@Data
public class Channel {
private String channelId;
// 当前这个 channel 属于哪个连接
private Connection connection;
// 用来存储后续客户端收到的服务器的响应
private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
// 如果当前 Channel 订阅了某个队列,就需要在此处记录下对应回调是啥,当该队列的消息返回回来的时候,调用回调
// 此处约定一个 Channel 中只能有一个回调
private Consumer consumer = null;
public Channel(String channelId, Connection connection) {
this.channelId = channelId;
this.connection = connection;
}
// 期望使用这个方法来阻塞等待服务器的响应
private BasicReturns waitResult(String rid) {
BasicReturns basicReturns = null;
while ((basicReturns = basicReturnsMap.get(rid)) == null) {
// 查询结果为 null,证明 响应 还没到
// 此时就需要阻塞等待
synchronized (this) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 读取到响应后,删除 哈希表 中的存储的这个 响应
basicReturnsMap.remove(rid);
return basicReturns;
}
private String generateRid() {
return "R-" + UUID.randomUUID().toString();
}
public void putReturns(BasicReturns basicReturns) {
basicReturnsMap.put(basicReturns.getRid(),basicReturns);
synchronized (this) {
// 当前也不知道有多少个线程在等待上述的这个响应
// 全都唤醒
notifyAll();
}
}
// 在以下方法中,和服务器交互,告知服务器,此处客户端 要进行的操作
// 创建 channel
public boolean createChannel() throws IOException {
// 对于创建 Channel 操作来说,payload 就是一个 basicArguments 对象
BasicArguments basicArguments = new BasicArguments();
basicArguments.setChannelId(channelId);
basicArguments.setRid(generateRid());
byte[] payload = BinaryTool.toBytes(basicArguments);
Request request = new Request();
request.setType(0x1);
request.setLength(payload.length);
request.setPayload(payload);
// 构造出完整请求后,发送请求
connection.writeRequest(request);
// 等待服务器响应
BasicReturns basicReturns = waitResult(basicArguments.getRid());
return basicReturns.isOk();
}
// 销毁 channel
public boolean close() throws IOException {
// 对于创建 Channel 操作来说,payload 就是一个 basicArguments 对象
BasicArguments basicArguments = new BasicArguments();
basicArguments.setChannelId(channelId);
basicArguments.setRid(generateRid());
byte[] payload = BinaryTool.toBytes(basicArguments);
Request request = new Request();
request.setType(0x2);
request.setLength(payload.length);
request.setPayload(payload);
// 构造出完整请求后,发送请求
connection.writeRequest(request);
// 等待服务器响应
BasicReturns basicReturns = waitResult(basicArguments.getRid());
return basicReturns.isOk();
}
// 创建交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
Map<String,Object> arguments) throws IOException {
ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
exchangeDeclareArguments.setRid(generateRid());
exchangeDeclareArguments.setChannelId(channelId);
exchangeDeclareArguments.setExchangeName(exchangeName);
exchangeDeclareArguments.setExchangeType(exchangeType);
exchangeDeclareArguments.setDurable(durable);
exchangeDeclareArguments.setAutoDelete(autoDelete);
exchangeDeclareArguments.setArguments(arguments);
byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);
Request request = new Request();
request.setType(0x3);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
return basicReturns.isOk();
}
// 删除交换机
public boolean exchangeDelete(String exchangeName) throws IOException {
ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
exchangeDeleteArguments.setRid(generateRid());
exchangeDeleteArguments.setChannelId(channelId);
exchangeDeleteArguments.setExchangeName(exchangeName);
byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);
Request request = new Request();
request.setType(0x4);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());
return basicReturns.isOk();
}
// 创建队列
public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,
Map<String,Object> arguments) throws IOException {
QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
queueDeclareArguments.setRid(generateRid());
queueDeclareArguments.setChannelId(channelId);
queueDeclareArguments.setQueueName(queueName);
queueDeclareArguments.setDurable(durable);
queueDeclareArguments.setExclusive(exclusive);
queueDeclareArguments.setAutoDelete(autoDelete);
queueDeclareArguments.setArguments(arguments);
byte[] payload = BinaryTool.toBytes(queueDeclareArguments);
Request request = new Request();
request.setType(0x5);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
return basicReturns.isOk();
}
// 删除队列
public boolean queueDelete(String queueName) throws IOException {
QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
queueDeleteArguments.setRid(generateRid());
queueDeleteArguments.setChannelId(channelId);
queueDeleteArguments.setQueueName(queueName);
byte[] payload = BinaryTool.toBytes(queueDeleteArguments);
Request request = new Request();
request.setType(0x6);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());
return basicReturns.isOk();
}
// 创建绑定
public boolean bindingDeclare(String exchangeName,String queueName,String bindingKey) throws IOException {
BindingDeclareArguments bindingDeclareArguments = new BindingDeclareArguments();
bindingDeclareArguments.setRid(generateRid());
bindingDeclareArguments.setChannelId(channelId);
bindingDeclareArguments.setExchangeName(exchangeName);
bindingDeclareArguments.setQueueName(queueName);
bindingDeclareArguments.setBindingKey(bindingKey);
byte[] payload = BinaryTool.toBytes(bindingDeclareArguments);
Request request = new Request();
request.setType(0x7);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(bindingDeclareArguments.getRid());
return basicReturns.isOk();
}
// 解除绑定
public boolean bindingDelete(String exchangeName,String queueName) throws IOException {
BindingDeleteArguments bindingDeleteArguments = new BindingDeleteArguments();
bindingDeleteArguments.setRid(generateRid());
bindingDeleteArguments.setChannelId(channelId);
bindingDeleteArguments.setExchangeName(exchangeName);
bindingDeleteArguments.setQueueName(queueName);
byte[] payload = BinaryTool.toBytes(bindingDeleteArguments);
Request request = new Request();
request.setType(0x8);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(bindingDeleteArguments.getRid());
return basicReturns.isOk();
}
// 发送消息
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
basicPublishArguments.setRid(generateRid());
basicPublishArguments.setChannelId(channelId);
basicPublishArguments.setExchangeName(exchangeName);
basicPublishArguments.setRoutingKey(routingKey);
basicPublishArguments.setBasicProperties(basicProperties);
basicPublishArguments.setBody(body);
byte[] payload = BinaryTool.toBytes(basicPublishArguments);
Request request = new Request();
request.setType(0x9);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(basicPublishArguments.getRid());
return basicReturns.isOk();
}
// 订阅消息
public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
// 先设置回调
if (this.consumer != null) {
throw new MqException("该 channel 已经设置过消费消息的回调函数了,不能重复设置");
}
this.consumer = consumer;
BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
basicConsumeArguments.setRid(generateRid());
basicConsumeArguments.setChannelId(channelId);
basicConsumeArguments.setConsumerTag(channelId); // 此处 ConsumerTag 也使用 channelId 表示
basicConsumeArguments.setQueueName(queueName);
basicConsumeArguments.setAutoAck(autoAck);
byte[] payload = BinaryTool.toBytes(basicConsumeArguments);
Request request = new Request();
request.setType(0xa);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
return basicReturns.isOk();
}
// 确认消息
public boolean basicAck(String queueName,String messageId) throws IOException {
BasicAckArguments basicAckArguments = new BasicAckArguments();
basicAckArguments.setRid(generateRid());
basicAckArguments.setChannelId(channelId);
basicAckArguments.setQueueName(queueName);
basicAckArguments.setMessageId(messageId);
byte[] payload = BinaryTool.toBytes(basicAckArguments);
Request request = new Request();
request.setType(0xb);
request.setLength(payload.length);
request.setPayload(payload);
connection.writeRequest(request);
BasicReturns basicReturns = waitResult(basicAckArguments.getRid());
return basicReturns.isOk();
}
}