用Java实现一个简单的KV数据库
开发思路:
用map存储数据,再用一个List记录操作日志,开一个新线程将List中的操作写入日志文件中,再开一个线程用于网络IO服务接收客户端的命令,再启动时检查日志,如果有数据就读入map中
关于redis:
- 存储结构:
- redis:
redis的数据保存其实比较复杂,使用一个哈希表保存所有键值对,一个哈希表就是一个数组,数组的每一个元素是一个哈希桶,哈希桶中保存的是key和value的指针目录,再通过,指针去找对应的key和value,当然对于value是List等数据结构还用到跳表,双向列表,压缩列表,整数数组等数据结构 - SimpleKVDB:
只用了Java的HashMap(偷懒~)
- redis:
- 线程:
- redis:
redis虽然成为单线程,但是redis的网络IO和键值对读写是由一个线程,但是另外的持久化,异步删除,集群数据同步等,都是额外线程 - SimpleKVDB:
数据读写网络IO一个线程,持久化一个线程(集群同步本来想做但是后来没有写,也是新开一条线程)
- redis:
- 网络IO:
- redis:
单线程多路复用高性能IO模式 - SimpleKVDB:
直接用Java标准库NIO,多路复用IO模式
- redis:
- 持久化:
- redis:
AOF操作日志,RDB快照,AOF用来记录每一次的操作(增删改)可以实时同步也可以每隔一个时间同步文件中,RDB全量数据快照但是需要开一条子进程开销比较大,redis4.0以后使用一种新的模式,RDB每隔一段时间全量快照内存数据,AOF记录每个RDB之间的操作记录,当下一次全量RDB以后清空AOF再重新记录操作日志 - SimpleKVDB
只记录AOF操作日志,开一个新线程,有新的操作就写入
- redis:
- 主从数据一致
- redis:
选一台主服务器用于写入,从服务器用于读取,主服务器有数据写入就同步从服务器,哨兵机制,用于监控所有服务器,如果主服务器崩溃,就选择一台从服务器作为主服务器(会根据是否下线,网络速度,读写速度等选择主服务器),然后通知其他从服务器连接到新的主服务器 - SimpleKVDB:
没写,设想:本来是想写一个配置文件,写入主服务器IP,其他从服务器IP,开一个线程在服务端中写一个客户端当作主服务器,读取配置文件,只有主服务器才能开这个线程,其他从服务器还是开启服务,用来接收主服务器的数据,同步从数据库的内存和操作日志里
- redis:
操作展示:
客户端:
服务端:
日志文件:
目录结构:
- SimpleKVDB
- SimpleKVDBClient(客户端)
- SimpleKVDBClient.java(客户端)
- SimpleKVDBService(服务端)
- AofAnnotation.java (注解)
- AofInterface.java(接口)
- DynamicAgent.java(动态代理)
- SimpleKVDBService.java(服务端)
- SimpleKVDBClient(客户端)
SimpleKVDBClient.java(客户端):
package SimpleKVDB.SimpleKVDBClient;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimpleKVDBClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("127.0.0.1",5555));
while (true){
selector.select();//阻塞 等待事件发生
Set<SelectionKey> selectionKeys = selector.selectedKeys();
selectionKeys.forEach(key ->{
try {
if (key.isConnectable()){
SocketChannel channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()){//是否正在连接
channel.finishConnect(); //结束正在连接
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put((LocalDateTime.now() + " 连接成功").getBytes());
writeBuffer.flip();
channel.write(writeBuffer);//将buffer写入channel
ExecutorService service = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
service.submit(()->{//线程,从键盘读入数据
try {
while (true){
writeBuffer.clear();//清空buffer
InputStreamReader input = new InputStreamReader(System.in);
BufferedReader bufferedReader = new BufferedReader(input);
String senderMessage = bufferedReader.readLine();
writeBuffer.put(senderMessage.getBytes());
writeBuffer.flip();
channel.write(writeBuffer);
}
}catch (Exception e){
e.printStackTrace();
}
});
}
channel.register(selector,SelectionKey.OP_READ);//注册事件
}else if (key.isReadable()){//channel 有信息的输入
SocketChannel channel = (SocketChannel) key.channel();//哪个channel 触发了 read
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int count = channel.read(readBuffer);//server发来的
if (count > 0){
String receiveMessage = new String(readBuffer.array(),0,count);
System.out.println("响应结果:"+receiveMessage);
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
selectionKeys.clear();//移除已经发生的事件
}
});
}
}
}
AofAnnotation.java(注解):
package SimpleKVDB.SimpleKVDBService;
import java.lang.annotation.*;
// ----------- 自定义的注解,用于区分是什么操作(其实也可以不用,直接获取方法名区分也一样) -----------
// 自定义的注解
@Retention(RetentionPolicy.RUNTIME)//注解会在class中存在,运行时可通过反射获取
@Target(ElementType.METHOD)//目标是方法
@Documented
//文档生成时,该注解将被包含在javadoc中,可去掉
@interface AofAnnotation {
String name() default "";
}
AofInterface.java(动态代理接口):
package SimpleKVDB.SimpleKVDBService;
// ----------- 动态代理需要的接口,主要想实现切面效果在每一个操作后面加一个日志 -----------
// 动态代理需要的接口
// 只需要给增删改上加操作日志,保证数据一致性
interface AofInterface {
// @AofAnnotation(name="clear")
// int hashClear();
@AofAnnotation(name="set")
Object hashSet(String key, Object value);
@AofAnnotation(name="remove")
Object hashRemove(String key);
}
DynamicAgent.java(动态代理):
package SimpleKVDB.SimpleKVDBService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
// ----------- 动态代理(实现切面效果的逻辑代码) -----------
// 动态代理
public class DynamicAgent<T> implements InvocationHandler {
// 接口实现类实例,如果不使用泛型,这里可以直接用Object
T rent;
void setObject(T obj){
this.rent = obj;
}
// aof内存
List<String> listData;
public void setListData(List<String> list){
this.listData = list;
}
// 生成代码类
public Object getProxy(){
// 第一个参数是代理类的类加载器,第二个参数是代理类要实现的接口,第三个参数是处理接口方法的程序
// 这里代理类是自己,所以直接this,getClass().getClassLoader()是获取加载器
// getClass().getInterfaces() 是获取实现类的接口
// 因为invoke()就是执行方法,所以第三个参数也是本身this
return Proxy.newProxyInstance(this.getClass().getClassLoader(), rent.getClass().getInterfaces(),this);
}
// 处理代理实例,并返回执行结果
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 动态代理本质就是通过反射实现,这里就是执行这个对象的方法
Object result = method.invoke(rent, args);
// 获取注解
AofAnnotation say = method.getAnnotation(AofAnnotation.class);
// 注解的name内容
String name = say.name();
System.out.println("name::"+name);
// aof日志写入
aofSetLog(name, args);
return result;
}
// 给aof开辟一个内存
public void aofSetLog(String name, Object[] args){
Map<String, Object> dataMap = new HashMap<String, Object>();
// 日志格式
String aofData = "*|";
if("set".equals(name)){
dataMap.put(args[0].toString(), args[1]);
aofData = aofData + name+"|"+args[0].toString()+"|"+dataMap.get(args[0].toString());
}
if("remove".equals(name)){
if(null != dataMap && dataMap.size()>0){
dataMap.remove(args[0].toString());
}
aofData = aofData + name+"|"+args[0].toString()+"|";
}
// 日志内存
listData.add(aofData);
// System.out.println("listData:::"+listData);
}
// 返回日志数据
public List<String> getAofDatas(){
return listData;
}
}
SimpleKVDBService.java(服务端):
package SimpleKVDB.SimpleKVDBService;
import java.io.*;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.*;
// ----------- KV数据库的服务端实现 -----------
public class SimpleKVDBService implements AofInterface {
// 全局存储
Map<String, Object> globalMap;
public void setGlobalMap(Map<String, Object> map){
this.globalMap = map;
}
// 动态代理对象
AofInterface dl;
public void setAofInterface(AofInterface i){
this.dl = i;
}
// 写入修改操作
public Object hashSet(String key, Object value){
return globalMap.put(key, value);
}
// 读取操作
public Object hashGet(String key){
return globalMap.get(key);
}
// 删除操作
public Object hashRemove(String key){
return globalMap.remove(key);
}
// 获取长度操作
public int hashSize(){
return globalMap.size();
}
// 是否为空操作操作
public boolean hashIsEmpty(){
return globalMap.isEmpty();
}
// aof日志
List<String> aofList;
// 引用全局aof日志变量,用来存储aof操作日志
public void setAofList(List<String> list){
this.aofList = list;
}
// 创建aof文件
public File createAofFile(){
final String ROOT = '.' + File.separator;
File newFolder = new File(ROOT+"simpleKVDB");
if(newFolder.exists() && newFolder.isDirectory()){
System.out.println("文件夹已经存在");
}else {
boolean isFolder = newFolder.mkdir();
if(!isFolder){
System.out.println("文件夹创建失败");
}
}
// 创建一个文件
File newFile = new File(newFolder.getPath(),"aofDatas.aof");
if(newFile.exists() && newFile.isFile()){
System.out.println("文件已经存在");
}
boolean isFile;
try {
isFile = newFile.createNewFile();
if(!isFile){
System.out.println("文件创建失败");
}
} catch (IOException e) {
e.printStackTrace();
}
return newFile;
}
// 开一个线程,写aof写入文件
public void aofFileThread() {
new Thread(()->{
System.out.println("aof日志写入线程:"+Thread.currentThread().getName());
while (true){this.setAofFile(this.aofList);}
}).start();
}
// aof写入日志文件逻辑,将aof操作日志写入文件中,持久化
public void setAofFile(List<String> aofList){
if(null != aofList && aofList.size()>0){
// 休眠一秒再写入,不频繁使用IO写入
try{
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 为什么文件夹和文件检测放这里每次都要检测是防止文件被误删除
File newFile = this.createAofFile();
// 使用try的话自动回收/关闭资源,会自动调用close方法,不需要手动关闭
// 将需要关闭的资源放在try(xxx; yyy;zzz;)
// 流的关闭是有顺序的,自己手动关闭很繁琐,自动关闭大大降低了难度,非常方便
try(
// 创建一个FileOutputStream,Output是写入,文件的byte数据传输流
// FileOutputStream 第二参数是否追加
FileOutputStream fos = new FileOutputStream(newFile, true);
// FileOutputStream是通过byte字节流的,OutputStreamWriter是将字节流包装成想要的字符集的字符流写入
OutputStreamWriter osw = new OutputStreamWriter(fos, StandardCharsets.UTF_8);
// 使用PrintWriter,可以方便的写入一行字符,第二个参数自动清空缓冲区
PrintWriter pw = new PrintWriter(osw, true);
){
// 一边遍历一边删除aof操作日志
Iterator<String> iterator = aofList.iterator();
// 判断是否还有下一个元素
while (iterator.hasNext()){
// 获取下一个元素
String str = iterator.next();
// println是每段换行写入,print是不换行写入
// 写入其实是一层一层走的,先是写入内容进入PrintWriter中,然后再OutputStreamWriter根据编码转成字节byte,然后再是FileOutputStream字节流写入文件
pw.println(str);
// 因为是引用传递,所以直接删除元素
iterator.remove();
}
// 清空缓冲区,因为数据是先进入缓冲区再写入文件,需要在关闭前将缓冲区的数据全部写入文件才算完成,这样才能关闭整个流,缓存区的作用是,一个字节一个字节写入太费事儿,所以会等到一定量的字节再一起写入,所以会出现一种可能就是缓存区还有少量的字节因为没达到量没有写入,所以需要清空一下,将里面所有剩余的字节都写入
// PrintWriter中设置了自动清空缓冲区
// pw.flush();
}catch (IOException e){
e.printStackTrace();
}
}
}
// socket服务,与客户端通讯
public void socketServer(AofInterface dl){
try {
//创建ServerSocketChannel,-->> ServerSocket
// 打开通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 打开 SocketChannel 并连接到端口
InetSocketAddress inetSocketAddress = new InetSocketAddress(5555);
serverSocketChannel.socket().bind(inetSocketAddress);
// 配置通道为非阻塞模式
serverSocketChannel.configureBlocking(false);
//开启selector,并注册accept事件
// 获取一个选择器实例
Selector selector = Selector.open();
// 将套接字通过到注册到选择器
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true){
// 阻塞,等待事件发生
selector.select();
// 返回已发生的注册事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 判断事件类型,进行相应操作
selectionKeys.forEach(key ->{
final SocketChannel client;
try {
// 根据key获得channel
if (key.isAcceptable()){
// 之所以转换ServerSocketChannel,因为前面注册的就是这个类
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
// 新的channel 和客户端建立了通道
client = serverChannel.accept();
// 非阻塞
client.configureBlocking(false);
// 将新的channel和selector,绑定
client.register(selector,SelectionKey.OP_READ);
//是否有数据可读
}else if (key.isReadable()){
client = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int count = client.read(readBuffer);
if (count>0){
readBuffer.flip();
Charset charset = StandardCharsets.UTF_8;
String receiveMassage = String.valueOf(charset.decode(readBuffer).array());
// 显示哪个client发消息
System.out.println(client +": "+receiveMassage);
// 向客户端返回的信息
String serverStr = "";
// 根据客户端不同的命令,执行不同的方法
if(Objects.equals(receiveMassage.split(" ")[0], "set")){
dl.hashSet(receiveMassage.split(" ")[1], receiveMassage.split(" ")[2]);
serverStr = "set OK";
}
if(Objects.equals(receiveMassage.split(" ")[0], "remove")){
dl.hashRemove(receiveMassage.split(" ")[1]);
serverStr = "remove OK";
}
if(Objects.equals(receiveMassage.split(" ")[0], "get")){
serverStr = this.hashGet(receiveMassage.split(" ")[1]).toString();
}
if(Objects.equals(receiveMassage.split(" ")[0], "isempty")){
serverStr = String.valueOf(this.hashIsEmpty());
}
if(Objects.equals(receiveMassage.split(" ")[0], "size")){
serverStr = String.valueOf(this.hashSize());
}
if(receiveMassage.contains("连接成功")){
serverStr = receiveMassage;
}
SocketChannel channel = (SocketChannel) key.channel();;
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
//返回客户端数据
writeBuffer.put((serverStr).getBytes());
writeBuffer.flip();
channel.write(writeBuffer);
}
}
// 处理完事件一定要移除
//selectionKeys.clear();
}catch (Exception e){
e.printStackTrace();
}finally {
// 处理完事件一定要移除
selectionKeys.clear();
}
});
}
}catch (IOException e){
e.printStackTrace();
}
}
// socket服务线程
public void socketThread(){
new Thread(()->{
System.out.println("socketServer线程:"+Thread.currentThread().getName());
this.socketServer(this.dl);
}).start();
}
// 启动时检查持久化aof日志文件
public void setAofToMap(){
System.out.println("开始从AOF中恢复数据!");
File readFile = this.createAofFile();
// 使用try的话自动回收/关闭资源,会自动调用close方法,不需要手动关闭
// 将需要关闭的资源放在try(xxx; yyy;zzz;)
// 流的关闭是有顺序的,自己手动关闭很繁琐,自动关闭大大降低了难度,非常方便
try(
// 创建一个FileInputStream,Input是写入,文件的byte数据传输流
FileInputStream fis = new FileInputStream(readFile);
// FileInputStream是通过byte字节流的,InputStreamReader是将字节流包装成想要的字符集的字符流写入
InputStreamReader isr = new InputStreamReader(fis, StandardCharsets.UTF_8);
// 使用BufferedReader,增加缓存,可以方便的写入一行字符
BufferedReader reader = new BufferedReader(isr);
){
// reader.lines().map(String::trim).forEach(System.out::println); 这是一种lambda写法,效果和下面一样
String str;
// 为什么要放在while的条件里面赋值呢?是因为readLine()一行一行读取如果到文件结尾了会返回一个null,如果放在while的代码体里赋值,就需要多一步null的判断
// 读取和写入正好相反,是先从文件读取内容到缓存区,然后从缓存区读出来
while ((str = reader.readLine()) != null){
String methodStr = str.split("\\|")[1];
String keyStr = str.split("\\|")[2];
// 根据不同指令操作不同方法
if("set".equals(methodStr)){
Object valueStr = str.split("\\|")[3];
this.hashSet(keyStr, valueStr);
}
if("remove".equals(methodStr)){
this.hashRemove(keyStr);
}
}
System.out.println("AOF中恢复数据结束!");
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
System.out.println("主线程: "+Thread.currentThread().getName());
// 全局内存
Map<String, Object> maps = new HashMap<>();
// 全局aof日志内存
List<String> lists = new ArrayList<>();
// 服务主体类
SimpleKVDBService sKvService = new SimpleKVDBService();
// 全局存储内存
sKvService.setGlobalMap(maps);
// 动态代理,主要是用于给操作添加日志
DynamicAgent<AofInterface> nd = new DynamicAgent<AofInterface>();
// 全局aof内存
nd.setListData(lists);
nd.setObject(sKvService);
// 获取代理对象
AofInterface dl = (AofInterface) nd.getProxy();
// 启动时检查aof文件是否存在
sKvService.setAofToMap();
// 服务主体获取已经有日志信息的aof日志信息
sKvService.setAofList(nd.getAofDatas());
// 引用动态代理
sKvService.setAofInterface(dl);
// 子线程,写aof写入文件
sKvService.aofFileThread();
// 子线程,socket服务线程
sKvService.socketThread();
System.out.println(sKvService.globalMap);
System.out.println("22222:"+nd.getAofDatas());
System.out.println("list:"+sKvService.aofList);
System.out.println("333333:"+sKvService.globalMap);
}
}