有文件描述符的队列原型
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/stat.h>
#include <unistd.h>
#include <cerrno>
#include <iostream>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>
/// One record passed through DataItemQueue.
/// Kept as a plain aggregate of fixed-size fields so it can be initialised
/// positionally with braces and copied by value.
struct DataItem {
    char strOutTime[24];    ///< NUL-terminated text, printed as "strOutTime"
    char strModuleId[5];    ///< NUL-terminated text, printed as "strModuleId"
    char strLevel[5];       ///< NUL-terminated text, printed as "strLevel"
    short iContentLength;   ///< numeric field, printed as "iContentLength"
    char strContent[4096];  ///< NUL-terminated text, printed as "strContent"
};
/// Mutex-protected FIFO of DataItem whose readiness is published through an
/// eventfd, so a consumer can wait on several queues at once with epoll.
///
/// The eventfd is created in semaphore mode: each push() adds one token and
/// each pop() consumes exactly one. Without semaphore mode a single read
/// resets the counter to zero, which would leave the descriptor unreadable
/// while items are still queued.
class DataItemQueue {
private:
    std::queue<DataItem> queue_;
    std::mutex mtx_;
    int event_fd;

public:
    DataItemQueue() : event_fd(eventfd(0, EFD_SEMAPHORE)) {}

    ~DataItemQueue() {
        // eventfd() returns -1 on failure; do not hand that to close().
        if (event_fd >= 0) {
            ::close(event_fd);
        }
    }

    // The object owns a file descriptor and a mutex; copying is not meaningful.
    DataItemQueue(const DataItemQueue&) = delete;
    DataItemQueue& operator=(const DataItemQueue&) = delete;

    /// Appends `value` and adds one readiness token to the eventfd.
    void push(DataItem value) {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            queue_.push(value);
        }
        // Signal outside the lock so a woken consumer does not block on mtx_.
        uint64_t signal = 1;
        if (::write(event_fd, &signal, sizeof(signal)) != static_cast<ssize_t>(sizeof(signal))) {
            std::cerr << "DataItemQueue: failed to signal eventfd" << std::endl;
        }
    }

    /// Consumes one readiness token (blocking until one is available, because
    /// the eventfd is in blocking mode) and returns the oldest item.
    /// @return the item, or std::nullopt when the token could not be read or
    ///         the queue turned out to be empty.
    std::optional<DataItem> pop() {
        uint64_t result = 0;
        if (::read(event_fd, &result, sizeof(result)) != static_cast<ssize_t>(sizeof(result))) {
            return std::nullopt;
        }
        std::lock_guard<std::mutex> lock(mtx_);
        if (queue_.empty()) {
            return std::nullopt;
        }
        DataItem value = queue_.front();
        queue_.pop();
        return value;
    }

    /// Descriptor to register with epoll/poll; readable while items are queued.
    int get_event_fd() const { return event_fd; }
};
/// Waits with epoll on the event fds of all `queues` and prints every item
/// that becomes available.
///
/// The loop only ends when epoll_wait fails with an error other than EINTR;
/// in that case the epoll descriptor is closed before returning.
///
/// @param queues queues to watch; the pointers must stay valid while this runs.
void monitor_queues(std::vector<DataItemQueue*>& queues) {
    const int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        std::cerr << "epoll_create1 failed, errno=" << errno << std::endl;
        return;
    }

    // Register each queue's event fd. The queue pointer travels in the event
    // payload so a ready event maps straight back to its queue.
    for (DataItemQueue* queue : queues) {
        struct epoll_event event{};
        event.events = EPOLLIN;
        event.data.ptr = queue;
        if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, queue->get_event_fd(), &event) == -1) {
            std::cerr << "epoll_ctl failed, errno=" << errno << std::endl;
        }
    }

    struct epoll_event events[10];
    while (true) {
        const int nfds = epoll_wait(epoll_fd, events, 10, -1);
        if (nfds == -1) {
            if (errno == EINTR) {
                continue;  // interrupted by a signal: just wait again
            }
            std::cerr << "epoll_wait failed, errno=" << errno << std::endl;
            break;
        }
        for (int i = 0; i < nfds; ++i) {
            auto* queue = static_cast<DataItemQueue*>(events[i].data.ptr);
            const auto value = queue->pop();
            if (!value) {
                continue;
            }
            std::cout << "iContentLength: " << value->iContentLength << std::endl;
            std::cout << "strContent: " << value->strContent << std::endl;
            std::cout << "strLevel: " << value->strLevel << std::endl;
            std::cout << "strModuleId: " << value->strModuleId << std::endl;
            std::cout << "strOutTime: " << value->strOutTime << std::endl;
        }
    }
    ::close(epoll_fd);
}
int main() {
DataItemQueue queue1, queue2, queue3;
std::vector<DataItemQueue*> queues = {&queue1, &queue2, &queue3};
std::thread monitor_thread(monitor_queues, std::ref(queues));
// 模拟向队列写入数据
DataItem item1 = {"abcde", "fghi", "lmno", 1, "qrstu"};
DataItem item2 = {"edcba", "ihgf", "onml", 2, "utsrq"};
DataItem item3 = {"12345", "4321", "5678", 3, "78901"};
queue1.push(item1);
queue2.push(item2);
queue3.push(item3);
monitor_thread.join();
return 0;
}
线程类原型
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
/// Signature of the task a ControlledThread runs.
using FunctionPointer = void(*)();

/// Runs a task on a worker thread that is created early but only begins
/// executing once triggerStart() is called.
///
/// Lifecycle:
///  - start()        creates the worker, which parks until triggered/stopped.
///  - triggerStart() releases the worker to run the task once.
///  - stopThread()   cancels a worker that was never triggered, otherwise
///                   waits for the running task to finish; always joins.
///
/// A trigger issued before stopThread() always wins: the task is guaranteed
/// to run, so "trigger then stop" means "run and wait for completion".
class ControlledThread {
public:
    ControlledThread(FunctionPointer func) : workerFunction(func), ready(false), stop(false) {}

    /// Joins the worker so a started object can be destroyed safely
    /// (destroying a joinable std::thread would call std::terminate).
    ~ControlledThread() { stopThread(); }

    ControlledThread(const ControlledThread&) = delete;
    ControlledThread& operator=(const ControlledThread&) = delete;

    /// Creates the worker thread. Does nothing if a worker already exists and
    /// has not been joined yet.
    void start() {
        if (worker.joinable()) {
            return;
        }
        worker = std::thread(&ControlledThread::threadFunction, this);
    }

    /// Releases the parked worker so it runs the task.
    void triggerStart() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            ready = true;
        }
        cv.notify_one();
    }

    /// Requests stop and joins the worker. Safe to call more than once.
    void stopThread() {
        {
            std::lock_guard<std::mutex> lock(mtx);
            stop = true;
        }
        cv.notify_one();
        if (worker.joinable()) {
            worker.join();
        }
    }

private:
    std::thread worker;
    std::mutex mtx;
    std::condition_variable cv;
    FunctionPointer workerFunction;
    bool ready;  // set by triggerStart(), consumed by the worker
    bool stop;   // set by stopThread()

    void threadFunction() {
        {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [this] { return ready || stop; });
            if (!ready) {
                return;  // stopped without ever being triggered
            }
            // Consume the trigger so a worker restarted after stopThread()
            // exits immediately instead of running the task again.
            ready = false;
        }
        // Run the task without holding the mutex so triggerStart() and
        // stopThread() never block behind user code.
        if (workerFunction) {
            workerFunction();
        }
    }
};
/// Demo task: announces itself, sleeps two seconds to stand in for real work,
/// then announces completion.
void myFunction() {
    const auto simulated_work = std::chrono::seconds(2);
    std::cout << "Executing myFunction in thread.\n";
    std::this_thread::sleep_for(simulated_work);
    std::cout << "myFunction finished executing.\n";
}
int main() {
// 创建 ControlledThread,并传入函数指针
ControlledThread controlledThread(myFunction);
controlledThread.start(); // 创建并启动线程,进入等待状态
std::cout << "Main thread doing some work...\n";
std::this_thread::sleep_for(std::chrono::seconds(1)); // 模拟主线程工作
std::cout << "Triggering thread start.\n";
controlledThread.triggerStart(); // 启动子线程
controlledThread.stopThread(); // 等待子线程完成并清理
return 0;
}
dlopen原型
#include <stdio.h>
#include <stdlib.h>
#include <dlfcn.h>
/* Bookkeeping record for one dynamically loaded shared library. */
typedef struct SharedLib {
    void *handle;        /* handle returned by dlopen */
    const char *so_path; /* path of the shared library */
} SharedLib;
/*
 * Loads the shared library at so_path and wraps its handle in a SharedLib.
 *
 * Returns a malloc'ed SharedLib on success, or NULL when the allocation or
 * dlopen fails (a message is written to stderr in both cases).
 * The so_path pointer itself is stored in the result, not a copy of the
 * string, so the caller must keep that string alive while the record is used.
 */
SharedLib *load_shared_lib(const char *so_path) {
    SharedLib *result = (SharedLib *)malloc(sizeof(SharedLib));
    if (!result) {
        fprintf(stderr, "Failed to allocate memory for SharedLib\n");
        return NULL;
    }

    /* RTLD_LAZY: symbols are resolved on first use. */
    void *dl_handle = dlopen(so_path, RTLD_LAZY);
    if (dl_handle == NULL) {
        fprintf(stderr, "Failed to load %s: %s\n", so_path, dlerror());
        free(result);
        return NULL;
    }

    result->handle = dl_handle;
    result->so_path = so_path;
    return result;
}
/*
 * Looks up the symbol func_name in the library wrapped by lib.
 *
 * Returns the symbol's address, or NULL when lib or func_name is NULL or
 * when dlsym reports an error (a message is written to stderr).
 */
void *get_function(SharedLib *lib, const char *func_name) {
    if (lib == NULL || func_name == NULL) {
        fprintf(stderr, "Failed to get symbol: invalid library or symbol name\n");
        return NULL;
    }
    /* Clear any stale error so the dlerror() below reflects this dlsym only. */
    dlerror();
    void *func_ptr = dlsym(lib->handle, func_name);
    const char *error = dlerror();
    if (error != NULL) {
        fprintf(stderr, "Failed to get symbol %s: %s\n", func_name, error);
        return NULL;
    }
    return func_ptr;
}
/*
 * Unloads the library wrapped by lib and frees the record.
 * Passing NULL is a no-op, mirroring free(). The pointer must not be used
 * after this call.
 */
void close_shared_lib(SharedLib *lib) {
    if (lib == NULL) {
        return;
    }
    if (lib->handle) {
        dlclose(lib->handle);
        lib->handle = NULL;
    }
    free(lib);
}
/* Example: load the math library, resolve "sin" and call it. */
int main() {
    /* Path of the libm shared object used for the demo. */
    const char *so_path = "/lib/x86_64-linux-gnu/libm.so.6";
    SharedLib *math_lib = load_shared_lib(so_path);
    if (!math_lib) {
        return 1;
    }

    typedef double (*unary_math_fn)(double);
    unary_math_fn sin_func = (unary_math_fn)get_function(math_lib, "sin");

    int exit_code = 1;
    if (sin_func != NULL) {
        /* sin(pi/2) */
        double result = sin_func(3.14159 / 2);
        printf("sin(π/2) = %f\n", result);
        exit_code = 0;
    }

    /* Unload the library and free the record on every path. */
    close_shared_lib(math_lib);
    return exit_code;
}
查找json文件和so文件配对的方法
// Returns true when `entry` (read from `directory`) is a regular file.
// Some filesystems do not fill in d_type and report DT_UNKNOWN; in that case
// the type is resolved with lstat() instead of silently skipping the entry.
static bool is_regular_file_entry(const std::string& directory, const struct dirent& entry) {
    if (entry.d_type == DT_REG) {
        return true;
    }
    if (entry.d_type != DT_UNKNOWN) {
        return false;
    }
    struct stat info;
    const std::string full_path = directory + "/" + entry.d_name;
    return lstat(full_path.c_str(), &info) == 0 && S_ISREG(info.st_mode);
}

// Scans `directory` (non-recursively) for regular files whose name ends with
// `extension` and records them in `files`, keyed by the file name with the
// extension removed; the value is "<directory>/<file name>".
//
// A name that consists of the extension only is ignored. Existing entries in
// `files` with the same key are overwritten. When the directory cannot be
// opened a message is written to std::cerr and `files` is left unchanged.
void scan_directory(const std::string& directory, const std::string& extension, std::map<std::string, std::string>& files) {
    DIR* dir = opendir(directory.c_str());
    if (!dir) {
        std::cerr << "无法打开目录: " << directory << std::endl;
        return;
    }
    while (const struct dirent* entry = readdir(dir)) {
        const std::string filename = entry->d_name;
        if (filename.size() <= extension.size()) {
            continue;
        }
        // Cheap suffix test first; the file-type check may need a syscall.
        const size_t stem_length = filename.size() - extension.size();
        if (filename.compare(stem_length, extension.size(), extension) != 0) {
            continue;
        }
        if (!is_regular_file_entry(directory, *entry)) {
            continue;
        }
        files[filename.substr(0, stem_length)] = directory + "/" + filename;
    }
    closedir(dir);
}
// 查找目录1中的 .json 文件和目录2中的 .so 文件,匹配相同前缀
std::vector<Config> find_module_files(const std::string& dir1, const std::string& dir2) {
std::map<std::string, std::string> json_files;
std::map<std::string, std::string> so_files;
// 扫描目录1中的 .json 文件
scan_directory(dir1, ".json", json_files);
// 扫描目录2中的 .so 文件
scan_directory(dir2, ".so", so_files);
// 查找相同前缀的文件名
std::vector<Config> matched_files;
for (const auto& json_file : json_files) {
const std::string& prefix = json_file.first;
std::string libprefix = "lib" + prefix;
if (so_files.find(libprefix) != so_files.end()) {
matched_files.push_back({prefix, json_file.second, so_files[libprefix]});
}
}
return matched_files;
}
std::vector<Config> configs = find_module_files(json_dir, so_dir);
for (const auto& config : configs) {
std::cout << "Name: " << config.name << "\n";
std::cout << "JSON file: " << config.json_path << "\n";
std::cout << "SO file: " << config.so_path << "\n";
}
动态库接口原型
// Entry points exported by the module with C linkage (unmangled names), so a
// host can resolve them by name with dlsym.
extern "C" {
int open(const char *pathname, int flags);
int close(int fd);
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);
int ioctl(int fd, unsigned long request, ...);
}
// Module entry point exported as `open`.
// On the first call it creates the module-wide `handle`: allocates a HANDLE,
// creates and starts a ControlledThread around `thread`, creates a
// DataItemQueue, and returns that queue's event fd.
// `pathname` and `flags` are not used by this implementation.
// NOTE(review): `handle`, `HANDLE` and `thread` are declared outside this
// snippet — confirm their definitions.
int open(const char *pathname, int flags)
{
// Already opened: nothing is created again.
// NOTE(review): this returns 0 rather than the existing event fd — confirm
// callers do not treat the 0 as a descriptor.
if (handle != nullptr) {
return 0;
}
handle = new HANDLE();
handle->thread = new ControlledThread(thread);
// start() only creates the worker thread here.
handle->thread->start();
handle->queue = new DataItemQueue();
return handle->queue->get_event_fd();
}
// Module entry point exported as `close`.
// Stops and joins the worker thread, then releases the module-wide `handle`.
// Calling it when the module is not open is a no-op. Always returns 0.
// `fd` is not used by this implementation.
// NOTE(review): `handle->queue` is allocated in open() but not released here;
// confirm whether HANDLE's destructor owns it.
int close(int fd)
{
    (void)fd;
    if (handle == nullptr) {
        return 0;
    }
    if (handle->thread != nullptr) {
        // Join before deleting: destroying a std::thread that is still
        // joinable calls std::terminate.
        handle->thread->stopThread();
        delete handle->thread;
        handle->thread = nullptr;
    }
    delete handle;
    handle = nullptr;
    return 0;
}
ssize_t read(int fd, void *buf, size_t count)
{
DataItem value = handle->queue->pop();
// if (value) {
std::cout << "iContentLength: " << value.iContentLength << std::endl;
std::cout << "strContent: " << value.strContent << std::endl;
std::cout << "strLevel: " << value.strLevel << std::endl;
std::cout << "strModuleId: " << value.strModuleId << std::endl;
std::cout << "strOutTime: " << value.strOutTime << std::endl;
// }
return 0;
}
// Module entry point exported as `write`.
// Placeholder: ignores all arguments and reports zero bytes written.
ssize_t write(int fd, const void *buf, size_t count)
{
    static_cast<void>(fd);
    static_cast<void>(buf);
    static_cast<void>(count);
    const ssize_t bytes_written = 0;
    return bytes_written;
}
// Module entry point exported as `ioctl`. `fd` is not used.
//
// Requests:
//   THREAD_START    - releases the worker thread (triggerStart()).
//   THREAD_WAIT     - stops and joins the worker thread (stopThread()).
//   CONFIG_NEELINK  - takes one extra argument, a path (const char *) to a
//                     readable configuration file, and loads it into
//                     handle->param.
//   anything else   - ignored.
//
// Returns 0 on success and -1 when the module is not open or the
// configuration request fails.
int ioctl(int fd, unsigned long request, ...)
{
    (void)fd;
    printf("module ioctl \n");
    if (handle == nullptr) {
        LOG_E("Wrong parameter.");
        return -1;
    }

    int result = 0;
    switch (request) {
    case THREAD_START:
        handle->thread->triggerStart();
        break;
    case THREAD_WAIT:
        handle->thread->stopThread();
        break;
    case CONFIG_NEELINK: {
        // Only this request carries a variadic argument; reading one for the
        // other requests would be undefined behaviour.
        va_list args;
        va_start(args, request);
        const char *path = static_cast<const char *>(va_arg(args, void *));
        va_end(args);

        if (path == NULL || access(path, R_OK) != 0) {
            LOG_E("Wrong parameter.");
            result = -1;
            break;
        }
        if (SUCCESS != readConfigFile(path, &handle->param)) {
            LOG_E("thread configuration file read error.");
            freeParam(handle->param);
            result = -1;
        }
        break;
    }
    default:
        break;
    }
    return result;
}
监控fd事件读取多个动态库队列
void monitor(std::vector<Config> configs) {
const char *func_name[FUNC_NUM] = {"open", "close", "read", "write", "ioctl"};
void *funcptr[MAX_SO_FILES_NUM][FUNC_NUM] = {0};
SharedLib *shareLib[MAX_SO_FILES_NUM] = {0};
int epoll_fd = epoll_create1(0);
std::map<int32_t, size_t> mp;
size_t module_cnt = configs.size();
for(size_t i = 0; i < module_cnt; i++) {
shareLib[i] = load_shared_lib(configs.at(i).so_path.c_str());
if (shareLib[i] == NULL) {
printf("load so: %s failed \n", configs.at(i).so_path.c_str());
continue;
}
for(size_t j = 0; j < FUNC_NUM; j++) {
funcptr[i][j] = get_function(shareLib[i], func_name[j]);
if (funcptr[i][j] == NULL) {
close_shared_lib(shareLib[i]);
printf("get func: %s failed from: %s \n", func_name[j], configs.at(i).so_path.c_str());
break;
}
}
int fd = ((openptr)funcptr[i][OPEN])(configs.at(i).name.c_str(), 0);
if (fd == -1) {
perror("Failed to open");
}
printf("openptr fd = %d \n", fd);
mp.insert(std::map<int32_t, size_t>::value_type(fd, i));
struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = fd;
epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &event);
int res = ((ioctlptr)funcptr[i][IOCTL])(fd, CONFIG_NEELINK, configs.at(i).json_path.c_str());
if (res == -1) {
perror("Failed to configure CONFIG_NEELINK");
}
printf("ioctlptr res = %d \n", res);
res = ((ioctlptr)funcptr[i][IOCTL])(fd, THREAD_START);
if (res == -1) {
perror("Failed to configure THREAD_START");
}
printf("ioctlptr res = %d \n", res);
}
struct epoll_event events[10];
while (true) {
int nfds = epoll_wait(epoll_fd, events, 10, -1);
for (int i = 0; i < nfds; ++i) {
int ready_fd = events[i].data.fd;
try {
((readptr)funcptr[mp.at(ready_fd)][READ])(0, nullptr, 0);
}
catch(const std::exception& e)
{
std::cerr << e.what() << '\n';
}
}
}
for(size_t i = 0; i < module_cnt; i++) {
close_shared_lib(shareLib[i]);
((closeptr)funcptr[i][CLOSE])(0);
}
}
static std::thread* thread = nullptr;
int run(std::string json_dir, std::string so_dir) {
std::vector<Config> configs = find_module_files(json_dir, so_dir);
for (const auto& config : configs) {
std::cout << "Name: " << config.name << "\n";
std::cout << "JSON file: " << config.json_path << "\n";
std::cout << "SO file: " << config.so_path << "\n";
}
thread = new std::thread(monitor, configs);
// thread->join();
return 0;
}