C++并发编程 -- 四十四
- 前言
- 一、通讯顺序进程 CSP 范型
- 1. 细节分解
- 1. lambda 封装类成员函数和 std::function< > 函数封装器
- 2. 模板和 dynamic_cast< >类型转换
- 二、ATM 示例
- 总结
前言
并发编程除了常规的使用锁, 或无锁结构实现某些并发计算, 还有没有其它实现形式?
有, 而且是非常成熟的并发编程范型, CSP 通讯顺序进程范型.
掌握这种编程, 可以令业务逻辑梳理, 变得及其方便简单.
如果没有学习过函数式编程, 可能初步了解起来稍有困难, 不过只要稍加思考, 一定会掌握.
一、通讯顺序进程 CSP 范型
CSP, 也就是所谓通讯顺序进程, 就是用几个线程各自完成各自的逻辑, 每个线程都是状态机, 它们之间没有真正的共享数据, 只是通过相互通讯消息, 完成自己的业务.
我们以银行 ATM 的运行逻辑, 来用图形梳理一下具体的运行.
除了主线程, 我们会准备三个线程, 这三个线程分别处理某一类业务:
ATM 线程, 负责整个取款流程的业务推进,
interfaceMachine 线程, 负责为客户返回信息, 比如 提示用户插卡, 提示输入 PIN, 提示输入提款及额度等.
bankMachine 负责存款数据的提取和更改.
程序运转开始, ATM 线程处于等待插卡状态, 此状态会发一个消息给接口线程, 让其发送插卡提示给用户,
然后用户接到提示, 进行插卡,
然后 ATM线程接到用户的信息,发送信息给接口, 让其发送信息给用户, 让其输入PIN
同时 ATM 状态转变为 获取PIN
用户接到消息, 输入PIN号
PIN号的消息又传递给ATM, 当 PIN 位数等于 4, ATM 将打包有用户卡号, PIN的消息发给银行后台
同时 ATM 的状态变为验证PIN
银行后台进行验证, 并发回验证结果消息给 ATM, ATM 根据结果进行处理
1. 细节分解
为了实现以上设计, 我们至少需要知道 lambda 封装类成员函数, std::function< > 函数封装器, 模板, dynamic_cast< >类型转换.
1. lambda 封装类成员函数和 std::function< > 函数封装器
在 C++ 11 引入 lambda 之后, 我们可以抛弃 C 语言所用的函数指针调用成员函数了.
在 atm 类中, 我们有一个用于确定函数状态的状态函数对象, 可以是 std::function 封装的函数对象, 也可以是类成员函数指针.
如果使用纯 C++, 而非 C/C++ 混合编程, 我建议不要用函数指针, 用 std::function 配合 lambda.
struct atm
{
// 类成员函数对象
std::function<void()> stateFuncType;
// 类成员函数指针, 可用但不建议
void (atm::*stateFuncTypePtr)();
// 提款执行
void processWithdrawal();
// 等待插卡
void waitingForCard();
// 获取 pin
void gettingPin()
// 正在核实 PIN 函数
void verifyingPin()
}
我们可以用如下方式改变 atm 对象状态:
函数 run( ) 是 atm 类公有函数, 会在主进程中被封装到 thread 线程, 通过如下语句:
stateFuncType = [this]() { waitingForCard(); };
将状态函数对象赋值, 更改对象状态, 并通过如下语句调用:
stateFuncType();
atm 类的 run( ) 函数的代码如下:
// ATM 后台运行
// 通过执行状态函数, 取得下一个状态函数, 再执行, 如此循环
// 同时在执行过程中发消息给硬件接口后台及银行后台
void run()
{
// 获取 waitingForCard 函数对象并试图运行
stateFuncType = [this]() { waitingForCard(); };
// 循环运行状态函数
try
{
while (true)
{
stateFuncType();
}
}
// 捕获 closeQueue 异常
catch (const messaging::closeQueue &)
{}
}
2. 模板和 dynamic_cast< >类型转换
为了通过通讯信息转换状态机的状态, 我们需要使用模板封装信息, 同时需要用模板封装函数进行调度, 而调度类通过调度函数又产生调度类, 这样一层层封装.
为了接收不同类型的信息, 我们将实现一个队列数据结构, 它封装的是信息基类的智能指针:
std::mutex mtx;
std::condition_variable conVar;
std::queue<std::shared_ptr<messageBase>,
std::list<std::shared_ptr<messaging::messageBase>>>
threadSafeQue;
这里使用 list 封装的 queue, 也可以用 deque. 而条件变量和互斥量的引入除了为了一点点线程安全, 更重要的是为了实现等待,
这不同于并发算法需要榨干效率, 多线程分工的各个状态机没有共享数据, 它们只是互相发送和接收数据, 发送端只发送, 不使用, 接收端只使用不发送, 完全独立.
消息数据由一个messageBase基类的派生类模板封装, 而 Msg 消息可以是各种类型, 这种结构非常重要, 配合后面封装函数的模板, 就可以根据消息类型进行相应的操作.
函数被封装在调度类和调度模板类中, 调度类和调度模板类的公有句柄函数 handle<Msg, Func>( Func function ) 会返回新的调度模板对象, 通过不停的调用句柄函数, 会产生一个调度对象链, 也就是状态机.
这是 ATM 对象验证 PIN 的状态函数, 每个调用的 handle<Msg>(Func func)函数, 都返回一个调度模板对象, 通常 func 函数对象都是一个以 Msg 类对象为参数的 lambda.
// 正在核实 PIN 函数
void verifyingPin()
{
// 正在进行等待
atmIncoming
.wait()
// 句柄, 返回 pin 核实调度模板
.handle<pinVerified>([&](pinVerified const & /*msg*/) {
// 状态函数指针赋值为 waitForAction 等待行动
stateFuncType = [this]() { waitForAction(); };
})
// 句柄, 返回 pin 错误调度模板
.handle<pinIncorrect>([&](pinIncorrect const & /*msg*/) {
// 硬件接口发送显示 Pin 错误信息
interfaceHardware.send(displayPinIncorrectMessage());
// 状态函数指针赋值为完成处理函数
stateFuncType = [this]() { doneProcessing(); };
})
// 句柄, 返回 card 错误调度模板
.handle<cardIncorrect>([&](cardIncorrect const & /*msg*/) {
// 硬件接口发送显示 card 错误信息
interfaceHardware.send(displayCardIncorrectMessage());
// 状态函数指针赋值为完成处理函数
stateFuncType = [this]() { doneProcessing(); };
})
// 句柄, 返回已按结束调度模板
.handle<cancelPressed>([&](cancelPressed const & /*msg*/) {
// 状态函数指针赋值为完成处理函数
stateFuncType = [this]() { doneProcessing(); };
});
}
由于 handle 函数返回调度模板对象的都是临时对象, 当生成最后一个调度模板, 则开始析构,
析构时会等待 ATM 队列的消息, 如果有消息, 则根据消息的类型判断究竟是那个 handle 对象的函数进行调用, 并在调用时发出信息给其它线程, 比如银行后台或硬件接口, 同时转换 ATM 状态函数.
为了执行正确的状态, 需要确定用哪个 handle 函数产生的对象, 我们需要利用 C++ 类型转换函数 dynamic_cast<wrappedMessage<Msg> *>( )
此处的 Msg 消息类型, 是handle 对应的类型, 而 msg 是 ATM 消息队列弹出的消息, 是智能指针封装的 baseMessage 基类, 当 msg.get( ) 返回的 baseMassage* 能转化为 wrappedMessage<Msg> * 时, wrapper 才能获取地址, 继而调用 handle 封装的函数 func(msg).
否则将获取 nullptr, 执行上一个 handle 的调度, 直至匹配到 Msg 类型相同的 handle, 执行其封装的函数.
if (wrappedMessage<Msg> *wrapper =
dynamic_cast<wrappedMessage<Msg> *>(msg.get()))
上述内容是 C++ 设计 CSP 的关键点, 也就是线程的状态转换细节,
状态转换依赖于信息类型, 信息类型决定了执行哪种调度, 从而引发何种状态的变化以及执行什么函数.
// 析构函数
~TemplateDispatcher() noexcept(false)
{
// 如果没有连接
if (!chained)
{
// 等待调度
waitAndDispatch();
}
}
private:
// 等待调度
void waitAndDispatch()
{
// 循环
while (true)
{
// 从队列获取信息
auto msg = threadSafeQuePtr->waitAndPop();
// 如果调度成功
if (dispatch(msg))
{
// 结束循环
break;
}
}
}
// 调度函数
auto dispatch(std::shared_ptr<messageBase> const &msg) -> bool
{
// 获取信息包装指针不为空
// 条件是 msg.get() 的指针可以转换为 wrappedMessage<Msg> *,
// Msg 是类型, 是 TemplateDispatcher 对象带的类型,
// 这个类型没有成员与之对应, 是构造的时候显性声明的,
// 通常是 handle<Msg> 带的,
// 如果不能安全转换为对应的 wrappedMessage<Msg>* 指针,
// 就只能得到 0 , 跳过条件分支, 调取上一个调度对象
if (wrappedMessage<Msg> *wrapper =
dynamic_cast<wrappedMessage<Msg> *>(msg.get()))
{
// 通过函数处理, 并传入从包装指针获取的信息
func(wrapper->getContents());
// 返回 true
return true;
}
// 否则从先前调度指针的调度函数进行调度
return prev->dispatch(msg);
}
我们已经介绍了 CSP 范型在 C++ 实践中的所有细节, 以下是完整的示例代码.
二、ATM 示例
#include <condition_variable>
#include <future>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
namespace messaging
{
// 信息类基类
struct messageBase
{
// 虚析构函数
virtual ~messageBase() = default;
};
// 包装信息
template <typename Msg>
struct wrappedMessage : messageBase
{
// 包裹信息
explicit wrappedMessage(Msg const &theContents)
: contents(theContents)
{}
// 获取信息
auto getContents() -> Msg
{
return contents;
}
private:
Msg contents;
};
// 信息存储队列
struct threadSafeQueue
{
// 推入队列
template <typename T>
void push(const T &msg)
{
// 加锁
const std::lock_guard<std::mutex> theLock(mtx);
// 包装信息, 封装到智能指针
threadSafeQue.push(std::make_shared<wrappedMessage<T>>(msg));
// 通知
conVar.notify_all();
}
// 等待弹出
auto waitAndPop() -> std::shared_ptr<messageBase>
{
// 加锁
std::unique_lock<std::mutex> uniLock(mtx);
// 等待通知, 队列不空则停止等待
conVar.wait(uniLock, [&] { return !threadSafeQue.empty(); });
// 取得结果
auto result = threadSafeQue.front();
// 弹出信息的智能指针
threadSafeQue.pop();
// 返回结果
return result;
}
private:
std::mutex mtx;
std::condition_variable conVar;
// 用 list 封装的队列
std::queue<std::shared_ptr<messageBase>,
std::list<std::shared_ptr<messaging::messageBase>>>
threadSafeQue;
};
// 信息发送类, 含有一个线程安全的队列指针
struct sender
{
// 默认构造
sender() = default;
// 拷贝构造
sender(const sender &rhs)
{
threadSafeQuePtr = rhs.threadSafeQuePtr;
}
// 拷贝赋值
auto operator=(const sender &rhs) -> sender & = default;
// 获取信息队列指针构造信息发送类对象
explicit sender(threadSafeQueue *quePtr)
: threadSafeQuePtr(quePtr)
{}
// 发送信息
template <typename Message>
void send(Message const &msg)
{
// 信息队列指针不空
// 则推入信息
if (threadSafeQuePtr)
{
threadSafeQuePtr->push(msg);
}
}
private:
// 线程安全队列指针
threadSafeQueue *threadSafeQuePtr = nullptr;
};
// 关闭队列
struct closeQueue
{
};
// 调度模板类
template <typename PreviousDispatcher, typename Msg, typename Func>
struct TemplateDispatcher
{
// 不可拷贝构造
TemplateDispatcher(const TemplateDispatcher &) = delete;
// 不可拷贝赋值
auto operator=(const TemplateDispatcher &) -> TemplateDispatcher & = delete;
// 移动构造
TemplateDispatcher(TemplateDispatcher &&rhs) noexcept
: threadSafeQuePtr(rhs.threadSafeQuePtr)
, prev(rhs.prev)
, func(std::move(rhs.func))
, chained(rhs.chained)
{
rhs.chained = true;
}
// 构造, 通过线程安全队列指针, 先前调度指针, 函数指针
TemplateDispatcher(threadSafeQueue *queue, PreviousDispatcher *prevPtr,
Func &&OtherFunc)
: threadSafeQuePtr(queue)
, prev(prevPtr)
, func(std::forward<Func>(OtherFunc))
{
prevPtr->chained = true;
}
// 句柄, 返回调度模板, 由此对象的 threadSafeQuePtr 指针, this 指针,
// otherFunc 构造
template <typename OtherMsg, typename OtherFunc>
auto handle(OtherFunc &&otherFunc)
-> TemplateDispatcher<TemplateDispatcher, OtherMsg, OtherFunc>
{
return TemplateDispatcher<TemplateDispatcher, OtherMsg, OtherFunc>(
threadSafeQuePtr, this, std::forward<OtherFunc>(otherFunc));
}
// 析构函数
~TemplateDispatcher() noexcept(false)
{
// 如果没有连接
if (!chained)
{
// 等待调度
waitAndDispatch();
}
}
private:
// 等待调度
void waitAndDispatch()
{
// 循环
while (true)
{
// 从队列获取信息
auto msg = threadSafeQuePtr->waitAndPop();
// 如果调度成功
if (dispatch(msg))
{
// 结束循环
break;
}
}
}
// 调度函数
auto dispatch(std::shared_ptr<messageBase> const &msg) -> bool
{
// 获取信息包装指针不为空
// 条件是 msg.get() 的指针可以转换为 wrappedMessage<Msg> *,
// Msg 是类型, 是 TemplateDispatcher 对象带的类型,
// 这个类型没有成员与之对应, 是构造的时候显性声明的,
// 通常是 handle<Msg> 带的,
// 如果不能安全转换为对应的 wrappedMessage<Msg>* 指针,
// 就只能得到 0 , 跳过条件分支, 调取上一个调度对象
if (wrappedMessage<Msg> *wrapper =
dynamic_cast<wrappedMessage<Msg> *>(msg.get()))
{
// 通过函数处理, 并传入从包装指针获取的信息
func(wrapper->getContents());
// 返回 true
return true;
}
// 否则从先前调度指针的调度函数进行调度
return prev->dispatch(msg);
}
// 线程安全队列
threadSafeQueue *threadSafeQuePtr;
// 先前调度指针
PreviousDispatcher *prev;
// 函数指针
Func func;
// 是否连接
bool chained = false;
// 与所有种类调度模板是友元函数
template <typename Dispatcher, typename OtherMsg, typename OtherFunc>
friend struct TemplateDispatcher;
};
// 调度类, 含有一个线程安全队列指针, 一个是否连接的布尔变量
struct dispatcher
{
// 不可拷贝构造
dispatcher(const dispatcher &) = delete;
// 不可拷贝赋值
auto operator=(const dispatcher &) -> dispatcher & = delete;
// 移动构造
dispatcher(dispatcher &&rhs) noexcept
: threadSafeQuePtr(rhs.threadSafeQuePtr)
, chained(rhs.chained)
{
rhs.chained = true;
}
// 通过传入队列指针构造
explicit dispatcher(threadSafeQueue *queue)
: threadSafeQuePtr(queue)
{}
// 句柄, 返回调度模板, 由此对象的队列指针, this 指针, func 函数构造
template <typename Message, typename Func>
auto handle(Func &&func) -> TemplateDispatcher<dispatcher, Message, Func>
{
return TemplateDispatcher<dispatcher, Message, Func>(
threadSafeQuePtr, this, std::forward<Func>(func));
}
// 析构
~dispatcher() noexcept(false)
{
// 如果没有连接
if (!chained)
{
// 等待调度
waitAndDispatch();
}
}
private:
// 等待调度
void waitAndDispatch()
{
// 循环
while (true)
{
// 获取信息指针
auto msg = threadSafeQuePtr->waitAndPop();
// 传入信息指针, 并进行调度
dispatch(msg);
}
}
// 调度
static auto dispatch(std::shared_ptr<messageBase> const &msg) -> bool
{
// 如果关闭队列信息指针不为空,
// 也就是转换为对应的 wrappedMessage<closeQueue> * 指针
if (dynamic_cast<wrappedMessage<closeQueue> *>(msg.get()) != nullptr)
{
// 抛出关闭队列异常
throw closeQueue();
}
// 返回 false
return false;
}
// 线程安全队列指针
threadSafeQueue *threadSafeQuePtr = nullptr;
// 是否连接
bool chained = false;
// 所有调度模板都是友元类
template <typename Dispatcher, typename Msg, typename Func>
friend struct TemplateDispatcher;
};
// 接收类, 含有一个线程安全队列
struct receiver
{
// 默认构造
receiver() = default;
// 返回 sender 类, 包裹此 receiver 对象的线程安全队列的指针
auto toSender() -> sender
{
return sender(&threadSafeQue);
}
// 等待, 返回 dispatcher 类对象
// 包裹此 receiver 对象的线程安全队列的指针
auto wait() -> dispatcher
{
return dispatcher(&threadSafeQue);
}
private:
// 线程安全的队列
threadSafeQueue threadSafeQue;
};
} // namespace messaging
// 提款类,含有账户名字符串, 存款总额, sender 对象
struct withdraw
{
// 构造函数, 含有账户名字符串, 存款总额, sender 对象
withdraw(std::string accnt, unsigned amnt, const messaging::sender &atmQue)
: account(std::move(accnt))
, amount(amnt)
, atmQueue(atmQue)
{}
auto getAccount() const -> std::string
{
return account;
}
// 返回存款总额
auto getAmount() const -> unsigned
{
return amount;
}
// 返回 sender 对象
auto getAtmQueue() const -> messaging::sender
{
return atmQueue;
}
private:
// 账户名
std::string account;
// 账户金额
unsigned amount;
// sender atm队列
mutable messaging::sender atmQueue;
};
// 提款成功类
struct withdrawOK
{
};
// 拒绝提款类
struct withdrawDenied
{
};
// 提款结束类, 含有账户名, 存款总额
struct cancelWithdrawal
{
// 构造函数
cancelWithdrawal(std::string accnt, unsigned amnt)
: account(std::move(accnt))
, amount(amnt)
{}
[[nodiscard]] auto getAccount() const -> std::string
{
return account;
}
// 获取存款总额
[[nodiscard]] auto getAmount() const -> unsigned
{
return amount;
}
private:
std::string account;
unsigned amount;
};
// 提款处理类, 含有账户名, 存款总额
struct withdrawalProcessed
{
withdrawalProcessed(std::string accnt, unsigned amnt)
: account(std::move(accnt))
, amount(amnt)
{}
[[nodiscard]] auto getAccount() const -> std::string
{
return account;
}
// 获取存款总额
[[nodiscard]] auto getAmount() const -> unsigned
{
return amount;
}
private:
std::string account;
unsigned amount;
};
// 卡插入类, 含有账户名字符串
struct cardInserted
{
// 构造函数, 传入账户字符串
explicit cardInserted(std::string accnt)
: account(std::move(accnt))
{}
// 返回账户名字符串
[[nodiscard]] auto getAccount() const -> std::string
{
return account;
}
private:
std::string account;
};
// 已按数字类, 含有一个字符表示的数字
struct digitPressed
{
// 构造, 传入字符
explicit digitPressed(char dgt)
: digit(dgt)
{}
// 获取字符数字
[[nodiscard]] auto getDigit() const -> char
{
return digit;
}
private:
char digit;
};
// 清除最后按键类
struct clearLastPressed
{
};
// 弹出卡类
struct ejectCard
{
};
// 已按提款类, 含有存款总额
struct withdrawPressed
{
// 构造函数, 传入存款总额
explicit withdrawPressed(unsigned amnt)
: amount(amnt)
{}
// 获取存款总额
[[nodiscard]] auto getAmount() const -> unsigned
{
return amount;
}
private:
unsigned amount;
};
// 已按结束类
struct cancelPressed
{
};
// 资金转移类, 含有存款总额
struct issueMoney
{
// 构造函数, 传入存款总额
explicit issueMoney(unsigned amnt)
: amount(amnt)
{}
[[nodiscard]] auto getAmount() const -> unsigned
{
return amount;
}
private:
unsigned amount;
};
// 核实PIN类, 含有账户名, pin 字符串, sender类atm队列
struct verifyPin
{
// 构造函数, 传入账户名, pin, sender类atm队列
verifyPin(std::string accnt, std::string rhsPin,
const messaging::sender &atmQue)
: account(std::move(accnt))
, pin(std::move(rhsPin))
, atmQueue(atmQue)
{}
auto getAccount() const -> std::string
{
return account;
}
// 获取 pin
auto getPin() const -> std::string
{
return pin;
}
// 获取 sender 类atm队列
auto getAtmQueue() const -> messaging::sender &
{
return atmQueue;
}
private:
std::string account;
std::string pin;
mutable messaging::sender atmQueue;
};
// PIN已核实类
struct pinVerified
{
};
// PIN错误类
struct pinIncorrect
{
};
// Card错误类
struct cardIncorrect
{
};
// 显示PIN类
struct displayEnterPin
{
};
// 显示输入卡类
struct displayEnterCard
{
};
// 显示资金不足类
struct displayInsufficientFunds
{
};
// 显示提款完成类
struct displayWithdrawalCancelled
{
};
// 显示PIN错误消息类
struct displayPinIncorrectMessage
{
};
// 显示 Card 错误消息类
struct displayCardIncorrectMessage
{
};
// 显示提款选项类
struct displayWithdrawalOptions
{
};
// 获取余额类, 含有账户及 sender类atm队列
struct getBalance
{
// 构造函数, 传入账户及sender类atm队列
getBalance(std::string accnt, const messaging::sender &atmQue)
: account(std::move(accnt))
, atmQueue(atmQue)
{}
// 获取sender类atm队列
auto getAtmQueue() const -> messaging::sender &
{
return atmQueue;
}
auto getAccount() const -> std::string
{
return account;
}
private:
std::string account;
mutable messaging::sender atmQueue;
};
// 余额类, 含有存款总额
struct balance
{
// 构造函数, 传入存款总额
explicit balance(unsigned amnt)
: amount(amnt)
{}
// 获取存款总额
[[nodiscard]] auto getAmount() const -> unsigned
{
return amount;
}
private:
unsigned amount;
};
// 显示余额类, 含有存款总额
struct displayBalance
{
// 构造函数, 传入存款总额
explicit displayBalance(unsigned amnt)
: amount(amnt)
{}
[[nodiscard]] auto getAmount() const -> unsigned
{
return amount;
}
private:
unsigned amount;
};
// 已按余额类
struct balancePressed
{
};
// ATM 状态机,
// 含有 receiver 对象 atmIncoming 正在进行中的,
// sender 对象 bank 银行,
// sender 对象 interfaceHardware 接口硬件,
// atm 对象的状态仿函数 stateFuncType,
// 账户名,
// pin,
// 提现额 withdrawalAmount
struct atm
{
// 默认构造
atm() = default;
// 不可拷贝构造, 不可拷贝赋值
atm(const atm &) = delete;
auto operator=(const atm &) -> atm & = delete;
// 通过传入两个 sender 对象进行构造
atm(const messaging::sender &theBank, const messaging::sender &intfcHrdwr)
: bank(theBank)
, interfaceHardware(intfcHrdwr)
{}
// 完成, 向 atmIncoming 队列推送 closeQueue() 消息
void done()
{
getSender().send(messaging::closeQueue());
}
// ATM 后台运行
// 通过执行状态函数, 取得下一个状态函数, 再执行, 如此循环
// 同时在执行过程中发消息给硬件接口后台及银行后台
void run()
{
// 获取 waitingForCard 函数指针并试图运行
stateFuncType = [this]() { waitingForCard(); };
// 循环运行状态函数
try
{
while (true)
{
stateFuncType();
}
}
// 捕获 closeQueue 异常
catch (const messaging::closeQueue &)
{}
}
// 讲 receiver 对象 atmIncoming 转为 sender 对象
auto getSender() -> messaging::sender
{
return atmIncoming.toSender();
}
private:
// receiver 对象 atmIncoming 正在进行中的
messaging::receiver atmIncoming;
// sender 对象 bank 银行
messaging::sender bank;
// sender 对象 interfaceHardware 硬件接口
messaging::sender interfaceHardware;
// 类成员函数对象
std::function<void()> stateFuncType;
// 账户名
std::string account;
// pin
std::string pin;
// 提款总额
unsigned withdrawalAmount = 0;
// 提款执行
void processWithdrawal()
{
// 正在等待 返回调度对象
atmIncoming
.wait()
// 句柄, 返回调度模板
.handle<withdrawOK>([&](withdrawOK const & /*msg*/) {
// 硬件接口发送信息: 资金转移类(提款总额)
interfaceHardware.send(issueMoney(withdrawalAmount));
// 银行发送信息: 提款过程类(账户名, 提款总额)
bank.send(withdrawalProcessed(account, withdrawalAmount));
// atm 状态赋值为 doneProcessing 函数指针
stateFuncType = [this]() { doneProcessing(); };
})
// 句柄, 返回提款失败调度模板
.handle<withdrawDenied>([&](withdrawDenied const & /*msg*/) {
// 硬件接口发送显示资金不足类对象
interfaceHardware.send(displayInsufficientFunds());
// atm 状态赋值为 doneProcessing 函数指针
stateFuncType = [this]() { doneProcessing(); };
})
// 句柄, 返回结束按键调度模板
.handle<cancelPressed>([&](cancelPressed const & /*msg*/) {
// 银行发送消息: 结束提款
bank.send(cancelWithdrawal(account, withdrawalAmount));
// 硬件接口发送消息: 显示提款结束
interfaceHardware.send(displayWithdrawalCancelled());
// atm 状态赋值为 doneProcessing 函数指针
stateFuncType = [this]() { doneProcessing(); };
});
}
// 余额过程
void processBalance()
{
// 正在进行中等待, 返回调度
atmIncoming
.wait()
// 句柄, 返回余额调度模板
.handle<balance>([&](balance const &msg) {
// 硬件接口发送: 显示余额信息
interfaceHardware.send(displayBalance(msg.getAmount()));
// 状态函数指针赋值位等待行动
stateFuncType = [this]() { waitForAction(); };
})
// 句柄, 返回结束已按调度模板
.handle<cancelPressed>([&](cancelPressed const & /*msg*/) {
// 状态函数指针赋值为 doneProcessing
stateFuncType = [this]() { doneProcessing(); };
});
}
// 等待行动
void waitForAction()
{
// 硬件接口发送显示提款选项
interfaceHardware.send(displayWithdrawalOptions());
// 正在行动中等待, 返回调度
atmIncoming
.wait()
// 句柄, 返回已按提款对象调度模板
.handle<withdrawPressed>([&](withdrawPressed const &msg) {
// 提款总额赋值
withdrawalAmount = msg.getAmount();
// 银行发送提款
bank.send(
withdraw(account, msg.getAmount(), atmIncoming.toSender()));
// 状态函数指针赋值为 processWhithdrawal 处理提款
stateFuncType = [this]() { processWithdrawal(); };
})
// 句柄, 返回已按余额类
.handle<balancePressed>([&](balancePressed const & /*msg*/) {
// 银行发送, 获取余额
bank.send(getBalance(account, atmIncoming.toSender()));
// 状态函数指针赋值 processBalance 处理余额
stateFuncType = [this]() { processBalance(); };
})
// 句柄, 返回结束已按
.handle<cancelPressed>([&](cancelPressed const & /*msg*/) {
// 状态函数指针赋值 doneProcessing 完成处理
stateFuncType = [this]() { doneProcessing(); };
});
}
// 正在核实 PIN 函数
void verifyingPin()
{
// 正在进行等待
atmIncoming
.wait()
// 句柄, 返回 pin 核实调度模板
.handle<pinVerified>([&](pinVerified const & /*msg*/) {
// 状态函数指针赋值为 waitForAction 等待行动
stateFuncType = [this]() { waitForAction(); };
})
// 句柄, 返回 pin 错误调度模板
.handle<pinIncorrect>([&](pinIncorrect const & /*msg*/) {
// 硬件接口发送显示 Pin 错误信息
interfaceHardware.send(displayPinIncorrectMessage());
// 状态函数指针赋值为完成处理函数
stateFuncType = [this]() { doneProcessing(); };
})
// 句柄, 返回 card 错误调度模板
.handle<cardIncorrect>([&](cardIncorrect const & /*msg*/) {
// 硬件接口发送显示 card 错误信息
interfaceHardware.send(displayCardIncorrectMessage());
// 状态函数指针赋值为完成处理函数
stateFuncType = [this]() { doneProcessing(); };
})
// 句柄, 返回已按结束调度模板
.handle<cancelPressed>([&](cancelPressed const & /*msg*/) {
// 状态函数指针赋值为完成处理函数
stateFuncType = [this]() { doneProcessing(); };
});
}
// 获取 pin
void gettingPin()
{
// 正在进行等待返回调度
atmIncoming
.wait()
// 句柄, 返回已按数字调度模板
.handle<digitPressed>([&](digitPressed const &msg) {
// pin 长度为 4
unsigned const pinLength = 4;
// pin获取数字
pin += msg.getDigit();
// 如果 pin 长度等于 4
if (pin.length() == pinLength)
{
// 银行发送验证 pin 消息
bank.send(verifyPin(account, pin, atmIncoming.toSender()));
// 状态函数指针赋值为核实 PIN
stateFuncType = [this]() { verifyingPin(); };
}
})
// 句柄, 返回清除最后已按调度模板
.handle<clearLastPressed>([&](clearLastPressed const & /*msg*/) {
// 如果 pin 不空
if (!pin.empty())
{
// 弹出 pin 最后字母
pin.pop_back();
}
})
// 句柄, 返回结束已按调度模板
.handle<cancelPressed>([&](cancelPressed const & /*msg*/) {
// 状态函数指针赋值为结束处理
stateFuncType = [this]() { doneProcessing(); };
});
}
// 等待插卡
void waitingForCard()
{
// 硬件接口发送显示插卡消息
interfaceHardware.send(displayEnterCard());
// 正在进行等待, 返回调度
atmIncoming
.wait()
// 句柄, 返回已插卡调度模板
.handle<cardInserted>([&](cardInserted const &msg) {
// 获取用户名
account = msg.getAccount();
// 初始化 pin
pin = "";
// 硬件接口发送显示输入 pin
interfaceHardware.send(displayEnterPin());
// 状态函数指针赋值为正在获取 pin 函数
stateFuncType = [this]() { gettingPin(); };
});
}
// 完成处理函数
void doneProcessing()
{
// 硬件接口发送弹出卡片信息
interfaceHardware.send(ejectCard());
// atm 状态赋值为等待插卡函数
stateFuncType = [this]() { waitingForCard(); };
}
};
// 银行机器类,
// 含有 receiver 对象 bankMachineIncoming 正在进行中的,
// 含有 unsigned int 余额
struct bankMachine
{
bankMachine() = default;
// 已完成
void done()
{
// 发送结束队列消息
getSender().send(messaging::closeQueue());
}
// 银行后台运行
// 获取 ATM 后台发出的消息
// 根据消息, 进行账户核对, 余额核对, 余额变动, 并发结果消息给 ATM 后台
void run()
{
try
{
// 循环
while (true)
{
// 正在进行等待, 返回调度
bankMachineIncoming
.wait()
// 句柄, 返回验证 Pin 调度模板
.handle<verifyPin>([&](const verifyPin &msg) {
try
{
// 根据账户取得 Pin, 如果不存在账户抛出异常
const std::string Pin =
accountMap.at(msg.getAccount()).first;
if (msg.getPin() == Pin)
{
// 发送 Pin 已核实
msg.getAtmQueue().send(pinVerified());
}
// 否则
else
{
// 发送 Pin 错误
msg.getAtmQueue().send(pinIncorrect());
}
}
catch (...)
{
// 发送 CardNumber 错误
msg.getAtmQueue().send(cardIncorrect());
}
})
// 句柄, 返回提款调度模板
.handle<withdraw>([&](const withdraw &msg) {
balance = accountMap.at(msg.getAccount()).second;
// 如果余额大于等于 msg 获取的提款金额
if (balance >= msg.getAmount())
{
// 发送提款 OK
msg.getAtmQueue().send(withdrawOK());
// 余额等于余额减去提款额
balance -= msg.getAmount();
accountMap.at(msg.getAccount()).second -=
msg.getAmount();
}
// 否则
else
{
// 发送拒绝提款信息
msg.getAtmQueue().send(withdrawDenied());
}
})
// 句柄, 返回获取余额
.handle<getBalance>([&](const getBalance &msg) {
// 获取余额
balance = accountMap.at(msg.getAccount()).second;
// 发送余额信息
msg.getAtmQueue().send(::balance(balance));
})
// 句柄, 返回提款处理完成调度模板
.handle<withdrawalProcessed>(
[&](const withdrawalProcessed &msg) {})
// 句柄, 返回提款结束调度模板
.handle<cancelWithdrawal>(
[&](const cancelWithdrawal &msg) {});
}
}
// 捕获结束队列
catch (const messaging::closeQueue &)
{}
}
// 获取 sender
auto getSender() -> messaging::sender
{
return bankMachineIncoming.toSender();
}
private:
// receiver 类对象, 正在进行中
messaging::receiver bankMachineIncoming;
// 余额
unsigned balance = 0;
// 数据库map
std::map<std::string, std::pair<std::string, unsigned>> accountMap{
{"abc123", {"1937", 199}}, {"abc456", {"1938", 99}}};
};
// 机器接口类,
// 含有 receiver 对象 interfaceMachineIncoming 正在进行中的,
// 含有 iomtx 互斥锁
struct interfaceMachine
{
interfaceMachine() = default;
// 完成
void done()
{
// 发送结束队列
getSender().send(messaging::closeQueue());
}
// 获取 sender
auto getSender() -> messaging::sender
{
// 返回 interfaceMachineIncoming 转换的 sender
return interfaceMachineIncoming.toSender();
}
// 接口设备后台运行
// 负责向用户发出交互信息
void run()
{
try
{
// 循环
while (true)
{
// 正在进行中等待, 返回调度
interfaceMachineIncoming
.wait()
// 句柄, 返回资金转移调度模板
.handle<issueMoney>([&](const issueMoney &msg) {
{
// 输出转移中
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "Issuing " << msg.getAmount()
<< std::endl;
}
})
// 句柄, 返回显示资金不足调度模板
.handle<displayInsufficientFunds>(
[&](const displayInsufficientFunds & /*msg*/) {
{
// 输出资金不足
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "Insufficient funds" << std::endl;
}
})
// 句柄, 返回输入 Pin 调度模板
.handle<displayEnterPin>(
[&](const displayEnterPin & /*msg*/) {
{
// 输出输入PIN
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "Please enter your PIN (0-9)"
<< std::endl;
}
})
// 句柄, 返回显示插入卡调度模板
.handle<displayEnterCard>(
[&](const displayEnterCard & /*msg*/) {
{
// 输出请插入卡
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "Please enter your card (I)"
<< std::endl;
}
})
// 句柄, 返回显示余额调度模板
.handle<displayBalance>([&](const displayBalance &msg) {
{
// 输出你账户余额是:
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "The balance of your account is "
<< msg.getAmount() << std::endl;
}
})
// 句柄, 返回显示提款选项调度模板
.handle<displayWithdrawalOptions>(
[&](const displayWithdrawalOptions & /*msg*/) {
{
// 输出提款 50, 显示余额, 结束
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "Withdraw? (w)" << std::endl;
std::cout << "Display Balance? (b)"
<< std::endl;
std::cout << "Cancel? (c)" << std::endl;
}
})
// 句柄, 返回显示提款结束调度模板
.handle<displayWithdrawalCancelled>(
[&](const displayWithdrawalCancelled & /*msg*/) {
{
// 输出提款结束
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "Withdrawal cancelled"
<< std::endl;
}
})
// 句柄, 返回显示 PIN 错误信息调度模板
.handle<displayPinIncorrectMessage>(
[&](const displayPinIncorrectMessage & /*msg*/) {
{
// 输出 PIN 错误
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "PIN incorrect" << std::endl;
}
})
// 句柄, 返回显示 Card 错误信息调度模板
.handle<displayCardIncorrectMessage>(
[&](const displayCardIncorrectMessage & /*msg*/) {
{
// 输出 PIN 错误
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "CardNumber incorrect"
<< std::endl;
}
})
// 句柄, 返回弹出卡调度模板
.handle<ejectCard>([&](const ejectCard & /*msg*/) {
{
// 输出弹出卡
std::lock_guard<std::mutex> const lock(iomtx);
std::cout << "Ejecting card" << std::endl;
}
});
}
}
// 捕获队列结束信息
catch (messaging::closeQueue &)
{}
}
private:
// 正在进行中 receiver 对象
messaging::receiver interfaceMachineIncoming;
// io 互斥锁
std::mutex iomtx;
};
auto main() -> int
{
// 银行机器
bankMachine bank;
// 接口机器
interfaceMachine interfaceHardware;
// ATM机器
atm machine(bank.getSender(), interfaceHardware.getSender());
// 封装银行后台线程
std::thread bankThread([&bank]() { bank.run(); });
// 封装接口线程
std::thread ifThread([&interfaceHardware]() { interfaceHardware.run(); });
// 封装 atm 线程
std::thread atmThread([&machine]() { machine.run(); });
// ATM 消息队列
messaging::sender atmQueue(machine.getSender());
// 是否已按退出
bool quitPressed = false;
// 如果没有按退出
while (!quitPressed)
{
// 获取字符
char const chr = static_cast<char>(getchar());
// 进入条件分支
switch (chr)
{
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
// 发送已按数字
atmQueue.send(digitPressed(chr));
break;
case 'd':
atmQueue.send(clearLastPressed());
break;
case 'b':
// 发送已按余额
atmQueue.send(balancePressed());
break;
case 'w':
// 发送已按提款
[&atmQueue]() {
std::cout << "enter withdraw money:" << std::endl;
int withdrawMoney = 0;
std::cin >> withdrawMoney;
atmQueue.send(withdrawPressed(withdrawMoney));
}();
break;
case 'c':
// 发送已按结束
atmQueue.send(cancelPressed());
break;
case 'q':
// 设定已按结束
quitPressed = true;
break;
case 'i':
// 发送已插卡
[&atmQueue]() {
std::cout << "enter account:" << std::endl;
std::string accntStr;
std::cin >> accntStr;
atmQueue.send(cardInserted(accntStr));
}();
break;
}
}
bank.done();
machine.done();
interfaceHardware.done();
atmThread.join();
bankThread.join();
ifThread.join();
return 0;
}
总结
用 C++ 实现 CSP 范型, 以利用并发思想简化业务逻辑, 初步看比较复杂, 然而一旦掌握, 会发现, 这完全是一个成熟的套路, 框架完全不变, 只要把逻辑换成你的业务逻辑就 OK 了.
而且难点并不多, 甚至不用掌握函数式编程, 只需要知道 C++ 的 lambda 的使用方法及 std::function< >封装, 配合模板, 基类和派生类的类型转换就完全能够理解.