В этой главе:
□ автоматическое распараллеливание кода, использующего стандартные алгоритмы;
□ приостановка программы на конкретный промежуток времени;
□ запуск и приостановка потоков;
□ выполнение устойчивой к исключениям общей блокировки с помощью
std::unique_lock
и std::shared_lock
;
□ избегание взаимных блокировок с применением
std::scoped_lock
;
□ синхронизация конкурентного использования
std::cout
;
□ безопасное откладывание инициализации с помощью
std::call_once
;
□ отправка выполнения задач в фоновый режим с применением
std::async
;
□ реализация идиомы «производитель/потребитель» с использованием
std::condition_variable
;
□ реализация идиомы «несколько потребителей/производителей» с помощью
std::condition_variable
;
□ распараллеливание отрисовщика множества Мандельброта в ASCII с применением
std::a
sync;
□ реализация небольшой автоматической библиотеки для распараллеливания с использованием
std::future
.
До C++11 язык C++ не поддерживал параллельные вычисления. Это не значило, что запуск, управление, остановка и синхронизация потоков были невыполнимы, но для каждой операционной системы требовались специальные библиотеки, поскольку потоки по своей природе связаны с ОС.
С появлением C++11 мы получили библиотеку
std::thread
, которая позволяет управлять потоками всех операционных систем. Для синхронизации потоков в C++11 были созданы классы-мьютексы, а также удобные оболочки блокировок в стиле RAII. Вдобавок std::condition_variable
позволяет отправлять гибкие уведомления о событиях между потоками.
Кроме того, интересными дополнениями являются
std::async
и std::future
: теперь можно оборачивать произвольные нормальные функции в вызовы std::async
, чтобы выполнять их асинхронно в фоновом режиме. Такие обернутые функции возвращают объекты типа std::future
, которые обещают содержать результаты работы функции, и можно сделать что-то еще, прежде чем дождаться их появления. Еще одно значительное улучшение STL — политики выполнения, которые могут быть добавлены к 69 уже существующим алгоритмам. Это дополнение означает, что можно просто добавить один аргумент, описывающий политику выполнения, в существующие вызовы стандартных алгоритмов и получить доступ к параллелизации, не нуждаясь в переписывании сложного кода.
В данной главе мы пройдемся по всем указанным дополнениям, чтобы узнать их самые важные особенности. После этого у нас будет достаточно информации о поддержке параллелизации в STL версии C++17. Мы не станем рассматривать все свойства, только самые важные. Информация, полученная из этой книги, позволит быстро понять остальную часть механизмов распараллеливания, которую можно найти в Интернете в документации к STL версии C++17.
Наконец, в этой главе содержатся два дополнительных примера. В одном из них мы распараллелим отрисовщик множества Мандельброта в ASCII из главы 6, внеся минимальные изменения. В последнем примере реализуем небольшую библиотеку, которая помогает распараллелить выполнение сложных задач неявно и автоматически.
В C++17 появилось одно действительно крупное расширение для параллелизма: политики выполнения для стандартных алгоритмов. Шестьдесят девять алгоритмов были расширены и теперь принимают политики выполнения, чтобы работать параллельно на нескольких ядрах и даже при включенной векторизации.
Для пользователя это значит следующее: если мы уже повсеместно задействуем алгоритмы STL, то можем параллелизовать их работу без особых усилий. Мы легко можем дополнить наши приложения параллелизацией, просто добавив один аргумент, описывающий политику выполнения, в существующие вызовы алгоритмов STL.
В данном разделе мы реализуем простую программу (с не самым серьезным сценарием применения), которая генерирует несколько вызовов алгоритмов STL. При этом увидим, как легко использовать политики выполнения C++17, чтобы запустить их в нескольких потоках. В последних подразделах мы более подробно рассмотрим разные политики выполнения.
Как это делается
В данном примере мы напишем программу, использующую некоторые стандартные алгоритмы. Сама программа является скорее примером того, как могут выглядеть реальные сценарии, а не средством решения настоящей рабочей проблемы. Применяя эти стандартные алгоритмы, мы встраиваем политики выполнения, чтобы ускорить выполнение кода.
1. Сначала включим некоторые заголовочные файлы и объявим об использовании пространства имен
std
. Заголовочный файл execution
мы еще не видели, он появился в C++17.
#include
#include
#include
#include
#include
using namespace std;
2. В качестве примера создадим функцию-предикат, которая говорит, является ли число четным. Мы воспользуемся ею далее.
static bool odd(int n) { return n%2; }
3. Сначала определим в нашей функции
main
большой вектор. Заполним его большим количеством данных, чтобы для выполнения вычислений потребовалось какое-то время. Скорость выполнения этого кода будет значительно различаться в зависимости от того, на каком компьютере работает этот код. Для разных компьютеров скорее подойдут меньшие/большие размеры вектора.
int main()
{
vector d (50000000);
4. Чтобы получить большое количество случайных данных для вектора, создадим генератор случайных чисел и распределение и упакуем их в вызываемый объект. Если это кажется странным, пожалуйста, взгляните сначала на примеры, в которых рассматриваются генераторы случайных чисел и распределения в главе 8.
mt19937 gen;
uniform_int_distribution dis(0, 100000);
auto rand_num ([=] () mutable { return dis(gen); });
5. Теперь воспользуемся стандартным алгоритмом
std::generate
, чтобы заполнить вектор случайными данными. В С++17 существует новая версия этого алгоритма, которая принимает аргумент нового типа: политику выполнения. Здесь поместим std::par
, что позволит автоматически распараллелить данный код. Сделав это, позволим нескольким потокам заполнять вектор одновременно; данное действие снижает время выполнения, если компьютер имеет более одного процессора, что обычно верно для современных машин.
generate(execution::par, begin(d), end(d), rand_num);
6. Метод
std::sort
тоже должен быть вам знаком. Версия C++17 также поддерживает дополнительный аргумент, определяющий политику выполнения:
sort(execution::par, begin(d), end(d));
7. Это верно и для
std::reverse
:
reverse(execution::par, begin(d), end(d));
8. Затем воспользуемся функцией
std::count_if
, чтобы подсчитать количество четных чисел в векторе. Мы даже можем распараллелить выполнение алгоритма, просто добавив политику выполнения снова!
auto odds (count_if(execution::par, begin(d), end(d), odd));
9. Вся эта программа не выполняет никакой реальной научной работы, поскольку мы просто смотрим, как можно распараллелить стандартные алгоритмы, но выведем что-нибудь на экран в конце работы.
cout << (100.0 * odds / d.size())
<< "% of the numbers are odd.\n";
}
10. Компиляция и запуск программы дадут следующий результат. К этому моменту интересно посмотреть, как отличается скорость выполнения при использовании алгоритмов без политики выполнения в сравнении со всеми другими политиками выполнения. Данное упражнение оставим на откуп читателю. Попробуйте его сделать; доступными политиками выполнения являются
seq
, par
и par_vec
. Для каждой из них мы должны получить разное время выполнения:
$ ./auto_parallel
50.4% of the numbers are odd.
Как это работает
Поскольку в этом примере мы не отвлекались на решение сложных реальных задач, можем полностью сконцентрироваться на вызовах функций стандартных библиотек. Довольно очевидно, что их параллелизованные версии едва отличаются от классических последовательных вариаций. Они отличаются всего на один дополнительный аргумент — политику выполнения.
Взглянем на вызовы и ответим на три основных вопроса:
generate(execution::par, begin(d), end(d), rand_num);
sort( execution::par, begin(d), end(d));
reverse( execution::par, begin(d), end(d));
auto odds (count_if(execution::par, begin(d), end(d), odd));
Какие алгоритмы STL можно распараллелить таким образом?
Шестьдесят девять существующих алгоритмов STL получили поддержку параллелизма по стандарту C++17. Появились также семь новых алгоритмов, которые тоже поддерживают параллелизм. Несмотря на то что такое улучшение может оказаться довольно инвазивным для реализации, с точки зрения их интерфейса изменилось не очень много: все они получили дополнительный аргумент
ExecutionPolicy&& policy
и все. Это не значит, что мы всегда должны предоставлять аргумент, описывающий политику выполнения. Они просто дополнительно поддерживают передачу политики выполнения в качестве первого аргумента.
Перед вами 69 улучшенных стандартных алгоритмов. Кроме того, в этот список включены семь новых алгоритмов, изначально поддерживающих политики выполнения (выделены полужирным):
std::adjacent_difference
std::adjacent_find
std::all_of
std::any_of
std::copy
std::copy_if
std::copy_n
std::count
std::count_if
std::equal
std::exclusive_scan
std::fill
std::fill_n
std::find
std::find_end
std::find_first_of
std::find_if
std::find_if_not
std::for_each
std::for_each_n
std::generate
std::generate_n
std::includes
std::inclusive_scan
std::inner_product
std::inplace_merge
std::is_heap
std::is_heap_until
std::is_partitioned
std::is_sorted
std::is_sorted_until
std::lexicographical_compare
std::max_element
std::merge std::min_element
std::minmax_element
std::mismatch std::move
std::none_of
std::nth_element
std::partial_sort
std::partial_sort_copy
std::partition
std::partition_copy
std::remove
std::remove_copy
std::remove_copy_if
std::remove_if
std::replace
std::replace_copy
std::replace_copy_if
std::replace_if
std::reverse
std::reverse_copy
std::rotate
std::rotate_copy
std::search
std::search_n
std::set_difference
std::set_intersection
std::set_symmetric_difference
std::set_union
std::sort
std::stable_partition
std::stable_sort
std::swap_ranges
std::transform
std::transform_exclusive_scan
std::transform_inclusive_scan
std::transform_reduce
std::uninitialized_copy
std::uninitialized_copy_n
std::uninitialized_fill
std::uninitialized_fill_n
std::unique
std::unique_copy
Улучшение этих алгоритмов — отличная новость! Чем больше алгоритмов STL используется в наших старых программах, тем проще добавить поддержку параллелизма задним числом. Обратите внимание: это не значит, что такие изменения автоматически сделают программу в N раз быстрее, поскольку концепция многопроцессорной обработки гораздо сложнее.
Однако вместо того, чтобы разрабатывать собственные сложные параллельные алгоритмы с помощью
std::thread
, std::async
или внешних библиотек, можно распараллелить выполнение стандартных задач способом, не зависящим от операционной системы.
Как работают эти политики выполнения
Политика выполнения указывает, какую стратегию автоматического распараллеливания необходимо использовать при вызове стандартных алгоритмов.
Следующие три типа политик существуют в пространстве имен
std::execution
(табл. 9.1).
Политики выполнения подразумевают конкретные ограничения. Чем они строже, тем больше мер по распараллеливанию можно позволить:
□ все элементы функций доступа, используемые параллелизованными алгоритмами, не должны вызывать взаимных блокировок и гонок;
□ в случае параллелизации и векторизации все функции получения доступа не должны использовать блокирующую синхронизацию.
До тех пор, пока подчиняемся этим правилам, мы не столкнемся с ошибками, которые могут появиться в параллельных версиях алгоритмов STL.
Обратите внимание: правильное использование параллельных алгоритмов STL не всегда гарантирует ускорение работы. В зависимости от того, какую задачу мы пытаемся решить, ее размера, эффективности наших структур и других методов доступа, измеряемое ускорение будет значительно различаться или даже и вовсе не произойдет. Многопроцессорная обработка — это все еще довольно сложно.
Что означает понятие «векторизация»
Векторизация — это свойство, которое должны поддерживать как процессор, так и компилятор. Кратко рассмотрим простой пример, чтобы понять суть векторизации и как она работает. Допустим, нужно сложить числа, находящиеся в очень большом векторе. Простая реализация данной задачи может выглядеть так:
std::vector v {1, 2, 3, 4, 5, 6, 7 /*...*/};
int sum {std::accumulate(v.begin(), v.end(), 0)};
Компилятор в конечном счете сгенерирует цикл из вызова
accumulate
, который может выглядеть следующим образом:
int sum {0};
for (size_t i {0}; i < v.size(); ++i) {
sum += v[i];
}
С этого момента при разрешенной и включенной векторизации компилятор может создать следующий код. Цикл выполняет четыре шага сложения в одной итерации цикла, что сокращает количество итераций в четыре раза. Для простоты пример не работает с остатком, если вектор не содержит
N*4
элементов:
int sum {0};
for (size_t i {0}; i < v.size() / 4; i += 4) {
sum += v[i] + v[i+1] + v[i + 2] + v[i + 3];
}
// если операция v.size()/4 имеет остаток,
// в реальном коде также нужно это обработать.
Зачем это делать? Многие процессоры предоставляют инструкции, которые могут выполнять математические операции наподобие
sum += v[i]+v[i+1]+v[i+2]+v[i+3];
всего за один шаг. Сжатие большого количества математических операций в минимальное количество инструкций — наша цель, поскольку это ускоряет программу.
Автоматическую векторизацию выполнять сложно, поскольку компилятору нужно в некоторой степени понимать нашу программу, чтобы ускорить ее, не нарушая правильности. По крайней мере помочь компилятору можно, используя стандартные алгоритмы максимально часто, поскольку компилятору проще понять их, чем запутанные циклы со сложными зависимостями.
Простая и удобная возможность управления потоками добавлена в С++11. В данной версии появилось пространство имен
this_thread
, содержащее функции, которые влияют только на вызывающий поток. Оно включает две разные функции, позволяющие приостановить поток на определенный промежуток времени, это позволяет перестать использовать внешние библиотеки или библиотеки, зависящие от операционной системы.
В этом примере мы сконцентрируемся на том, как приостанавливать потоки на определенный промежуток времени.
Как это делается
В этом примере мы напишем короткую программу, которая приостанавливает основной поток на определенные промежутки времени.
1. Сначала включим все необходимые заголовочные файлы и объявим об использовании пространств имен
std
и chrono_literals
. Второе пространство содержит удобные аббревиатуры для описания промежутков времени:
#include
#include
#include
using namespace std;
using namespace chrono_literals;
2. Сразу же приостановим основной поток на 5 секунд 300 миллисекунд. Благодаря пространству имен
chrono_literals
можем выразить эти промежутки времени в читабельном формате:
int main()
{
cout << "Going to sleep for 5 seconds"
" and 300 milli seconds.\n";
this_thread::sleep_for(5s + 300ms);
3. Последним выражением приостановки являлось
relative
. Кроме того, можно выразить запросы absolute
на приостановку. Приостановим поток на 3 секунды, начиная с текущего момента:
cout << "Going to sleep for another 3 seconds.\n";
this_thread::sleep_until(
chrono::high_resolution_clock::now() + 3s);
4. Перед завершением программы выведем на экран какое-нибудь сообщение, что укажет на окончание второго периода приостановки.
cout << "That's it.\n";
}
5. Компиляция и запуск программы дадут следующие результаты. В Linux, Mac и других UNIX-подобных операционных системах имеется команда
time
, принимающая другую команду, чтобы выполнить ее и определить время, которое требуется на ее выполнение. Запуск нашей программы с помощью команды time
показывает: она работала 8,32 секунды, это значение примерно равно 5,3 и 3 секундам, на которые мы приостанавливали программу. При запуске программы можно определить промежуток времени между появлением строк, выводимых на консоль:
$ time ./sleep
Going to sleep for 5 seconds and 300 milli seconds.
Going to sleep for another 3 seconds.
That's it.
real 0m8.320s
user 0m0.005s
sys 0m0.003s
Как это работает
Функции
sleep_for
и sleep_until
появились в версии C++11 и находятся в пространстве имен std::this_thread
. Они блокируют выполнение текущего потока (но не процесса или программы) на конкретный промежуток времени. Поток не потребляет время процессора на протяжении блокировки. Он просто помещается операционной системой в неактивное состояние. ОС, конечно же, напоминает себе о необходимости возобновить поток. Самое лучшее заключается в том, что нам не придется волноваться, в какой операционной системе запущена программа, поскольку эту информацию от нас абстрагирует STL.
Функция
this_thread::sleep_for
принимает значение типа chrono::duration
. В простейшем случае это просто 1s
или 5s+300ms
, как это было показано в нашем примере кода. Чтобы получить возможность применять такие удобные литералы, нужно объявить об использовании пространства имен std::chrono_literals;
.
Функция
this_thread::sleep_until
принимает значение типа chrono::time_point
вместо промежутка времени. Это удобно в том случае, если нужно приостановить поток до наступления конкретного момента времени.
Точность момента пробуждения зависит от операционной системы. Она будет довольно высокой для большинства ОС, но могут возникнуть проблемы, если требуется точность вплоть до наносекунд.
Приостановить выполнение потока на короткий промежуток времени также можно с помощью функции
this_thread::yield
. Она не принимает аргументы; это значит, что неизвестно, как надолго будет отложено выполнение потока. Причина заключается в следующем: данная функция не знает о том, как приостанавливать потоки на какое-то время. Она просто говорит ОС, что может перепланировать выполнение других потоков любых процессов. Если таких потоков нет, то поток возобновится мгновенно. По этой причине функция yield
зачастую менее полезна, чем приостановка на короткий промежуток времени, установленный заранее.
Еще одним дополнением, появившимся в C++11, является класс
std::thread
. Он предоставляет простой способ запускать и приостанавливать потоки, не прибегая к использованию внешних библиотек и знаний о том, в какой операционной системе запущен процесс. Все это включено в STL.
В данном примере мы реализуем программу, которая запускает и останавливает потоки. Далее рассмотрим информацию о том, что с ними делать после запуска.
Как это делается
В этом примере мы запустим несколько потоков и увидим, как ведет себя программа, когда мы задействуем несколько ядер процессора, чтобы выполнить разные части ее кода одновременно.
1. Сначала включим всего два заголовочных файла, а затем объявим об использовании пространств имен
std
и chrono_literals
:
#include
#include
using namespace std;
using namespace chrono_literals;
2. Чтобы запустить поток, следует указать, какой код он должен выполнить. Поэтому определим функцию, которую ему нужно выполнить. Функции — естественные потенциальные входные точки для потоков. Наша функция-пример принимает аргумент
i
, выступающий в роли идентификатора потока. Таким образом, можно сказать, какой поток отобразил то или иное сообщение. Кроме того, воспользуемся идентификатором потока с целью указать, что различные потоки нужно приостановить на разные промежутки времени. Это позволит убедиться в том, что они не пытаются задействовать команду cout
одновременно. Если такая ситуация произойдет, то выходные данные будут искажены. Другой пример в настоящей главе посвящен именно этой проблеме.
static void thread_with_param(int i)
{
this_thread::sleep_for(1ms * i);
cout << "Hello from thread " << i << '\n';
this_thread::sleep_for(1s * i);
cout << "Bye from thread " << i << '\n';
}
3. В функции
main
(просто из любопытства) выведем на экран информацию о том, сколько потоков можно запустить в одно время с помощью std::thread::hardware_concurrency
. Данное значение зависит от того, сколько ядер имеет процессор и сколько ядер поддерживается реализацией STL. Это говорит о том, что значения будут различаться для разных компьютеров.
int main()
{
cout << thread::hardware_concurrency()
<< " concurrent threads are supported.\n";
4. Наконец, начнем работать с потоками. Запустим три потока с разными идентификаторами. При создании экземпляра потока с помощью выражения наподобие
t{f, x}
получаем вызов функции f(x)
. Таким образом можно передавать функциям thread_with_param
разные аргументы для каждого потока:
thread t1 {thread_with_param, 1};
thread t2 {thread_with_param, 2};
thread t3 {thread_with_param, 3};
5. Поскольку данные потоки запущены свободно, нужно остановить их, когда они закончат выполнять свою работу. Сделаем это с помощью функции
join
. Она заблокирует вызов потока до тех пор, пока вызываемый поток не отработает:
t1.join();
t2.join();
6. Альтернативой присоединению является открепление. Если мы не вызовем функцию
join
или detach
, то все приложение завершится довольно шумно, как только будет выполнен деструктор объекта потока. Путем вызова функции detach
указываем thread
, что хотим продолжения работы потока номер 3 даже после того, как его экземпляр будет разрушен:
t3.detach();
7. Перед завершением функции
main
и всей программы выведем еще одно сообщение:
cout << "Threads joined.\n";
}
8. Компиляция и запуск программы дадут следующий результат. Мы можем увидеть, что моя машина имеет восемь ядер процессора. Далее сообщения hello видим из всех потоков, а сообщения bye — лишь из двух, которые мы объединили. Поток 3 все еще ожидает завершения трехсекундного ожидания, но вся программа уже завершилась после того, как поток 2 завершил свое двухсекундное ожидание. Таким образом, мы не можем увидеть прощальное сообщение потока 3, поскольку он был уничтожен:
$ ./threads
8 concurrent threads are supported.
Hello from thread 1
Hello from thread 2
Hello from thread 3
Bye from thread 1
Bye from thread 2
Threads joined.
Как это работает
Запуск и остановку потоков выполнить очень просто. Многопроцессорная обработка начинает усложняться в момент, когда потокам нужно работать вместе (делить ресурсы, ожидать завершения других потоков и т.д.).
Чтобы запустить поток, нужно иметь функцию, которую он будет выполнять. Функция не обязательно должна быть особенной, поскольку в потоке можно выполнить практически любую функцию. Напишем небольшую программу-пример, которая запускает поток и ожидает его завершения:
void f(int i) { cout << i << '\n'; }
int main()
{
thread t {f, 123};
t.join();
}
Вызов конструктора
std::thread
принимает указатель на функцию или вызываемый объект; за ним следуют аргументы, которые нужно использовать в вызове функции. Конечно, можете также запустить поток или функцию, не принимающие никаких параметров.
При наличии в системе нескольких ядер процессора потоки можно выполнять параллельно и конкурентно. В чем заключается разница? Если компьютер имеет всего одно ядро ЦП, то можно создать множество потоков, работающих параллельно, но не конкурентно, поскольку ядро способно запускать лишь один поток в любой момент времени. Потоки запускаются и чередуются, где каждый поток выполняется какую-то часть секунды, затем приостанавливается, после чего следующий поток получает время (для пользователей-людей кажется, что потоки выполняются одновременно). Если потокам не нужно делить одно ядро, то они могут быть запущены конкурентно и действительно работать одновременно.
К этому моменту мы не контролируем следующие детали:
□ порядок, в котором потоки чередуются на одном ядре;
□ приоритет потока, указывающий, что один поток главнее другого;
□ распределение потоков между ядрами. Вполне возможна ситуация, когда все потоки будут выполняться на одном ядре, несмотря на то что машина имеет более 100 ядер.
Большая часть операционных систем предоставляет возможности управления этими аспектами многопроцессорной обработки, но на текущий момент данные функции не включены в STL.
Однако можно запускать и останавливать потоки и указывать им, когда и над чем работать и когда останавливаться. Этого должно быть достаточно для большинства приложений. В данном разделе мы создали три дополнительных потока. После этого объединили большую их часть и открепили последний. Подытожим на одном рисунке все, что произошло (рис. 9.1).
Читая рисунок сверху вниз, мы заметим, что в какой-то момент разбиваем рабочий поток программы на четыре потока. Мы запускаем три дополнительных потока, которые совершают некие действия (а именно, ожидают и выводят сообщения), но после их запуска основной поток, выполняющий функцию
main
, остается без работы.
Когда поток завершает выполнение своей функции, он возвращает значение, возвращенное ею. Затем стандартная библиотека «делает уборку», что приводит к удалению потока из планировщика системы и, возможно, его уничтожению, но волноваться об этом не нужно.
Единственное, о чем следует волноваться, — это объединение. Когда поток вызывает функцию
x.join()
для объекта другого потока, его выполнение приостанавливается до того, как будет выполнен поток x
. Обратите внимание: нас ничто не спасет при попадании потока в бесконечный цикл! Если нужно, чтобы поток продолжал существовать до тех пор, пока не решит завершиться, то можно вызвать функцию x.detach()
. После этого у нас не будет возможности управлять потоком. Независимо от принятого решения, мы должны всегда объединять или откреплять потоки. Если мы не сделаем этого, то деструктор объекта thread
вызовет функцию std::terminate()
, что приведет к внезапному завершению работы приложения.
В момент, когда функция main возвращает значение, приложение заканчивает работу. Однако в это же время наш открепленный поток
t3
все еще находится в приостановленном состоянии и не успевает отправить сообщение bye на консоль. Операционной системе это неважно: она просто завершает всю программу, не дожидаясь завершения данного потока. Указанный факт важно иметь в виду. Если дополнительный поток должен был соревноваться за что-то важное, то нужно было бы подождать его завершения в функции main
.
Поскольку работа потоков значительно зависит от поддержки операционной системы, а STL предоставляет хорошие интерфейсы, позволяющие абстрагироваться от операционных систем, разумно также предоставить поддержку STL для синхронизации между потоками. Таким образом, можно не только запускать и останавливать потоки без внешних библиотек, но и синхронизировать их с помощью абстракций из одной объединенной библиотеки — STL.
В этом разделе мы взглянем на классы-мьютексы STL и абстракции блокировки RAII. Поэкспериментируем с ними в нашей конкретной реализации примера, а также изучим другие вспомогательные средства синхронизации, предоставляемые STL.
Как это делается
В этом примере мы напишем программу, которая использует экземпляр класса
std::shared_mutex
в эксклюзивном и коллективном режимах, и увидим, что это значит. Кроме того, не будем вызывать функции lock
и unlock
самостоятельно, а сделаем это с помощью вспомогательных функций RAII.
1. Сначала включим все необходимые заголовочные файлы. Поскольку мы задействуем функции и структуры данных STL, а также временные литералы, объявим об использовании пространств имен
std
и chrono_literal
:
#include
#include
#include
#include
using namespace std;
using namespace chrono_literals;
2. Вся программа строится вокруг одного общего мьютекса, поэтому для простоты объявим его глобальный экземпляр:
shared_mutex shared_mut;
3. Мы будем использовать вспомогательные функции RAII
std::shared_lock
и std::unique_lock
. Чтобы их имена выглядели более понятными, определим для них короткие псевдонимы:
using shrd_lck = shared_lock;
using uniq_lck = unique_lock;
4. Прежде чем начнем писать функцию main, определим две вспомогательные функции, которые пытаются заблокировать мьютекс в эксклюзивном режиме. Эта функция создаст экземпляр класса
unique_lock
для общего мьютекса. Второй аргумент конструктора defer_lock
указывает объекту поддерживать блокировку снятой. В противном случае его конструктор попробует заблокировать мьютекс, а затем будет удерживать его до завершения. Далее вызываем метод try_lock
для объекта exclusive_lock
. Этот вызов немедленно вернет булево значение, которое говорит, получили мы блокировку или же мьютекс уже был заблокирован кем-то еще.
static void print_exclusive()
{
uniq_lck l {shared_mut, defer_lock};
if (l.try_lock()) {
cout << "Got exclusive lock.\n";
} else {
cout << "Unable to lock exclusively.\n";
}
}
5. Другая вспомогательная функция также пытается заблокировать мьютекс в эксклюзивном режиме. Она делает это до тех пор, пока не получит блокировку. Затем мы симулируем какую-нибудь ошибку, генерируя исключение (содержащее лишь простое целое число). Несмотря на то, что это приводит к мгновенному выходу контекста, в котором мы хранили заблокированный мьютекс, последний будет освобожден. Это происходит потому, что деструктор объекта
unique_lock
освободит блокировку в любом случае по умолчанию.
static void exclusive_throw()
{
uniq_lck l {shared_mut};
throw 123;
}
6. Теперь перейдем к функции
main
. Сначала откроем еще одну область видимости и создадим экземпляр класса shared_lock
. Его конструктор мгновенно заблокирует мьютекс в коллективном режиме. Мы увидим, что это значит, в следующих шагах.
int main()
{
{
shrd_lck sl1 {shared_mut};
cout << "shared lock once.\n";
7. Откроем еще одну область видимости и создадим второй экземпляр типа
shared_lock
для того же мьютекса. Теперь у нас есть два экземпляра типа shared_lock
, и оба содержат общую блокировку мьютекса. Фактически можно создать произвольно большое количество экземпляров типа shared_lock
для одного мьютекса. Затем вызываем функцию print_exclusive
, которая пытается заблокировать мьютекс в эксклюзивном режиме. Эта операция не увенчается успехом, поскольку он уже находится в коллективном режиме.
{
shrd_lck sl2 {shared_mut};
cout << "shared lock twice.\n";
print_exclusive();
}
8. После выхода из самой поздней области видимости деструктор объекта
sl2
типа shared_lock
освобождает свою общую блокировку мьютекса. Функция print_exclusive
снова даст сбой, поскольку мьютекс все еще находится в коллективном режиме блокировки.
cout << "shared lock once again.\n";
print_exclusive();
}
cout << "lock is free.\n";
9. После выхода из второй области видимости все объекты типа
shared_lock
подвергнутся уничтожению и мьютекс снова будет находиться в разблокированном состоянии. Теперь наконец можно заблокировать мьютекс в эксклюзивном режиме. Сделаем это путем вызовов exclusive_throw
и print_exclusive
. Помните, что мы генерируем исключение в вызове exclusive_throw
. Но поскольку unique_lock
— это объект RAII, который помогает защититься от исключений, мьютекс снова будет разблокирован независимо от того, что вернет вызов exclusive_throw
. Таким образом, функция print_exclusive
не будет ошибочно блокировать все еще заблокированный мьютекс:
try {
exclusive_throw();
} catch (int e) {
cout << "Got exception " << e << '\n';
}
print_exclusive();
}
10. Компиляция и запуск программы дадут следующий результат. Первые две строки показывают наличие двух экземпляров общей блокировки. Затем функция
print_exclusive
дает сбой при попытке заблокировать мьютекс в эксклюзивном режиме. После того как мы покинем внутреннюю область видимости и разблокируем вторую общую блокировку, функция print_exclusive
все еще будет давать сбой. После выхода из второй области видимости, что наконец снова освободит мьютекс, функции exclusive_throw
и print_exclusive
смогут заблокировать мьютекс:
$ ./shared_lock
shared lock once.
shared lock twice.
Unable to lock exclusively.
shared lock once again.
Unable to lock exclusively.
lock is free.
Got exception 123
Got exclusive lock.
Как это работает
При просмотре документации С++ может показаться несколько странным факт существования разных классов мьютексов и абстракций блокировок RAII. Прежде чем рассматривать наш конкретный пример кода, подытожим все, что может предложить STL.
Классы мьютексов
Термин mutex расшифровывается как mutual exclusion (взаимное исключение). Чтобы предотвратить неуправляемое изменение одного объекта несколькими конкурирующими потоками, способное привести к повреждению данных, можно использовать объекты мьютексов. STL предоставляет разные классы мьютексов, которые хороши в разных ситуациях. Все они похожи в том, что имеют методы
lock
и unlock
.
Когда некий поток первым вызывает метод
lock()
для мьютекса, который не был заблокирован ранее, он получает контроль над мьютексом. На данном этапе другие потоки будут блокироваться при вызове метода lock
до тех пор, пока первый поток не вызовет снова метод unlock
. Класс std::mutex
может делать именно это.
В STL существует множество разных классов мьютексов (табл. 9.2).
Классы блокировок
Работать с многопоточностью легко и просто до тех пор, пока потоки просто блокируют мьютекс, получают доступ к защищенному от конкурентности объекту, а затем снова разблокируют мьютекс. Как только программист забывает разблокировать мьютекс после одной из блокировок, ситуация значительно усложняется.
В лучшем случае программа просто мгновенно зависает, и вызов, не выполняющий разблокировку, быстро выявляется. Такие ошибки, однако, очень похожи на утечки памяти, которые случаются, если отсутствуют вызовы
delete
.
Для управления памятью существуют вспомогательные классы
unique_ptr
, shared_ptr
и weak_ptr
. Они предоставляют очень удобный способ избежать утечек памяти. Такие классы существуют и для мьютексов. Простейшим из них является std::lock_guard
. Его можно использовать следующим образом:
void critical_function()
{
lock_guard l {some_mutex};
// критический раздел
}
Конструктор элемента
lock_guard
принимает мьютекс, для которого мгновенно вызывает метод lock
. Весь вызов конструктора заблокируется до тех пор, пока тот не получит блокировку для мьютекса. При разрушении объекта он разблокирует мьютекс. Таким образом, понять цикл блокировки/разблокировки сложно, поскольку она происходит автоматически.
В STL версии C++17 предоставляются следующие разные абстракции блокировок RAII (табл. 9.3). Все они принимают аргумент шаблона, который будет иметь тот же тип, что и мьютекс (однако, начиная с C++17, компилятор может вывести этот тип самостоятельно).
В то время как
lock_guard
и scoped_lock
имеют простейшие интерфейсы, которые состоят только из конструктора и деструктора, unique_lock
и shared_lock
более сложны, но и более гибки. В последующих примерах мы увидим, как еще их можно использовать, помимо простой блокировки.
Вернемся к коду примера. Хотя код запускался только в контексте одного потока, мы увидели, что он собирался использовать вспомогательные классы для блокировки. Псевдоним типа
shrd_lck
расшифровывается как shared_lock
и позволяет блокировать экземпляр мьютекса несколько раз в коллективном режиме. До тех пор, пока существуют sl1
и sl2
, никакие вызовы print_exclusive
не могут заблокировать мьютекс в эксклюзивном режиме. Это все еще просто.
Теперь перейдем к эксклюзивным функциям блокировки, которые появились позднее в функции
main
:
int main()
{
{
shrd_lck sl1 {shared_mut};
{
shrd_lck sl2 {shared_mut};
print_exclusive();
}
print_exclusive();
}
try {
exclusive_throw();
} catch (int e) {
cout << "Got exception " << e << '\n';
}
print_exclusive();
}
Важная деталь — после возвращения из
exclusive_throw
функция print_exclusive
снова может заблокировать мьютекс, несмотря на то что exclusive_throw
завершила работу некорректно из-за генерации исключения.
Еще раз взглянем на функцию
print_exclusive
, поскольку в ней был использован странный вызов конструктора:
void print_exclusive()
{
uniq_lck l {shared_mut, defer_lock};
if (l.try_lock()) {
// ...
}
}
Мы предоставили
shared_mut
и defer_lock
в качестве аргументов конструктора для unique_lock
в данной процедуре. defer_lock
— пустой глобальный объект, который послужит для выбора другого конструктора класса unique_lock
, просто не блокирующего мьютекс. Позднее можно вызвать функцию l.try_lock()
, которая не блокирует мьютекс. Если мьютекс уже был заблокирован, то можно сделать что-то еще. При полученной блокировке деструктор поможет выполнить уборку.
Если бы взаимные блокировки происходили на дорогах, то выглядели бы так (рис. 9.2).
Чтобы снова запустить движение, понадобился бы большой кран, который в случайном порядке выбирает по одной машине из центра перекрестка и удаляет ее. Если это невозможно, то достаточное количество водителей должны взаимодействовать друг с другом. Взаимную блокировку могут разрешить все водители, едущие в одном направлении, сдвинувшись на несколько метров назад, позволяя продвинуться остальным водителям.
В многопоточных программах такие ситуации должен разрешать программист. Однако слишком легко потерпеть неудачу, если программа сама по себе довольно сложная.
В этом примере мы напишем код, намеренно создающий взаимную блокировку. Затем увидим, как писать код, который получает те же ресурсы, что привели другой код к взаимной блокировке, но воспользуемся новым классом блокировки STL
std::scoped_lock
, появившимся в C++17 с целью избежать этой ошибки.
Как это делается
Код этого примера состоит из двух пар функций, которые должны быть выполнены конкурирующими потоками, они получают два ресурса в форме мьютексов. Одна пара создает взаимную блокировку, а вторая избегает такой ситуации. В функции main мы опробуем их в деле.
1. Сначала включим все необходимые заголовочные файлы и объявим об использовании пространств имен
std
и chrono_literals
:
#include
#include
#include
using namespace std;
using namespace chrono_literals;
2. Затем создадим два объекта мьютексов, которые понадобятся для создания взаимной блокировки:
mutex mut_a;
mutex mut_b;
3. Чтобы создать взаимную блокировку с двумя ресурсами, нужны две функции. Одна пробует заблокировать мьютекс
А
, а затем и мьютекс В
, а другая сделает это в противоположном порядке. Позволив обеим функциям приостановиться между блокировками, можно гарантировать, что код навсегда попадет во взаимную блокировку. (Это делается только для демонстрации. Программа, не содержащая команд по приостановке потоков, может запуститься успешно и не столкнуться с взаимной блокировкой, если запускать ее раз за разом.) Обратите внимание: мы не используем символ '\n'
для вывода на экран перевода строки, а применяем для данных целей endl
. Он не только выполняет перевод строки, но еще и опустошает буфер потока cout
, поэтому можно убедиться, что операции вывода не объединяются и не откладываются:
static void deadlock_func_1()
{
cout << "bad f1 acquiring mutex A..." << endl;
lock_guard la {mut_a};
this_thread::sleep_for(100ms);
cout << "bad f1 acquiring mutex B..." << endl;
lock_guard lb {mut_b};
cout << "bad f1 got both mutexes." << endl;
}
4. Как мы и говорили на предыдущем шаге, функция
deadlock_func_2
выглядит точно так же, как и deadlock_func_1
, но блокирует мьютексы A
и B
в противоположном порядке:
static void deadlock_func_2()
{
cout << "bad f2 acquiring mutex B..." << endl;
lock_guard lb {mut_b};
this_thread::sleep_for(100ms);
cout << "bad f2 acquiring mutex A..." << endl;
lock_guard la {mut_a};
cout << "bad f2 got both mutexes." << endl;
}
5. Теперь напишем свободный от взаимных блокировок вариант этих функций. Они используют класс
scoped_lock
, блокирующий все мьютексы, которые мы предоставляем в качестве аргументов конструктора. Его деструктор снова их разблокирует. При блокировании мьютексов он изнутри применяет стратегию избегания взаимных блокировок. Обратите внимание: обе функции все еще используют мьютексы А
и В
в разном порядке:
static void sane_func_1()
{
scoped_lock l {mut_a, mut_b};
cout << "sane f1 got both mutexes." << endl;
}
static void sane_func_2()
{
scoped_lock l {mut_b, mut_a};
cout << "sane f2 got both mutexes." << endl;
}
6. В функции
main
пройдем по двум сценариям. Сначала воспользуемся внятными функциями в многопоточном контексте:
int main()
{
{
thread t1 {sane_func_1};
thread t2 {sane_func_2};
t1.join();
t2.join();
}
7. Затем воспользуемся функциями, создающими взаимные блокировки, которые не следуют стратегиям избегания взаимных блокировок:
{
thread t1 {deadlock_func_1};
thread t2 {deadlock_func_2};
t1.join();
t2.join();
}
}
8. Компиляция и запуск программы дадут следующий результат. В первых двух строках показывается, что внятный сценарий блокировки работает и обе функции возвращают свое значение и не блокируются навсегда. Две другие функции создают взаимную блокировку. Мы можем сказать, что это точно взаимная блокировка, поскольку видим, как они выводят на экран строки, которые указывают отдельным потокам блокировать мьютексы
А
и В
, а затем вечно ожидают. Обе функции не достигают момента, когда успешно блокируют оба мьютекса. Можно оставить программу включенной на часы, дни и годы, и ничего не произойдет.
Это приложение нужно завершить снаружи, например нажав Ctrl+C:
$ ./avoid_deadlock
sane f1 got both mutexes
sane f2 got both mutexes
bad f2 acquiring mutex B...
bad f1 acquiring mutex A...
bad f1 acquiring mutex B...
bad f2 acquiring mutex A...
Как это работает
Реализуя код, намеренно вызывающий взаимную блокировку, мы увидели, как быстро может возникнуть этот нежелательный сценарий. В крупном проекте, где несколько программистов пишут код, который должен разделять один набор ресурсов, защищенных мьютексами, всем программистам необходимо следовать одному порядку при блокировании и разблокировании мьютексов. Несмотря на то, что таким стратегиям или правилам следовать очень просто, о них легко и забыть. Еще одним термином для этой проблемы является инверсия порядка блокировки.
В подобных ситуациях поможет
scoped_lock
. Этот класс появился в C++17 и работает точно так же, как и классы lock_guard
и unique_lock
: его конструктор выполняет блокирование, а его деструктор разблокирует мьютекс. Класс может работать с несколькими мьютексами сразу.
Класс
scoped_lock
использует функцию std::lock
, которая применяет особый алгоритм, выполняющий набор вызовов try_lock
для всех предоставленных мьютексов, что позволяет предотвратить взаимные блокировки. Поэтому совершенно безопасно задействовать scoped_lock
или вызывать std::lock
для одного набора блокировок, но в разном порядке.
Многопоточные программы неудобны тем, что нужно охранять каждую структуру данных, которую они изменяют, с помощью мьютексов или других средств защиты от неуправляемых конкурентных изменений.
Одной из структур данных, часто применяемых для вывода данных, является
std::cout
. Если несколько потоков пытаются получить доступ к cout
на конкурентной основе, то мы получим смешанные выходные данные. Чтобы это предотвратить, следует написать собственную функцию, которая выводит данные на экран и защищена от конкурентности.
Мы узнаем, как предоставить оболочку для
cout
, которая состоит из минимального объема кода и так же удобна в использовании, как и cout
.
Как это делается
В этом примере мы реализуем программу, выводящую на экран данные на конкурентной основе из нескольких потоков. Чтобы предотвратить искажение сообщений из-за конкурентности, реализуем небольшой вспомогательный класс, который синхронизирует вывод данных между потоками.
1. Как и обычно, сначала укажем все директивы
include
и объявим об использовании пространства имен std
:
#include
#include
#include
#include
#include
using namespace std;
2. Далее реализуем вспомогательный класс, который назовем
pcout
. Буква p означает «паралелльный», поскольку он синхронизирован для параллельных контекстов. Идея заключается в том, что pcout
явно наследует от stringstream
. Таким образом, можно применять operator<<
для экземпляров этого класса. Как только экземпляр pcout
уничтожается, его деструктор блокирует мьютекс, а затем выводит на экран содержимое буфера stringstream
. На следующем шаге мы увидим, как это использовать.
struct pcout : public stringstream {
static inline mutex cout_mutex;
~pcout() {
lock_guard l {cout_mutex};
cout << rdbuf();
cout.flush();
}
};
3. Теперь напишем две функции, которые можно выполнить с помощью дополнительных потоков. Обе функции принимают в качестве аргументов идентификаторы потоков. Затем они отличаются только тем, что одна из них использует для вывода данных непосредственно
cout
. Вторая же выглядит практически так же, но вместо непосредственного применения cout
создает экземпляр pcout
. Этот экземпляр представляет собой временный объект, существующий только для одной строки кода. После выполнения всех вызовов operator<<
внутренний строковый поток заполняется всеми данными, которые мы бы хотели вывести. Затем вызывается деструктор для экземпляра типа pcout
. Мы уже видели, что он делает: блокирует конкретный мьютекс, разделяемый всеми экземплярами класса pcout
, и выводит данные на экран:
static void print_cout(int id)
{
cout << "cout hello from " << id << '\n';
}
static void print_pcout(int id)
{
pcout{} << "pcout hello from " << id << '\n';
}
4. Опробуем его. Сначала воспользуемся
print_cout
, которая для вывода данных просто обращается к cout
. Запускаем десять потоков, конкурентно выводящих свои строки и ожидающих завершения:
int main()
{
vector v;
for (size_t i {0}; i < 10; ++i) {
v.emplace_back(print_cout, i);
}
for (auto &t : v) { t.join(); }
5. Далее делаем то же самое для функции
print_pcout
:
cout << "=====================\n";
v.clear();
for (size_t i {0}; i < 10; ++i) {
v.emplace_back(print_pcout, i);
}
for (auto &t : v) { t.join(); }
}
6. Компиляция и запуск программы дадут следующий результат (рис. 9.3). Как видите, первые десять строк полностью перепутались. Именно так может выглядеть результат, когда
cout
используется конкурентно и без блокировки. Последние десять строк выведены с помощью print_pcout
, никакой путаницы нет. Можно увидеть, что они выводятся из разных потоков, поскольку их порядок различается при каждом запуске программы.
Как это работает
О’кей, мы создали эту «оболочку для cout», которая автоматически сериализует последовательные попытки вывода текста. Как она работает?
Выполним те же действия, что и
pcout
, вручную. Сначала создаем строковый поток и принимаем входные данные, которые будем передавать в него:
stringstream ss;
ss << "This is some printed line " << 123 << '\n';
Затем блокируем глобально доступный мьютекс:
{
lock_guard l {cout_mutex};
В этой заблокированной области видимости мы получаем доступ к содержимому строкового потока ss, выводим его на экран и освобождаем мьютекс, покидая область видимости. Строка
cout.flush()
указывает объекту потока вывести данные на консоль немедленно. Без данной строки программа способна работать быстрее, поскольку несколько строк можно вывести за один раз. В наших примерах мы хотим видеть результат работы немедленно, так что используем метод flush
:
cout << ss.rdbuf();
cout.flush();
}
О’кей, это достаточно просто, но утомительно писать, если нужно делать одно и то же раз за разом. Можно сократить создание объекта stringstream таким образом:
stringstream{} << "This is some printed line " << 123 << '\n';
Эта строка создает объект строкового потока, передает ему все, что нужно вывести на экран, а затем снова разрушает его. Жизненный цикл строкового потока сокращается до данной строки. После этого мы не можем выводить на экран данные, поскольку у нас нет доступа к указанному объекту. Какой фрагмент кода последним может получить содержимое потока? Это деструктор
stringstream
.
Мы не можем изменить методы-члены экземпляра
stringstream
, но способны расширить их, обернув наш собственный тип вокруг них с помощью наследования:
struct pcout : public stringstream {
~pcout() {
lock_guard l {cout_mutex};
cout << rdbuf();
cout.flush();
}
};
Этот класс все еще является строковым потоком, и его можно использовать так же, как и любой другой строковый поток. Единственное отличие заключается в том, что он будет блокировать мьютекс и выводить собственный буфер с помощью
cout
.
Кроме того, мы поместили объект
cout_mutex
в структуру pcout
как статический экземпляр, и теперь все элементы находятся в одном месте.
Иногда встречаются специфические разделы кода, которые можно запустить в параллельном контексте в нескольких потоках, при этом перед выполнением самих функций нужно выполнить программу настройки. Простым решением будет выполнить существующую функцию настройки до того, как программа войдет в состояние, из которого время от времени может работать параллельный код.
Однако данный подход имеет следующие недостатки.
□ Если параллельная функция находится в библиотеке, то пользователь не должен забывать вызывать функцию настройки. Это не упрощает применение библиотеки.
□ Предположим, функция настройки в какой-то степени дорогая, и ее, возможно, даже не требуется выполнять, если параллельные функции, которые ее используют, не запускаются. В таком случае необходим код, определяющий, запускать эту функцию или нет.
В данном примере мы рассмотрим вспомогательную функцию
std::call_once
, которая решает эту проблему простым, элегантным и неявным способом.
Как это делается
В этом примере мы напишем программу, которая запускает несколько потоков, выполняющих один и тот же код. Несмотря на полностью одинаковый выполняемый ими код, наша функция настройки будет вызвана всего раз.
1. Сначала включим все необходимые заголовочные файлы и объявим об использовании пространства имен
std
:
#include
#include
#include
#include
using namespace std;
2. Далее будем использовать функцию
std::call_once
. Чтобы ее применить, нужен экземпляр типа once_flag
для синхронизации между всеми потоками, которые задействуют call_once
для конкретной функции.
once_flag callflag;
3. Функция, которая должна быть выполнена всего раз, выглядит так. Она просто выводит один восклицательный знак:
static void once_print()
{
cout << '!';
}
4. Все потоки будут выполнять функцию
print
. Первое, что мы сделаем, — вызовем функцию once_print
для функции std::call_once
. Функции call_once
требуется переменная callflag
, которую мы определили ранее. Она послужит для управления потоками.
static void print(size_t x)
{
std::call_once(callflag, once_print);
cout << x;
}
5. О’кей, теперь запустим десять потоков, все они будут использовать функцию
print
:
int main()
{
vector v;
for (size_t i {0}; i < 10; ++i) {
v.emplace_back(print, i);
}
for (auto &t : v) { t.join(); }
cout << '\n';
}
6. Компиляция и запуск дадут следующий результат. Сначала мы увидим восклицательный знак благодаря функции
once_print
. Затем увидим все идентификаторы потоков. Функция call_once
не только помогла убедиться в том, что функция once_print
была вызвана всего раз. Помимо этого, она синхронизировала все потоки и ни один идентификатор не был выведен на экран до выполнения once_print
.
$ ./call_once
!1239406758
Как это работает
Функция
std:call_once
работает как барьер. Она поддерживает доступ к функции (или вызываемому объекту). Первый поток, достигший ее, выполняет эту функцию. Пока ее выполнение не завершится, любой другой поток, который достигнет call_once
, заблокируется. После того как первый поток вернется из этой функции, все другие потоки также будут освобождены.
Чтобы организовать этот небольшой «танцевальный номер», требуется переменная, на основе которой другие потоки могут определить, следует ли им ждать, а также время их освобождения. Именно для этого и предназначена переменная
once_flag callflag;
. Каждая строка call_once
нуждается и в экземпляре типа once_flag
. Он будет передан как аргумент перед функцией, которая должна быть вызвана всего раз.
Еще одна приятная деталь: если поток, который был выбран для выполнения функции
call_once
, даст сбой из-за какого-то исключения, то следующий поток сможет попытаться выполнить функцию снова. Это делается вследствие вероятности того, что в следующий раз исключение не будет сгенерировано.
При необходимости выполнить некий код в фоновом режиме можно просто запустить новый поток, который выполнит данный код. В подобных ситуациях можно сделать что-то еще, а затем подождать результата. Это просто:
std::thread t {my_function, arg1, arg2, ...};
// сделать что-то еще
t.join(); // подождать завершения потока
Но здесь начинаются неудобства:
t.join()
не дает возвращаемое значение функции my_function
. Чтобы получить его, следует написать функцию, которая вызывает функцию my_function
и сохраняет ее возвращаемое значение в какой-то переменной. Последняя также доступна первому потоку, в котором и был запущен новый поток. Если такие ситуации происходят постоянно, то нужно написать очень много стереотипного кода снова и снова.
В C++11 появилась функция
std::async
, способная решить эту задачу для нас, и не только. В этом примере мы напишем простую программу, которая делает несколько дел одновременно с помощью асинхронных вызовов функций. Поскольку std::async
эффективна не только в данной области, рассмотрим все ее аспекты.
Как это делается
В этом примере мы реализуем программу, которая делает несколько дел конкурентно, но вместо того, чтобы явно запускать потоки, мы используем
std::async
и std::future
.
1. Сначала включим все необходимые заголовочные файлы и объявим об использовании пространства имен
std
:
#include
#include
#include
#include
#include
#include
#include
using namespace std;
2. Реализуем три функции, которые не связаны с параллелизмом, а просто выполняют интересные задачи. Первая функция принимает строку и создает гистограмму включения всех символов внутри этой строки:
static map histogram(const string &s)
{
map m;
for (char c : s) { m[c] += 1; }
return m;
}
3. Вторая функция также принимает строку и возвращает ее отсортированную копию:
static string sorted(string s)
{
sort(begin(s), end(s));
return s;
}
4. Третья функция подсчитывает, как много гласных находится внутри принимаемой строки:
static bool is_vowel(char c)
{
char vowels[] {"aeiou"};
return end(vowels) !=
find(begin(vowels), end(vowels), c);
}
static size_t vowels(const string &s)
{
return count_if(begin(s), end(s), is_vowel);
}
5. В функции
main
считываем весь стандартный поток ввода в одну строку. Чтобы не разбивать входные данные на слова, деактивируем ios::skipws
. Подобным образом получаем одну большую строку независимо от того, сколько пробелов содержится во входных данных. Используем pop_back
для полученной строки, поскольку так мы получаем слишком много символов-терминаторов '\0'
:
int main()
{
cin.unsetf(ios::skipws);
string input {istream_iterator{cin}, {}};
input.pop_back();
6. Теперь вернем значения из всех функций, которые реализовали ранее. Чтобы ускорить выполнение программы в случае получения очень больших входных данных, запустим их асинхронно. Функция
std::async
принимает политику, функцию и аргументы этой функции. Вызываем функции histogram
, sorted
и vowels
, передавая в качестве политики launch::async
(далее узнаем, что это значит). Все функции получают в качестве аргументов одинаковые входные строки:
auto hist (async(launch::async,
histogram, input));
auto sorted_str (async(launch::async,
sorted, input));
auto vowel_count(async(launch::async,
vowels, input));
7. Вызовы
async
возвращают значения мгновенно, поскольку не выполняют сами функции. Вместо этого они подготавливают структуры для синхронизации, которые в дальнейшем станут получать результаты вызова функций. Результаты теперь будут определяться конкурентно с помощью дополнительных потоков. В это же время мы можем делать все, что захотим, поскольку получим данные значения позже. Возвращаемые значения hist
, sorted_str
и vowel_count
имеют типы, указанные для функций histogram
, sorted
и vowels
, но они обернуты в тип future
функцией std::async
. Объекты этого типа выражают тот факт, что в какой-то момент времени будут содержать значения. Вызов .get()
позволяет получить их все, а до их появления можем заблокировать функцию main
. После получения этих значений выводим их на экран:
for (const auto &[c, count] : hist.get()) {
cout << c << ": " << count << '\n';
}
cout << "Sorted string: "
<< quoted(sorted_str.get()) << '\n'
<< "Total vowels: "
<< vowel_count.get() << '\n';
}
8. Компиляция и запуск кода выглядят так. Мы использовали короткую строку, которую не стоит распараллеливать, но для примера выполнили ее конкурентно. Вдобавок общая структура программы не изменилась по сравнению с наивной последовательной реализацией:
$ echo "foo bar baz foobazinga" | ./async
: 3
a: 4
b: 3
f: 2
g: 1
i: 1
n: 1
o: 4
r: 1
z: 2
Sorted string: " aaaabbbffginoooorzz"
Total vowels: 9
Как это работает
Если бы мы не использовали std::async, то последовательный нераспараллеленный код выглядел бы довольно просто:
auto hist (histogram(input));
auto sorted_str (sorted( input));
auto vowel_count (vowels( input));
for (const auto &[c, count] : hist) {
cout << c << ": " << count << '\n';
}
cout << "Sorted string: " << quoted(sorted_str) << '\n';
cout << "Total vowels: " << vowel_count << '\n';
Чтобы распараллелить код, мы сделали следующее: обернули три вызова функций в вызовы
async(launch::async, ...)
. Следовательно, указанные функции выполняются не в основном потоке. Вместо этого async
запускает новые потоки и позволяет им выполнить функции конкурентно. Таким образом, мы имеем дело только с теми издержками, которые возникают при запуске другого потока, и можем продолжить выполнение следующих строк кода, а вся работа совершится в фоновом режиме:
auto hist (async(launch::async, histogram, input));
auto sorted_str (async(launch::async, sorted, input));
auto vowel_count (async(launch::async, vowels, input));
for (const auto &[c, count] : hist.get()) {
cout << c << ": " << count << '\n';
}
cout << "Sorted string: "
<< quoted(sorted_str.get()) << '\n'
<< "Total vowels: "
<< vowel_count.get() << '\n';
В то время как, например, функция
histogram
возвращает экземпляр типа map, вызов async(..., histogram, ...)
возвращает ассоциативный массив, который обернут в объект типа future
. Последний является чем-то вроде заполнителя до тех пор, пока поток, выполняющий функцию histogram
, не вернет значение. Полученный ассоциативный массив помещается в объект типа future
, и мы наконец можем получить к нему доступ. Функция get
дает доступ к инкапсулированному результату.
Рассмотрим еще один короткий пример. Взгляните на этот фрагмент:
auto x (f(1, 2, 3));
cout << x;
Вместо предыдущего кода мы могли написать следующее:
auto x (async(launch::async, f, 1, 2, 3));
cout << x.get();
Вот, по сути, и все. Выполнение задач в фоновом режиме никогда не было проще по стандартам С++. Осталось разрешить один момент: что означает
launch::async
? Это флаг, который определяет политику запуска. Существуют два флага политики, соответственно, возможны три их сочетания (табл. 9.4).
Вызов наподобие
async(f, 1, 2, 3)
без аргумента политики автоматически выберет обе политики. Реализация async
сама выберет, какую политику использовать. Это означает отсутствие уверенности в том, что другой поток вообще запустится или что выполнение будет просто отложено в другом потоке.
Дополнительная информация
Следует рассмотреть последний момент. Предположим, мы пишем код следующим образом:
async(launch::async, f);
async(launch::async, g);
Это может привести к тому, что функции
f
и g
(в данном примере неважны возвращаемые ими значения) будут выполняться в конкурирующих потоках и в это же время будут запускаться разные задачи. При запуске такого кода мы увидим блокировку кода при этих вызовах, что нам, вероятно, не требуется.
Почему же код блокируется? Разве
async
не должен отвечать за неблокируемость асинхронных вызовов? Да, это так, но есть одна особая тонкость: если объект типа future
был получен из вызова async
, имеющего политику launch::async
, то его деструктор выполнит блокирующее ожидание.
Это значит, что оба вызова
async
из данного короткого примера являются блокирующими, поскольку сроки жизни объектов типа future
, которые они возвращают, заканчиваются в одной строке! Можно исправить данную ситуацию, получив их возвращаемые значения и поместив в переменные с более длинным сроком жизни.
В этом примере мы реализуем типичную программу, работающую по принципу «производитель/потребитель», в которой запускается несколько потоков. Основная идея заключается в том, что существует один поток, который создает элементы и помещает их в очередь. Еще один поток потребляет (использует) эти элементы. Если создавать нечего, то поток-производитель приостанавливается. При отсутствии в очереди элементов приостанавливается потребитель.
Оба потока имеют доступ к очереди и могут изменить, поэтому ее нужно защитить с помощью мьютекса.
Важно рассмотреть и следующий момент: что будет делать потребитель, если в очереди нет элементов? Будет ли он опрашивать очередь каждую секунду до тех пор, пока не увидит новые элементы? В этом нет необходимости, поскольку можно позволить потребителю подождать событий, которые его пробудят, отправляемых производителем в момент, когда появляются новые элементы.
Для таких событий в C++11 предоставляется удобная структура данных:
std::condition_variable
. В этом примере мы реализуем простое приложение, работающее по принципу «производитель/потребитель», которое пользуется этими структурами.
Как это делается
Итак, в этом примере мы реализуем простую программу, работающую по принципу «производитель/потребитель», которая запускает одного производителя значений в отдельном потоке, а также одного потребителя в другом потоке.
1. Сначала выполним все необходимые директивы
include
и объявим об использовании пространства имен std
:
#include
#include
#include
#include
#include
using namespace std;
using namespace chrono_literals;
2. Создадим экземпляр очереди простых численных значений и назовем ее
q
. Производитель будет помещать туда значения, а потребитель — доставать их оттуда. Для их синхронизации понадобится мьютекс. Вдобавок создадим экземпляр типа condition_variable
и назовем его cv
. Переменная finished
представляет собой способ, с помощью которого производитель укажет потребителю, что других значений больше не будет:
queue q;
mutex mut;
condition_variable cv;
bool finished {false};
3. Реализуем функцию-производитель. Она принимает аргумент
items
, который ограничивает максимальное количество создаваемых элементов. В простом цикле он будет приостанавливаться на 100 миллисекунд для каждого элемента, что симулирует некоторую вычислительную сложность. Затем заблокируем мьютекс, синхронизирующий доступ к очереди. После успешного создания и вставки элемента в очередь вызываем cv.notify_all()
. Данная функция пробуждает потребителя. Далее мы увидим на стороне потребителя, как это работает.
static void producer(size_t items) {
for (size_t i {0}; i < items; ++i) {
this_thread::sleep_for(100ms);
{
lock_guard lk {mut};
q.push(i);
}
cv.notify_all();
}
4. После создания всех элементов снова блокируем мьютекс, поскольку нужно задать значение для бита
finished
. Затем опять вызываем метод cv.notify_all()
:
{
lock_guard lk {mut};
finished = true;
}
cv.notify_all();
}
5. Теперь можем реализовать функцию потребителя. Она не принимает никаких аргументов, поскольку будет слепо перебирать все элементы до тех пор, пока очередь не опустеет. В цикле, который станет выполняться до тех пор, пока не установится значение переменной
finished
, функция будет блокировать мьютекс, который защищает очередь и флаг finished
. В момент получения блокировки последняя вызовет функцию cv.wait
, передав ей блокировку и лямбда-выражение в качестве аргументов. Лямбда-выражение — это предикат, указывающий, жив ли еще поток-производитель и есть ли значения в очереди.
static void consumer() {
while (!finished) {
unique_lock l {mut};
cv.wait(l, [] { return !q.empty() || finished; });
6. Вызов
cv.wait
разблокирует блокировку и ждет до тех пор, пока условие, описанное предикатом, не перестанет выполняться. Затем функция снова блокирует мьютекс и перебирает все из очереди до тех пор, пока та не опустеет. Если производитель все еще жив, то она снова проитерирует по циклу. В противном случае функция завершит работу, поскольку установлен флаг finished
— таким способом производитель указывает, что новых элементов больше не будет.
while (!q.empty()) {
cout << "Got " << q.front()
<< " from queue.\n"; q.pop();
}
}
}
7. В функции
main
запускаем поток-производитель, который создает десять элементов, а также поток-потребитель. Затем ждем их выполнения и завершаем программу.
int main()
{
thread t1 {producer, 10};
thread t2 {consumer};
t1.join();
t2.join();
cout << "finished!\n";
}
8. Компиляция и запуск программы дадут следующий результат. При выполнении программы можно увидеть, что между появлением каждой строки проходит какое-то время (100 миллисекунд), поскольку создание элементов занимает время:
$ ./producer_consumer
Got 0 from queue.
Got 1 from queue.
Got 2 from queue.
Got 3 from queue.
Got 4 from queue.
Got 5 from queue.
Got 6 from queue.
Got 7 from queue.
Got 8 from queue.
Got 9 from queue.
finished!
Как это работает
В данном примере мы просто запустили два потока. Первый создает элементы и помещает их в очередь. Второй извлекает их из очереди. При взаимодействии с очередью один из этих потоков блокирует общий мьютекс
mut
, доступный им обоим. Таким образом, можно быть уверенными, что оба потока не будут взаимодействовать с состоянием очереди в одно и то же время.
Помимо очереди и мьютекса мы объявили четыре переменные, которые были включены во взаимодействие производителя/потребителя:
queue q;
mutex mut;
condition_variable cv;
bool finished {false};
Переменную
finished
объяснить очень просто. Ее значение было установлено в true
, когда производитель создал ограниченное количество элементов. Когда потребитель видит, что значение этой переменной равно true
, он использует последние элементы и завершает работу. Но для чего нужна переменная condition_variable cv
? Мы использовали cv
в двух разных контекстах. Один из контекстов ждал наступления конкретного условия, а второй — указывал на выполнение этого условия.
Сторона-потребитель, ждущая выполнения конкретного условия, выглядит так. Поток-потребитель проходит в цикле по блоку, который изначально блокирует мьютекс
mut
с помощью unique_lock
. Затем вызывает cv.wait
:
while (!finished) {
unique_lock l {mut};
cv.wait(l, [] { return !q.empty() || finished; });
while (!q.empty()) {
// consume
}
}
Данный код чем-то похож на следующий эквивалентный код. Вскоре мы рассмотрим, почему эти фрагменты не похожи друг на друга:
while (!finished) {
unique_lock l {mut};
while (q.empty() && !finished) {
l.unlock();
l.lock();
}
while (!q.empty()) {
// consume
}
}
Это значит, что сначала мы получили блокировку, а затем проверили, с каким сценарием сейчас работаем.
1. Есть ли элементы, которые можно использовать? В таком случае сохраняем блокировку, потребляем эти элементы, возвращаем блокировку и начинаем сначала.
2. В противном случае, если элементов для потребления нет, но производитель все еще жив, то возвращаем мьютекс с целью дать ему шанс добавить новые элементы в очередь. Далее снова пытаемся получить блокировку в надежде, что ситуация изменится и мы перейдем к ситуации 1.
Реальная причина, по которой строка
cv.wait
не эквивалентна конструкции while (q.empty() && ...)
, заключается в том, что мы не можем просто пройти по циклу l.unlock(); l.lock();
. Отсутствие активности потока-производителя в течение какого-то промежутка времени приводит к постоянным блокировкам/ разблокировкам мьютекса, что не имеет смысла, поскольку мы впустую тратим циклы процессора.
Выражение наподобие
cv.wait(lock, predicate)
будет ждать до тех пор, пока вызов predicate()
не вернет значение true
. Но это не делается путем постоянной блокировки/разблокировки. Чтобы возобновить поток, который блокируется вызовом wait
объекта condition_variable
, другой поток должен вызывать методы notify_one()
или notify_all()
для одного объекта. Только тогда ожидающий поток будет возобновлен и проверит условие predicate()
. (Последнее действительно и для нескольких потоков.)
Положительный момент таков: после вызова
wait
, проверившего предикат так, словно поступил внезапный сигнал к пробуждению, поток будет снова мгновенно приостановлен. Это значит, что мы не навредим рабочему потоку программы (но, возможно, пострадает производительность), если добавим в код слишком много вызовов notify
.
На стороне производителя мы просто вызвали
cv.notify_all()
после того, как производитель добавил новый элемент в очередь, а также вслед за тем, как создал последний элемент и установил значение флага finished
, равное true
. Этого было достаточно для того, чтобы направить потребителя.
Возьмем задачу из прошлого примера и усложним ее: создадим несколько производителей и несколько потребителей. Кроме того, укажем, что размер очереди не должен превышать определенное значение.
Таким образом, нужно приостанавливать не только потребителей, при отсутствии в очереди элементов, но и производителей, если элементов в ней слишком много.
Мы увидим, как решить эту проблему с помощью нескольких объектов типа
std::condition_variable
, а также воспользуемся ими несколько иным образом, нежели это было показано в предыдущем примере.
Как это делается
В данном примере мы реализуем программу, похожую на программу из предыдущего примера, но в этот раз у нас будет несколько производителей и несколько потребителей.
1. Сначала включим все необходимые заголовочные файлы и объявим об использовании пространств имен
std
и chrono_literals
:
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
using namespace chrono_literals;
2. Затем реализуем синхронизированную вспомогательную функцию для вывода сообщений на экран, показанную в другом примере этой главы, поскольку будем выводить множество сообщений на конкурентной основе:
struct pcout : public stringstream {
static inline mutex cout_mutex;
~pcout() {
lock_guard l {cout_mutex};
cout << rdbuf();
}
};
3. Все производители пишут значения в одну очередь, а все потребители также получают значения из нее. В дополнение к этой очереди нужен мьютекс, защищающий очередь и флаг, на основе которого можно сказать, что создание элементов будет приостановлено в какой-то момент:
queue q;
mutex q_mutex;
bool production_stopped {false};
4. В этой программе задействуем две разные переменные
condition_variables
. В предыдущем примере у нас была переменная condition_variable
, которая указывала на появление в очереди новых элементов. В этом случае ситуация чуть более запутанная. Мы хотим, чтобы производители создавали новые элементы до тех пор, пока в очереди не будет содержаться их определенное количество. Если мы его достигли, то они должны приостановиться. Таким образом, переменная go_consume
пригодна для возобновления потребителей, которые, в свою очередь, смогут возобновить производителей с помощью переменной go_produce
:
condition_variable go_produce;
condition_variable go_consume;
5. Функция-производитель принимает идентификатор производителя, общее количество элементов, которые нужно создать, а также максимальное количество элементов в очереди. Затем она входит в собственный производственный цикл. Далее блокирует мьютекс очереди, а затем разблокирует его снова в вызове
go_produce.wait
. Функция ожидает выполнения условия, согласно которому размер очереди должен быть меньше порогового значения stock
:
static void producer(size_t id, size_t items, size_t stock)
{
for (size_t i = 0; i < items; ++i) {
unique_lock lock(q_mutex);
go_produce.wait(lock,
[&] { return q.size() < stock; });
6. После того как производитель будет возобновлен, он создаст элемент и поместит его в очередь. Значение, помещаемое в очередь, определяется на основе выражения
id*100+i
. Таким образом, мы впоследствии можем увидеть, какой производитель создал его, поскольку количество сотен показывает идентификатор производителя. Кроме того, выводим сообщение о создании элемента в терминал. Формат этого сообщения может выглядеть странно, но оно будет выравнено в соответствии с сообщениями в окне консоли:
q.push(id * 100 + i);
pcout{} << " Producer " << id << " --> item "
<< setw(3) << q.back() << '\n';
7. После создания элемента можно возобновить приостановленных потребителей. Период приостановки, равный 90 миллисекундам, симулирует тот факт, что на создание элементов требуется какое-то время:
go_consume.notify_all();
this_thread::sleep_for(90ms);
}
pcout{} << "EXIT: Producer " << id << '\n';
}
8. Теперь перейдем к функции-потребителю, которая принимает в качестве аргумента только идентификатор. Она продолжает ожидать элементов при условии, что их производство не остановлено или очередь не пуста. Если очередь пуста, но производство не остановлено, то, возможно, скоро появятся новые элементы:
static void consumer(size_t id)
{
while (!production_stopped || !q.empty()) {
unique_lock lock(q_mutex);
9. После блокирования мьютекса очереди снова разблокируем его, чтобы подождать установки значения переменной события
go_consume
. Аргумент лямбда-выражения описывает, что нужно вернуть из вызова функции wait
, когда очередь содержит элементы. Второй аргумент 1s
указывает, что мы не должны ждать вечно. Если мы ждем больше секунды, то хотим выйти из функции wait
. Можно определить, вернула ли функция wait_for
значение (условие-предикат будет верным) или мы вышли из нее по тайм-ауту (в этом случае возвратится значение false
). При наличии в очереди новых элементов используем (потребим) их и выведем соответствующее сообщение на консоль:
if (go_consume.wait_for(lock, 1s,
[] { return !q.empty(); })) {
pcout{} << " item "
<< setw(3) << q.front()
<< " --> Consumer "
<< id << '\n';
q.pop();
10. После потребления элемента оповещаем производителей и приостанавливаем поток на 130 миллисекунд для симуляции того факта, что потребление элементов тоже требует времени:
go_produce.notify_all();
this_thread::sleep_for(130ms);
}
}
pcout{} << "EXIT: Producer " << id << '\n';
}
11. В функции
main
создаем один экземпляр вектора для рабочих потоков и еще один — для потоков-потребителей:
int main()
{
vector workers;
vector consumers;
12. Далее порождаем три потока-производителя и пять потоков-потребителей:
for (size_t i = 0; i < 3; ++i) {
workers.emplace_back(producer, i, 15, 5);
}
for (size_t i = 0; i < 5; ++i) {
consumers.emplace_back(consumer, i);
}
13. Сначала позволим закончить работу потокам-производителям. Как только все из них вернут значения, установим значение флага
production_stopped;
это приведет к тому, что потребители также закончат свою работу. Нужно собрать их, а затем завершить программу:
for (auto &t : workers) { t.join(); }
production_stopped = true;
for (auto &t : consumers) { t.join(); }
}
14. Компиляция и запуск программы дадут следующий результат. Сообщений получилось довольно много, поэтому мы приводим их в сокращенном виде. Как видите, производители приостанавливаются время от времени и позволяют потребителям использовать несколько элементов, чтобы снова получить возможность их производить. Интересно будет изменить время ожидания для производителей/потребителей, а также манипулировать количеством производителей/потребителей и максимальным количеством элементов в очереди, поскольку это значительно меняет шаблоны появления выходных сообщений:
$ ./multi_producer_consumer
Producer 0 --> item 0
Producer 1 --> item 100
item 0 --> Consumer 0
Producer 2 --> item 200
item 100 --> Consumer 1
item 200 --> Consumer 2
Producer 0 --> item 1
Producer 1 --> item 101
item 1 --> Consumer 0
...
Producer 0 --> item 14
EXIT: Producer 0
Producer 1 --> item 114
EXIT: Producer 1
item 14 --> Consumer 0
Producer 2 --> item 214
EXIT: Producer 2
item 114 --> Consumer 1
item 214 --> Consumer 2
EXIT: Consumer 2
EXIT: Consumer 3
EXIT: Consumer 4
EXIT: Consumer 0
EXIT: Consumer 1
Как это работает
Этот пример дополняет предыдущий. Вместо того чтобы синхронизировать одного производителя с одним потребителем, мы реализовали программу, которая синхронизирует
M
производителей и N
потребителей. Кроме того, приостанавливаются не только потребители при отсутствии элементов для них, но и производители, если очередь становится слишком длинной. Когда несколько потребителей ждут заполнения одной очереди, они будут действовать по принципу, работающему и для сценария «один производитель — один потребитель». Пока только один поток блокирует мьютекс, защищающий очередь, а затем извлекает оттуда элементы, код безопасен. Неважно, как много потоков ожидают блокировки одновременно. Это же верно и для производителя, поскольку единственный важный аспект в обоих сценариях таков: к очереди одномоментно может получить доступ только один поток, и не больше.
Более сложной, чем предыдущий пример, в котором запускались всего один производитель и один потребитель, эту программу делает тот факт, что мы указываем потокам-производителям останавливаться, когда длина очереди превышает какое-то значение. Чтобы соответствовать этому требованию, мы реализовали два разных сигнала, имеющих собственную переменную
condition_variable
.
1.
go_produce
сигнализирует о том, что очередь снова заполнена не до конца и производители могут опять начать ее заполнять.
2.
go_consume
уведомляет о достижении очереди максимального размера и о том, что потребители снова могут свободно использовать элементы.
Таким образом производители заполняют очередь элементами и сигнализируют с помощью события
go_consume
потокам-потребителям, которые ожидают на следующей строке:
if (go_consume.wait_for(lock, 1s, [] { return !q.empty(); })) {
// получили событие без тайм-аута
}
Производители, с другой стороны, ждут на следующей строке до тех пор, пока не смогут создавать элементы снова:
go_produce.wait(lock, [&] { return q.size() < stock; });
Интересный момент: мы не позволяем потребителям ждать вечно. В вызове
go_consume.wait_for
добавляем дополнительный аргумент timeout
, имеющий значение, равное 1
секунде. Он представляет собой механизм выхода для потребителей: если очередь пуста более секунды, то, возможно, активных производителей больше не осталось.
Для простоты код пытается поддерживать длину очереди всегда на максимуме. Более сложная программа могла бы позволить потокам отправлять уведомления о пробуждении только в том случае, если очередь достигнет половины максимального размера. Таким образом, производители будут пробуждаться до того, как очередь опустеет, но не раньше, когда в очереди все еще хватает элементов.
Рассмотрим следующую ситуацию, которую позволяет элегантно решить
condition_variable:
если потребитель отправляет уведомление go_produce
, то, возможно, множество производителей пытаются перегнать друг друга в попытке создать новый элемент. При нехватке только одного элемента работать будет только один производитель. Если все производители всегда станут создавать элемент при появлении события go_produce
, то мы зачастую будем сталкиваться с ситуацией, когда очередь заполняется сверх своего максимального размера.
Представим ситуацию, когда у нас в очереди имеется
(max-1)
элементов и нужно создать один новый элемент, чтобы очередь снова стала заполненной.
Независимо от того, какой метод вызовет поток-потребитель —
go_produce.notify_ one()
(возобновит только один ожидающий поток) или go_produce.notify_all()
(возобновит все ожидающие потоки), — можно гарантировать, что только один поток-производитель завершит вызов go_produce.wait
, поскольку для остальных потоков-производителей не будет удовлетворяться условие ожидания q.size()
в момент получения ими мьютекса при пробуждении.
Помните отрисовщик множества Мандельброта в ASCII из главы 6? В этом примере мы воспользуемся потоками, чтобы немного сократить время его вычисления. Сначала изменим строку оригинальной программы, которая ограничивает количество итераций для каждой выбранной координаты. Это сделает программу медленнее, а результаты — более точными в сравнении с той точностью, которая доступна при выводе данных на консоли, но у нас будет хороший пример программы для параллелизации.
Далее мы применим минимальные модификации к программе и увидим, что вся программа работает быстрее. После применения этих модификаций программа будет работать с
std::async
и std::future
. Чтобы полностью уяснить данный пример, очень важно понять оригинальную программу.
Как это делается
В этом примере мы возьмем отрисовщик фрактала Мандельброта, который реализовали в главе 6. Сначала увеличим время вычисления, повысив границу вычислений. Затем ускорим программу, внеся четыре небольших изменения, чтобы распараллелить ее.
1. Чтобы следовать шагам, лучше всего скопировать всю программу из другого примера. Затем следуйте инструкциям, показанным в следующих шагах, для внесения всех необходимых изменений. Все отличия от оригинальной программы выделяются полужирным шрифтом.
Первое изменение — это дополнительный заголовочный файл
:
#include
#include
#include
#include
#include
#include
#include
using namespace std;
2. Функции
scaler
и scaled_cmplx
менять не нужно:
using cmplx = complex;
static auto scaler(int min_from, int max_from,
double min_to, double max_to)
{
const int w_from {max_from - min_from};
const double w_to {max_to - min_to};
const int mid_from {(max_from - min_from) / 2 + min_from};
const double mid_to {(max_to - min_to) / 2.0 + min_to};
return [=] (int from) {
return double(from - mid_from) / w_from * w_to + mid_to;
};
}
template
static auto scaled_cmplx(A scaler_x, B scaler_y)
{
return [=](int x, int y) {
return cmplx{scaler_x(x), scaler_y(y)};
};
}
3. В функции
mandelbrot_iterations
просто увеличим количество итераций, чтобы программа выполняла больше вычислений:
static auto mandelbrot_iterations(cmplx c)
{
cmplx z {};
size_t iterations {0};
const size_t max_iterations {100000};
while (abs(z) < 2 && iterations < max_iterations) {
++iterations;
z = pow(z, 2) + c;
}
return iterations;
}
4. Далее перед нами часть функции
main
, которую также не нужно изменять:
int main()
{
const size_t w {100};
const size_t h {40};
auto scale (scaled_cmplx(
scaler(0, w, -2.0, 1.0),
scaler(0, h, -1.0, 1.0)
));
auto i_to_xy ([=](int x) {
return scale(x % w, x / w);
});
5. В функции
to_iteration_count
больше не вызываем mandelbrot_iterations(x_to_xy(x))
непосредственно, этот вызов делается асинхронно с помощью std::async
:
auto to_iteration_count ([=](int x) {
return async(launch::async,
mandelbrot_iterations, i_to_xy(x));
});
6. Перед внесением последнего изменения функция
to_iteration_count
возвращала количество итераций, нужных конкретной координате, чтобы алгоритм Мандельброта сошелся. Теперь она возвращает переменную типа future
, которая станет содержать то же значение позднее, когда оно будет рассчитано асинхронно. Из-за этого требуется вектор для хранения всех значений типа future
, добавим его. Выходной итератор, который мы предоставили функции transform
в качестве третьего аргумента, должен быть начальным итератором для нового вектора выходных данных r
:
vector v (w * h);
vector> r (w * h);
iota(begin(v), end(v), 0);
transform(begin(v), end(v), begin(r),
to_iteration_count);
7. Вызов
accumulate
, который выводит результат на экран, больше не получает значения типа size_t
в качестве второго аргумента, вместо этого он получает значения типа future
. Нужно адаптировать его к данному типу (если бы мы изначально использовали тип auto&
, то этого бы не требовалось), а затем вызвать x.get()
, где мы получили доступ к x
, чтобы дождаться появления значения.
auto binfunc ([w, n{0}] (auto output_it, future &x )
mutable {
*++output_it = (x.get() > 50 ? '*' : ' ');
if (++n % w == 0) { ++output_it = '\n'; }
return output_it;
});
accumulate(begin(r), end(r),
ostream_iterator{cout}, binfunc);
}
8. Компиляция и запуск программы дадут результат, который мы видели ранее. Увеличение количества итераций и для оригинальной версии приведет к тому, что распараллеленная версия отработает быстрее. На моем компьютере, имеющем четыре ядра ЦП, поддерживающих гиперпараллельность (что дает восемь виртуальных ядер), я получаю разные результаты для GCC и clang. В лучшем случае программа ускоряется в 5,3 раза, а в худшем — в 3,8. Результаты, конечно, будут различаться для разных машин.
Как это работает
Сначала важно понять всю программу, поскольку далее становится ясно, что вся работа, зависящая от ЦП, происходит в одной строке кода в функции
main
:
transform(begin(v), end(v), begin(r), to_iteration_count);
Вектор
v
содержит все индексы, соотнесенные с комплексными координатами, по которым мы итерируем в алгоритме Мандельброта. Результат каждой итерации сохраняется в векторе r
.
В оригинальной программе в данной строке тратится все время, требуемое для расчета фрактального изображения. Весь код, находящийся перед ним, выполняет подготовку, а весь код, следующий за ним, нужен лишь для вывода информации на экран. Это значит, что распараллеливание выполнения этой строки критически важно для производительности.
Одним из возможных подходов к распараллеливанию является разбиение всего линейного диапазона от
begin(v)
до end(v)
на фрагменты одного размера и равномерное их распределение между ядрами. Таким образом, все ядра выполнят одинаковый объем работы. Если бы мы использовали параллельную версию функции std::transform
с параллельной политикой выполнения, то все бы так и произошло. К сожалению, это неверная стратегия для нашей задачи, поскольку каждая точка множества Мандельброта показывает индивидуальное количество итераций.
Наш подход заключается в том, что мы заполним вектор, в котором содержатся отдельные символы, элементами
future
, чьи значения будут высчитаны асинхронно. Поскольку вектор-источник и вектор-приемник содержат w*h
элементов, что для нашего случая означает 100*40
, у нас есть вектор, содержащий 4000 значений типа future
, которые высчитываются асинхронно. Если бы наша система имела 4000 ядер ЦП, то это означало бы запуск 4000 потоков, которые выполнили бы перебор множества Мандельброта действительно конкурентно. В обычной системе, имеющей меньшее количество ядер, ЦП будут обрабатывать один асинхронный элемент за другим.
В то время как вызов
transform
с асинхронной версией to_iteration_count
сам по себе не выполняет расчеты, а лишь подготавливает потоки и объекты типа future
, он возвращает значение практически мгновенно. Оригинальная версия программы к этому моменту будет заблокирована, поскольку итерации занимают слишком много времени.
Распараллеленная версия программы также может быть заблокирована. Функция, которая выводит на экран все наши значения в консоли, должна получать доступ к результатам, находящимся внутри объектов типа
future
. Чтобы это сделать, она вызывает функцию x.get()
для всех значений. Идея заключается в следующем: пока она ждет вывода первого значения, множество других значений высчитываются одновременно. Поэтому, если метод get()
для первого значения типа future
возвращается, то следующий объект типа future
тоже может быть готов к выводу на экран!
Если размер вектора будет гораздо больше, то возникнет некоторая измеряемая задержка при создании и синхронизации всех этих значений. В нашем случае задержка не так велика. На моем ноутбуке с процессором Intel i7, имеющем четыре ядра, которые могут работать в режиме гиперпараллельности (что дает восемь виртуальных ядер), параллельная версия этой программы работала в 3–5 раз быстрее оригинальной программы. Идеальное распараллеливание сделало бы программу в восемь раз быстрее. Конечно, это ускорение будет различаться для разных компьютеров, поскольку зависит от множества факторов.
Большую часть сложных задач можно разбить на подзадачи. Из всех подзадач можно построить направленный ациклических граф (directed acyclic graph, DAG), который описывает, какие подзадачи зависят друг от друга, чтобы выполнить задачу более высокого уровня. Для примера предположим, что хотим создать строку
"foo bar foo bar this that"
, и можем сделать это только путем создания отдельных слов и их объединения с другими словами или с самими собой. Предположим, что этот механизм предоставляется тремя примитивными функциями create
, concat
и twice
.
Принимая это во внимание, можно нарисовать следующий DAG, который визуализирует зависимости между ними, что позволяет получить итоговый результат (рис. 9.4).
При реализации данной задачи в коде понятно, что все можно реализовать последовательно на одном ядре ЦП. Помимо этого, все подзадачи, которые не зависят от других подзадач или зависят от уже завершенных, могут быть выполнены конкурентно на нескольких ядрах ЦП.
Может показаться утомительным писать подобный код, даже с помощью
std::async
, поскольку зависимости между подзадачами нужно смоделировать. В этом примере мы реализуем две небольшие вспомогательные функции, которые позволяют преобразовать нормальные функции create
, concat
и twice
в функции, работающие асинхронно. С их помощью мы найдем действительно элегантный способ создать граф зависимостей. Во время выполнения программы граф сам себя распараллелит, чтобы максимально быстро получить результат.
Как это делается
В этом примере мы реализуем некие функции, которые симулируют сложные для вычисления задачи, зависящие друг от друга, и попробуем максимально их распараллелить.
1. Сначала включим все необходимые заголовочные файлы и объявим пространство имен
std
:
#include
#include
#include
#include
#include
#include
using namespace std;
using namespace chrono_literals;
2. Нужно синхронизировать конкурентный доступ к
cout
, поэтому задействуем вспомогательный класс, который написали в предыдущей главе:
struct pcout : public stringstream {
static inline mutex cout_mutex;
~pcout() {
lock_guard l {cout_mutex};
cout << rdbuf();
cout.flush();
}
};
3. Теперь реализуем три функции, которые преобразуют строки. Первая функция создаст объект типа
std::string
на основе строки, созданной в стиле C. Мы приостановим его на 3 секунды, чтобы симулировать сложность создания строки:
static string create(const char *s)
{
pcout{} << "3s CREATE " << quoted(s) << '\n';
this_thread::sleep_for(3s);
return {s};
}
4. Следующая функция принимает два строковых объекта и возвращает их сконкатенированный вариант. Мы будем приостанавливать ее на 5 секунд, чтобы симулировать сложность выполнения этой задачи:
static string concat(const string &a, const string &b)
{
pcout{} << "5s CONCAT "
<< quoted(a) << " "
<< quoted(b) << '\n';
this_thread::sleep_for(5s);
return a + b;
}
5. Последняя функция, наполненная вычислениями, принимает строку и конкатенирует ее с самой собой. На это потребуется 3 секунды:
static string twice(const string &s)
{
pcout{} << "3s TWICE " << quoted(s) << '\n';
this_thread::sleep_for(3s);
return s + s;
}
6. Теперь можно использовать эти функции в последовательной программе, но мы же хотим элегантно ее распараллелить! Так что реализуем некоторые вспомогательные функции. Будьте внимательны, следующие три функции выглядят действительно сложными. Функция
asynchronize
принимает функцию f
и возвращает вызываемый объект, который захватывает ее. Можно вызвать данный объект, передав ему любое количество аргументов, и он захватит их вместе с функцией f
в другой вызываемый объект, который будет возвращен. Этот последний вызываемый объект может быть вызван без аргументов. Затем он вызывает функцию f
асинхронно со всеми захваченными им аргументами:
template
static auto asynchronize(F f)
{
return [f](auto ... xs) {
return [=] () {
return async(launch::async, f, xs...);
};
};
}
7. Следующая функция будет использоваться функцией, которую мы объявим на шаге 8. Она принимает функцию
f
и захватывает ее в вызываемый объект; его и возвращает. Данный объект можно вызвать с несколькими объектами типа future
. Затем он вызовет функцию .get()
для всех этих объектов, применит к ним функцию f
и вернет результат:
template
static auto fut_unwrap(F f)
{
return [f](auto ... xs) {
return f(xs.get()...);
};
}
8. Последняя вспомогательная функция также принимает функцию
f
. Она возвращает вызываемый объект, который захватывает f
. Такой вызываемый объект может быть вызван с любым количеством аргументов, представляющих собой вызываемые объекты, которые он возвращает вместе с f
в другом вызываемом объекте. Этот итоговый вызываемый объект можно вызвать без аргументов. Он вызывает все вызываемые объекты, захваченные в наборе xs...
. Они возвращают объекты типа future
, которые нужно распаковать с помощью функции fut_unwrap
. Распаковка объектов типа future
и применение самой функции f
для реальных значений, находящихся в объектах типа future
, происходит асинхронно с помощью std::async
:
template
static auto async_adapter(F f)
{
return [f](auto ... xs) {
return [=] () {
return async(launch::async,
fut_unwrap(f), xs()...);
};
};
}
9. О’кей, возможно, предыдущие фрагменты кода были несколько запутанными и напоминали фильм «Начало» из-за лямбда-выражений, возвращающих лямбда-выражения. Мы подробно рассмотрим этот вуду-код далее. Теперь возьмем функции
create
, concat
и twice
и сделаем их асинхронными. Функция async_adapter
заставляет обычную функцию ожидать получения аргументов типа future
и возвращает в качестве результата объект типа future
. Она похожа на оболочку, преобразующую синхронный мир в асинхронный. Необходимо использовать функцию asynchronize
для функции create
, поскольку она будет возвращать объект типа future
, но следует передать ей реальные значения. Цепочка зависимостей для задач должна начинаться с вызовов create
:
int main()
{
auto pcreate (asynchronize(create));
auto pconcat (async_adapter(concat));
auto ptwice (async_adapter(twice));
10. Теперь у нас есть функции, которые распараллеливаются автоматически и имеют такие же имена, что и оригинальные функции, но с префиксом
p
. Далее создадим сложное дерево зависимостей. Сначала добавим строки "foo "
и "bar "
, которые мгновенно сконкатенируем в "foo bar "
. Эта строка будет сконкатенирована сама с собой с помощью функции twice. Затем создадим строки "this "
и "that "
, которые сконкатенируем в "this that "
. Наконец, сконкатенируем все эти строки в "foo bar foo bar this that "
. Результат будет сохранен в переменной callable
. Затем, наконец, вызовем функцию callable().get()
с целью начать вычисления и дождаться возвращаемых значений, чтобы вывести на экран и их. До вызова callable()
не выполняется никаких вычислений, а после этого вызова и начинается вся магия.
auto result (
pconcat(
ptwice(
pconcat(
pcreate("foo "),
pcreate("bar "))),
pconcat(
pcreate("this "),
pcreate("that "))));
cout << "Setup done. Nothing executed yet.\n";
cout << result().get() << '\n';
}
11. Компиляция и запуск программы показывают, что все вызовы
create
выполняются одновременно, а остальные вызовы — уже после них. Кажется, будто все они были спланированы интеллектуально. Вся программа работает 16 секунд. Если бы шаги выполнялись не параллельно, то для завершения программы потребовалось бы 30 секунд. Обратите внимание: для одновременного выполнения всех вызовов create нужна система как минимум с четырьмя ядрами ЦП. Если у системы будет меньше ядер, то некоторые вызовы должны будут делить ЦП, что увеличит время выполнения программы.
$ ./chains
Setup done. Nothing executed yet.
3s CREATE "foo "
3s CREATE "bar "
3s CREATE "this "
3s CREATE "that "
5s CONCAT "this " "that "
5s CONCAT "foo " "bar "
3s TWICE "foo bar "
5s CONCAT "foo bar foo bar " "this that "
foo bar foo bar this that
Как это работает
Простая последовательная версия этой программы без вызовов
async
и объектов типа future
выглядела бы так:
int main()
{
string result {
concat(
twice(
concat(
create("foo "),
create("bar "))),
concat(
create("this "),
create("that "))) };
cout << result << '\n';
}
В данном примере мы написали вспомогательные функции
async_adapter
и asynchronize
, которые позволили создать новые функции на основе функций create
, concat
и twice
. Мы назвали эти новые асинхронные функции pcreate
, pconcat
и ptwice
. Сначала опустим сложность реализации async_adapter
и asynchronize
с целью увидеть, что они дают. Последовательная версия выглядит аналогично следующему коду:
string result {concat( ... )};
cout << result << '\n';
Распараллеленная версия выглядит аналогично этому фрагменту:
auto result (pconcat( ... ));
cout << result().get() << '\n';
Теперь перейдем к сложной части. Типом распараллеленного результата является не
string
, а вызываемый объект, возвращающий объект типа future
, для которого можно вызвать функцию get(). На первый взгляд это выглядит без умным.
Как и зачем мы работаем с объектами, которые возвращают значения типа
future
? Проблема заключается в том, что наши методы create
, concat
и twice
слишком медленные. (Да, мы искусственно их замедлили, поскольку пытались смоделировать реальные приложения, которые потребляют много времени ЦП.) Но мы определили, что дерево зависимостей, описывающее поток данных, имеет независимые части, пригодные для параллельного выполнения. Рассмотрим два примера планов (рис. 9.5).
С левой стороны мы видим план для одного ядра. Все вызовы функций нужно выполнять один за другим, поскольку у нас есть только один ЦП. Это значит, что, поскольку вызов функции
create
длится 3 секунды, вызов concat
— 5 секунд, а twice
— 3 секунды, для получения конечного результата потребуется 30 секунд.
С правой стороны мы видим план, где задачи выполняются максимально распараллеленно. В идеальном мире, где все компьютеры имеют четыре ядра, можно создать все подстроки одновременно, а затем сконкатенировать их. Минимальное время получения результата с оптимальным параллельным планом равно 16 секундам. Мы не можем ускорить выполнение программы, если не сделать сами вызовы функций быстрее. Имея всего четыре ядра ЦП, можно добиться этого времени выполнения. Мы достигли оптимального расписания. Как оно работает?
Мы могли бы просто написать следующий код:
auto a (async(launch::async, create, "foo "));
auto b (async(launch::async, create, "bar "));
auto c (async(launch::async, create, "this "));
auto d (async(launch::async, create, "that "));
auto e (async(launch::async, concat, a.get(), b.get()));
auto f (async(launch::async, concat, c.get(), d.get()));
auto g (async(launch::async, twice, e.get()));
auto h (async(launch::async, concat, g.get(), f.get()));
Это хорошее начало для
a
, b
, c
и d
, которые представляют четыре подстроки. Они создаются асинхронно в фоновом режиме. К сожалению, этот код блокируется в строке, где мы инициализируем e. Чтобы сконкатенировать a
и b
, нужно вызвать get()
для обеих подстрок, данный код будет заблокирован до тех пор, пока данные значения не будут готовы. Очевидно, это плохая идея, поскольку распаралелливание перестает быть паралелльным после первого вызова get()
. Требуется более хорошая стратегия.
Задействуем сложные вспомогательные функции, которые мы написали. Первая из них — это
asynchronize
:
template
static auto asynchronize(F f)
{
return [f](auto ... xs) {
return [=] () {
return async(launch::async, f, xs...);
};
};
}
При наличии функции
int f(int, int)
можно сделать следующее:
auto f2 ( asynchronize(f) );
auto f3 ( f2(1, 2) );
auto f4 ( f3() );
int result { f4.get() };
Функция
f2
— это наша асинхронная версия функции f
. Ее можно вызвать с теми же аргументами, что и функцию f
, поскольку f2
подражает ей. Затем она возвращает вызываемый объект, который мы сохраняем в f3
. Функция f3
теперь захватывает f
и аргументы 1
, 2
, но пока ничего не вызывает. Это все делается ради захвата.
Теперь при вызове функции
f3()
мы наконец получаем объект типа future
, поскольку f3()
делает вызов async(launch::async,f,1,2);
! В некотором смысле семантическое значение f3
заключается в следующем: «Получить захваченную функцию и аргументы, а затем передать их в вызов std::async
».
Внутреннее лямбда-выражение, которое не принимает никаких аргументов, позволяет пойти по нестандартному пути. С его помощью можно настроить работу для параллельной отправки, но не нужно вызывать никаких блокирующих функций. Мы следуем тому же принципу в гораздо более сложной функции
async_adapter
:
template
static auto async_adapter(F f)
{
return [f](auto ... xs) {
return [=] () {
return async(launch::async, fut_unwrap(f), xs()...);
};
};
}
Данная функция также сначала возвращает функцию, которая подражает
f
, поскольку принимает те же аргументы. Затем эта функция возвращает вызываемый объект, который тоже не принимает аргументов. В результате упомянутый вызываемый объект наконец отличается от другой вспомогательной функции.
Каково значение строки
async(launch::async
, fut_unwrap(f),xs()...);
? Часть xs()...
означает предположение, что все аргументы, которые сохраняются в наборе параметров xs
, являются вызываемыми объектами (как те, что мы постоянно создаем!) и, как следствие, вызываются без аргументов. Эти вызываемые объекты, постоянно создаваемые нами, производят значения типа future
, для которых мы вызываем функцию get()
. Здесь вступает в действие функция fut_unwrap
:
template static auto fut_unwrap(F f)
{
return [f](auto ... xs) {
return f(xs.get()...);
};
}
Функция
fut_unwrap
просто преобразует функцию f
в объект функции, который принимает диапазон аргументов. Данный объект вызывает функцию .get()
для них всех и наконец перенаправляет их к f
.
Возможно, вам потребуется время, чтобы это переварить. В нашей функции main цепочка вызовов
auto result (pconcat(...));
создавала большой вызываемый объект, который содержал все функции и все аргументы. К этому моменту мы не выполняли асинхронных вызовов. Затем, вызвав функцию result()
, мы породили небольшую лавину вызовов async
и .get()
, которые выполнялись в правильном порядке, чтобы не заблокировать друг друга. Фактически вызовы get()
не происходят до вызовов async
.
В конечном счете мы наконец можем вызвать функцию
.get()
для значения типа future
, которое вернула функция result()
, и получить финальную строку.