Глава 4. Синхронизация параллельных операций

В этой главе:

■ Ожидание события.

■ Ожидание однократного события с будущими результатами

■ Ожидание с ограничением по времени.

■ Использование синхронизации операций для упрощения программы.

В предыдущей главе мы рассмотрели различные способы защиты данных, разделяемых между потоками. Но иногда требуется не только защитить данные, но и синхронизировать действия, выполняемые в разных потоках. Например, возможно, что одному потоку перед тем как продолжить работу, нужно дождаться, пока другой поток завершит какую-то операцию. В общем случае, часто возникает ситуация, когда поток должен ожидать какого-то события или истинности некоторого условия. Конечно, это можно сделать, периодически проверяя разделяемый флаг «задача завершена» или что-то в этом роде, но такое решение далеко от идеала. Необходимость в синхронизации операций — настолько распространенный сценарий, что в стандартную библиотеку С++ включены специальные механизмы для этой цели — условные переменные и будущие результаты (future).

В этой главе мы рассмотрим, как реализуется ожидание событий с помощью условных переменных и будущих результатов и как ими можно воспользоваться, чтобы упростить синхронизацию операций.

4.1. Ожидание события или иного условия

Представьте, что вы едете на поезде ночью. Чтобы не пропустить свою станцию, можно не спать всю ночь и читать названия всех пунктов, где поезд останавливается. Так вы, конечно, не проедете мимо, но сойдете с поезда сильно уставшим. Есть и другой способ — заранее посмотреть в расписании, когда поезд прибывает в нужный вам пункт, поставить будильник и улечься спать. Так вы тоже свою остановку не пропустите, но если поезд задержится в пути, то проснётесь слишком рано. И еще одно — если в будильнике сядут батарейки, то вы можете проспать и проехать мимо нужной станции. В идеале хотелось бы, чтобы кто-то или что-то разбудило вас, когда поезд подъедет к станции, — не раньше и не позже.

Какое отношение всё это имеет к потокам? Самое непосредственное — если один поток хочет дождаться, когда другой завершит некую операцию, то может поступить несколькими способами. Во-первых, он может просто проверять разделяемый флаг (защищенный мьютексом), полагая, что второй поток поднимет этот флаг, когда завершит свою операцию. Это расточительно но двум причинам: на опрос флага уходит процессорное время, и мьютекс, захваченный ожидающим потоком, не может быть захвачен никаким другим потоком. То и другое работает против ожидающего потока, поскольку ограничивает ресурсы, доступные потоку, которого он так ждет, и даже не дает ему возможность поднять флаг, когда работа будет завершена. Это решение сродни бодрствованию всю ночь, скрашиваемому разговорами с машинистом: он вынужден вести поезд медленнее, потому что вы его постоянно отвлекаете, и, значит, до пункта назначения вы доберетесь позже. Вот и ожидающий поток потребляет ресурсы, которые пригодились бы другим потокам, в результате чего ждет дольше, чем необходимо.

Второй вариант — заставить ожидающий поток спать между проверками с помощью функции

std::this_thread::sleep_for()
(см. раздел 4.3):

bool flag;

std::mutex m;


void wait_for_flag() {

 std::unique_lock lk(m); ←
(1) Освободить мьютекс

 while (!flag) {

  lk.unlock(); ←
(2) Спать 100 мс

  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  lk.lock();  ←
(3) Снова захватить мьютекс

 }

}

В этом цикле функция освобождает мьютекс (1) перед тем, как заснуть (2), и снова захватывает его, проснувшись, (3), оставляя другому потоку шанс захватить мьютекс и поднять флаг.

Это уже лучше, потому что во время сна поток не расходует процессорное время. Но трудно выбрать подходящий промежуток времени. Если он слишком короткий, то поток все равно впустую тратит время на проверку; если слишком длинный — то поток будет спать и после того, как ожидание завершилось, то есть появляется ненужная задержка. Редко бывает так, что слишком длительный сон прямо влияет на работу программу, но в динамичной игре это может привести к пропуску кадров, а в приложении реального времени — к исчерпанию выделенного временного кванта.

Третий — и наиболее предпочтительный - способ состоит в том, чтобы воспользоваться средствами из стандартной библиотеки С++, которые позволяют потоку ждать события. Самый простой механизм ожидания события, возникающего в другом потоке (например, появления нового задания в упоминавшемся выше конвейере), дают условные переменные. Концептуально условная переменная ассоциирована с каким-то событием или иным условием, причём один или несколько потоков могут ждать, когда это условие окажется выполненным. Если некоторый поток решит, что условие выполнено, он может известить об этом один или несколько потоков, ожидающих условную переменную, в результате чего они возобновят работу.

4.1.1. Ожидание условия с помощью условных переменных

Стандартная библиотека С++ предоставляет не одну, а две реализации условных переменных:

std::condition_variable
и
std::condition_variable_any
. Оба класса объявлены в заголовке
. В обоих случаях для обеспечения синхронизации необходимо взаимодействие с мьютексом; первый класс может работать только с
std::mutex
, второй — с любым классом, который отвечает минимальным требованиям к «мьютексоподобию», отсюда и суффикс
_any
. Поскольку класс
std::condition_variable_any
более общий, то его использование может обойтись дороже с точки зрения объема потребляемой памяти, производительности и ресурсов операционной системы. Поэтому, если дополнительная гибкость не требуется, то лучше ограничиться классом
std::condition_variable
.

Ну и как же воспользоваться классом

std::condition_variable
в примере, упомянутом во введении, — как сделать, чтобы поток, ожидающий работу, спал, пока не поступят данные? В следующем листинге приведён пример реализации с использованием условной переменной.


Листинг 4.1. Ожидание данных с помощью

std::condition_variable

std::mutex mut;

std::queue data_queue; ←
(1)

std::condition_variable data_cond;


void data_preparation_thread() {

 while (more_data_to_prepare()) {

  data_chunk const data = prepare_data();

  std::lock_guard lk(mut);

  data_queue.push(data);  ←
(2)

  data_cond.notify_one(); ←
(3)

 }

}


void data_processing_thread() {

 while(true) {

  std::unique_lock lk(mut); ←
(4)

  data_cond.wait(

  lk, []{ return !data_queue.empty(); }); ←
(5)

 data_chunk data = data_queue.front();

  data_queue.pop();

  lk.unlock(); ←
(6)

  process(data);

  if (is_last_chunk(data))

  break;

 }

}

Итак, мы имеем очередь (1), которая служит для передачи данных между двумя потоками. Когда данные будут готовы, поток, отвечающий за их подготовку, помещает данные в очередь, предварительно захватив защищающий ее мьютекс с помощью

std::lock_guard
. Затем он вызывает функцию-член
notify_one()
объекта
std::condition_variable
, чтобы известить ожидающий поток (если таковой существует) (3).

По другую сторону забора находится поток, обрабатывающий данные. Он в самом начале захватывает мьютекс, но с помощью

std::unique_lock
, а не
std::lock_guard
(4) — почему, мы скоро увидим. Затем поток вызывает функцию-член
wait()
объекта
std::condition_variable
, передавая ей объект-блокировку и лямбда-функцию, выражающую ожидаемое условие (5). Лямбда-функции — это нововведение в С++11, они позволяют записать анонимную функцию как часть выражения и идеально подходят для задания предикатов для таких стандартных библиотечных функций, как
wait()
. В данном случае простая лямбда-функция
[]{ return !data_queue.empty(); }
проверяет, что очередь
data_queue
не пуста (вызывая ее метод
empty()
), то есть что в ней имеются данные для обработки. Подробнее лямбда-функции описаны в разделе А.5 приложения А.

Затем функция

wait()
проверяет условие (вызывая переданную лямбда-функцию) и возвращает управление, если оно выполнено (то есть лямбда-функция вернула
true
). Если условие не выполнено (лямбда-функция вернула
false
), то
wait()
освобождает мьютекс и переводит поток в состояние ожидания. Когда условная переменная получит извещение, отправленное потоком подготовки данных с помощью
notify_one()
, поток обработки пробудится, вновь захватит мьютекс и еще раз проверит условие. Если условие выполнено, то
wait()
вернет управление, причём мьютекс в этот момент будет захвачен. Если же условие не выполнено, то поток снова освобождает мьютекс и возобновляет ожидание. Именно поэтому нам необходим
std::unique_lock
, а не
std::lock_guard
— ожидающий поток должен освобождать мьютекс, когда находится в состоянии ожидания, и захватывать его но выходе из этого состояния, a
std::lock_guard
такой гибкостью не обладает. Если бы мьютекс оставался захваченным в то время, когда поток обработки спит, поток подготовки данных не смог бы захватить его, чтобы поместить новые данные в очередь, а, значит, ожидаемое условие никогда не было бы выполнено.

В листинге 4.1 используется простая лямбда-функция (5), которая проверяет, что очередь не пуста. Однако с тем же успехом можно было бы передать любую функцию или объект, допускающий вызов. Если функция проверки условия уже существует (быть может, она сложнее показанного в примере простенького теста), то передавайте ее напрямую — нет никакой необходимости обертывать ее лямбда-функцией. Внутри

wait()
условная переменная может проверять условие многократно, но всякий раз это делается после захвата мьютекса, и, как только функция проверки условия вернет
true
(и лишь в этом случае),
wait()
возвращает управление вызывающей программе. Ситуация, когда ожидающий поток захватывает мьютекс и проверяет условие не в ответ на извещение от другого потока, называется ложным пробуждением (spurious wake). Поскольку количество и частота ложных пробуждений по определению недетерминированы, нежелательно использовать для проверки условия функцию с побочными эффектами. В противном случае будьте готовы к тому, что побочный эффект может возникать более одного раза.

Присущая

std::unique_lock
возможность освобождать мьютекс используется не только при обращении к
wait()
, но и непосредственно перед обработкой поступивших данных (6). Обработка может занимать много времени, а, как было отмечено в главе 3, удерживать мьютекс дольше необходимого неразумно.

Применение очереди для передачи данных между потоками (как в листинге 4.1) — весьма распространенный прием. При правильной реализации синхронизацию можно ограничить только самой очередью, что уменьшает количество потенциальных проблем и состояний гонки. Поэтому покажем, как на основе листинга 4.1 построить обобщенную потокобезопасную очередь.

4.1.2. Потокобезопасная очередь на базе условных переменных

Приступая к проектированию обобщенной очереди, стоит потратить некоторое время на обдумывание того, какие понадобятся операции. Именно так мы подходили к разработке потокобезопасного стека в разделе 3.2.3. Возьмем в качестве образца адаптер контейнера

std::queue<>
из стандартной библиотеки С++, интерфейс которого показан в листинге ниже.


Листинг 4.2. Интерфейс класса

std::queue

template >

class queue {

public:

 explicit queue(const Container&);

 explicit queue(Container&& = Container());


 template  explicit queue(const Alloc&);

 template  queue(const Container&, const Alloc&);

 template  queue(Container&&, const Alloc&);

 template  queue(queue&&, const Alloc&);


 void swap(queue& q);


 bool empty() const;

 size_type size() const;


 T& front();

 const T& front() const;

 T& back();

 const T& back() const;


 void push(const T& x);

 void push(T&& x);

 void pop();

 template  void emplace(Args&&... args);

};

Если не обращать внимания на конструирование, присваивание и обмен, то останется три группы операций: опрос состояния очереди в целом (

empty()
и
size()
), опрос элементов очереди (
front()
и
back()
) модификация очереди (
push()
,
pop()
и
emplace()
). Ситуация аналогична той, что мы видели в разделе 3.2.3 для стека, поэтому возникают те же — внутренне присущие интерфейсу — проблемы с гонкой. Следовательно,
front()
и
pop()
необходимо объединить в одной функции — точно так же, как мы постудили с
top()
и
pop()
в случае стека. Но в коде в листинге 4.1 есть дополнительный нюанс: если очередь используется для передачи данных между потоками, то поток-получатель часто будет ожидать поступления данных. Поэтому включим два варианта
pop()
:
try_pop()
пытается извлечь значение из очереди, но сразу возвращает управление (с указанием ошибки), если в очереди ничего не было, a
wait_and_pop()
ждет, когда появятся данные. Взяв за образец сигнатуры функций из примера стека, представим интерфейс в следующем виде:


Листинг 4.3. Интерфейс класса

threadsafe_queue

#include 


template

class threadsafe_queue {

public:

 threadsafe_queue();

 threadsafe_queue(const threadsafe_queue&);

 threadsafe_queue& operator=(

 const threadsafe_queue&) = delete; ←┐
Для простоты

 void push(T new_value);        │
запрещаем присваивание


 bool try_pop(T& value);    ←
(1)

 std::shared_ptr try_pop(); ←
(2)


 void wait_and_pop(T& value);

 std::shared_ptr wait_and_pop();


 bool empty() const;

};

Как и в случае стека, мы для простоты уменьшили число конструкторов и запретили присваивание. И, как и раньше, предлагаем по два варианта функций

try_pop()
и
wait_for_pop()
. Первый перегруженный вариант
try_pop()
(1) сохраняет извлеченное значение в переданной по ссылке переменной, а возвращаемое значение использует для индикации ошибки: оно равно
true
, если значение получено, и
false
— в противном случае (см. раздел А.2). Во втором перегруженном варианте (2) так поступить нельзя, потому что возвращаемое значение — это данные, извлеченные из очереди. Однако же можно возвращать указатель
NULL
, если в очереди ничего не оказалось.

Ну и как же всё это соотносится с листингом 4.1? В следующем листинге показано, как перенести оттуда код в методы

push()
и
wait_and_pop()
.


Листинг 4.4. Реализация функций

push()
и
wait_and_pop()
на основе кода из листинга 4.1

#include 

#include 

#include 


template

class threadsafe_queue {

private:

 std::mutex mut;

 std::queue data_queue;

 std::condition_variable data_cond;

public:

 void push(T new_value) {

  std::lock_guard lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }


 void wait_and_pop(T& value) {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{return !data_queue.empty();});

  value = data_queue.front();

  data_queue.pop();

 }

};


threadsafe_queue data_queue; ←
(1)


void data_preparation_thread() {

 while (more_data_to_prepare()) {

  data_chunk const data = prepare_data();

  data_queue.push(data); ←
(2)

 }

}


void data_processing_thread() {

 while (true) {

  data_chunk data;

  data_queue.wait_and_pop(data); ←
(3)

  process(data);

  if (is_last_chunk(data))

  break;

 }

}

Теперь мьютекс и условная переменная находятся в экземпляре

threadsafe_queue
, поэтому не нужно ни отдельных переменных (1), ни внешней синхронизации при обращении к функции
push()
(2). Кроме того,
wait_and_pop()
берет на себя заботу об ожидании условной переменной (3).

Второй перегруженный вариант

wait_and_pop()
тривиален, а остальные функции можно почти без изменений скопировать из кода стека в листинге 3.5. Ниже приведена окончательная реализация.


Листинг 4.5. Полное определение класса потокобезопасной очереди на базе условных переменных

#include 

#include 

#include 

#include 


template

class threadsafe_queue {

private:

 mutable std::mutex mut;←
(1) Мьютекс должен быть изменяемым

 std::queue data_queue;

 std::condition_variable data_cond;


public:

 threadsafe_queue() {}

 threadsafe_queue(threadsafe_queue const& other) {

  std::lock_guard lk(other.mut);

  data_queue = other.data_queue;

 }


 void push(T new_value) {

  std::lock_guard lk(mut);

  data_queue.push(new_value);

  data_cond.notify_one();

 }


 void wait_and_pop(T& value) {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  value = data_queue.front();

  data_queue.pop();

 }


 std::shared_ptr wait_and_pop() {

  std::unique_lock lk(mut);

  data_cond.wait(lk, [this]{ return !data_queue.empty(); });

  std::shared_ptr

  res(std::make_shared(data_queue.front()));

  data_queue.pop();

  return res;

 }


 bool try_pop(T& value) {

  std::lock_guard lk(mut);

  if (data_queue.empty())

  return false;

  value = data_queue.front();

  data_queue.pop();

  return true;

 }


 std::shared_ptr try_pop() {

  std::lock_guard lk(mut);

  if (data_queue.empty())

  return std::shared_ptr();

  std::shared_ptr

  res(std::make_shared(data_queue.front()));

  data_queue.pop();

  return res;

 }


 bool empty() const {

  std::lock_guard lk(mut);

  return data_queue.empty();

 }

};

Хотя

empty()
— константная функция-член, а параметр копирующего конструктора —
const
-ссылка, другие потоки могут хранить неконстантные ссылки на объект и вызывать изменяющие функции-члены, которые захватывают мьютекс. Поэтому захват мьютекса — это изменяющая операция, следовательно, член
mut
необходимо пометить как
mutable
(1), чтобы его можно было захватить в функции
empty()
и в копирующем конструкторе.

Условные переменные полезны и тогда, когда есть несколько потоков, ожидающих одного события. Если потоки используются для разделения работы и, следовательно, на извещение должен реагировать только один поток, то применима точно такая же структура программы, как в листинге 4.1; нужно только запустить несколько потоков обработки данных. При поступлении новых данных функция

notify_one()
разбудит только один поток, который проверяет условие внутри
wait()
, и этот единственный поток вернет управление из
wait()
(в ответ на помещение нового элемента в очередь
data_queue
). Заранее нельзя сказать, какой поток получит извещение и есть ли вообще ожидающие потоки (не исключено, что все они заняты обработкой ранее поступивших данных).

Альтернативный сценарий — когда несколько потоков ожидают одного события, и отреагировать должны все. Так бывает, например, когда инициализируются разделяемые данные, и все работающие с ними потоки должны ждать, пока инициализация завершится (хотя для этого случая существуют более подходящие механизмы, см. раздел 3.3.1 главы 3), или когда потоки должны ждать обновления разделяемых данных, например, в случае периодической повторной инициализации. В таких ситуациях поток, отвечающий за подготовку данных, может вызвать функцию-член

notify_all()
условной переменной вместо
notify_one()
. Эта функция извещает все потоки, ожидающие внутри функции
wait()
, о том, что они должны проверить ожидаемое условие.

Если ожидающий поток собирается ждать условия только один раз, то есть после того как оно станет истинным, он не вернется к ожиданию той же условной переменной, то лучше применить другой механизм синхронизации. В особенности это относится к случаю, когда ожидаемое условие — доступность каких-то данных. Для такого сценария больше подходят так называемые будущие результаты (future).

4.2. Ожидание одноразовых событий с помощью механизма будущих результатов

Предположим, вы летите самолетом в отпуск за границу. Вы приехали в аэропорт, прошли регистрацию и прочие процедуры, но должны ждать объявления о посадке — быть может, несколько часов. Можно, конечно, найти себе занятие — например, почитать книжку, побродить в Интернете, поесть в кафе за бешеные деньги, но суть от этого не меняется: вы ждете сигнала о том, что началась посадка в самолет. И есть еще одна особенность — данный рейс вылетает всего один раз; в следующий отпуск вы будете ждать посадки на другой рейс.

В стандартной библиотеке С++ такие одноразовые события моделируются с помощью будущего результата. Если поток должен ждать некоего одноразового события, то он каким-то образом получает представляющий его объект-будущее. Затем поток может периодически в течение очень короткого времени ожидать этот объект-будущее, проверяя, произошло ли событие (посмотреть на табло вылетов), а между проверками заниматься другим делом (вкушать в кафе аэропортовскую пищу по несуразным ценам). Можно поступить и иначе — выполнять другую работу до тех пор, пока не наступит момент, когда без наступления ожидаемого события двигаться дальше невозможно, и вот тогда ждать готовности будущего результата. С будущим результатом могут быть ассоциированы какие-то данные (например, номер выхода в объявлении на посадку), но это необязательно. После того как событие произошло (то есть будущий результат готов), сбросить объект-будущее в исходное состояние уже невозможно.

В стандартной библиотеке С++ есть две разновидности будущих результатов, реализованные в форме двух шаблонов классов, которые объявлены в заголовке

: уникальные будущие результаты (
std::future<>
) и разделяемые будущие результаты (
std::shared_future<>
). Эти классы устроены по образцу
std::unique_ptr
и
std::shared_ptr
. На одно событие может ссылаться только один экземпляр
std::future
, но несколько экземпляров
std::shared_future
. В последнем случае все экземпляры оказываются готовы одновременно и могут обращаться к ассоциированным с событием данным. Именно из-за ассоциированных данных будущие результаты представлены шаблонами, а не обычными классами; точно так же шаблоны
std::unique_ptr
и
std::shared_ptr
параметризованы типом ассоциированных данных. Если ассоциированных данных нет, то следует использовать специализации шаблонов
std::future
и
std::shared_future
. Хотя будущие результаты используются как механизм межпоточной коммуникации, сами по себе они не обеспечивают синхронизацию доступа. Если несколько потоков обращаются к единственному объекту-будущему, то они должны защитить доступ с помощью мьютекса или какого-либо другого механизма синхронизации, как описано в главе 3. Однако, как будет показано в разделе 4.2.5, каждый из нескольких потоков может работать с собственной копией
std::shared_future<>
безо всякой синхронизации, даже если все они ссылаются на один и тот же асинхронно получаемый результат.

Самое простое одноразовое событие — это результат вычисления, выполненного в фоновом режиме. В главе 2 мы видели, что класс

std::thread
не предоставляет средств для возврата вычисленного значения, и я обещал вернуться к этому вопросу в главе 4. Исполняю обещание.

4.2.1. Возврат значения из фоновой задачи

Допустим, вы начали какое-то длительное вычисление, которое в конечном итоге должно дать полезный результат, но пока без него можно обойтись. Быть может, вы нашли способ получить ответ на «Главный возрос жизни, Вселенной и всего на свете» из книги Дугласа Адамса[7]. Для вычисления можно запустить новый поток, но придётся самостоятельно позаботиться о передаче в основную программу результата, потому что в классе

std::thread
такой механизм не предусмотрен. Тут-то и приходит на помощь шаблон функции
std::async
(также объявленный в заголовке
).

Функция s

td::async
позволяет запустить асинхронную задачу, результат которой прямо сейчас не нужен. Но вместо объекта
std::thread
она возвращает объект
std::future
, который будет содержать возвращенное значение, когда оно станет доступно. Когда программе понадобится значение, она вызовет функцию-член
get()
объекта-будущего, и тогда поток будет приостановлен до готовности будущего результата, после чего вернет значение. В листинге ниже оказан простой пример.


Листинг 4.6. Использование

std::future
для получения результата асинхронной задачи

#include 

#include 


int find_the_answer_to_ltuae();

void do_other_stuff();


int main() {

 std::future the_answer =

  std::async(find_the_answer_to_ltuae);

 do_other_stuff();

 std::cout << "Ответ равен " << the_answer.get() << std::endl;

}

Шаблон

std::async
позволяет передать функции дополнительные параметры, точно так же, как
std::thread
. Если первым аргументом является указатель на функцию-член, то второй аргумент должен содержать объект, от имени которого эта функция-член вызывается (сам объект, указатель на него или обертывающий его
std::ref
), а все последующие аргументы передаются без изменения функции-члену. В противном случае второй и последующие аргументы передаются функции или допускающему вызов объекту, заданному в первом аргументе. Как и в
std::thread
, если аргументы представляют собой r-значения, то создаются их копии посредством перемещения оригинала. Это позволяет использовать в качестве объекта-функции и аргументов типы, допускающие только перемещение. Пример см. в листинге ниже.


Листинг 4.7. Передача аргументов функции, заданной в

std::async

#include 

#include 


struct X {

 void foo(int, std::string const&);

 std::string bar(std::string const&);

};

Вызывается

X x;                       │
p->foo(42,"hello"),

auto f1 = std::async(&X::foo, &x, 42, "hello");←┘
где p=&x

auto f2 = std::async(&X::bar, x, "goodbye");←┐
вызывается

tmpx.bar("goodbye"),

struct Y {                  │
где tmpx — копия x

 double operator()(double);

};                │
Вызывается tmpy(3.141),

где tmpy создается

Y y;               │
из Y перемещающим

auto f3 = std::async(Y(), 3.141)←┘
конструктором

auto f4 = std::async(std::ref(y), 2.718);←
Вызывается y(2.718)


X baz(X&);

std::async(baz, std::ref(x); ←
Вызывается baz(x)


class move_only {

public:

 move_only();

 move_only(move_only&&);

 move_only(move_only const&) = delete;

 move_only& operator=(move_only&&);

 move_only& operator=(move_only const&) = delete;

 void operator()();         │
Вызывается tmp(), где tmp

};                 │
конструируется с помощью

auto f5 = std::async(move_only());←┘
std::move(move_only())

По умолчанию реализации предоставлено право решать, запускает ли

std::async
новый поток или задача работает синхронно, когда программа ожидает будущего результата. В большинстве случаев такое поведение вас устроит, но можно задать требуемый режим в дополнительном параметре
std::async
перед вызываемой функцией. Этот параметр имеет тип
std::launch
и может принимать следующие значения:
std::launch::deferred
— отложить вызов функции до того момента, когда будет вызвана функция-член
wait()
или
get()
объекта-будущего;
std::launch::async
— запускать функцию в отдельном потоке;
std::launch::deferred | std::launch::async
— оставить решение на усмотрение реализации. Последний вариант подразумевается по умолчанию. В случае отложенного вызова функция может вообще никогда не выполниться. Например:

auto f6 =                  │
Выполнять в

 std::async(std::launch::async, Y(), 1.2);←┘
новом потоке

auto f7 =

 std::async(

  std::launch::deferred, baz, std::ref(x)); ←┐

auto f8 = std::async(           ←┐│
Выполнять

 std::launch::deferred | std::launch::async,││
при вызове

 baz, std::ref(x));             ││
wait() или get()

auto f9 = std::async(baz, std::ref(x));  ←┼
Оставить на

усмотрение реализации

f7.wait();←
Вызвать отложенную функцию

Ниже в этой главе и далее в главе 8 мы увидим, что с помощью

std::async
легко разбивать алгоритм на параллельно выполняемые задачи. Однако это не единственный способ ассоциировать объект
std::future
с задачей; можно также обернуть задачу объектом шаблонного класса
std::packaged_task<>
или написать код, который будет явно устанавливать значения с помощью шаблонного класса
std::promise<>
. Шаблон
std::packaged_task
является абстракцией более высокого уровня, чем
std::promise
, поэтому начнем с него.

4.2.2. Ассоциирование задачи с будущим результатом

Шаблон класса

std::packaged_task<>
связывает будущий результат с функцией или объектом, допускающим вызов. При вызове объекта
std::packaged_task<>
ассоциированная функция или допускающий вызов объект вызывается и делает будущий результат готовым, сохраняя возвращенное значение в виде ассоциированных данных. Этот механизм можно использовать для построение пулов потоков (см. главу 9) и иных схем управления, например, запускать каждую задачу в отдельном потоке или запускать их все последовательно в выделенном фоновом потоке. Если длительную операцию можно разбить на автономные подзадачи, то каждую из них можно обернуть объектом
std::packaged_task<>
и передать этот объект планировщику задач или пулу потоков. Таким образом, мы абстрагируем специфику задачи — планировщик имеет дело только с экземплярами
std::packaged_task<>
, а не с индивидуальными функциями.

Параметром шаблона класса

std::packaged_task<>
является сигнатура функции, например
void()
для функции, которая не принимает никаких параметров и не возвращает значения, или
int(std::string&, double*)
для функции, которая принимает неконстантную ссылку на
std::string
и указатель на
double
и возвращает значение типа
int
. При конструировании экземпляра
std::packaged_task
вы обязаны передать функцию или допускающий вызов объект, который принимает параметры указанных типов и возвращает значение типа, преобразуемого в указанный тип возвращаемого значения. Точного совпадения типов не требуется; можно сконструировать объект
std::packaged_task
из функции, которая принимает
int
и возвращает
float
, потому что между этими типами существуют неявные преобразования.

Тип возвращаемого значения, указанный в сигнатуре функции, определяет тип объекта

std::future<>
, возвращаемого функцией-членом
get_future()
, а заданный в сигнатуре список аргументов используется для определения сигнатуры оператора вызова в классе упакованной задачи. Например, в листинге ниже приведена часть определения класса
std::packaged_task*, int)>
.


Листинг 4.8. Определение частичной специализации

std::packaged_task

template<>

class packaged_task*, int)> {

public:

 template

 explicit packaged_task(Callable&& f);


 std::future get_future();

 void operator()(std::vector*, int);

};

Таким образом,

std::packaged_task
— допускающий вызов объект, и, значит, его можно обернуть объектом
std::function
, передать
std::thread
в качестве функции потока, передать любой другой функции, которая ожидает допускающий вызов объект, или даже вызвать напрямую. Если
std::packaged_task
вызывается как объект-функция, то аргументы, переданные оператору вызова, без изменения передаются обернутой им функции, а возвращенное значение сохраняется в виде асинхронного результата в объекте
std::future
, полученном от
get_future()
. Следовательно, мы можем обернуть задачу в
std::packaged_task
и извлечь будущий результат перед тем, как передавать объект
std::packaged_task
в то место, из которого он будет в свое время вызван. Когда результат понадобится, нужно будет подождать готовности будущего результата. В следующем примере показано, как всё это делается на практике.

Передача задач между потоками

Во многих каркасах для разработки пользовательского интерфейса требуется, чтобы интерфейс обновлялся только в специально выделенных потоках. Если какому-то другому потоку потребуется обновить интерфейс, то он должен послать сообщение одному из таких выделенных потоков, чтобы тот выполнил операцию. Шаблон

std::packaged_task
позволяет решить эту задачу, не заводя специальных сообщений для каждой относящейся к пользовательскому интерфейсу операции.


Листинг 4.9. Выполнение кода в потоке пользовательского интерфейса с применением

std::packaged_task

#include 

#include 

#include 

#include 

#include 


std::mutex m;

std::deque> tasks;


bool gui_shutdown_message_received();

void get_and_process_gui_message();


void gui_thread() {             ←
(1)

 while (!gui_shutdown_message_received()) { ←
(2)

  get_and_process_gui_message();      ←
(3)

  std::packaged_task task; {

  std::lock_guard lk(m);

  if (tasks empty())            ←
(4)

   continue;

  task = std::move(tasks.front());     ←
(5)

  tasks.pop_front();

  }

 task();                   ←
(6)

 }

}


std::thread gui_bg_thread(gui_thread);


template

std::future post_task_for_gui_thread(Func f) {

 std::packaged_task task(f);    ←
(7)

 std::future res = task.get_future();←
(8)

 std::lock_guard lk(m);

 tasks.push_back(std::move(task));     ←
(9)

 return res;                ←
(10)

}

Код очень простой: поток пользовательского интерфейса (1) повторяет цикл, пока не будет получено сообщение о необходимости завершить работу (2). На каждой итерации проверяется, есть ли готовые для обработки сообщения GUI (3), например события мыши, или новые задачи в очереди. Если задач нет (4), программа переходит на начало цикла; в противном случае извлекает задачу из очереди (5), освобождает защищающий очередь мьютекс и исполняет задачу (6). По завершении задачи будет готов ассоциированный с ней будущий результат.

Помещение задачи в очередь ничуть не сложнее: по предоставленной функции создается новая упакованная задача (7), для получения ее будущего результата вызывается функция-член

get_future()
(8), после чего задача помещается в очередь (9) еще до того, как станет доступен будущий результат (10). Затем часть программы, которая отправляла сообщение потоку пользовательского интерфейса, может дождаться будущего результата, если хочет знать, как завершилась задача, или отбросить его, если это несущественно.

В этом примере мы использовали класс

std::packaged_task
для задач, обертывающих функцию или иной допускающий вызов объект, который не принимает параметров и возвращает
void
(если он вернет что-то другое, то возвращенное значение будет отброшено). Это простейшая из всех возможных задач, но, как мы видели ранее, шаблон
std::packaged_task
применим и в более сложных ситуациях — задав другую сигнатуру функции в качестве параметра шаблона, вы сможете изменить тип возвращаемого значения (и, стало быть, тип данных, которые хранятся в состоянии, ассоциированном с будущим объектом), а также типы аргументов оператора вызова. Наш пример легко обобщается на задачи, которые должны выполняться в потоке GUI и при этом принимают аргументы и возвращают в
std::future
какие-то данные, а не только индикатор успешности завершения.

А как быть с задачами, которые нельзя выразить в виде простого вызова функции, или такими, где результат может поступать из нескольких мест? Эти проблемы решаются с помощью еще одного способа создания будущего результата: явного задания значения с помощью шаблона класса

std::promise
[8].

4.2.3. Использование
std::promise

При написании сетевых серверных программ часто возникает искушение обрабатывать каждый запрос на соединение в отдельном потоке, поскольку при такой структуре порядок коммуникации становится нагляднее и проще для программирования. Этот подход срабатывает, пока количество соединений (и, следовательно, потоков) не слишком велико. Но с ростом числа потоков увеличивается и объем потребляемых ресурсов операционной системы, а равно частота контекстных переключений (если число потоков превышает уровень аппаратного параллелизма), что негативно сказывается на производительности. В предельном случае у операционной системы могут закончиться ресурсы для запуска новых потоков, хотя пропускная способность сети еще не исчерпана. Поэтому в приложениях, обслуживающих очень большое число соединений, обычно создают совсем немного потоков (быть может, всего один), каждый из которых одновременно обрабатывает несколько запросов.

Рассмотрим один из таких потоков. Пакеты данных приходят по разным соединениям в случайном порядке, а потому и порядок помещения исходящих пакетов в очередь отправки тоже непредсказуем. Часто будет складываться ситуация, когда другие части приложения ждут либо успешной отправки данных, либо поступления нового пакета по конкретному сетевому соединению.

Шаблон

std::promise
дает возможность задать значение (типа
T
), которое впоследствии можно будет прочитать с помощью ассоциированного объекта
std::future
. Пара
std::promise
/
std::future
реализует один из возможных механизмов такого рода; ожидающий поток приостанавливается в ожидании будущего результата, тогда как поток, поставляющий данные, может с помощью
promise
установить ассоциированное значение и сделать будущий результат готовым.

Чтобы получить объект

std::future
, ассоциированный с данным обещанием
std::promise
, мы должны вызвать функцию-член
get_future()
— так же, как в случае
std::packaged_task
. После установки значения обещания (с помощью функции-члена
set_value()
) будущий результат становится готовым, и его можно использовать для получения установленного значения. Если уничтожить объект
std::promise
, не установив значение, то в будущем результате будет сохранено исключение. О передаче исключений между потоками см. раздел 4.2.4.

В листинге 4.10 приведен код потока обработки соединений, написанный по только что изложенной схеме. В данном случае для уведомления об успешной передаче блока исходящих данных применяется пара

std::promise
/
std::future
; ассоциированное с будущим результатом значение — это просто булевский флаг успех/неудача. Для входящих пакетов в качестве ассоциированных данных могла бы выступать полезная нагрузка пакета.


Листинг 4.10. Обработка нескольких соединений в одном потоке с помощью объектов-обещаний

#include 


void process_connections(connection_set& connections) {

 while(!done(connections)) {       ←
(1)

  for (connection_iterator        ←
(2)

  connection = connections.begin(), end = connections.end();

  connection != end;

  ++connection) {

  if (connection->has_incoming_data()) {←
(3)

   data_packet data = connection->incoming();

   std::promise& p =

   connection->get_promise(data.id);  ←
(4)

   p.set_value(data.payload);

  }

  if (connection->has_outgoing_data()) {←
(5)

   outgoing_packet data =

   connection->top_of_outgoing_queue();

   connection->send(data.payload);

   data.promise.set_value(true);     ←
(6)

  }

  }

 }

}

Функция

process_connections()
повторяет цикл, пока
done()
возвращает
true
(1). На каждой итерации поочередно проверяется каждое соединение (2); если есть входящие данные, они читаются (3), а если в очереди имеются исходящие данные, они отсылаются (5). При этом предполагается, что в каждом входящем пакете хранится некоторый идентификатор и полезная нагрузка, содержащая собственно данные. Идентификатору сопоставляется объект
std::promise
(возможно, путем поиска в ассоциативном контейнере) (4), значением которого является полезная нагрузка пакета. Исходящие пакеты просто извлекаются из очереди отправки и передаются но соединению. После завершения передачи в обещание, ассоциированное с исходящими данными, записывается значение
true
, обозначающее успех (6). Насколько хорошо эта схема ложится на фактический сетевой протокол, зависит от самого протокола; в конкретном случае схема обещание/будущий результат может и не подойти, хотя структурно она аналогична поддержке асинхронного ввода/вывода в некоторых операционных системах.

В коде выше мы полностью проигнорировали возможные исключения. Хотя мир, в котором всё всегда работает правильно, был бы прекрасен, действительность не так радужна. Переполняются диски, не находятся искомые данные, отказывает сеть, «падает» база данных — всякое бывает. Если бы операция выполнялась в том потоке, которому нужен результат, программа могла бы просто сообщить об ошибке с помощью исключения. Но было бы неоправданным ограничением требовать, чтобы всё работало правильно только потому, что мы захотели воспользоваться классами

std::packaged_task
или
std::promise
.

Поэтому в стандартной библиотеке С++ имеется корректный способ учесть возникновение исключений в таком контексте и сохранить их как часть ассоциированного результата.

4.2.4. Сохранение исключения в будущем результате

Рассмотрим следующий коротенький фрагмент. Если передать функции

square_root()
значение
-1
, то она возбудит исключение, которое увидит вызывающая программа:

double square_root(double x) {

 if (x<0) {

  throw std::out_of_range("x<0");

 }

 return sqrt(x);

}

А теперь предположим, что вместо вызова

square_root()
в текущем потоке

double y = square_root(-1);

мы вызываем ее асинхронно:

std::future f = std::async(square_root,-1);

double y = f.get();

В идеале хотелось бы получить точно такое же поведение: чтобы поток, вызывающий

f.get()
, мог увидеть не только нормальное значение
y
, но и исключение — как в однопоточной программе.

Что ж, именно так на самом деле и происходит: если функция, вызванная через

std::async
, возбуждает исключение, то это исключение сохраняется в будущем результате вместо значения, а когда будущий результат оказывается готовым, вызов
get()
повторно возбуждает сохраненное исключение. (Примечание: стандарт ничего не говорит о том, возбуждается ли исходное исключение или его копия; различные компиляторы и библиотеки вправе решать этот вопрос по-разному.) То же самое происходит, когда функция обернута объектом
std::packaged_task
, — если при вызове задачи обернутая функция возбуждает исключение, то объект исключения сохраняется в будущем результате вместо значения, и это исключение повторно возбуждается при обращении к
get()
.

Разумеется,

std::promise
обеспечивает те же возможности в случае явного вызова функции. Чтобы сохранить исключение вместо значения, следует вызвать функцию-член
set_exception()
, а не
set_value()
. Обычно это делается в блоке
catch
:

extern std::promise some_promise;


try {

 some_promise.set_value(calculate_value());

} catch (...) {

 some_promise.set_exception(std::current_exception());

}

Здесь мы воспользовались функцией

std::current_exception()
, которая возвращает последнее возбужденное исключение, но могли вызвать
std::copy_exception()
, чтобы поместить в объект-обещание новое исключение, которое никем не возбуждалось:

some_promise.set_exception(

 std::copy_exception(std::logic_error("foo"));

Если тип исключения заранее известен, то это решение гораздо чище, чем использование блока

try/catch
; мы не только упрощаем код, но и оставляем компилятору возможности для его оптимизации.

Есть еще один способ сохранить исключение в будущем результате: уничтожить ассоциированный с ним объект

std::promise
или
std::packaged_task
, не вызывая функцию установки значения в случае обещания или не обратившись к упакованной задаче. В любом случае деструктор
std::promise
или
std::packaged_task
сохранит в ассоциированном состоянии исключение типа
std::future_error
, в котором код ошибки равен
std::future_errc::broken_promise
, если только будущий результат еще не готов; создавая объект-будущее, вы даете обещание предоставить значение или исключение, а, уничтожая объект, не задав ни того, ни другого, вы это обещание нарушаете. Если бы компилятор в этом случае не сохранил ничего в будущем результате, то ожидающие потоки могли бы никогда не выйти из состояния ожидания.

До сих пор мы во всех примерах использовали

std::future
. Однако у этого шаблонного класса есть ограничения, и не в последнюю очередь тот факт, что результата может ожидать только один поток. Если требуется, чтобы одного события ждали несколько потоков, то придётся воспользоваться классом
std::shared_future
.

4.2.5. Ожидание в нескольких потоках

Хотя класс

std::future
сам заботится о синхронизации, необходимой для передачи данных из одного потока в другой, обращения к функциям-членам одного и того же экземпляра
std::future
не синхронизированы между собой. Работа с одним объектом
std::future
из нескольких потоков без дополнительной синхронизации может закончиться гонкой за данными и неопределенным поведением. Так и задумано:
std::future
моделирует единоличное владение результатом асинхронного вычисления, и одноразовая природа
get()
в любом случае делает параллельный доступ бессмысленным — извлечь значение может только один поток, поскольку после первого обращения к
get()
никакого значения не остается.

Но если дизайн вашей фантастической параллельной программы требует, чтобы одного события могли ждать несколько потоков, то не отчаивайтесь: на этот случай предусмотрен шаблон класса

std::shared_future
. Если
std::future
допускает только перемещение, чтобы владение можно было передавать от одного экземпляра другому, но в каждый момент времени на асинхронный результат ссылался лишь один экземпляр, то экземпляры
std::shared_future
допускают и копирование, то есть на одно и то же ассоциированное состояние могут ссылать несколько объектов.

Но и функции-члены объекта

std::shared_future
не синхронизированы, поэтому во избежание гонки за данными при доступе к одному объекту из нескольких потоков вы сами должны обеспечить защиту. Но более предпочтительный способ — скопировать объект, так чтобы каждый поток работал со своей копией. Доступ к разделяемому асинхронному состоянию из нескольких потоков безопасен, если каждый поток обращается к этому состоянию через свой собственный объект
std::shared_future
. См. Рис. 4.1.

Рис. 4.1. Использование нескольких объектов

std::shared_future
, чтобы избежать гонки за данными

Одно из потенциальных применений

std::shared_future
— реализация параллельных вычислений наподобие применяемых в сложных электронных таблицах: у каждой ячейки имеется единственное окончательное значение, на которое могут ссылаться формулы, хранящиеся в нескольких других ячейках. Формулы для вычисления значений в зависимых ячейках могут использовать
std::shared_future
для ссылки на первую ячейку. Если формулы во всех ячейках вычисляются параллельно, то задачи, которые могут дойти до конца, дойдут, а те, что зависят от результатов вычислений других ячеек, окажутся заблокированы до разрешения зависимостей. Таким образом, система сможет но максимуму задействовать доступный аппаратный параллелизм.

Экземпляры

std::shared_future
, ссылающиеся на некоторое асинхронное состояние, конструируются из экземпляров
std::future
, ссылающихся на то же состояние. Поскольку объект
std::future
не разделяет владение асинхронным состоянием ни с каким другим объектом, то передавать владение объекту
std::shared_future
необходимо с помощью
std::move
, что оставляет
std::future
с пустым состоянием, как если бы он был сконструирован по умолчанию:

std::promise p;

std::future f(p.get_future())←
(1) Будущий результат f

assert(f.valid());         
действителен


std::shared_future sf(std::move(f));

assert(!f.valid());←
(2) f больше не действителен

assert(sf.valid());←
(3) sf теперь действителен

Здесь будущий результат

f
в начальный момент действителен
(1)
, потому что ссылается на асинхронное состояние обещания
p
, но после передачи состояния объекту
sf
результат
f
оказывается недействительным (2), a
sf
— действительным (3).

Как и для других перемещаемых объектов, передача владения для r-значения производится неявно, поэтому объект

std::shared_future
можно сконструировать прямо из значения, возвращаемого функцией-членом
get_future()
объекта
std::promise
, например:

std::promise p;←
(1) Неявная передача владения

std::shared_future sf(p.get_future());

Здесь передача владения неявная; объект

std::shared_future<>
конструируется из r-значения типа
std::future
(1).

У шаблона

std::future
есть еще одна особенность, которая упрощает использование
std::shared_future
совместно с новым механизмом автоматического выведения типа переменной из ее инициализатора (см. приложение А, раздел А.6). В шаблоне
std::future
имеется функция-член
share()
, которая создает новый объект
std::shared_future
и сразу передаёт ему владение. Это позволяет сделать код короче и проще для изменения:

std::promise<

 std::map

     SomeAllocator>::iterator> p;

auto sf = p.get_future().share();

В данном случае для

sf
выводится тип
std::shared_future::iterator>
, такое название произнести-то трудно. Если компаратор или распределитель изменятся, то вам нужно будет поменять лишь тип обещания, а тип будущего результата изменится автоматически.

Иногда требуется ограничить время ожидания события — либо потому что на время исполнения некоторого участка кода наложены жесткие ограничения, либо потому что поток может заняться другой полезной работой, если событие долго не возникает. Для этого во многих функциях ожидания имеются перегруженные варианты, позволяющие задать величину таймаута.

4.3. Ожидание с ограничением по времени

Все блокирующие вызовы, рассмотренные до сих пор, приостанавливали выполнение потока на неопределенно долгое время — до тех пор, пока не произойдёт ожидаемое событие. Часто это вполне приемлемого в некоторых случаях время ожидания желательно ограничить. Например, это может быть полезно, когда нужно отправить сообщение вида «Я еще жив» интерактивному пользователю или другому процессу или когда ожидание действительно необходимо прервать, потому что пользователь устал ждать и нажал Cancel.

Можно задать таймаут одного из двух видов: интервальный, когда требуется ждать в течение определённого промежутка времени (к примеру, 30 миллисекунд) или абсолютный, когда требуется ждать до наступления указанного момента (например, 17:30:15.045987023 UTC 30 ноября 2011 года). У большинства функций ожидания имеются оба варианта. Варианты, принимающие интервальный таймаут, оканчиваются словом

_for
, а принимающие абсолютный таймаут — словом
_until
.

Например, в классе

std::condition_variable
есть по два перегруженных варианта функций-членов
wait_for()
и
wait_until()
, соответствующие двум вариантам
wait()
— первый ждет поступления сигнала или истечения таймаута или ложного пробуждения, второй проверяет при пробуждении переданный предикат и возвращает управление, только если предикат равен
true
(и условной переменной поступил сигнал) или истек таймаут.

Прежде чем переходить к детальному обсуждению функций с таймаутами, рассмотрим, как в С++ задается время, и начнем с часов.

4.3.1. Часы

С точки зрения стандартной библиотеки С++, часы — это источник сведений о времени. Точнее, класс часов должен предоставлять четыре элемента информации:

• текущее время now;

• тип значения для представления времени, полученного от часов;

• величина такта часов;

• признак равномерного хода времени, такие часы называются стабильными.

Получить от часов текущее время можно с помощью статической функции-члена

now()
; например, функция
std::chrono::system_clock::now()
возвращает текущее время по системным часам. Тип точки во времени для конкретного класса часов определяется с помощью члена
typedef time_point
, поэтому значение, возвращаемое функцией
some_clock::now()
имеет тип
some_clock::time_point
.

Тактовый период часов задается в виде числа долей секунды, которое определяется членом класса

typedef period
; например, если часы тикают 25 раз в секунду, то член
period
будет определён как
std::ratio<1, 25>
, тогда как в часах, тикающих один раз в 2,5 секунды, член
period
определён как
std::ratio<5, 2>
. Если тактовый период не известен до начала выполнения программы или может изменяться во время работы, то
period
можно определить как средний период, наименьший период или любое другое значение, которое сочтет нужным автор библиотеки. Нет гарантии, что тактовый период, наблюдаемый в любом конкретном прогоне программы, соответствует периоду, определённому с помощью члена period.

Если часы ходят с постоянной частотой (вне зависимости от того, совпадает эта частота с

period
или нет) и не допускают подведения, то говорят, что часы стабильны. Статический член
is_steady
класса часов равен
true
, если часы стабильны, и
false
в противном случае. Как правило, часы
std::chrono::system_clock
нестабильны, потому что их можно подвести, даже если такое подведение производится автоматически, чтобы учесть локальный дрейф. Из-за подведения более позднее обращение к
now()
может вернуть значение, меньшее, чем более раннее, а это нарушение требования к равномерному ходу часов. Как мы скоро увидим, стабильность важна для вычислений с таймаутами, поэтому в стандартной библиотеке С++ имеется класс стабильных часов —
std::chrono::steady_clock
. Помимо него, стандартная библиотека содержит класс
std::chrono::system_clock
(уже упоминавшийся выше), который представляет системный генератор «реального времени» и имеет функции для преобразования моментов времени в тип
time_t
и обратно, и класс
std::chrono::high_resolution_clock
, который представляет наименьший возможный тактовый период (и, следовательно, максимально возможное разрешение). Может статься, что этот тип на самом деле является псевдонимом
typedef
какого-то другого класса часов. Все эти классы определены в заголовке
наряду с прочими средствами работы со временем.

Чуть ниже мы рассмотрим представления моментов времени, но сначала познакомимся с представлением интервалов.

4.3.2. Временные интервалы

Интервалы — самая простая часть подсистемы поддержки времени; они представлены шаблонным классом

std::chrono::duration<>
(все имеющиеся в С++ средства работы со временем, которые используются в библиотеке Thread Library, находятся в пространстве имен
std::chrono
). Первый параметр шаблона — это тип представления (
int
,
long
или
double
), второй — дробь, показывающая, сколько секунд представляет один интервал. Например, число минут, хранящееся в значении типа
short
, равно
std::chrono::duration>
, потому что в одной минуте 60 секунд. С другой стороны, число миллисекунд, хранящееся в значении типа
double
, равно
std::chrono::duration>
, потому что миллисекунда — это 1/1000 секунды.

В пространстве имен

std::chrono
имеется набор предопределенных
typedef
'ов для различных интервалов:
nanoseconds
,
microseconds
,
milliseconds
,
seconds
,
minutes
и
hours
. В них используется достаточно широкий целочисленный тип, подобранный так, чтобы можно было представить в выбранных единицах интервал продолжительностью свыше 500 лет. Имеются также
typedef
для всех определенных в системе СИ степеней 10 — от
std::atto
(10-18) до
std::exa
(1018) (и более, если платформа поддерживает 128-разрядные целые числа) — чтобы можно было определить нестандартные интервалы, например
std::duration
(число сотых долей секунды, хранящееся в значении типа
double
).

Между типами интервалов существует неявное преобразование, если не требуется отсечение (то есть неявно преобразовать часы в секунды можно, а секунды в часы нельзя). Для явного преобразования предназначен шаблон функции

std::chrono::duration_cast<>
:

std::chrono::milliseconds ms(54802);

std::chrono::seconds s =

 std::chrono::duration_cast(ms);

Результат отсекается, а не округляется, поэтому в данном примере

s
будет равно 54.

Для интервалов определены арифметические операции, то есть сложение и вычитание интервалов, а также умножение и деление на константу базового для представления типа (первый параметр шаблона) дает новый интервал. Таким образом,

5*seconds(1)
— то же самое, что
seconds(5)
или
minutes(1) - seconds(55)
. Количество единиц в интервале возвращает функция-член
count()
. Так,
std::chrono::milliseconds(1234).count()
равно 1234.

Чтобы задать ожидание в течение интервала времени, используется функция

std::chrono::duration<>
. Вот, например, как задается ожидание готовности будущего результата в течение 35 миллисекунд:

std::future f = std::async(some_task);

if (f.wait_for(std::chrono::milliseconds(35)) ==

  std::future_status::ready)

 do_something_with(f.get());

Все функции ожидания возвращают код, показывающий, истек ли таймаут или произошло ожидаемое событие. В примере выше мы ожидаем будущий результат, поэтому функция вернет

std::future_status::timeout
, если истек таймаут,
std::future_status::ready
— если результат готов, и
std::future_status::deferred
— если будущая задача отложена. Время ожидания измеряется с помощью библиотечного класса стабильных часов, поэтому 35 мс — это всегда 35 мс, даже если системные часы были подведены (вперёд или назад) в процессе ожидания. Разумеется, из-за особенностей системного планировщика и варьирующейся точности часов ОС фактическое время между вызовом функции в потоке и возвратом из нее может оказаться значительно больше 35 мс.

Разобравшись с интервалами, мы можем перейти к моментам времени.

4.3.3. Моменты времени

Момент времени представляется конкретизацией шаблона класса

std::chrono::time_point<>
, в первом параметре которой задаются используемые часы, а во втором — единица измерения (специализация шаблона
std::chrono::duration<>
). Значением момента времени является промежуток времени (измеряемый в указанных единицах) с некоторой конкретной точки на временной оси, которая называется эпохой часов. Эпоха часов — это основополагающее свойство, однако напрямую его запросить нельзя, и в стандарте С++ оно не определено. Из типичных эпох можно назвать полночь (00:00) 1 января 1970 года и момент, когда в последний раз был загружен компьютер, на котором исполняется приложение. У разных часов может быть общая или независимые эпохи. Если у двух часов общая эпоха, то псевдоним типа
typedef time_point
в одном классе может ссылаться на другой класс как на тип, ассоциированный с
time_point
. Хотя узнать, чему равна эпоха, невозможно, вы можете получить время между данным моментом
time_point
и эпохой с помощью функции-члена
time_since_epoch()
, которая возвращает интервал.

Например, можно задать момент времени

std::chrono::time_point 
. Он представляет время по системным часам, выраженное в минутах, а не в естественных для этих часов единицах (как правило, секунды или доли секунды).

К объекту

std::chrono::time_point<>
можно прибавить интервал или вычесть из него интервал — в результате получится новый момент времени. Например,
std::chrono::high_resolution_clock::now() + std::chrono::nanoseconds(500)
соответствует моменту времени в будущем, который отстоит от текущего момента на 500 наносекунд. Это удобно для вычисления абсолютного таймаута, когда известна максимально допустимая продолжительность выполнения некоторого участка программы, и внутри этого участка есть несколько обращений к функциям с ожиданием или обращения к функциям, которые ничего не ждут, но предшествуют функции с ожиданием и занимают часть отведенного времени.

Можно также вычесть один момент времени из другого при условии, что они относятся к одним и тем же часам. В результате получиться интервал между двумя моментами. Это полезно для хронометража участков программы, например:

auto start = std::chrono::high_resolution_clock::now();

do_something();

auto stop = std::chrono::high_resolution_clock::now();

std::cout << "do_something() заняла "

 << std::chrono::duration<

   double, std::chrono::seconds>(stop-start).count()

 << " секунд" << std::endl;

Однако параметр

clock
объекта
std::chrono::time_point<>
не только определяет эпоху. Если передать момент времени функции с ожиданием, принимающей абсолютный таймаут, то указанный в нем параметр
clock
используется для измерения времени. Это существенно в случае, когда часы подводятся, потому что механизм ожидания замечает, что наказания часов изменились, и не дает функции вернуть управление, пока функция-член часов
now()
не вернет значение, большее, чем задано в таймауте. Если часы подведены вперёд, то это может уменьшить общее время ожидания (измеренное но стабильным часам), а если назад — то увеличить.

Как и следовало ожидать, моменты времени применяются в вариантах функций с ожиданием, имена которых заканчиваются словом

_until
. Как правило, таймаут задается в виде смещения от значения
some-clock::now()
, вычисленного в определенной точке программы, хотя моменты времени, ассоциированные с системными часами, можно получить из
time_t
с помощью статической функции-члена
std::chrono::system_clock::to_time_point()
, если при планировании операций требуется использовать время в понятном пользователю масштабе. Например, если на ожидание события, связанного с условной переменной, отведено не более 500 мс, то можно написать такой код.


Листинг 4.11. Ожидание условной переменной с таймаутом

#include 

#include 

#include 


std::condition_variable cv;

bool done;

std::mutex m;


bool wait_loop() {

 auto const timeout = std::chrono::steady_clock::now() +

            std::chrono::milliseconds(500);

 std::unique_lock lk(m);

 while(!done) {

  if (cv.wait_until(lk, timeout) == std::cv_status::timeout)

  break;

 }

 return done;

}

Это рекомендуемый способ ожидания условной переменной с ограничением по времени в случае, когда предикат не указывается. При этом ограничивается общее время выполнения цикла. В разделе 4.1.1 мы видели, что при использовании условных переменных без предиката цикл необходим для защиты от ложных пробуждений. Но если вызывать в цикле

wait_for()
, то может получиться, что функция прождёт почти все отведенное время, а затем произойдёт ложное пробуждение, после чего на следующей итерации отсчет времени начнется заново. И так может происходить сколько угодно раз, в результате чего общее время ожидания окажется неограниченным.

Вооружившись знаниями о том, как задавать таймауты, рассмотрим функции, в которых таймауты используются.

4.3.4. Функции, принимающие таймаут

Простейший случай использования таймаута — задание паузы в потоке, чтобы он не отнимал у других потоков время, когда ему нечего делать. Соответствующий пример был приведён в разделе 4.1, где мы в цикле опрашивали флаг «done». Для этого использовались функции

std::this_thread::sleep_for()
и
std::this_thread::sleep_until()
. Обе работают как будильник: поток засыпает либо на указанный интервал (в случае
sleep_for()
), либо до указанного момента времени (в случае
sleep_until()
). Функцию
sleep_for()
имеет смысл применять в ситуации, описанной в разделе 4.1, когда что-то необходимо делать периодически и важна лишь продолжительность периода. С другой стороны, функция
sleep_until()
позволяет запланировать пробуждение потока в конкретный момент времени, например: запустить в полночь резервное копирование, начать в 6 утра распечатку платёжной ведомости или приостановить поток до момента следующего обновления кадра при воспроизведении видео.

Разумеется, таймаут принимают не только функции типа

sleep
. Выше мы видели, что таймаут можно задавать при ожидании условных переменных и будущих результатов. А также при попытке захватить мьютекс, если сам мьютекс такую возможность поддерживает. Обычные классы
std::mutex
и
std::recursive_mutex
не поддерживают таймаут при захвате, зато его поддерживают классы
std::timed_mutex
и
std::recursive_timed_mutex
. В том и в другом имеются функции-члены
try_lock_for()
и
try_lock_until()
, которые пытаются получить блокировку в течение указанного интервала или до наступления указанного момента времени. В табл. 4.1 перечислены функции из стандартной библиотеки С++, которые принимают таймауты, их параметры и возвращаемые значения. Параметр
duration
должен быть объектом типа
std::duration<>
, а параметр
time_point
— объектом типа
std::time_point<>
.


Таблица 4.1. Функции, принимающие таймаут

Класс / пространство имен Функции Возвращаемые значения
std::this_thread
пространство имен
sleep_for(duration) sleep_until(time_point)
Неприменимо
std::condition_variable
или
std::condition_variable_any
wait_for(lock, duration) wait_until(lock, time_point)
std::cv_status::timeout
или
std::cv_status::no_timeout
wait_for(lock, duration, predicate) wait_until(lock, time_point, predicate)
bool
— значение, возвращенное предикатом
predicate
при пробуждении
std::timed_mutex
или
std::recursive_timed_mutex
try_lock_for(duration) try_lock_until(time_point)
bool
true
, если мьютекс захвачен, иначе
false
std::unique_lock<TimedLockable>
unique_lock(lockable, duration) unique_lock(lockable, time_point)
Неприменимо — функция
owns_lock()
для вновь сконструированного объекта возвращает
true
, если мьютекс захвачен, иначе
false
try_lock_for(duration) try_lock_until(time_point)
bool
true
, если мьютекс захвачен, иначе
false
std::future
или
std::shared_future
wait_for(duration) wait_until(time_point)
std::future_status::timeout
, если истек таймаут,
std::future_status::ready
, если будущий результат готов,
std::future_status::deferred
, если в будущем результате хранится отложенная функция, которая еще не начала исполняться

Теперь, когда мы рассмотрели условные переменные, будущие результаты, обещания и упакованные задачи, настало время представить более широкую картину их применения для синхронизации операций, выполняемых в разных потоках.

4.4. Применение синхронизации операций для упрощения кода

Использование описанных выше средств синхронизации в качестве строительных блоков позволяет сосредоточиться на самих нуждающихся в синхронизации операциях, а не на механизмах реализации. В частности, код можно упростить, применяя более функциональный (в смысле функционального программирования) подход к программированию параллелизма. Вместо того чтобы напрямую разделять данные между потоками, мы можем снабдить каждый поток необходимыми ему данными, а результаты вычисления предоставить другим потокам, которые в них заинтересованы, с помощью будущих результатов.

4.4.1. Функциональное программирование с применением будущих результатов

Термином функциональное программирование (ФП) называют такой стиль программирования, при котором результат функции зависит только от ее параметров, но не от внешнего состояния. Это напрямую соотносится с понятием функции в математике и означает, что если два раза вызвать функцию с одними и теми же параметрами, то получатся одинаковые результаты. Таким свойством обладают многие математические функции в стандартной библиотеке С++, например

sin
,
cos
и
sqrt
, а также простые операции над примитивными типами, например
3+3
,
6*9
или
1.3/4.7
. Чистая функция не модифицирует никакое внешнее состояние, она воздействует только на возвращаемое значение.

При таком подходе становится проще рассуждать о функциях, особенно в присутствии параллелизма, поскольку многие связанные с разделяемой памятью проблемы, обсуждавшиеся в главе 3, просто не возникают. Если разделяемые данные не модифицируются, то не может быть никакой гонки и, стало быть, не нужно защищать данные с помощью мьютексов. Это упрощение настолько существенно, что в программировании параллельных систем все более популярны становятся такие языки, как Haskell[9], где все функции чистые по умолчанию. В таком окружении нечистые функции, которые все же модифицируют разделяемые данные, отчетливо выделяются, поэтому становится проще рассуждать о том, как они укладываются в общую структуру приложения.

Но достоинства функционального программирования проявляются не только в языках, где эта парадигма применяется по умолчанию. С++ — мультипарадигменный язык, и на нем, безусловно, можно писать программы в стиле ФП. С появлением в С++11 лямбда-функций (см. приложение А, раздел А.6), включением шаблона

std::bind
из Boost и TR1 и добавлением автоматического выведения типа переменных (см. приложение А, раздел А.7) это стало даже проще, чем в С++98. Будущие результаты — это последний элемент из тех, что позволяют реализовать на С++ параллелизм в стиле ФП; благодаря передаче будущих результатов результат одного вычисления можно сделать зависящим от результата другого без явного доступа к разделяемым данным.

Быстрая сортировка в духе ФП

Чтобы продемонстрировать использование будущих результатов при написании параллельных программ в духе ФП, рассмотрим простую реализацию алгоритма быстрой сортировки Quicksort. Основная идея алгоритма проста: имея список значений, выбрать некий опорный элемент и разбить список на две части — в одну войдут элементы, меньшие опорного, в другую — большие или равные опорному. Отсортированный список получается путем сортировки обоих частей и объединения трех списков: отсортированного множества элементов, меньших опорного элемента, самого опорного элемента и отсортированного множества элементов, больших или равных опорному элементу. На рис. 4.2 показано, как этот алгоритм сортирует список из 10 целых чисел. В листинге ниже приведена последовательная реализация алгоритма в духе ФП; в ней список принимается и возвращается по значению, а не сортируется по месту в

std::sort()
.

Рис. 4.2. Рекурсивная сортировка в духе ФП


Листинг 4.12. Последовательная реализация Quicksort в духе ФП

template

std::list sequential_quick_sort(std::list input) {

 if (input.empty()) {

  return input;

 }

 std::list result;

 result.splice(result.begin(), input, input.begin());←
(1)


 T const& pivot = *result.begin();          ←
(2)


 auto divide_point = std::partition(input.begin(), input.end(),

  [&](T const& t) { return t < pivot; });←
(3)


 std::list lower_part;

 lower_part.splice(

  lower_part.end(), input, input.begin(), divide_point); ←
(4)


 auto new_lower(

  sequential_quick_sort(std::move(lower_part)));     ←
(5)

 auto new_higher(

  sequential_quick_sort(std::move(input)));        ←
(6)


 result.splice(result.end(), new_higher);  ←
(7)

 result.splice(result.begin(), new_lower); ←
(8)


 return result;

}

Хотя интерфейс выдержан в духе ФП, прямое применение ФП привело бы к неоправданно большому числу операций копирования, поэтому внутри мы используем «обычный» императивный стиль. В качестве опорного мы выбираем первый элемент и отрезаем его от списка с помощью функции

splice()
(1). Потенциально это может привести к неоптимальной сортировке (в терминах количества операций сравнения и обмена), но любой другой подход при работе с
std::list
может существенно увеличить время за счет обхода списка. Мы знаем, что этот элемент должен войти в результат, поэтому можем сразу поместить его в список, где результат будет храниться. Далее мы хотим использовать этот элемент для сравнения, поэтому берем ссылку на него, чтобы избежать копирования (2). Теперь можно с помощью алгоритма
std::partition
разбить последовательность на две части: меньшие опорного элемента и не меньшие опорного элемента (3). Критерий разбиения проще всего задать с помощью лямбда-функции; мы запоминаем ссылку в замыкании, чтобы не копировать значение
pivot
(подробнее о лямбда-функциях см. в разделе А.5 приложения А).

Алгоритм

std::partition()
переупорядочивает список на месте и возвращает итератор, указывающий на первый элемент, который не меньше опорного значения. Полный тип итератора довольно длинный, поэтому мы используем спецификатор типа
auto
, чтобы компилятор вывел его самостоятельно (см. приложение А, раздел А.7).

Раз уж мы выбрали интерфейс в духе ФП, то для рекурсивной сортировки обеих «половин» нужно создать два списка. Для этого мы снова используем функцию

splice()
, чтобы переместить значения из списка
input
до
divide_point
включительно в новый список
lower_part
(4). После этого
input
будет со держать только оставшиеся значения. Далее оба списка можно отсортировать путем рекурсивных вызовов (5), (6). Применяя
std::move()
для передачи списков, мы избегаем копирования — результат в любом случае неявно перемещается. Наконец, мы еще раз вызываем
splice()
, чтобы собрать result в правильном порядке. Значения из списка
new_higher
попадают в конец списка (7), после опорного элемента, а значения из списка
new_lower
— в начало списка, до опорного элемента (8).

Параллельная реализация Quicksort в духе ФП

Раз уж мы все равно применили функциональный стиль программирования, можно без труда распараллелить этот код с помощью будущих результатов, как показано в листинге ниже. Набор операций тот же, что и раньше, только некоторые из них выполняются параллельно.


Листинг 4.13. Параллельная реализация Quicksort с применением будущих результатов

template

std::list parallel_quick_sort(std::list input) {

 if (input.empty()) {

  return input;

 }


 std::list result;

 result.splice(result.begin(), input, input.begin());

 T const& pivot = *result.begin();


 auto divide_point = std::partition(input.begin(), input.end(),

  [&](T const& t) {return t

 std::list lower_part;

 lower_part.splice(

  lower_part.end(), input, input.begin(), divide_point);


 std::future > new_lower( ←
(1)

  std::async(¶llel_quick_sort, std::move(lower_part)));


 auto new_higher(

  parallel_quick_sort(std::move(input))); ←
(2)


 result.splice(result.end(), new_higher); ←
(3)


 result.splice(result.begin(), new_lower.get()); ←
(4)

 return result;

}

Существенное изменение здесь заключается в том, что сортировка нижней части списка производится не в текущем, а в отдельном потоке — с помощью

std::async()
(1). Верхняя часть списка сортируется путем прямой рекурсии, как и раньше (2). Рекурсивно вызывая
parallel_quick_sort()
, мы можем задействовать доступный аппаратный параллелизм. Если
std::async()
создает новый поток при каждом обращении, то после трех уровней рекурсии мы получим восемь работающих потоков, а после 10 уровней (когда в списке примерно 1000 элементов) будет работать 1024 потока, если оборудование позволяет. Если библиотека решит, что запущено слишком много задач (быть может, потому что количество задач превысило уровень аппаратного параллелизма), то может перейти в режим синхронного запуска новых задач. Тогда новая задача будет работать в том же потоке, который обратился к
get()
, а не в новом, так что мы не будем нести издержки на передачу задачи новому потоку, если это не увеличивает производительность. Стоит отметить, что в соответствии со стандартом реализация
std::async
вправе как создавать новый поток для каждой задачи (даже при значительном превышении лимита), если явно не задан флаг
std::launch::deferred
, так и запускать все задачи синхронно, если явно не задан флаг
std::launch::async
. Рассчитывая, что библиотека сама позаботится об автоматическом масштабировании, изучите, что говорится на эту тему в документации, поставляемой вместе с библиотекой.

Можно не использовать

std::async()
, а написать свою функцию
spawn_task()
, которая будет служить оберткой вокруг
std::packaged_task
и
std::thread
, как показано в листинге 4.14; нужно создать объект
std::packaged_task
для хранения результата вызова функции, получить из него будущий результат, запустить задачу в отдельном потоке и вернуть будущий результат. Само по себе это не дает большого преимущества (и, скорее всего, приведёт к значительному превышению лимита), но пролагает дорогу к переходу на более хитроумную реализацию, которая помещает задачу в очередь, обслуживаемую пулом потоков. Рассмотрение пулов потоков мы отложим до главы 9. Но идти по такому пути вместо использования
std::async
имеет смысл только в том случае, когда вы точно знаете, что делаете, и хотите полностью контролировать, как пул потоков строится и выполняет задачи.

Но вернемся к функции

parallel_quick_sort
. Поскольку для получения
new_higher
мы применяли прямую рекурсию, то и срастить (splice) его можно на месте, как и раньше (3). Но
new_lower
теперь представляет собой не список, а объект
std::future>
, поэтому сначала нужно извлечь значение с помощью
get()
, а только потом вызывать
splice()
(4). Таким образом, мы дождемся завершения фоновой задачи, а затем переместим результат в параметр
splice()
; функция
get()
возвращает ссылку на r-значение — хранимый результат, следовательно, его можно переместить (подробнее о ссылках на r-значения и семантике перемещения см. в разделе А.1.1 приложения А).

Даже в предположении, что

std::async()
оптимально использует доступный аппаратный параллелизм, приведённая реализация Quicksort все равно не идеальна. Основная проблема в том, что
std::partition
делает много работы и остается последовательной операцией, но пока остановимся на этом. Если вас интересует максимально быстрая параллельная реализация, обратитесь к научной литературе.


Листинг 4.14. Простая реализация функции

spawn_task

template

std::future::type>

spawn_task(F&& f, A&& a) {

 typedef std::result_of::type result_type;

 std::packaged_task

 task(std::move(f)));

 std::future res(task.get_future());

 std::thread t(std::move(task), std::move(a));

 t.detach();

 return res;

}

Функциональное программирование — не единственная парадигма параллельного программирования, позволяющая избежать модификации разделяемых данных. Альтернативой является парадигма CSP (Communicating Sequential Processes — взаимодействующие последовательные процессы)[10], в которой потоки концептуально рассматриваются как полностью независимые сущности, без каких бы то ни было разделяемых данных, но соединенные коммуникационными каналами, по которым передаются сообщения. Эта парадигма положена в основу языка программирования Erlang (http://www.erlang.org/) и среды MPI (Message Passing Interface) (http://www.mpi-forum.org/), широко используемой для высокопроизводительных вычислений на С и С++. Уверен, что теперь вы не удивитесь, узнав, что и эту парадигму можно поддержать на С++, если соблюдать определенную дисциплину; в следующем разделе показано, как это можно сделать.

4.4.2. Синхронизация операций с помощью передачи сообщений

Идея CSP проста: если никаких разделяемых данных нет, то каждый поток можно рассматривать независимо от остальных, учитывая лишь его поведение в ответ на получаемые сообщения. Таким образом, поток по существу является конечным автоматом: получив сообщение, он как-то изменяет свое состояние, возможно, посылает одно или несколько сообщений другим потокам и выполняет то или иное вычисление, зависящее от начального состояния. Один из способов такого способа программирования потоков — формализовать это описание и реализовать модель конечного автомата, но этот путь не единственный — конечный автомат может неявно присутствовать в самой структуре приложения. Какой метод будет работать лучше в конкретном случае, зависит от требований к поведению приложения и от опыта разработчиков. Но каким бы образом ни был реализован поток, у разбиения на независимые процессы есть несомненное преимущество — потенциальное устранение многих сложностей, связанных с параллельным доступом к разделяемым данным, и, следовательно, упрощение программирования и снижение количества ошибок.

У настоящих последовательных взаимодействующих процессов вообще нет разделяемых данных, а весь обмен информацией производится через очереди сообщений. Но, поскольку в С++ потоки имеют общее адресное пространство, то обеспечить строгое соблюдение этого требования невозможно. Тут-то и приходит на выручку дисциплина: следить за тем, чтобы никакие данные не разделялись между потоками, — обязанность автора приложения или библиотеки. Разумеется, сами очереди сообщений должны разделяться, иначе потоки не смогут взаимодействовать, но детали этого механизма можно вынести в библиотеку. Представьте, что вам нужно написать программу для банкомата. Она должна поддерживать взаимодействие с человеком, который хочет снять деньги, с соответствующим банком, а также управлять оборудованием, которое принимает платёжную карту, выводит на экран сообщения, обрабатывает нажатия клавиш, выдает деньги и возвращает карту.

Чтобы воплотить все это в жизнь, можно было бы разбить код на три независимых потока: один будет управлять оборудованием, второй — реализовывать логику работы банкомата, а третий — обмениваться информацией с банком. Эти потоки могут взаимодействовать между собой посредством передачи сообщений, а не за счет разделения данных. Например, поток, управляющий оборудованием, будет посылать сообщение потоку логики банкомата о том, что человек вставил карту или нажал кнопку. Поток логики будет посылать потоку, управляющему оборудованием, сообщение о том, сколько денег выдать. И так далее.

Смоделировать логику банкомата можно, например, с помощью конечного автомата. В каждом состоянии поток ждет сообщение, которое затем обрабатывает. Это может привести к переходу в новое состояние, после чего цикл продолжится. На рис. 4.3 показаны состояния, присутствующие в простой реализации программы. Здесь система ждет, пока будет вставлена карта. Когда это произойдёт, система ждет, что пользователь введет свой ПИН-код, по одной цифре за раз. Последнюю введенную цифру пользователь может удалить. После того как будет введено нужное количество цифр, система проверяет ПИН-код. Если он введен неправильно, больше делать нечего — клиенту нужно вернуть карту и ждать, пока будет вставлена следующая карта. Если ПИН-код правильный, то система ждет либо отмены транзакции, либо выбора снимаемой суммы. Если пользователь отменил операцию, ему нужно вернуть карту и закончить работу. Если он выбрал сумму, то система ждет подтверждения от банка, а затем либо выдает наличные и возвращает карту, либо выводит сообщение «недостаточно средств на счете» и тоже возвращает карту. Понятно, что реальный банкомат гораздо сложнее, но и этого достаточно для иллюстрации идеи.

Рис. 4.3. Модель простого конечного автомата для банкомата

Спроектировав конечный автомат для реализации логики банкомата, мы можем оформить его в виде класса, в котором каждому состоянию соответствует функция-член. Каждая такая функция ждет поступления одного из допустимых сообщений, обрабатывает его и, возможно, инициирует переход в новое состояние. типы сообщений представлены структурами

struct
. В листинге 4.15 приведена часть простой реализации логики банкомата в такой системе — главный цикл и код первого состояния, в котором программа ожидает вставки карты.

Как видите, вся синхронизация, необходимая для передачи сообщений, целиком скрыта в библиотеке (ее простая реализация приведена в приложении С вместе с полным кодом этого примера).


Листинг 4.15. Простая реализация класса, описывающего логику работы банкомата

struct card_inserted {

 std::string account;

};


class atm {

 messaging::receiver incoming;

 messaging::sender bank;

 messaging::sender interface_hardware;

 void (atm::*state)();


 std::string account;

 std::string pin;

 void waiting_for_card() {            ←
(1)

  interface_hardware.send(display_enter_card());←
(2)

  incoming.wait()                ←
(3)

  .handle(

   [&](card_inserted const& msg) {       ←
(4)

   account = msg.account;

   pin = "";

   interface_hardware.send(display_enter_pin());

   state = &atm::getting_pin;

  }

  );

 }


 void getting_pin();


public:

 void run() {            ←
(5)

  state = &atm::waiting_for_card; ←
(6)

  try {

  for(;;) {

   (this->*state)();       ←
(7)

  }

  }

  catch(messaging::close_queue const&) {}

 }

};

Мы уже говорили, что эта реализация неизмеримо проще логики работы реального банкомата, но она все же дает представление о программировании на основе передачи сообщений. Не нужно думать о проблемах параллельности и синхронизации, наша основная забота — понять, какие входные сообщения допустимы в данной точке и какие сообщения посылать в ответ. Конечный автомат, реализующий логику банкомата, работает в одном потоке, а прочие части системы, например интерфейс с банком и с терминалом, — в других потоках. Такой принцип проектирования программ называется моделью акторов — в системе есть несколько акторов (каждый работает в своем потоке), которые посылают друг другу сообщения с просьбой выполнить определённое задание, и никакого разделяемого состояния, помимо передаваемого в составе сообщений, не существует.

Выполнение начинается в функции-члене

run()
(5), которая устанавливает начальное состояние
waiting_for_card
(6), а затем в цикле вызывает функции-члены, представляющие текущее состояние (каким бы оно ни было) (7). Функции состояния — это просто функции-члены класса
atm
. Функция
waiting_for_card
(1) тоже не представляет сложности: она посылает сообщение интерфейсу с просьбой вывести сообщение «Вставьте карту» (2), а затем ожидает сообщения, которое могла бы обработать (3). Единственное допустимое в этой точке сообщение —
card_inserted
; оно обрабатывается лямбда-функцией (4). Функции
handle
можно передать любую функцию или объект-функцию, но в таком простом случае лямбда-функции вполне достаточно. Отметим, что вызов функции
handle()
сцеплен с вызовом
wait()
; если получено сообщение недопустимого типа, оно отбрасывается, и поток ждет, пока не придёт подходящее сообщение.

Сама лямбда-функция просто запоминает номер карточного счета в переменной-члене, очищает текущий ПИН-код и переходит в состояние «получение ПИН». По завершении обработчика сообщений функция состояния возвращает управление главному циклу, который вызывает функцию следующего состояния (7).

Функция состояния

getting_pin
несколько сложнее, потому что может обрабатывать сообщения разных типов, как следует из рис. 4.3. Ниже приведён ее код.


Листинг 4.16. Функция состояния

getting_pin
для простой реализации банкомата

void atm::getting_pin() {

 incoming.wait()

 .handle(   ←
(1)

  [&](digit_pressed const& msg) {

  unsigned const pin_length = 4;

  pin += msg.digit;

  if (pin.length() == pin_length) {

   bank.send(verify_pin(account, pin, incoming));

   state = &atm::verifying_pin;

  }

  }

 )

 .handle(←
(2)

  [&](clear_last_pressed const& msg) {

  if (!pin.empty()) {

   pin.resize(pin.length() - 1);

  }

  }

 )

 .handle(  ←
(3)

 [&](cancel_pressed const& msg) {

  state = &atm::done_processing;

 }

 );

}

Поскольку теперь допустимы сообщения трех типов, то с функцией

wait()
сцеплены три вызова функции
handle()
(1), (2), (3). В каждом вызове
handle()
в качестве параметра шаблона указан тип сообщения, а в качестве параметра самой функции — лямбда-функция, которая принимает сообщение этого типа. Поскольку вызовы сцеплены, функция
wait()
знает, что может ожидать сообщений
digit_pressed
,
clear_last_pressed
или
cancel_pressed
. Сообщения всех прочих типов игнорируются.

Как видим, теперь состояние изменяется не всегда. Например, при получении сообщения

digit_pressed
мы просто дописываем цифру в конец
pin
, если эта цифра не последняя. Затем главный цикл ((7) в листинге 4.15) снова вызовет функцию
getting_pin()
, чтобы ждать следующую цифру (или команду очистки либо отмены).

Это соответствует поведению, изображенному на рис. 4.3. Каждое состояние реализовано отдельной функцией-членом, которая ждет сообщений определенных типов и при необходимости обновляет состояние.

Как видите, такой стиль программирования может заметно упростить проектирование параллельной системы, поскольку все потоки рассматриваются как абсолютно независимые. Таким образом, мы имеем пример использования нескольких потоков для разделения обязанностей, а, значит, от нас требуется явно решить, как распределять между ними задачи.

4.5. Резюме

Синхронизация операций между потоками — важная часть написания параллельного приложения. Если синхронизации нет, то потоки ведут себя независимо, и их вполне можно было бы реализовать как отдельные приложения, запускаемые группой, потому что выполняют взаимосвязанные действия. В этой главе мы рассмотрели различные способы синхронизации операций — простые условные переменные, будущие результаты, обещания и упакованные задачи. Мы также обсудили несколько подходов к решению проблем синхронизации: функциональное программирование, когда каждая задача порождает результат, зависящий только от входных данных, но не от внешнего состояния, и передачу сообщений, когда взаимодействие потоков осуществляется за счет асинхронного обмена сообщениями с помощью какой-либо подсистемы передачи сообщений, играющей роль посредника.

Теперь, когда мы обсудили многие высокоуровневые средства, имеющиеся в С++, настало время познакомиться с низкоуровневыми механизмами, которые приводят всю систему в движение: модель памяти С++ и атомарные операции.

Загрузка...