文章目录
- 一、创建SpringBoot项目
- 二、创建核心类
- 创建 Exchange类
- 创建 MSGQueue类
- 创建 Binding类
- 创建Message类
一、创建SpringBoot项目
在项目中添加这四个依赖!
二、创建核心类
交换机 :Exchange
队列 :Queue
绑定关系: Binding
消息 :Message
这些核心类都存在于 BrokerServer 中.
先创建出服务器与客户端的包.
再在服务器中创建 core包,用来存放这些核心类.
创建 Exchange类
首先考虑,咱们在此处共实现了三种交换机类型,所以咱们可以创建一个枚举类来表示交换机类型.
/**
* 表示交换机类型
*/
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
private ExchangeType(int type) {
this.type = type;
}
public int getType() {
return type;
}
}
咱们再考虑,Exchange类中有哪些属性?
- 1.name,当作交换机的唯一身份标识
- 2.ExchangeType,表示交换机类型
- 3.durable,表示这个交换机是否需要持久化存储
- 4.autoDelete,表示该交换机在无人使用后,是否会自动删除
- 5.arguments,表示后续的一些拓展功能
/**
* 表示一个交换机
* 交换机的使用者是生产者
*/
@Data
public class Exchange {
// 此处使用 name 作为交换机的身份标识,(唯一的)
private String name;
// 交换机类型,DIRECT,FANOUT,TOPIC
private ExchangeType type = ExchangeType.DIRECT;
// 该交换机是否要持久化存储,true表示要,false表示不要
private boolean durable = false;
// 如果当前交换机,没人使用了,就会自动删除
// 这个属性暂时放在这(后续代码中没有实现,RabbitMQ中实现了)
private boolean autoDelete = false;
// arguments 表示的是创建交换机时指定的一些额外参数
// 这个属性也暂时放在这(后续代码中没有实现,RabbitMQ中实现了)
// 为了把这个 arguments 存到数据库中,需要将 arguments 转换为 json 格式的字符串
private Map<String,Object> arguments = new HashMap<>();
// 这里的 get set 用于与数据库交互使用
public String getArguments() {
ObjectMapper objectMapper = new ObjectMapper();
try {
// 将 arguments 按照 JSON 格式 转换成 字符串
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
// 如果代码抛出异常,返回一个空的 json 字符串
return "{}";
}
public void setArguments(String arguments) {
ObjectMapper objectMapper = new ObjectMapper();
try {
// 将库中的 arguments 按照 JSON 格式解析,转换成 Map 对象
this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
public void setArguments(Map<String,Object> arguments) {
this.arguments = arguments;
}
// 这里的 get set ,用来更方便的获取/设置 arguments 中的键值对
// 这一组 getter setter 是在Java内部代码使用的(比如测试的时候)
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key,Object value) {
arguments.put(key, value);
}
}
创建 MSGQueue类
MSGQueue类中有哪些属性?
与Exchange类大差不差.
直接贴代码
/**
* 表示一个存储消息的队列
* MSG =》Message
* 消息队列的使用者是消费者
*/
@Data
public class MSGQueue {
// 表示队列的身份标识
private String name;
// 表示队列是否持久化
private boolean durable = false;
// true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列
// 后续代码不实现相关功能
private boolean exclusive = false;
// true -> 没人使用后,自动删除,false -> 没人使用,不自动删除
private boolean autoDelete = false;
// 表示扩展参数,后续代码没有实现
private Map<String,Object> arguments = new HashMap<>();
public String getArguments() {
ObjectMapper objectMapper = new ObjectMapper();
try {
return objectMapper.writeValueAsString(arguments);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
}
public void setArguments(String arguments) {
ObjectMapper objectMapper = new ObjectMapper();
try {
this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
public void setArguments(Map<String,Object> arguments) {
this.arguments = arguments;
}
public Object getArguments(String key) {
return arguments.get(key);
}
public void setArguments(String key,Object value) {
arguments.put(key, value);
}
}
创建 Binding类
/**
* 表示队列和交换机之间的绑定关系
*/
@Data
public class Binding {
private String exchangeName;
private String queueName;
// 主题交换机的匹配key
private String bindingKey;
}
创建Message类
Message类,大致可以分为三个部分.
- 消息自身的属性
- 消息的正文
- 消息的持久化存储所需属性
我们新建一个 BasicProperties 类来表示 消息的属性.
/**
* 这个类表示消息的属性
*/
@Data // 实现 Serializable 接口是为了后续的序列化操作
public class BasicProperties implements Serializable {
// 消息的唯一身份标识
private String messageId;
// 如果当前交换机是 DIRECT,此时 routingKey 表示要转发的队列名
// 如果当前交换机是 FANOUT,此时 routingKey 无意义
// 如果当前交换机是 TOPIC,此时 routingKey 就要和bindingKey进行匹配,匹配成功才转发给对应的消息队列
private String routingKey;
// 这个属性表示消息是否要持久化,1表示不持久化,2 表示持久化
private int deliverMode = 1;
}
持久化存储会在下面讲到,莫慌.
/**
* 这个类表示一个消息
*/
@Data // 实现 Serializable 接口是为了后续的序列化操作
public class Message implements Serializable {
// 消息的属性
private BasicProperties basicProperties = new BasicProperties();
// 消息的正文
private byte[] body;
// 相当于消息的版本号,主要针对 Message 类有改动后,再去反序列化之前旧的 message时,可能会出现错误
// 因此引入消息版本号,如果版本号不匹配,就不允许反序列化直接报错,来告知程序猿,后续代码中并未实现该功能
private static final long serialVersionUid = 1L;
// 下面的属性是持久化存储需要的属性
// 消息存储到文件中,使用一下两个偏移量来确定消息在文件中的位置 [offsetBeg,offsetEnd)
// 这两个属性不需要 序列化 存储到文件中,存储到文件中后位置就固定了,
// 这两个属性的作用是让 内存 中的 message 能顺利找到 文件 中的 message
// 被 transient 修饰的属性,不会被 标准库 的 序列化方式 序列化
private transient long offsetBeg = 0; // 消息数据的开头举例文件开头的位置偏移(字节)
private transient long offsetEnd = 0; // 消息数据的结尾举例文件开头的位置偏移(字节)
// 使用这个属性表示该消息在文件中是否是有效信息(逻辑删除)
// 0x1表示有效,0x0表示无效
private byte isValid = 0x1;
// 创建工厂方法,让工厂方法封装 new Message 对象的过程
// 该方法创建的 Message 对象,会自动生成唯一的MessageId
public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body) {
Message message = new Message();
if (basicProperties != null) {
message.setBasicProperties(basicProperties);
}
message.basicProperties.setRoutingKey(routingKey);
// 此处生成的 MessageId 以 M- 作为前缀
message.setMessageId("M-" + UUID.randomUUID());
message.setBody(body);
// 此处先将 message的核心部分 basicProperties 与 body设置了
// 而 offsetBeg,offsetEnd,isValid,这些属性是持久化时才设置的
return message;
}
// 直接获取消息id
public String getMessageId() {
return basicProperties.getMessageId();
}
// 直接更改消息id
public void setMessageId(String messageId) {
basicProperties.setMessageId(messageId);
}
// 直接获取 消息的key
public String getRoutingKey() {
return basicProperties.getRoutingKey();
}
// 直接更改 消息的key
public void setRoutingKey(String routingKey) {
basicProperties.setRoutingKey(routingKey);
}
// 直接获取 消息的是否持久化存储字段
public int getDeliverMode() {
return basicProperties.getDeliverMode();
}
// 直接修改 消息的是否持久化存储字段
public void setDeliverMode(int mode) {
basicProperties.setDeliverMode(mode);
}
}
这些核心类就都建好了,下篇文章就来考虑他们的持久化存储与内存存储!