背景
经过通信层面的优化后,我们不再走 Electron 提供的内置进程间通信 IPC,改为利用 Express 提供的 Http 本地服务来进行多处直达通信机制,同时利用本地 Sqlite 来保存大量数据,但 Express 提供的本地服务是支持并发请求的,而 Sqlite 是不支持行锁的机制,一旦有写入操作,Sqlite 都是直接锁库,除了采用单表单库减少锁库问题外,另外就是走队列的方式来逐个入库,避免写锁问题。
一个Sqlite3教程好文档,分享到这里:函数sqlite3VdbeHalt | SQlite源码分析
解决方案
启用 WAL 模式
WAL 模式即将写的数据暂存在 WAL 文件中,不影响主库,这样就可以避开库锁问题,同时读也可以并行操作,大大提高了 Sqlite 读写并行能力
export const userDataPath = app.getPath('userData')
const storagePath = path.join(userDataPath, '/sqlite/wa_verify.db')
// 创建 Sequelize 实例
export const sequelize = new Sequelize({
dialect: 'sqlite',
storage: storagePath,
define: {
freezeTableName: true
},
logging: false
})
// 启用 WAL 模式
(async () => {
try {
await sequelize.authenticate()
await sequelize.query('PRAGMA journal_mode=WAL;')
console.log('WAL mode enabled.')
} catch (error) {
console.error('Unable to enable WAL mode:', error)
}
})()
Expess 层面加限流
p-limit 是个好东西,这个直接可控制请求的并发数,如果想搞成队列机制,直接设为 1 即可,省去了自己写队列的烦恼,另外我也第一次发现异步开发的优越性,写个队列也非常简单,而同步开发就没这么方便,必须分为两个进程来搞事情,一个写入队列,一个弹出队列,但是如何保证本地 http 返回结果就很难了,而异步可以一直等待着。
import express from 'express';
import pLimit from 'p-limit';
const app = express();
const port = 3000;
const limit = pLimit(1); // 限制并发请求为 1
app.get('/car', (req, res) => {
const startTime = Date.now();
limit(() => new Promise((resolve) => {
setTimeout(() => {
const endTime = Date.now();
const processingTime = endTime - startTime;
res.json({
message: '车信息处理完成',
startTime: new Date(startTime).toISOString(),
endTime: new Date(endTime).toISOString(),
processingTime: `${processingTime}ms`
});
resolve();
}, 3000); // 模拟处理时间
}));
});
app.listen(port, () => {
console.log(`服务器正在监听 http://localhost:${port}`);
});
客户端测试 Express 并发脚本
import fetch from 'node-fetch';
const url = 'http://localhost:3000/car'; // 你的服务地址
const concurrentRequests = 10; // 请求数
async function sendRequest() {
const startTime = Date.now();
try {
const response = await fetch(url);
const data = await response.json();
const endTime = Date.now();
console.log('响应数据:', data);
console.log(`请求开始时间: ${data.startTime}`);
console.log(`请求结束时间: ${data.endTime}`);
console.log(`处理时间: ${data.processingTime}`);
console.log(`单个请求的处理时间: ${endTime - startTime}ms`);
} catch (error) {
console.error('发生错误:', error);
}
}
async function testConcurrency() {
for (let i = 0; i < concurrentRequests; i++) {
console.log(`发起请求 ${i + 1}...`);
sendRequest(); // 逐个发送请求,等待每个请求完成
}
}
testConcurrency();
请求时间结果截图,明显串行执行,完美!
同一个limit可以作为多个请求限流队列来用
服务端代码
服务端代码这里要注意limit要框住整个接口的处理逻辑,这样才能保证整个接口逻辑都处理完毕后,才会处理下一个请求;
res.json({}) 返回一定要跟在limit的逻辑里面,不然就会出现客户端一请求,服务端就返回OK了, 而实际上现在的请求并没有得到处理,还好Express里的p-limit并没有像其他语言,如PHP,Python等一旦连接结束,其相关的线程都全部释放,这也许是协程调度的好处。
注意服务端语言务必都要使用async + await 来保证代码的同步执行,如果没有同步作为基础,发生任何不可预测的BUG都有可能
import express from 'express';
import pLimit from 'p-limit';
const app = express();
const port = 3000;
// 设置并发限制的数量
const maxConcurrentRequests = 3; // 限制并发请求为 3
const limit = pLimit(maxConcurrentRequests);
function createHandler(responseMessage) {
return (req, res) => {
const startTime = Date.now();
limit(() => new Promise((resolve) => {
setTimeout(() => {
const endTime = Date.now();
const processingTime = endTime - startTime;
res.json({
message: responseMessage,
startTime: new Date(startTime).toISOString(),
endTime: new Date(endTime).toISOString(),
processingTime: `${processingTime}ms`
});
resolve();
}, 3000); // 模拟处理时间
}));
};
}
app.get('/car', createHandler('车信息处理完成'));
app.get('/bus', createHandler('公交信息处理完成'));
app.listen(port, () => {
console.log(`服务器正在监听 http://localhost:${port}`);
});
客户端测试代码
import fetch from 'node-fetch';
const carUrl = 'http://localhost:3000/car'; // 车信息服务地址
const busUrl = 'http://localhost:3000/bus'; // 公交信息服务地址
const concurrentRequests = 10; // 请求数
async function sendRequest(url, route) {
const startTime = Date.now();
try {
const response = await fetch(url);
const data = await response.json();
const endTime = Date.now();
console.log(`${route} - 响应数据:`, data);
console.log(`${route} - 请求开始时间: ${data.startTime}`);
console.log(`${route} - 请求结束时间: ${data.endTime}`);
console.log(`${route} - 处理时间: ${data.processingTime}`);
console.log(`${route} - 单个请求的处理时间: ${endTime - startTime}ms`);
} catch (error) {
console.error(`${route} - 发生错误:`, error);
}
}
async function testConcurrency() {
console.log(`开始发起 ${concurrentRequests} 个请求到 /car 路由...`);
for (let i = 0; i < concurrentRequests; i++) {
console.log(`发起 /car 请求 ${i + 1}...`);
sendRequest(carUrl, '/car'); // 逐个发送请求到 /car 路由
}
console.log(`开始发起 ${concurrentRequests} 个请求到 /bus 路由...`);
for (let i = 0; i < concurrentRequests; i++) {
console.log(`发起 /bus 请求 ${i + 1}...`);
sendRequest(busUrl, '/bus'); // 逐个发送请求到 /bus 路由
}
}
testConcurrency();
测试结果截图