c++异步编程之std::promise和std::future
- 1.std::future获取std::asnyc结果
- 2.模拟一个异步函数接口
- i.模拟一个客户端类包含异步请求接口
- ii.调用异步接口获取结果
c11以后标准库提供了thread,说起异步可能会第一时间想起thread,线程确实好东西,不过使用线程需要很注意数据同步问题。当然了线程也不能把线程执行结果返回回来,一般都是需要使用全局变量来做数据同步。今天要探讨的是另外一些异步方式如:std::async 、std::packaged_task ,配合以std::promise和std::future来获取执行结果,使用起来有很不错的疗效。
1.std::future获取std::asnyc结果
// 模拟耗时任务
std::string sendRequest()
{
// 休眠模拟耗时操作3s
std::this_thread::sleep_for(std::chrono::seconds(3));
return "good boy";
}
int main(int argc, char* argv[])
{
// 启动异步任务并获取future
std::future<std::string> result_future = std::async(std::launch::async, sendRequest);
std::cout << "doing something ...\n";
// 获取结果,调用get会阻塞直到结果可用
std::string result = result_future.get();
std::cout << "get a result: " << result << std::endl;
char ch = 0;
while (std::cin >> ch)
{
if (ch == 'q')
{
std::cout << "pressed 'q' exit ...\n";
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return 0;
}
也可以先调用wait等待结果可用再去获取结果, 这里调用get相当于先调用了wait和get结果获取。
2.模拟一个异步函数接口
i.模拟一个客户端类包含异步请求接口
namespace AClient
{
struct Position
{
float x;
float y;
float z;
};
struct Request
{
// 唯一的一个id
uint64_t uid;
std::string params;
};
struct Response
{
int status;
Position pos;
};
using SharedRequest = std::shared_ptr<Request>;
using SharedResponse = std::shared_ptr<Response>;
using Promise = std::promise<SharedResponse>;
using SharePromise = std::shared_ptr<Promise>;
using SharedFuture = std::shared_future<SharedResponse>;
using CallbackType = std::function<void(SharedFuture)>;
class AsyncRequestClient
{
public:
AsyncRequestClient() = default;
virtual ~AsyncRequestClient() = default;
static inline char* getLocalTime()
{
static char local[26] = { 0 };
SYSTEMTIME wtm;
struct tm tm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm.tm_isdst = -1;
strftime(local, 26, "%Y-%m-%d %H:%M:%S", &tm);
return local;
}
void sendRequest(SharedRequest request)
{
std::thread req_thread([request, this] {
std::this_thread::sleep_for(std::chrono::seconds(3));
// 假设这里通过远程调用解析得到了数据
SharedResponse _response = std::make_shared<Response>();
_response->status = 1;
_response->pos.x = 1.234567;
_response->pos.y = 3.141592;
_response->pos.z = 1.0f;
std::cout << "Time:[" << getLocalTime() << "] tid:" << std::this_thread::get_id() << " ,request id:" << request->uid << " get resonse." << std::endl;
std::unique_lock<std::mutex> lck(mutex_);
if (this->request_list_.count(request->uid) == 0)
{
std::cout << "Invalid request id, return." << std::endl;
return;
}
auto req_tuple = this->request_list_[request->uid];
auto promise = std::get<0>(req_tuple);
auto callback = std::get<1>(req_tuple);
auto future = std::get<2>(req_tuple);
this->request_list_.erase(request->uid);
lck.unlock();
promise->set_value(_response);
callback(future);
});
req_thread.detach();
}
template<typename CallbackT>
SharedFuture asyncRequest(SharedRequest request, CallbackT&& cb)
{
std::lock_guard<std::mutex> lock(mutex_);
std::cout << "Time:[" << getLocalTime() << "] tid:" << std::this_thread::get_id() << request->uid << " ,send async request." << std::endl;
SharePromise promise = std::make_shared<Promise>();
SharedFuture future(promise->get_future());
this->request_list_[request->uid] = std::make_tuple(promise, std::forward<CallbackType>(cb), future);
// 此处只简单的用一个线程来模拟调用服务过程(实际可能是TCP、IPC、RPC)
sendRequest(request);
return future;
}
private:
std::map<uint64_t, std::tuple<SharePromise, CallbackType, SharedFuture>> request_list_;
std::mutex mutex_;
};
}
ii.调用异步接口获取结果
int main(int argc, char* argv[])
{
auto request = std::make_shared<AClient::Request>();
request->uid = 10086;
request->params = "{\"timestamp\": 1678222981}";
// 开始异步调用
AClient::AsyncRequestClient client;
auto future_result = client.asyncRequest(request, [](AClient::SharedFuture future) {
auto response = future.get();
//std::cout << "Time:[" << AClient::AsyncRequestClient::getLocalTime() << "] tid:" << std::this_thread::get_id() << ",resonse status:" << response->status
// << ",x:" << response->pos.x << ",y:" << response->pos.y << ",z:" << response->pos.z << std::endl;
});
// 等待结果
future_result.wait();
std::cout << "Time:[" << ASYClient::AsyncRequestClient::getLocalTime() << "] tid:" << std::this_thread::get_id() << " ,wait a result " << "status:" << future_result.get()->status
<< ",x: " << future_result.get()->pos.x << ",y: " << future_result.get()->pos.y << ",z: " << future_result.get()->pos.z << std::endl;
char ch = 0;
while (std::cin >> ch)
{
if (ch == 'q')
{
std::cout << "pressed 'q' exit ...\n";
break;
}
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
return 0;
}
可以看到我们的调用过程和回调过程是在2个不同的线程, 耗时操作同样是用3秒休眠来代替。显而易见是调用3秒后得到了结果。上面例程获取结果有2种方式:1.通过future 等待结果 2.通过回调函数获取。
作者:费码程序猿
欢迎技术交流:QQ:255895056
转载请注明出处,如有不当欢迎指正