Приложение С. Каркас передачи сообщений и полный пример программы банкомата

В разделе 4.1 мы познакомились с каркасом передачи сообщений между потоками, продемонстрировав его на примере программы банкомата. В этом приложении приводится полный код примера, включая и код каркаса передачи сообщений.

В листинге С.1 показан код очереди сообщений. Сообщения хранятся в списке и представлены указателями на базовый класс. Сообщения конкретного типа обрабатываются шаблонным классом, производным от этого базового класса. В момент помещения сообщения в очередь конструируется подходящий экземпляр обертывающего класса и сохраняется указатель на него; операция извлечения возвращает именно этот указатель. Поскольку в классе

message_base
нет функций-членов, извлекающий поток должен привести указатель к нужному типу
wrapped_message
, прежде чем сможет получить хранящееся сообщение.


Листинг С.1. Простая очередь сообщений

#include 

#include 

#include 

#include 


namespace messaging

{           │
Базовый класс

struct message_base {←┘
элементов очереди

 virtual ~message_base() {}

};


template 
Для каждого типа сообщений

struct wrapped_message:←┘
имеется специализация

message_base {

 Msg contents;

 explicit wrapped_message(Msg const& contents_):

  contents(contents_) {}

};


Наша очередь

class queue←┘
сообщений

{                        │
В настоящей

 std::mutex m;                 │
очереди хранят-

 std::condition_variable с;           │
ся указатели на

 std::queue > q;←┘
message_base


public:

 template
Обернуть добав-

 void push(T const& msg)       │
ленное сообще-

 {                  │
ние и сохранить

  std::lock_guard lk(m);│
указатель

  q.push( ←┘

  std::make_shared >(msg));

  с.notify_all();

 }


 std::shared_ptr wait_and_pop()│
Блокирует до

 {                      │
появления в

  std::unique_lock lk(m);     │
очереди хотя бы

  c.wait(lk, [&]{ return !q.empty(); });   ←┘
одного элемента

  auto res = q.front();

  q.pop();

  return res;

 }

};

}

Отправкой сообщений занимается объект класса

sender
, показанного в листинге С.2. Это не более чем тонкая обертка вокруг очереди сообщений, которая позволяет только добавлять сообщения. При копировании экземпляров
sender
копируется только указатель на очередь, а не сама очередь.


Листинг С.2. Класс

sender

namespace messaging {

 class sender {│
sender обертывает указатель

 queue* q;  ←┘
на очередь


public:   │
У сконструированного по умолчанию

 sender() :←┘
sender'a нет очереди

 q(nullptr) {}



Разрешаем конструирование

 explicit sender(queue* q_):←┘
из указателя на очередь

  q(q_) {}


 template

 void send(Message const& msg) {

  if (q){     │
Отправка сообщения сводится

  q->push(msg);←┘
к помещению его в очередь

  }

 }

};

}

Получение сообщений несколько сложнее. Мы не только должны дождаться появления сообщения в очереди, но еще и проверить, совпадает ли его тип с одним из известных нам типов, и вызвать соответствующий обработчик. Эта процедура начинается в классе

receiver
, показанном в листинге ниже.


Листинг С.3. Класс

receiver

namespace messaging {

class receiver {

 queue q; ←
receiver владеет очередью


public:        │
Разрешить неявное преобразование в объект

 operator sender() {←┘
sender, ссылающийся на эту очередь

  return sender(&q);

 }


При обращении к функции ожидания

 dispatcher wait() {←┘
очереди создается диспетчер

  return dispatcher(&q);

 }

};

}

Если

sender
только ссылается на очередь сообщений, то
receiver
ей владеет. Мы можем получить объект
sender
, ссылающийся на очередь, воспользовавшись неявным преобразованием. Процедура диспетчеризации сообщения начинается с обращения к функции
wait()
. При этом создается объект
dispatcher
, ссылающийся на очередь, которой владеет
receiver
. Класс
dispatcher
показан в следующем листинге; как видите, содержательная работа производится в его деструкторе. В данном случае работа состоит в ожидании сообщения и его диспетчеризации.


Листинг С.4. Класс

dispatcher

namespace messaging {

class close_queue {}; ←
Сообщение о закрытии очереди


class dispatcher {

 queue* q;               │
Экземпляры

 bool chained;             │
диспетчера нельзя

копировать

 dispatcher(dispatcher const&)=delete;←┘

 dispatcher& operator=(dispatcher const&)=delete;

 template<

  typename Dispatcher,│
Разрешить экземплярам

  typename Msg,    │
TemplateDispatcher доступ

  typename Func>   ←┘
к закрытым частям класса

 friend class TemplateDispatcher;

 void wait_and_dispatch()

 {     
(1) В цикле ждем и диспетчеризуем

  for (;;) {←┘
сообщения

  auto msg = q->wait_and_pop();

  dispatch(msg);

  }

 }       
(2) dispatch() смотрит, не пришло ли

сообщение close_queue, и, если

 bool dispatch (←┘
да, возбуждает исключение

  std::shared_ptr const& msg) {

 if (dynamic_cast*>(msg.get())) {

  throw close_queue();

 }

 return false;

 }


public:              │
Экземпляры диспетчера

 dispatcher(dispatcher&& other):←┘
можно перемещать

 q(other.q), chained(other.chained) {│
Объект-источник не должен

  other.chained = true;        ←┘
ждать сообщений

 }


 explicit dispatcher(queue* q_): q(q_), chained(false) {}


 template

 TemplateDispatcher

 handle(Func&& f)←┐
Сообщения конкретного типа

 {        
(3) обрабатывает TemplateDispatcher

  return TemplateDispatcher(

  q, this, std::forward(f));

 }


 ~dispatcher() noexcept(false)←┐
Деструктор может

 {              
(4) возбудить исключение

  if (!chained) {

  wait_and_dispatch();

  }

 }

};

}

Экземпляр

dispatcher
, возвращенный функцией
wait()
, немедленно уничтожается, так как является временным объектом, и, как уже было сказало, вся работа выполняется в его деструкторе. Деструктор вызывает функцию
wait_and_dispatch()
, которая в цикле (1) ожидает сообщения и передает его функции
dispatch()
. Сама функция
dispatch()
(2) проста, как правда: она проверяет, не получено ли сообщение типа
close_queue
, и, если так, то возбуждает исключение; в противном случае возвращает
false
, извещая, что сообщение не обработало. Именно из-за исключения
close_queue
деструктор и помечен как
noexcept(false)
(4); без этой аннотации действовала бы подразумеваемая спецификация исключений для деструктора —
noexcept(true)
, означающая, что исключения не допускаются, и тогда исключение
close_queue
привело бы к завершению программы.

Но просто вызывать функцию

wait()
особого смысла не имеет — как правило, нам нужно обработать полученное сообщение. Для этого предназначена функция-член
handle()
(3). Это шаблон, и тип сообщения нельзя вывести, поэтому необходимо явно указать, сообщение какого типа обрабатывается, и передать функцию (или допускающий вызов объект) для его обработки. Сама функция
handle()
передает очередь, текущий объект
dispatcher
и функцию-обработчик новому экземпляру шаблонного класса
TemplateDispatcher
, который обрабатывает сообщения указанного типа. Код этого класса показан в листинге С.5. Именно поэтому мы проверяем флаг
chained
в деструкторе перед тем, как приступить к ожиданию сообщения; он не только предотвращает ожидание объектами, содержимое которых перемещено, но и позволяет передать ответственность за ожидание новому экземпляру
TemplateDispatcher
.


Листинг С.5. Шаблон класса

TemplateDispatcher

namespace messaging {

template<

 typename PreviousDispatcher, typename Msg, typename Func>

class TemplateDispatcher {

 queue* q;

 PreviousDispatcher* prev;

 Func f;

 bool chained;


 TemplateDispatcher(TemplateDispatcher const&) = delete;

 TemplateDispatcher& operator=(

  TemplateDispatcher const&) = delete;


 template<

  typename Dispatcher, typename OtherMsg, typename OtherFunc>

 friend class TemplateDispatcher;←┐
Все конкретизации

 void wait_and_dispatch()     │
TemplateDispatcher

 {                 │
дружат между собой

  for (;;) {

  auto msg = q->wait_and_pop();

  if (dispatch(msg))←┐
Если мы обработали

  break;       │
сообщение выходим

  }         
(1) из цикла

 }


 bool dispatch(std::shared_ptr const& msg) {

  if (wrapped_message* wrapper =

  dynamic_cast*>(

  msg.get())) {    ←┐
Проверяем тип

  f(wrapper->contents);│
сообщения и

  return true;     │
вызываем

  }          
(2) функцию

 else {

  return prev->dispatch(msg);←┐
Вызываем предыдущий

 }              
(3) диспетчер в цепочке

 }


public:

 TemplateDispatcher(TemplateDispatcher&& other):

  q(other.q), prev(other.prev), f(std::move(other.f)),

  chained(other.chained) {

  other.chained = true;

 }


 TemplateDispatcher(

  queue* q_, PreviousDispatcher* prev_, Func&& f_):

  q(q_), prev(prev_), f(std::forward(f_)), chained(false)

 {

  prev_->chained = true;

 }


 template

 TemplateDispatcher

 handle(OtherFunc&& of)←┐
Дополнительные обработчики

 {           
(4) можно связать в цепочку

  return TemplateDispatcher<

  TemplateDispatcher, OtherMsg, OtherFunc>(

   q, this, std::forward(of));

 }


 ~TemplateDispatcher() noexcept(false)←┐
Деструктор снова

 {                   │
помечен как

  if (!chained) {           
(5) noexcept(false)

  wait_and_dispatch();

  }

 }

};

}

Шаблон класса

TemplateDispatcher<>
устроен по образцу класса
dispatcher
и почти ничем не отличается от него. В частности, деструктор тоже вызывает
wait_and_dispatch()
, чтобы дождаться сообщения.

Поскольку мы не возбуждаем исключения, если сообщение обработало, то теперь в цикле (1) нужно проверять, обработали мы сообщение или нет. Обработка прекращается, как только сообщение успешно обработало, чтобы в следующий раз можно было ждать очередного набора сообщений. Если найдено соответствие указанному типу сообщения, то вызывается предоставленная функция (2), а не возбуждается исключение (хотя функция-обработчик может и сама возбудить исключение). Если же соответствие не найдено, то мы передаем сообщение предыдущему диспетчеру в цепочке (3). В самом первом экземпляре это будет объект

dispatcher
, но если в функции
handle()
(4) вызовы сцеплялись, чтобы можно было обработать несколько типов сообщений, то предыдущим диспетчером может быть ранее созданный экземпляр
TemplateDispatcher<>
, который в свою очередь передаст сообщение предшествующему ему диспетчеру в цепочке, если не сможет обработать его сам. Поскольку любой обработчик может возбудить исключение (в том числе и обработчик самого первого объекта
dispatcher
, если встретит сообщение
close_queue
), то деструктор снова необходимо снабдить аннотацией
noexcept(false)
(5).

Этот простенький каркас позволяет помещать в очередь сообщения любого типа, а затем на принимающем конце отбирать те из них, которые мы можем обработать. Кроме того, он позволяет передавать ссылку на очередь, чтобы в нее можно было добавлять новые сообщения, оставляя при этом прижимающий конец недоступным извне.

И чтобы закончить пример из главы 4, в листинге С.6 приведён код сообщений, в листингах С.7, С.8 и С.9 — различные конечные автоматы, а в листинге С.10 — управляющая программа.


Листинг С.6. Сообщения банкомата

struct withdraw {

 std::string account;

 unsigned amount;

 mutable messaging::sender atm_queue;


 withdraw(std::string const& account_,

  unsigned amount_, messaging::sender atm_queue_):

  account(account_), amount(amount_), atm_queue(atm_queue_) {}

};


struct withdraw_ok {};


struct withdraw_denied {};


struct cancel_withdrawal {

 std::string account;

 unsigned amount;


 cancel_withdrawal(std::string const& account_,

  unsigned amount_):

  account(account_), amount(amount_) {}

};


struct withdrawal_processed {

 std::string account;

 unsigned amount;


 withdrawal_processed(std::string const& account_,

  unsigned amount_):

  account(account_), amount(amount_) {}

};


struct card_inserted {

 std::string account;


 explicit card_inserted(std::string const& account_):

  account(account_) {}

};


struct digit_pressed {

 char digit;


 explicit digit_pressed(char digit_):

  digit(digit_) {}

};


struct clear_last_pressed {};


struct eject_card {};


struct withdraw_pressed {

 unsigned amount;


 explicit withdraw_pressed(unsigned amount_):

  amount(amount_) {}


};


struct cancel_pressed {};


struct issue_money {

 unsigned amount;

 issue_money(unsigned amount_):

  amount(amount_) {}

};


struct verify_pin {

 std::string account;

 std::string pin;

 mutable messaging::sender atm_queue;


 verify_pin(std::string const& account_, std::string const& pin_,

  messaging::sender atm_queue_):

  account(account_), pin(pin_), atm_queue(atm_queue_) {}

};


struct pin_verified {};


struct pin_incorrect {};


struct display_enter_pin {};


struct display_enter_card {};


struct display_insufficient_funds {};


struct display_withdrawal_cancelled {};


struct display_pin_incorrect_message {};


struct display_withdrawal_options (};


struct get_balance {

 std::string account;

 mutable messaging::sender atm_queue;


 get_balance(

  std::string const& account_, messaging::sender atm_queue_):

  account(account_), atm_queue(atm_queue_) {}

};


struct balance {

 unsigned amount;


 explicit balance(unsigned amount_):

  amount(amount_) {}

};


struct display_balance {

 unsigned amount;


 explicit display_balance(unsigned amount_):

  amount(amount_) {}

};


struct balance_pressed {};


Листинг С.7. Конечный автомат банкомата

class atm {

 messaging::receiver incoming;

 messaging::sender bank;

 messaging::sender interface_hardware;

 void (atm::*state)();

 std::string account;

 unsigned withdrawal_amount;

 std::string pin;


 void process_withdrawal() {

  incoming.wait().handle(

  [&](withdraw_ok const& msg) {

   interface_hardware.send(

   issue_money(withdrawal_amount));

   bank.send(

   withdrawal_processed(account, withdrawal_amount));

   state = &atm::done_processing;

  }

  ).handle(

  [&](withdraw_denied const& msg) {

   interface_hardware.send(display_insufficient_funds());

   state = &atm::done_processing;

  }

  ).handle(

  [&](cancel_pressed const& msg) {

   bank.send(

   cancel_withdrawal(account, withdrawal_amount));

  interface_hardware.send(

   display_withdrawal_cancelled());

  state = &atm::done_processing;

  }

  );

 }


 void process_balance() {

  incoming.wait().handle(

  [&](balance const& msg) {

   interface_hardware.send(display_balance(msg.amount));

   state = &atm::wait_for_action;

  }

  ).handle(

  [&](cancel_pressed const& msg) {

   state = &atm::done_processing;

  }

  );

 }


 void wait_for_action() {

  interface_hardware.send(display_withdrawal_options());

  incoming.wait().handle(

  [&](withdraw_pressed const& msg) {

   withdrawal_amount = msg.amount;

   bank.send(withdraw(account, msg.amount, incoming));

   state = &atm::process_withdrawal;

  }

  ).handle(

  [&](balance_pressed const& msg) {

   bank.send(get_balance(account, incoming));

   state = &atm::process_balance;

  }

  ).handle(

  [&](cancel_pressed const& msg) {

  state = &atm::done_processing;

  }

  );

 }


 void verifying_pin() {

  incoming.wait().handle(

  [&](pin_verified const& msg) {

   state = &atm::wait_for_action;

  }

  ).handle(

  [&](pin_incorrect const& msg) {

   interface_hardware.send(

   display_pin_incorrect_message());

   state = &atm::done_processing;

  }

  ).handle(

  [&](cancel_pressed const& msg) {

   state = &atm::done_processing;

  }

  );

 }


 void getting_pin() {

  incoming.wait().handle(

  [&](digit_pressed const& msg) {

   unsigned const pin_length = 4;

   pin += msg.digit;

   if (pin.length() == pin_length) {

   bank.send(verify_pin(account, pin, incoming));

   state = &atm::verifying_pin;

   }

  }

  ).handle(

  [&](clear_last_pressed const& msg) {

   if (!pin.empty()) {

   pin.pop_back();

   }

  }

  ).handle(

  [&](cancel_pressed const& msg) {

   state = &atm::done_processing;

  }

  );

 }


 void waiting_for_card() {

  interface_hardware.send(display_enter_card());

  incoming.wait().handle(

  [&](card_inserted const& msg) {

   account = msg.account;

   pin = "";

   interface_hardware.send(display_enter_pin());

   state = &atm::getting_pin;

  }

  );

 }


 void done_processing() {

  interface_hardware.send(eject_card());

  state = &atm::waiting_for_card;

 }


 atm(atm const&) = delete;

 atm& operator=(atm const&) = delete;


public:

 atm(messaging::sender bank_,

  messaging::sender interface_hardware_):

  bank(bank_), interface_hardware(interface_hardware_) {}


 void done() {

  get_sender().send(messaging::close_queue());

 }


 void run() {

  state = &atm::waiting_for_card;

  try {

  for (;;) {

   (this->*state)();

  }

  } catch(messaging::close_queue const&) {

  }

 }


 messaging::sender get_sender() {

  return incoming;

 }

};


Листинг С.8. Конечный автомат банка

class bank_machine {

 messaging::receiver incoming;

 unsigned balance;


public:

 bank_machine():

  balance(199) {}


 void done() {

  get_sender().send(messaging::close_queue());

 }


 void run() {

  try {

  for (;;) {

   incoming.wait().handle(

   [&](verify_pin const& msg) {

    if (msg.pin == "1937") {

    msg.atm_queue.send(pin_verified());

    } else {

    msg.atm_queue.send(pin_incorrect());

    }

   }

   ).handle(

   [&](withdraw const& msg) {

   if (balance >= msg.amount) {

    msg.atm_queue.send(withdraw_ok());

    balance -= msg.amount;

    } else {

    msg.atm_queue.send(withdraw_denied());

    }

   }

   ).handle(

   [&](get_balance const& msg) {

    msg.atm_queue.send(::balance(balance));

   }

   ).handle(

   [&](withdrawal_processed const& msg) {

   }

   ).handle(

   [&](cancel_withdrawal const& msg) {

   }

   );

  }

  } catch(messaging::close_queue const&) {

 }

 }


 messaging::sender get_sender() {

  return incoming;

 }

};


Листинг С.9. Конечный автомат пользовательского интерфейса

class interface_machine {

 messaging::receiver incoming;


public:

 void done() {

  get_sender().send(messaging::close_queue());

 }


 void run() {

  try {

  for (;;) {

   incoming.wait().handle (

   [&](issue_money const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout << "Issuing "

         << msg.amount << std::endl;

    }

   }

   ).handle(

   [&](display_insufficient_funds const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout << "Insufficient funds" << std::endl;

    }

   }

   ).handle(

   [&](display_enter_pin const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout

     << "Please enter your PIN (0-9)" << std::endl;

    }

   }

   ).handle(

   [&](display_enter_card const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout << "Please enter your card (I)"

         << std::endl;

    }

   }

   ).handle(

   [&](display_balance const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout

     << "The balance of your account is "

     << msg.amount << std::endl;

    }

   }

   ).handle(

   [&](display_withdrawal_options const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout << "Withdraw 50? (w)" << std::endl;

    std::cout << "Display Balance? (b)"

         << std::endl;

    std::cout << "Cancel? (c) " << std::endl;

    }

   }

   ).handle(

   [&](display_withdrawal_cancelled const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout << "Withdrawal cancelled"

         << std::endl;

    }

   }

   ).handle(

   [&](display_pin_incorrect_message const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout << "PIN incorrect" << std::endl;

    }

   }

   ).handle(

   [&](eject_card const& msg) {

    {

    std::lock_guard lk(iom);

    std::cout << "Ejecting card" << std::endl;

   }

   }

   );

  }

 } catch (messaging::close_queue&) {

  }

 }


 messaging::sender get_sender() {

  return incoming;

 }

};


Листинг С.10. Управляющая программа

int main() {

 bank_machine bank;

 interface_machine interface_hardware;


 atm machine(bank.get_sender(), interface_hardware.get_sender());


 std::thread bank_thread(&bank_machine::run, &bank);

 std::thread if_thread(&interface_machine::run,

  &interface_hardware);

 std::thread atm_thread(&atm::run, &machine);


 messaging::sender atmqueue(machine.get_sender());

 bool quit_pressed = false;


 while (!quit_pressed) {

  char c = getchar();

  switch(с) {

 case '0':

  case '1':

  case '2':

  case '3':

  case '4':

  case '5':

  case '6':

  case '7':

  case '8':

  case '9':

  atmqueue.send(digit_pressed(с));

  break;

  case 'b':

  atmqueue.send(balance_pressed());

  break;

  case 'w':

  atmqueue.send(withdraw_pressed(50));

  break;

  case 'с':

  atmqueue.send(cancel_pressed());

  break;

  case 'q':

  quit_pressed = true;

  break;

  case 'i':

  atmqueue.send(card_inserted("acc1234"));

  break;

  }

 }

 bank.done();

 machine.done();

 interface_hardware.done();

 atm_thread.join();

 bank_thread.join();

 if_thread.join();

}

Загрузка...