В разделе 4.1 мы познакомились с каркасом передачи сообщений между потоками, продемонстрировав его на примере программы банкомата. В этом приложении приводится полный код примера, включая и код каркаса передачи сообщений.
В листинге С.1 показан код очереди сообщений. Сообщения хранятся в списке и представлены указателями на базовый класс. Сообщения конкретного типа обрабатываются шаблонным классом, производным от этого базового класса. В момент помещения сообщения в очередь конструируется подходящий экземпляр обертывающего класса и сохраняется указатель на него; операция извлечения возвращает именно этот указатель. Поскольку в классе
message_base
нет функций-членов, извлекающий поток должен привести указатель к нужному типу wrapped_message
, прежде чем сможет получить хранящееся сообщение.
Листинг С.1. Простая очередь сообщений
#include
#include
#include
#include
namespace messaging
{ │
Базовый класс
struct message_base {←┘
элементов очереди
virtual ~message_base() {}
};
template │
Для каждого типа сообщений
struct wrapped_message:←┘
имеется специализация
message_base {
Msg contents;
explicit wrapped_message(Msg const& contents_):
contents(contents_) {}
};
│
Наша очередь
class queue←┘
сообщений
{ │
В настоящей
std::mutex m; │
очереди хранят-
std::condition_variable с; │
ся указатели на
std::queue > q;←┘
message_base
public:
template │
Обернуть добав-
void push(T const& msg) │
ленное сообще-
{ │
ние и сохранить
std::lock_guard lk(m);│
указатель
q.push( ←┘
std::make_shared >(msg));
с.notify_all();
}
std::shared_ptr wait_and_pop()│
Блокирует до
{ │
появления в
std::unique_lock lk(m); │
очереди хотя бы
c.wait(lk, [&]{ return !q.empty(); }); ←┘
одного элемента
auto res = q.front();
q.pop();
return res;
}
};
}
Отправкой сообщений занимается объект класса
sender
, показанного в листинге С.2. Это не более чем тонкая обертка вокруг очереди сообщений, которая позволяет только добавлять сообщения. При копировании экземпляров sender
копируется только указатель на очередь, а не сама очередь.
Листинг С.2. Класс
sender
namespace messaging {
class sender {│
sender обертывает указатель
queue* q; ←┘
на очередь
public: │
У сконструированного по умолчанию
sender() :←┘
sender'a нет очереди
q(nullptr) {}
│
Разрешаем конструирование
explicit sender(queue* q_):←┘
из указателя на очередь
q(q_) {}
template
void send(Message const& msg) {
if (q){ │
Отправка сообщения сводится
q->push(msg);←┘
к помещению его в очередь
}
}
};
}
Получение сообщений несколько сложнее. Мы не только должны дождаться появления сообщения в очереди, но еще и проверить, совпадает ли его тип с одним из известных нам типов, и вызвать соответствующий обработчик. Эта процедура начинается в классе
receiver
, показанном в листинге ниже.
Листинг С.3. Класс
receiver
namespace messaging {
class receiver {
queue q; ←
receiver владеет очередью
public: │
Разрешить неявное преобразование в объект
operator sender() {←┘
sender, ссылающийся на эту очередь
return sender(&q);
}
│
При обращении к функции ожидания
dispatcher wait() {←┘
очереди создается диспетчер
return dispatcher(&q);
}
};
}
Если
sender
только ссылается на очередь сообщений, то receiver
ей владеет. Мы можем получить объект sender
, ссылающийся на очередь, воспользовавшись неявным преобразованием. Процедура диспетчеризации сообщения начинается с обращения к функции wait()
. При этом создается объект dispatcher
, ссылающийся на очередь, которой владеет receiver
. Класс dispatcher
показан в следующем листинге; как видите, содержательная работа производится в его деструкторе. В данном случае работа состоит в ожидании сообщения и его диспетчеризации.
Листинг С.4. Класс
dispatcher
namespace messaging {
class close_queue {}; ←
Сообщение о закрытии очереди
class dispatcher {
queue* q; │
Экземпляры
bool chained; │
диспетчера нельзя
│
копировать
dispatcher(dispatcher const&)=delete;←┘
dispatcher& operator=(dispatcher const&)=delete;
template<
typename Dispatcher,│
Разрешить экземплярам
typename Msg, │
TemplateDispatcher доступ
typename Func> ←┘
к закрытым частям класса
friend class TemplateDispatcher;
void wait_and_dispatch()
{
(1) В цикле ждем и диспетчеризуем
for (;;) {←┘
сообщения
auto msg = q->wait_and_pop();
dispatch(msg);
}
}
(2) dispatch() смотрит, не пришло ли
│
сообщение close_queue, и, если
bool dispatch (←┘
да, возбуждает исключение
std::shared_ptr const& msg) {
if (dynamic_cast*>(msg.get())) {
throw close_queue();
}
return false;
}
public: │
Экземпляры диспетчера
dispatcher(dispatcher&& other):←┘
можно перемещать
q(other.q), chained(other.chained) {│
Объект-источник не должен
other.chained = true; ←┘
ждать сообщений
}
explicit dispatcher(queue* q_): q(q_), chained(false) {}
template
TemplateDispatcher
handle(Func&& f)←┐
Сообщения конкретного типа
{
(3) обрабатывает TemplateDispatcher
return TemplateDispatcher(
q, this, std::forward(f));
}
~dispatcher() noexcept(false)←┐
Деструктор может
{
(4) возбудить исключение
if (!chained) {
wait_and_dispatch();
}
}
};
}
Экземпляр
dispatcher
, возвращенный функцией wait()
, немедленно уничтожается, так как является временным объектом, и, как уже было сказало, вся работа выполняется в его деструкторе. Деструктор вызывает функцию wait_and_dispatch()
, которая в цикле (1) ожидает сообщения и передает его функции dispatch()
. Сама функция dispatch()
(2) проста, как правда: она проверяет, не получено ли сообщение типа close_queue
, и, если так, то возбуждает исключение; в противном случае возвращает false
, извещая, что сообщение не обработало. Именно из-за исключения close_queue
деструктор и помечен как noexcept(false)
(4); без этой аннотации действовала бы подразумеваемая спецификация исключений для деструктора — noexcept(true)
, означающая, что исключения не допускаются, и тогда исключение close_queue
привело бы к завершению программы.
Но просто вызывать функцию
wait()
особого смысла не имеет — как правило, нам нужно обработать полученное сообщение. Для этого предназначена функция-член handle()
(3). Это шаблон, и тип сообщения нельзя вывести, поэтому необходимо явно указать, сообщение какого типа обрабатывается, и передать функцию (или допускающий вызов объект) для его обработки. Сама функция handle()
передает очередь, текущий объект dispatcher
и функцию-обработчик новому экземпляру шаблонного класса TemplateDispatcher
, который обрабатывает сообщения указанного типа. Код этого класса показан в листинге С.5. Именно поэтому мы проверяем флаг chained
в деструкторе перед тем, как приступить к ожиданию сообщения; он не только предотвращает ожидание объектами, содержимое которых перемещено, но и позволяет передать ответственность за ожидание новому экземпляру TemplateDispatcher
.
Листинг С.5. Шаблон класса
TemplateDispatcher
namespace messaging {
template<
typename PreviousDispatcher, typename Msg, typename Func>
class TemplateDispatcher {
queue* q;
PreviousDispatcher* prev;
Func f;
bool chained;
TemplateDispatcher(TemplateDispatcher const&) = delete;
TemplateDispatcher& operator=(
TemplateDispatcher const&) = delete;
template<
typename Dispatcher, typename OtherMsg, typename OtherFunc>
friend class TemplateDispatcher;←┐
Все конкретизации
void wait_and_dispatch() │
TemplateDispatcher
{ │
дружат между собой
for (;;) {
auto msg = q->wait_and_pop();
if (dispatch(msg))←┐
Если мы обработали
break; │
сообщение выходим
}
(1) из цикла
}
bool dispatch(std::shared_ptr const& msg) {
if (wrapped_message* wrapper =
dynamic_cast*>(
msg.get())) { ←┐
Проверяем тип
f(wrapper->contents);│
сообщения и
return true; │
вызываем
}
(2) функцию
else {
return prev->dispatch(msg);←┐
Вызываем предыдущий
}
(3) диспетчер в цепочке
}
public:
TemplateDispatcher(TemplateDispatcher&& other):
q(other.q), prev(other.prev), f(std::move(other.f)),
chained(other.chained) {
other.chained = true;
}
TemplateDispatcher(
queue* q_, PreviousDispatcher* prev_, Func&& f_):
q(q_), prev(prev_), f(std::forward(f_)), chained(false)
{
prev_->chained = true;
}
template
TemplateDispatcher
handle(OtherFunc&& of)←┐
Дополнительные обработчики
{
(4) можно связать в цепочку
return TemplateDispatcher<
TemplateDispatcher, OtherMsg, OtherFunc>(
q, this, std::forward(of));
}
~TemplateDispatcher() noexcept(false)←┐
Деструктор снова
{ │
помечен как
if (!chained) {
(5) noexcept(false)
wait_and_dispatch();
}
}
};
}
Шаблон класса
TemplateDispatcher<>
устроен по образцу класса dispatcher
и почти ничем не отличается от него. В частности, деструктор тоже вызывает wait_and_dispatch()
, чтобы дождаться сообщения.
Поскольку мы не возбуждаем исключения, если сообщение обработало, то теперь в цикле (1) нужно проверять, обработали мы сообщение или нет. Обработка прекращается, как только сообщение успешно обработало, чтобы в следующий раз можно было ждать очередного набора сообщений. Если найдено соответствие указанному типу сообщения, то вызывается предоставленная функция (2), а не возбуждается исключение (хотя функция-обработчик может и сама возбудить исключение). Если же соответствие не найдено, то мы передаем сообщение предыдущему диспетчеру в цепочке (3). В самом первом экземпляре это будет объект
dispatcher
, но если в функции handle()
(4) вызовы сцеплялись, чтобы можно было обработать несколько типов сообщений, то предыдущим диспетчером может быть ранее созданный экземпляр TemplateDispatcher<>
, который в свою очередь передаст сообщение предшествующему ему диспетчеру в цепочке, если не сможет обработать его сам. Поскольку любой обработчик может возбудить исключение (в том числе и обработчик самого первого объекта dispatcher
, если встретит сообщение close_queue
), то деструктор снова необходимо снабдить аннотацией noexcept(false)
(5).
Этот простенький каркас позволяет помещать в очередь сообщения любого типа, а затем на принимающем конце отбирать те из них, которые мы можем обработать. Кроме того, он позволяет передавать ссылку на очередь, чтобы в нее можно было добавлять новые сообщения, оставляя при этом прижимающий конец недоступным извне.
И чтобы закончить пример из главы 4, в листинге С.6 приведён код сообщений, в листингах С.7, С.8 и С.9 — различные конечные автоматы, а в листинге С.10 — управляющая программа.
Листинг С.6. Сообщения банкомата
struct withdraw {
std::string account;
unsigned amount;
mutable messaging::sender atm_queue;
withdraw(std::string const& account_,
unsigned amount_, messaging::sender atm_queue_):
account(account_), amount(amount_), atm_queue(atm_queue_) {}
};
struct withdraw_ok {};
struct withdraw_denied {};
struct cancel_withdrawal {
std::string account;
unsigned amount;
cancel_withdrawal(std::string const& account_,
unsigned amount_):
account(account_), amount(amount_) {}
};
struct withdrawal_processed {
std::string account;
unsigned amount;
withdrawal_processed(std::string const& account_,
unsigned amount_):
account(account_), amount(amount_) {}
};
struct card_inserted {
std::string account;
explicit card_inserted(std::string const& account_):
account(account_) {}
};
struct digit_pressed {
char digit;
explicit digit_pressed(char digit_):
digit(digit_) {}
};
struct clear_last_pressed {};
struct eject_card {};
struct withdraw_pressed {
unsigned amount;
explicit withdraw_pressed(unsigned amount_):
amount(amount_) {}
};
struct cancel_pressed {};
struct issue_money {
unsigned amount;
issue_money(unsigned amount_):
amount(amount_) {}
};
struct verify_pin {
std::string account;
std::string pin;
mutable messaging::sender atm_queue;
verify_pin(std::string const& account_, std::string const& pin_,
messaging::sender atm_queue_):
account(account_), pin(pin_), atm_queue(atm_queue_) {}
};
struct pin_verified {};
struct pin_incorrect {};
struct display_enter_pin {};
struct display_enter_card {};
struct display_insufficient_funds {};
struct display_withdrawal_cancelled {};
struct display_pin_incorrect_message {};
struct display_withdrawal_options (};
struct get_balance {
std::string account;
mutable messaging::sender atm_queue;
get_balance(
std::string const& account_, messaging::sender atm_queue_):
account(account_), atm_queue(atm_queue_) {}
};
struct balance {
unsigned amount;
explicit balance(unsigned amount_):
amount(amount_) {}
};
struct display_balance {
unsigned amount;
explicit display_balance(unsigned amount_):
amount(amount_) {}
};
struct balance_pressed {};
Листинг С.7. Конечный автомат банкомата
class atm {
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
void (atm::*state)();
std::string account;
unsigned withdrawal_amount;
std::string pin;
void process_withdrawal() {
incoming.wait().handle(
[&](withdraw_ok const& msg) {
interface_hardware.send(
issue_money(withdrawal_amount));
bank.send(
withdrawal_processed(account, withdrawal_amount));
state = &atm::done_processing;
}
).handle(
[&](withdraw_denied const& msg) {
interface_hardware.send(display_insufficient_funds());
state = &atm::done_processing;
}
).handle(
[&](cancel_pressed const& msg) {
bank.send(
cancel_withdrawal(account, withdrawal_amount));
interface_hardware.send(
display_withdrawal_cancelled());
state = &atm::done_processing;
}
);
}
void process_balance() {
incoming.wait().handle(
[&](balance const& msg) {
interface_hardware.send(display_balance(msg.amount));
state = &atm::wait_for_action;
}
).handle(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void wait_for_action() {
interface_hardware.send(display_withdrawal_options());
incoming.wait().handle(
[&](withdraw_pressed const& msg) {
withdrawal_amount = msg.amount;
bank.send(withdraw(account, msg.amount, incoming));
state = &atm::process_withdrawal;
}
).handle(
[&](balance_pressed const& msg) {
bank.send(get_balance(account, incoming));
state = &atm::process_balance;
}
).handle(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void verifying_pin() {
incoming.wait().handle(
[&](pin_verified const& msg) {
state = &atm::wait_for_action;
}
).handle(
[&](pin_incorrect const& msg) {
interface_hardware.send(
display_pin_incorrect_message());
state = &atm::done_processing;
}
).handle(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void getting_pin() {
incoming.wait().handle(
[&](digit_pressed const& msg) {
unsigned const pin_length = 4;
pin += msg.digit;
if (pin.length() == pin_length) {
bank.send(verify_pin(account, pin, incoming));
state = &atm::verifying_pin;
}
}
).handle(
[&](clear_last_pressed const& msg) {
if (!pin.empty()) {
pin.pop_back();
}
}
).handle(
[&](cancel_pressed const& msg) {
state = &atm::done_processing;
}
);
}
void waiting_for_card() {
interface_hardware.send(display_enter_card());
incoming.wait().handle(
[&](card_inserted const& msg) {
account = msg.account;
pin = "";
interface_hardware.send(display_enter_pin());
state = &atm::getting_pin;
}
);
}
void done_processing() {
interface_hardware.send(eject_card());
state = &atm::waiting_for_card;
}
atm(atm const&) = delete;
atm& operator=(atm const&) = delete;
public:
atm(messaging::sender bank_,
messaging::sender interface_hardware_):
bank(bank_), interface_hardware(interface_hardware_) {}
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
state = &atm::waiting_for_card;
try {
for (;;) {
(this->*state)();
}
} catch(messaging::close_queue const&) {
}
}
messaging::sender get_sender() {
return incoming;
}
};
Листинг С.8. Конечный автомат банка
class bank_machine {
messaging::receiver incoming;
unsigned balance;
public:
bank_machine():
balance(199) {}
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
try {
for (;;) {
incoming.wait().handle(
[&](verify_pin const& msg) {
if (msg.pin == "1937") {
msg.atm_queue.send(pin_verified());
} else {
msg.atm_queue.send(pin_incorrect());
}
}
).handle(
[&](withdraw const& msg) {
if (balance >= msg.amount) {
msg.atm_queue.send(withdraw_ok());
balance -= msg.amount;
} else {
msg.atm_queue.send(withdraw_denied());
}
}
).handle(
[&](get_balance const& msg) {
msg.atm_queue.send(::balance(balance));
}
).handle(
[&](withdrawal_processed const& msg) {
}
).handle(
[&](cancel_withdrawal const& msg) {
}
);
}
} catch(messaging::close_queue const&) {
}
}
messaging::sender get_sender() {
return incoming;
}
};
Листинг С.9. Конечный автомат пользовательского интерфейса
class interface_machine {
messaging::receiver incoming;
public:
void done() {
get_sender().send(messaging::close_queue());
}
void run() {
try {
for (;;) {
incoming.wait().handle (
[&](issue_money const& msg) {
{
std::lock_guard lk(iom);
std::cout << "Issuing "
<< msg.amount << std::endl;
}
}
).handle(
[&](display_insufficient_funds const& msg) {
{
std::lock_guard lk(iom);
std::cout << "Insufficient funds" << std::endl;
}
}
).handle(
[&](display_enter_pin const& msg) {
{
std::lock_guard lk(iom);
std::cout
<< "Please enter your PIN (0-9)" << std::endl;
}
}
).handle(
[&](display_enter_card const& msg) {
{
std::lock_guard lk(iom);
std::cout << "Please enter your card (I)"
<< std::endl;
}
}
).handle(
[&](display_balance const& msg) {
{
std::lock_guard lk(iom);
std::cout
<< "The balance of your account is "
<< msg.amount << std::endl;
}
}
).handle(
[&](display_withdrawal_options const& msg) {
{
std::lock_guard lk(iom);
std::cout << "Withdraw 50? (w)" << std::endl;
std::cout << "Display Balance? (b)"
<< std::endl;
std::cout << "Cancel? (c) " << std::endl;
}
}
).handle(
[&](display_withdrawal_cancelled const& msg) {
{
std::lock_guard lk(iom);
std::cout << "Withdrawal cancelled"
<< std::endl;
}
}
).handle(
[&](display_pin_incorrect_message const& msg) {
{
std::lock_guard lk(iom);
std::cout << "PIN incorrect" << std::endl;
}
}
).handle(
[&](eject_card const& msg) {
{
std::lock_guard lk(iom);
std::cout << "Ejecting card" << std::endl;
}
}
);
}
} catch (messaging::close_queue&) {
}
}
messaging::sender get_sender() {
return incoming;
}
};
Листинг С.10. Управляющая программа
int main() {
bank_machine bank;
interface_machine interface_hardware;
atm machine(bank.get_sender(), interface_hardware.get_sender());
std::thread bank_thread(&bank_machine::run, &bank);
std::thread if_thread(&interface_machine::run,
&interface_hardware);
std::thread atm_thread(&atm::run, &machine);
messaging::sender atmqueue(machine.get_sender());
bool quit_pressed = false;
while (!quit_pressed) {
char c = getchar();
switch(с) {
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
atmqueue.send(digit_pressed(с));
break;
case 'b':
atmqueue.send(balance_pressed());
break;
case 'w':
atmqueue.send(withdraw_pressed(50));
break;
case 'с':
atmqueue.send(cancel_pressed());
break;
case 'q':
quit_pressed = true;
break;
case 'i':
atmqueue.send(card_inserted("acc1234"));
break;
}
}
bank.done();
machine.done();
interface_hardware.done();
atm_thread.join();
bank_thread.join();
if_thread.join();
}